文章目录提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
- 前言
- 一、kafka集群是什么?
- 二、使用步骤
- 1.环境准备
- 2.集群规划
- 3.kafka集群部署安装
- 4.SpringBoot集成kafka
- 总结
前言
关于kafka,网上的介绍有很多,简单说就是消息中间件,大数据项目中经常使用,我们项目是用于接收日志流水数据。相比其他消息中间件RabbitMQ优势在于:
(1)性能高,每秒百万级别;
(2)分布式,高可用,水平扩展。
提示:以下是本篇文章正文内容,下面案例可供参考
Kafka 是一个分布式消息系统,具有高水平扩展和高吞吐量的特点。在Kafka 集群中,没有 “中心主节点” 的概念,集群中所有的节点都是对等的。
分布式模型
Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上 其中一个节点会作为主副本( Leader ),其 节点作为备份副本( Follower ,也叫作从副本) 主副本会负责所有的客户端读写 *** 作,备份副本仅仅从主副本同步数据 当主副本 IH 现故障时,备份副本中的 个副本会被选择为新的主副本 因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本这样Kafka集群的所有服务端整体上对客户端是负载均衡的
说明 | 软件 |
---|---|
*** 作系统 | Windows 10 |
虚拟机 | VMware 15.5 |
Linux系统 | Centos 7 64位 |
Zookeeper | zookeeper-3.4.10 |
Hadoop | hadoop-3.3.1 |
HBase | Hbase2.4.4 |
Hive | Hive3.1.2 |
Kafka | kafka_2.12-3.0.0 |
开发工具 | IntelliJ IDEA 2020.1 |
IP | HostName | Software | Process |
---|---|---|---|
192.168.74.88 | hadoop01 | hadoop,zookeeper,hbase,hive,mysql,kafka | DataNode,NodeManager,QuorumPeerMain,JournalNode,NameNode,DFSZKFailoverController,ResourceManager,HMaster,HRegionServer,RunJar,RunJar,kafka |
192.168.74.89 | hadoop02 | hadoop,zookeeper,kafka | DataNode,NodeManager,QuorumPeerMain,JournalNode,NameNode,DFSZKFailoverController,HRegionServer,kafka |
192.168.74.90 | hadoop03 | hadoop,zookeeper,hbase,hvie,kafka | DataNode,NodeManager,QuorumPeerMain,JournalNode,ResourceManager,HMaster,HRegionServer,RunJar,kafka |
代码如下(示例):
# 创建一个新目录root
mkdir -p /data/kafka
chown hadoop:hadoop /data/kafka
su hadoop
cd /data/kafka
tar -zxvf kafka_2.12-3.0.0.tgz
# 重命名一下文件夹
mv kafka_2.12-3.0.0 app
# 系统环境变量
vim ~/.bashrc
gedit ~/.bashrc
export KAFKA_HOME=/data/kafka/app
export PATH=$KAFKA_HOME/bin:$PATH
source ~/.bashrc
cd /data/kafka/app/config
# 配置server.properties
vim /data/kafka/app/config/server.properties
source /data/kafka/app/config/server.properties
# 如果是kafka集群,需配置全局id
broker.id=10
############################# Socket Server Settings #############################
# 可以不设置,kafka自动获取hostname
listeners=PLAINTEXT://nn:9092
advertised.listeners=PLAINTEXT://nn:9092
############################# Log Basics #############################
# 最终存放消息的路径,建议放在kafka组件目录下,方便管理
log.dirs=/data/kafka/app/kafka-logs
num.partitions=3
############################# Zookeeper #############################
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka-zk
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# 考虑到后面项目中,对kafka在zk上方便更为管理,用了新的配置:zookeeper.connect=nn:2181,dn1:2181,dn2:2181/kafka-zk
# 启动kafka进程
kafka-server-start.sh /data/fkafka/app/config/server.properties
# 启动后提示内存不足“There is insufficient memory ”
因为kafka的启动脚本为最大堆申请1G内存,由于使用虚拟机跑项目,资源有限,将 kafka-server-start.sh的export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M",最大堆空间为256M,初始堆空间为128M。
vim /data/kafka/app/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
修改为:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
#分发到其他服务器
scp -r /data/kafka/app hadoop@hadoop02:/data/kafka
scp -r /data/kafka/app hadoop@hadoop03:/data/kafka
# kafka集群部署与测试
# 每个节点id需唯一nn设88,dn1设89,dn2设90
broker.id=10
ip和端口这里可以不配置,kafka自动读取,也方便把整个kafka目录分发到其他节点上
#listeners=PLAINTEXT://:9092
# 存放的日志,kafka自动创建
log.dirs=/data/kafka/app/kafka-logs
# 配置zk集群
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
# 分布在三个节点上启动kafka服务
kafka-server-start.sh -daemon /data/kafka/app/config/server.properties
# 创建一个新的topic:sparkapp,3份拷贝,3分区
hadoop01: kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --replication-factor 3 --partitions 3 --topic sparkapp
hadoop01: kafka-topics.sh --create --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --replication-factor 3 --partitions 3 --topic sparkapp
# 查看sparkapp主分区及其副本分区的情况
hadoop01: kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic sparkapp
hadoop01: kafka-topics.sh --describe --bootstrap-server hadoop01:9092 --topic sparkapp
#在nn节点启动producer进程,连接broker分别为nn自己、dn1节点和dn2节点,都能正常连接,同理,dn1、dn2的producer进程使用dn1、dn2、nn节点都能正常连接
kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic sparkapp
hadoop01: kafka-console-producer.sh --broker-list hadoop01:9092 --topic sparkapp > sparkapp
hadoop01: kafka-console-producer.sh --broker-list hadoop02:9092 --topic sparkapp > sparkapp
hadoop01: kafka-console-producer.sh --broker-list hadoop03:9092 --topic sparkapp > sparkapp
#在nn节点启动producer进程,然后在dn1节点、dn2节点以及nn新shell分别启动consumer,看看一个producer生产msg,其他三个节点能否同时收到
hadoop02: kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic sparkapp
hadoop03: kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic sparkapp
hadoop01: kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic sparkapp
4.SpringBoot集成kafka
application.yml
生产者
消费者
总结
记录点点滴滴
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)