ThreadPool

Creation

Take the following code as an example:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

So newFixedThreadPool uses a LinkedBlockingQueue as its work queue. Its no-arg constructor:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

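So this is in fact a bounded queue, although its capacity is Integer.MAX_VALUE, which in practice makes it effectively unbounded.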
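To make the capacity point concrete, here is a small check (the class and method names are made up for illustration). It also shows that with an explicit bound, offer (the non-blocking call that execute uses) simply returns false once the queue is full.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative helper class (hypothetical name), not part of the JDK.
class QueueCapacityDemo {
    // Remaining capacity of an empty queue built with the no-arg constructor.
    static int defaultRemainingCapacity() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
        return q.remainingCapacity();          // Integer.MAX_VALUE
    }

    // With an explicit capacity of 1, the second offer fails instead of blocking.
    static boolean secondOfferSucceeds() {
        BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
        q.offer(() -> { });
        return q.offer(() -> { });             // false: the queue is full
    }
}
```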

ThreadPoolExecutor is the core of the JDK thread pool. Class diagram:

[Figure: ThreadPoolExecutor class diagram]

The ThreadPoolExecutor constructor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

Thread Factory

[Figure: ThreadFactory class diagram]

The default thread factory is a nested class of Executors (DefaultThreadFactory). Its core method, newThread:

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

The definition of namePrefix:

namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";

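This is where threads created by the pool get their default names (pool-N-thread-M). The last argument 0 passed to the Thread constructor is stackSize; 0 means the parameter is ignored.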
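If you want meaningful thread names, a common approach is to supply your own ThreadFactory. The following is a minimal sketch that copies the default factory's logic shown above. The class name NamedThreadFactory and its poolName parameter are hypothetical, and the thread group argument is left as null (so the group is chosen by the JVM) instead of reproducing the default factory's group lookup.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: same logic as the default one, but with a caller-chosen prefix.
class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    NamedThreadFactory(String poolName) {
        this.namePrefix = poolName + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        // null group: use the security manager's group or the creating thread's group;
        // stackSize 0: let the JVM decide, as the default factory does
        Thread t = new Thread(null, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
```

It can be passed through the six-argument constructor, for example `new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("orders"))`.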

Rejection Policy

As shown above, the pool uses a bounded queue by default, so we have to decide what to do when the queue is full.

[Figure: RejectedExecutionHandler class diagram]

By default the pool uses:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

That is, it throws a RejectedExecutionException to the caller of execute; the pool itself keeps running:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
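The following sketch (hypothetical class name) shows this with a deliberately tiny pool: one thread and a queue of capacity 1. The third submission is rejected with RejectedExecutionException, yet the pool is not shut down and still runs the queued task afterwards.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class AbortPolicyDemo {
    // Returns {third task rejected, pool shut down right after the rejection, queued task ran}
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedRan = new AtomicBoolean();
        // 1 thread, queue capacity 1, default handler (AbortPolicy)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        pool.execute(() -> {                       // occupies the only thread until released
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> queuedRan.set(true));   // waits in the queue
        boolean rejected = false;
        try {
            pool.execute(() -> { });                // no free thread and no queue slot
        } catch (RejectedExecutionException e) {
            rejected = true;                        // AbortPolicy threw to the caller
        }
        boolean shutDown = pool.isShutdown();       // false: the pool is still running
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {rejected, shutDown, queuedRan.get()};
    }
}
```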

execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // corePoolSize is volatile because it can be changed at runtime via setCorePoolSize
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            // a new thread was created; it runs this task as its first task
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Control Variable

ctl is the pool's core control variable:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

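It serves two purposes:

  • The high 3 bits hold the pool's run state, e.g. RUNNING or SHUTDOWN.
  • The low 29 bits hold the current number of worker threads, so a pool can have at most 2^29 - 1 (536,870,911, roughly 537 million) threads.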
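The packing can be reproduced in a few lines. The constants and helpers below mirror the definitions in JDK 8's ThreadPoolExecutor (other JDK versions may differ in detail); the wrapper class CtlBits exists only for illustration.

```java
// Re-derivation of the ctl bit layout, modeled on JDK 8's ThreadPoolExecutor.
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // low 29 bits all set

    // run states live in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;          // negative
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }  // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }   // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack both into one int
    static boolean isRunning(int c)  { return c < SHUTDOWN; }   // only RUNNING is negative
}
```

Because RUNNING is negative and the states increase from there, checks such as isRunning(c) or rs >= SHUTDOWN work directly on the packed int, and a single CAS on ctl updates the state and the count together.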

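Thread Creation

When we call execute, the pool first checks whether the current thread count has reached its limit (corePoolSize); if not, it creates a new worker thread for the task instead of enqueueing it.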

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // check the pool state: if it has been shut down, return false
        // (unless it is SHUTDOWN, firstTask is null and the queue still holds tasks)
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // check whether the limit has been reached
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // if the CAS increment of the worker count succeeds, leave the loop and create the thread
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // shutdown and similar methods take the same lock, which keeps this thread-safe
            mainLock.lock();
            try {
                // re-check the state under the lock
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet
                    workers.add(w);
                    int s = workers.size();
                    // records the largest pool size ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
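The inner loop of addWorker is the classic read, check, CAS, retry pattern. Below is a stripped-down sketch of that pattern on a plain counter; BoundedCounter is a hypothetical name, and it deliberately ignores the run-state checks of the outer loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: reserve a slot only while the counter is below a limit.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();

    boolean tryIncrement(int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // like "wc >= corePoolSize": give up
            if (count.compareAndSet(c, c + 1))
                return true;                   // like compareAndIncrementWorkerCount
            // lost a race with another thread: re-read and retry
        }
    }

    int get() { return count.get(); }

    // Hammers one counter from several threads; the result must never exceed the limit.
    static int concurrentFill(int threads, int attemptsPerThread, int limit) {
        BoundedCounter counter = new BoundedCounter();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++)
                    counter.tryIncrement(limit);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```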

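Core vs. Maximum Pool Size

Note a detail in execute: the first addWorker call passes core = true, meaning corePoolSize is the limit, while the other two calls pass false (maximumPoolSize is the limit). So execute tries the following steps in order:

  • If the current thread count is below corePoolSize, add a thread.
  • Otherwise, try to enqueue the task.
  • If enqueueing fails, try to grow the thread count up to maximumPoolSize.
  • If that also fails, hand the task to the RejectedExecutionHandler.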
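These four steps can be observed directly. The sketch below (hypothetical class name; pool sizes chosen only to make each step visible) uses corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, and submits four tasks that all block until released.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmissionOrderDemo {
    // Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        int[] out = new int[4];
        pool.execute(blocker);              // step 1: below corePoolSize, new core thread
        out[0] = pool.getPoolSize();
        pool.execute(blocker);              // step 2: core thread busy, task is enqueued
        out[1] = pool.getQueue().size();
        pool.execute(blocker);              // step 3: queue full, extra thread up to maximumPoolSize
        out[2] = pool.getPoolSize();
        try {
            pool.execute(blocker);          // step 4: everything full, handler (AbortPolicy) throws
        } catch (RejectedExecutionException e) {
            out[3] = 1;
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

A consequence worth remembering: with an effectively unbounded queue such as the one in newFixedThreadPool, step 2 never fails, so maximumPoolSize has no effect.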

Worker

The "thread" here (i.e. the Worker) is actually an inner class of ThreadPoolExecutor.

[Figure: Worker class diagram]

AQS again: Worker extends AbstractQueuedSynchronizer. Its constructor:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
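The setState(-1) call and the later tryLock/isLocked uses are easier to follow with a stand-alone version. This is a sketch modeled on Worker's lock (the class name WorkerLikeLock is hypothetical): a non-reentrant AQS mutex whose state starts at -1, so it cannot be acquired until the first unlock(), which is what runWorker does before running its first task.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a Worker-style lock: non-reentrant, state -1 until "started".
class WorkerLikeLock extends AbstractQueuedSynchronizer {
    WorkerLikeLock() {
        setState(-1); // not started yet: tryLock fails until the first unlock()
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // only 0 -> 1 succeeds; -1 (not started) and 1 (held) both fail
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
}
```

The lock is deliberately non-reentrant; according to the JDK source comments, this stops a task from reacquiring its Worker's lock when it calls pool control methods such as setCorePoolSize.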

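The real logic of its run method is implemented in ThreadPoolExecutor.runWorker (shown simplified, without the beforeExecute/afterExecute hooks):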

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts: state goes from -1 to 0
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // interrupt status handling
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                task.run();
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

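As you can see, each task runs under the protection of its Worker's lock. Combined with the source of shutdown shown later, shutdown only interrupts a Worker after acquiring that lock with tryLock, which fits the semantics of shutdown nicely: reject new submissions and wait for all already-submitted tasks to finish.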

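Interrupt Status

The interrupt check here serves two purposes:

  • If the pool is in STOP (or a later state), i.e. shutdownNow has been called, this code makes sure the thread's interrupt flag is set.
  • If the pool is in a state before STOP, such as SHUTDOWN or RUNNING, the Worker should not respond to interrupts, so the interrupt flag must be cleared. But who would set a Worker thread's interrupt flag in that case? Our business code, perhaps, or a previous task that left the flag set. Another candidate is interruptIdleWorkers (called by shutdown, setCorePoolSize and others) hitting the short window after getTask has returned a task but before w.lock().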

Let's dig into what a thread interrupt actually does:

public void interrupt() {
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

We already met blocker in the NIO part. The native implementation behind interrupt0 shown below is in openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp, i.e. the Solaris version; the Linux counterpart (os_linux.cpp) follows the same steps but does not send the thr_kill signal:

void os::interrupt(Thread* thread) {
  OSThread* osthread = thread->osthread();
  int isInterrupted = osthread->interrupted();
  if (!isInterrupted) {
      // set the interrupt flag
      osthread->set_interrupted(true);
      OrderAccess::fence();
      // wake up sleep()?
      ParkEvent * const slp = thread->_SleepEvent ;
      if (slp != NULL) slp->unpark() ;
  }
  // wake up LockSupport.park()?
  if (thread->is_Java_thread()) {
    ((JavaThread*)thread)->parker()->unpark();
  }
  // wake up Object.wait()?
  ParkEvent * const ev = thread->_ParkEvent ;
  if (ev != NULL) ev->unpark() ;
  // When events are used everywhere for os::sleep, then this thr_kill
  // will only be needed if UseVMInterruptibleIO is true.
  if (!isInterrupted) {
    int status = thr_kill(osthread->thread_id(), os::Solaris::SIGinterrupt());
    assert_status(status == 0, status, "thr_kill");
    // Bump thread interruption counter
    RuntimeService::record_thread_interrupt_signaled_count();
  }
}

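These steps roughly correspond to the blocking operations in Java known to be interruptible (Thread.sleep, LockSupport.park, Object.wait).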
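A quick way to see two of these paths from the Java side is to compare LockSupport.park with Thread.sleep. This is an illustrative sketch (class and method names are made up): park returns on interrupt and leaves the flag set, while sleep reports the interrupt by throwing InterruptedException and clears the flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class InterruptDemo {
    // park(): an interrupt wakes the thread and the interrupt flag stays set
    static boolean parkKeepsFlag() {
        AtomicBoolean flagAfterWake = new AtomicBoolean();
        Thread t = new Thread(() -> {
            // park may return spuriously, so loop until the interrupt is visible
            while (!Thread.currentThread().isInterrupted())
                LockSupport.park();
            flagAfterWake.set(Thread.currentThread().isInterrupted());
        });
        t.start();
        t.interrupt();
        join(t);
        return flagAfterWake.get();
    }

    // sleep(): the interrupt surfaces as InterruptedException and the flag is cleared
    static boolean sleepClearsFlag() {
        AtomicBoolean flagInCatch = new AtomicBoolean(true);
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                flagInCatch.set(Thread.currentThread().isInterrupted());
            }
        });
        t.start();
        t.interrupt();
        join(t);
        return !flagInCatch.get();
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```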

Fetching Tasks

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // STOP, or SHUTDOWN with an empty queue: this worker exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // if timeouts apply and the last poll timed out with no task, the thread exits
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // on interrupt, don't exit immediately; re-check the pool state in the next iteration
            timedOut = false;
        }
    }
}

Together with runWorker, this shows that when getTask returns null, the current Worker thread should exit.
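To isolate the "getTask returns null, so the worker exits" idea, here is a toy pool. It is a hypothetical sketch, far simpler than ThreadPoolExecutor (fixed thread count, no ctl, no rejection policy, a timed poll instead of interrupts for waking idle workers), but its worker loop has the same shape as runWorker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy pool (hypothetical): each worker loops on getTask() and ends when it returns null.
class ToyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final Thread[] workers;
    private volatile boolean shutdown;

    ToyPool(int threads) {
        workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(this::runWorker, "toy-worker-" + i);
            workers[i].start();
        }
    }

    // Not safe against a concurrent shutdown(); fine for a single submitting thread.
    void execute(Runnable task) {
        if (shutdown)
            throw new IllegalStateException("pool is shut down");
        queue.add(task);
    }

    void shutdown() { shutdown = true; }

    void awaitTermination() {
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void runWorker() {
        Runnable task;
        while ((task = getTask()) != null)
            task.run();
        // leaving the loop ends this worker thread
    }

    // null means "exit": shut down and nothing left in the queue
    private Runnable getTask() {
        for (;;) {
            if (shutdown && queue.isEmpty())
                return null;
            try {
                // timed poll, so an idle worker re-checks the shutdown flag regularly
                Runnable r = queue.poll(20, TimeUnit.MILLISECONDS);
                if (r != null)
                    return r;
            } catch (InterruptedException e) {
                // like the JDK's getTask: do not exit at once, re-check state next round
            }
        }
    }

    // Runs `tasks` counting tasks on `threads` workers, shuts down and returns the count.
    static int runCounting(int threads, int tasks) {
        AtomicInteger done = new AtomicInteger();
        ToyPool pool = new ToyPool(threads);
        for (int i = 0; i < tasks; i++)
            pool.execute(() -> done.incrementAndGet());
        pool.shutdown();
        pool.awaitTermination();
        return done.get();
    }
}
```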

Timeout

allowCoreThreadTimeOut is defined as follows:

private volatile boolean allowCoreThreadTimeOut;

It defaults to false. When enabled, a Worker does not wait for tasks indefinitely; it exits once it times out. It is set through the allowCoreThreadTimeOut method:

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

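Note that a keepAliveTime greater than zero is also required. Because of these two parameters, the thread count is not necessarily equal to corePoolSize when no tasks are running.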
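Both rules can be checked with a small sketch (hypothetical class name; the 50 ms keepAliveTime is arbitrary): a zero keepAliveTime makes allowCoreThreadTimeOut(true) throw IllegalArgumentException, and with a positive keepAliveTime idle core threads eventually exit, bringing the pool size down to 0.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // allowCoreThreadTimeOut(true) is refused while keepAliveTime is 0
    static boolean rejectsZeroKeepAlive() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // With core timeout enabled, idle core threads exit after keepAliveTime
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        pool.execute(() -> { });                    // two core threads now exist
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
            pause(20);
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```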

Reclaiming Excess Threads

"Excess" here means threads beyond corePoolSize. There is a subtle detail in the source:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

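When the thread count exceeds corePoolSize, timed is also true. Combined with the condition that follows, this means: while the pool has more than corePoolSize threads, a thread whose poll has waited keepAliveTime without receiving a task exits, provided that more than one worker is left or the queue is empty (so the last worker never abandons queued tasks; a corePoolSize of 0 is allowed).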
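The sketch below (hypothetical class name; sizes and the 100 ms keepAliveTime are arbitrary) grows a pool to its maximum and then lets it idle. It uses a SynchronousQueue, whose offer fails unless a worker is already waiting, so each busy submission adds a thread. After the tasks finish, the excess threads time out and the pool shrinks back to corePoolSize.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExcessThreadDemo {
    // Returns {largest pool size seen, pool size after the excess threads idled out}
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // core 1, max 3, keepAliveTime 100 ms, hand-off queue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 100L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {                    // 1 core thread + 2 excess threads
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();                        // all three threads become idle
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 1 && System.nanoTime() < deadline)
            pause(20);
        int[] out = {pool.getLargestPoolSize(), pool.getPoolSize()};
        pool.shutdown();
        return out;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```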

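This can be verified with the maxPoolSize method of the test code test.Test.

Another way to reduce the thread count is to reset the pool's parameters via setCorePoolSize or setMaximumPoolSize.

Exit

When a Worker exits, it triggers processWorkerExit: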

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

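Its logic can be divided into 3 parts.

State Update

Internally, the pool counts the total number of completed tasks in the following variable:

private long completedTaskCount;

On exit, the Worker thread adds its own completed count to this variable and removes itself from the worker set.

Terminating the Pool

The tryTerminate method attempts to terminate the pool.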

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // hook, empty by default
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

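When does tryTerminate reach the interruptIdleWorkers call? Only while worker threads remain (workerCount != 0), and only in one of these states:

  • The state is STOP, i.e. shutdownNow() has been called.
  • The state is SHUTDOWN and the task queue is empty, which corresponds to shutdown() having been called and all tasks having finished.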

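So why interrupt only one Worker thread instead of all of them? My guess is that this works as a chained wake-up: the woken worker exits and calls tryTerminate again, which wakes the next one, until the last one finally sets the state to TERMINATED.

termination.signalAll();

This wakes up the threads waiting for the pool to terminate. termination is defined as:

private final Condition termination = mainLock.newCondition();

Part of the awaitTermination source: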

nanos = termination.awaitNanos(nanos);
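The terminated() hook and awaitTermination can be seen together in the following sketch (the class name is hypothetical). terminated() is overridden in an anonymous subclass; tryTerminate calls it once, and awaitTermination returns true after the state reaches TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TerminationDemo {
    // Returns {awaitTermination result, terminated() hook ran, isTerminated()}
    static boolean[] run() {
        AtomicBoolean hookRan = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                // called by tryTerminate in the TIDYING state, before TERMINATED is set
                hookRan.set(true);
            }
        };
        for (int i = 0; i < 4; i++)
            pool.execute(() -> { });
        pool.shutdown();
        boolean finished;
        try {
            finished = pool.awaitTermination(5, TimeUnit.SECONDS);  // waits on the termination Condition
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        return new boolean[] {finished, hookRan.get(), pool.isTerminated()};
    }
}
```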

Thread Rebirth

Why call it rebirth? First, recall the task-execution part of runWorker (the full version of the try block that was simplified earlier):

Throwable thrown = null;
try {
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}

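As you can see, the exception is rethrown, which means an unchecked exception (or Error) thrown by our task makes the Worker thread exit. processWorkerExit then checks whether the pool still needs another Worker; after an exit caused by a task exception (completedAbruptly is true) it always tries to add one while the pool is below STOP. That is the "rebirth".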
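The sketch below (hypothetical class name) contrasts execute and submit on a single-thread pool. With execute, the failing task kills its worker and the next task runs on a replacement thread. With submit, FutureTask.run catches the exception and stores it, so the worker survives and the next task runs on the same thread. The custom ThreadFactory only installs a no-op uncaught-exception handler to keep the output quiet.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class WorkerRebirthDemo {
    private static ThreadPoolExecutor newPool() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { });  // silence the stack trace
            return t;
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), quiet);
    }

    // execute(): the exception escapes runWorker, the worker dies and is replaced
    static boolean executeReplacesThread() {
        ThreadPoolExecutor pool = newPool();
        String[] names = new String[2];
        CountDownLatch done = new CountDownLatch(2);
        pool.execute(() -> {
            names[0] = Thread.currentThread().getName();
            done.countDown();
            throw new IllegalStateException("task failed");
        });
        pool.execute(() -> {
            names[1] = Thread.currentThread().getName();
            done.countDown();
        });
        boolean finished;
        try {
            finished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        pool.shutdown();
        return finished && !names[0].equals(names[1]);
    }

    // submit(): FutureTask.run catches the exception, so the same worker carries on
    static boolean submitKeepsThread() {
        ThreadPoolExecutor pool = newPool();
        String[] firstName = new String[1];
        Callable<Void> failing = () -> {
            firstName[0] = Thread.currentThread().getName();
            throw new IllegalStateException("task failed");
        };
        Callable<String> nameOfThread = () -> Thread.currentThread().getName();
        Future<Void> f1 = pool.submit(failing);
        Future<String> f2 = pool.submit(nameOfThread);
        try {
            try {
                f1.get(5, TimeUnit.SECONDS);
                return false;                       // the first task must not succeed
            } catch (ExecutionException expected) {
                // stored by setException and rethrown, wrapped, by get()
            }
            return firstName[0].equals(f2.get(5, TimeUnit.SECONDS));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```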

submit & FutureTask

Take the single-argument submit(Callable<T> task) as an example, AbstractExecutorService.submit:

public <T> Future<T> submit(Callable<T> task) {
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

AbstractExecutorService.newTaskFor:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

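The task is wrapped in a FutureTask:

[Figure: FutureTask class diagram]

FutureTask combines the Runnable and Future interfaces (through RunnableFuture). Let's look at how its main methods are implemented.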

get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

state is the status flag. Its declaration and possible values:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

The core of get is clearly awaitDone, which does the waiting:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            // already completed
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // completion in progress: just yield the CPU and spin
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            // CAS the new node q in as the head of the wait list
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

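No Lock or Condition is used here; instead it directly applies an idea similar to the AQS wait queue (the source comments describe it as a Treiber stack of waiting threads). Here is the WaitNode class diagram:

[Figure: WaitNode class diagram]

The thread field is set in the constructor:

WaitNode() { thread = Thread.currentThread(); }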
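The waiter list is essentially a lock-free stack of parked threads. The following is a toy one-shot signal written in the same style; it is an illustrative sketch (OneShotSignal is a hypothetical class), it uses AtomicReference instead of Unsafe, and it omits interrupt handling and timeouts.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Toy one-shot signal whose waiters live in a Treiber stack.
class OneShotSignal {
    private static final class WaitNode {
        final Thread thread = Thread.currentThread();   // captured at construction
        WaitNode next;
    }

    private volatile boolean done;
    private final AtomicReference<WaitNode> waiters = new AtomicReference<WaitNode>();

    void await() {
        if (done)
            return;
        WaitNode q = new WaitNode();
        WaitNode head;
        do {                                    // push: CAS q in as the new head, retry on contention
            head = waiters.get();
            q.next = head;
        } while (!waiters.compareAndSet(head, q));
        while (!done)                           // re-check after pushing; park may return spuriously
            LockSupport.park(this);
    }

    void signal() {
        done = true;                            // publish the state before waking anyone
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next)
            LockSupport.unpark(q.thread);
    }

    // Starts n waiting threads, signals once, returns how many got past await().
    static int wakeAll(int n) {
        OneShotSignal signal = new OneShotSignal();
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                signal.await();
                woken.incrementAndGet();
            });
            threads[i].start();
        }
        signal.signal();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }
}
```

As in finishCompletion, the whole list is detached with one atomic operation and then walked without further synchronization; a waiter that pushes after the detach still sees done == true and does not park.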

The report method acts on the final state, either throwing an exception or returning the result:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
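The three outcomes that report distinguishes can be triggered with FutureTask alone. This sketch (hypothetical class name) runs a normal, a failing and a cancelled task.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTaskOutcomes {
    // NORMAL: call() returned; get() returns the stored outcome
    static int normal() {
        FutureTask<Integer> ft = new FutureTask<Integer>(() -> 6 * 7);
        new Thread(ft).start();                     // run() happens on another thread
        try {
            return ft.get(5, TimeUnit.SECONDS);     // waits in awaitDone until set() completes
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // EXCEPTIONAL: call() threw; report() wraps the stored Throwable in ExecutionException
    static boolean exceptional() {
        FutureTask<Integer> ft = new FutureTask<Integer>(() -> {
            throw new IllegalArgumentException("bad input");
        });
        ft.run();                                   // run on the current thread for simplicity
        try {
            ft.get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalArgumentException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // CANCELLED: cancel(false) before run(); run() then returns at once because state != NEW
    static boolean cancelledBeforeRun() {
        FutureTask<Integer> ft = new FutureTask<Integer>(() -> 1);
        boolean cancelled = ft.cancel(false);
        ft.run();
        try {
            ft.get();
            return false;
        } catch (CancellationException e) {
            return cancelled && ft.isCancelled() && ft.isDone();
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }
}
```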

run

A natural question: where is the state set to completed?

public void run() {
    // runner barrier: prevents the task from being run more than once
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // volatile read
        if (c != null && state == NEW) {
            // window starts
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
                // window ends
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // release the runner barrier
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

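set and setException change the task's state and notify the waiting threads; they are explained later.

The most interesting part here is the call to handlePossibleCancellationInterrupt. What do the "leaked interrupts" in the comment refer to? Around the call to the task's call method there is a time window in which another thread can change the state; the start and end of the window are marked in the source above.

Within this window, another thread may well call cancel and change the state to INTERRUPTING or CANCELLED (cancel is described below). Note that once the state is no longer NEW, set and setException do nothing, because they require the state to be NEW. Part of set: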

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // ...
    }
}

So handlePossibleCancellationInterrupt has work to do when:

cancel(true) was called while the business logic (the call method) was running.

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

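The thread then spins here until the cancelling thread finally sets the state to INTERRUPTED. Why?

My guess is that Doug Lea wants the cancelled thread to leave run() only after the cancelling thread has finished interrupting it. The spin ("wait out pending interrupt") ensures that the interrupt issued by cancel(true) lands before run() returns, so it cannot leak into whatever the thread runs next, such as the next task of a pool Worker.

Another interesting question: could the interrupt flag be cleared here? No. cancel relies on interruption to stop the task, but we may also use interruption ourselves to end a task, and at this point FutureTask cannot tell whether the interrupt came from cancel or from user code. Which raises the question: would an extra flag variable solve this?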

cancel

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

As you can see, this method returns true only if the task is still in the NEW state. An interesting question here: why is the INTERRUPTED state written with putOrderedInt?
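To watch cancel(true) interrupt a running task, here is a sketch (hypothetical class name). The task sleeps, the main thread cancels it once call() has started, and the sleeping thread receives the interrupt. The second cancel returns false because the state is no longer NEW, and get() throws CancellationException.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelInterruptDemo {
    // Returns {first cancel(true), second cancel(true), task saw the interrupt, get() threw CancellationException}
    static boolean[] run() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        FutureTask<String> ft = new FutureTask<String>(() -> {
            started.countDown();                    // call() is running, so runner is already set
            try {
                Thread.sleep(10_000);
                return "finished";
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
                return "interrupted";               // discarded: set() fails, state is no longer NEW
            }
        });
        Thread runner = new Thread(ft);
        runner.start();
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean first = ft.cancel(true);            // NEW -> INTERRUPTING, interrupt runner, -> INTERRUPTED
        boolean second = ft.cancel(true);           // state is not NEW any more
        try {
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean threw;
        try {
            ft.get();
            threw = false;
        } catch (CancellationException e) {
            threw = true;
        } catch (InterruptedException | ExecutionException e) {
            threw = false;
        }
        return new boolean[] {first, second, sawInterrupt.get(), threw};
    }
}
```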

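putOrderedInt is a low-level optimization: it performs an ordered ("lazy") store to the volatile field. Earlier writes cannot be reordered after it, but unlike a normal volatile write it omits the trailing StoreLoad barrier, so other threads are not guaranteed to see the new value immediately. See: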

AtomicXXX.lazySet(…) in terms of happens before edges

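The optimization is safe because at this point the state must already be INTERRUPTING (only cancel(true) reaches this branch), and methods such as get/report do not care which of the final values it holds, only that it is past COMPLETING. Recall get:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}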

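As soon as state is greater than COMPLETING, get goes straight to report. Since the difference has no effect on other threads, there is no need to guarantee immediate visibility (i.e. to pay for an extra memory barrier).

finishCompletion does the final work, such as waking up the waiting threads: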

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // template method, empty by default
    done();
    callable = null;        // to reduce footprint
}

It simply walks the wait list and unparks each thread in turn.

set

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

Self-explanatory.

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // set the state
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

A lock is taken here, so it is time to summarize where mainLock is used:

  • shutdown & shutdownNow
  • awaitTermination
  • getPoolSize
  • getActiveCount
  • getLargestPoolSize
  • getTaskCount
  • getCompletedTaskCount
  • toString

So the lock guards operations on the Worker set, shutting down the pool, and reading pool statistics such as thread counts. We have already seen tryTerminate; the focus here is interruptIdleWorkers:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

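Here onlyOne is false (the no-argument interruptIdleWorkers() passes false). The most interesting part is w.tryLock(). Recall from the Worker section that Worker extends AbstractQueuedSynchronizer and runs each task while holding its own lock. So if a Worker is currently executing a task, tryLock fails and the Worker cannot be interrupted, which matches the promise that shutdown does not interrupt running tasks. Since the pool state has already been set to SHUTDOWN when interruptIdleWorkers runs, a Worker notices the shutdown the next time it fetches a task, and exits once every task in the queue has been executed (see getTask above).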
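The following sketch (hypothetical class name) shows this: a task that is already running when shutdown is called is not interrupted, the task waiting in the queue still runs, and new submissions are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {
    // Returns {both tasks completed, running task was interrupted, execute after shutdown rejected}
    static boolean[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        AtomicBoolean interrupted = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            started.countDown();                    // the worker holds its own lock from here on
            try {
                release.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            completed.incrementAndGet();
        });
        pool.execute(() -> completed.incrementAndGet());   // queued behind the running task
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();                            // tryLock fails on the busy worker: no interrupt
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;                        // new submissions are refused after shutdown
        }
        release.countDown();
        boolean terminated;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new boolean[] {terminated && completed.get() == 2, interrupted.get(), rejected};
    }
}
```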

shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

It also returns all tasks that had not yet started executing.

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

Worker.interruptIfStarted:

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

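getState() >= 0 means the Worker has started. The thread is interrupted without taking the Worker's lock, which is the difference from shutdown. drainQueue essentially calls drainTo from the BlockingQueue interface (the pool's queue must be a BlockingQueue), and then removes any leftovers one by one for queues such as DelayQueue where drainTo may not take everything.

There is an interesting detail here:

If a task we submitted has not started yet when shutdownNow is called, and some thread is blocked on that task's future, will that thread be woken up?

No. I found no code in the source that wakes such waiters, and the test method test.Test.canWakeUp demonstrates this.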
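The sketch below (hypothetical class name) reproduces the situation: a queued task's Future is returned by shutdownNow and is still not done, so any thread in get() would stay blocked. Cancelling the drained tasks is one caller-side way to release such waiters; this is our own remedy, not something shutdownNow does.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Returns {exactly one task drained, its Future not done, Future cancelled by us, running task interrupted}
    static boolean[] run() {
        AtomicBoolean interrupted = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {                        // keeps the only worker busy
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true);              // shutdownNow interrupts running workers
            }
        });
        Future<String> queued = pool.submit(() -> "never runs");   // waits in the queue
        List<Runnable> drained = pool.shutdownNow();
        boolean notDone = !queued.isDone();         // nobody completed or cancelled it: get() would block
        // caller-side remedy: cancelling runs finishCompletion, which unparks get() waiters
        for (Runnable r : drained)
            if (r instanceof Future)
                ((Future<?>) r).cancel(false);
        boolean cancelled = queued.isCancelled();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {drained.size() == 1, notDone, cancelled, interrupted.get()};
    }
}
```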

getActiveCount

This method returns the number of threads currently executing tasks. Its implementation is interesting:

public int getActiveCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        for (Worker w : workers)
            if (w.isLocked())
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}

It works by checking whether each Worker holds its own lock. A neat trick.
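A small sketch of this (hypothetical class name): while a task is blocked, its Worker holds its lock and getActiveCount reports 1; once the task finishes and the Worker unlocks, the count drops back to 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ActiveCountDemo {
    // Returns {getActiveCount() while one task is blocked, getActiveCount() after it finished}
    static int[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.execute(() -> {
            started.countDown();                    // runWorker has called w.lock() by now
            awaitQuietly(release);
        });
        awaitQuietly(started);
        int busy = pool.getActiveCount();           // the only Worker is locked
        release.countDown();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getActiveCount() > 0 && System.nanoTime() < deadline)
            awaitQuietly(new CountDownLatch(1), 10); // short pause while the Worker unlocks
        int idle = pool.getActiveCount();
        pool.shutdown();
        return new int[] {busy, idle};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        awaitQuietly(latch, 5_000);
    }

    private static void awaitQuietly(CountDownLatch latch, long millis) {
        try {
            latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```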

finalize

ThreadPoolExecutor overrides this method:

protected void finalize() {
    shutdown();
}