1.简介
2.安装
3.使用
- 3.1.创建简单列队
- 3.2.创建工作列队
- 3.3.创建订阅列队
- 3.4.创建路由列队
- 3.5.创建主题列队
- 3.6.事务
- 3.7.确认模式
- 3.7.1.同步确认
- 3.7.2.异步确认
- 使用springBoot 简单的实现AMQP
在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的序列用来处理一系列的输入,通常是来自用户的。消息队列提供了异步的通信协议,每一个序列中的记录包含了详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息保存在队列中,直到接收者取回它。
2.1、实现:2.2、特点:消息队列常常保存在链表结构中。拥有权限的进程才可以向消息队列中写入或读取消息
目前,有很多消息队列有很多的实现,包括 JBoss Messing、JORAM、Apache、ActiveMQ、SunPoen Message Queue、IBM MQ、Apache Qpid和HTTPSQS
当前使用较多的消息队列有:RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、metaMq等而部分数据库如:Redis,Mysql,以及phxsql也可以实现消息队列的功能。
注意:MQ是消费者-生产者模型中的一个典型代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
1.AMDP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与中间件可传递消息,并不受客户端/中间件影响,不同的开发语言等条件的限制。
2.AMS,即java消息服务(java Message Service)应用程序接口,是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供服务。常见的消息列队,大部分都实现了JMI API 如:ActiveMQ,Redis以及RabbitMQ 等
2.3、优缺点: 优点:应用耦合、异步处理、流量削峰
-
解耦:
传统模式
传统模式缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将D系统接入,系统A还需要修改代码,太麻烦!
中间件模式:
中间件模式优点:
将消息写入列队,需要消息的系统自己从消息列队中订阅,从而系统A不需要做如何修改。
-
异步
传统模式:
传统模式缺点:
一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
中间件模式优点:
使用消息队列发送消息,减少耗时。
-
削峰
传统模式:
传统模式缺点:
并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常。
中间件模式:
中间件模式的优点:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的最高峰期积压是允许的。
缺点:系统可用性低、系统复杂性增加
2.4、使用场景: 消息列队,是分布式系统中重要的组件,其通用的使用场景可以简单的描述为:当不需要立即获取结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息列队的时候。
在项目中,将一些无需及时返回且耗时的 *** 作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器请求的响应时间,从而提高了系统的吞吐量。
2.5、为什么使用RabbitMQ: AMQP,即Advanced Meassage Queueing Protocol,高级消息列队协议,是应用层的一个开发标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息的使用者的存在,反之亦然。
AMQP的主要特征是面向消息、列队、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、Actionscript、XMPP、STOMP等,支持AJAX。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
总结如下
- 基于AMQP协议
- 高并发(服务器可以接受最大任务数量)
- 高性能(单位时间内服务器可以处理的任务数)
- 高可用(单位时间内的服务器可以正常工作的时间比例)
- 强大的社区支持,以及很多公司都在使用
- 支持插件
- 支持多语言
brew install erlang
erlang对应Rabbit版本:
测试erlang是否安装成功:
官网下载地址:https://www.rabbitmq.com/download.html
开启RabbitMQ图形化管理界面插件:rabbitmq-plugins enable rabbitmq_management、关闭RabbitMQ图形化管理界面插件:rabbitmq-plugins disable rabbitmq_management
- 使用rabbitmq-plugins list指令查看 rabbitmq 的插件启动情况:
开启RabbitMQ服务rabbitmq-service、关闭RabbitMq服务rabbitmqctl stop
在浏览器访问localhost:15672进入rabbitmq图形界面管理登陆系统:
默认用户名:guest ,默认密码: guest
登陆之后进入rabbitmq图形界面管理系统:
创建成功:
4.2、添加一个名称为web的用户:每次创建虚拟主机guest用户会默认加入虚拟主机
添加成功:
4.3、将web用户添加到虚拟主机:添加成功:
4.4、使用java代码实现AMQP:导入依赖
4.4.1、创建简单列队:com.rabbitmq amqp-client5.4.3
简单列队:生产者将消息发送到“hello”队列。消费者从该队列接收消息。
4.4.1.1:创建简单列队生产者:public class Send { //定义队列名称 private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); //端口号 connectionFactory.setPort(5672); //用户名 connectionFactory.setUsername("web"); //用户密码 connectionFactory.setPassword("web"); //虚拟主机名 connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(message); } } }
4.4.1.2:创建简单列队消费者:启动生产者服务:
消息堵塞:
public class Recv { //定义队列名称 private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); //端口号 connectionFactory.setPort(5672); //用户名 connectionFactory.setUsername("web"); //用户密码 connectionFactory.setPassword("web"); //虚拟主机名 connectionFactory.setVirtualHost("/web"); //创建信道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{}); } }
4.4.2:创建工作列队启动消费者:
消费者消费消息:
工作列队:(一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!)
4.4.2.1:创建工作列队-生产者:public class Send { //定义队列名称 private static final String QUEUE_NAME = "work_fair"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 for (int i = 0; i < 20; i++) { channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes(StandardCharsets.UTF_8)); System.out.println(message+i); } } } }
4.4.2.2:创建工作列队-消费者01启动生产者:
消息堵塞:
public class Recv01 { //定义队列名称 private static final String QUEUE_NAME = "work_fair"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //限制消费1条消息,消费完在继续消费下一天消息(限流) int prefetchCount = 1; channel.basicQos(prefetchCount); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); //消费者01接收一条消息后休眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag->{}); } }
启动消费者:每个消费者每隔10 毫秒取一条消息
消费者01:成功取得列队消息->
消费者02:成功取得列队消息->
(消费者02代码和消费者01一样)
4.4.3:发布订阅列队一个生产者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
4.4.3.1:创建发布列队-生产者:public class Send { //定义发布队列名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(message); } } }
4.4.3.2:创建发布列队-消费者:启动成功:成功将消息绑定到交换机上
public class Recv01 { //定义订阅队列名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定 channel.queueBind(queue,EXCHANGE_NAME,""); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
4.4.4:路由列队:启动订阅消费者:所有订阅消费者都可以获得消息。
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
4.4.4.1:创建路由列队-生产者:public class Send { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //准备消息 String message = "Hello world"; channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"green",null,message.getBytes(StandardCharsets.UTF_8)); } } }
4.4.4.2:创建路由列队-消费者:启动路由生产者:
会将消息发送到名为EXCHANGE_NAME的交换机中,分别将消息key设置为orange和green
public class Secv01 { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定 channel.queueBind(queue,EXCHANGE_NAME,"black"); channel.queueBind(queue,EXCHANGE_NAME,"green"); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
4.4.5:主题路由列队:(使用最多的模式,通过模糊匹配,使得 *** 作更加自如)启动消费者:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HXBlxAVB-1634903973672)(/Users/haoruijie/Library/Application Support/typora-user-images/image-20211022135436669.png)]
消费者01会取到交换机名为EXCHANGE_NAME,key值为green和orange 而消费者02只会收到key值为orange的消息
通过通配符模式来判断路由key通俗的来讲就是模糊匹配。
4.4.5.1:创建主题路由列队-生产者:*.匹配一个字符 #.匹配所有字符
public class Send { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 try (Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel()){ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //准备消息 String message1 = "Hello world1"; String message2 = "Hello world2"; String message3 = "Hello world3"; //设置交换机key值 String routingKey1 = "quick.orange.rabbit"; String routingKey2 = "lazy.pink.rabbit"; String routingKey3 = "quick.hello.male"; //发送消息 channel.basicPublish(EXCHANGE_NAME,routingKey1,null,message1.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,routingKey2,null,message2.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,routingKey3,null,message3.getBytes(StandardCharsets.UTF_8)); } } }
4.4.5.2:创建主题路由列队-消费者:启动路由生产者:
会将消息发送到名为EXCHANGE_NAME的交换机中,分别将消息key设置为routingKey1、routingKey2和routingKey3
public class Recv01 { //定义队列名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列,排他队列 String queue = channel.queueDeclare().getQueue(); //队列和交换机绑定绑定key channel.queueBind(queue,EXCHANGE_NAME,"*.orange.*"); channel.queueBind(queue,EXCHANGE_NAME,"lazy.#"); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; channel.basicConsume(queue,true,deliverCallback,consumerTag->{}); } }
4.5、事务:启动消费者
运行结果:
消费者01只会匹配routingKey1和routingKey2的消息
使用事务会大幅度降低性能 (一般不会使用) 开启事务会知道生产者是否将消息成功提交到列队里
4.5.1创建事务列队: 4.5.1.1:创建事务列队-生产者:public class Send { //定义队列名称 private static final String QUEUE_NAME = "tx"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //开启事务 channel.txSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); //制造异常(遇到异常事务回滚) int a = 1/0; //提交事务 channel.txCommit(); }catch (Exception e){ //事务回滚 channel.txRollback(); e.printStackTrace(); }finally { //关闭连接 if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }4.5.1.2:创建事务列队-消费者:
public class Recv { //定义队列名称 private static final String QUEUE_NAME = "tx"; public static void main(String[] args) throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("yeb"); connectionFactory.setPassword("yeb"); connectionFactory.setVirtualHost("/yeb"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //打印消息 DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody(),"UTF-8"); System.out.println(message); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{}); } }
4.6、确认模式:启动消费者:
启动生产者: 发现异常事务回调 消费者没有消息消费
(确认生产者是否把消息发送到了服务器)
4.6.1:同步确认:(同步确认会影响性能一般不会使用)
4.6.1.1:创建同步-确认-生产者:public class Send { //定义队列名称 private static final String QUEUE_NAME = "sync"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("web"); connectionFactory.setPassword("web"); connectionFactory.setVirtualHost("/web"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); //启动确认模式 channel./confirm/iSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //准备消息 String message = "Hello world"; //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); //普通确认,只能单条确认 if(channel.waitFor/confirm/is()){ System.out.println("确认成功!"); } //普通批量确认 ,如果有一条不成功就会抛异常,全部成功不会抛异常 //channel.waitForConfirmsOrDie(); }catch (Exception e){ e.printStackTrace(); }finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
4.6.2:异步确认:启动生产者:
发送消息成功:
(异步确认效率是最高的)
4.6.2.1:创建异步-确认-生产者:public class Send { //定义队列名称 private static final String QUEUE_NAME = "async"; public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接工厂配置 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("yeb"); connectionFactory.setPassword("yeb"); connectionFactory.setVirtualHost("/yeb"); //创建连接 Connection connection = null; //创建信道 Channel channel = null; try { final SortedSetset = Collections.synchronizedSortedSet(new TreeSet ()); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //启动确认模式 channel./confirm/iSelect(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //天际channel监听 channel.add/confirm/iListener(new /confirm/iListener() { //已确认 @Override public void handleAck(long l, boolean b) throws IOException { //b为true确认多条成功 为false确认单条成功 if (b){ System.out.println("确认多条成功"); set.headSet(l+1L).clear(); }else { System.out.println("确认单条成功"+l); set.remove(l); } } //未确认 @Override public void handleNack(long l, boolean b) throws IOException { //b为true多条未确认 为false单条未确认 if (b){ System.out.println("多条未确认"); set.headSet(l+1L).clear(); }else { System.out.println("单体未确认"+l); set.remove(l); } } }); int i =0; while (i<20){ i++; //准备消息 String message = "Hello world"+i; Long seqNo = channel.getNextPublishSeqNo(); //发送消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); set.add(seqNo); System.out.println("[x] Sent'"+message+"'"); } }catch (Exception e){ e.printStackTrace(); }finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
4.7、使用springBoot 简单的实现AMQP: 4.7.1:创建springboot多模块项目: 4.7.1.1:创建消费者模块、并继承父模块: 4.7.1.2:编写消费者模块配置文件:启动生产者:
确认发送消息成功:
消息发送成功:
#配置端口号 server: port: 8002 spring: #Rabbitmq生产出配置 rabbitmq: #ip host: 127.0.0.1 #用户名 username: guest #密码 password: guest #端口 port: 56724.7.1.3:编写消费者测试类:
@Component public class Test { @RabbitListener(queues = "hello") public void demo(String hello){ System.out.println(hello); } }4.7.1.4:编写queue配置文件:
@Configuration public class RabbitmqConfig { @Bean public Queue queue(){ return new Queue("hello"); } }4.7.1.4:启动消费者服务:
启动成功hello列队已存在
4.7.2.1:创建生产者模块、继承父模块: 4.7.2.2:编写生产者配置文件:#配置端口号 server: port: 8001 spring: #Rabbitmq生产出配置 rabbitmq: #ip host: 127.0.0.1 #用户名 username: web #密码 password: web #端口 port: 56724.7.2.3:编写生产者测试类:
@Component public class Test { @Autowired private RabbitTemplate template; public void demo(){ template.convertAndSend("hello","hello world"); } }4.7.2.4:发送消息:
发送被消费者立即消费
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)