kafka是一个分布式流处理平台,是三大MQ中间件之一。是一种高吞吐量的分布式发布订阅消息系统。
快速认知概念Broker: kafka的服务端程序,可以认为一个mq节点就是一个broker。
Topic: 每条发布到mq的消息都有一个类别,称为topic,主题的意思。
Producer: 生产者,创建消息发送给mq的topic
Consumer: 消费者,消费队列中的消息
Partition: 是Topic的实际存储空间,一个Topic有一个或多个Partition。Partition是一个有序队列
Replication 副本:也就是partition,副本分为leader和follower,learder挂了后,follower会自动升级为leader,只有leader才能和producer和consumer交互
ConsumerGroup:消费者组,同一个消费者组里同时只能有一个消费者能从相同的partition消费消息
MQ模型点对点:所有消费者在同一个组里,每条消息只会被一个消费者消费
发布订阅:比如每个消费者都属于不同组,则kafka消息可以广播到每个消费者
springboot 中对topic的 *** 作
springboot依赖版本
org.springframework.kafka spring-kafka2.7.0
创建和展示topic详情
public class KafkaAdminTest { public static final String TOPIC_NAME = "default_topic"; public KafkaAdmin kafkaAdmin(){ Mapconfig = new HashMap<>(); //填上自己的IP和端口 config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port"); return new KafkaAdmin(config); } @Test public void createTopic(){ KafkaAdmin kafkaAdmin = kafkaAdmin(); //设置topic参数 名称 partition数量 备份数量(1代表只有leader,没有follower) 备份数 量不能大于集群节点数量,否则报错 NewTopic newTopic = new NewTopic(TOPIC_NAME, 6, (short)1); kafkaAdmin.createOrModifyTopics(newTopic); } @Test public void describeTopics(){ KafkaAdmin kafkaAdmin = kafkaAdmin(); Map describeTopics = kafkaAdmin.describeTopics(TOPIC_NAME); Set > entries = describeTopics.entrySet(); entries.stream().forEach((entry)-> System.err.println("name :"+entry.getKey()+" , desc: "+ entry.getValue())); }
对于更高级的功能,您可以AdminClient直接使用。KafkaAdmin内部也是使用AdminClient
public class KafkaAdminTest { private static final String TOPIC_NAME = "default_topic"; public static AdminClient initAdminClient(){ Properties properties = new Properties(); //填上自己的IP和端口 properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port"); AdminClient adminClient = AdminClient.create(properties); return adminClient; } @Test public void createTopicTest(){ AdminClient adminClient = initAdminClient(); //指定分区数量,副本数量不能大于集群节点数量 NewTopic newTopic = new NewTopic(TOPIC_NAME,6,(short) 1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); try { //future等待创建,成功则不会有任何报错 createTopicsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Test public void listTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); //是否查看内部的topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopicsResult = adminClient.listTopics(options); Settopics = listTopicsResult.names().get(); for(String name : topics){ System.err.println(name); } } @Test public void delTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME)); deleteTopicsResult.all().get(); } @Test public void detailTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map stringTopicDescriptionMap = describeTopicsResult.all().get(); Set > entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue())); } @Test public void incrPartitionTopicTest() throws ExecutionException, InterruptedException { Map infoMap = new HashMap<>(1); AdminClient adminClient = initAdminClient(); //分区数量不能比原有的数量小 NewPartitions newPartitions = NewPartitions.increaseTo(8); infoMap.put(TOPIC_NAME,newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap); createPartitionsResult.all().get(); } }
上述代码对topic的 *** 作,在公司中可能并不需要开发人员 *** 作,下一期会讲解spingboot中生产者和消费者的代码开发。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)