Java中的几种阻塞队列
Java中的BlockingQueue接口是一个线程安全的存取队列,适用于生产者消费者的应用场景中,支持两个附加操作:
- 生产者线程会一直不断的往阻塞队列中放入数据,直到队列满了为止。队列满了后,生产者线程阻塞等待消费者线程取出数据。
- 消费者线程会一直不断的从阻塞队列中取出数据,直到队列空了为止。队列空了后,消费者线程阻塞等待生产者线程放入数据。
BlockingQueue接口
BlockingQueue提供四种不同的处理方法。
抛出异常
|
返回特殊值
|
一直阻塞
|
超时退出
|
|
插入方法
|
add(o)
|
offer(o)
|
put(o)
|
offer(o, timeout, timeunit)
|
移除方法
|
remove(o)
|
poll()
|
take(o)
|
poll(o, timeout, timeunit)
|
检查方法
|
element()
|
peek()
|
—
|
—
|
- 抛出异常:
- add: 插入数据时,如果阻塞队列满,那么抛出异常IllegalStateException,否则插入成功返回true。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法。
- IllegalStateException - if the element cannot be added at this time due to capacity restrictions
- ClassCastException - if the class of the specified element prevents it from being added to this queue
- NullPointerException - if the specified element is null
- IllegalArgumentException - if some property of the specified element prevents it from being added to this queue
- remove: 删除数据时,如果队列中有此数据,删除成功返回true,否则返回false。如果包含一个或者多个object,那么只移除一个就返回true。注意:remove(o)是BlockingQueue接口的方法,remove()是Queue接口的方法。
- element: 如果队列为空,那么抛出异常NoSuchElementException。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove(),element同样是Queue接口的方法。
- 返回特殊值:
- offer: 插入数据时,如果阻塞队列没满,那么插入成功返回true,否则返回false。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法,不建议会抛出异常的add方法。
- poll: 此方法是Queue接口的。如果队列不为空,查询、移除并返回队列头部元素。如果队列为空,那么返回null。
- peek: 此方法是Queue接口的。如果队列为空,返回null,这点不同于poll。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()。
- 一直阻塞:
- put: 插入数据时,如果队列已满,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException。
- take: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException。
- 超时退出:
- offer: 插入数据时,如果队列已满,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果插入成功,那么返回true,如果在达到指定时间后仍然队列不可用,那么返回false。
- poll: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果删除成功,那么返回队列头部元素,如果在达到指定时间后仍然队列不可用,那么返回null。
Queue队列不能插入null,否则会抛出NullPointerException。
Java里的阻塞队列
JDK7提供了7个阻塞队列。分别是
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:基于PriorityQueue实现的支持延时获取元素的阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue
ArrayBlockingQueue是基于数组(array-based)的先进先出(FIFO)有界(bounded)阻塞队列。
- 创建队列时,必须要指定队列容量(capacity),即数组大小。
- 创建队列时,可以传入Collection来初始化队列元素。
- 队列一旦被创建,那么队列容量不能被改变。
- 队列支持公平模式和非公平模式,默认非公平模式。
- 队列中只有一把锁,写锁和读锁未分离,并发控制采用了经典的two-condition(notEmpty、notFull)算法。
LinkedBlockingQueue
LinkedBlockingQueue是基于链表(linked nodes)的先进先出(FIFO)的可选界(optionally-bounded)的阻塞队列。
- 创建队列时,为了避免额外开销,可以指定队列容量(capacity);如果不指定队列容量,那么默认队列容量为Integer.MAX_VALUE。
- 创建队列时,可以可以传入Collection来初始化队列元素,此时不能指定队列容量,默认为Integer.MAX_VALUE。
- 队列中的count即当前队列元素个数,采用AtomicInteger,避免put和take的竞争。
- 与ArrayBlockingQueue不同的是,LinkedBlockingQueue队列中有两把锁,读锁和写锁是分离的。
- 在使用LinkedBlockingQueue时,若队列大小为默认值,且生产速度大于消费速度时,可能会内存溢出。
- LinkedBlockingQueue理论上来说比ArrayBlockingQueue有更高的吞吐量,但是在大多数的实际应用场景中,却没有很好的表现。
PriorityBlockingQueue
PriorityBlockingQueue是基于数组(array based)的支持优先级的无界(unbounded)的阻塞队列。此队列的数据结构是堆。
- 创建队列时,如果指定初始化容量(initialCapacity),那么默认初始化容量DEFAULT_INITIAL_CAPACITY为11。
- 创建队列时,可以指定队列初始化容量(initialCapacity),不是队列容量(capacity)。
- PriorityBlockingQueue的无界(unbounded)相对于LinkedBlockingQueue的可选界(optionally-bounded)来说,无界是指不能在创建队列时,不能指定队列的最大容量(capacity),并不是说PriorityBlockingQueue本身无界。LinkedBlockingQueue默认(注意,这里指的是默认容量,即,你可以指定大于Integer.MAX_VALUE的值)的最大容量是Integer.MAX_VALUE,而PriorityBlockingQueue的最大容量是MAX_ARRAY_SIZE=Integer.MAX_VALUE-8。
- PriorityBlockingQueue无界的另一个意思就是生产者线程不会因为队列满了就阻塞,因为队列是无界的,没有容量满了这一说。offer(E e, long timeout, TimeUnit unit)的后两个参数没有任何作用查看源代码发现,其方法的实现直接是调用了offer(e)。但是当队列为空时,take仍然会阻塞。
- offer(e)永远不会返回false,offer(E e, long timeout, TimeUnit unit)永远不会返回false或者阻塞。
- PriorityBlockingQueue通过数组来实现队列,在原有数组满了的情况下,通过复制数组来扩展队列容量,如果新扩展的数组容量大小超过MAX_ARRAY_SIZE,那么抛出OutOfMemoryError异常。
- 默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
DelayQueue
DelayQueue是基于PriorityQueue实现的支持延时获取元素的阻塞队列。
- DelayQueue中存放的对象必须实现Delayed接口。
- 如果没有到期元素,那么就没有head,poll方法返回null。
- 当一个元素的getDelay(TimeUnit.NANOSECONDS)返回值小于等于0时,该元素过期。
- 虽然不能用take和poll移除未过期的元素,但是这些未过期的元素仍然和过期元素一样同等对待。例如,size方法返回的数量就是过期元素和未过期元素的之和。
DelayQueue的适用场景:
- 关闭空闲链接。服务器中有很多空闲链接,在一定时间后,关闭他们。
- 删除过期缓存。在一定时间后,删除某些缓存的对象。
- 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
- 生成订单后的60秒后,给用户发送短信通知。
- 下单15分钟后,如果用户不付款就关闭订单。
通过上面几条场景例子,可以看出来,DelayQueue适用于在一定时间后,做某些业务处理。
SynchronousQueue
SynchronousQueue是一个没有数据缓冲的BlockingQueue。
- 一个线程的插入必须等待另一个线程的删除操作才能完成,反之亦然。
- SynchronousQueue适合传递性设计(handoff designs),即一个线程中运行的对象,需要将某些信息、任务或者事件等传递给另一个线程中运行的对象的场景。
- SynchronousQueue支持公平和非公平模式。
- 不能在同步队列上进行peek,因为仅在试图要移除元素时,该元素才存在。
- 不能迭代队列,因为其中没有元素可用于迭代。
- Executors.newCachedThreadPool()中就使用了SynchronousQueue队列。
LinkedTransferQueue
LinkedTransferQueue是基于链表(linked nodes)的无界(unbounded)阻塞队列。
- 无界队列(Integer.MAX_VALUE),进出队列采用FIFO(先进先出)原则。
- 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费。主要用于线程间消息的传递,与SynchronousQueue很类似,但是比起SynchronousQueue更好用。
- LinkedTransferQueue既可以使用BlockingQueue的put方法进行常规的添加元素操作,也可以使用transfer方法进行阻塞添加。
- 相比SynchronousQueue灵活之处在于,队列长度非0,阻塞插入和非阻塞插入的元素可以共存。
- 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
LinkedBlockingDeque
LinkedBlockingDueue是基于链表(linked nodes)的可选界(optionally-bounded)的双向阻塞队列。
- 该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除)。
- 该阻塞队列是支持线程安全。
- 在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。
- 双向阻塞队列可以运用在“工作窃取”(work stealing)模式中。
注意:本文归作者所有,未经作者允许,不得转载