6.MQ-消息队列

6.MQ-消息队列,第1张

6.MQ-消息队列 入门介绍






RocketMQ入门 下载RockeyMQ

环境要求

Linux 64位 *** 作系统
64bit JDK 1.8+

上传到服务器并解压
unzip rocketmq-all-4.4.0-bin-release.zip


启动NameServer

查看日志
tail -f /root/logs/rocketmqlogs/namesrv.log

启动Broker

修改runbroker.sh,runserver.sh

运行
nohup ./mqbroker -n localhost:9876 &
查看日志
tail -f /root/logs/rocketmqlogs/broker.log

测试 发送消息
export NAMESRV_ADDR=localhost:9876
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
export NAMESRV_ADDR=localhost:9876
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭RocketMQ
/usr/local/rocketmq/bin/mqshutdown broker
/usr/local/rocketmq/bin/mqshutdown namesrv
原理介绍


控制器安装 下载rocketmq-console

解压后只需要rocketmq-console

修改rocketmq-consolesrcmainresources

到根目录下执行打包

mvn clean package -Dmaven.test.skip=true

target目录下运行jar包

访问localhost:7777

RocketMQ消息基础 导入依赖


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.0.2

消息发送
//创建消息生产者,设置生产组名
DefaultMQProducer cg_producer_group = new DefaultMQProducer("cg_producer_group");
//设置NameServer地址
cg_producer_group.setNamesrvAddr("192.168.238.130:9876");
//启动生产者
cg_producer_group.start();
//构建消息对象,主题,标签,内容
Message message = new Message("cgTopic", "cgTag", ("RocketMQ Message Test").getBytes());
//发送消息,设置超时时间
SendResult result = cg_producer_group.send(message, 1000);
System.out.println("result----------------------: "+result);
//关闭生产者
cg_producer_group.shutdown();
消息消费
//创建消费者,指定组名
DefaultMQPushConsumer cg_consumer_group = new DefaultMQPushConsumer("cg_consumer_group");
//为消费者设置NameServer地址
cg_consumer_group.setNamesrvAddr("192.168.238.130:9876");
//指定订阅的主题和标签
cg_consumer_group.subscribe("cgTopic","");
//设置回调函数,编写消息处理方法
cg_consumer_group.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.println("Message--------: " + list);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//启动消息消费者
cg_consumer_group.start();
微服务发送消息案例

订单微服务 导入依赖
    
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-discovery
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.0.2
        
        
            org.apache.rocketmq
            rocketmq-client
            4.4.0
        
修改配置文件
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
#rocketmq
rocketmq:
  name-server: 192.168.238.130:9876
  producer:
    group: shop-order #生产者组
发送消息
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @GetMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Long pid) {
        //直接根据服务id获取服务
        Product product = productFeignClient.findByPid(pid);
        if (product.getId() == 250L){
            return new Order().setId(250L);
        }
        log.info("查询{}号商品:{}", pid, JSON.toJSONString(product));
        Order order = new Order().setNumber(1).setPid(pid).setUid(1L);
        log.info("订单信息{}", order.toString());
        orderService.createOrder(order);

        //mq发送
        
        rocketMQTemplate.convertAndSend("order-topic",order);

        return order;
    }
用户微服务 导入依赖
  
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-discovery
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.0.2
        
        
            org.apache.rocketmq
            rocketmq-client
            4.4.0
        
修改配置文件
spring:
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
#rocketmq
rocketmq:
  name-server: 192.168.238.130:9876
  producer:
    group: shop-order #生产者组
实现接口
@Service
//消费者组名 消费者主题
@RocketMQMessageListener(
        //消费者组名
        consumerGroup = "shop-user",
        //消费者主题
        topic = "order-topic",
        //消费模式:无序和有序
        consumeMode = ConsumeMode.CONCURRENTLY,
        //消息模式:广播和集群,默认是集群
        messageModel = MessageModel.CLUSTERING)
public class SmsService implements RocketMQListener {
    
    @Override
    public void onMessage(Order order) {
        System.out.println("接收到一个订单信息");
        System.out.println(order.toString());
    }
}

广播消费:每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理集群模式: 一条消息只能被一个消费者实例消费

普通消息

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
可靠同步发送
    
    @Test
    public void testSyncSend() {
        
        SendResult result = rocketMQTemplate.syncSend("test-topic-1:test-tag", "同步消息", 10000);
        System.out.println("运行结果" + result);
    }

可靠异步发送
   
    @Test
    public void testAsyncSend() {
        
        rocketMQTemplate.asyncSend("test-topic-1:test-tag", "异步消息", new SendCallback() {
            
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        System.out.println("======================");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

单向发送
  
    @Test
    public void testOneWay(){
        rocketMQTemplate.sendOneWay("test-topic-1:test-tag","单向消息");
    }
顺序消息

 
    @Test
    public void testOneWayOrderly(){
    	// hashkey 要求不重复
        rocketMQTemplate.sendOneWayOrderly("test-topic-1:test-tag","单向顺序消息","xx");
    }

同时还有同步顺序消息,异步顺序消息都是在原方法后加上Orderly,同时加上hashkey参数,指定唯一值

事务消息


TxLog日志表
@Entity(name = "shop_txlog")
@Data
public class TxLog {
    @Id
    private String txId;
    private Date date;
}
dao
public interface TxLogDao extends JpaRepository {
}
service
@Service
public class OrderServiceImplTest {
    @Autowired
    private OrderDao orderDao;

    @Autowired
    private TxLogDao txLogDao;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    
    public void createOrderBefore(Order order) {
        String txId = UUID.randomUUID().toString();
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(order).setHeader("txId",txId).build(),
                order
        );
    }

    
    @Transactional
    public void createOrder(String txId,Order order) {
        orderDao.save(order);
        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());
        txLogDao.save(txLog);
    }
 }
listen
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImplTestListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderServiceImplTest orderServiceImplTest;

    @Autowired
    private TxLogDao txLogDao;

    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String txId = (String) message.getHeaders().get("txId");
        try {
            Order order = (Order) o;
            orderServiceImplTest.createOrder(txId, order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String txId = (String) message.getHeaders().get("txId");
        TxLog txLog = txLogDao.findById(txId).get();
        if (txLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

controller
orderServiceImplTest.createOrderBefore(order);

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5700230.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存