【rabbitmq怎样实现消费分组?rabbitmq怎么实现类似kafka的comsumerGroup分组机制?】

【rabbitmq怎样实现消费分组?rabbitmq怎么实现类似kafka的comsumerGroup分组机制?】,第1张

【rabbitmq怎样实现消费分组?rabbitmq怎么实现类似kafka的comsumerGroup分组机制?】 说明

最近使用rabbitmq作为消息队列,发现由生产者产生的一条消息要被集群A消费,同时一个月后可能会接入集群B也要消费该消息,在过去我们使用kafka的时候,kafka有consumer.group-id的概念,不同服务定义不同的consumer.group-id就可以做到集群A都是A服务属于同一个consumer.group-id,消费的时候集群A中只有一个节点会消费,同时接入集群B也是一样只要消费者Id不一样即可正常消费,很容易就实现了消费分组,同样的道理我们现在使用rabbitmq怎么去做到类似的消费分组功能呢?

利用rabbitmq的广播交换机FanoutExchange来实现消息分组消费

为什么能实现?这里的交换机FanoutExchange其实就类似于我们的定义的通道,每来一组消息我们就定义一个新的名称的FanoutExchange交换机,比如login事件消息要发送业务服务A和大数据服务B,我们就定义一个Login_FanoutExchange交换机,然后业务服务A需要使用该Login_FanoutExchange交换机过来的消息那么由业务服务A自行去定义自己的Queue并和该Login_FanoutExchange交换机进行绑定,绑定方式这里不介绍,有配置文件绑定,也有注解的绑定方式,自行选择你喜欢的,然后这个QueueA就可以消费该Login_FanoutExchange交换机过来的消息了,同样的,一个月后大数据服务B要使用该交换机Login_FanoutExchange的消息了,那么服务B也定义自己的QueueB去和该交换机绑定就行了,后面再来一个C服务也无所谓,同样定义自己的QueueC做绑定即可,这里要注意的就是这里面的Queue的name值设置每个服务应该不一样,Queue的name其实就相当于consumer.group-id以便rabbitmq来确定是不是同一个消费的标识;
后面如果有其他的一组消息事件定义不同的FanoutExchange即可,如果类比kafka,每一个这个FanoutExchange就相当于kafka的topic,每一个Queue的name值就相当于kafka的consumer.group-id,这样就实现了消息分组;
Queue的定义记得设置持久化就行,嗯,就是new Queue(“name”,true)第二个参数为true就行,毕竟交换机是不存储消息的,只有Queue可以持久化消息;

大概的图就是这样子:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5687001.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存