本篇文章小编给大家分享一下Java中阻塞队列代码示例解析,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
什么是阻塞队列
在数据结构中,队列遵循FIFO(先进先出)原则。在java中,Queue接口定义了定义了基本行为,由子类完成实现,常见的队列有ArrayDeque、LinkedList等,这些都是非线程安全的,在java 1.5中新增了阻塞队列,当队列满时,添加元素的线程呈阻塞状态;当队列为空时,获取元素的线程呈阻塞状态。
生产者、消费者模型
生产者将元素添加到队列中,消费中获取数据后完成数据处理。两者通过队列解决了生产者和消费者的耦合关系;当生产者的生产速度与消费者的消费速度不一致时,可以通过大道缓冲的目的。
阻塞队列的使用场景
线程池
在线程池中,当工作线程数大于等于corePoolSize时,后续的任务后添加到阻塞队列中;
目前有那些阻塞队列
在java中,BlockingQueue接口定义了阻塞队列的行为,常用子类是ArrayBlockingQueue和LinkedBlockingQueue。
BlockingQueue继承了Queue接口,拥有其全部特性。在BlockingQueue的java doc中对其中的操作方法做了汇总
插入元素
add(e):当队列已满时,再添加元素会抛出异常IllegalStateException
offer(e):添加成功,返回true,否则返回false
put:(e):当队列已满时,再添加元素会使线程变为阻塞状态
offer(e, time,unit):当队列已满时,在末尾添加数据,如果在指定时间内没有添加成功,返回false,反之是true
删除元素
remove(e):返回true表示已成功删除,否则返回false
poll():如果队列为空返回null,否则返回队列中的第一个元素
take():获取队列中的第一个元素,如果队列为空,获取元素的线程变为阻塞状态
poll(time, unit):当队列为空时,线程被阻塞,如果超过指定时间,线程退出
检查元素
element():获取队头元素,如果元素为null,抛出NoSuchElementException
peek():获取队头元素,如果队列为空返回null,否则返回目标元素
ArrayBlockingQueue
底层基于数组的有界阻塞队列,在构造此队列时必须指定容量;
构造函数
// 第一个 public ArrayBlockingQueue(int capacity, boolean fair,Collection extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } // 第二个 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // 第三个 public ArrayBlockingQueue(int capacity) { this(capacity, false); }
capacity:队列的初始容量
fair:线程访问队列的公平性。如果为true按照FIFO的原则处理,反之;默认为falsec:
已有元素的集合,类型于合并两个数组
put()方法
public void put(E e) throws InterruptedException { // 检查元素是否为null checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 如果当前队列为空,变为阻塞状态 while (count == items.length) notFull.await(); // 反之,就添加元素 enqueue(e); } finally { // 解锁 lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 此时队列不为空,唤醒消费者 notEmpty.signal(); }
take()方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 如果队列为空,消费者变为阻塞状态 while (count == 0) notEmpty.await(); // 不为空,就获取数据 return dequeue(); } finally { // 解锁 lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取队头元素x E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 此时队列没有满,同时生产者继续添加数据 notFull.signal(); return x; }
LinkedBlockingQueue
底层基于单向链表的无界阻塞队列,如果不指定初始容量,默认为Integer.MAX_VALUE,否则为指定容量
构造函数
// 不指定容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 指定容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); } // 等同于合并数组 public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node (e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put()方法
public void put(E e) throws InterruptedException { // 元素为空,抛出异常 if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; // 获取队列中的数据量 final AtomicInteger count = this.count; // 获取锁 putLock.lockInterruptibly(); try { // 队列满了,变为阻塞状态 while (count.get() == capacity) { notFull.await(); } // 将目标元素添加到链表的尾端 enqueue(node); // 总数增加 c = count.getAndIncrement(); // 队列还没有满,继续添加元素 if (c + 1 < capacity) notFull.signal(); } finally { // 解锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); }
take()方法
public E take() throws InterruptedException { E x; int c = -1; // 获取队列中的工作数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取锁 takeLock.lockInterruptibly(); try { // 如果队列为空,变为阻塞状态 while (count.get() == 0) { notEmpty.await(); } // 获取队头元素 x = dequeue(); // 递减 c = count.getAndDecrement(); // 通知消费者 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) // signalNotFull(); return x; }
对比
相同点
两者都是通过Condition通知生产者和消费者完成元素的添加和获取
都可以指定容量
不同点
ArrayBlockingQueue基于数据,LinkedBlockingQueue基于链表
ArrayBlockingQueue内有一把锁,LinkedBlockingQueue内有两把锁
自己动手实现一个阻塞队列
通过分析源码可以知道,阻塞队列其实是通过通知机制Condition完成生产者和消费的互通。也可以通过Object类中的wait()和notify、notifyAll实现。下面是自己写的一个阻塞队列
public class BlockQueue { // 对象锁 public static final Object LOCK = new Object(); // 控制变量的值 来通知双方 public boolean condition; public void put() { synchronized (LOCK) { while (condition) { try { // 满了 System.out.println("put 队列满了,开始阻塞"); LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = true; System.out.println("put 改为true,唤醒消费者"); LOCK.notifyAll(); } } public void take() { synchronized (LOCK) { while (!condition) { // 没满 System.out.println("take 队列没满,开始阻塞"); try { LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = false; System.out.println("take 改为false,唤醒生产者"); LOCK.notifyAll(); } } }