RocketMQ集群搭建

RocketMQ集群搭建,第1张

RocketMQ集群搭建

文章目录
  • 前言
  • 快速搭建RocketMQ集群
    • 机器环境
    • 创建用户
    • 系统配置
    • 关闭防火墙
    • 安装java
    • 安装RocketMQ
    • 配置RocketMQ集群
      • 配置第一组broker-a
    • 配置第二组Broker-b
    • 启动RocketMQ
      • 先启动nameServer
    • 再启动broker
      • 启动状态检查
      • 测试mqadmin管理工具
    • 命令行快速验证
  • 搭建管理控制台

前言

记录RocketMQ

快速搭建RocketMQ集群 机器环境

准备三台机器,root密码 root ;IP地址:

192.168.232.128 worker1 
192.168.232.129 worker2 
192.168.232.130 worker3
创建用户
useradd oper
passwd oper (密码输入 123qweasd)
系统配置

切换oper用户,在worker1上 生成key
ssh-kengen
然后分发给其他机器

ssh-copy-id worker1 
ssh-copy-id worker2 
ssh-copy-id worker3

这样就可以在worker1上直接ssh 或者scp到另外的机器,不需要输密码了。

关闭防火墙
systemctl stop firewalld.service 
firewall-cmd --state
安装java

给oper创建/app目录
上传jdk的tar包
修改~/.bash_profile,配置环境变量。source生效。

export JAVA_HOME=/app/jdk1.8/
安装RocketMQ

上传tar包,直接解压。然后配置环境变量

export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release 1

RocketMQ在4.5版本之前都不支持master宕机后slave自动切换。在4.5版本后,增加了基于Dleger实现的主从切换。

配置RocketMQ集群

搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async下的配
文件,实际项目中,为了达到高可用,一般会使用dleger。预备设计的集群情况如下

所以修改的配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件。–只需要配置broker.conf。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:

  • 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
  • 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
  • 而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

采用2m-2s-async的方式搭建集群

配置第一组broker-a

在worker2上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties

#所属集群名字,名字一样的节点就在同一个集群内 
brokerClusterName=rocketmq-cluster 
#broker名字,名字一样的节点就是一组主从节点。 
brokerName=broker-a 
#brokerid,0就表示是Master,>0的都是表示 
Slave brokerId=0 
#nameServer地址,分号分割 
namesrvAddr=worker1:9876;
worker2:9876;worker3:9876 
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=4 
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
 autoCreateTopicEnable=true 
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup=true 
#Broker 对外服务的监听端口 
listenPort=10911 
#删除文件时间点,默认凌晨 4点 
deleteWhen=04 
#文件保留时间,默认 48 小时 
fileReservedTime=120 
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog=1073741824 
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 
mapedFileSizeConsumeQueue=300000 
#destroyMapedFileIntervalForcibly=120000 
#redeleteHangedFileInterval=120000 
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio=88 
#存储路径 
storePathRootDir=/app/rocketmq/store
 #commitLog 存储路径 
storePathCommitLog=/app/rocketmq/store/commitlog 
#消费队列存储路径存储路径 
storePathConsumeQueue=/app/rocketmq/store/consumequeue 
#消息索引存储路径 
storePathIndex=/app/rocketmq/store/index 
#checkpoint 文件存储路径 
storeCheckpoint=/app/rocketmq/store/checkpoint 
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort 
#限制的消息大小 
maxMessageSize=65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE 
brokerRole=ASYNC_MASTER 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 
#checkTransactionMessageEnable=false 
#发消息线程池数量 
#sendMessageThreadPoolNums=128 
#拉消息线程池数量 
#pullMessageThreadPoolNums=128

该节点对应的从节点在worker3上。修改2m-2s-async/broker-a-s.properties 只需要修改brokerId和 brokerRole

#所属集群名字,名字一样的节点就在同一个集群内 
brokerClusterName=rocketmq-cluster 
#broker名字,名字一样的节点就是一组主从节点。 
brokerName=broker-a 
#brokerid,0就表示是Master,>0的都是表示 
Slave brokerId=1
#nameServer地址,分号分割 
namesrvAddr=worker1:9876;
worker2:9876;worker3:9876 
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=4 
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
 autoCreateTopicEnable=true 
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup=true 
#Broker 对外服务的监听端口 
listenPort=10911 
#删除文件时间点,默认凌晨 4点 
deleteWhen=04 
#文件保留时间,默认 48 小时 
fileReservedTime=120 
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog=1073741824 
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 
mapedFileSizeConsumeQueue=300000 
#destroyMapedFileIntervalForcibly=120000 
#redeleteHangedFileInterval=120000 
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio=88 
#存储路径 
storePathRootDir=/app/rocketmq/store
 #commitLog 存储路径 
storePathCommitLog=/app/rocketmq/store/commitlog 
#消费队列存储路径存储路径 
storePathConsumeQueue=/app/rocketmq/store/consumequeue 
#消息索引存储路径 
storePathIndex=/app/rocketmq/store/index 
#checkpoint 文件存储路径 
storeCheckpoint=/app/rocketmq/store/checkpoint 
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort 
#限制的消息大小 
maxMessageSize=65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE 
brokerRole=SLAVE 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 
#checkTransactionMessageEnable=false 
#发消息线程池数量 
#sendMessageThreadPoolNums=128 
#拉消息线程池数量 
#pullMessageThreadPoolNums=128
配置第二组Broker-b

这一组broker的主节点在worker3上,所以需要配置worker3上的config/2m-2s-async/brokerb.properties

#所属集群名字,名字一样的节点就在同一个集群内 
brokerClusterName=rocketmq-cluster 
#broker名字,名字一样的节点就是一组主从节点。 
brokerName=broker-b 
#brokerid,0就表示是Master,>0的都是表示 Slave 
brokerId=0 
#nameServer地址,分号分割 
namesrvAddr=worker1:9876;worker2:9876;worker3:9876 
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=4 
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
autoCreateTopicEnable=true 
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup=true 
#Broker 对外服务的监听端口 
listenPort=10911 
#删除文件时间点,默认凌晨 4点 
deleteWhen=04 
#文件保留时间,默认 48 小时 
fileReservedTime=120 
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog=1073741824 
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 1234567891011121314151617181920212223
然后他对应的slave在worker2上,
修改work2上的 conf/2m-2s-async/broker-b-s.properties
mapedFileSizeConsumeQueue=300000 
#destroyMapedFileIntervalForcibly=120000 
#redeleteHangedFileInterval=120000 
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio=88 
#存储路径 
storePathRootDir=/app/rocketmq/store 
#commitLog 存储路径 
storePathCommitLog=/app/rocketmq/store/commitlog 
#消费队列存储路径存储路径 
storePathConsumeQueue=/app/rocketmq/store/consumequeue 
#消息索引存储路径 
storePathIndex=/app/rocketmq/store/index 
#checkpoint 
文件存储路径 
storeCheckpoint=/app/rocketmq/store/checkpoint 
#abort 文件存储路径 
abortFile=/app/rocketmq/store/abort 
#限制的消息大小 
maxMessageSize=65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE brokerRole=ASYNC_MASTER 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘 
#- SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 
#checkTransactionMessageEnable=false
 #发消息线程池数量 
#sendMessageThreadPoolNums=128 
#拉消息线程池数量 
#pullMessageThreadPoolNums=128

然后他对应的slave在worker2上,修改work2上的 conf/2m-2s-async/broker-b-s.properties

#所属集群名字,名字一样的节点就在同一个集群内 
brokerClusterName=rocketmq-cluster 
#broker名字,名字一样的节点就是一组主从节点。 
brokerName=broker-b 
#brokerid,0就表示是Master,>0的都是表示 Slave 
brokerId=1
#nameServer地址,分号分割 
namesrvAddr=worker1:9876;worker2:9876;worker3:9876 
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=4 
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
autoCreateTopicEnable=true 
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup=true 
#Broker 对外服务的监听端口 
listenPort=10911 
#删除文件时间点,默认凌晨 4点 
deleteWhen=04 
#文件保留时间,默认 48 小时 
fileReservedTime=120 
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog=1073741824 
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 1234567891011121314151617181920212223
然后他对应的slave在worker2上,
修改work2上的 conf/2m-2s-async/broker-b-s.properties
mapedFileSizeConsumeQueue=300000 
#destroyMapedFileIntervalForcibly=120000 
#redeleteHangedFileInterval=120000 
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio=88 
#存储路径 
storePathRootDir=/app/rocketmq/store 
#commitLog 存储路径 
storePathCommitLog=/app/rocketmq/store/commitlog 
#消费队列存储路径存储路径 
storePathConsumeQueue=/app/rocketmq/store/consumequeue 
#消息索引存储路径 
storePathIndex=/app/rocketmq/store/index 
#checkpoint 
文件存储路径 
storeCheckpoint=/app/rocketmq/store/checkpoint 
#abort 文件存储路径 
abortFile=/app/rocketmq/store/abort 
#限制的消息大小 
maxMessageSize=65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE 
brokerRole=SLAVE 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘 
#- SYNC_FLUSH 同步刷盘 
flushDiskType=ASYNC_FLUSH 
#checkTransactionMessageEnable=false
 #发消息线程池数量 
#sendMessageThreadPoolNums=128 
#拉消息线程池数量 
#pullMessageThreadPoolNums=128

这样broker就配置完成了。

需要注意的配置项:

  • 同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ
    already started
  • 同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
启动RocketMQ

启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。

先启动nameServer

修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m - XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"

直接在三个节点上启动nameServer。

nohup bin/mqadminsrv &

启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。

The Name Server boot success. serializeType=JSON

使用jps指令可以看到一个NamesrvStartup进程。
这里也看到,RocketMQ在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh中使用的是G1垃圾回收期。

再启动broker

启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。在worker2上启动broker-a的master节点和broker-b的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties & 
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &

在work3上启动broker-b的master节点和broker-a的slave节点

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties & 
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &

启动slave时,如果遇到报错 Lock failed,MQ already started ,那是因为有多个实例共用了同一个storePath造成的,这时就需要调整store的路径。

启动状态检查

使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。nohup.out中也有启动成功的日志。对应的日志文件:

# 查看nameServer日志 
tail -500f ~/logs/rocketmqlogs/namesrv.log 
# 查看broker日志 
tail -500f ~/logs/rocketmqlogs/broker.log 1234
测试mqadmin管理工具

RocketMQ的源代码中并没有为我们提供类似于Nacos或者RabbitMQ那样的控制台,只提供了一个mqadmin指令来管理RocketMQ,命令在bin目录下。使用方式是 ./mqadmin {command} {args}

命令行快速验证

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。在worker2上进入RocketMQ的安装目录:

  • 发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 1
  • 接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。

export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'

然后就可以正常执行了。这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils

public static String getNameServerAddresses() { return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR")); }

这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。

搭建管理控制台

RocketMQ源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,地址: https://github.com/apache/rocketmq-externals
下载下来后,进入其中的rocket-console目录,使用maven进行编译

mvn clean package -Dmaven.test.skip=true

编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.properties中需要指定nameserver的地址。默认这个属性是空的。可以在jar包的当前目录下增加一application.properties文件,覆盖jar包中默认的一个属性:

rocketmq.config.namesrvAddr=worker1:9876;worker2:9876;worker3:9876 1

然后执行:

java -jar rocketmq-console-ng-1.0.1.jar

启动完成后,可以访问 http://192.168.232.128:8080看到管理页面
如果需要搭建高可用,需要使用Dleger搭建高可用集群

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5683475.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存