Linux 64位 *** 作系统
64bit JDK 1.8+
unzip rocketmq-all-4.4.0-bin-release.zip
tail -f /root/logs/rocketmqlogs/namesrv.log启动Broker
修改runbroker.sh,runserver.sh
运行nohup ./mqbroker -n localhost:9876 &查看日志
tail -f /root/logs/rocketmqlogs/broker.log测试 发送消息
export NAMESRV_ADDR=localhost:9876 /usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer接收消息
export NAMESRV_ADDR=localhost:9876 /usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer关闭RocketMQ
/usr/local/rocketmq/bin/mqshutdown broker /usr/local/rocketmq/bin/mqshutdown namesrv原理介绍
解压后只需要rocketmq-console
修改rocketmq-consolesrcmainresources
到根目录下执行打包
mvn clean package -Dmaven.test.skip=true
target目录下运行jar包
访问localhost:7777
RocketMQ消息基础 导入依赖消息发送org.apache.rocketmq rocketmq-spring-boot-starter2.0.2
//创建消息生产者,设置生产组名 DefaultMQProducer cg_producer_group = new DefaultMQProducer("cg_producer_group"); //设置NameServer地址 cg_producer_group.setNamesrvAddr("192.168.238.130:9876"); //启动生产者 cg_producer_group.start(); //构建消息对象,主题,标签,内容 Message message = new Message("cgTopic", "cgTag", ("RocketMQ Message Test").getBytes()); //发送消息,设置超时时间 SendResult result = cg_producer_group.send(message, 1000); System.out.println("result----------------------: "+result); //关闭生产者 cg_producer_group.shutdown();消息消费
//创建消费者,指定组名 DefaultMQPushConsumer cg_consumer_group = new DefaultMQPushConsumer("cg_consumer_group"); //为消费者设置NameServer地址 cg_consumer_group.setNamesrvAddr("192.168.238.130:9876"); //指定订阅的主题和标签 cg_consumer_group.subscribe("cgTopic",""); //设置回调函数,编写消息处理方法 cg_consumer_group.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List微服务发送消息案例 订单微服务 导入依赖list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("Message--------: " + list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消息消费者 cg_consumer_group.start();
修改配置文件com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discoveryorg.apache.rocketmq rocketmq-spring-boot-starter2.0.2 org.apache.rocketmq rocketmq-client4.4.0
spring: cloud: nacos: discovery: server-addr: localhost:8848 #rocketmq rocketmq: name-server: 192.168.238.130:9876 producer: group: shop-order #生产者组发送消息
@Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/order/prod/{pid}") public Order order(@PathVariable("pid") Long pid) { //直接根据服务id获取服务 Product product = productFeignClient.findByPid(pid); if (product.getId() == 250L){ return new Order().setId(250L); } log.info("查询{}号商品:{}", pid, JSON.toJSONString(product)); Order order = new Order().setNumber(1).setPid(pid).setUid(1L); log.info("订单信息{}", order.toString()); orderService.createOrder(order); //mq发送 rocketMQTemplate.convertAndSend("order-topic",order); return order; }用户微服务 导入依赖
修改配置文件com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discoveryorg.apache.rocketmq rocketmq-spring-boot-starter2.0.2 org.apache.rocketmq rocketmq-client4.4.0
spring: cloud: nacos: discovery: server-addr: localhost:8848 #rocketmq rocketmq: name-server: 192.168.238.130:9876 producer: group: shop-order #生产者组实现接口
@Service //消费者组名 消费者主题 @RocketMQMessageListener( //消费者组名 consumerGroup = "shop-user", //消费者主题 topic = "order-topic", //消费模式:无序和有序 consumeMode = ConsumeMode.CONCURRENTLY, //消息模式:广播和集群,默认是集群 messageModel = MessageModel.CLUSTERING) public class SmsService implements RocketMQListener{ @Override public void onMessage(Order order) { System.out.println("接收到一个订单信息"); System.out.println(order.toString()); } }
广播消费:每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理集群模式: 一条消息只能被一个消费者实例消费
普通消息@Autowired private RocketMQTemplate rocketMQTemplate;可靠同步发送
@Test public void testSyncSend() { SendResult result = rocketMQTemplate.syncSend("test-topic-1:test-tag", "同步消息", 10000); System.out.println("运行结果" + result); }可靠异步发送
@Test public void testAsyncSend() { rocketMQTemplate.asyncSend("test-topic-1:test-tag", "异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("======================"); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }单向发送
@Test public void testOneWay(){ rocketMQTemplate.sendOneWay("test-topic-1:test-tag","单向消息"); }顺序消息
@Test public void testOneWayOrderly(){ // hashkey 要求不重复 rocketMQTemplate.sendOneWayOrderly("test-topic-1:test-tag","单向顺序消息","xx"); }
同时还有同步顺序消息,异步顺序消息都是在原方法后加上Orderly,同时加上hashkey参数,指定唯一值
事务消息
@Entity(name = "shop_txlog") @Data public class TxLog { @Id private String txId; private Date date; }dao
public interface TxLogDao extends JpaRepositoryservice{ }
@Service public class OrderServiceImplTest { @Autowired private OrderDao orderDao; @Autowired private TxLogDao txLogDao; @Autowired private RocketMQTemplate rocketMQTemplate; public void createOrderBefore(Order order) { String txId = UUID.randomUUID().toString(); rocketMQTemplate.sendMessageInTransaction( "tx_producer_group", "tx_topic", MessageBuilder.withPayload(order).setHeader("txId",txId).build(), order ); } @Transactional public void createOrder(String txId,Order order) { orderDao.save(order); TxLog txLog = new TxLog(); txLog.setTxId(txId); txLog.setDate(new Date()); txLogDao.save(txLog); } }listen
@Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class OrderServiceImplTestListener implements RocketMQLocalTransactionListener { @Autowired private OrderServiceImplTest orderServiceImplTest; @Autowired private TxLogDao txLogDao; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { String txId = (String) message.getHeaders().get("txId"); try { Order order = (Order) o; orderServiceImplTest.createOrder(txId, order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { String txId = (String) message.getHeaders().get("txId"); TxLog txLog = txLogDao.findById(txId).get(); if (txLog != null) { return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } }controller
orderServiceImplTest.createOrderBefore(order);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)