Logstash整合kafka方法。
环境ubuntu16.04
kafka0.9.0.0安装
logstash2.4.1安装
zookeeper3.4.9安装
JDK1.8安装
启动zk
cd /app/zookeeper/bin
./zkServer.sh start
启动kafka
cd /app/kafka
bin/kafka-server-start.sh -daemon config/server.properties &
创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看topic
bin/kafka-topics.sh --list --zookeeper node1:2181?
新开窗口连接kafka客户端消费消息
bin/kafka-console-consumer.sh --zookeeper node1:2181?--topic test_topic?
整合logstash和kafka
官方指导文件https://www.elastic.co/guide/en/logstash/current/index.html
output plugins --->kafka
注意版本对应
编辑logstash的配置启动文件
cd /app/logstash
vi FileLogstashKafka.conf? ? 这个文件要自己创建
input {
? ? file{
? ? ? ? path => "/app/logstash/testFile.txt"
? ? }
?}
output {
? kafka {
? ? codec => json
? ? topic_id => "test_topic"
? ? bootstrap_servers => "node1:9092"
? ? batch_size => 1
? }
}
配置说明:logstash从文件testFile.txt中采集数据,然后把数据输出到kafka的test_topic主题中,batch_size=1只是测试用,生产根据具体业务来定。
启动logstash的FileLogstashKafka.conf文件
bin/logstash -f FileLogstashKafka.conf
往testFile.txt文件中添加数据
cd /app/logstash
echo "hello word" >> testFile.txt
查看kafka消费者是否消费了这条消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)