ReentrantLock
ReentrantLock
类图
如下:
Sync
观察ReentrantLock关键方法的源码可以发现,其实现是委托给内部类Sync实现的,所以逻辑的关键便在于此类。例子:
1public void lock() {
2 sync.lock();
3}
类图
很明显,锁的公平性便是由NonFairSync和FairSync实现的,从ReentrantLock的构造器可以印证这一点:
1public ReentrantLock(boolean fair) {
2 sync = fair ? new FairSync() : new NonfairSync();
3}
锁的公平性指的是等待时间最长的线程最有机会获得锁,但是这样会导致性能的下降,因为此时对于线程的调度和操作系统的调度是矛盾的。参考:
非公平锁
lock
NonfairSync.lock:
1final void lock() {
2 if (compareAndSetState(0, 1))
3 setExclusiveOwnerThread(Thread.currentThread());
4 else
5 acquire(1);
6}
快速尝试
AbstractQueuedSynchronizer.compareAndSetState:
1protected final boolean compareAndSetState(int expect, int update) {
2 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
3}
AbstractQueuedSynchronizer内部有一个state变量:
1/**
2 * The synchronization state.
3 */
4private volatile int state;
其记录了线程重入此锁的次数,如果为0,那么表示现在没有线程持有此锁,此时使用一个CAS操作即可快速完成锁的申请,这便是快速尝试。
setExclusiveOwnerThread方法用以记录是哪个线程当前持有锁。
如果快速尝试失败(即锁已被某个线程持有),那么将调用AbstractQueuedSynchronizer.acquire()方法:
1public final void acquire(int arg) {
2 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
3 selfInterrupt();
4}
是否重入
NonfairSync.tryAcquire:
1protected final boolean tryAcquire(int acquires) {
2 return nonfairTryAcquire(acquires);
3}
Sync.nonfairTryAcquire:
1final boolean nonfairTryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 //获取重入次数
4 int c = getState();
5 //再次检查是否没有线程持有当前锁
6 if (c == 0) {
7 if (compareAndSetState(0, acquires)) {
8 setExclusiveOwnerThread(current);
9 return true;
10 }
11 }
12 //是否当前线程正在重入
13 else if (current == getExclusiveOwnerThread()) {
14 int nextc = c + acquires;
15 //支持的最大重入次数: Ingteger.MAX_VALUE
16 if (nextc < 0) // overflow
17 throw new Error("Maximum lock count exceeded");
18 setState(nextc);
19 return true;
20 }
21 return false;
22}
很明显了,这里体现的便是重入。
锁队列
如果锁正在被其它线程持有,那么只能进入队列等待锁。
原理
JDK使用的是CLH队列的一种变种。什么是CLH队列?可以参考:
队列的节点在AbstractQueuedSynchronizer的嵌套类Node中定义,其类图:
注: 大写的表示常量。
Node节点形成这样的结构:
nextWaiter用于组成Condition等待队列。众所周知,Condition只能用在已经获得锁的情况下,所以Condition等待队列不同于锁队列,Condition队列结构如下图:
源码
入队
入队实际上就是创建一个新的节点并将其设为tail的过程。
AbstractQueuedSynchronizer.addWaiter():
1private Node addWaiter(Node mode) {
2 Node node = new Node(Thread.currentThread(), mode);
3 // Try the fast path of enq; backup to full enq on failure
4 Node pred = tail;
5 if (pred != null) {
6 node.prev = pred;
7 if (compareAndSetTail(pred, node)) {
8 pred.next = node;
9 return node;
10 }
11 }
12 enq(node);
13 return node;
14}
可见,此处同样使用了快速尝试的思想,如果CAS尝试失败,那么再调用enq方法,源码:
1private Node enq(final Node node) {
2 for (;;) {
3 Node t = tail;
4 if (t == null) { // Must initialize
5 //可见,head其实是一个"空的Node"
6 if (compareAndSetHead(new Node()))
7 tail = head;
8 } else {
9 node.prev = t;
10 if (compareAndSetTail(t, node)) {
11 t.next = node;
12 return t;
13 }
14 }
15 }
16}
其实还是CAS操作,加了一个死循环,直到成功为止。
锁获取/等待
当前线程被加入到锁队列之后,整下的便是排队等候了。锁队列中当前节点可以获得锁的条件便是上一个节点(prev)释放了锁。
AbstractQueuedSynchronizer.acquireQueued:
1final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 boolean interrupted = false;
5 for (;;) {
6 final Node p = node.predecessor();
7 //前一个是head,那么表示当前节点即是等待时间最长的线程,并立即尝试获得锁
8 if (p == head && tryAcquire(arg)) {
9 setHead(node);
10 p.next = null; // help GC
11 failed = false;
12 return interrupted;
13 }
14 //执行到这里说明当前节点不是等待时间最长的节点或者锁竞争失败
15 if (shouldParkAfterFailedAcquire(p, node) &&
16 parkAndCheckInterrupt())
17 interrupted = true;
18 }
19 } finally {
20 if (failed)
21 cancelAcquire(node);
22 }
23}
从这里可以看出,当当前节点就是等待时间最长的节点(队首节点)时,仍然需要调用tryAcquire竞争锁,而与此同时新来的线程有可能同时调用tryAcquire方法与之竞争,这便是非公平性的体现。
shouldParkAfterFailedAcquire方法用于检测当前线程是否应该休眠,源码:
1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 int ws = pred.waitStatus;
3 if (ws == Node.SIGNAL)
4 /*
5 * This node has already set status asking a release
6 * to signal it, so it can safely park.
7 */
8 return true;
9 //上一个节点已经被取消,所以需要"跳过"前面所有已经被取消的节点
10 if (ws > 0) {
11 do {
12 node.prev = pred = pred.prev;
13 } while (pred.waitStatus > 0);
14 pred.next = node;
15 } else {
16 /*
17 * waitStatus must be 0 or PROPAGATE. Indicate that we
18 * need a signal, but don't park yet. Caller will need to
19 * retry to make sure it cannot acquire before parking.
20 */
21 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
22 }
23 return false;
24}
从这里可以看出,当前一个节点的status为Signal时,当前节点(线程)便进入挂起状态,等待前一个节点释放锁,前一个节点释放锁时,会唤醒当前节点,那时当前节点会再次进行锁竞争。
再来看一下是如何挂起线程的,AbstractQueuedSynchronizer.parkAndCheckInterrupt:
1private final boolean parkAndCheckInterrupt() {
2 LockSupport.park(this);
3 return Thread.interrupted();
4}
LockSupport相当于一个工具类,只有静态方法,私有构造器。
LockSupport.park:
1public static void park(Object blocker) {
2 Thread t = Thread.currentThread();
3 setBlocker(t, blocker);
4 //native
5 UNSAFE.park(false, 0L);
6 setBlocker(t, null);
7}
此时,线程便会阻塞(休眠)在UNSAFE.park(false, 0L);这一句,直到有以下三种情形发生:
-
unpark方法被调用
-
interrupt方法被调用
-
The call spuriously (that is, for no reason) returns.
其实说的就是Spurious wakeup: 虚假唤醒。虚假唤醒指的便是阻塞的线程被"莫名其妙"的唤醒,这是多核处理器中不可避免的问题,参见:
其实经常写的:
1while (!condition) 2 await();便是为了防止此问题。
从acquireQueued方法的源码中可以看出,即使发生了上面所说的三个条件,也只是将变量interrupted设为了true而已,这也就是为什么lock方法会"义无反顾"地在这里等待锁的原因了。
自中断
当获得锁成功后,会将自己中断:
AbstractQueuedSynchronizer.selfInterrupt:
1static void selfInterrupt() {
2 Thread.currentThread().interrupt();
3}
lockInterruptibly
此方法将在以下两种情况下返回:
- 获得锁
- 被中断
AbstractQueuedSynchronizer.acquireInterruptibly:
1public final void acquireInterruptibly(int arg) throws InterruptedException {
2 if (Thread.interrupted())
3 throw new InterruptedException();
4 if (!tryAcquire(arg))
5 doAcquireInterruptibly(arg);
6}
tryAcquire方法前面已经说过了,这里不再赘述。
AbstractQueuedSynchronizer.doAcquireInterruptibly:
1private void doAcquireInterruptibly(int arg) throws InterruptedException {
2 final Node node = addWaiter(Node.EXCLUSIVE);
3 boolean failed = true;
4 try {
5 for (;;) {
6 final Node p = node.predecessor();
7 if (p == head && tryAcquire(arg)) {
8 setHead(node);
9 p.next = null; // help GC
10 failed = false;
11 return;
12 }
13 if (shouldParkAfterFailedAcquire(p, node) &&
14 parkAndCheckInterrupt())
15 //看这里!
16 throw new InterruptedException();
17 }
18 } finally {
19 if (failed)
20 cancelAcquire(node);
21 }
22}
和lock方法相比其实只有一行不一样,这便是lockInterruptibly可以相应中断的原因了。
tryLock
此方法只是尝试一下现在能不能获得锁,不管结果怎样马上返回。
源码就是Sync.nonfairTryAcquire方法,前面已经说过了。
tryLock带时间参数
AbstractQueuedSynchronizer.tryAcquireNanos:
1public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
2 if (Thread.interrupted())
3 throw new InterruptedException();
4 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
5}
doAcquireNanos:
1private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
2 if (nanosTimeout <= 0L)
3 return false;
4 final long deadline = System.nanoTime() + nanosTimeout;
5 final Node node = addWaiter(Node.EXCLUSIVE);
6 boolean failed = true;
7 try {
8 for (;;) {
9 final Node p = node.predecessor();
10 if (p == head && tryAcquire(arg)) {
11 setHead(node);
12 p.next = null; // help GC
13 failed = false;
14 return true;
15 }
16 nanosTimeout = deadline - System.nanoTime();
17 if (nanosTimeout <= 0L)
18 return false;
19 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
20 //挂起指定的时间
21 LockSupport.parkNanos(this, nanosTimeout);
22 if (Thread.interrupted())
23 throw new InterruptedException();
24 }
25 } finally {
26 if (failed)
27 cancelAcquire(node);
28 }
29}
spinForTimeoutThreshold为1000纳秒,可以看出,如果给定的等待时间大于1000纳秒,才会进行线程休眠,否则将会一直轮询。
unlock
调用了AbstractQueuedSynchronizer.release:
1public final boolean release(int arg) {
2 if (tryRelease(arg)) {
3 Node h = head;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);
6 return true;
7 }
8 return false;
9}
锁释放
Sync.tryRelease:
1protected final boolean tryRelease(int releases) {
2 int c = getState() - releases;
3 if (Thread.currentThread() != getExclusiveOwnerThread())
4 throw new IllegalMonitorStateException();
5 boolean free = false;
6 if (c == 0) {
7 free = true;
8 setExclusiveOwnerThread(null);
9 }
10 setState(c);
11 return free;
12}
由于锁释放的时候必定拥有锁,所以可以放心大胆的搞。如果当前线程已经不再持有此锁,那么返回true。
节点唤醒
如果当前线程已经不再持有此锁(即tryRelease返回true),那么将会唤醒锁队列中的下一个或多个节点。
unparkSuccessor:
1 private void unparkSuccessor(Node node) {
2 int ws = node.waitStatus;
3 if (ws < 0)
4 compareAndSetWaitStatus(node, ws, 0);
5
6 Node s = node.next;
7 if (s == null || s.waitStatus > 0) {
8 s = null;
9 for (Node t = tail; t != null && t != node; t = t.prev)
10 if (t.waitStatus <= 0)
11 s = t;
12 }
13 if (s != null)
14 LockSupport.unpark(s.thread);
15}
可以看出,先是检查下一个节点(next),如果没有被取消,那么唤醒它即可,如果已经被取消,那么将倒着从后面查找。
公平锁
原理和非公平锁大同小异,在这里只说下是如何体现其公平性的。
FairSync.lock:
1final void lock() {
2 acquire(1);
3}
最终执行的是FairSync.tryAcquire:
1protected final boolean tryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 int c = getState();
4 if (c == 0) {
5 //这里
6 if (!hasQueuedPredecessors() &&
7 compareAndSetState(0, acquires)) {
8 setExclusiveOwnerThread(current);
9 return true;
10 }
11 }
12 else if (current == getExclusiveOwnerThread()) {
13 int nextc = c + acquires;
14 if (nextc < 0)
15 throw new Error("Maximum lock count exceeded");
16 setState(nextc);
17 return true;
18 }
19 return false;
20}
关键便是!hasQueuedPredecessors这个条件,这就保证了队列前面的节点一定会先获得锁。