一概述
BlockingQueue作为线程容器可以为线程同步提供有力的保障
二BlockingQueue定义的常用方法
BlockingQueue定义的常用方法如下
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e time unit)
移除 remove() poll() take() poll(time unit)
检查 element() peek() 不可用 不可用
)add(anObject)把anObject加到BlockingQueue里即如果BlockingQueue可以容纳则返回true否则招聘异常
)offer(anObject)表示如果可能的话将anObject加到BlockingQueue里即如果BlockingQueue可以容纳则返回true否则返回false
)put(anObject)把anObject加到BlockingQueue里如果BlockQueue没有空间则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续
)poll(time)取走BlockingQueue里排在首位的对象若不能立即取出则可以等time参数规定的时间取不到时返回null
)take()取走BlockingQueue里排在首位的对象若BlockingQueue为空阻断进入等待状态直到Blocking有新的对象被加入为止
其中BlockingQueue 不接受null 元素试图addput 或offer 一个null 元素时某些实现会抛出NullPointerExceptionnull 被用作指示poll 操作失败的警戒值
三BlockingQueue的几个注意点
【】BlockingQueue 可以是限定容量的它在任意给定时间都可以有一个remainingCapacity超出此容量便无法无阻塞地put 附加元素没有任何内部容量约束的BlockingQueue 总是报告IntegerMAX_VALUE 的剩余容量
【】BlockingQueue 实现主要用于生产者使用者队列但它另外还支持Collection 接口因此举例来说使用remove(x) 从队列中移除任意一个元素是有可能的然而这种操作通常不 会有效执行只能有计划地偶尔使用比如在取消排队信息时
【】BlockingQueue 实现是线程安全的所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的然而大量的 Collection 操作(addAllcontainsAllretainAll 和removeAll)没有 必要自动执行除非在实现中特别说明因此举例来说在只添加了c 中的一些元素后addAll(c) 有可能失败(抛出一个异常)
【】BlockingQueue 实质上不 支持使用任何一种close或shutdown操作来指示不再添加任何项这种功能的需求和使用有依赖于实现的倾向例如一种常用的策略是对于生产者插入特殊的endofstream 或poison 对象并根据使用者获取这些对象的时间来对它们进行解释
四简要概述BlockingQueue常用的四个实现类
)ArrayBlockingQueue:规定大小的BlockingQueue其构造函数必须带一个int参数来指明其大小其所含的对象是以FIFO(先入先出)顺序排序的
)LinkedBlockingQueue:大小不定的BlockingQueue若其构造函数带一个规定大小的参数生成的BlockingQueue有大小限制若不带大小参数所生成的BlockingQueue的大小由IntegerMAX_VALUE来决定其所含的对象是以FIFO(先入先出)顺序排序的
)PriorityBlockingQueue:类似于LinkedBlockQueue但其所含对象的排序不是FIFO而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序
)SynchronousQueue:特殊的BlockingQueue对其的操作必须是放和取交替完成的
其中LinkedBlockingQueue和ArrayBlockingQueue比较起来它们背后所用的数据结构不一样导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue
五具体BlockingQueue的实现类的内部细节
有耐心的同学请看具体实现类细节
ArrayBlockingQueue
ArrayBlockingQueue是一个由数组支持的有界阻塞队列此队列按 FIFO(先进先出)原则对元素进行排序队列的头部 是在队列中存在时间最长的元素队列的尾部 是在队列中存在时间最短的元素新元素插入到队列的尾部队列检索操作则是从队列头部开始获得元素
这是一个典型的有界缓存区固定大小的数组在其中保持生产者插入的元素和使用者提取的元素一旦创建了这样的缓存区就不能再增加其容量试图向已满队列中放入元素会导致放入操作受阻塞试图从空队列中检索元素将导致类似阻塞
ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数因为它不会自动扩容)以及是否为公平锁(fair参数)
在创建ArrayBlockingQueue的时候默认创建的是非公平锁不过我们可以在它的构造函数里指定这里调用ReentrantLock的构造函数创建锁的时候调用了
public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync()
}
FairSync/ NonfairSync是ReentrantLock的内部类
线程按顺序请求获得公平锁而一个非公平锁可以闯入且当它尚未进入等待队列就会和等待队列head结点的线程发生竞争如果锁的状态可用请求非公平锁的线程可在等待队列中向前跳跃获得该锁内部锁synchronized没有提供确定的公平性保证
分三点来讲这个类
添加新元素的方法add/put/offer
该类的几个实例变量takeIndex/putIndex/count/
Condition实现
添加新元素的方法add/put/offer
首先谈到添加元素的方法首先得分析以下该类同步机制中用到的锁
Java代码
[java]
lock = new ReentrantLock(fair)
notEmpty = locknewCondition()//Condition Variable
notFull = locknewCondition()//Condition Variable
这三个都是该类的实例变量只有一个锁lock然后lock实例化出两个ConditionnotEmpty/noFull分别用来协调多线程的读写操作
Java代码
[java]
public boolean offer(E e) {
if (e == null) throw new NullPointerException()
final ReentrantLock lock = thislock;//每个对象对应一个显示的锁
locklock()//请求锁直到获得锁(不可以被interrupte)
try {
if (count == itemslength)//如果队列已经满了
return false;
else {
insert(e)
return true;
}
} finally {
lockunlock()//
}
}
看insert方法
private void insert(E x) {
items[putIndex] = x;
//增加全局index的值
/*
Inc方法体内部
final int inc(int i) {
return (++i == itemslength)? : i;
}
这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的如果插完了putIndex可能重新变为(在已经执行了移除操作的前提下否则在之前的判断中队列为满)
*/
putIndex = inc(putIndex)
++count;
notEmptysignal()//wake up one waiting thread
}
Java代码
[java]
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException()
final E[] items = ems;
final ReentrantLock lock = thislock;
locklockInterruptibly()//请求锁直到得到锁或者变为interrupted
try {
try {
while (count == itemslength)//如果满了当前线程进入noFull对应的等waiting状态
notFullawait()
} catch (InterruptedException ie) {
notFullsignal() // propagate to noninterrupted thread
throw ie;
}
insert(e)
} finally {
lockunlock()
}
}
Java代码
[java]
public boolean offer(E e long timeout TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException()
long nanos = unittoNanos(timeout)
final ReentrantLock lock = thislock;
locklockInterruptibly()
try {
for () {
if (count != itemslength) {
insert(e)
return true;
}
if (nanos <= )
return false;
try {
//如果没有被 signal/interruptes需要等待nanos时间才返回
nanos = notFullawaitNanos(nanos)
} catch (InterruptedException ie) {
notFullsignal() // propagate to noninterrupted thread
throw ie;
}
}
} finally {
lockunlock()
}
}
Java代码
[java]
public boolean add(E e) {
return superadd(e)
}
父类
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException(Queue full)
}
该类的几个实例变量takeIndex/putIndex/count
Java代码
[java]
用三个数字来维护这个队列中的数据变更
/** items index for next take poll or remove */
private int takeIndex;
/** items index for next put offer or add */
private int putIndex;
/** Number of items in the queue */
private int count;
提取元素的三个方法take/poll/remove内部都调用了这个方法
Java代码
[java]
private E extract() {
final E[] items = ems;
E x = items[takeIndex];
items[takeIndex] = null;//移除已经被提取出的元素
takeIndex = inc(takeIndex)//策略和添加元素时相同
count;
notFullsignal()//提醒其他在notFull这个Condition上waiting的线程可以尝试工作了
return x;
}
从这个方法里可见tabkeIndex维护一个可以提取/移除元素的索引位置因为takeIndex是从递增的所以这个类是FIFO队列
putIndex维护一个可以插入的元素的位置索引
count显然是维护队列中已经存在的元素总数
Condition实现
Condition现在的实现只有ncurrentlocksAbstractQueueSynchoronizer内部的ConditionObject并且通过ReentranLock的newCondition()方法暴露出来这是因为Condition的await()/sinal()一般在locklock()与lockunlock()之间执行当执行conditionawait()方法时它会首先释放掉本线程持有的锁然后自己进入等待队列直到sinal()唤醒后又会重新试图去拿到锁拿到后执行await()下的代码其中释放当前锁和得到当前锁都需要ReentranLock的tryAcquire(int arg)方法来判定并且享受ReentranLock的重进入特性
Java代码
[java]
public final void await() throws InterruptedException {
if (Threadinterrupted())
throw new InterruptedException()
//加一个新的condition等待节点
Node node = addConditionWaiter()
//释放自己的锁
int savedState = fullyRelease(node)
int interruptMode = ;
while (!isOnSyncQueue(node)) {
//如果当前线程 等待状态时CONDITIONpark住当前线程等待condition的signal来解除
LockSupportpark(this)
if ((interruptMode = checkInterruptWhileWaiting(node)) != )
break;
}
if (acquireQueued(node savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (nodenextWaiter != null)
unlinkCancelledWaiters()
if (interruptMode != )
reportInterruptAfterWait(interruptMode)
}
SynchronousQueue
一种阻塞队列其中每个 put 必须等待一个 take反之亦然同步队列没有任何内部容量甚至连一个队列的容量都没有不能在同步队列上进行 peek因为仅在试图要取得元素时该元素才存在除非另一个线程试图移除某个元素否则也不能(使用任何方法)添加元素也不能迭代队列因为其中没有元素可用于迭代队列的头 是尝试添加到队列中的首个已排队线程元素如果没有已排队线程则不添加元素并且头为 null对于其他Collection 方法(例如 contains)SynchronousQueue 作为一个空集合此队列不允许 null 元素
同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道它非常适合于传递性设计在这种设计中在一个线程中运行的对象要将某些信息事件或任务传递给在另一个线程中运行的对象它就必须与该对象同步
对于正在等待的生产者和使用者线程而言此类支持可选的公平排序策略默认情况下不保证这种排序但是使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问公平通常会降低吞吐量但是可以减小可变性并避免得不到服务
LinkedBlockingQueue
一个基于已链接节点的范围任意的 blocking queue此队列按 FIFO(先进先出)排序元素队列的头部 是在队列中时间最长的元素队列的尾部 是在队列中时间最短的元素新元素插入到队列的尾部并且队列检索操作会获得位于队列头部的元素链接队列的吞吐量通常要高于基于数组的队列但是在大多数并发应用程序中其可预知的性能要低
单向链表结构的队列如果不指定容量默认为IntegerMAX_VALUE通过putLock和takeLock两个锁进行同步两个锁分别实例化notFull和notEmpty两个Condtion用来协调多线程的存取动作其中某些方法(如removetoArraytoStringclear等)的同步需要同时获得这两个锁并且总是先putLocklock紧接着takeLocklock(在同一方法fullyLock中)这样的顺序是为了避免可能出现的死锁情况(我也想不明白为什么会是这样?)
PriorityBlockingQueue
一个无界的阻塞队列它使用与类 PriorityQueue 相同的顺序规则并且提供了阻塞检索的操作虽然此队列逻辑上是无界的但是由于资源被耗尽所以试图执行添加操作可能会失败(导致 OutOfMemoryError)此类不允许使用 null 元素依赖自然顺序的优先级队列也不允许插入不可比较的对象(因为这样做会抛出ClassCastException)
看它的三个属性就基本能看懂这个类了
Java代码
[java]
private final PriorityQueue q;
private final ReentrantLock lock = new ReentrantLock(true)
private final Condition notEmpty = locknewCondition()
lock说明本类使用一个lock来同步读写等操作
notEmpty协调队列是否有新元素提供而队列满了以后会调用PriorityQueue的grow方法来扩容
DelayQueue
Delayed 元素的一个无界阻塞队列只有在延迟期满时才能从中提取元素该队列的头部 是延迟期满后保存时间最长的 Delayed 元素如果延迟都还没有期满则队列没有头部并且 poll 将返回 null当一个元素的getDelay(TimeUnitNANOSECONDS) 方法返回一个小于或等于零的值时则出现期满此队列不允许使用 null 元素
Delayed接口继承自Comparable我们插入的E元素都要实现这个接口
DelayQueue的设计目的间API文档
An unbounded blocking queue of Delayed elements in which an element can only be taken when its delay has expired The head of the queue is that Delayed element whose delay expired furthest in the past If no delay has expired there is no head and poll will returnnull Expiration occurs when an elements getDelay(TimeUnitNANOSECONDS) method returns a value less than or equal to zero Even though unexpired elements cannot be removed using take or poll they are otherwise treated as normal elements For example the size method returns the count of both expired and unexpired elements This queue does not permit null elements
因为DelayQueue构造函数了里限定死不允许传入comparator(之前的PriorityBlockingQueue中没有限定死)即只能在compare方法里定义优先级的比较规则再看上面这段英文The head of the queue is that Delayed element whose delay expired furthest in the past说明compare方法实现的时候要保证最先加入的元素最早结束延时而 Expiration occurs when an elements getDelay(TimeUnitNANOSECONDS) method returns a value less than or equal to zero说明getDelay方法的实现必须保证延时到了返回的值变为<=的int
上面这段英文中还说明了在poll/take的时候队列中元素会判定这个elment有没有达到超时时间如果没有达到poll返回null而take进入等待状态但是除了这两个方法队列中的元素会被当做正常的元素来对待例如size方法返回所有元素的数量而不管它们有没有达到超时时间而协调的Condition available只对take和poll是有意义的
另外需要补充的是在ScheduledThreadPoolExecutor中工作队列类型是它的内部类DelayedWorkQueue而DelayedWorkQueue的Task容器是DelayQueue类型而ScheduledFutureTask作为Delay的实现类作为Runnable的封装后的Task类也就是说ScheduledThreadPoolExecutor是通过DelayQueue优先级判定规则来执行任务的
BlockingDque+LinkedBlockingQueue
BlockingDque为阻塞双端队列接口实现类有LinkedBlockingDque双端队列特别之处是它首尾都可以操作LinkedBlockingDque不同于LinkedBlockingQueue它只用一个lock来维护读写操作并由这个lock实例化出两个Condition notEmpty及notFull而LinkedBlockingQueue读和写分别维护一个lock