Condition
Condition
类图
ConditionObject是AbstractQueuedSynchronizer的内部类。
创建
创建是通过Lock.newCondition()来完成的。
ReentrantLock.newCondition:
1public Condition newCondition() {
2 return sync.newCondition();
3}
Sync.newCondition:
1final ConditionObject newCondition() {
2 return new ConditionObject();
3}
await
ConditionObject.await:
1public final void await() throws InterruptedException {
2 if (Thread.interrupted())
3 throw new InterruptedException();
4 Node node = addConditionWaiter();
5 int savedState = fullyRelease(node);
6 int interruptMode = 0;
7 while (!isOnSyncQueue(node)) {
8 LockSupport.park(this);
9 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
10 break;
11 }
12 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13 interruptMode = REINTERRUPT;
14 if (node.nextWaiter != null) // clean up if cancelled
15 unlinkCancelledWaiters();
16 if (interruptMode != 0)
17 reportInterruptAfterWait(interruptMode);
18}
条件队列
addConditionWaiter用于向Lock的锁队列的条件队列添加新的等待节点,什么是锁队列参见ReentrantLock。
1private Node addConditionWaiter() {
2 Node t = lastWaiter;
3 // If lastWaiter is cancelled, clean out.
4 if (t != null && t.waitStatus != Node.CONDITION) {
5 unlinkCancelledWaiters();
6 t = lastWaiter;
7 }
8 Node node = new Node(Thread.currentThread(), Node.CONDITION);
9 if (t == null)
10 firstWaiter = node;
11 else
12 t.nextWaiter = node;
13 lastWaiter = node;
14 return node;
15}
一目了然。
锁释放
调用Condition.await方法的前提是拥有锁,await会释放锁。
AbstractQueuedSynchronizer.fullyRelease:
1final int fullyRelease(Node node) {
2 boolean failed = true;
3 try {
4 int savedState = getState();
5 if (release(savedState)) {
6 failed = false;
7 return savedState;
8 } else {
9 throw new IllegalMonitorStateException();
10 }
11 } finally {
12 if (failed)
13 node.waitStatus = Node.CANCELLED;
14 }
15}
可以看出,不管有多少次重入,都会被一次性释放。fullyRelease方法返回的是锁释放之前的重入次数。
阻塞
锁释放后,便会将当前线程阻塞直到被signal或中断。
1while (!isOnSyncQueue(node)) {
2 LockSupport.park(this);
3 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
4 break;
5}
isOnSyncQueue用以判断节点是否在锁队列(SyncQueue)上,源码:
1final boolean isOnSyncQueue(Node node) {
2 if (node.waitStatus == Node.CONDITION || node.prev == null)
3 return false;
4 if (node.next != null) // If has successor, it must be on queue
5 return true;
6 return findNodeFromTail(node);
7}
可见,只要满足waitStatus是CONDITION或prev/next为空,那么就不在锁队列上,因为条件队列是用nextWaiter连接的,而不是prev/next。
那么是么时候会spin到锁队列中去呢,应该是在signal中。
先中断 or 先唤醒
checkInterruptWhileWaiting方法用以判断中断是发生在被signal之前还是之后。
ConditionObject.checkInterruptWhileWaiting:
1private int checkInterruptWhileWaiting(Node node) {
2 //THROW_IE: -1,先中断
3 //REINTERRUPT: 1,先唤醒
4 return Thread.interrupted() ?
5 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
6}
AbstractQueuedSynchronizer.transferAfterCancelledWait:
1final boolean transferAfterCancelledWait(Node node) {
2 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
3 enq(node);
4 return true;
5 }
6 //等待signal线程完成到锁队列的转换工作
7 while (!isOnSyncQueue(node))
8 Thread.yield();
9 return false;
10}
可见,对先中断和先唤醒的区分是通过状态是不是CONDITION完成的,如果是先中断,那么将其状态设为0并将节点加入到锁队列。
节点的状态被设为0的原因后面会提到。
重新获得锁
由于当前线程本来是拥有锁的,所以从阻塞中恢复(被中断或唤醒)之后,需要恢复到最初的状态,才能继续执行后续的操作。
恢复是通过acquireQueued方法完成的:
1if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
2 interruptMode = REINTERRUPT;
从这里也可以看出保存最初的锁状态(savedState)的原因。
acquireQueued方法在ReentrantLock中已经说过了,在此不再赘述,只是注意一点:
AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire:
1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 int ws = pred.waitStatus;
3 if (ws == Node.SIGNAL)
4 return true;
5 if (ws > 0) {
6 do {
7 node.prev = pred = pred.prev;
8 } while (pred.waitStatus > 0);
9 pred.next = node;
10 } else {
11 //here
12 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
13 }
14 return false;
15}
可以看出,0状态被设为了SIGNAL,这就解释了上面被中断后为什么要将状态设为0了。
是否取消
获得锁之后,首先会对整个条件队列进行扫描,如果有被取消的节点,那么将其从队列中移除。
ConditionObject.unlinkCancelledWaiters:
1private void unlinkCancelledWaiters() {
2 Node t = firstWaiter;
3 Node trail = null;
4 while (t != null) {
5 Node next = t.nextWaiter;
6 if (t.waitStatus != Node.CONDITION) {
7 t.nextWaiter = null;
8 if (trail == null)
9 firstWaiter = next;
10 else
11 trail.nextWaiter = next;
12 if (next == null)
13 lastWaiter = trail;
14 }
15 else
16 trail = t;
17 t = next;
18 }
19}
就是一个喜闻乐见的链表操作。
中断报告
ConditionObject.reportInterruptAfterWait:
1private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
2 if (interruptMode == THROW_IE)
3 throw new InterruptedException();
4 else if (interruptMode == REINTERRUPT)
5 //重新设置中断标志位
6 selfInterrupt();
7}
signalAll
ConditionObject.signalAll:
1public final void signalAll() {
2 //检查当前线程是不是拥有锁
3 if (!isHeldExclusively())
4 throw new IllegalMonitorStateException();
5 Node first = firstWaiter;
6 if (first != null)
7 doSignalAll(first);
8}
doSignalAll:
1private void doSignalAll(Node first) {
2 lastWaiter = firstWaiter = null;
3 do {
4 Node next = first.nextWaiter;
5 first.nextWaiter = null;
6 transferForSignal(first);
7 first = next;
8 } while (first != null);
9}
可见,这里所做的就是将节点从条件队列转移到锁队列中去。
节点转移
AbstractQueuedSynchronizer.transferForSignal:
1final boolean transferForSignal(Node node) {
2 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
3 return false;
4 Node p = enq(node);
5 int ws = p.waitStatus;
6 //将状态设为SIGNAL
7 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
8 LockSupport.unpark(node.thread);
9 return true;
10}
CAS
既然await和signal操作都是在有锁的情况下调用的,那么为什么还是存在大量CAS调用?
推测是这样的,await有一段代码是在没有锁的情况下执行的:
1while (!isOnSyncQueue(node)) {
2 LockSupport.park(this);
3 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
4 break;
5}
与这段代码的交互需要CAS。
总结
唤醒操作(signal/signalAll)并不会使线程马上获得锁,而是使之拥有获得锁的资格。