二、模型介绍及代码实现RabiitMQ 目前有七大通信模型,分别是:
Hello World 模型(最简单的模型,点对点通过队列直接通信)
Work Queues 工作队列模型(在多个工人之间分配消息,竞争消费消息,能者多劳,消费快的工人能够消费更多的消息)
Publish / Subscribe 发布与订阅模型(Fanout 广播模式,一条消息可以被多个消费者消费)
Route 路由模型(Direct模式,根据消息类型,有选择性的消费消息)
Topic 动态路由模型(与Route路由模型类似,有选择性的消费消息,不同之处在于它可以定义动态路由)
RPC 通信模型(这里不作介绍)
Publisher /confirm/is 发布者确认模型(这里不作介绍)
引入依赖:
1. HelloWorld 模型com.rabbitmq amqp-client5.7.3
Hello World 模型是最基本的通信模型,它通过队列直接点对点进行通信。
生产者
public class Provider { public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("121.41.179.236"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); // 创建通道,交换机和队列都是通过通道来传递信息的 channel = connection.createChannel(); // 通道绑定队列 // 参数1:队列名称,若队列不存在,则创建 // 参数2:队列是否持久化 // 参数3:是否独占队列 // 参数4:队列消费完消息后,是否自动删除队列 // 参数5:额外传递的参数 channel.queueDeclare("hello", true, false, false, null); // 发送消息 // 参数1:交换机名称,若不往交换机发送消息,则留空 // 参数2:队列名称或路由名称 // 参数3:消息的额外设置(如设置消息的持久化,MessageProperties.TEXT_PLAIN表示消息持久化) // 参数4:发送的消息 channel.basicPublish("", "hello", null, "我是hello模型发送的信息".getBytes()); } catch (Exception e) { e.printStackTrace(); }finally { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
消费者
public class Consumer { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("121.41.179.236"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); try { Connection connection = factory.newConnection(); // 创建通道,交换机和队列都是通过通道来传递信息的 Channel channel = connection.createChannel(); // 通道绑定队列(参数设置需要与消息生成者绑定的队列保持一致) // 参数1:队列名称,若队列不存在,则创建 // 参数2:队列是否持久化 // 参数3:是否独占队列 // 参数4:队列消费完消息后,是否自动删除队列 // 参数5:额外传递的参数 channel.queueDeclare("hello", true, false, false, null); // 从队列中消费消息 // 参数1:消费的队列名称 // 参数2:是否自动确认(拿到消息后,是否自动确认消息已消费完成) // 参数3:消费时的回调接口 channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到来自hello模型的消息为:" + new String(body)); } }); } catch (Exception e) { e.printStackTrace(); } } }
由于每次创建连接对象的代码都是一样的,所以这里封装成一个工具类,方便使用。
public class ConnectUtils { private static Connection connection = null; public static Connection getConnection(){ if (connection == null){ synchronized (ConnectUtils.class) { if (connection == null) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("121.41.179.236"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); try { connection = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } } } } return connection; } public static void close(Connection connection, Channel channel) { try { if (channel != null){ channel.close(); } if (connection != null){ connection.close(); } }catch (Exception e){ e.printStackTrace(); } } }2. WorkQueues 模型
Work Queues 工作队列模型(在多个工人之间分配消息,竞争消费消息,能者多劳,即消费快的工人能够消费更多的消息,如:一个队列中有10条消息,甲消费的快,消费了8条,而乙只消费了2条)
生产者
public class Provider { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定队列 channel.queueDeclare("work", true, false, false, null); // 模拟生成20条消息 for (int i = 0; i < 20; i++) { // 发送消息 // 参数1:交换机名称,若不往交换机发送消息,则留空 // 参数2:队列名称或路由名称 // 参数3:消息的额外设置(如设置消息的持久化,MessageProperties.TEXT_PLAIN表示消息持久化) // 参数4:发送的消息 channel.basicPublish("", "work", MessageProperties.TEXT_PLAIN, ("我是来自work queue工作模型的消息-" + i).getBytes()); } ConnectUtils.close(connection, channel); } }
消费者1
public class Consumer01 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定队列 channel.queueDeclare("work", true, false, false, null); // 设置一次只能消费一条消息 // 否则尽管该消费者消费消息的速度比较慢,但rabbitmq还是会将所有的消息平分给所有消费者进行处理 channel.basicQos(1); // 从队列中消费消息 // 将参数2设置为false,即关闭消息自动确认机制,转为手动确认,可实现能者多劳模式,谁处理的快,谁就多处理些。 channel.basicConsume("work", false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 模拟消费者1的处理速度较慢 Thread.sleep(500); }catch (Exception e) { e.printStackTrace(); } System.out.println("消费者1:接收到的消息为:" + new String(body)); // 手动确认消息已消费完成 // 参数1:确认队列中哪个具体消息 // 参数2:是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
消费者2
public class Consumer02 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定队列 channel.queueDeclare("work", true, false, false, null); // 设置一次只能消费一条消息 // 否则尽管该消费者消费消息的速度比较慢,但rabbitmq还是会将所有的消息平分给所有消费者进行处理 channel.basicQos(1); // 从队列中消费消息 // 将参数2设置为false,即关闭消息自动确认机制,转为手动确认,可实现能者多劳模式,谁处理的快,谁就多处理些。 channel.basicConsume("work", false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:接收到的消息为:" + new String(body)); // 手动确认消息已消费完成 // 参数1:确认队列中哪个具体消息 // 参数2:是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }3. Publish / Subscribe 模型
发布与订阅模型,即 fanout 广播模式,一条消息可以被多个消费者消费。
生产者
public class Provider { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 // 参数1:交换机名称 // 参数2:交换机的类型 fanout、direct、topic channel.exchangeDeclare("logs", "fanout"); // 发送消息 // 参数1:交换机名称 // 参数2:自定义路由名称,若未使用路由,则留空 // 参数3:传递消息的额外设置(如设置消息的持久化) // 参数4:发送的消息 channel.basicPublish("logs", "", null, "我是来自fanout模型的消息".getBytes()); // 关闭资源 ConnectUtils.close(connection, channel); } }
消费者1
Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上 // 参数1:队列名称 // 参数2:交换机名称 // 参数3:路由名称,未使用时则留空 channel.queueBind(queue, "logs", ""); // 从临时队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1-接收到的消息为:" + new String(body)); } });
消费者2
Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 channel.exchangeDeclare("logs", "fanout"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上 // 参数1:队列名称 // 参数2:交换机名称 // 参数3:路由名称,未使用时则留空 channel.queueBind(queue, "logs", ""); // 从临时队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-接收到的消息为:" + new String(body)); } });4. Route 模型
Route路由模型,即 direct 路由模式,消费者根据消息类型,有选择性的消费消息。
生产者
public class Provider { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); String exchangeName = "logs_direct"; // 通道绑定交换机(direct类型交换机) channel.exchangeDeclare(exchangeName, "direct"); // 发送消息(error 和 info 消息各一条) // 参数1:交换机名称 // 参数2:自定义路由名称,表示发送的消息类型 // 参数3:传递消息的额外设置(如设置消息的持久化) // 参数4:发送的消息 channel.basicPublish(exchangeName, "error", null, "我是来自direct模式下的error类型的消息".getBytes()); channel.basicPublish(exchangeName, "info", null, "我是来自direct模式下的info类型的消息".getBytes()); // 关闭资源 ConnectUtils.close(connection, channel); } }
消费者1
public class Consumer01 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 String exchangeName = "logs_direct"; channel.exchangeDeclare(exchangeName, "direct"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上,并声明接收的消息类型为 error和 info // 参数1:队列名称 // 参数2:交换机名称 // 参数3:自定义接收的路由名称 channel.queueBind(queue, exchangeName, "error"); channel.queueBind(queue, exchangeName, "info"); // 从队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1-接收到的消息为:" + new String(body)); } }); } }
消费者2
public class Consumer02 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 String exchangeName = "logs_direct"; channel.exchangeDeclare(exchangeName, "direct"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上,并声明接收的消息类型为 info // 参数1:队列名称 // 参数2:交换机名称 // 参数3:自定义接收的路由名称 channel.queueBind(queue, exchangeName, "info"); // 从队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-接收到的消息为:" + new String(body)); } }); } }5. Topic 模型
动态路由模型,即 topic 动态路由模式,它与Route路由模型类似,也是有选择性的消费消息,不同之处在于它可以定义动态路由,即使用通配符的形式定义路由名称。例如:user.* 、user.#
* 代表匹配一个任意字符
# 代表匹配 0 到多个任意字符
生产者
public class Provider { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); String exchangeName = "logs_topic"; // 通道绑定交换机(topic类型交换机) channel.exchangeDeclare(exchangeName, "topic"); // 发送消息 // 参数1:交换机名称 // 参数2:自定义路由名称,表示发送的消息类型 // 参数3:传递消息的额外设置(如设置消息的持久化) // 参数4:发送的消息 channel.basicPublish(exchangeName, "user", null, "我是来自topic模式下的user类型的消息".getBytes()); channel.basicPublish(exchangeName, "user.insert", null, "我是来自topic模式下的user.insert类型的消息".getBytes()); channel.basicPublish(exchangeName, "user.info.findName", null, "我是来自topic模式下的user.info.findName类型的消息".getBytes()); // 关闭资源 ConnectUtils.close(connection, channel); } }
消费者1
public class Consumer01 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 String exchangeName = "logs_topic"; channel.exchangeDeclare(exchangeName, "topic"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上,并使用通配符的形式声明接收的消息类型 // 参数1:队列名称 // 参数2:交换机名称 // 参数3:自定义接收的路由名称 // user.* 可接收 user.insert 等类型的消息 channel.queueBind(queue, exchangeName, "user.*"); // 从队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1-接收到的消息为:" + new String(body)); } }); } }
消费者2
public class Consumer02 { public static void main(String[] args) throws IOException { Connection connection = ConnectUtils.getConnection(); // 创建通道 Channel channel = connection.createChannel(); // 通道绑定交换机 String exchangeName = "logs_topic"; channel.exchangeDeclare(exchangeName, "topic"); // 创建临时队列 String queue = channel.queueDeclare().getQueue(); // 将临时队列绑定到交换机上,并使用通配符的形式声明接收的消息类型 // 参数1:队列名称 // 参数2:交换机名称 // 参数3:自定义接收的路由名称 // user.# 可接收 user、user.insert、user.info.findName 等类型的消息 channel.queueBind(queue, exchangeName, "user.#"); // 从队列中消费消息 channel.basicConsume(queue, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2-接收到的消息为:" + new String(body)); } }); } }三、SpringBoot整合RabbitMQ
引入依赖:
org.springframework.boot spring-boot-starter-amqp
配置文件:
spring: # rabbitmq配置 rabbitmq: host: 121.41.179.236 port: 5672 username: admin password: 123456 virtual-host: /
1. HelloWorld 模型在SpringBoot中通过自动注入,可直接使用 RabbitTemplate 来 *** 作 RabbitMQ
生产者
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/hello-world") public String helloWorld(String message){ // 点对点通过队列直接通信 // 参数1:队列名称 参数2:发送的消息 rabbitTemplate.convertAndSend("hello", message); return "ok"; }
消费者
@Component public class HelloConsumer { // 监听 hello队列 的消息 @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true")) public void receive(String message){ System.out.println("消费者-接收到消息:" + message); } }2. WorkQueues 模型
生产者
@GetMapping("/work-queues") public String workQueues(String message){ // 发送10条消息,模拟 work queues 工作队列,多个工人(消费者)竞争消费 for (int i = 0; i < 10; i++) { // 参数1:队列名称 参数2:发送的消息 rabbitTemplate.convertAndSend("work", message + i); } return "ok"; }
消费者1
// 消费者1 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String msg, Message message, Channel channel){ try { // 模拟处理消息的延迟 Thread.sleep(1000); System.out.println("消费者1-接收到消息:" + msg); // 手动确认消息已消费(使用手动确认消息时,需要在配置文件中设置acknowledge-mode=manual) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } }
消费者2
// 消费者2 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String msg, Message message, Channel channel){ try { System.out.println("消费者2-接收到消息:" + msg); // 手动确认消息已消费(使用手动确认消息时,需要在配置文件中设置acknowledge-mode=manual) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } }
结果
消费者2-接收到消息:消息0 消费者2-接收到消息:消息2 消费者2-接收到消息:消息3 消费者2-接收到消息:消息4 消费者2-接收到消息:消息5 消费者2-接收到消息:消息6 消费者2-接收到消息:消息7 消费者2-接收到消息:消息8 消费者2-接收到消息:消息9 消费者1-接收到消息:消息1
3. Publish / Subscribe 模型注意:
实现能者多劳模式时:
需要在配置文件中设置 spring.rabbitmq.listener.simple.prefetch=1
实现手动确认消息时:
需要在配置文件中设置 spring.rabbitmq.listener.simple.acknowledge-mode=manual
生产者
@GetMapping("/fanout") public String fanout(String message){ // fanout广播模式,一条消息可以被多个消费者消费。 // 发送一条消息到交换机中,所有绑定了该交换机的队列,都将收到消息,进而被对应的消费者消费 // 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息 rabbitTemplate.convertAndSend("logs", "", message); return "ok"; }
消费者1
// 消费者1 @RabbitListener(bindings = { @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机 exchange = @Exchange(value = "logs", type = "fanout") ) }) public void receive1(String message){ System.out.println("消费者1-接收到消息:" + message); }
消费者2
// 消费者2 @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "logs", type = "fanout") ) }) public void receive2(String message){ System.out.println("消费者2-接收到消息:" + message); }
结果
消费者1-接收到消息:我是fanout消息 消费者2-接收到消息:我是fanout消息4. Route 模型
生产者
@GetMapping("/direct") public String direct(String message){ // direct路由模式,它也具有广播模式的特点,一条消息可以被多个消费者消费。 // 不同之处在于消费者可以根据消息的类型,有选择性的消费消息。 // 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息 // 发送三种不同类型的消息 rabbitTemplate.convertAndSend("logs_direct", "error", message + "-error"); rabbitTemplate.convertAndSend("logs_direct", "warning", message + "-warning"); rabbitTemplate.convertAndSend("logs_direct", "info", message + "-info"); return "ok"; }
消费者1
// 消费者1 @RabbitListener(bindings = { @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机(type默认是direct,可不写) exchange = @Exchange(value = "logs_direct"), // 指明消费的消息类型(路由名称) key = "info" ) }) public void receive1(String message){ System.out.println("消费者1-接收到消息:" + message); }
消费者2
// 消费者2 @RabbitListener(bindings = { @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机(type默认是direct,可不写) exchange = @Exchange("logs_direct"), // 指明消费的消息类型(路由名称) key = "error" ), @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机(type默认是direct,可不写) exchange = @Exchange("logs_direct"), // 指明消费的消息类型(路由名称) key = "warning" ) }) public void receive2(String message){ System.out.println("消费者2-接收到消息:" + message); }
结果
# 消费者1可接收info类型的消息 消费者1-接收到消息:我是direct消息-info # 消费者2可接收error、warning类型的消息 消费者2-接收到消息:我是direct消息-error 消费者2-接收到消息:我是direct消息-warning5. Topic 模型
生产者
@GetMapping("/topic") public String topic(String message){ // topic动态路由模式,与 direct路由模式基本相同,也是根据消息的类型,有选择性的消费消息。 // 不同之处在于,topic模式下,消费者可以通过使用通配符(* #)来指定消费的消息类型。 // 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息 // 发送三种不同类型的消息 rabbitTemplate.convertAndSend("logs_topic", "user", message + "-user"); rabbitTemplate.convertAndSend("logs_topic", "user.search", message + "-user.search"); rabbitTemplate.convertAndSend("logs_topic", "user.info.findName", message + "-user.info.findName"); return "ok"; }
消费者1
// 消费者1 @RabbitListener(bindings = { @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机 exchange = @Exchange(value = "logs_topic", type = "topic"), // 指明消费的消息类型(路由名称) key = "user.*" ) }) public void receive1(String message){ System.out.println("消费者1-接收到消息:" + message); }
消费者2
// 消费者2 @RabbitListener(bindings = { @QueueBinding( // 创建临时队列 value = @Queue, // 绑定的交换机 exchange = @Exchange(value = "logs_topic", type = "topic"), // 指明消费的消息类型(路由名称) key = "user.#" ) }) public void receive2(String message){ System.out.println("消费者2-接收到消息:" + message); }
结果
# 消费者1可接收user.*类型的消息 (*代表一个任意字符) 消费者1-接收到消息:我是topic消息-user.search # 消费者2可接收user.#类型的消息 (#代表0-多个任意字符) 消费者2-接收到消息:我是topic消息-user 消费者2-接收到消息:我是topic消息-user.search 消费者2-接收到消息:我是topic消息-user.info.findName四、RabbitMQ集群
集群相关信息可搜索其他资料
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)