ReentrantLock

ReentrantLock

类图

如下:

ReentrantLock类图

Sync

观察ReentrantLock关键方法的源码可以发现,其实现是委托给内部类Sync实现的,所以逻辑的关键便在于此类。例子:

1public void lock() {
2    sync.lock();
3}

类图

Sync类图

很明显,锁的公平性便是由NonFairSync和FairSync实现的,从ReentrantLock的构造器可以印证这一点:

1public ReentrantLock(boolean fair) {
2    sync = fair ? new FairSync() : new NonfairSync();
3}

锁的公平性指的是等待时间最长的线程最有机会获得锁,但是这样会导致性能的下降,因为此时对于线程的调度和操作系统的调度是矛盾的。参考:

ReentrantLock(重入锁)以及公平性

非公平锁

lock

NonfairSync.lock:

1final void lock() {
2    if (compareAndSetState(0, 1))
3        setExclusiveOwnerThread(Thread.currentThread());
4    else
5        acquire(1);
6}

快速尝试

AbstractQueuedSynchronizer.compareAndSetState:

1protected final boolean compareAndSetState(int expect, int update) {
2    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
3}

AbstractQueuedSynchronizer内部有一个state变量:

1/**
2 * The synchronization state.
3 */
4private volatile int state;

其记录了线程重入此锁的次数,如果为0,那么表示现在没有线程持有此锁,此时使用一个CAS操作即可快速完成锁的申请,这便是快速尝试

setExclusiveOwnerThread方法用以记录是哪个线程当前持有锁。

如果快速尝试失败(即锁已被某个线程持有),那么将调用AbstractQueuedSynchronizer.acquire()方法:

1public final void acquire(int arg) {
2    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
3        selfInterrupt();
4}

是否重入

NonfairSync.tryAcquire:

1protected final boolean tryAcquire(int acquires) {
2    return nonfairTryAcquire(acquires);
3}

Sync.nonfairTryAcquire:

 1final boolean nonfairTryAcquire(int acquires) {
 2    final Thread current = Thread.currentThread();
 3    //获取重入次数
 4    int c = getState();
 5    //再次检查是否没有线程持有当前锁
 6    if (c == 0) {
 7        if (compareAndSetState(0, acquires)) {
 8            setExclusiveOwnerThread(current);
 9            return true;
10        }
11    }
12    //是否当前线程正在重入
13    else if (current == getExclusiveOwnerThread()) {
14        int nextc = c + acquires;
15        //支持的最大重入次数: Ingteger.MAX_VALUE
16        if (nextc < 0) // overflow
17            throw new Error("Maximum lock count exceeded");
18        setState(nextc);
19        return true;
20    }
21    return false;
22}

很明显了,这里体现的便是重入

锁队列

如果锁正在被其它线程持有,那么只能进入队列等待锁。

原理

JDK使用的是CLH队列的一种变种。什么是CLH队列?可以参考:

JAVA并发编程学习笔记之CLH队列锁

队列的节点在AbstractQueuedSynchronizer的嵌套类Node中定义,其类图:

Node类图

注: 大写的表示常量。

Node节点形成这样的结构:

锁队列

nextWaiter用于组成Condition等待队列。众所周知,Condition只能用在已经获得锁的情况下,所以Condition等待队列不同于锁队列,Condition队列结构如下图:

Condition队列

源码
入队

入队实际上就是创建一个新的节点并将其设为tail的过程。

AbstractQueuedSynchronizer.addWaiter():

 1private Node addWaiter(Node mode) {
 2    Node node = new Node(Thread.currentThread(), mode);
 3    // Try the fast path of enq; backup to full enq on failure
 4    Node pred = tail;
 5    if (pred != null) {
 6        node.prev = pred;
 7        if (compareAndSetTail(pred, node)) {
 8            pred.next = node;
 9            return node;
10        }
11    }
12    enq(node);
13    return node;
14}

可见,此处同样使用了快速尝试的思想,如果CAS尝试失败,那么再调用enq方法,源码:

 1private Node enq(final Node node) {
 2    for (;;) {
 3        Node t = tail;
 4        if (t == null) { // Must initialize
 5            //可见,head其实是一个"空的Node"
 6            if (compareAndSetHead(new Node()))
 7                tail = head;
 8        } else {
 9            node.prev = t;
10            if (compareAndSetTail(t, node)) {
11                t.next = node;
12                return t;
13            }
14        }
15    }
16}

其实还是CAS操作,加了一个死循环,直到成功为止。

锁获取/等待

当前线程被加入到锁队列之后,整下的便是排队等候了。锁队列中当前节点可以获得锁的条件便是上一个节点(prev)释放了锁

AbstractQueuedSynchronizer.acquireQueued:

 1final boolean acquireQueued(final Node node, int arg) {
 2    boolean failed = true;
 3    try {
 4        boolean interrupted = false;
 5        for (;;) {
 6            final Node p = node.predecessor();
 7            //前一个是head,那么表示当前节点即是等待时间最长的线程,并立即尝试获得锁
 8            if (p == head && tryAcquire(arg)) {
 9                setHead(node);
10                p.next = null; // help GC
11                failed = false;
12                return interrupted;
13            }
14            //执行到这里说明当前节点不是等待时间最长的节点或者锁竞争失败
15            if (shouldParkAfterFailedAcquire(p, node) &&
16                parkAndCheckInterrupt())
17                interrupted = true;
18        }
19    } finally {
20        if (failed)
21            cancelAcquire(node);
22    }
23}

从这里可以看出,当当前节点就是等待时间最长的节点(队首节点)时,仍然需要调用tryAcquire竞争锁,而与此同时新来的线程有可能同时调用tryAcquire方法与之竞争,这便是非公平性的体现。

shouldParkAfterFailedAcquire方法用于检测当前线程是否应该休眠,源码:

 1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2    int ws = pred.waitStatus;
 3    if (ws == Node.SIGNAL)
 4        /*
 5         * This node has already set status asking a release
 6         * to signal it, so it can safely park.
 7         */
 8        return true;
 9    //上一个节点已经被取消,所以需要"跳过"前面所有已经被取消的节点
10    if (ws > 0) {
11        do {
12            node.prev = pred = pred.prev;
13        } while (pred.waitStatus > 0);
14        pred.next = node;
15    } else {
16        /*
17         * waitStatus must be 0 or PROPAGATE.  Indicate that we
18         * need a signal, but don't park yet.  Caller will need to
19         * retry to make sure it cannot acquire before parking.
20         */
21        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
22    }
23    return false;
24}

从这里可以看出,当前一个节点的status为Signal时,当前节点(线程)便进入挂起状态,等待前一个节点释放锁,前一个节点释放锁时,会唤醒当前节点,那时当前节点会再次进行锁竞争

再来看一下是如何挂起线程的,AbstractQueuedSynchronizer.parkAndCheckInterrupt:

1private final boolean parkAndCheckInterrupt() {
2    LockSupport.park(this);
3    return Thread.interrupted();
4}

LockSupport相当于一个工具类,只有静态方法,私有构造器。

LockSupport.park:

1public static void park(Object blocker) {
2    Thread t = Thread.currentThread();
3    setBlocker(t, blocker);
4    //native
5    UNSAFE.park(false, 0L);
6    setBlocker(t, null);
7}

此时,线程便会阻塞(休眠)在UNSAFE.park(false, 0L);这一句,直到有以下三种情形发生:

  • unpark方法被调用

  • interrupt方法被调用

  • The call spuriously (that is, for no reason) returns.

    其实说的就是Spurious wakeup: 虚假唤醒。虚假唤醒指的便是阻塞的线程被"莫名其妙"的唤醒,这是多核处理器中不可避免的问题,参见:

    Spurious wakeup

    其实经常写的:

    1while (!condition)
    2  await();
    

    便是为了防止此问题。

从acquireQueued方法的源码中可以看出,即使发生了上面所说的三个条件,也只是将变量interrupted设为了true而已,这也就是为什么lock方法会"义无反顾"地在这里等待锁的原因了。

自中断

当获得锁成功后,会将自己中断:

AbstractQueuedSynchronizer.selfInterrupt:

1static void selfInterrupt() {
2    Thread.currentThread().interrupt();
3}

lockInterruptibly

此方法将在以下两种情况下返回:

  • 获得锁
  • 被中断

AbstractQueuedSynchronizer.acquireInterruptibly:

1public final void acquireInterruptibly(int arg) throws InterruptedException {
2    if (Thread.interrupted())
3        throw new InterruptedException();
4    if (!tryAcquire(arg))
5        doAcquireInterruptibly(arg);
6}

tryAcquire方法前面已经说过了,这里不再赘述。

AbstractQueuedSynchronizer.doAcquireInterruptibly:

 1private void doAcquireInterruptibly(int arg) throws InterruptedException {
 2    final Node node = addWaiter(Node.EXCLUSIVE);
 3    boolean failed = true;
 4    try {
 5        for (;;) {
 6            final Node p = node.predecessor();
 7            if (p == head && tryAcquire(arg)) {
 8                setHead(node);
 9                p.next = null; // help GC
10                failed = false;
11                return;
12            }
13            if (shouldParkAfterFailedAcquire(p, node) &&
14                parkAndCheckInterrupt())
15                //看这里!
16                throw new InterruptedException();
17        }
18    } finally {
19        if (failed)
20            cancelAcquire(node);
21    }
22}

和lock方法相比其实只有一行不一样,这便是lockInterruptibly可以相应中断的原因了。

tryLock

此方法只是尝试一下现在能不能获得锁,不管结果怎样马上返回。

源码就是Sync.nonfairTryAcquire方法,前面已经说过了。

tryLock带时间参数

AbstractQueuedSynchronizer.tryAcquireNanos:

1public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
2    if (Thread.interrupted())
3        throw new InterruptedException();
4    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
5}

doAcquireNanos:

 1private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
 2    if (nanosTimeout <= 0L)
 3        return false;
 4    final long deadline = System.nanoTime() + nanosTimeout;
 5    final Node node = addWaiter(Node.EXCLUSIVE);
 6    boolean failed = true;
 7    try {
 8        for (;;) {
 9            final Node p = node.predecessor();
10            if (p == head && tryAcquire(arg)) {
11                setHead(node);
12                p.next = null; // help GC
13                failed = false;
14                return true;
15            }
16            nanosTimeout = deadline - System.nanoTime();
17            if (nanosTimeout <= 0L)
18                return false;
19            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
20                //挂起指定的时间
21                LockSupport.parkNanos(this, nanosTimeout);
22            if (Thread.interrupted())
23                throw new InterruptedException();
24        }
25    } finally {
26        if (failed)
27            cancelAcquire(node);
28    }
29}

spinForTimeoutThreshold为1000纳秒,可以看出,如果给定的等待时间大于1000纳秒,才会进行线程休眠,否则将会一直轮询。

unlock

调用了AbstractQueuedSynchronizer.release:

1public final boolean release(int arg) {
2    if (tryRelease(arg)) {
3        Node h = head;
4        if (h != null && h.waitStatus != 0)
5            unparkSuccessor(h);
6        return true;
7    }
8    return false;
9}

锁释放

Sync.tryRelease:

 1protected final boolean tryRelease(int releases) {
 2    int c = getState() - releases;
 3    if (Thread.currentThread() != getExclusiveOwnerThread())
 4        throw new IllegalMonitorStateException();
 5    boolean free = false;
 6    if (c == 0) {
 7        free = true;
 8        setExclusiveOwnerThread(null);
 9    }
10    setState(c);
11    return free;
12}

由于锁释放的时候必定拥有锁,所以可以放心大胆的搞。如果当前线程已经不再持有此锁,那么返回true。

节点唤醒

如果当前线程已经不再持有此锁(即tryRelease返回true),那么将会唤醒锁队列中的下一个或多个节点。

unparkSuccessor:

 1 private void unparkSuccessor(Node node) {
 2    int ws = node.waitStatus;
 3    if (ws < 0)
 4        compareAndSetWaitStatus(node, ws, 0);
 5
 6    Node s = node.next;
 7    if (s == null || s.waitStatus > 0) {
 8        s = null;
 9        for (Node t = tail; t != null && t != node; t = t.prev)
10            if (t.waitStatus <= 0)
11                s = t;
12    }
13    if (s != null)
14        LockSupport.unpark(s.thread);
15}

可以看出,先是检查下一个节点(next),如果没有被取消,那么唤醒它即可,如果已经被取消,那么将倒着从后面查找。

公平锁

原理和非公平锁大同小异,在这里只说下是如何体现其公平性的。

FairSync.lock:

1final void lock() {
2    acquire(1);
3}

最终执行的是FairSync.tryAcquire:

 1protected final boolean tryAcquire(int acquires) {
 2    final Thread current = Thread.currentThread();
 3    int c = getState();
 4    if (c == 0) {
 5        //这里
 6        if (!hasQueuedPredecessors() &&
 7            compareAndSetState(0, acquires)) {
 8            setExclusiveOwnerThread(current);
 9            return true;
10        }
11    }
12    else if (current == getExclusiveOwnerThread()) {
13        int nextc = c + acquires;
14        if (nextc < 0)
15            throw new Error("Maximum lock count exceeded");
16        setState(nextc);
17        return true;
18    }
19    return false;
20}

关键便是!hasQueuedPredecessors这个条件,这就保证了队列前面的节点一定会先获得锁