rabbitMQ有五种队列:
目录简单队列、work模式、发布订阅模式、routing模式、topics模式
- 一、引入依赖
- 二、 简单队列
- 2.1 获取MQ连接
- 2.2 生产者发送消息
- 2.3 管理界面查看消息
- 2.4 消费者接收消息
- 三、 work模式
- 3.1 生产者
- 3.2 平均消费模式
- 3.3 能者多劳模式
- 3.4 消息的确认模式
- 3.4.1 自动确认(true):
- 3.4.2 手动确认(false) :
- 四、发布订阅模式
- 4.1 生产者
- 4.2 消费者1
- 4.3 消费者2
- 五、routing模式
- 5.1 生产者
- 5.2 消费者1
- 5.3 消费者2
- 5.4 测试
- 六、topics模式
- 6.1 生产者
- 5.2 消费者1
- 6.3 消费者2
- 6.4 测试
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>3.4.1version>
dependency>
二、 简单队列
最简单的工作队列,一个生产者 P 发送消息到队列 Q,一个消费者 C 接收。也称为点对点模式。
public class ConnectionUtil {
/**
* 获取rabbitMQ连接
* @return
*/
public static Connection getConnection() throws IOException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接ip
connectionFactory.setHost("localhost");
//设置端口
connectionFactory.setPort(5672);
//设置虚拟机
connectionFactory.setVirtualHost("testhost");
//设置用户名称
connectionFactory.setUsername("admin");
//设置密码
connectionFactory.setPassword("admin");
//获取连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
2.2 生产者发送消息
public class send {
//队列名称
public static final String queueName= "q_test_01";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(queueName,false,false,false,null);
//消息内容
String message = "Hello,it's me";
//推送消息
channel.basicPublish("",queueName,null,message.getBytes());
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
2.3 管理界面查看消息
public class receive {
//队列名称
public static final String queueName= "q_test_01";
public static void main(String[] args) throws IOException, InterruptedException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(queueName,false,false,false,null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(queueName,true,consumer);
//实时接收消息
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到信息:"+message);
}
}
}
控制台输出
同时从管理界面可以看到队列中的消息被消费
一个生产者、两个消费者;一个消息只能被一个消费者获取。
work模式分为两种模式:
- 两个消费者平均消费队列中的消息,即使他们的消费能力是不一样的(这种似乎不太符合实际的情况)
- 能者多劳模式,处理消息能力强的消费者会获取更多的 消息(更符合实际需求)
向队列发送50条消息,每生产一条消息后都会休眠一段时间,并且越往后休眠的时间越长。
public class send {
//队列名称
public static final String queueName= "q_test_01";
public static void main(String[] args) throws IOException, InterruptedException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(queueName,false,false,false,null);
for(int i=0;i<50;i++){
String message = "message["+i+"]";
//推送消息
channel.basicPublish("",queueName,null,message.getBytes());
//每次睡眠时间递增
Thread.sleep(10*i);
}
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
3.2 平均消费模式
创建两个消费者:recv和recv2,各自每次消费消息时休眠时间不同,平均消费模式会无视休眠时间,两个消费者获得消息的数量是一样的。
- 消费者recv(休眠时间:10ms)
public class recv{
//消息队列名称
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 接收信息: '" + message + "'");
// 休眠10豪秒
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
- 消费者recv2(休眠时间:1000ms)
public class recv2{
//消息队列名称
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 接收信息: '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
测试后发现两个消费者获得消息量是一致的。
3.3 能者多劳模式在消费者中加入一下的代码,使同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
可以看到消费者recv收到了41条消息
消费者recv2收到了9条消息
这样 的结果才符合work模式的能者多劳模式,理能力高的人会接收到更多的消息。
3.4 消息的确认模式从上面的消费者代码中,有一行跟简单队列的参数不一样。
/**
* @var1 : 通道名称
* @var2 : 是否自动确认消息(true,false)
* @var3 :消费者对象
**/
String basicConsume(String var1, boolean var2, Consumer var3) throws IOException;
3.4.1 自动确认(true):
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
四、发布订阅模式消费者接收消息时,需要调用basicAck()进行反馈
一个生产者,一个交换器,多个消息队列,多个消费者。
交换机的类型为fanout
以用户注册为例,可以通过邮箱或者手机号注册用户,注册完后会向邮箱和手机号都发送注册完成信息。
实际上邮箱和手机号信息发送是不同的业务逻辑,不应该放在一块处理。这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者。
向交换机发送20条消息,交换机将消息发送到多个队列
public class send {
//交换机名称
static final String TEST_EXCHANGE = "test_exchange";
//消息队列1
static final String TEST_QUEUE_1 = "test_subscribe_queue1";
//消息队列2
static final String TEST_QUEUE_2 = "test_subscribe_queue2";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
/**
* 声明交换机
* 参数1 : 交换机名称
* 参数2 : 交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(TEST_EXCHANGE,"fanout");
/**
* 声明队列
* 参数1 : 队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(TEST_QUEUE_1, true, false, false, null);
channel.queueDeclare(TEST_QUEUE_2, true, false, false, null);
//队列绑定交换机
channel.queueBind(TEST_QUEUE_1, TEST_EXCHANGE, "");
channel.queueBind(TEST_QUEUE_2, TEST_EXCHANGE, "");
for(int i=0;i<20;i++){
String message = "message[x] :"+i;
/**
* 推送消息
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(TEST_EXCHANGE,"",null,message.getBytes());
System.out.println("已发送信息:"+message);
}
channel.close();
connection.close();
}
}
4.2 消费者1
public class Custom1 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(send.TEST_EXCHANGE,"fanout");
//声明队列
channel.queueDeclare(send.TEST_QUEUE_1, true, false, false, null);
//绑定交换机
channel.queueBind(send.TEST_QUEUE_1, send.TEST_EXCHANGE, "");
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 重写数据处理
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听队列
channel.basicConsume(send.TEST_QUEUE_1,true,consumer);
}
}
4.3 消费者2
public class Custom2 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(send.TEST_EXCHANGE,"fanout");
//声明队列
channel.queueDeclare(send.TEST_QUEUE_2, true, false, false, null);
//绑定交换机
channel.queueBind(send.TEST_QUEUE_2, send.TEST_EXCHANGE, "");
//创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 重写数据处理
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听队列
channel.basicConsume(send.TEST_QUEUE_2,true,consumer);
}
}
五、routing模式
不同于发布订阅模式把消息交给每一个绑定的队列,routing(路由)模式是根据消息的 Routing Key 进行判断,只有队列的Routing key 与消息的 Routing key 完全一致,才会接收到消息
5.1 生产者交换机的类型为direct
发布信息时,需要指定Routing key
生成多个路由的消息
public class send {
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
//路由名称warning
private final static String ROUTING_KEY_WARNING = "warning";
//路由名称info
private final static String ROUTING_KEY_INFO = "info";
//路由名称error
private final static String ROUTING_KEY_ERROR = "error";
public static void main(String[] args)
{
try
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息(warning级别日志)
String message = "this is warning log";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));
System.out.println("[send]:" + message);
//发送消息(info级别日志)
message = "this is info log";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));
System.out.println("[send]:" + message);
//发送消息(error级别日志)
message = "this is error log";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));
System.out.println("[send]:" + message);
channel.close();
connection.close();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
5.2 消费者1
只接收routing key为error的消息
public class custom1 {
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
//路由名称error
private final static String ROUTING_KEY_ERROR = "error";
//队列名称
private static final String QUEUE_NAME = "test_queue_handel_error";
public static void main(String[] args)
{
try
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机(指定路由error)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
//每个时刻只发送消息到一个消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//消息接收时
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("[test_queue_handel_error] Receive message:" + message);
try
{
//休息10ms处理业务
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列,设置手动应答
channel.basicConsume(QUEUE_NAME, false, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
5.3 消费者2
接收所有消息
public class custom2 {
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
//路由名称warning
private final static String ROUTING_KEY_WARNING = "warning";
//路由名称info
private final static String ROUTING_KEY_INFO = "info";
//路由名称error
private final static String ROUTING_KEY_ERROR = "error";
//队列名称
private static final String QUEUE_NAME = "test_queue_all_log";
public static void main(String[] args){
try{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机(指定所有路由)
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_WARNING);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_INFO);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
//每个时刻只发送消息到一个消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//消息接收时
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("[test_queue_all_log] Receive message:" + message);
try
{
//休息1s处理业务
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//监听队列,设置手动应答
channel.basicConsume(QUEUE_NAME, false, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
5.4 测试
生产者生成消息
消费者1接收到error的消息
消费者2接收到所有消息
同样是消费者配置routing key匹配消息监听队列,不同于routing模式根据routing key相同匹配,topic模式匹配routingKey时采用通配符匹配
6.1 生产者交换机的类型为topic
通配符#:匹配一个或多个词(每个词用.隔开),例如aa.#,可以匹配aa.bb、aa.cc、aa.bb.cc
通配符*:匹配一个词,比如aa.*,可以匹配aa.bb、aa.cc
生成多条信息
public class provider {
static final String EXCHANGE_NAME = "test_exchange_topic";
static final String TOPIC_ORANGE_KEY = "aa.orange.bb";
static final String TOPIC_RABBIT_KEY = "aa.cc.rabbit";
static final String TOPIC_LAZ_Y_KEY = "laz.y.bb.cc";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//发送消息:ORANGE
String message = "ORANGE";
channel.basicPublish(EXCHANGE_NAME,TOPIC_ORANGE_KEY,null,message.getBytes());
System.out.println("[seng]:"+message);
//发送消息:RABBIT
message = "RABBIT";
channel.basicPublish(EXCHANGE_NAME,TOPIC_RABBIT_KEY,null,message.getBytes());
System.out.println("[seng]:"+message);
//发送消息:LAZ_Y
message = "LAZ_Y";
channel.basicPublish(EXCHANGE_NAME,TOPIC_LAZ_Y_KEY,null,message.getBytes());
System.out.println("[seng]:"+message);
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
5.2 消费者1
根据**.orange.**匹配routing key,接收消息
public class recv1 {
static final String EXCHANGE_NAME = "test_exchange_topic";
static final String TOPIC_ORANGE_KEY = "*.orange.*";
static final String QUEUE_NAME = "test_queue_one_topic";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_ORANGE_KEY);
//每个时刻只发送消息到一个消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("[receive]:"+message);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听(手动应答)
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
6.3 消费者2
根据*.*.rabbit或laz.y.#匹配routing key,接收消息
public class recv2 {
static final String EXCHANGE_NAME = "test_exchange_topic";
static final String TOPIC_RABBIT_KEY = "*.*.rabbit";
static final String TOPIC_LAZ_Y_KEY = "laz.y.#";
static final String QUEUE_NAME = "test_queue_more_topic";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_RABBIT_KEY);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_LAZ_Y_KEY);
//每个时刻只发送消息到一个消费者
channel.basicQos(1);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("[receive]:"+message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听(手动应答)
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
6.4 测试
消费者1接收到对应的消息
消费者2接收到对应的信息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)