Java异步编程实战
上QQ阅读APP看书,第一时间看更新

2.2 显式使用线程池实现异步编程

2.2.1 如何显式使用线程池实现异步编程

在Java中我们可以使用线程池来实现线程复用,每当我们需要执行异步任务时,可以把任务投递到线程池里进行异步执行。我们可以修改上节的代码,使用线程池来执行异步任务,修改后代码如下:

            // 0自定义线程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                long start = System.currentTimeMillis();

                // 1.开启异步单元执行任务A
                POOL_EXECUTOR.execute(() -> {
                    try {
                        doSomethingA();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                  // 2.执行任务B
                  doSomethingB();

                  // 3.同步等待线程A运行结束
                  System.out.println(System.currentTimeMillis() - start);

                  // 4.挂起当前线程
                  Thread.currentThread().join();
              }

上面代码0创建了一个线程池,这里我们设置线程池核心线程个数为当前物理机的CPU核数,最大线程个数为当前物理机CPU核数的2倍;设置线程池阻塞队列的大小为5;需要注意的是,我们将线程池的拒绝策略设置为CallerRunsPolicy,即当线程池任务饱和,执行拒绝策略时不会丢弃新的任务,而是会使用调用线程来执行;另外我们使用了命名的线程创建工厂,以便排查问题时可以方便追溯是哪个相关业务。

创建完线程池后,代码1则把异步任务提交到了线程池内运行,而不是直接开启一个新线程来运行;这里使用线程池起到了复用线程的作用,避免了线程的频繁创建与销毁,另外对线程个数也起到了限制作用。

其实通过上面代码我们可以进一步释放main线程的负担,也就是可以把任务doSomethingB的执行也提交到线程池内进行异步执行,代码如下:

            // 0自定义线程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                long start = System.currentTimeMillis();

                // 1.开启异步单元执行任务A
                POOL_EXECUTOR.execute(() -> {
                    try {
                            doSomethingA();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });

                    // 2.执行任务B
                    POOL_EXECUTOR.execute(() -> {
                        try {
                            doSomethingB();

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });

                    // 3.同步等待线程A运行结束
                    System.out.println(System.currentTimeMillis() - start);

                    // 4.挂起当前线程
                    Thread.currentThread().join();
                }

如上面代码所示,main函数所在线程只需要把两个任务提交到线程池后就可以做自己的事情了,具体两个任务是由线程池中的线程执行。

上面演示了向线程池内投递异步任务并没有返回值的情况,其实我们可以向线程池投递一个Callable类型的异步任务,并且获取其执行结果,代码如下:

        public class AsyncThreadPoolExample {

            public static String doSomethingA() {

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("--- doSomethingA---");
                return "A Task Done";
            }
            // 0自定义线程池
            private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().
        availableProcessors();
            private final static ThreadPoolExecutor POOL_EXECUTOR = new
        ThreadPoolExecutor(AVALIABLE_PROCESSORS,
                    AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new
        LinkedBlockingQueue<>(5),
                    new NamedThreadFactory("ASYNC-POOL"), new ThreadPoolExecutor.
        CallerRunsPolicy());

            public static void main(String[] args) throws InterruptedException,
        ExecutionException {

                // 1.开启异步单元执行任务A
                Future<? > resultA = POOL_EXECUTOR.submit(() -> doSomethingA());

                // 2.同步等待执行结果
                System.out.println(resultA.get());
            }
        }

如上面代码所示,doSomethingA方法具有String类型的返回值,代码0创建了一个线程池,在main方法中,代码1使用lambda表达式将Callable类型的任务提交到线程池,提交后会马上返回一个Future对象,代码2在futureA上调用get()方法阻塞等待异步任务的执行结果。

如上代码确实可以在main函数所在线程获取到异步任务的执行结果,但是main线程必须以阻塞的代价来获取结果,在异步任务执行完毕前,main函数所在线程就不能做其他事情了,这显然不是我们所需要的,具体怎么解决这个问题,下章我们会具体讲解。

2.2.2 线程池ThreadPoolExecutor原理剖析

1.概述

线程池作为异步执行的利器,我们有必要讲解下其内部实现,以便让大家对异步编程有更深入的理解。首先我们看下其类图结构图,如图2-1所示。

图2-1 线程池类图结构

如图2-1所示,成员变量ctl是Integer的原子变量,使用一个变量同时记录线程池状态和线程池中线程个数,假设计算机硬件的Integer类型是32位二进制标示,如下面代码所示,其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数:

        //用来标记线程池状态(高3位),线程个数(低29位)
        //默认是RUNNING状态,线程个数为0
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

        //线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位
        数-3后的剩余位数才是线程的个数
        private static final int COUNT_BITS = Integer.SIZE -3;

        //线程最大个数(低29位)00011111111111111111111111111111
        private static final int CAPACITY    = (1 << COUNT_BITS) -1;

线程池的主要状态列举如下:

        //(高3位):11100000000000000000000000000000
        private static final int RUNNING     = -1 << COUNT_BITS;

        //(高3位):00000000000000000000000000000000
        private static final int SHUTDOWN    =   0 << COUNT_BITS;

        //(高3位):00100000000000000000000000000000
        private static final int STOP         =   1 << COUNT_BITS;
        //(高3位):01000000000000000000000000000000
        private static final int TIDYING     =   2 << COUNT_BITS;

        //(高3位):01100000000000000000000000000000
        private static final int TERMINATED =   3 << COUNT_BITS;

线程池状态含义:

● RUNNING:接收新任务并且处理阻塞队列里的任务。

● SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

● STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时中断正在处理的任务。

● TIDYING:所有任务都执行完(包含阻塞队列里面任务),当前线程池活动线程为0,将要调用terminated方法。

● TERMINATED:终止状态。terminated方法调用完成以后的状态。

线程池状态之间转换路径:

● RUNNING→SHUTDOWN:当显式调用shutdown()方法时,或者隐式调用了finalize(),它里面调用了shutdown()方法时。

● RUNNING或者SHUTDOWN→STOP:当显式调用shutdownNow()方法时。

● SHUTDOWN→TIDYING:当线程池和任务队列都为空时。

● STOP→TIDYING:当线程池为空时。

● TIDYING→TERMINATED :当terminated() hook方法执行完成时。

线程池同时提供了一些方法用来获取线程池的运行状态和线程池中的线程个数,代码如下:

        // 获取高三位 运行状态
        private static int runStateOf(int c)      { return c & ~CAPACITY; }

        //获取低29位 线程个数
        private static int workerCountOf(int c)   { return c & CAPACITY; }

        //计算ctl新值,线程状态 与 线程个数
        private static int ctlOf(int rs, int wc) { return rs | wc; }

另外线程池是可配置的,使用者可以根据自己的需要对线程池的参数进行调整,如类图中线程池提供了可供使用者配置的参数:

● corePoolSize:线程池核心线程个数。

● workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界Array-BlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue、优先级队列PriorityBlockingQueue等。

● maximunPoolSize:线程池最大线程数量。

● threadFactory:创建线程的工厂类。

● defaultHandler:饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略,比如AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)、DiscardPolicy(默默丢弃,不抛出异常)。

● keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。

前文图2-1中变量mainLock是独占锁,用来控制新增Worker线程时的原子性,termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。

Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,实现了简单不可重入独占锁,其中state=0标示锁未被获取的状态,state=1标示锁已经被获取的状态,state = -1是创建Worker时默认的状态。创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断,下面会具体讲解到。其中变量firstTask记录该工作线程执行的第一个任务,Thread是具体执行任务的线程。

DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中,poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。

ThreadPoolExecutor提供了一系列构造函数让我们创建线程池,比如:

        ThreadPoolExecutor(int corePoolSize, //核心线程个数
                            int maximumPoolSize, //最大线程个数
                            long keepAliveTime, //非核心不活跃线程最大存活时间
                            TimeUnit unit, //keepAliveTime的单位
                            BlockingQueue<Runnable> workQueue, //阻塞队列类型
                            ThreadFactory threadFactory, //线程池创建工厂
                            RejectedExecutionHandler handler)//拒绝策略

则当我们需要创建自己的线程池时,就可以显式地新建一个该实例出来。

2.提交任务到线程池原理解析

ThreadPoolExecutor中提交任务到线程池的方法有下面几种,如表2-1所示。

表2-1 提交任务到线程池的方法

首先我们看方法public void execute(Runnable command)提交任务到线程池的逻辑:

        public void execute(Runnable command) {

            //(1) 如果任务为null,则抛出NPE异常
            if (command == null)
                throw new NullPointerException();

            //(2)获取当前线程池的状态+线程个数变量的组合值
            int c = ctl.get();

            //(3)当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
        //(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {

            //(4.1)二次检查
            int recheck = ctl.get();
            //(4.2)如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //(4.3)如果当前线程池线程为空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //(5)如果队列满了,则新增线程,新增失败则执行拒绝策略
        else if (! addWorker(command, false))
            reject(command);
    }

● 代码3是指如果当前线程池线程个数小于corePoolSize,则会在调用方法addWorker新增一个核心线程执行该任务。

● 如果当前线程池线程个数大于等于corePoolSize则执行代码4,如果当前线程池处于RUNNING状态则添加当前任务到任务队列。这里需要判断线程池状态是因为线程池有可能已经处于非RUNNING状态,而非RUNNING状态下是抛弃新任务的。

● 如果任务添加任务队列成功,则执行代码4.2对线程池状态进行二次校验,这是因为添加任务到任务队列后,执行代码4.2前线程池的状态有可能已经变化了,如果当前线程池状态不是RUNNING则把任务从任务队列移除,移除后执行拒绝策略;如果二次校验通过,则执行代码4.3重新判断当前线程池里面是否还有线程,如果没有则新增一个线程。

● 如果代码4添加任务失败,则说明任务队列满了,那么执行代码5尝试调用addWorker方法新开启线程来执行该任务;如果当前线程池的线程个数大于maximumPoolSize则addWorker返回false,执行配置的拒绝策略。

下面我们来看public Future<? > submit(Runnable task)方法提交任务的逻辑:

    public Future<? > submit(Runnable task) {
        // 6 NPE判断
              if (task == null) throw new NullPointerException();
              // 7 包装任务为FutureTask
              RunnableFuture<Void> ftask = newTaskFor(task, null);
              // 8 投递到线程池执行
              execute(ftask);
              // 9 返回ftask
              return ftask;
          }

代码7调用newTaskFor方法对我们提交的Runnable类型任务进行包装,包装为RunnableFuture类型任务,然后提交RunnableFuture任务到线程池后返回ftask对象。

下面我们来看newTaskFor的代码逻辑:

        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }

由代码可知,其内部创建了一个FutureTask对象,构造函数如下:

              public FutureTask(Runnable runnable, V result) {
                  //将runnable适配为Callable类型任务,并且让result作为执行结果
                  this.callable = Executors.callable(runnable, result);
                  this.state = NEW;         // ensure visibility of callable
              }

上述代码中的FutureTask会在运行时执行给定的Runnable,并将在任务Runnable执行完成后,把给定的结果value通过FutureTask的get方法返回。

下面我们看public Future submit(Runnable task, T result)方法的逻辑,其代码如下:

        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }

由上述代码可知,两个参数的submit方法类似,不同在于该方法接收的是含有返回值的Callable类型的任务,最终也是转换为FutureTask后提交到线程池,并返回。

3.线程池中任务执行原理解析

当用户线程提交任务到线程池后,在线程池没有执行拒绝策略的情况下,用户线程会马上返回,而提交的任务要么直接切换到线程池中的Worker线程来执行,要么先放入线程池的阻塞队列里面,稍后再由Worker线程来执行。本节我们就看下具体执行异步任务的Worker线程是如何工作的。首先我们看下Worker的构造函数:

        Worker(Runnable firstTask) {
            setState(-1); // 在调用runWorker前禁止中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); //创建一个线程
        }

在上述代码中,Worker构造函数内首先设置Worker的运行状态status为-1,是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态≥0则会中断该线程)。在前面的小节中我们讲到Worker继承了AbstractQueuedSynchronizer类,实现了简单不可重入独占锁,其中status=0标示锁未被获取的状态,state=1标示锁已经被获取的状态,state = -1是创建Worker时默认的状态。然后把传递的任务firstTask保存起来,最后使用线程池中指定的线程池工厂创建一个线程作为该Worker对象的执行线程。

由于Worker本身实现了Runnable方法,所以下面我们看其run方法内是如何执行任务的:

                  public void run() {
                      runWorker(this); //委托给runWorker方法
                  }

runWorker方法的代码如下:

        final void runWorker(Worker w) {
                Thread wt = Thread.currentThread();
                Runnable task = w.firstTask;
                  w.firstTask = null;
                  w.unlock(); //(1)status设置为0,允许中断
                  boolean completedAbruptly = true;
                  try {
                      //(2)
                      while (task ! = null || (task = getTask()) ! = null) {

                            //(2.1)
                          w.lock();
                          ...
                          try {
                              //(2.2)任务执行前干一些事情
                              beforeExecute(wt, task);
                              Throwable thrown = null;
                              try {
                                  task.run(); //(2.3)执行任务
                              } catch (RuntimeException x) {
                                  thrown = x; throw x;
                              } catch (Error x) {
                                  thrown = x; throw x;
                              } catch (Throwable x) {
                                  thrown = x; throw new Error(x);
                              } finally {
                                  //(2.4)任务执行完毕后干一些事情
                                  afterExecute(task, thrown);
                              }
                          } finally {
                              task = null;
                              //(2.5)统计当前Worker完成了多少个任务
                              w.completedTasks++;
                              w.unlock();
                          }
                      }
                      completedAbruptly = false;
                  } finally {
                      //(3)执行清工作
                      processWorkerExit(w, completedAbruptly);
                  }
              }

如上代码在运行runWorker的代码1时会调用unlock方法,该方法把status变为了0,所以这时候调用shutdownNow会中断Worker线程。

如代码2所示,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码3执行清理工作,当前Worker也就退出执行了。如果task不为null则执行代码2.1获取工作线程内部持有的独占锁,然后执行扩展接口代码2.2,代码2.3具体执行任务,代码2.4在任务执行完毕后做一些事情,代码2.5统计当前Worker完成了多少个任务,并释放锁。

这里在执行具体任务期间加锁,是为了避免任务运行期间,其他线程调用了shutdown方法关闭线程池时中断正在执行任务的线程。

代码3执行清理任务,其代码如下:

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
              ...

            //(3.1)统计整个线程池完成的任务个数,并从工作集里面删除当前woker
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }

            //(3.2)尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
            //或者当前是stop状态且当前线程池里面没有活动线程
            tryTerminate();

            //(3.3)如果当前线程个数小于核心个数,则增加
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (! completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }

代码3.1统计线程池完成任务个数,可知在统计前加了全局锁,把当前工作线程中完成的任务累加到全局计数器,然后从工作集中删除当前Worker。

代码3.2判断如果当前线程池状态是shutdown状态并且工作队列为空,或者当前是stop状态并且当前线程池里面没有活动线程,则设置线程池状态为TERMINATED。

代码3.3判断当前线程中的线程个数是否小于核心线程个数,如果是则新增一个线程。

4.关闭线程池原理解析

线程池中有两种模式的线程池关闭方法,如表2-2所示。

表2-2 关闭线程池的方法

首先我们来看public void shutdown()方法的代码逻辑:

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //(1)权限检查
                checkShutdownAccess();

                //(2)设置当前线程池状态为SHUTDOWN,如果已经是SHUTDOWN则直接返回
                advanceRunState(SHUTDOWN);

                //(3)设置中断标志
                interruptIdleWorkers();
                onShutdown();
            } finally {
                  mainLock.unlock();
              }
              //(4)尝试状态变为TERMINATED
              tryTerminate();
          }

代码1检查如果设置了安全管理器,则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。

代码2的内容如下,如果当前状态>=SHUTDOWN则直接返回,否则设置当前状态为SHUTDOWN:

        private void advanceRunState(int targetState) {
            for (; ; ) {
                int c = ctl.get();
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }

代码3的内容如下,设置所有空闲线程的中断标志,这里首先加了全局锁,同时只有一个线程可以调用shutdown设置中断标志。然后尝试获取Worker本身的锁,获取成功则设置中断标识,由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法,企图从队列里获取任务的线程,也就是空闲线程。

        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    //如果工作线程没有被中断,并且没有正在运行则设置中断
                    if (! t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
        } finally {
            w.unlock();
        }
    }
    if (onlyOne)
        break;
}
} finally {
mainLock.unlock();
}
}

代码4尝试将线程池的状态变为TERMINATED, tryTerminate的代码如下:

        final void tryTerminate() {
                for (; ; ) {
                    ...
                    int c = ctl.get();
                    ...

                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {//设置当前线程池状态为TIDYING
                        if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                            try {
                                  terminated();
                            } finally {
                                  //设置当前线程池状态为TERMINATED
                                  ctl.set(ctlOf(TERMINATED, 0));
                                  //激活调用条件变量termination的await系列方法被阻塞的所有线程
                                  termination.signalAll();
                            }
                            return;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                }
            }

如上述代码所示,首先使用CAS设置当前线程池状态为TIDYING,如果成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED,最后调用termination.signalAll()来激活调用线程池的awaitTermination系列方法被阻塞的所有线程。

下面我们来看public void shutdownNow()方法的代码逻辑:

        public List<Runnable> shutdownNow() {

            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess(); //(5)权限检查
                advanceRunState(STOP); //(6) 设置线程池状态为stop
                interruptWorkers(); //(7)中断所有线程
                tasks = drainQueue(); //(8)移动队列任务到tasks
            } finally {
                mainLock.unlock();
            }
            //(9)终止状态
            tryTerminate();
            return tasks;
        }

首先调用代码5检查权限,然后调用代码6设置当前线程池状态为STOP,接着执行代码7中断所有的工作线程,这里需要注意的是中断所有线程,包含空闲线程和正在执行任务的线程:

              private void interruptWorkers() {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      for (Worker w : workers)
                          w.interruptIfStarted();
                  } finally {
                      mainLock.unlock();
                  }
              }

然后调用代码8将当前任务队列的任务移动到tasks列表,代码如下:

            private List<Runnable> drainQueue() {
              //8.1获取任务队列
              BlockingQueue<Runnable> q = workQueue;
              ArrayList<Runnable> taskList = new ArrayList<Runnable>();
              //8.2 从任务队列移除任务到taskList列表
              q.drainTo(taskList);
              //8.3 如果q还不为空,则说明drainTo接口调用失效,则循环移除
              if (! q.isEmpty()) {
                  for (Runnable r : q.toArray(new Runnable[0])) {
                      if (q.remove(r))
                          taskList.add(r);
                  }
              }
              //8.4返回异常的任务列表
              return taskList;
          }

由上述代码可知,调用线程池队列的drainTo方法把队列中的任务移除到taskList里,如果发现线程池队列还不为空(比如DelayQueue或者其他类型的队列drainTo可能移除元素失败),则循环移除里面的元素,最后返回移除的任务列表。

5.线程池的拒绝策略解析

线程池是通过池化少量线程来提供线程复用的,当调用线程向线程池中投递大量任务后,线程池可能就处于饱和状态了。所谓饱和状态是指当前线程池队列已经满了,并且线程池中的线程已经达到了最大线程个数。当线程池处于饱和状态时,再向线程池投递任务,而对于投递的任务如何处理,是由线程池拒绝策略决定的。拒绝策略的执行是在execute方法,大家可以返回前面章节查看。

线程池中提供了RejectedExecutionHandler接口,用来提供对线程池拒绝策略的抽象,其定义如下:

        public interface RejectedExecutionHandler {
            void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
        }

线程池中提供了一系列该接口的实现供我们使用,如表2-3所示。

表2-3 线程池提供的拒绝策略

首先我们看下AbortPolicy策略的代码:

        public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
            /**
              * 抛出RejectedExecutionException.
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              * @throws RejectedExecutionException always
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                      " rejected from " +
                                                      e.toString());
            }
        }

由上述代码可知,该拒绝策略执行时会直接向调用线程抛出RejectedExecutionException异常,并丢失提交的任务r。

然后我们看下CallerRunsPolicy策略的代码:

        public static class CallerRunsPolicy implements RejectedExecutionHandler {

            public CallerRunsPolicy() { }

            /**
              * 使用调用线程执行任务r
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (! e.isShutdown()) {
                        r.run();
                    }
                }
            }

分析上述代码,该拒绝策略执行时,如果线程池没有被关闭,则会直接使用调用线程执行提交的任务r,否则默默丢弃该任务。

然后我们看下DiscardPolicy策略的代码:

        public static class DiscardPolicy implements RejectedExecutionHandler {

            public DiscardPolicy() { }

            /**
              * 什么都不做,默默丢弃提交的任务
              *
              * @param r the runnable task requested to be executed
              * @param e the executor attempting to execute this task
              */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

该拒绝策略执行时,什么都不做,默默丢弃提交的任务。

最后我们看下DiscardOldestPolicy策略的代码:

              public static class DiscardOldestPolicy implements RejectedExecutionHandler {

                  public DiscardOldestPolicy() { }

                  /**
                    * 丢弃线程池队列里面最老的任务,并把当前任务提交到线程池
                    * @param r the runnable task requested to be executed
                    * @param e the executor attempting to execute this task
                    */
                  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                      if (! e.isShutdown()) {
                          e.getQueue().poll(); //移除队首元素
                          e.execute(r); //提交任务r到线程池执行
                      }
                  }
              }

该拒绝策略首先会丢弃线程池队列里面最老的任务,然后把当前任务r提交到线程池。