多线程是Java中一个重要的知识点。

但我们最初操作线程时,一般是通过直接new Thread或者new Runnable等方式。

这样的话就会频繁生成,销毁线程。

那么有没有能够节省资源的方法呢?

线程池就出现了。

线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用、控制最大并发数、管理线程。

这个东西看着还是挺不错的,那么它如何使用呢?

最简单的是通过Executors类进行创建。

通过Executors可以创建三种线程池。

CachedThreadPool();
FixedThreadPool();
SingleThreadExecutor();

这分别代表什么意思呢?

SingleThreadExecutor()指的是单线程的线程池,池中只有一个线程,如果有多个任务,那么需要进入阻塞队列等待,等待当前线程执行完此前的任务后继续进行。

FixedThreadPool(int 线程数量)指的是特定线程数量的线程池,根据构造方法内传入的数据,在池中创建对应的线程数量,同样如果任务超出当前线程数量,也会进入阻塞队列等待。

CachedThreadPool()更有趣一些,初始情况下线程池内没有任何线程,而是根据任务的多少,动态创建线程进行处理,如果说动态创建的线程60秒内无人使用,线程将会被销毁。

但是开发中使用这些线程池存在着很大的隐患。

为什么呢?我们从线程池的几大参数出发来研究。

打开ThreadPoolExecutor类的构造方法我们可以看见,构造时有如下七个参数。

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
      if (corePoolSize < 0 ||
          maximumPoolSize <= 0 ||
          maximumPoolSize < corePoolSize ||
          keepAliveTime < 0)
          throw new IllegalArgumentException();
      if (workQueue == null || threadFactory == null || handler == null)
          throw new NullPointerException();
      this.acc = System.getSecurityManager() == null ?
              null :
              AccessController.getContext();
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.workQueue = workQueue;
      this.keepAliveTime = unit.toNanos(keepAliveTime);
      this.threadFactory = threadFactory;
      this.handler = handler;
  }

1、corePoolSize:线程池中的常驻核心线程数

2、maximumPoolSize:线程池中能够容纳同时
执行的最大线程数,此值必须大于等于1

3、keepAliveTime:多余的空闲线程的存活时间
当前池中线程数量超过corePoolSize时,当空闲时间
达到keepAliveTime时,多余线程会被销毁直到
只剩下corePoolSize个线程为止

4、unit:keepAliveTime的单位

5、workQueue:任务队列,被提交但尚未被执行的任务

6、threadFactory:表示生成线程池中工作线程的线程工厂,
用于创建线程。

7、handler:拒绝策略,表示当队列满了,并且工作线程大于
等于线程池的最大线程数(maximumPoolSize)时如何来拒绝
请求执行的runnable的策略。

但是光看这些解释恐怕还是很难理解,此处举一个例子。

很多人应该会遇到过交通违章处理罚单的情况。

一般来说,交警大队的大厅中会有一些处理窗口,违章的人们拿着驾照,领好号码,坐在大厅中等待处理。

根据来违章处理人数的多少,交警大队开启的窗口数量也会随之增减。

每个窗口就好比一个线程,前来处理违章的人就好比一个任务,一个线程同时只能处理一个任务。

假如交警队有五个窗口,平时只开放两个窗口,那同时进行的线程相当于只有两个。

那第三个进来的人,也就是任务,只能坐在大厅里等候叫号了,这个大厅充当的角色就是workQueue任务队列,这些人因为没有足够的窗口处理违章,所以被阻塞了。

但是大厅里的座位也是有上限的,因为平时大家都要上班,可能只有周末可以来处理违章。

也许到了周六,大厅突然就爆满了。假设大厅里有五个座位,这天来了十个人。

这种情况下又该如何是好?

本来是两个窗口有两个人处理违章,剩下五个在座位上等着,那还有三个人怎么办?

交警大队不是有5个窗口嘛?把那三个也打开不就好了?

那这样,最早来的两个人继续处理他们的违章,坐着等的早来的三个人去新开的窗口处理,那门外的三个人就也可以进来了。

那此时如果来了第十一个人怎么办呢?

现在所有的窗口都满了,等待的大厅也满了,这时候可能就需要交警们想办法劝他明天再来了。

那如果本来五个窗口一同应战,但是渐渐地,来处理违章的人开始减少了,那么在交警们观察了一段时间后,发现没必要再开五个窗口了,这时候交警大队多开的三个窗口的交警同志就会去休息了。

这个case可以与我们的七大参数一一对应,corePoolSize常驻核心数就如同那两个常开放的窗口一样,也就是线程池中常驻的线程,不管业务多少,它们总会存在。

maximumPoolSize也就是池中最大容纳的线程数,就相当于交警大队中总的窗口数一样,虽然池中不一定会保持着最大的线程数量,但是最大可以提供这么多条线程。

keepAliveTime也就是相当于交警判断是否关闭窗口所花的时间,如果一个线程长时间没有人使用,而且他的存在使线程池中线程数量超过了常驻核心数的话,那它就会被销毁。

unit只是keepAliveTime的单位,没什么好说的。

workQueue就相当于大厅,那些暂时没有线程去执行的任务就等在workQueue中,待到有线程执行他们时才被取出来,一般这个队列都是阻塞队列。

threadFactory相当于创建线程的工厂,这里就相当于如果交警不够了,就立刻训练交警上场处理违章。

handler就相当于当窗口和大厅都满了的时候交警处理的策略,换做线程池便是当使用中的线程数已经达到了最大并且任务队列已经满了的时候,程序处理任务的方法。

完整的工作原理便是:

1、在创建了线程池后,开始等待任务。

2、当调用execute()方法添加一个任务时,线程池会做出如下判断:

  • 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
  • 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
  • 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么创建非核心线程立刻运行这个任务;
  • 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动拒绝策略。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。

4、当一个线程空闲超过一定的时间(keepAliveTime)时,线程会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

我们看看之前提到的三种线程池的底层代码是什么。

FixedThreadPool:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到就是在ThreadPoolExecutor类中传入了对应的参数。

FixedThreadPool就是将核心线程数和最大线程数同样设置为了传入的参数数量,同时队列使用了LinkedBlockingQueue。

SingleThreadExecutor:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor就是将核心线程数和最大线程数同样设置为了1,同时队列使用了LinkedBlockingQueue。

CachedThreadPool:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool是将核心线程数设为了0,最大线程数设为了int的最大值,然后如果60秒不使用便自动销毁线程。

那么他们的问题也随之出现。

我们注意到CachedThreadPool的最大值是Integer.MAX_VALUE。

同时SingleThreadExecutor和FixedThreadPool使用的是LinkedBlockingQueue的默认构造方法,这个构造方法具体处理方式如下:

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

也就是说,队列的最大容量也是Integer.MAX_VALUE。

int的最大值是2147483647,那么对于SingleThreadExecutor和FixedThreadPool来说,最大可以储存的任务数量就是2147483647个,而对CachedThreadPool来说,最多可以创建2147483647个线程。

这是很危险的,如果有大量的请求并发,内存随时有可能因为如此高的并发量导致溢出。

所以这些参数在实际开发过程中,我们只能自己定义。

那拒绝策略有哪些呢?

有以下四种:

AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。相当于如果我是main线程发出的任务,但是我这里已经超量了,那么就将任务还给main线程。

DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。

DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

默认的三种线程池底层都采用了defaultHandler,也就是默认的AbortPolicy。

因此所有参数,都可以按照实际场景的变化,按照需求选取。