ScheduledThreadPool

创建

我们一般使用下面的方式进行创建:

1public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
2    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
3}

DelegatedScheduledExecutorService实际上是对ScheduledExecutorService接口方法的转发,目的是只将ScheduledExecutorService接口的public方法暴露出来,这其实就是门面模式。

显然这里的核心便是ScheduledThreadPoolExecutor了:

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor构造器:

1public ScheduledThreadPoolExecutor(int corePoolSize) {
2    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
3}

所以,默认情况下创建的是corePoolSize为1的线程池,而maximumPoolSize却为int最大值!

其工作队列DelayedWorkQueue是ScheduledThreadPoolExecutor的嵌套类:

DelayedWorkQueue

单次调度

1public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2    RunnableScheduledFuture<V> t = decorateTask(callable,
3        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
4    delayedExecute(t);
5    return t;
6}

触发时间计算

1private long triggerTime(long delay, TimeUnit unit) {
2    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
3}
4long triggerTime(long delay) {
5     return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
6}

now方法即返回当前纳秒表示的时间,所以触发时间就是当前时间加延时。

任务包装

Callable任务被包装成了ScheduledFutureTask对象,其是ScheduledThreadPoolExecutor的内部类:

ScheduledFutureTask

decorateTask是一个模板方法,空实现。

调度

核心便是delayedExecute方法:

 1private void delayedExecute(RunnableScheduledFuture<?> task) {
 2    if (isShutdown())
 3        reject(task);
 4    else {
 5        super.getQueue().add(task);
 6        if (isShutdown() &&
 7            !canRunInCurrentRunState(task.isPeriodic()) &&
 8            remove(task))
 9            task.cancel(false);
10        else
11            ensurePrestart();
12    }
13}

isShutdown方法在父类ThreadPoolExecutor中实现,利用的便是我们已经提到过的状态记录的方法。

工作队列

DelayedWorkQueue的类图位于上面创建一节中,其实此队列便是调度实现的核心,此队列实际上用数组实现了一个小顶堆,其add方法实际上通过offer方法实现:

 1public boolean offer(Runnable x) {
 2    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 3    final ReentrantLock lock = this.lock;
 4    lock.lock();
 5    try {
 6        int i = size;
 7        //扩容
 8        if (i >= queue.length)
 9            grow();
10        size = i + 1;
11        //队列为empty
12        if (i == 0) {
13            queue[0] = e;
14            setIndex(e, 0);
15        } else {
16            siftUp(i, e);
17        }
18        if (queue[0] == e) {
19            leader = null;
20            available.signal();
21        }
22    } finally {
23        lock.unlock();
24    }
25    return true;
26}

queue便是用以实现小顶堆的数组:

1private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

初始大小为16.

堆首先是一棵完全二叉树,按照如下的顺序将其节点存储到数组中:

堆节点顺序

满足以下的性质:

  • 任一节点的父节点的数组下标为i / 2.
  • 节点的左子节点的下标为i * 2, 右子节点的下标为i * 2 + 1.
  • 添加节点时将节点放在数组的最后一个位置,然后不断的将此节点的值与其父节点比较,如果不满足堆的条件,交换之.
  • 堆排序的时间复杂度: O(NlongN).

扩容

1private void grow() {
2    int oldCapacity = queue.length;
3    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
4    if (newCapacity < 0) // overflow
5        newCapacity = Integer.MAX_VALUE;
6    queue = Arrays.copyOf(queue, newCapacity);
7}

1.5倍扩容,最大取int最大值。

上移

即使堆再次平衡的过程:

 1private void siftUp(int k, RunnableScheduledFuture<?> key) {
 2    while (k > 0) {
 3        int parent = (k - 1) >>> 1;
 4        RunnableScheduledFuture<?> e = queue[parent];
 5        if (key.compareTo(e) >= 0)
 6            break;
 7        queue[k] = e;
 8        setIndex(e, k);
 9        k = parent;
10    }
11    queue[k] = key;
12    setIndex(key, k);
13}

很容易理解,就是一个和父节点交换,直到父节点的延时时间小于当前任务。

状态检查

将任务提交到队列后会再次对线程池当前的状态进行检查,相关源码:

1super.getQueue().add(task);
2if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) {
3    task.cancel(false);
4}

isShutdown成立的条件是当前状态不是运行状态,isPeriodic方法用以判断任务是否是持续任务:

1public boolean isPeriodic() {
2    return period != 0;
3}

period是ScheduledFutureTask的属性,其不同的取值意义如下:

  • 正值: 按固定的时间间隔调度
  • 负值: 按固定的时间延迟进行调度
  • 零: 单次任务

canRunInCurrentRunState:

1boolean canRunInCurrentRunState(boolean periodic) {
2    return isRunningOrShutdown(periodic ?
3                                continueExistingPeriodicTasksAfterShutdown :
4                                executeExistingDelayedTasksAfterShutdown);
5}

ThreadPoolExecutor.isRunningOrShutdown:

1final boolean isRunningOrShutdown(boolean shutdownOK) {
2    int rs = runStateOf(ctl.get());
3    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
4}

这里判断的是在当前的状态下是否可以执行任务,SHUTDOWN态是由于shutdown方法被调用所致,不是shutdownNow。

continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown属性ScheduledThreadPoolExecutor为我们留下了setter方法,你懂的。

任务移除

如果当前已不能进行任务执行,那么便将刚提交的任务从堆中移除,核心的实现为DelayedWorkQueue的同名方法:

 1public boolean remove(Object x) {
 2    final ReentrantLock lock = this.lock;
 3    lock.lock();
 4    try {
 5        int i = indexOf(x);
 6        if (i < 0)
 7            return false;
 8        setIndex(queue[i], -1);
 9        int s = --size;
10        RunnableScheduledFuture<?> replacement = queue[s];
11        queue[s] = null;
12        if (s != i) {
13            siftDown(i, replacement);
14            if (queue[i] == replacement)
15                //不能进行下移,再试试上移?
16                siftUp(i, replacement);
17        }
18        return true;
19    } finally {
20        lock.unlock();
21    }
22}

关键在于条件判断if (s != i),即被移除的节点不是最后(数组的最后)一个节点,在这种情况下会导致数组i处出现一个空位,所以在这里进行了先下移再上移的尝试,以使用最末节点或其它节点填补此空位,同时数组大小减一。

任务取消

ScheduledFutureTask.cancel:

1public boolean cancel(boolean mayInterruptIfRunning) {
2    boolean cancelled = super.cancel(mayInterruptIfRunning);
3    if (cancelled && removeOnCancel && heapIndex >= 0)
4        remove(this);
5    return cancelled;
6}

父类FutureTask的cancel方法已经见过了,removeOnCancel为ScheduledThreadPoolExecutor的属性,默认为false,其实这里调用remove是不必要的,因为已经被调用过了。

Worker启动

ThreadPoolExecutor.ensurePrestart:

1void ensurePrestart() {
2    int wc = workerCountOf(ctl.get());
3    if (wc < corePoolSize)
4        addWorker(null, true);
5    else if (wc == 0)
6        addWorker(null, false);
7}

即使corePoolSize为0,也要保证有一个Worker线程。

任务获取

在ThreadPoolExecutor我们已经见过了,Worker线程通过调用任务队列的take方法进行获取:

 1public RunnableScheduledFuture<?> take() throws InterruptedException {
 2    final ReentrantLock lock = this.lock;
 3    lock.lockInterruptibly();
 4    try {
 5        for (;;) {
 6            RunnableScheduledFuture<?> first = queue[0];
 7            //堆为空
 8            if (first == null)
 9                available.await();
10            else {
11                long delay = first.getDelay(NANOSECONDS);
12                //getDelay返回的是延时执行时间和当前时间的差,非正值说明此任务可以执行了
13                if (delay <= 0)
14                    return finishPoll(first);
15                first = null;
16                if (leader != null)
17                    //已存在leader,所以当前线程为follower,永久等待
18                    available.await();
19                else {
20                    Thread thisThread = Thread.currentThread();
21                    leader = thisThread;
22                    try {
23                        //当前线程成为leader,等待至下一次任务执行时间
24                        available.awaitNanos(delay);
25                    } finally {
26                        if (leader == thisThread)
27                            leader = null;
28                    }
29                }
30            }
31        }
32    } finally {
33        if (leader == null && queue[0] != null)
34            //当前线程接下来要去执行定时任务逻辑,所以唤醒一个follower(如果有),使之成为新的leader
35            available.signal();
36        lock.unlock();
37    }
38}

这里其实应用了Leader/Follower模式,参考:

Leader/Follower多线程网络模型介绍

使用这种模式的原因猜想应该是这样: 由于定时任务的特殊性,在某一时刻应该只有一个任务等开始时间最短,这样的话只让一个线程阻塞至既定时间即可,其它线程及时醒来也不能立即执行任务,从而造成了性能的浪费。

如果堆为空,那么等待的Worker何时被唤醒呢?玄机就在offer方法,相关源码:

1if (queue[0] == e) {
2    leader = null;
3    available.signal();
4}

为什么新任务被至于堆顶时需要唤醒Worker呢,因为这就意味着之前堆为空或最近需要执行任务的时间已经改变,需要重新调整leader的睡眠时间。

finishPoll方法很容易猜到,就是填补堆顶的空缺:

1private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
2    int s = --size;
3    RunnableScheduledFuture<?> x = queue[s];
4    queue[s] = null;
5    if (s != 0)
6        siftDown(0, x);
7    setIndex(f, -1);
8    return f;
9}

将最后 一个元素从堆顶使其"沉沦"。

重生

对于持续执行的任务,在一次执行完成后应该将其再次放入到堆中,以待下次执行,这一步是在ScheduledFutureTask的run方法中完成:

 1public void run() {
 2    boolean periodic = isPeriodic();
 3    if (!canRunInCurrentRunState(periodic))
 4        cancel(false);
 5    //单次任务
 6    else if (!periodic)
 7        ScheduledFutureTask.super.run();
 8    //持续任务
 9    else if (ScheduledFutureTask.super.runAndReset()) {
10        //设置下次执行的时间
11        setNextRunTime();
12        //重新加入到堆中
13        reExecutePeriodic(outerTask);
14    }
15}

FutureTask.runAndReset方法便是调用任务逻辑的地方,不同于我们已经见过的run方法,这里不会设置任务执行的结果(即outcome属性),也不会改变Future的状态,所以即使一次执行完毕,Future看到的状态仍是未完成。

shutdown

主要逻辑由父类ThreadPoolExecutor实现,唯一的区别便在于ScheduledThreadPoolExecutor实现了父类的模板方法onShutdown(简略版源码):

 1@Override void onShutdown() {
 2    BlockingQueue<Runnable> q = super.getQueue();
 3    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
 4    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
 5    if (!keepDelayed && !keepPeriodic) {
 6        for (Object e : q.toArray())
 7            if (e instanceof RunnableScheduledFuture<?>)
 8                ((RunnableScheduledFuture<?>) e).cancel(false);
 9        q.clear();
10    }
11}

这里所做的就是将堆中所有未执行的任务取消,所以如果有线程阻塞在等待任务的结果上最终可以返回。

shutdownNow

直接调用父类的方法实现,可以想象,这便会导致线程池已关闭但Future.get无法返回。