在Java并发编程中,PriorityBlockingQueue是一个结合优先级队列与阻塞队列特性的线程安全集合类。下面将详细解析其实现原理与核心操作。
PriorityBlockingQueue是JUC中一个线程安全的无界阻塞优先级队列,它结合了PriorityQyeye以及BlockingQueue的特性。

与传统的LinkedBlockingQueue和ArrayBlockingQueue不同,该队列中的元素按照优先级顺序出队,而非遵循先进先出原则。
使用该队列时,元素必须实现Comparable接口,或者在构造队列时提供Comparator比较器。
private static final int DEFAULT_INITIAL_CAPACITY = 11;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock = new ReentrantLock();@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty = lock.newCondition();private transient volatile int allocationSpinLock;private PriorityQueue q;
内部使用queue数组作为存储结构,实现为平衡二叉堆。size记录当前元素数量,默认初始容量为11。
comparator字段决定元素排序规则,若为空则采用元素自然排序。
通过ReentrantLock实现线程安全,notEmpty条件用于队列空时阻塞获取操作。
allocationSpinLock作为扩容标识,q字段保持版本兼容性。
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}
无参构造采用默认容量和自然排序,其他构造方法支持自定义容量和比较器。
作为无界队列的插入方法,offer始终返回true且不会阻塞。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
由于是无界队列,put直接调用offer方法。
public void put(E e) {
offer(e); // never need to block
}
非阻塞获取队首元素,队列空时返回null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
阻塞式获取方法,队列空时会等待。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
通过加锁确保获取准确的队列大小。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
PriorityBlockingQueue通过精巧的锁机制和扩容策略,实现了高效的线程安全优先级队列,是处理并发任务的理想选择。