理解 Java Semaphore 实现

概述

Semaphore (也称为信号量)表示一定数量的许可(permit)。线程每次成功调用 acquire 就会获得一个许可,剩余许可数量就会减一。如果剩余许可数量为 0,那么阻塞当前线程;类似的,线程每次成功调用 release 会释放一个许可。

与 ReentrantLock 相似,Semaphore 构造方法也可以接受一个可选的 fairness 参数。

实现

Semaphore 依赖于 AbstractQueuedSynchronizer(以下简称 AQS),使用同步状态表示许可的数量,核心成员变量为:

private final Sync sync;

Sync 是一个抽象类,子类分别有 NonfairSync 和 FairSync。sync 将在构造方法中被初始化,默认创建 NonfairSync 对象;当然用户也可以指定公平性:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
    
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

传入的 permits 可以为负,意味着线程调用 acquire 之前必须先调用 release。

Sync

抽象类 Sync 继承 AQS:

abstract static class Sync extends AbstractQueuedSynchronizer 

因为每个线程可以拥有一份或者多份许可,并且只要剩余许可大于 0,那么线程就可以继续竞争剩余许可,这说明同步状态是被共享的获取。根据 AQS 协议,Sync 和其子类需要实现 tryAcquireShared 和 tryReleaseShared 。

nonfairTryAcquireShared

nonfairTryAcquireShared 定义在父类 Sync 中:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
                return remaining;
    }
}

nonfairTryAcquireShared 实现较为简单,使用循环 CAS 的方式更新同步状态。如果更新成功,那么返回剩余许可数量;如果许可数量不够,那么返回一个负数。

tryReleaseShared

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared 实现也类似于 nonfairTryAcquireShared,不过这里需要关注许可数量溢出。

NonfairSync

NonfairSync 版本的 tryAcquireShared 实现为:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

实际上直接调用了父类的 nonfairTryAcquireShared 方法。

FairSync

FairSync 版本的 tryAcquireShared 实现为:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
                return remaining;
        }
    }

tryAcquireShared 同样运行在无限循环中,每次循环首先调用 hasQueuedPrecedecessors 判断等待队列中当前线程所在的节点是否有前驱节点,如果有,那么不应该由当前线程获得许可;否则,尝试使用 compareAndSetState 获取许可。

Semaphore

acquire

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

线程调用 acquire 将尝试获取一个许可,如果没有可用许可,那么会一直阻塞直到获得许可或者被其它线程中断。

当然也可以指定需要获取的许可数量,这样的话会一直阻塞直到获得了所有的许可或者被中断:

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

sync.acquireSharedInterruptibly 继承于父类,在 AQS 中实现为:

public final void acquireSharedInterruptibly(int arg)
         throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

该方法首先调用 tryAcquireShared 尝试获取许可,如果获取失败,那么调用 doAcquireSharedInterruptibly 阻塞等待。等待过程中如果线程被中断,那么就会抛出 InterruptedException 异常。

acquireUninterruptibly

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

acquireUninterruptibly 以不响应中断的方式获取一个许可。

可以指定许可的数量:

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

sync.acquireShared 同样继承于父类 AQS,实现类似于 acquireSharedInterruptibly:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

如果 tryAcquireShared 获取许可失败,那么调用 doAcquireShared 阻塞等待。

tryAcquire

tryAcquire 尝试能否以非阻塞的方式获得一个许可:

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

也可以获取指定数量的许可:

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

或者在指定时间内获得指定数量的许可:

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

release

public void release() {
    sync.releaseShared(1);
}

release 会释放一个许可,释放之后如果有线程在等待许可,那么会选择其中一个线程并尝试满足其许可要求。

Semaphore 不要求调用线程 release 释放许可之前必须先调用 acquire 获得许可。

当然,也可以指定要释放的许可数量:

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
}

hasQueuedThreads

public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

hasQueuedThreads 可以判断是否有线程在等待获取许可。

getQueuedLength

public final int getQueueLength() {
    return sync.getQueueLength();
}

getQueuedLength 返回等待队列的大致长度。

getQueuedThreads

protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

getQueuedThreads 返回等待队列的线程集合。

注意,由于中断和超时的存在,上述三个方法的结果都不是绝对准确的。


Previous post: 理解 Java ReentrantLock 实现

Next post: 理解 Java LockSupport 工具