ThreadPool
创建
我们以下列代码为例:
1public static ExecutorService newFixedThreadPool(int nThreads) {
2 return new ThreadPoolExecutor(nThreads, nThreads,
3 0L, TimeUnit.MILLISECONDS,
4 new LinkedBlockingQueue<Runnable>());
5}
可见默认使用LinkedBlockingQueue作为工作队列,其构造器:
1public LinkedBlockingQueue() {
2 this(Integer.MAX_VALUE);
3}
可见,这其实是一个有界队列,虽然大小为int最大值。
ThreadPoolExecutor便是JDK线程池的核心了,类图:
ThreadPoolExecutor构造器:
1public ThreadPoolExecutor(int corePoolSize,
2 int maximumPoolSize,
3 long keepAliveTime,
4 TimeUnit unit,
5 BlockingQueue<Runnable> workQueue) {
6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7 Executors.defaultThreadFactory(), defaultHandler);
8}
线程工厂
默认的线程工厂是Executors的内部类,核心的newThread方法:
1public Thread newThread(Runnable r) {
2 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
3 if (t.isDaemon())
4 t.setDaemon(false);
5 if (t.getPriority() != Thread.NORM_PRIORITY)
6 t.setPriority(Thread.NORM_PRIORITY);
7 return t;
8}
namePrefix定义:
1namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
这便是为线程池默认创建的线程起名的地方了,Thread构造器的最后一个0为stackSize,0表示忽略此参数。
拒绝策略
从上面可以看出,线程池默认使用有界队列,所以当队列满的时候就需要考虑如何处理这种情况。
线程池默认采用的是:
1private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
即抛出异常,线程池退出:
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
3}
execute
1public void execute(Runnable command) {
2 if (command == null)
3 throw new NullPointerException();
4 int c = ctl.get();
5 //corePoolSize为volatile,下面会提到为什么
6 if (workerCountOf(c) < corePoolSize) {
7 if (addWorker(command, true))
8 //创建新线程成功,交由其执行
9 return;
10 c = ctl.get();
11 }
12 if (isRunning(c) && workQueue.offer(command)) {
13 int recheck = ctl.get();
14 if (! isRunning(recheck) && remove(command))
15 reject(command);
16 else if (workerCountOf(recheck) == 0)
17 addWorker(null, false);
18 }
19 else if (!addWorker(command, false))
20 reject(command);
21}
控制变量
ctl是线程池的核心控制变量:
1private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
有以下两个用途:
- 高3位标志线程池的运行状态,比如运行、关闭。
- 低29位存储当前工作线程的个数,所以一个线程池最多可以创建2 ^ 29 - 1(约为5亿)个线程。
线程创建
当我们调用execute方法时,线程池将首先检查当前线程数是否已达到上限,如果没有创建新的工作线程,而不是入队。
1private boolean addWorker(Runnable firstTask, boolean core) {
2 retry:
3 for (;;) {
4 int c = ctl.get();
5 int rs = runStateOf(c);
6 // 检查线程池状态,如果已关闭,返回false
7 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
8 return false;
9 for (;;) {
10 int wc = workerCountOf(c);
11 //检查是否达到上限
12 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
13 return false;
14 //如果CAS增加线程数成功,中断循环 ,进行线程创建
15 if (compareAndIncrementWorkerCount(c))
16 break retry;
17 c = ctl.get(); // Re-read ctl
18 if (runStateOf(c) != rs)
19 continue retry;
20 // else CAS failed due to workerCount change; retry inner loop
21 }
22 }
23
24 boolean workerStarted = false;
25 boolean workerAdded = false;
26 Worker w = null;
27 try {
28 w = new Worker(firstTask);
29 final Thread t = w.thread;
30 if (t != null) {
31 final ReentrantLock mainLock = this.mainLock;
32 //shutdown等方法也需要加锁,所以可以保证线程安全
33 mainLock.lock();
34 try {
35 //再次检查状态
36 int rs = runStateOf(ctl.get());
37 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
38 if (t.isAlive()) // precheck that t is startable
39 throw new IllegalThreadStateException();
40 //workers是一个HashSet
41 workers.add(w);
42 int s = workers.size();
43 //用于记录出现过的最大线程数
44 if (s > largestPoolSize)
45 largestPoolSize = s;
46 workerAdded = true;
47 }
48 } finally {
49 mainLock.unlock();
50 }
51 if (workerAdded) {
52 t.start();
53 workerStarted = true;
54 }
55 }
56 } finally {
57 if (! workerStarted)
58 addWorkerFailed(w);
59 }
60 return workerStarted;
61}
核心 vs 最大线程数
注意execute方法中的细节,第一次addWorker调用的core参数为true,即表示已corePoolSize为上限,后两次为false。这就说明了execute方法执行时遵从一下顺序进行尝试:
- 如果当前线程数小于corePoolSize,那么增加线程。
- 尝试加入队列。
- 如果入队失败那么尝试将线程数增加至maximumPoolSize。
- 如果还是失败,那么交给RejectedExecutionHandler。
Worker
这里的"线程(即Worker)"其实是ThreadPoolExecutor的内部类。
又见AQS。构造器:
1Worker(Runnable firstTask) {
2 setState(-1); // inhibit interrupts until runWorker
3 this.firstTask = firstTask;
4 this.thread = getThreadFactory().newThread(this);
5}
其run方法的真正逻辑由ThreadPoolExecutor.runWorker实现:
1final void runWorker(Worker w) {
2 Thread wt = Thread.currentThread();
3 Runnable task = w.firstTask;
4 w.firstTask = null;
5 boolean completedAbruptly = true;
6 try {
7 while (task != null || (task = getTask()) != null) {
8 w.lock();
9 //中断状态
10 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&
11 runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
12 wt.interrupt();
13 try {
14 task.run();
15 } finally {
16 task = null;
17 w.completedTasks++;
18 w.unlock();
19 }
20 }
21 completedAbruptly = false;
22 } finally {
23 processWorkerExit(w, completedAbruptly);
24 }
25}
锁
可以看出,一次任务的执行是在所在Worker的锁的保护下进行的,结合后面shutdownNow的源码可以发现,shutdownNow中断Worker的前提是获得锁,这就很好的体现了shutdownNow的语义: 阻止新任务的提交,等待所有已有任务执行完毕。
中断状态
这里有两种意义 :
- 如果线程池处于STOP(或之后)的状态,即shutdownNow方法已被调用,那么此处代码将确保线程的中断标志位一定被设置。
- 如果线程池处于STOP之前的状态,比如SHUTDOWN或RUNNING,那么Worker不应响应中断,即应当清除中断标志,但是暂时没有想到谁会设置Worker线程的中断标志位,难道是我们的业务代码?
在这里扒一扒到底什么是线程中断:
1public void interrupt() {
2 synchronized (blockerLock) {
3 Interruptible b = blocker;
4 if (b != null) {
5 interrupt0(); // Just to set the interrupt flag
6 b.interrupt(this);
7 return;
8 }
9 }
10 interrupt0();
11}
blocker在nio部分已经见过了,interrupt0的最终native实现位于openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp(Linux):
1void os::interrupt(Thread* thread) {
2 OSThread* osthread = thread->osthread();
3 int isInterrupted = osthread->interrupted();
4 if (!isInterrupted) {
5 //设置标志位
6 osthread->set_interrupted(true);
7 OrderAccess::fence();
8 //唤醒sleep()?
9 ParkEvent * const slp = thread->_SleepEvent ;
10 if (slp != NULL) slp->unpark() ;
11 }
12 //唤醒LockSupport.park()?
13 if (thread->is_Java_thread()) {
14 ((JavaThread*)thread)->parker()->unpark();
15 }
16 //唤醒Object.wait()?
17 ParkEvent * const ev = thread->_ParkEvent ;
18 if (ev != NULL) ev->unpark() ;
19 // When events are used everywhere for os::sleep, then this thr_kill
20 // will only be needed if UseVMInterruptibleIO is true.
21 if (!isInterrupted) {
22 int status = thr_kill(osthread->thread_id(), os::Solaris::SIGinterrupt());
23 assert_status(status == 0, status, "thr_kill");
24 // Bump thread interruption counter
25 RuntimeService::record_thread_interrupt_signaled_count();
26 }
27}
与java里已知的可被中断的阻塞大体可以找到对应关系。
任务获取
1private Runnable getTask() {
2 boolean timedOut = false; // Did the last poll() time out?
3 for (;;) {
4 int c = ctl.get();
5 int rs = runStateOf(c);
6 // 线程池已经关闭且队列中没有剩余的任务,退出
7 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
8 decrementWorkerCount();
9 return null;
10 }
11 int wc = workerCountOf(c);
12 // 如果启用了超时并且已经超时且队列中没有任务,线程退出
13 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
14 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
15 if (compareAndDecrementWorkerCount(c))
16 return null;
17 continue;
18 }
19 try {
20 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
21 if (r != null)
22 return r;
23 timedOut = true;
24 } catch (InterruptedException retry) {
25 //如果被中断不是马上退出,而是在下一次循环中检查线程池状态
26 timedOut = false;
27 }
28 }
29}
结合runWorker方法可以发现,如果getTask返回null,那么即说明当前Worker线程应该退出。
超时
allowCoreThreadTimeOut定义如下:
1private volatile boolean allowCoreThreadTimeOut;
默认为false,如果开启,Worker不会无限期等待任务,而是超时之后便退出。我们可以通过allowCoreThreadTimeOut方法进行设置:
1public void allowCoreThreadTimeOut(boolean value) {
2 if (value && keepAliveTime <= 0)
3 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
4 if (value != allowCoreThreadTimeOut) {
5 allowCoreThreadTimeOut = value;
6 if (value)
7 interruptIdleWorkers();
8 }
9}
注意同时需传入一个大于零的keepAliveTime。所以受这两个参数的影响,当没有任务执行时线程数并不一定等于corePoolSize。
超额线程回收
这里的超额指超出corePoolSize的线程,源码中有一处隐蔽的细节:
1boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
当线程数大于corePoolSize时timed也为true,再结合下面的条件判断可以得出结论: 当线程池当前的线程数超过corePoolSize且队列为空且corePoolSize不为0(0是被允许的),超出的线程会退出。
这一点可以使用测试代码test.Test的maxPoolSize方法进行验证。
另一种减少线程数的方法就是调用setCorePoolSize或setMaximumPoolSize重设线程池相关参数。
退出
Worker在退出时将触发processWorkerExit方法:
1private void processWorkerExit(Worker w, boolean completedAbruptly) {
2 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
3 decrementWorkerCount();
4 final ReentrantLock mainLock = this.mainLock;
5 mainLock.lock();
6 try {
7 completedTaskCount += w.completedTasks;
8 workers.remove(w);
9 } finally {
10 mainLock.unlock();
11 }
12
13 tryTerminate();
14
15 int c = ctl.get();
16 if (runStateLessThan(c, STOP)) {
17 if (!completedAbruptly) {
18 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
19 if (min == 0 && ! workQueue.isEmpty())
20 min = 1;
21 if (workerCountOf(c) >= min)
22 return; // replacement not needed
23 }
24 addWorker(null, false);
25 }
26}
其逻辑可以分为3个部分。
状态修改
线程池内部使用如下变量统计总共完成的任务数:
1private long completedTaskCount;
在退出时Worker线程将自己完成的数量加至以上变量中。并且将自身从Worker Set中移除。
关闭线程池
tryTerminate方法将会尝试关闭线程池。
1final void tryTerminate() {
2 for (;;) {
3 int c = ctl.get();
4 if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
5 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
6 return;
7 if (workerCountOf(c) != 0) { // Eligible to terminate
8 interruptIdleWorkers(ONLY_ONE);
9 return;
10 }
11 final ReentrantLock mainLock = this.mainLock;
12 mainLock.lock();
13 try {
14 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
15 try {
16 //空实现
17 terminated();
18 } finally {
19 ctl.set(ctlOf(TERMINATED, 0));
20 termination.signalAll();
21 }
22 return;
23 }
24 } finally {
25 mainLock.unlock();
26 }
27 // else retry on failed CAS
28 }
29}
什么情况下才会尝试调用interruptIdleWorkers呢?
- 当前状态为STOP,即执行了shutdownNow()方法。
- 当前状态为SHUTDOWN且任务队列为null,这正对应shutdown()方法被调用且所有任务已执行完毕。
那么为什么只中断一个Worker线程而不是全部呢?猜测是这相当于链式唤醒,一个唤醒另一个直到最后一个将状态最终修改为TERMINATED。
1termination.signalAll();
用于唤醒正在等待线程终结的线程,termination定义如下:
1private final Condition termination = mainLock.newCondition();
awaitTermination方法部分源码:
1nanos = termination.awaitNanos(nanos);
线程重生
为什么叫重生呢?首先回顾一下runWorker方法任务执行的相关源码:
1try {
2 task.run();
3} catch (RuntimeException x) {
4 thrown = x; throw x;
5} catch (Error x) {
6 thrown = x; throw x;
7} catch (Throwable x) {
8 thrown = x; throw new Error(x);
9} finally {
10 afterExecute(task, thrown);
11}
可以看到,异常又被重新抛了出去,也就是说如果我们任务出现了未检查异常就会导致Worker线程的退出,而processWorkerExit方法将会检测当前线程池是否还需要再增加Worker,如果是由于任务逻辑异常导致的退出势必是需要增加的,这便是"重生"。
submit & FutureTask
我们以单参数Callable
1public <T> Future<T> submit(Callable<T> task) {
2 RunnableFuture<T> ftask = newTaskFor(task);
3 execute(ftask);
4 return ftask;
5}
AbstractExecutorService.newTaskFor:
1protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
2 return new FutureTask<T>(callable);
3}
被包装成了一个FutureTask对象:
FutureTask组合了Runnable和Future两个接口。下面我们来看一下其主要方法的实现。
get
1public V get() {
2 int s = state;
3 if (s <= COMPLETING)
4 s = awaitDone(false, 0L);
5 return report(s);
6}
state为状态标识,其声明(和可取的值)如下:
1private volatile int state;
2private static final int NEW = 0;
3private static final int COMPLETING = 1;
4private static final int NORMAL = 2;
5private static final int EXCEPTIONAL = 3;
6private static final int CANCELLED = 4;
7private static final int INTERRUPTING = 5;
8private static final int INTERRUPTED = 6;
显然get方法的核心便是用于进行等待的awaitDone方法:
1private int awaitDone(boolean timed, long nanos) {
2 final long deadline = timed ? System.nanoTime() + nanos : 0L;
3 WaitNode q = null;
4 boolean queued = false;
5 for (;;) {
6 if (Thread.interrupted()) {
7 removeWaiter(q);
8 throw new InterruptedException();
9 }
10 int s = state;
11 if (s > COMPLETING) {
12 //已经完成
13 if (q != null)
14 q.thread = null;
15 return s;
16 }
17 else if (s == COMPLETING)
18 //正在完成,只需要让CPU空转进行等待即可
19 Thread.yield();
20 else if (q == null)
21 q = new WaitNode();
22 else if (!queued)
23 //CAS将新节点q设为等待链表的头结点
24 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
25 else if (timed) {
26 nanos = deadline - System.nanoTime();
27 if (nanos <= 0L) {
28 removeWaiter(q);
29 return state;
30 }
31 LockSupport.parkNanos(this, nanos);
32 }
33 else
34 LockSupport.park(this);
35 }
36}
这里并没有使用Lock或是Condition,而是直接使用了类似AQS等待队列的思想。我们来看一下WaitNode的类图:
属性thread取自构造器:
1WaitNode() { thread = Thread.currentThread(); }
而report方法用于根据最后的状态采取对应的动作,比如抛出异常或者是返回结果:
1private V report(int s) throws ExecutionException {
2 Object x = outcome;
3 if (s == NORMAL)
4 return (V)x;
5 if (s >= CANCELLED)
6 throw new CancellationException();
7 throw new ExecutionException((Throwable)x);
8}
run
很自然的想到一个问题: 是在哪里将状态设为已完成的呢?
1public void run() {
2 //runner屏障,防止任务重复执行
3 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
4 return;
5 try {
6 Callable<V> c = callable;
7 //volatile读
8 if (c != null && state == NEW) {
9 //窗口开始
10 V result;
11 boolean ran;
12 try {
13 result = c.call();
14 ran = true;
15 //窗口结束
16 } catch (Throwable ex) {
17 result = null;
18 ran = false;
19 setException(ex);
20 }
21 if (ran)
22 set(result);
23 }
24 } finally {
25 // 解除runner屏障
26 runner = null;
27 // state must be re-read after nulling runner to prevent
28 // leaked interrupts
29 int s = state;
30 if (s >= INTERRUPTING)
31 handlePossibleCancellationInterrupt(s);
32 }
33}
set和setException方法便是用于改变任务的状态,通知我们的等待线程,将在后面进行说明。
这里最有意思的是handlePossibleCancellationInterrupt方法的调用,注释中提到的"泄漏的中断"指的是什么呢?其实在任务call方法调用前后存在一个状态被其它线程修改的时间窗口,窗口的起止位置见上面源码。
在这个窗口时间内,另外一个线程完全可能通过对cancel方法的调用将状态改为INTERRUPTING或CANCELLED。cancel方法的说明见下面,注意,一旦当前状态不再是NEW,那么set和setException方法便不会执行,因为其前提条件是状态为NEW,set方法部分源码:
1protected void set(V v) {
2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3 //...
4 }
5}
所以handlePossibleCancellationInterrupt被执行的条件是:
在业务逻辑(call方法)执行期间发生了cancell调用。
1private void handlePossibleCancellationInterrupt(int s) {
2 if (s == INTERRUPTING)
3 while (state == INTERRUPTING)
4 Thread.yield(); // wait out pending interrupt
5}
线程到这里便会空转等待,直到cancel线程将状态最终修改为INTERRUPTED。为什么要这么做呢?
猜测Doug Lea大神是为了保证被取消的线程晚于取消线程退出。
这里还有一个很有意思的问题,这里能不能清除中断标志呢?答案是不能。因为cancel靠中断取消任务的执行,同时我们也有可能利用中断语义自主结束任务的执行,FutureTask在这里不能分辨出是取消还是用户中断。那么问题来了,额外再引入一个标志变量可否?
cancel
1public boolean cancel(boolean mayInterruptIfRunning) {
2 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
3 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
4 return false;
5 try { // in case call to interrupt throws exception
6 if (mayInterruptIfRunning) {
7 try {
8 Thread t = runner;
9 if (t != null)
10 t.interrupt();
11 } finally { // final state
12 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
13 }
14 }
15 } finally {
16 finishCompletion();
17 }
18 return true;
19}
可以看出,只有任务尚处于NEW状态时此方法才会返回true。这里有一个有意思的问题,为什么对于INTERRUPTED状态的设置使用putOrderedInt方法呢?
putOrderedInt方法是一种底层的优化手段,效果就是对volatile变量进行普通写操作,也就是说并不保证可见性,可以参考:
AtomicXXX.lazySet(…) in terms of happens before edges
可以进行此处优化的原因是执行到这里时状态(state)必定为INTERRUPTING或CANCELLED,而对于get/run等方法其实并不关心状态具体是哪一种,get方法源码回顾:
1public V get() throws InterruptedException, ExecutionException {
2 int s = state;
3 if (s <= COMPLETING)
4 s = awaitDone(false, 0L);
5 return report(s);
6}
只要state大于COMPLETING便会直接report,既然对其它线程没有影响也就没必要保证可见性(再加一次内存屏障了)。
finishCompletion方法用以最后执行唤醒等待线程等操作:
1private void finishCompletion() {
2 // assert state > COMPLETING;
3 for (WaitNode q; (q = waiters) != null;) {
4 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
5 for (;;) {
6 Thread t = q.thread;
7 if (t != null) {
8 q.thread = null;
9 LockSupport.unpark(t);
10 }
11 WaitNode next = q.next;
12 if (next == null)
13 break;
14 q.next = null; // unlink to help gc
15 q = next;
16 }
17 break;
18 }
19 }
20 //模板方法,空实现
21 done();
22 callable = null; // to reduce footprint
23}
其实就是一个遍历等待链表并逐个unpark的过程。
set
1protected void set(V v) {
2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3 outcome = v;
4 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
5 finishCompletion();
6 }
7}
一目了然。
shutdown
1public void shutdown() {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 //设置状态
6 advanceRunState(SHUTDOWN);
7 interruptIdleWorkers();
8 } finally {
9 mainLock.unlock();
10 }
11 tryTerminate();
12}
这里加了锁,是时候总结一下这个mainLock用在哪些地方了:
- shutdown & shutdownNow
- awaitTermination
- getPoolSize
- getActiveCount
- getLargestPoolSize
- getTaskCount
- getCompletedTaskCount
- toString
可见,锁用在对Worker集合的操作以及线程池的关闭、线程数量获取上。tryTerminate方法已经见识过了,这里重点在于interruptIdleWorkers:
1private void interruptIdleWorkers(boolean onlyOne) {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 for (Worker w : workers) {
6 Thread t = w.thread;
7 if (!t.isInterrupted() && w.tryLock()) {
8 try {
9 t.interrupt();
10 } catch (SecurityException ignore) {
11 } finally {
12 w.unlock();
13 }
14 }
15 if (onlyOne)
16 break;
17 }
18 } finally {
19 mainLock.unlock();
20 }
21}
onlyOne参数为false,这里最有意思的便是w.tryLock()。回顾之前Worker部分,Worker继承自AbstractQueuedSynchronizer,而Worker对业务逻辑的执行处于其自身锁的保护之下,也就是说,如果Worker当前正在由任务执行,根本不可能被中断,这就符合了线程池shutdown不会中断正在执行的任务的语义。由于interruptIdleWorkers执行时线程池的状态已被修改为SHUTDOWN,所以在下一次进行任务获取的时候Worker线程自然会感知到shutdown调用,等到将队列中所有任务执行完毕时自然也就退出了,参考上面任务获取一节。
shutdownNow
1public List<Runnable> shutdownNow() {
2 List<Runnable> tasks;
3 final ReentrantLock mainLock = this.mainLock;
4 mainLock.lock();
5 try {
6 advanceRunState(STOP);
7 interruptWorkers();
8 tasks = drainQueue();
9 } finally {
10 mainLock.unlock();
11 }
12 tryTerminate();
13 return tasks;
14}
这里会将所有尚未来得及执行的任务一并返回。
1private void interruptWorkers() {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 for (Worker w : workers)
6 w.interruptIfStarted();
7 } finally {
8 mainLock.unlock();
9 }
10}
Worker.interruptIfStarted:
1void interruptIfStarted() {
2 Thread t;
3 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
4 try {
5 t.interrupt();
6 } catch (SecurityException ignore) {
7 }
8 }
9}
getState() >= 0表示当前Worker已启动。没有获取锁直接中断,这便是和shutdown的区别了。drainQueue其实是对BlockingQueue接口drainTo方法的调用,因为线程池的队列必须是一个BlockingQueue。
这里有一个很有意思的细节:
如果我们submit的任务尚未被执行,shutdownNow就被调用了,同时有一个线程正在阻塞在future上,那么此线程会被唤醒吗?
答案是不会,源码中没有看到相关唤醒的代码,测试方法test.Test.canWakeUp可以证明这一现象。
getActiveCount
此方法用以获取线程池中当前正在执行任务的线程数,其实现很有趣:
1public int getActiveCount() {
2 final ReentrantLock mainLock = this.mainLock;
3 mainLock.lock();
4 try {
5 int n = 0;
6 for (Worker w : workers)
7 if (w.isLocked())
8 ++n;
9 return n;
10 } finally {
11 mainLock.unlock();
12 }
13}
是通过判断Worker是否持有锁完成的,新技能get。
finalize
ThreadPoolExecutor覆盖了此方法:
1protected void finalize() {
2 shutdown();
3}