一.了解RocketMQ?
rocketMQ是阿里开源的一款十分优秀的消息队列,rocketMQ具有很多其他消息队列不具有的特性,更重要的是rocketMQ是用java开发的学习成本较低,并且经历了双11的数据洪峰的考验。
二. 消息队列概念
MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题
三.RocketMQ 是什么?
1.是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
2.可以分布式。
3.Producer向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个consumer实例消费这个Topic 对应的所有队列,如果做集群消费,则多个Consumer 实例平均消费这个topic对应的队列集合。(默认是集群消费)
4.能够保证严格的消息顺序(因为性能原因,不能保证消息不重复,因为总有网络不可达的情况发生,需业务端保证)。
5.提供丰富的消息拉取模式
6.高效的订阅者水平扩展能力
7.实时的消息订阅机制
8.亿级消息堆积能力
9.较少的依赖
三.安装
- 下载应用
地址https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip - 解压缩
- 配置环境变量
ROCKETMQ_HOME=“D:rocketmq”
NAMESRV_ADDR=“localhost:9876” - 启动
.binmqnamesrv.cmd
启动broker.binmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
- 导入依赖
org.apache.rocketmq
rocketmq-client
4.9.1
发送的三种模式
发送同步确认结果
package com.woniuxy.cloud.simple;
import com.woniuxy.cloud.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class Sender {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
//(1)创建生产者 DefaultMQProducer producer = new DefaultMQProducer("TestSender"); producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR); //(2)启动producer producer.start(); //循环发送,查看回调结果顺序 for (int i = 0; i < 10; i++) { //(3)构建消息并发送 String smsContent = "嘿嘿"; Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", (smsContent + i).getBytes("UTF-8")); //同步发送到RocketMQ SendResult sendResult = producer.send(msg); System.out.println("sendResult:" + sendResult); } //(4)关闭producer producer.shutdown(); }
}
发送-异步确认发送结果
package com.woniu.wn63.SendAsync;
import com.woniu.wn63.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class SendAsync {
public static void main( String[] args ) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
Scanner scanner = new Scanner(System.in);
//分组
DefaultMQProducer producer = new DefaultMQProducer(“msm1”);
producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//启动producer
producer.start();
//异步发送失败后重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//构建消息并发送 while (true) { System.out.println("请输入要发送的消息"); String smsContent = scanner.next(); if (smsContent.equals("exit")) { //(4)关闭producer producer.shutdown(); } for ( int i = 0; i < 10; i++) { Message msg=new Message(AppConstants.SMS_TOPIC,"user_register",smsContent.getBytes(StandardCharsets.UTF_8)); //同步发送到rocket //SendResult send = producer.send(msg); //异步发送 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } // System.out.println("sendResult"+send); } }
}
发送-结束
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
package com.woniu.wn63.onewy;
import com.woniu.wn63.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class SendoneWay {
public static void main( String[] args ) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
Scanner scanner = new Scanner(System.in);
//分组
DefaultMQProducer producer = new DefaultMQProducer(“msm1”);
producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//启动producer
producer.start();
//异步发送失败后重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
//构建消息并发送 while (true) { System.out.println("请输入要发送的消息"); String smsContent = scanner.next(); if (smsContent.equals("exit")) { //(4)关闭producer producer.shutdown(); } for ( int i = 0; i < 10; i++) { Message msg=new Message(AppConstants.SMS_TOPIC,"user_register",smsContent.getBytes(StandardCharsets.UTF_8)); //发送到RocketMQ,不等待结果 producer.sendoneway(msg); } producer.shutdown(); } }
}
消费
tag过滤
consumer.subscribe(AppConstants.SMS_TOPIC,"*");
consumer.subscribe(AppConstants.SMS_TOPIC,“user_reg”);
consumer.subscribe(AppConstants.SMS_TOPIC,“user_reg || user_cancel”);
属性过滤
通过putUserProperty来设置消息的属性
1、生产者例
DefaultMQProducer producer = new DefaultMQProducer(“please_rename_unique_group_name”);
producer.start();
Message msg = new Message(“TopicTest”,
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//>>>>>>> 设置一些属性
msg.putUserProperty(“a”, String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
2、生产者例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_4”);
//>>>>>>> 通过属性过滤:a >=0 and a <= 3
consumer.subscribe(“TopicTest”, MessageSelector.bySql(“a between 0 and 3”);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
消息模式
两种模式,一种集群模式,一种群发模式
消费者
package com.woniu.wn63.messagemodel;
import com.woniu.wn63.AppConstants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.UnsupportedEncodingException;
public class ReceiverPlus {
private String name; public ReceiverPlus(String name) { this.name = name; } public void receive() throws MQClientException, InterruptedException { //(1)创建消费者实例 //消费者分组,同一个名字的消费者组成一个集群 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive"); //在BROADCASTING模式下,需要设置实例名 consumer.setInstanceName(name); consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR); //(2)订阅某个主题,收到特定的消息 consumer.subscribe(AppConstants.SMS_TOPIC,"user_reg || user_cancel"); //设置消息模型 org.apache.rocketmq.common.protocol.heartbeat.MessageModel //MessageModel.BROADCASTING 群发模式 //MessageModel.CLUSTERING 集群模式 consumer.setMessageModel(MessageModel.CLUSTERING); //(3)向MQ注册一个监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msgExt:msgs){ try { System.out.println(name+" 消息内容:"+new String(msgExt.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // (4)启动消费者实例 consumer.start(); System.out.printf(name+" Consumer Started.%n"); } public static void main(String[] args) throws MQClientException, InterruptedException { new ReceiverPlus("端午").receive(); new ReceiverPlus("妞妞").receive(); new ReceiverPlus("王五").receive(); }
}
生产者
package com.woniu.wn63.messagemodel;
import com.woniu.wn63.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class Sender {
public static void main( String[] args ) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
Scanner scanner = new Scanner(System.in);
//分组
DefaultMQProducer producer = new DefaultMQProducer(“msm1”);
producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//启动producer
producer.start();
//构建消息并发送 while (true) { System.out.println("请输入要发送的消息"); String smsContent = scanner.next(); if (smsContent.equals("exit")) { //(4)关闭producer producer.shutdown(); } Message msg=new Message(AppConstants.SMS_TOPIC,"user_reg",smsContent.getBytes(StandardCharsets.UTF_8)); //同步发送到rocket SendResult send = producer.send(msg); System.out.println("sendResult"+send); } }
}
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
有很多场景需要顺序消息,比如先买票->再上车;淘宝买东西时,先下订单->付款->发货;等等
延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
事务消息
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列RocketMQ版服务端。
- 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下: - 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对消息发送方即生产者集群中任意一生产者实例发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行 *** 作。
概念介绍
● 事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
● 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
● 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)