理解 Java ReentrantLock 实现

概述

ReentrantLock 是一种可重入的互斥锁,实现了 Lock 接口,提供与隐式 monitor 锁相同的互斥性和内存语义性。除此之外,ReentrantLock 还支持可重入:已经持有锁的线程可以继续多次获得该锁而不会被阻塞。

ReentrantLock 构造方法接受一个可选的 fairness 参数,可以在创建锁时指定公平性。公平锁与非公平锁各有利弊:非公平锁可以保证较高的吞吐量,但是可能会造成线程“饥饿”的情况;而公平锁可以保证线程按序获得锁,但是会造成更多次线程切换,减少吞吐量。

ReentrantLock 支持同一个线程最多 2147483647 次获得锁,超过这个限制将抛出 Error。

实现

构造方法

ReentrantLock 实现依赖于 AbstractQueuedSynchronizer(以下简称 AQS),使用同步状态表示线程已经获取锁的次数,如果同步状态为 0,表示没有线程占有锁;否则,同步状态的值表示线程获取锁的次数。核心成员变量为:

private final Sync sync;

Sync 为基类,继承自 AQS:

abstract static class Sync extends AbstractQueuedSynchronizer 

其子类有 NonfairSync 和 FairSync,分别对应非公平锁和公平锁实现。在 ReentrantLock 构造方法中,将根据公平性初始化 sync。如果没有指定公平性,那么创建 NonfairSync 对象;否则根据 fair 判断是创建 FairSync 还是 NonfairSync:

public ReentrantLock() {
    sync = new NonfairSync();
}
    
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 只支持独占式获取同步状态,所以 Sync 和其子类必须实现 tryAcquire、tryRelease 和 isHeldExclusively。

核心操作

ReentrantLock 的核心操作分别是 lock 和 unlock。

lock

lock 定义如下:

//in ReentrantLock
public void lock() {
    sync.lock();
}

实际上调用 sync.lock 方法。根据 sync 是 FairSync 还是 NonfairSync,对应又有不同的实现。

在 NonfairSync 中,lock 实现如下:

//in NonfairSync
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

首先尝试独占式获取同步状态,如果获取失败再调用 acquire 去竞争。

而在 FairSync 中,lock 直接调用 acquire 竞争同步状态:

//in FairSync
final void lock() {
    acquire(1);
}

acquire 是基类 Sync 继承 AQS 得到的方法,在 AQS 中 acquire 定义如下:

//in AQS
public final void acquire(int arg){
    if(!tryAcquire(arg) && 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupte();
}

tryAcquire 由 AQS 子类定义。acquire 首先调用 tryAcquire 获取同步状态,如果获取失败,那么再调用 addWaiter 将线程封装成节点并加入到同步队列,调用 acquireQueued 在队列中自旋等待。

在 NonfairSync 中,tryAcquire 定义如下:

//in NonfairSync
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

tryAcquire 把调用请求委托给 nonfairTryAcquire。nonfairTryAcquire 在基类 Sync 中定义,从名字可以看出该方法不考虑公平性:

//in Sync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire 首先通过 getState() 获取当前同步状态,如果同步状态为 0,说明当前没有线程占有锁,那么使用 compareAndSetState 以 CAS 方式占有锁,并且使用 setExclusiveOwnerThread 设置锁的占有者;否则,判断当前线程是否已经持有锁,如果是,那么增加锁持有次数;否则返回 false。

在 FairSync 中,tryAcquire 定义如下:

//in FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryAcquire 类似于父类 Sync 的 nonfairTryAcquire 实现,不同的是,如果同步状态等于 0,为了保证公平性,需要调用 hasQueuedPredecessors() 进行判断。hasQueuedPredecessors 检查同步队列中当前线程所在节点是否有前驱节点,如果 hasQueuedPredecessors 返回 true,说明有前驱节点,那么不应该由当前线程获得锁,tryAcquire 返回 false。如果同步状态不等于 0,说明已经有线程占用锁,那么需要判断当前线程是否是锁占用者,如果是那么就增加锁持有次数,否则也返回 false。

unlock

unlock 定义如下:

//in ReentrantLock
public void unlock(){
    sync.release(1);
}

实际上调用了 sync.release 方法。release 是基类 Sync 继承 AQS 得到的方法,在 AQS 中 release 定义如下:

//in AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease 也由 AQS 子类定义。release 首先调用 tryRelease 将同步状态减少 arg 次(在这里是减少一次)。如果更新后的同步状态为 0,那么 tryRelease 返回 true,否则返回 false。当 tryRelease 返回 true,说明线程已经释放锁,那么使用 unparkSuccessor 通知同步队列后续节点。

tryRelease 在基类 Sync 中定义:

//in Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

其它加锁方式

ReentrantLock 还可以响应中断:在获取锁的过程中可以中断线程,对应方法为 lockInterruptibly:

//in ReentrantLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

acquireInterruptibly 是基类 Sync 继承 AQS 得到的方法,在 AQS 中 acquireInterruptibly 定义为:

//in AQS
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

首先判断线程是否被中断,如果是,那么抛出 InterruptedException 异常。否则调用 tryAcquire 尝试获取同步状态,如果获取失败,那么调用 doAcquireInterruptibly。

doAcquireInterruptibly 也定义在 AQS 中:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
            }
    } finally {
        if (failed)
            cancelAcquire(node);
        }
}

doAcquireInterruptibly 在自旋等待的过程中,还会检查线程是否被中断。如果线程被中断,那么就会抛出 InterruptedException 异常。

其它方法

ReentrantLock 还提供方法监控锁以及创建 Condition:

isHeldExclusively

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

isHeldExclusively 判断是否是当前线程独占锁。

newCondition

final ConditionObject newCondition() {
    return new ConditionObject();
}

newCondition 返回一个与该锁绑定的 ConditionObject,每一个 ConditionObject 就代表一个等待条件。ReentrantLock 支持创建多个等待条件。

hasWaiters

public boolean hasWaiters(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

hasWaiters 可以判断是否有线程在等待指定的 Condition。考虑到超时和中断的存在,即使 hasWaiters 返回 true,也不能保证未来调用 signal 时有线程会被唤醒。

getWaitQueueLength

public int getWaitQueueLength(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

返回指定 Condition 的同步队列长度。因为超时和中断的存在,返回值只能作为实际等待线程数量的上限。

getWaitingThreads

protected Collection<Thread> getWaitingThreads(Condition condition) {
    if (condition == null)
        throw new NullPointerException();
    if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
        throw new IllegalArgumentException("not owner");
    return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

返回所有在等待指定 Condition 的线程集合。同理,因为超时和中断的存在,该结果也不能保证绝对正确。


Previous post: 理解 Java ArrayBlockingQueue 实现

Next post: 理解 Java Semaphore 实现