阻塞队列BlockingQueue

阻塞队列BlockingQueue,第1张

目录

一、介绍

二、七种阻塞队列

三、阻塞队列的实现原理

1. 初始化对象时

2. put()添加时

3. take()获取时

四、DelayQueue实例

五、参考资料


一、介绍

        阻塞队列BlockingQueue是对队列的 *** 作进行阻塞,主要有两个功能:

阻塞插入:队列满时,会阻塞插入元素的线程,直到队列不满。阻塞移除:队列空时,会阻塞获取元素的线程,直到队列非空

        如下表所示,对阻塞插入/移除有4中处理方式,且对应的方法有所不同。

方法/处理方式抛出异常返回特殊值一直阻塞超时退出
插入add(e)offer(e)put(e)offer(e, ltimeout, unit)
移除remove()poll()take()poll(timeout, unit)
检查element()peek()不可用不可用
注意

1. 抛出异常:队列已满,抛出IllegalStateException;

                      队列为空,抛出NoSuchElementException

2. 返回特殊值:插入成功,则true;否则返回null;

                          获取成功,则为元素,否则返回null

3. 一直阻塞:直到队列可用或响应中断退出

4. 超时退出:超过指定时间则退出

二、七种阻塞队列

        如下图所示,是BlockingQueue的实现类图。

        有界队列是指队列的元素数量有限,即队列元素个数公平访问队列是指阻塞的线程按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性访问有可能先阻塞的线程最后才访问队列。一般默认非公平访问,提高了并发量和吞吐量。如下表所示,是七种阻塞队列的对比。

类型特点
ArrayBlockingQueue

1. 数组结构的有界阻塞队列;

2. FIFO原则对元素排序;

3. 支持公平访问队列,默认非公平

4. 初始化this.items = new Object[capacity]。

LinkedBlockingQueue

1. 链表结构的有界阻塞队列;

2. FIFO原则对元素排序;

3. 默认最大长度Integer.MAX_VALUE;

4. 初始化last = head = new Node(null);

5. 线程池SingleThreadExecutor、FixedThreadPool使用该队列。

PriorityBlockingQueue

1. 元素排序的无界阻塞队列;

2. 默认是自然升序;自定义Comparator进行排序

3. 不能保证同顺序元素的顺序。

DelayQueue

1. 延时无界阻塞队列;

2. 只有延时到期时才能从队列获取元素

3. 队列元素必须实现Delayed接口

4. 应用场景:缓存系统的设计、定时任务调度等。

SynchronousQueue

1. 不存储元素的同步阻塞队列;

2. 每次添加元素时,必须等待移除(队列不存储元素),反之依然

3. 适用场景:传递性的数据;

4. 线程池CachedThreadPool使用该队列。

LinkedTransferQueue

1. 链表结构的无界阻塞队列;

2. 消费者空闲时,添加元素时直接传输给消费者,无需添加到队列尾部

    消费者非空时,添加元素时添加到队列尾部,且等到消费者消费才返回;

3. transfer()见2;tryTransfer():判定是否能直接传给消费者。

LinkedBlockingDeque

1. 链表结构的双向阻塞队列;

2. FIFO双向队列:队列两端都可插入/移除元素

3. 优点:两端 *** 作,多线程减少竞争。

三、阻塞队列的实现原理

        如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?查看java.util.concurrent.ArrayBlockingQueue使用了Condition的等待/通知机制实现

1. 初始化对象时

        如下代码所示,初始化ArrayBlockingQueue对象时,会初始化notEmpty、notFull属性

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    // 获取重入锁
    lock = new ReentrantLock(fair);
    // 等待take()操作的线程
    notEmpty = lock.newCondition();
    // 等待put()操作的线程
    notFull =  lock.newCondition();
}
2. put()添加时

         添加元素时,队列已满则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不满,唤醒当前线程

// 添加元素    
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 队列已满时,Condition调用await()来阻塞当前线程
        while (count == items.length)
            notFull.await();
        // 退出while循环,即:队列不满时,唤醒当前线程
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// 队列不满时,唤醒当前线程    
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 唤醒当前线程
    notEmpty.signal();
}
3. take()获取时

        获取元素时,队列已空则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不空,唤醒当前线程

// 移除元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 队列已空时,Condition调用await()来阻塞当前线程
        while (count == 0)
            notEmpty.await();
        // 退出while循环,即:队列不空时,唤醒当前线程
        return dequeue();
    } finally {
        lock.unlock();
    }
}

// 队列不空时,唤醒当前线程 
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒当前线程
    notFull.signal();
    return x;
}
四、DelayQueue实例

        DelayQueue是支持延时获取元素的无界阻塞队列。其元素必须实现Delayed接口,在创建元素时指定过期时间。只有在延迟期满时才能从队列中提取元素。

        元素实现Delayed接口,必须覆写getDelay(TimeUnit unit)、compareTo(Delayed o)方法。注意getDelay()方法时可以指定任意时间单位,一旦以秒或分作为单位,而延时时间又精确不到就麻烦了。使用时请注意当scheduledTime小于当前时间时,getDelay会返回负数。

        可以参考ScheduledThreadPoolExecutor.ScheduledFutureTask类的实现。

DelayQueue delayQueue = new DelayQueue<>();
public static SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
@Test
public void delayQueueTest() throws InterruptedException {
    // 添加元素
    new Thread(new Runnable() {
        @SneakyThrows
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                // 生成随机的过期时间,10~20s
                int random = (int)(Math.random()*10 + 10);

                // 创建元素
                MyDelayElement myDelayElement = new MyDelayElement("myDelayElement" + i, random * 1000);
                // 添加到延时队列
                delayQueue.add(myDelayElement);
            }
        }
    }).start();

    Thread.sleep(1000);

    while (true) {
        // 延时队列为元素为0时,退出循环
        if (delayQueue.isEmpty()) {
            break;
        } else {
            // 弹出一个元素
            MyDelayElement myDelayElement = delayQueue.poll();
            if (Objects.isNull(myDelayElement)) {
                continue;
            } else {
                System.out.println(myDelayElement.getName() + ", currentTime: " + dateFormat.format(new Date(myDelayElement.getCurrentTime())) + ", scheduledTime: " + dateFormat.format(new Date(myDelayElement.getScheduledTime())));
                System.out.println("=============================================");
            }
        }
    }
    System.out.println("currentThread end");
}

// 定义队列元素
public class MyDelayElement implements Delayed {
    // 当前时间
    private final long currentTime = System.currentTimeMillis();

    private String name;
    // 到期的绝对时间
    private long scheduledTime;

    public MyDelayElement(String name, long delayTime) {
        this.name = name;
        // 计算到期的绝对时间
        scheduledTime = currentTime + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.scheduledTime - ((MyDelayElement) o).scheduledTime);
    }

    public String getName() {
        return name;
    }

    public long getScheduledTime() {
        return scheduledTime;
    }

    public long getCurrentTime() {
        return currentTime;
    }

}
// 代码执行结果
myDelayElement1, currentTime: 15:44:59, scheduledTime: 15:45:09
=============================================
myDelayElement0, currentTime: 15:44:59, scheduledTime: 15:45:13
=============================================
myDelayElement2, currentTime: 15:44:59, scheduledTime: 15:45:18
=============================================
currentThread end
五、参考资料

Lock锁<一> _ 基础_爱我所爱0505的博客-CSDN博客

DelayQueue实现原理及应用场景分析_五星上炕的博客-CSDN博客_delayqueue      

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/755836.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-30
下一篇 2022-04-30

发表评论

登录后才能评论

评论列表(0条)

保存