本篇文章小编给大家分享一下Java中阻塞队列代码示例解析,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
什么是阻塞队列
在数据结构中,队列遵循FIFO(先进先出)原则。在java中,Queue接口定义了定义了基本行为,由子类完成实现,常见的队列有ArrayDeque、LinkedList等,这些都是非线程安全的,在java 1.5中新增了阻塞队列,当队列满时,添加元素的线程呈阻塞状态;当队列为空时,获取元素的线程呈阻塞状态。
生产者、消费者模型
生产者将元素添加到队列中,消费中获取数据后完成数据处理。两者通过队列解决了生产者和消费者的耦合关系;当生产者的生产速度与消费者的消费速度不一致时,可以通过大道缓冲的目的。
阻塞队列的使用场景
线程池
在线程池中,当工作线程数大于等于corePoolSize时,后续的任务后添加到阻塞队列中;
目前有那些阻塞队列
在java中,BlockingQueue接口定义了阻塞队列的行为,常用子类是ArrayBlockingQueue和LinkedBlockingQueue。
BlockingQueue继承了Queue接口,拥有其全部特性。在BlockingQueue的java doc中对其中的操作方法做了汇总
插入元素
add(e):当队列已满时,再添加元素会抛出异常IllegalStateException
offer(e):添加成功,返回true,否则返回false
put:(e):当队列已满时,再添加元素会使线程变为阻塞状态
offer(e, time,unit):当队列已满时,在末尾添加数据,如果在指定时间内没有添加成功,返回false,反之是true
删除元素
remove(e):返回true表示已成功删除,否则返回false
poll():如果队列为空返回null,否则返回队列中的第一个元素
take():获取队列中的第一个元素,如果队列为空,获取元素的线程变为阻塞状态
poll(time, unit):当队列为空时,线程被阻塞,如果超过指定时间,线程退出
检查元素
element():获取队头元素,如果元素为null,抛出NoSuchElementException
peek():获取队头元素,如果队列为空返回null,否则返回目标元素
ArrayBlockingQueue
底层基于数组的有界阻塞队列,在构造此队列时必须指定容量;
构造函数
// 第一个 public ArrayBlockingQueue(int capacity, boolean fair,Collection extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } // 第二个 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // 第三个 public ArrayBlockingQueue(int capacity) { this(capacity, false); }
capacity:队列的初始容量
fair:线程访问队列的公平性。如果为true按照FIFO的原则处理,反之;默认为falsec:
已有元素的集合,类型于合并两个数组
put()方法
public void put(E e) throws InterruptedException { // 检查元素是否为null checkNotNull(e); final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 如果当前队列为空,变为阻塞状态 while (count == items.length) notFull.await(); // 反之,就添加元素 enqueue(e); } finally { // 解锁 lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 此时队列不为空,唤醒消费者 notEmpty.signal(); }
take()方法
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 获取锁 lock.lockInterruptibly(); try { // 如果队列为空,消费者变为阻塞状态 while (count == 0) notEmpty.await(); // 不为空,就获取数据 return dequeue(); } finally { // 解锁 lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取队头元素x E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 此时队列没有满,同时生产者继续添加数据 notFull.signal(); return x; }
LinkedBlockingQueue
底层基于单向链表的无界阻塞队列,如果不指定初始容量,默认为Integer.MAX_VALUE,否则为指定容量
构造函数
// 不指定容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 指定容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); } // 等同于合并数组 public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node (e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put()方法
public void put(E e) throws InterruptedException { // 元素为空,抛出异常 if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; // 获取队列中的数据量 final AtomicInteger count = this.count; // 获取锁 putLock.lockInterruptibly(); try { // 队列满了,变为阻塞状态 while (count.get() == capacity) { notFull.await(); } // 将目标元素添加到链表的尾端 enqueue(node); // 总数增加 c = count.getAndIncrement(); // 队列还没有满,继续添加元素 if (c + 1 < capacity) notFull.signal(); } finally { // 解锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); }
take()方法
public E take() throws InterruptedException { E x; int c = -1; // 获取队列中的工作数 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取锁 takeLock.lockInterruptibly(); try { // 如果队列为空,变为阻塞状态 while (count.get() == 0) { notEmpty.await(); } // 获取队头元素 x = dequeue(); // 递减 c = count.getAndDecrement(); // 通知消费者 if (c > 1) notEmpty.signal(); } finally { // 解锁 takeLock.unlock(); } if (c == capacity) // signalNotFull(); return x; }
对比
相同点
两者都是通过Condition通知生产者和消费者完成元素的添加和获取
都可以指定容量
不同点
ArrayBlockingQueue基于数据,LinkedBlockingQueue基于链表
ArrayBlockingQueue内有一把锁,LinkedBlockingQueue内有两把锁
自己动手实现一个阻塞队列
通过分析源码可以知道,阻塞队列其实是通过通知机制Condition完成生产者和消费的互通。也可以通过Object类中的wait()和notify、notifyAll实现。下面是自己写的一个阻塞队列
public class BlockQueue { // 对象锁 public static final Object LOCK = new Object(); // 控制变量的值 来通知双方 public boolean condition; public void put() { synchronized (LOCK) { while (condition) { try { // 满了 System.out.println("put 队列满了,开始阻塞"); LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = true; System.out.println("put 改为true,唤醒消费者"); LOCK.notifyAll(); } } public void take() { synchronized (LOCK) { while (!condition) { // 没满 System.out.println("take 队列没满,开始阻塞"); try { LOCK.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } condition = false; System.out.println("take 改为false,唤醒生产者"); LOCK.notifyAll(); } } }
忍者必须死34399账号登录版 最新版v1.0.138v2.0.72
下载勇者秘境oppo版 安卓版v1.0.5
下载忍者必须死3一加版 最新版v1.0.138v2.0.72
下载绝世仙王官方正版 最新安卓版v1.0.49
下载Goat Simulator 3手机版 安卓版v1.0.8.2
Goat Simulator 3手机版是一个非常有趣的模拟游
Goat Simulator 3国际服 安卓版v1.0.8.2
Goat Simulator 3国际版是一个非常有趣的山羊模
烟花燃放模拟器中文版 2025最新版v1.0
烟花燃放模拟器是款仿真的烟花绽放模拟器类型单机小游戏,全方位
我的世界动漫世界 手机版v友y整合
我的世界动漫世界模组整合包是一款加入了动漫元素的素材整合包,
我的世界贝爷生存整合包 最新版v隔壁老王
我的世界MITE贝爷生存整合包是一款根据原版MC制作的魔改整