kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --describe
kafka-topics.sh --bootstrap-server ${kafkaAddress} --delete--topic ${topicName} --partitions ${partitions} --replication-factor ${replication}
kafka-topics.sh --bootstrap-server ${kafkaAddress} --list
kafka-console-consumer.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --from-beginning
kafka-consumer-groups.sh --describe --bootstrap-server ${kafkaAddress} --group ${groupName}
a.修改partitions数量
kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --alter --partitions 4
b.创建increase-replication-factor.json in config,配置各分区replication-factor位置
c.更新replication-factor
kafka-reassign-partitions.sh --bootstrap-server ${kafkaAddress} --reassignment-json-file config/increase-replication-factor.json --execute
情况是这样的,在我们系统中有多个Consumer的客户端(客户端个数是不确定的,因为在系统工作过程中有的业务节点会脱离,有些业务节点会增加进来),Producer也有多个。但是Producer发送的消息种类只有一种,所以topic只创建了一个, 消息量很大,所以使用了多个Consumer来处理。现在想实现如下的订阅/推送效果,多个Producer进行消息的推送,例如消息X1、X2、X3、X4、X5.。。。。。。然后由多个Consumer分别进行拉去,Consumer1拉取到:X1、X4、X7。。。Consumer2拉取到:X2、X5、X8.。。。。。。。。如此类推kafka topic 分区的原因主要是为了并发访问,增大吞吐量。具体来讲,每一个topic均被分成多个互不重复的partition,其中每个partition均被放在一个或多个broker服务器上,这样就提供了容错。如果数据在不同的patition上,那么他们的访问将可能由不同的broker服务器来完成,这样就实现了并发访问。分区的信息储存在Zookeeper上。
所以,分区是kafka用来提供并发访问控制的一个概念。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)