目录
一、介绍
二、七种阻塞队列
三、阻塞队列的实现原理
1. 初始化对象时
2. put()添加时
3. take()获取时
四、DelayQueue实例
五、参考资料
一、介绍
阻塞队列BlockingQueue是对队列的 *** 作进行阻塞,主要有两个功能:
阻塞插入:队列满时,会阻塞插入元素的线程,直到队列不满。阻塞移除:队列空时,会阻塞获取元素的线程,直到队列非空。如下表所示,对阻塞插入/移除有4中处理方式,且对应的方法有所不同。
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
插入 | add(e) | offer(e) | put(e) | offer(e, ltimeout, unit) |
移除 | remove() | poll() | take() | poll(timeout, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
注意 | 1. 抛出异常:队列已满,抛出IllegalStateException; 队列为空,抛出NoSuchElementException 2. 返回特殊值:插入成功,则true;否则返回null; 获取成功,则为元素,否则返回null 3. 一直阻塞:直到队列可用或响应中断退出 4. 超时退出:超过指定时间则退出 |
如下图所示,是BlockingQueue的实现类图。
有界队列是指队列的元素数量有限,即队列元素个数。公平访问队列是指阻塞的线程按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性访问有可能先阻塞的线程最后才访问队列。一般默认非公平访问,提高了并发量和吞吐量。如下表所示,是七种阻塞队列的对比。
类型 | 特点 |
ArrayBlockingQueue | 1. 数组结构的有界阻塞队列; 2. FIFO原则对元素排序; 3. 支持公平访问队列,默认非公平; 4. 初始化this.items = new Object[capacity]。 |
LinkedBlockingQueue | 1. 链表结构的有界阻塞队列; 2. FIFO原则对元素排序; 3. 默认最大长度Integer.MAX_VALUE; 4. 初始化last = head = new Node 5. 线程池SingleThreadExecutor、FixedThreadPool使用该队列。 |
PriorityBlockingQueue | 1. 元素排序的无界阻塞队列; 2. 默认是自然升序;自定义Comparator进行排序; 3. 不能保证同顺序元素的顺序。 |
DelayQueue | 1. 延时无界阻塞队列; 2. 只有延时到期时才能从队列获取元素; 3. 队列元素必须实现Delayed接口; 4. 应用场景:缓存系统的设计、定时任务调度等。 |
SynchronousQueue | 1. 不存储元素的同步阻塞队列; 2. 每次添加元素时,必须等待移除(队列不存储元素),反之依然; 3. 适用场景:传递性的数据; 4. 线程池CachedThreadPool使用该队列。 |
LinkedTransferQueue | 1. 链表结构的无界阻塞队列; 2. 消费者空闲时,添加元素时直接传输给消费者,无需添加到队列尾部; 消费者非空时,添加元素时添加到队列尾部,且等到消费者消费才返回; 3. transfer()见2;tryTransfer():判定是否能直接传给消费者。 |
LinkedBlockingDeque | 1. 链表结构的双向阻塞队列; 2. FIFO双向队列:队列两端都可插入/移除元素; 3. 优点:两端 *** 作,多线程减少竞争。 |
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?查看java.util.concurrent.ArrayBlockingQueue使用了Condition的等待/通知机制实现。
1. 初始化对象时如下代码所示,初始化ArrayBlockingQueue对象时,会初始化notEmpty、notFull属性。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 获取重入锁
lock = new ReentrantLock(fair);
// 等待take()操作的线程
notEmpty = lock.newCondition();
// 等待put()操作的线程
notFull = lock.newCondition();
}
2. put()添加时
添加元素时,队列已满则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不满,唤醒当前线程。
// 添加元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列已满时,Condition调用await()来阻塞当前线程
while (count == items.length)
notFull.await();
// 退出while循环,即:队列不满时,唤醒当前线程
enqueue(e);
} finally {
lock.unlock();
}
}
// 队列不满时,唤醒当前线程
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒当前线程
notEmpty.signal();
}
3. take()获取时
获取元素时,队列已空则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不空,唤醒当前线程。
// 移除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列已空时,Condition调用await()来阻塞当前线程
while (count == 0)
notEmpty.await();
// 退出while循环,即:队列不空时,唤醒当前线程
return dequeue();
} finally {
lock.unlock();
}
}
// 队列不空时,唤醒当前线程
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒当前线程
notFull.signal();
return x;
}
四、DelayQueue实例
DelayQueue是支持延时获取元素的无界阻塞队列。其元素必须实现Delayed接口,在创建元素时指定过期时间。只有在延迟期满时才能从队列中提取元素。
元素实现Delayed接口,必须覆写getDelay(TimeUnit unit)、compareTo(Delayed o)方法。注意getDelay()方法时可以指定任意时间单位,一旦以秒或分作为单位,而延时时间又精确不到就麻烦了。使用时请注意当scheduledTime小于当前时间时,getDelay会返回负数。
可以参考ScheduledThreadPoolExecutor.ScheduledFutureTask类的实现。
DelayQueue delayQueue = new DelayQueue<>();
public static SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Test
public void delayQueueTest() throws InterruptedException {
// 添加元素
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
for (int i = 0; i < 3; i++) {
// 生成随机的过期时间,10~20s
int random = (int)(Math.random()*10 + 10);
// 创建元素
MyDelayElement myDelayElement = new MyDelayElement("myDelayElement" + i, random * 1000);
// 添加到延时队列
delayQueue.add(myDelayElement);
}
}
}).start();
Thread.sleep(1000);
while (true) {
// 延时队列为元素为0时,退出循环
if (delayQueue.isEmpty()) {
break;
} else {
// 弹出一个元素
MyDelayElement myDelayElement = delayQueue.poll();
if (Objects.isNull(myDelayElement)) {
continue;
} else {
System.out.println(myDelayElement.getName() + ", currentTime: " + dateFormat.format(new Date(myDelayElement.getCurrentTime())) + ", scheduledTime: " + dateFormat.format(new Date(myDelayElement.getScheduledTime())));
System.out.println("=============================================");
}
}
}
System.out.println("currentThread end");
}
// 定义队列元素
public class MyDelayElement implements Delayed {
// 当前时间
private final long currentTime = System.currentTimeMillis();
private String name;
// 到期的绝对时间
private long scheduledTime;
public MyDelayElement(String name, long delayTime) {
this.name = name;
// 计算到期的绝对时间
scheduledTime = currentTime + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.scheduledTime - ((MyDelayElement) o).scheduledTime);
}
public String getName() {
return name;
}
public long getScheduledTime() {
return scheduledTime;
}
public long getCurrentTime() {
return currentTime;
}
}
// 代码执行结果
myDelayElement1, currentTime: 15:44:59, scheduledTime: 15:45:09
=============================================
myDelayElement0, currentTime: 15:44:59, scheduledTime: 15:45:13
=============================================
myDelayElement2, currentTime: 15:44:59, scheduledTime: 15:45:18
=============================================
currentThread end
五、参考资料
Lock锁<一> _ 基础_爱我所爱0505的博客-CSDN博客
DelayQueue实现原理及应用场景分析_五星上炕的博客-CSDN博客_delayqueue
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)