并发集合类4:PriorityBlockingQueue详解

作者:袖梨 2026-05-29

在Java并发编程中,PriorityBlockingQueue是一个结合优先级队列与阻塞队列特性的线程安全集合类。下面将详细解析其实现原理与核心操作。

什么是PriorityBlockingQueue?

PriorityBlockingQueueJUC中一个线程安全的无界阻塞优先级队列,它结合了PriorityQyeye以及BlockingQueue的特性。

并发集合类(4):PriorityBlockingQueue

与传统的LinkedBlockingQueueArrayBlockingQueue不同,该队列中的元素按照优先级顺序出队,而非遵循先进先出原则。

使用该队列时,元素必须实现Comparable接口,或者在构造队列时提供Comparator比较器。

PriorityBlockingQueue的内部数据结构

private static final int DEFAULT_INITIAL_CAPACITY = 11;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock = new ReentrantLock();@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty = lock.newCondition();private transient volatile int allocationSpinLock;private PriorityQueue q;

内部使用queue数组作为存储结构,实现为平衡二叉堆。size记录当前元素数量,默认初始容量为11。

comparator字段决定元素排序规则,若为空则采用元素自然排序。

通过ReentrantLock实现线程安全,notEmpty条件用于队列空时阻塞获取操作。

allocationSpinLock作为扩容标识,q字段保持版本兼容性。

PriorityBlockingQueue的方法

构造方法


public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}

无参构造采用默认容量和自然排序,其他构造方法支持自定义容量和比较器。

offer操作

作为无界队列的插入方法,offer始终返回true且不会阻塞。

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    try {
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null)
            siftUpComparable(n, e, es);
        else
            siftUpUsingComparator(n, e, es, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
  1. 获取锁
  2. 检查容量并触发扩容
  3. 根据比较器类型选择排序方式
  4. 唤醒等待线程
  5. 释放锁

put操作

由于是无界队列,put直接调用offer方法。

public void put(E e) {
    offer(e); // never need to block
}

poll操作

非阻塞获取队首元素,队列空时返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  1. 加锁保证线程安全
  2. 执行出队操作
  3. 释放锁资源

take操作

阻塞式获取方法,队列空时会等待。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}
  1. 获取可中断锁
  2. 循环检查队列状态
  3. 释放锁资源

size方法

通过加锁确保获取准确的队列大小。

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

PriorityBlockingQueue通过精巧的锁机制和扩容策略,实现了高效的线程安全优先级队列,是处理并发任务的理想选择。

相关文章

精彩推荐