Reactor3 MpscLinkedQueue源码分析

Reactor3 MpscLinkedQueue源码分析,第1张

Reactor3 MpscLinkedQueue源码分析 前言

上一篇我们说到了SpsclinkedArrayQueue,它是一个单生产者单消费者的队列。在实际的开发过程中不可能都是单生产者,很多时候都是在高并发情况下需要引入队列来进行削峰。在Reactor3中MpsclinkedQueue就是这样一个多生产者单消费者的队列。

成员变量
//当前生产节点
private volatile linkedQueueNode producerNode;
//生产节点伴随的并发控制成员变量
    private final static AtomicReferenceFieldUpdater PRODUCER_NODE_UPDATER
            = AtomicReferenceFieldUpdater.newUpdater(MpsclinkedQueue.class, linkedQueueNode.class, "producerNode");
//当前消费节点
    private volatile linkedQueueNode consumerNode;
//消费节点伴随的并发控制成员变量
    private final static AtomicReferenceFieldUpdater CONSUMER_NODE_UPDATER
            = AtomicReferenceFieldUpdater.newUpdater(MpsclinkedQueue.class, linkedQueueNode.class, "consumerNode");

构造方法
public MpsclinkedQueue() {
        //初始化时使用一个空节点作为头结点
        linkedQueueNode node = new linkedQueueNode<>();
        //这里使用lazySet只保证了前面代码的有序性,后面的可见性没有保证,也就是只有storestore没有storeload
        CONSUMER_NODE_UPDATER.lazySet(this, node);
        //下面的代码会在最后加上storeLoad保证了整个结构的内存可见性
        PRODUCER_NODE_UPDATER.getAndSet(this, node);// this ensures correct construction:
        // StoreLoad
    }

这里使用lazySet的性能优化技巧在上一篇SpsclinkedArrayQueue没有提到,使用lazySet并不能保证修改变量后其他线程能马上看到最新值,所以在使用lazySet提升性能时,要清楚的知道自己再做什么。如果没有把握最好还是在保证程序正确的前提下在去优化性能。
上面的构造函数最后使用CAS会在最后加上StoreLoad内存屏障,将所有值都刷到主存,保证其他线程的可见。在Volatile变量写的前后分别加上StoreStore,StoreLoad是一种保守的策略,有些CPU会根据程序做一些优化。比如在这个例子中,中间如果将lazySet修改为CAS,有些CPU也是可以将这个CAS的效果优化为lazySet的效果,去掉后面的StoreLoad内存屏障。

生产
public final boolean offer(final E e) {
        Objects.requireNonNull(e, "The offered value 'e' must be non-null");

        //创建新的节点
        final linkedQueueNode nextNode = new linkedQueueNode<>(e);
        //这里使用CAS来控制并发,每次只有一个线程能抢占到节点
        final linkedQueueNode prevProducerNode = PRODUCER_NODE_UPDATER.getAndSet(this, nextNode);
        // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
        // and completes the store in prev.next.
        //抢占成功后就可以偷懒使用lazySet慢悠悠的将值存放到这个节点,其他线程不会立马看到这个新的节点,
        //不过不会对结果有影响,订阅者会循环的拉取消息,过很小的一段时间就可以看到这个节点了
        prevProducerNode.soNext(nextNode); // StoreStore
        return true;
}


 public void soNext(linkedQueueNode n) {
      NEXT_UPDATER.lazySet(this, n);
}
消费
public E poll() {
        linkedQueueNode currConsumerNode = consumerNode; // don't load twice, it's alright
        linkedQueueNode nextNode = currConsumerNode.lvNext();

        if (nextNode != null) {
            // we have to null out the value because we are going to hang on to the node
            //获取下一个节点的值,并将他的值设置为null,因为这个节点即将成为新的头结点,
            //由于只是单消费者的原因,所以不会有并发安全的问题
            final E nextValue = nextNode.getAndNullValue();

            // Fix up the next ref of currConsumerNode to prevent promoted nodes from keeping new ones alive.
            // We use a reference to self instead of null because null is already a meaningful value (the next of
            // producer node is null).
            //将当前节点的下个节点的指针指向自己,不使用null的原因是null表示当前节点的下个节点可能是null.
            currConsumerNode.soNext(currConsumerNode);
            CONSUMER_NODE_UPDATeR.lazySet(this, nextNode);
            // currConsumerNode is now no longer referenced and can be collected
            return nextValue;
        } else if (currConsumerNode != producerNode) {
            //如果当前头节点的下一个节点为空,但是当前的头结点有和当前的生产节点是同一个节点,
            // 则说明当前头结点的数据不是最新的,下面通过一个循环来等待本地缓存刷新
            while ((nextNode = currConsumerNode.lvNext()) == null) {
            }
            //下面的逻辑就和上面一致了
            // got the next node...
            // we have to null out the value because we are going to hang on to the node
            final E nextValue = nextNode.getAndNullValue();

            // Fix up the next ref of currConsumerNode to prevent promoted nodes from keeping new ones alive.
            // We use a reference to self instead of null because null is already a meaningful value (the next of
            // producer node is null).
            currConsumerNode.soNext(currConsumerNode);
            CONSUMER_NODE_UPDATER.lazySet(this, nextNode);
            // currConsumerNode is now no longer referenced and can be collected
            return nextValue;
        }
        return null;
    }
总结

MpsclinkedQueue和SpsclinkedArrayQueue一样也是参考了JCTools,都是无锁队列。都再恰当的地方使用了lazySet来优化性能同时保证程序的正确性。当然,MpsclinkedQueue的单消费者的特性也不是由它自己来保证的。下一篇我们再来Reactor3如何使用这两个队列。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5665631.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存