Condition

Condition

类图

Condition类图

ConditionObject是AbstractQueuedSynchronizer的内部类。

创建

创建是通过Lock.newCondition()来完成的。

ReentrantLock.newCondition:

1public Condition newCondition() {
2    return sync.newCondition();
3}

Sync.newCondition:

1final ConditionObject newCondition() {
2    return new ConditionObject();
3}

await

ConditionObject.await:

 1public final void await() throws InterruptedException {
 2    if (Thread.interrupted())
 3        throw new InterruptedException();
 4    Node node = addConditionWaiter();
 5    int savedState = fullyRelease(node);
 6    int interruptMode = 0;
 7    while (!isOnSyncQueue(node)) {
 8        LockSupport.park(this);
 9        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
10            break;
11    }
12    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13        interruptMode = REINTERRUPT;
14    if (node.nextWaiter != null) // clean up if cancelled
15        unlinkCancelledWaiters();
16    if (interruptMode != 0)
17        reportInterruptAfterWait(interruptMode);
18}

条件队列

addConditionWaiter用于向Lock的锁队列的条件队列添加新的等待节点,什么是锁队列参见ReentrantLock。

 1private Node addConditionWaiter() {
 2    Node t = lastWaiter;
 3    // If lastWaiter is cancelled, clean out.
 4    if (t != null && t.waitStatus != Node.CONDITION) {
 5        unlinkCancelledWaiters();
 6        t = lastWaiter;
 7    }
 8    Node node = new Node(Thread.currentThread(), Node.CONDITION);
 9    if (t == null)
10        firstWaiter = node;
11    else
12        t.nextWaiter = node;
13    lastWaiter = node;
14    return node;
15}

一目了然。

锁释放

调用Condition.await方法的前提是拥有锁,await会释放锁

AbstractQueuedSynchronizer.fullyRelease:

 1final int fullyRelease(Node node) {
 2    boolean failed = true;
 3    try {
 4        int savedState = getState();
 5        if (release(savedState)) {
 6            failed = false;
 7            return savedState;
 8        } else {
 9            throw new IllegalMonitorStateException();
10        }
11    } finally {
12        if (failed)
13            node.waitStatus = Node.CANCELLED;
14    }
15}

可以看出,不管有多少次重入,都会被一次性释放。fullyRelease方法返回的是锁释放之前的重入次数。

阻塞

锁释放后,便会将当前线程阻塞直到被signal或中断。

1while (!isOnSyncQueue(node)) {
2    LockSupport.park(this);
3    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
4        break;
5}

isOnSyncQueue用以判断节点是否在锁队列(SyncQueue)上,源码:

1final boolean isOnSyncQueue(Node node) {
2    if (node.waitStatus == Node.CONDITION || node.prev == null)
3        return false;
4    if (node.next != null) // If has successor, it must be on queue
5        return true;
6    return findNodeFromTail(node);
7}

可见,只要满足waitStatus是CONDITION或prev/next为空,那么就不在锁队列上,因为条件队列是用nextWaiter连接的,而不是prev/next。

那么是么时候会spin到锁队列中去呢,应该是在signal中。

先中断 or 先唤醒

checkInterruptWhileWaiting方法用以判断中断是发生在被signal之前还是之后。

ConditionObject.checkInterruptWhileWaiting:

1private int checkInterruptWhileWaiting(Node node) {
2    //THROW_IE: -1,先中断
3    //REINTERRUPT: 1,先唤醒
4    return Thread.interrupted() ?
5        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
6}

AbstractQueuedSynchronizer.transferAfterCancelledWait:

 1final boolean transferAfterCancelledWait(Node node) {
 2    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
 3        enq(node);
 4        return true;
 5    }
 6    //等待signal线程完成到锁队列的转换工作
 7    while (!isOnSyncQueue(node))
 8        Thread.yield();
 9    return false;
10}

可见,对先中断和先唤醒的区分是通过状态是不是CONDITION完成的,如果是先中断,那么将其状态设为0并将节点加入到锁队列。

节点的状态被设为0的原因后面会提到。

重新获得锁

由于当前线程本来是拥有锁的,所以从阻塞中恢复(被中断或唤醒)之后,需要恢复到最初的状态,才能继续执行后续的操作。

恢复是通过acquireQueued方法完成的:

1if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2    interruptMode = REINTERRUPT;

从这里也可以看出保存最初的锁状态(savedState)的原因。

acquireQueued方法在ReentrantLock中已经说过了,在此不再赘述,只是注意一点:

AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire:

 1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2    int ws = pred.waitStatus;
 3    if (ws == Node.SIGNAL)
 4        return true;
 5    if (ws > 0) {
 6        do {
 7            node.prev = pred = pred.prev;
 8        } while (pred.waitStatus > 0);
 9        pred.next = node;
10    } else {
11        //here
12        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
13    }
14    return false;
15}

可以看出,0状态被设为了SIGNAL,这就解释了上面被中断后为什么要将状态设为0了。

是否取消

获得锁之后,首先会对整个条件队列进行扫描,如果有被取消的节点,那么将其从队列中移除。

ConditionObject.unlinkCancelledWaiters:

 1private void unlinkCancelledWaiters() {
 2    Node t = firstWaiter;
 3    Node trail = null;
 4    while (t != null) {
 5        Node next = t.nextWaiter;
 6        if (t.waitStatus != Node.CONDITION) {
 7            t.nextWaiter = null;
 8            if (trail == null)
 9                firstWaiter = next;
10            else
11                trail.nextWaiter = next;
12            if (next == null)
13                lastWaiter = trail;
14        }
15        else
16            trail = t;
17        t = next;
18    }
19}

就是一个喜闻乐见的链表操作。

中断报告

ConditionObject.reportInterruptAfterWait:

1private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
2    if (interruptMode == THROW_IE)
3        throw new InterruptedException();
4    else if (interruptMode == REINTERRUPT)
5        //重新设置中断标志位
6        selfInterrupt();
7}

signalAll

ConditionObject.signalAll:

1public final void signalAll() {
2    //检查当前线程是不是拥有锁
3    if (!isHeldExclusively())
4        throw new IllegalMonitorStateException();
5    Node first = firstWaiter;
6    if (first != null)
7        doSignalAll(first);
8}

doSignalAll:

1private void doSignalAll(Node first) {
2    lastWaiter = firstWaiter = null;
3    do {
4        Node next = first.nextWaiter;
5        first.nextWaiter = null;
6        transferForSignal(first);
7        first = next;
8    } while (first != null);
9}

可见,这里所做的就是将节点从条件队列转移到锁队列中去

节点转移

AbstractQueuedSynchronizer.transferForSignal:

 1final boolean transferForSignal(Node node) {
 2    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 3        return false;
 4    Node p = enq(node);
 5    int ws = p.waitStatus;
 6    //将状态设为SIGNAL
 7    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 8        LockSupport.unpark(node.thread);
 9    return true;
10}

CAS

既然await和signal操作都是在有锁的情况下调用的,那么为什么还是存在大量CAS调用?

推测是这样的,await有一段代码是在没有锁的情况下执行的:

1while (!isOnSyncQueue(node)) {
2    LockSupport.park(this);
3    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
4        break;
5}

与这段代码的交互需要CAS。

总结

唤醒操作(signal/signalAll)并不会使线程马上获得锁,而是使之拥有获得锁的资格。