(三)延迟队列DelayQueue实现订单自动取消

(三)延迟队列DelayQueue实现订单自动取消,第1张

DelayQueue :,1)java自带延时获取元素, 无界 阻塞队列,2)队列 内部用PriorityQueue实现 。     创建元素时可 指定多久 才能从队列中获取当前元素。期满才从队列中 提取 ,没到延时时间, 阻塞 当前线程。

泛型队列,继承Delayed,需重写getDelay和compareTo方法。

1.public class DelayQueue <E extends Delayed >extends AbstractQueue <E>

2.public int compareTo (T o)往DelayQueue 加入数据 执行,根据返回值判断位置。排得越 前,越先被消费

3. long getDelay (TimeUnit unit)判断消息是否到期。负数,已到期,可读。

优点: java自带,轻量级,使用简单

缺点: 存储 内存中 ,服务器 重启 会造成数据 丢失 ,配合redis使用。数量大用mq

订单类,实现Delayed接口

unit.convert(this.createdTime.toInstant(ZoneOffset.of("+8")).toEpochMilli()+expireTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS)

DelayQueue 分布式 环境中就会 重复执行;所以加redis:

每次生成订单时, 同时向 redis setnx 设定该未支付订单,

每次查询待支付订单时须从 redis 中也查一遍,

redis 不存在该订单,改为已取消。

AB 两个队列,A 队列设置 消息过期时间 , 没有消费者 ,A 过期自动转发到 B , B 队列消费者 取消 。

https://www.jianshu.com/p/2ef8646fe8f7

https://www.v2ex.com/amp/t/407801

早期需要延迟处理的业务场景,更多的是通过定时任务扫表,然后执行满足条件的记录,具有频率高、命中低、资源消耗大的缺点。随着消息中间件的普及,延迟消息可以很好的处理这种场景,本文主要介绍延迟消息的使用场景以及基于常见的消息中间件如何实现延迟队列,最后给出了一个在网易公开课使用延迟队列的实践。

1、有效期:限时活动、拼团。。。

2、超时处理:取消超时未支付订单、超时自动确认收货。。。

4、重试:网络异常重试、打车派单、依赖条件未满足重试。。。

5、定时任务:智能设备定时启动。。。

1、RabbitMQ

1)简介:基于AMQP协议,使用Erlang编写,实现了一个Broker框架

a、Broker:接收和分发消息的代理服务器

b、Virtual Host:虚拟主机之间相互隔离,可理解为一个虚拟主机对应一个消息服务

c、Exchange:交换机,消息发送到指定虚拟机的交换机上

d、Binding:交换机与队列绑定,并通过路由策略和routingKey将消息投递到一个或多个队列中

e、Queue:存放消息的队列,FIFO,可持久化

f、Channel:信道,消费者通过信道消费消息,一个TCP连接上可同时创建成百上千个信道,作为消息隔离

2)延迟队列实现:RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现

a、TTL:RabbitMQ支持对队列和消息各自设置存活时间,取二者中较小的值,即队列无消费者连接或消息在队列中一直未被消费的过期时间

b、DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

3)缺点:

a、配置麻烦,额外增加一个死信交换机和一个死信队列的配置

b、脆弱性,配置错误或者生产者消费者连接的队列错误都有可能造成延迟失效

2、RocketMQ

1)简介:来源于阿里,目前为Apache顶级开源项目,使用Java编写,基于长轮询的拉取方式,支持事务消息,并解决了顺序消息和海量堆积的问题

a、Broker:存放Topic并根据读取Producer的提交日志,将逻辑上的一个Topic分多个Queue存储,每个Queue上存储消息在提交日志上的位置

b、Name Server:无状态的节点,维护Topic与Broker的对应关系以及Broker的主从关系

2)延迟队列实现:RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中),然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中

3)缺点:延迟时间粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)

3、Kafka

1)简介:来源于Linkedin,目前为Apache顶级开源项目,使用Scala和Java编写,基于zookeeper协调的分布式、流处理的日志系统,升级版为Jafka

2)延迟队列实现:Kafka支持延时生产、延时拉取、延时删除等,其基于时间轮和JDK的DelayQueue实现

a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表

b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项

c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask

d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,类似于钟表就是一个三级时间轮

e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执行相应任务列表外,层级时间轮也会进行相应调整

3)缺点:

a、延迟精度取决于时间格设置

b、延迟任务除由超时触发还可能被外部事件触发而执行

4、ActiveMQ

1)简介:基于JMS协议,Java编写的Apache顶级开源项目,支持点对点和发布订阅两种模式。

a、点对点(point-to-point):消息发送到指定的队列,每条消息只有一个消费者能够消费,基于拉模型

b、发布订阅(publish/subscribe):消息发送到主题Topic上,每条消息会被订阅该Topic的所有消费者各自消费,基于推模型

2)延迟队列实现:需要延迟的消息会先存储在JobStore中,通过异步线程任务JobScheduler将到达投递时间的消息投递到相应队列上

a、Broker Filter:Broker中定义了一系列BrokerFilter的子类构成拦截器链,按顺序对消息进行相应处理

b、ScheduleBroker:当消息中指定了延迟相关属性,并且jobId为空时,会生成调度任务存储到JobStore中,此时消息不会进入到队列

c、JobStore:基于BTree存储,key为任务执行的时间戳,value为该时间戳下需要执行的任务列表

d、JobScheduler:取JobStore中最小的key执行(调度时间最早的),执行时间<=当前时间,将该任务列表依次投递到所属的队列,对于需要重复投递和投递失败的会再次存入JobStore中。

注: 此处JobScheduler的执行时间间隔可动态变化,默认0.5s,有新任务时会立即执行(Object->notifyAll())并设置时间间隔为0.1s,没有新任务后,下次执行时间为最近任务的调度执行时间。

3)缺点:投递到队列失败,将消息重新存入JobStore,消息调度执行时间=系统当前时间+延迟时间,会导致消息被真实投递的时间可能为设置的延迟时间的整数倍

5、Redis

1)简介:基于Key-Value的NoSQL数据库,由于其极高的性能常被当作缓存来使用,其数据结构支持:字符串、哈希、列表、集合、有序集合

2)延迟队列实现:Redis的延迟队列基于有序集合,score为执行时间戳,value为任务实体或任务实体引用

3)缺点:

a、实现复杂,本身不支持

b、完全基于内存,延迟时间长浪费内存资源

6、消息队列对比

1、公开课延迟队列技术选型

1)业务场景:关闭超时未支付订单、限时优惠活动、拼团

2)性能要求:订单、活动、拼团 数据量可控,上述MQ均能满足要求

3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作为延迟队列更普遍

4)可用性:ActiveMQ、RocketMQ自身支持延迟队列功能,且目前公开课业务中使用的中间件为ActiveMQ和Kafka

5)延迟时间灵活:活动的开始和结束时间比较灵活,而RocketMQ时间粒度较粗,Kafka会依赖时间格有精度缺失

结论: 最终选择ActiveMQ来作为延迟队列

2、业务场景:关闭未支付订单

1)关闭微信未支付订单

2)关闭IOS未支付订单

3、ActiveMQ使用方式

1)activemq.xml中支持调度任务

2)发送消息时,设置message的延迟属性

其中:

a、延迟处理

AMQ_SCHEDULED_DELAY:设置多长时间后,投递给消费者(毫秒)

b、重复投递

AMQ_SCHEDULED_PERIOD:重复投递时间间隔(毫秒)

AMQ_SCHEDULED_REPEAT:重复投递次数

c、指定调度计划

AMQ_SCHEDULED_CRON:corn正则表达式

4、公开课使用中进行的优化

1)可靠性:针对实际投递时间可能翻倍的问题,结合ActiveMQ的重复投递,在消费者逻辑中做幂等处理来保证延迟时间的准确性

2)可追溯性:延迟消息及消费情况做数据库冗余存储

3)易用性:业务上定义好延迟枚举类型,直接使用JmsDelayTemplate发送,无需关心数据备份和参数等细节

1、无论是基于死信队列还是基于数据先存储后投递,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,从而降低耦合度

2、无论是检查队头消息TTL还是调度存储的延迟数据,本质上都是通过定时任务来完成的,但是定时任务的触发策略以及延迟数据的存储方式决定了不同中间件之间的性能优劣

张浩,2018年加入网易传媒,高级Java开发工程师,目前在网易公开课主要做支付财务体系、版本迭代相关的工作。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/bake/11411913.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-15
下一篇 2023-05-15

发表评论

登录后才能评论

评论列表(0条)

保存