使用docker安装RocketMq

使用docker安装RocketMq,第1张

使用docker安装RocketMq

1.创建namesrv服务
拉取镜像

docker pull rocketmqinc/rocketmq

创建nameServer存储路径

mkdir -p  /docker/rocketmq/data/namesrv/logs  /docker/rocketmq/data/namesrv/store

构建namesrv容器

docker run -d 
--restart=always 
--name rmqnamesrv 
-p 9876:9876 
-v /docker/rocketmq/data/namesrv/logs:/root/logs 
-v /docker/rocketmq/data/namesrv/store:/root/store 
-e "MAX_POSSIBLE_HEAP=100000000" 
rocketmqinc/rocketmq 
sh mqnamesrv 

2.创建broker节点
创建broker数据存储路径

mkdir -p  /docker/rocketmq/data/broker/logs   /docker/rocketmq/data/broker/store /docker/rocketmq/conf

创建配置文件

vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
#设置broker节点所在服务器的ip地址(公网IP)
brokerIP1 = 192.168.52.136

构建broker容器

docker run -d  
--restart=always 
--name rmqbroker 
--link rmqnamesrv:namesrv 
-p 10911:10911 
-p 10909:10909 
-v  /docker/rocketmq/data/broker/logs:/root/logs 
-v  /docker/rocketmq/data/broker/store:/root/store 
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf 
-e "NAMESRV_ADDR=namesrv:9876" 
-e "MAX_POSSIBLE_HEAP=200000000" 
rocketmqinc/rocketmq 
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 

3.创建rockermq-console服务
拉取镜像

docker pull pangliang/rocketmq-console-ng

构建rockermq-console容器

docker run -d 
--restart=always 
--name rmqadmin 
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 
-Dcom.rocketmq.sendMessageWithVIPChannel=false" 
-p 9999:8080 
pangliang/rocketmq-console-ng
IP为内网IP

4.开放端口

5.测试

6.编写代码
引入maven依赖

 
            org.apache.rocketmq
            rocketmq-client
            4.5.1
        

生产者

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        final DefaultMQProducer producer=new DefaultMQProducer("test_producer");
        //这里需要设置NameServer地址
        producer.setNamesrvAddr("101.43.12.115:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
         new Thread(){
             @Override
             public void run() {
                 while (true){
                     try {
                     Message message=new Message("TopicTest","TagA",("Test").getBytes(RemotingHelper.DEFAULT_CHARSET));
                         producer.send(message);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
         }.start();
        }
        while (true){
            Thread.sleep(1000);
        }
    }
}

消费者

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("test_consumer");
        //这里需要设置NameServer地址
        consumer.setNamesrvAddr("101.43.12.115:9876");
        //订阅Topic,你要消费哪些Topic的消息
        consumer.subscribe("TopicTest","*");
        //这里注册一个回掉接口,去接收获取到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

结果显示

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5718159.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存