rabbitmq使用详解

rabbitmq使用详解,第1张

rabbitmq使用详解 1. 消息中间件概述 1.1. MQ概述 MQ 全称 Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统 之间进行通信。 1.2.MQ的优势: 1 、应用解耦      MQ 相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。 系统的耦合性越高,容错性就越低,可维护性就越低。 使用 MQ 使得应用间解耦,提升容错性和可维护性。 2 、任务异步处理     将不需要同步处理的并且耗时长的 *** 作由消息队列通知消息接收方进行异步处理。提高了应用程序的响 应时间。 提升用户体验和系统吞吐量(单位时间内处理请求的数目)。 3 、削峰填谷     如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒 1000 左右的并发写入,并发 量再高就容易宕机。低峰期的时候并发也就100 多个,但是在高峰期时候,并发量会突然激增到 5000 以 上,这个时候数据库肯定卡死了。消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒 1000个消息,这样慢慢 写入数据库,这样就不会卡死数据库了。 但是使用了 MQ 之后,限制消费消息的速度为 1000 ,但是这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被 “ 削 ” 掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会 维持在1000QPS ,直到消费完积压的消息 , 这就叫做 “ 填谷". 2.RabbitMQ. RabbitMQ 基础架构:

RabbitMQ 中的相关概念:
  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含 了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了 *** 作系统建立 TCP connection 的开销.
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息 被保存到 exchange 中的查询表中,用于 message 的分发依据
交换机 的类型。 Exchange 有常见以下 3 种类型:
  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
RabbitMQ6种使用模式 RabbitMQ 提供了 6 种模式:简单模式, work 模式, Publish/Subscribe 发布与订阅模式, Routing 路由 模式,Topics 主题模式, RPC 远程调用模式(远程调用,不太算 MQ ;暂不作介绍); 以下五种模式使用时思路相同 1.生产者端
  • 1.创建连接工厂
  • 2.连接工厂创建连接
  • 3.连接创建频道
  • 4.频道声明交换机名称,类型(简单模式和work模式不需要)
  • 5.频道声明队列(简单模式和work路由用队列名替代)
  • 6.队列绑定交换机(简单模式和work模式不需要)
  • 7.频道给指定路由发送消息(简单模式和work路由用队列名替代,交换机名用"")
  • 8.关闭资源
2.消费者端
  1. 创建和生产者相同的连接工厂
  2. 创建连接
  3. 创建频道
  4. 频道声明交换机名称,类型(简单模式和work模式不需要)
  5. 队列绑定交换机(简单模式和work模式不需要),频道声明要接收的队列,接收队列的路由(简单模式和work路由用队列名替代)
  6. 定义消费者并设置消息处理的回调方法
  7. 监听接收消息(不用关闭资源) 

简单模式:使用默认的交换机,使用时用" "代替,声明的队列不用声明绑定默认的交换机(队列绑定交换机这一步可省略),路由名称可用队列名替代,一个生产者对应一个队列,一个消费者

work模式:使用默认的交换机,使用时用" "代替,声明的队列不用声明绑定默认的交换机(队列绑定交换机这一步可省略),路由名称可用队列名替代,一个生产者对应一个队列,多个消费者,多个消费者竞争这个队列里的消息

发布订阅模式,交换机名称自定义,交换机类型FanOut,路由用""替代,一个生产者对应一个交换机,多个队列,多个消费者,每个队列都可以得到交换机中的所有消息,每个消费者获得声明的接收队列里的消息.

路由模式:交换机名称自定义,交换机类型Direct,路由名称自定义,可写多个,一个生产者对应一个交换机,多个队列,多个消费者,每个队列都是不同的路由,消息内容不同,,每个消费者获得声明的路由接收队列里的消息.

统配符模式:(同路由模式,用*替代任意一个单词,用#匹配一个或多个单词)交换机名称自定义,交换机类型Topic,路由名称自定义,可写多个,一个生产者对应一个交换机,多个队列,多个消费者,每个队列都是不同的路由,消息内容不同,,每个消费者获得声明的路由接收队列里的消息.

Routingkey(路由)  一般都是有一个或多个单词组成,多个单词之间以 ”.” 分割,例如: item.insert 通配符规则: # :匹配一个或多个词 * :匹配不多不少恰好 1 个词 举例: item.# :能够匹配 item.insert.abc 或者 item.insert item.* :只能匹配 item.insert

总的来说:流程就是,生产者经不同路由发出的消息都会经交换机转发给匹配不同路由的队列,最后再由消费者接收不同队列路由的消息(可参考示例代码理解,这里演示通配符模式--其中包含路由模式)

创建一个maven项目,引入下面依赖

  
        
            com.rabbitmq
            amqp-client
            5.6.0
        
    

消息生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static final String  TOPIC_EXCHANGE="topic_exchange";
    public static final String  TOPIC_QUEUE_ALL="topic_queue_all";
    public static final String  TOPIC_QUEUE_INSERT_DELETE="topic_queue_insert_delete";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();//创建连接工厂
        connectionFactory.setHost("192.168.61.128");//主机地址;默认为 localhost
        connectionFactory.setPort(5672);//连接端口;默认为 5672
        connectionFactory.setVirtualHost("/zwh");//虚拟主机名称;默认为 /  ,需要提前在rabbitmq控制台创建
        connectionFactory.setUsername("root");//连接用户名;默认为guest
        connectionFactory.setPassword("root");//连接密码;默认为guest
        //2.释放连接
        Connection connection = connectionFactory.newConnection();
        //3.创建频道
        Channel channel = connection.createChannel();
        //4.声明交换机(//参数1:交换机名称,参数2:交换机类型)
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //5.声明队列
             
        channel.queueDeclare(TOPIC_QUEUE_ALL, true, false, false, null);
        channel.queueDeclare(TOPIC_QUEUE_INSERT_DELETE, true, false, false, null);
        //6.队列绑定交换机
        
        channel.queueBind(TOPIC_QUEUE_ALL,TOPIC_EXCHANGE,"item.#");
        channel.queueBind(TOPIC_QUEUE_INSERT_DELETE,TOPIC_EXCHANGE,"item.insert");
        channel.queueBind(TOPIC_QUEUE_INSERT_DELETE,TOPIC_EXCHANGE,"item.delete");
        //7.发送消息
        
        for (int i = 0; i < 3; i++) {
            String message = "你好,梅丽莎-----通配符模式----新增-------"+i;
            channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
            System.out.println(message);

        }
        for (int i = 0; i < 3; i++) {
            String message = "你好,梅丽莎-----通配符模式----修改-------"+i;
            channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
            System.out.println(message);
        }
        for (int i = 0; i < 3; i++) {
            String message = "你好,梅丽莎-----通配符模式----删除-------"+i;
            channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
            System.out.println(message);
        }
        //8.关闭资源
        channel.close();
        connection.close();

    }
}

消费者1,接收匹配 item.*(例item.insert,只要item后面跟的是一个单词就可以),用item.#也可以,表示匹配以item.开头的(路由键中可以包含任意多的单词,最多不超过255个字节。)

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {


    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.61.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zwh");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        //2.释放连接
        Connection connection = connectionFactory.newConnection();
        //3.创建频道
        Channel channel = connection.createChannel();
        //4.声明交换机
        channel.exchangeDeclare(Provider.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //5.声明队列

        channel.queueDeclare(Provider.TOPIC_QUEUE_ALL, true, false, false, null);

        //6.队列绑定交换机
        channel.queueBind(Provider.TOPIC_QUEUE_ALL,Provider.TOPIC_EXCHANGE,"item.#");

        //5.定义接收消息后的回调接口
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:" + envelope.getRoutingKey()); //
                // 交换机

                System.out.println("交换机为:" + envelope.getExchange());
                // 消息id

                System.out.println("消息id为:" + envelope.getDeliveryTag());
                // 收到的消息

                System.out.println("consume1接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 接收消息
        channel.basicConsume(Provider.TOPIC_QUEUE_ALL,true,consumer);

    }
}

消费者2

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {


    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.61.128");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zwh");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        //2.释放连接
        Connection connection = connectionFactory.newConnection();
        //3.创建频道
        Channel channel = connection.createChannel();
        //4.声明交换机
        channel.exchangeDeclare(Provider.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        //5.声明队列

        channel.queueDeclare(Provider.TOPIC_QUEUE_INSERT_DELETE, true, false, false, null);

        //6.队列绑定交换机
        channel.queueBind(Provider.TOPIC_QUEUE_INSERT_DELETE,Provider.TOPIC_EXCHANGE,"item.insert");
        channel.queueBind(Provider.TOPIC_QUEUE_INSERT_DELETE,Provider.TOPIC_EXCHANGE,"item.delete");

        //5.定义接收消息后的回调接口
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:" + envelope.getRoutingKey()); //
                // 交换机

                System.out.println("交换机为:" + envelope.getExchange());
                // 消息id

                System.out.println("消息id为:" + envelope.getDeliveryTag());
                // 收到的消息

                System.out.println("consume2接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 接收消息
        channel.basicConsume(Provider.TOPIC_QUEUE_INSERT_DELETE,true,consumer);

    }
}

结果: consume1接收到9条消息,consume2接收到6条消息,

 

3.springmvc整合rabbitmq 消息生产者端

1.引入pom依赖(整合rabbitmq)

 
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

2.定义rabbitmq的连接配置 rabbitmq.properties

rabbitmq.host=192.168.61.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/zwh

3.在spring的配置文件 springcontext.xml文件中定义和管理交换机和队列(这里定义了一个简单模式,两个广播模式,三个路由通配符模式)



    
    
    

    
    

    

 
    
    
    
    
        
        
    
    

    
    
    
    
        
            
            
            
        
    

    
    

4.测试(在控制台能够看到消息)

package com.zhao;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.awt.peer.ScrollbarPeer;

import static org.junit.Assert.*;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:springcontext.xml")
public class RabbitMQTest {

@Autowired
    private RabbitTemplate rabbitTemplate;
@Test
    public void send() {
    
    rabbitTemplate.convertAndSend("simple_queue", "are you ok?");
}
    @Test
    public void send1() {
        
        rabbitTemplate.convertAndSend("fanoutExchange", "", "广播队列的消息");
    }
    @Test
    public void send2() {

        
        rabbitTemplate.convertAndSend("topicExchange","item","item发送的消息");
        rabbitTemplate.convertAndSend("topicExchange","item.delete","item.delete发送的消息");
        rabbitTemplate.convertAndSend("topicExchange","item.de.up","item.de.up发送的消息");
    }
}
消息消费者端

1.引入依赖

 
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

2.定义rabbitmq的连接配置 rabbitmq.properties

rabbitmq.host=192.168.61.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/zwh

3.定义每个队列的监听类,一个队列一个监听类,实现messageListener接口,代码基本相同,改一下类名

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class FanoutQueueListener1 implements MessageListener {
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");
            System.out.printf("广播:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                    message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(),
                    message.getMessageProperties().getConsumerQueue(), msg);
        } catch (Exception e) { e.printStackTrace();
        }
    }
}

4.在spring的配置文件 springcontext.xml文件中设置监听各自队列的消息,建立监听关系


 
    


    
    
    
    
    
    

    
        
        
        
        
        
        
    

5.测试:写一个死循环,保持与队列的连接(在bean容器创建的时候监听方法已经在等待执行了,所以只要启动测试类就可以接收到消息)

package com.zhao.listener;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.junit.Assert.*;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:springcontext.xml")
public class SimpleQueueListenerTest {

    @Test
    public void onMessage() {
        while(true){

        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5676225.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存