理解 Java ArrayBlockingQueue 实现

概述

ArrayBlockingQueue 是一个基于数组的有界阻塞队列。该队列以 FIFO 的形式组织元素,队列头是进入队列时间最长的元素,队列尾则是进入队列时间最短的元素。除此之外,该队列还支持公平访问。定义如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

实现

构造方法

ArrayBlockingQueue 是经典的有界队列,创建队列需要指定队列大小,有三种构造方法:

public ArrayBlockingQueue(int capacity);
public ArrayBlockingQueue(int capacity, boolean fair);
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c);

默认情况下,队列不保证线程公平访问,可以在创建队列时指定公平性。

核心数据结构

ArrayBlockingQueue 核心数组定义为:

/** The queued items */
final Object[] items;

该数组会在构造方法中被初始化,大小为 capacity。

除此之外,还需要记录队列头和队列尾的位置:

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

takeIndex 总是指向队列头元素位置,而 putIndex 总是指向下一个元素的位置。

以及队列中元素的个数:

/** Number of elements in the queue */
int count;

并发控制

ArrayBlockingQueue 允许多个线程同时向队列尾插入元素,也允许多个线程同时从队列头移除元素,主要使用可重入锁保证并发访问:

/** Main lock guarding all access */
final ReentrantLock lock;

同时还有两个与该锁绑定的 Condition,用于实现阻塞的插入和移除:

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

方法实现

我们知道 BlockingQueue 提供了四类方法,下面将看看 ArrayBlockingQueue 如何实现这四类方法

add

add 方法尝试向队列尾插入一个指定元素,插入成功则返回 true;如果队列已满,则抛出 IllegalStateException。add 方法实现较简单:

public boolean add(E e) {
    return super.add(e);
}

只是调用了父类 AbstractQueue 的 add 方法,父类 add 方法实现如下:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

父类 offer 方法是抽象方法,具体取决于子类的实现。所以最终 add 方法依赖于 offer 方法。

offer

offer 方法尝试向队列尾插入一个指定元素,插入成功则返回 true,否则返回 false:

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

首先判断元素 e 是否为空,如果不为空那么继续进行插入过程。插入时,当前线程首先需要获得锁,这样可以保证其他线程不会对队列做修改。如果队列已满,那么返回 false;否则调用 enqueue 方法将元素入队。

enqueue 方法实现如下:

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

enqueue 不仅将元素插入队列,还调用 notEmpty.signal() 方法。如果有线程正在等待队列不为空,那么 signal() 将会唤醒其中一个线程,使该线程能够继续执行。

当线程释放锁时,锁可以保证内存同步,使得随后线程能够”看见“上一个线程的操作。

除此之外,还可以在指定时间内向队列尾插入一个指定元素,插入成功则返回 true;如果超时不能插入,返回 false:

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

offer 方法整体逻辑与 put 方法类似,不过在循环检查队列不为满的过程中,调用 notFull.awaitNanos 得到剩余可等待时间。如果剩余可等待时间小于等于 0,说明已经超时,返回 false。

put

put 方法尝试向对列尾插入一个指定元素,插入成功则立即返回,否则一直阻塞线程直到队列不满:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

类似地,线程需要获得锁才能继续执行插入过程。注意到线程是可中断的获取锁,说明线程可以响应中断信号。如果线程被中断,将抛出 InterruptedException。

在插入过程中,首先循环判断队列是否已满。如果队列已满,那么调用 notFull.await(),这时当前线程会释放锁,并进入阻塞状态。当线程被唤醒时,这时线程已经持有锁,继续执行循环,判断队列是否已满。如果队列还是满,那么线程继续阻塞。重复这个过程直到队列不满,那么这时使用 enqueue 将元素入队。

poll

poll 尝试获取并删除队列头元素,如果队列为空,那么返回 null:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

线程获得锁之后,首先判断队列是否不为空。如果队列为空,那么返回 null;否则使用 dequeue() 获取并删除头元素:

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

dequeue 获得 takeIndex 所指向的元素,还调用了 notFull.signal()。如果有线程正在等待队列不为满,那么 signal() 将会唤醒其中一个线程。

poll 还可以尝试在指定时间内获取并删除队列头元素,实现思路类似于 offer,不再赘述。

take

take 方法尝试获取并删除队列头元素,如果成功则返回头元素,否则一直阻塞线程直到队列不空:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take 方法也使用了循环检查,只有当队列不为空,才使用 dequeue() 取出并删除队列头元素。

peek

peek 方法尝试返回队列头元素,不会删除该头元素:

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

itemAt 只是简单地返回对应下标的元素,注意该元素可能为 null:

final E itemAt(int i) {
    return (E) items[i];
}

remove

remove 方法尝试从队列中删除一个元素 e,使得 o.equals(e)。如果删除成功返回 true;否则返回 false:

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

线程获得锁之后,首先判断队列是否不为空;如果为空,返回 false;否则继续进行删除过程。首先找到元素 items[i],使得 o.equals(items[i]),然后调用 removeAt(i) 删除该元素并返回 true。如果找不到这样的元素,那么返回 false。

removeAt 定义如下:

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

在 removeAt 方法中,如果要删除的元素下标等于 takeIndex,那么直接删除就可以了;否则进行”滑动删除“。

contains

contains 方法判断队列中是否存在元素 e,使得 o.equals(e)。该方法需要遍历整个队列:

public boolean contains(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i]))
                    return true;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

clear

clear 将删除队列中所有元素:

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;
            count = 0;
            if (itrs != null)
                itrs.queueIsEmpty();
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

总结

ArrayBlockingQueue 基于 ReentrantLock 实现了 BlockingQueue 接口。每次只能有一个线程获得锁,然后才能进行插入/删除等操作。同时,两个 Condition 变量 notFull 和 notEmpty 可以实现阻塞地插入和移除。

另外,可以注意到在很多方法中,都存在将 final 成员变量拷贝到局部 final 变量,如加锁时都要创建局部变量:

final ReentrantLock lock = this.lock;

这时实现者 Doug Lea 的使用风格,原因在 In ArrayBlockingQueue, why copy final member field into local final variable? 提到:

copying to locals produces the smallest bytecode, and for low-level code it’s nice to write code that’s a little closer to the machine.


Previous post: Java BlockingQueue 接口

Next post: 理解 Java ReentrantLock 实现