MQ的消息补偿机制

MQ的消息补偿机制,第1张

目录

前言:

常见问题及解决思路

一、消息防丢方案

二、消息防堆积方案

三、消息发送失败补偿方案

3.1 消息发送失败处理方案

3.2 消息发送失败补偿方案

3.3 confirm方案对比

四、 消息消费失败处理方案

4.1.消息消费失败处理方案

4.2死信队列补偿机制

五、日志与监控

5.1关键节点日志记录

5.2 重点指标监控 

前言:

本文主要描述在MQ使用过程中遇见的一些异常情况,并对异常情况,我们有什么思路去解决问题。

常见问题及解决思路 一、消息防丢方案
  • 生产者开启Confirm确认机制。
  • MQ消息默认设置为持久化,为所有队列设置镜像队列。
  • 消费者默认设置手动确认autoAck=false,并设置死信队列。
二、消息防堆积方案
  • 加强对不合理使用MQ的审批。
  • 监控消费能力(耗时<300ms),及时预警。
  • 框架层实现发送方限流。(默认值:100条/s)
  • 设置消息TTL。
  • 使用惰性队列。
三、消息发送失败补偿方案    3.1 消息发送失败处理方案
  1. 场景一:消息找不到队列导致消息发送失败。

  

  • 方案一:设置备用交换机(AE)

        

所有无法正确路由的消息都发往这个备份交换器中,可以为所有的交换器设置同一个AE,不过这里需要提前确保的是AE已经正确的绑定了队列,最好类型也是fanout的。

  • 方案二:设置mandatory=true

当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。

这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。

总结:建议使用方案二,错误消息记录Warn级别日志,进行监控。

  1. 场景二:生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失。
  • 方案一:开启MQ的事务机制

在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。

缺点:只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能。

  • 方案二:生产者将信道设置成confirm(确认)模式

一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。

总结:建议使用方案二,避免开启事务后的性能消耗。

3.2 消息发送失败补偿方案

方案一:当消息发送失败后,框架层将错误信息写入文件(在容器中,需要挂载日志文件),然后生产者起定时任务,读取文件中的信息,进行重发。

方案二:

当消息发送失败后,结合MQ配置,对消息进行重试并记录error日志,达到重试次数后,将处理结果通过回调接口的方式告诉生产者,生产者去进行额外的补偿机制。

总结:建议使用方案二,在方案一中,首先写文件是一件比较重的设计,涉及IO,其次异常发生时,对生产者不透明。生产者不知道何时消息能从文件中进行补发。

3.3 confirm方案对比

客户端实现生产者confirm有三种方式:

  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法

  • 第一种:

普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

关键代码如下:

  • 第二种:

批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

  • 第三种:

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

总结:建议使用方案一,发送方目前没有性能瓶颈。使用方案一进行可靠性确认。

四、 消息消费失败处理方案  4.1.消息消费失败处理方案

 由于网络波动等原因,导致消息消费失败

  • 方案一:设置死信队列

当消息发送失败后,设置requeue=false消息进入死信队列,并获取死信队列的长度,设置重新发送到正常队列的重试时间和重试间隔,重新发送到正常队列。

监控死信队列长度,日志记录及时预警。

  • 方案二:将消息存入本地客户端,进行重发。

将消息失败的信息、存入redis进行重发,或者将消息持久化到消费端的数据库表。

  • 方案三:设置回调队列,生产者重发消息

RPC方案,远程调用方案,生产者通过回调队列,对消费失败的消息,进行重发。

总结:死信队列能很好的处理消息消费失败的场景,使用MQ原生的支持,避免过重的设计,使用方案一即可。

4.2死信队列补偿机制

  • 方案一:当消息消费失败后,进入死信队列,运维同事监控死信队列,及时预警。当评估不是因为消息数据的原因,导致消费失败。邮件告知领导审批后,运维同事在管理平台,手动将消息迁移到正常队列。

  • 方案二:当消息消费失败后,进入死信队列,框架层实现逻辑,获取对应死信队列的消息长度,当大于0时并判断是否超过重试次数并达到重试间隔。当没有超过重试次数时,自动将消息从死信队列迁移到正常队列。

总结:为了实现自动恢复机制,框架层应该支持方案二的重试机制,避免人为参与产生的错误。

五、日志与监控 5.1关键节点日志记录
  • MQ成功接受消息时。(info)
  • 生产者消息发送失败时。(error)
  • 生产者confrim确认失败时。(error)
  • 生产消息量过大,限流时。(error)
  • 生产者连接MQ超时时。 (error)
  • 消息大小大于10KB时。(error)
  • 消费者成功消费消息是。(info)
  • 消费者连接MQ超时时。(error)
  • 消费者消费失败时。(error)
  • 消费者进入死信队列时。(error)
  • 消费耗时低于300ms时。(error)
 5.2 重点指标监控 

指标

安全值范围

队列堆积数

<10000

队列未消费者绑定数

<0

Channel数量

<100

系统TPS

<3000

集群TPS

<8000

消息大小

<10KB

消息丢失

<0

异常日志

<0

                                  消费耗时

<300ms

死信队列

<0条

CPU

<40%

内存

<35%

磁盘

>200MB

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/720538.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存