1.创建namesrv服务
拉取镜像
docker pull rocketmqinc/rocketmq
创建nameServer存储路径
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
构建namesrv容器
docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /docker/rocketmq/data/namesrv/logs:/root/logs -v /docker/rocketmq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
2.创建broker节点
创建broker数据存储路径
mkdir -p /docker/rocketmq/data/broker/logs /docker/rocketmq/data/broker/store /docker/rocketmq/conf
创建配置文件
vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH #设置broker节点所在服务器的ip地址(公网IP) brokerIP1 = 192.168.52.136
构建broker容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
3.创建rockermq-console服务
拉取镜像
docker pull pangliang/rocketmq-console-ng
构建rockermq-console容器
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 9999:8080 pangliang/rocketmq-console-ng IP为内网IP
4.开放端口
5.测试
6.编写代码
引入maven依赖
org.apache.rocketmq rocketmq-client4.5.1
生产者
public class SyncProducer { public static void main(String[] args) throws Exception { final DefaultMQProducer producer=new DefaultMQProducer("test_producer"); //这里需要设置NameServer地址 producer.setNamesrvAddr("101.43.12.115:9876"); producer.start(); for (int i = 0; i < 10; i++) { new Thread(){ @Override public void run() { while (true){ try { Message message=new Message("TopicTest","TagA",("Test").getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(message); } catch (Exception e) { e.printStackTrace(); } } } }.start(); } while (true){ Thread.sleep(1000); } } }
消费者
public class Consumer { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("test_consumer"); //这里需要设置NameServer地址 consumer.setNamesrvAddr("101.43.12.115:9876"); //订阅Topic,你要消费哪些Topic的消息 consumer.subscribe("TopicTest","*"); //这里注册一个回掉接口,去接收获取到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println(list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
结果显示
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)