很多人都说:MQ 通过讲消息的发送和接收分离来实现应用程序的异步和解耦,这个给人的直觉是——MQ 就是异步的,是用来解耦的,但这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的更加简单的通讯协议。
一个分布式系统中两个模块之间通讯要么是 HTTP,要么是自己开发的 TCP,但是这两种协议其实都是原始的协议。HTTP 协议很难实现两端通讯——模块 A 可以调用 B,B 也可以主动调用 A,如果要做到这个,两端都要背上 WebServer,而且还不长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想就头皮发麻。
MQ 所要做的就是在这些协议之上构建一个简单的“协议”——生产者 / 消费者模型。MQ 带给我们的“协议”不是具体的通讯协议,而是更高层次的通讯模型。
消息队列的流派 有 Broker 的 MQ
这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推给消费者(或者消费者主动轮询)
重 Topic (Kafka、ActiveMQ、JMS)生产者会发送 Key 和数据到 Broker,由 Broker 比较 Key 之后决定给哪个消费者。这种模式是我们最常见的模式,是我们对 MQ 最多的印象。在这种模式下,一个 Topic 往往是一个比较大的概念,甚至一个系统中就可能只有一个 Topic,Topic 某种意义上就是 Queue,生产者发送 Key 相当于说:“Hi,把数据放到 Key 的队列中”。
轻 Topic(RabbitMQ 或者 AMQP 协议)虽然架构一样但是 Kafka 的性能要比 JMS 的性能不知道高多少倍,所以基本上这种类型的 MQ 只有 Kafka 一种备选方案。
生产者发送 Key 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过一定的逻辑计算出 Key 对应的队列,然后把数据交给队列。
这种模式下解耦了 Key 和 Queue,在这种架构中 Queue 是非常轻量级的(在 RabbitMQ 中它的上限取决于你的内存),消费者关心的知识自己的 Queue,生产者不必关心数据最终给谁,只要指定 Key 就行了,中间的那层映射在 QMQO 中叫做 Exchange。
AMQP 中有四种 Exchange:
这种结构的架构给通讯带来了很大的灵活性,我们能想到的通讯方式都可以用这四种 Exchange 表达出来。
无 Broker 的 MQ ZeroMQ无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到——MQ 是更高级的 Socket,它是解决通讯问题的,所以 ZeroMQ 被设计成了一个“库”而不是一个中间件。
节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API,可以完成发送数据、读取数据。
ZeroMQ 其实就是一个跨语言的、重量级的 Actor 模型邮箱库。你可以把自己的程序想象成一个 Actor,ZeroMQ 就是提供邮箱功能的库。ZeroMQ 可以实现同一台机器的 RPC 通讯也可以实现不同机器上的 TCP、UDP 通讯。
如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫 ZeroMQ。
Kafka 介绍
Kafka 最初由 linkedIn 公司开发,是一个分布式的、支持分区的(Partition)、多副本的(Replica)、基于 Zookeeper 协调的消息系统,它最大的特性就是可以实时的处理大量数据以满足各种需求场景。由 Scala 语言编写,linkedIn 与 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
Kafka 的使用场景-
日志收集
一个公司可以用 Kafka 收集各种服务的 Log,通过 Kafka 以统一接口服务的方式开放给各种 Consumer,例如 Hadoop、Hbase、Solr 等。
-
消息系统:
解耦生产者和消费者、缓存消息等。
-
用户活动跟踪
Kafka 经常被用来记录 Web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发吧到 Kafka 的 Topic 中,然后订阅者通过订阅这些 Topic 来做实时的监控分析,后者装载到 Hadoop、数据仓库中做离线分析和挖掘。
-
运营指标
Kafka 也经常被用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种 *** 作的集中反馈,比如报警和报告。
Kafka 是一个分布式的,分区的消息服务(官方称之为 Commit Log),它提供一个消息系统应该具备的功能,但是却有着独特的设计。
可以这样来说,Kafka 借鉴了 JMS 规范的思想,但是确并没有完全遵循 JMS 规范。
基础的消息相关术语:
服务端(Broker)和客户端(Producer、Consumer)之间通信通过 TCP 协议来完成。
Kafka 安装 环境准备
- jdk
- zookeeper
-
config/server.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=6 # Hostname and port the broker will advertise to producers and consumers. advertised.listeners=PLAINTEXT://192.192.192.6:9092 # A comma separated list of directories under which to store log files. log.dirs=/opt/kafka/data # Zookeeperi connection string. zookeeper.connect=192.192.192.6:2181
- 启动 Zookeeper
cd /opt/zookeeper bin/zkServer.sh start
- 启动 Kafka
cd /opt/kafka bin/kafka-server-start.sh -daemon config/server.properties
- 验证是否启动成功(进入 ZK 查看指定 ID 的 Broker 是否存在)
cd /opt/zookeeper bin/zkCli.sh
ls /brokers/ids
# 创建主题 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test # 查看 Kafka 有哪些 Topic bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 发送消息 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test # 消费最新消息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 # 从头开始消费 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning关于消息的细节
- 生产者将消息发送给 Broker,Broker 会将消息保存在本地的日志文件中
/opt/kafka/data/<主题>-<分区>/00000000000000000000.log
- 消息的保存是有序的,通过 Offset 偏移量来描述消息的有序性
- 消费者消费消息时也是通过 Offset 来描述当前要消费的那条消息的位置
单播消息和多播消息 单播消息
在一个 Kafka 的 Topic 中,启动两个消费者和一个生产者,问:生产者发送消息,这条消息是否同时会被两个消费者消费?
如果两个消费者在同一个消费组,那么只有一个消费者可以收到订阅的 Topic 中的消息。
换言之,同一个消费组中只能有一个消费者收到一个 Topic 中的消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group多播消息
不同的消费组订阅同一个 Topic,那么不同的消费组中只有一个消费者能收到消息。
实际上也是多个消费组中的多个消费者收到了同一个消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group-1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-group-2查看消费组的详细信息
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group test-group
重点关注以下几个信息:
Kafka 中主题和分区的概念 主题
Topic 在 Kafka 中是一个逻辑的概念,Kafka 通过 Topic 将消息进行分类,不同的 Topic 会被订阅该 Topic 的消费者消费。
但是有一个问题,如果说这个 Topic 中的消息非常非常多,因为消息是会被保存到 Log 日志文件中的,为了解决这个文件过大的问题, Kafka 提出了 Partition 的概念。
分区通过 Partition 将一个 Topic 中的消息分区来存储。这样的好处有多个:
- 分区存储,可以解决存储文件过大的问题
- 提供了读写的吞吐量,读和写可以同时在多个分区中进行
-
00000000000000000000.log
这个文件中保存的就是消息。文件中保存的消息,默认保存 7 天,7 天后消息会被删除。
-
__consumer_offsets-49
Kafka 内部创建了__consumer_offsets 主题,包含了 50 个分区,这个主题用来存放消费者消费某个主题的偏移量。每个消费者会把消费的主题的偏移量自主上报给 Kafka 的默认主题 __consumer_offsets,因此 Kafka 为了提升这个主题的并发性,默认设置了 50 个分区。
-
提交到哪个分区
通过 Hash 函数 hash(
) % <__consumer_offsets 主题的分区数> -
提交到该主题的内容
Key 为
+ + <分区号>,Value 就是当前 Offset 的值
-
Kafka 集群 *** 作 副本的概念
副本是为了主题中的分区创建多个备份,在 Kafka 集群的多个 Broker 中,会有一个副本作为 Leader,其它是 Follower。
查看 Topic 情况:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
集群中有多个 Broker,创建主题时可以指明主题有多少个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的 Broker 里。
集群消费中分区分消费组的细节- 一个 Parittion 只能被一个消费组中的一个消费者消费,目的是为了保证消息的顺序性,但是多个 Partition 的多个消费者消费的总顺序是得不到保证的。
- Partition 的数量决定了消费组中消费者的数量,建议同一个消费者的数量不要超过 Partition 的数量,否则多的消费者消费不到消息。
- 如果消费者挂了,那么就会触发 Rebalance 机制,会让其它消费者来消费该分区。
客户端 API 依赖
生产者org.apache.kafka kafka-clients3.0.0
public class MyProducer { public static void main(String[] args) throws Exception { // 设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.192.192.6:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 创建生产者 Producer生产者的同步发送消息producer = new KafkaProducer<>(props); // 创建消息 ProducerRecord record = new ProducerRecord<>("my-topic", "my-key", "my-value"); // 发送消息 Future future = producer.send(record); // 获取消息发送的元数据 Recordmetadata metadata = future.get(); System.out.printf("topic=%s partition=%d offset=%dn", metadata.topic(), metadata.partition(), metadata.offset()); } }
如果生产者发送消息没有收到 Ack,生产者会阻塞,阻塞到 3s 的时间,如果还没有收到消息,会进行重试,重试的次数为 3 次。
Future生产者的异步发送消息future = producer.send(record); Recordmetadata metadata = future.get();
异步发送,生产者发送完消息后就可以执行之后的业务,Broker 在收到 消息后异步调用生产者提供的 Callback 回调方法。
producer.send(record, (Recordmetadata metadata, Exception e) -> { if (e != null) { System.out.println("消息发送失败"); } });生产者中 Ack 配置
在同步发送的前提下,生产者在获得集群返回的 Ack 之前会一直阻塞,那么集群什么时候返回 Ack 呢?此时 Ack 有 3 个配置:
-
ack=0
Kafka 集群不需要任何的 Broker 收到消息,就立刻返回 Ack 给生产者。最容易丢消息,效率最高。
-
ack=1(默认)
多副本之间的 Leader 已经收到消息,并把消息写入到本地的 Log 中,才会返回 Ack 给生产者。性能和安全性是最均衡的。
-
ack=-1 / ack=all
有配置 min.insync.replicas=2(默认为 1,推荐配置大于等于 2),此时就需要 Leader 和一个 Follower 同步完后,才会返回 Ack 给生产者,此时集群中有 2 个 Broker 已完成数据的接收。这种方式最安全,但性能最差。
下面是关于 Ack 和重试的配置(如果没有收到 Ack 就开启重试):
props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试间隔设置(发送失败会重试,默认间隔 100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要再接收者那边做好消息接收的幂等性处理) props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);关于消息发送的缓冲区
- Kafka 默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- Kafka 本地线程会去缓冲区中一次拉 16k 的数据发送给 Broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发送给 Broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
public class MyConsumer { public static void main(String[] args) { // 设置参数 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.192.192.6:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 创建消费者客户端 KafkaConsumer关于消费者自动提交和手动提交 Offset 提交的内容consumer = new KafkaConsumer<>(props); // 消费者订阅主题列表 consumer.subscribe(List.of("test")); while (true) { // poll() 是拉取消息的长轮询 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.printf("partitoin=%d offset=%d key=%s value=%sn", record.partition(), record.offset(), record.key(), record.value()); } } } }
消费者无论是自动提交还是手动提交,都需要把所属的消费组 + 消费的主题 + 消费的分区 + 消费的偏移量这样的信息提交到集群的 __consumer_offsets 主题里面。
自动提交消费者 poll 消息下来后就会自动提交 Offset。自动提交会丢消息,因为消费者在消费前提交 Offset,有可能提交完后还没消费就挂了。
// 是否自动提交 Offset(默认为 true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交 Offset 的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交又分成了两种:
- 手动同步提交
在消费完消息后调用同步提交的方法,当集群返回 Ack 前一直阻塞,返回 Ack 后表示提交成功,执行之后的逻辑。
while (true) { ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.printf("partitoin=%d offset=%d key=%s value=%sn", record.partition(), record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 手动同步提交 Offset,当前线程会阻塞,直到 Offset 提交成功 consumer.commitAsync(); } } - 手动异步提交
在消息消费完后提交,不需要等集群 Ack,直接执行之后的逻辑,可以设置一个回调方法供集群调用。
while (true) { ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.printf("partitoin=%d offset=%d key=%s value=%sn", record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync((Map offsets, Exception e) -> { if (e != null) { System.out.println("提交失败"); } }); }
默认情况下,消费者一次会 poll 500 条消息。代码中设置了长轮询的时间为 1000 毫秒,意味着:
- 如果一次 poll 到 500 条,就直接执行 for 循环
- 如果这一次没有 poll 到 500 条,且时间在 1 秒内,那么长轮询继续 poll,要么到 500 条,要么到 1 秒
- 如果两次 poll 都没到 500 条,且 1 秒时间到了,那么直接执行 for 循环
如果两次 poll 的间隔超过 30 秒,集群会认为该消费者的消费能力过弱,该消费者会被踢出消费组,触发 Rebalance 机制,Rebalance 机制会造成性能开销。可以通过设置这个参数,让一次 poll 的消息少一点:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);消费者的健康状态检查
消费者每隔 1 秒向 Kafka 集群发送心跳,集群发现如果有超过 10 秒没有续约的消费者,将踢出消费组,触发该消费组的 Rebalance 机制,将分区交给消费组里的其它消费者进行消费。
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);指定分区、偏移量、时间消费
- 指定分区消费
consumer.assign(List.of(new TopicPartition("test", 0)));
- 指定 Offset 消费
// 从头消费 consumer.seekToBeginning(List.of(new TopicPartition("test", 0))); // 指定 Offset 消费 consumer.seek(new TopicPartition("test", 0), 10);
- 指定时间消费
根据时间,去所有的 Partition 中确定该时间对应的 Offset,然后去所有的 Partition 中找到该 Offset 之后的消息开始消费。
// long timestamp = new Date().getTime() - 1000 * 60 * 60; long timestamp = LocalDateTime.now().plusHours(-1).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); Map
timestampsToSearch = consumer .partitionsFor(topic) .stream() .collect(Collectors.toMap( (PartitionInfo it) -> new TopicPartition(topic, it.partition()), (PartitionInfo it) -> timestamp, (v1, v2) -> v2 )); consumer .offsetsForTimes(timestampsToSearch) .forEach((TopicPartition key, OffsetAndTimestamp value) -> { // 如果查询偏移量的时间点大于最大的索引记录时间那么 value 就会为空 if (value != null) { // 根据 timestamp 确定 offset long offset = value.offset(); consumer.assign(List.of(key)); consumer.seek(key, offset); } });
新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的 offset + 1 开始消费(消费新消息)。通过以下的设置,让新的消费者第一次从头开始消费,之后开始消费新消息。
-
latest(默认)
消费新消息。
-
earliest
第一次从头开始消费,之后开始消费新消息。这个需要区别于 consumer.seekToBeginning()(每次都从头开始消费)。
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Spring Boot 中使用 Kafka 依赖
配置文件org.springframework.kafka spring-kafka2.8.1
-
application.properties
spring.kafka.bootstrap-servers=192.192.192.6:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.acks=1 spring.kafka.consumer.group-id=test-group spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
RECORD当每一条记录被消费者监听器处理之后提交 BATCH当每一批 poll 的数据被消费者监听器处理之后提交 TIME当每一批 poll 的数据被消费者监听器处理之后,距离上次提交时间大于 ack-time时提交 COUNT当每一批 poll 的数据被消费者监听器处理之后,被处理的 Record 数量大于等于 ack-count 时提交 COUNT_TIMETIME 和 COUNT 有一个条件满足时提交 MANUAL当每一批 poll 的数据被消费者监听器处理之后,手动调用 Acknowledgment.acknowledge() 后提交 MANUAL_IMMEDIATE手动调用 Acknowledgment.acknowledge() 后立即提交
@RestController public class MyKafkaController { @Autowired private KafkaTemplate消费者kafkaTemplate; @GetMapping("/send") public void send() { ListenableFuture > future = kafkaTemplate.send("test", 0, "key", "消息"); } }
@Component public class MyConsumer { @KafkaListener(topics = {"test"}, groupId = "test-group") public void listen(ConsumerRecord消费者中配置主题、分区、偏移量record, Acknowledgment ack) { System.out.println(record); // 手动提交 Offset ack.acknowledge(); } }
@KafkaListener( groupId = "test-group", topicPartitions = { @TopicPartition( topic = "topic-1", partitions = {"0", "1"}, partitionOffsets = {@PartitionOffset(partition = "1", initialOffset = "100")} ), @TopicPartition(topic = "topic-2") }, // 同组下的消费者个数,即并发消费数,建议小于等于分区总数 concurrency = "3" ) public void listen(ConsumerRecordrecord, Acknowledgment ack) { System.out.println(record); // 手动提交 Offset ack.acknowledge(); }
Kafka 集群中的 Controller、Rebalance、HW Controller
每个 Broker 启东时会向 ZK 创建一个临时序号节点,获得的序号最小的那个 Broker 将会作为集群中的 Controller,负责:
- 当集群中有一个副本的 Leader 挂掉,需要在集群中选举出一个新的 Leader,选举的规则是从 Isr 集合中最左边获得。
- 当集群中有 Broker 新增或减少,Controller 会同步信息给其它 Broker。
- 当集群中有分区新增或减少,Controller 会同步信息给其它 Broker。
消费组中的消费者没有指明分区来消费
触发的条件当消费组中的消费者和分区的关系发生变化的时候
分区分配的策略-
Range
根据公司计算得到每个消费者消费哪几个分区。前面的消费者是 <分区总数> / (<消费者数量> + 1),之后的消费者是 <分区总数> / <消费者数量>。
-
轮询
-
Sticky
粘合策略,如果需要 Rebalance,会在之前已分配的基础上调至,不会改变之前的分配情况。如果这个没有策略没有开启,那么就要进行全部重新分配。建议开启。
HW(High Watermark)俗称高水位,取一个 Partition 对应的 Isr 集合中最小的 LEO(LOG-END-OFFSET)作为 HW,Consumer 最多只能消费到 HW 所在的位置。另外每个 Replica 都有 HW,Leader 和 Follower 各自负责更新自己的 HW 状态。
对于 Leader 新写入的消息,Consumer 不能立刻消费,Leader 会等待该消息被所有 Isr 集合中的 Reolicas 同步后更新 HW,此时消息才能被 Consumer 消费。
这样就保证了如果 Leader 所在的 Broker 失效,但该消息仍然可以从新选举的 Leader 中获取,这样的目的是防止消息的丢失。
Kafka 的优化问题 如何防止消息丢失 生产者
- 使用同步发送。
- 把 Ack 设为 1 或者 all,并且设置同步的分区数(min.insync.replicas)为分区备份数。
- 把自动提交改成手动提交
在防止消息的方案中,如果生产者发送完消息后,因为网络抖动,没有收到 Ack,但实际上 Broker 已经收到消息了,此时生产者就会重试,于是 Broker 就会收到多条相同的消息,而造成消费者的重复消费。
解决方案-
生产者关闭重试
会造成消息丢失,不建议。
-
消费者解决非幂等性消费问题
业务 ID 唯一或分布式锁。
- 生产者保证消息按顺序消费,且消息不丢失,使用同步发送,Ack 设置成非 0 的值。
- 主题只能设置一个分区,消费组中只能有一个消费者。
消息的消费速度远赶不上生产速度,导致 Kafka 中有大量的数据没有被消费。随着没有消费的数据堆积越多,消费者的寻址性能会越来越差,最后导致整个 Kafka 对外提供的服务的性能很差,从而导致其它服务也访问速度变慢,造成服务雪崩。
解决方案- 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
- 通过业务的架构设计,提高业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其它机器上,一起消费,提高消费者的消费速度
- 创建一个消费者,该消费者在 Kafka 另建一个主题,配上多个分区,再配上多个消费者。该消费者将 poll 下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。
订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现。
解决方案- Kafka 中创建多个 Topic,每个 Topic 表示延时的间隔,如 topic_30m。
- 消息发送者发送消息到相应的 Topic,并带上消息的发送时间。
- 消费者订阅相应的 Topic,消费时长轮询消费整个 Topic 中的消息。
- 判断消费的发送时间和当前比较当前时间是否超过预设的值,如 30 分钟。
- 如果超过,去数据库中修改订单状态为已取消。
- 如果未超过,记录当前消息的 Offset 并不再继续消费之后的消息,下次消费 Offset 之后的消息继续进行判断。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)