RocketMQ面试题

RocketMQ面试题,第1张

RocketMQ面试题 内容分类详情Java高频面试题汇总入口JVMJVM面试题并发并发面试题SpringCloudSpringCloud面试题DubboDubbo面试题RocketMQRocketMQ面试题 多个 MQ 如何选型 MQ描述RabbitMQerlang 开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下 降。每秒钟可以处理几万到十几万条消息。RocketMQJava 开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做 到毫秒级的响应,每秒钟大概能处理几十万条消息。KafkaScala 开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候, Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。ActiveMQJava 开发,简单,稳定,性能不如前面三个。小型系统用也可以,但是不推荐。推荐用互联网主流的。 为什么要使用 MQ 作用描述解耦系统耦合度降低,没有强依赖关系。异步不需要同步执行的远程调用可以有效提高响应时间。削峰请求达到峰值后,后端 service 还可以保持固定消费速率消费,不会被压垮。 RocketMQ 由哪些角色组成,每个角色作用和特点是什么 角色作用Nameserver无状态,动态列表;这是和 ZooKeeper 的重要区别之一。ZooKeeper 是有状态的。Producer消息生产者,负责发消息到 Broker。Broker就是 MQ 本身,负责收发消息、持久化消息等。Consumer消息消费者,负责从 Broker 上拉取消息进行消费,消费完进行 ack。 队列和topic的关系


Broker信息怎么注册

broker向namesrv注册

    在broker初始化完成后,就会进行启动工作,BrokerController.start方法中有这么一行代码

    首先是去TopicConfigManager组件中,把本地的TopicConfig表中的数据与数据的版本封装成一个wrapper实体

    调用doRegisterBrokerAll方法进行注册

RocketMQ 消费模式有几种

消费模型由 Consumer 决定,消费维度为 Topic。

集群消费

1.一条消息只会被同 Group 中的一个 Consumer 消费

2.多个 Group 同时消费一个 Topic 时,每个 Group 都会有一个 Consumer 消费到数据。

广播消费

消息将对一个 Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些
Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个
Consumer 都消费一次。

消费消息是push还是pull

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式

消息流转过程

生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker存储阶段,消息将会存储在 Broker 端磁盘中消息阶段, Consumer 将会从 Broker 拉取消息 发送消息的过程 三种发送方式

同步 :发送者向 MQ 执行 送消息 API 时,同步等待,直到消息服务器返回发送结果。异步 :发送者向 MQ 执行发送消息 API 时,指定消息发送成功后的回调函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞 ,直到运行结束,消息发送成功或失败的回调任务在 个新的线程中执行。如果消息发送失败会进行重试,可以通过调整参数来控制消息重试次数。单向::消息发送者向 MQ 执行发送消息 时,直接返回,不等待消息服务器的结果,
也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

消息发送流程主要的步骤有:验证消息、查找路由、 消息发送

1.消息长度验证

消息在发送之前,首先要确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为、 消息长度不能等于0且不能超过允许发送消息的最大长度 4M (maxMessageSize=1024 * 1024 *4)

2.查找主题路由信息

消息发送之前,首先需要获取Topic的路由信息,只有获取了这些信息我们才知道消息要发送的Broker节点的信息。

3.消息发送

消费消息的过程

队列分配,即消费端的负载均衡,由 RebalanceImpl 组件实现

RocketMQ在消费端的负载均衡如下图所示,各个partition均匀分布在各个consumer上,如下图所示:

进入到rebalanceImpl的doRebalance方法,其中调用了rebalanceByTopic,我们看看rebalanceByTopic中的逻辑:

先获取topic下的MessageQueue,一个MessageQueue实际上就是一个partition然后获取当前topic和group的client id,即当前group中消费此topic的客户端随后对partition和client id做排序然后调用strategy获取当前客户端需要消费的partition最后更新订阅

拉取消息,在队列分配完成的基础上,从 Broker 中拉取消息 ,由 PullMessageService 组件实现;拉取完成后对应的 PullCallback 处理

消息处理,触发消费逻辑以及后续的消费结果处理,由 ConsumeMessageService 组件实现

队列分配流程

队列分配流程如下:

获取指定 Topic 下的消息队列集合如果是广播模式,则不需要进行负载均衡,消费者直接负责所有消息队列集群模式则需要获取指定 Topic 的所有消费者集合,根据负载均衡算法将消息队列分配给消费者消息队列分配完毕后,则需要为每个消息队列创建对应的任务队列,即 ProcessQueue为每个任务队列创建对应的消息拉取任务,后续消息拉取服务会定时扫描任务池进行消息拉取 *** 作

RocketMQ Broker中的消息被消费后会立即删除吗

不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了

那么消息会堆积吗?什么时候清理过期消息

不会,4.6版本默认48小时后会删除不再使用的CommitLog文件

检查这个文件最后访问时间判断是否大于过期时间指定时间删除,默认凌晨4点 为什么要主动拉取消息而不使用事件监听方式

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。

如何保证消息不丢失

producer

采取send()同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次。

producer.setRetryTimesWhenSendFailed(10);

集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。

Broker端

修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。

flushDiskType = SYNC_FLUSH

集群部署,主从模式,高可用。

Consumer端

完全消费正常后在进行手动ack确认。

消息重复消费产生的场景

ACK

正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除

当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer

消费模式

在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次

如何处理consumer重复消费

方式一:Redis的setNX() , 做消息id去重 java版本目前不支持设置过期时间

//Redis中 *** 作,判断是否已经 *** 作过 TODO
boolean flag =  jedis.setNX(key);
if(flag){
        //消费
}else{
        //忽略,重复消费
}

方式二:redis的 Incr 原子 *** 作:key自增,大于0 返回值大于0则说明消费过,(key可以是消息的md5取值, 或者如果消息id设计合理直接用id做key)

int num =  jedis.incr(key);
if(num == 1){
    //消费
}else{
    //忽略,重复消费
}

方式三:数据库去重表

设计一个去重表,某个字段使用Message的key做唯一索引,因为存在唯一索引,所以重复消费会失败

CREATE TABLE message_record ( id int(11) unsigned NOT NULL AUTO_INCREMENT, key varchar(128) DEFAULT NULL, create_time datetime DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY key (key) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
消息堆积产生的场景

下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理

如何解决堆积的大量消息

先定位问题,如果能正常消费,排除代码问题。通过上线更多consumer临时解决消息堆积问题。

比如说消费者挂了,然后broker堆积了很多消息,然后可以先把堆积的消息读到别的地方比如mysql或者es然后去后续进行处理,然后把RocketMQ堆积的消息删掉,启动消费者保障消费者正常消费,这里要注意的是删除堆积消息之前,需要停止mq。

堆积的消息会不会进死信队列

不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),16次(默认16次)才会进入死信队列(%DLQ%+ConsumerGroup)。

看过RocketMQ 的源码吗,使用的设计模式有哪些

设计模式有单例、工厂、策略、门面模式。单例工厂无处不在,策略印象深刻比如发消息和消费消息的时候queue的负载均衡就是N个策略算法类,有随机、hash等,这也是能够快速扩容天然支持集群的必要原因之一。持久化做的也比较完善,采取的CommitLog来落盘,同步异步两种方式。

高吞吐量下如何优化生产者和消费者的性能

同一group下,多机部署,并行消费

单个Consumer提高消费线程个数

批量消费

消息批量拉取业务逻辑批量处理

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5716356.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存