- Kafka的简单使用
- 1.kafka常用命令
- 2. go语言的kafka API
- 2.1.sarama
- 2.2下载及安装
- 2.3使用
- kafka知识点目录
Linux
//启动zookeeper ./zookeeper-server-start.sh ../config/zookeeper.properties //启动kafka ./kafka-server-start.sh ../config/server.properties //创建一个1分区的topic ./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9093 --replication-factor 1 --partitions 1 --topic test //查看当前主题中的所有topic ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9093 //查看指定的topic ./kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test //生产消息 ./kafka-console-producer.sh --broker-list 127.0.0.1:9093 --topic test //消费消息 //⽅式⼀:从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费 ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic test //⽅式⼆:从当前主题中的第⼀条消息开始消费 ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --from-beginning --topic test
windows
//启动zookeeper .binzkServer.cmd //启动kafka .binwindowskafka-server-start.bat .configserver.properties //创建一个1分区的topic .binwindowskafka-topics.bat --create --zookeeper 127.0.0.1:2183 --replication-factor 1 --partitions 1 --topic test //查看当前主题中的所有topic .binwindowskafka-topics.bat --list --bootstrap-server 127.0.0.1:9093 //查看指定的topic .binwindowskafka-topics.bat --describe --zookeeper 127.0.0.1:9093 --topic test //生产消息 .binwindowskafka-console-producer.bat --broker-list 127.0.0.1:9093 --topic test //消费消息 .binwindowskafka-console-consumer.bat --bootstrap-server 127.0.0.1:9093 --topic test --from-beginning
2. go语言的kafka API 2.1.sarama
Go语言中连接kafka使用第三方库: github.com/Shopify/sarama。
2.2下载及安装go get github.com/Shopify/sarama
注意事项: sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误: github.com/DataDog/zstd exec: “gcc”:executable file not found in %PATH% 所以在Windows平台请使用v1.19版本的sarama。(如果不会版本控制请查看博客里面的go module章节)
参考视频:
https://www.bilibili.com/video/BV1pE41117MG?p=12
连接kafka发送消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // 基于sarama第三方库开发的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // 构造一个消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 连接kafka client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 发送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%vn", pid, offset) }
连接kafka消费消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%vn", err) return } partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区 if err != nil { fmt.Printf("fail to get list of partition:err%vn", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍历所有的分区 // 针对每个分区创建一个对应的分区消费者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%vn", partition, err) return } defer pc.AsyncClose() // 异步从每个分区消费信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } }
kafka知识点目录
1.Linux环境部署kafka
2.Win10环境部署kafka
3.docker部署kafka
4.kafka的简单使用
5.kafka消息的细节
6.kafka主题和分区的概念
7.kafka集群 *** 作
8.kafka生产者实现细节
9.kafka消费者实现细节
10.kafka集群中的controller、rebalance、HW
11.kafka中的优化问题
12.Kafka-eagle监控平台
13.kafka错误汇总
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)