RabbitMQ入门(二)——学习五种队列

RabbitMQ入门(二)——学习五种队列,第1张

rabbitMQ有五种队列:

简单队列、work模式、发布订阅模式、routing模式、topics模式

目录
  • 一、引入依赖
  • 二、 简单队列
    • 2.1 获取MQ连接
    • 2.2 生产者发送消息
    • 2.3 管理界面查看消息
    • 2.4 消费者接收消息
  • 三、 work模式
    • 3.1 生产者
    • 3.2 平均消费模式
    • 3.3 能者多劳模式
    • 3.4 消息的确认模式
      • 3.4.1 自动确认(true):
      • 3.4.2 手动确认(false) :
  • 四、发布订阅模式
    • 4.1 生产者
    • 4.2 消费者1
    • 4.3 消费者2
  • 五、routing模式
    • 5.1 生产者
    • 5.2 消费者1
    • 5.3 消费者2
    • 5.4 测试
  • 六、topics模式
    • 6.1 生产者
    • 5.2 消费者1
    • 6.3 消费者2
    • 6.4 测试

一、引入依赖
<dependency>
      <groupId>com.rabbitmqgroupId>
      <artifactId>amqp-clientartifactId>
      <version>3.4.1version>
dependency>
二、 简单队列

最简单的工作队列,一个生产者 P 发送消息到队列 Q,一个消费者 C 接收。也称为点对点模式。

2.1 获取MQ连接
public class ConnectionUtil {

    /**
     * 获取rabbitMQ连接
     * @return
     */
    public static Connection getConnection() throws IOException {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接ip
        connectionFactory.setHost("localhost");
        //设置端口
        connectionFactory.setPort(5672);
        //设置虚拟机
        connectionFactory.setVirtualHost("testhost");
        //设置用户名称
        connectionFactory.setUsername("admin");
        //设置密码
        connectionFactory.setPassword("admin");
        //获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}
2.2 生产者发送消息
public class send {
    //队列名称
    public static final String queueName= "q_test_01";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
       Channel channel =  connection.createChannel();
        //声明队列
        channel.queueDeclare(queueName,false,false,false,null);

        //消息内容
        String message = "Hello,it's me";
        //推送消息
        channel.basicPublish("",queueName,null,message.getBytes());
        //关闭通道
        channel.close();
        //关闭连接
        connection.close();
    }
}
2.3 管理界面查看消息


2.4 消费者接收消息
public class receive {
    //队列名称
    public static final String queueName= "q_test_01";

    public static void main(String[] args) throws IOException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(queueName,false,false,false,null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        //监听队列
        channel.basicConsume(queueName,true,consumer);
        //实时接收消息
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收到信息:"+message);
        }
    }
}

控制台输出

同时从管理界面可以看到队列中的消息被消费

三、 work模式

一个生产者、两个消费者;一个消息只能被一个消费者获取。

work模式分为两种模式:

  • 两个消费者平均消费队列中的消息,即使他们的消费能力是不一样的(这种似乎不太符合实际的情况)
  • 能者多劳模式,处理消息能力强的消费者会获取更多的 消息(更符合实际需求)
3.1 生产者

向队列发送50条消息,每生产一条消息后都会休眠一段时间,并且越往后休眠的时间越长。

public class send {
    //队列名称
    public static final String queueName= "q_test_01";

    public static void main(String[] args) throws IOException, InterruptedException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel =  connection.createChannel();
        //声明队列
        channel.queueDeclare(queueName,false,false,false,null);

        for(int i=0;i<50;i++){
            String message = "message["+i+"]";
            //推送消息
            channel.basicPublish("",queueName,null,message.getBytes());
            //每次睡眠时间递增
            Thread.sleep(10*i);
        }
        //关闭通道
        channel.close();
        //关闭连接
        connection.close();
    }
}
3.2 平均消费模式

创建两个消费者:recv和recv2,各自每次消费消息时休眠时间不同,平均消费模式会无视休眠时间,两个消费者获得消息的数量是一样的。

  • 消费者recv(休眠时间:10ms)
public class recv{
    //消息队列名称
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 接收信息: '" + message + "'");
            // 休眠10豪秒
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
  • 消费者recv2(休眠时间:1000ms)
public class recv2{
//消息队列名称
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 接收信息: '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

测试后发现两个消费者获得消息量是一致的。

3.3 能者多劳模式

在消费者中加入一下的代码,使同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

可以看到消费者recv收到了41条消息

消费者recv2收到了9条消息

这样 的结果才符合work模式的能者多劳模式,理能力高的人会接收到更多的消息。

3.4 消息的确认模式

从上面的消费者代码中,有一行跟简单队列的参数不一样。

/**
 * @var1 : 通道名称
 * @var2 : 是否自动确认消息(true,false)
 * @var3 :消费者对象
 **/
String basicConsume(String var1, boolean var2, Consumer var3) throws IOException;
3.4.1 自动确认(true):

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费

3.4.2 手动确认(false) :

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态

消费者接收消息时,需要调用basicAck()进行反馈

四、发布订阅模式

一个生产者,一个交换器,多个消息队列,多个消费者。

交换机的类型为fanout


以用户注册为例,可以通过邮箱或者手机号注册用户,注册完后会向邮箱和手机号都发送注册完成信息。
实际上邮箱和手机号信息发送是不同的业务逻辑,不应该放在一块处理。这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者。

4.1 生产者

向交换机发送20条消息,交换机将消息发送到多个队列

public class send {

    //交换机名称
    static final String TEST_EXCHANGE = "test_exchange";

    //消息队列1
    static final String TEST_QUEUE_1 = "test_subscribe_queue1";

    //消息队列2
    static final String TEST_QUEUE_2 = "test_subscribe_queue2";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        /**
         * 声明交换机
         * 参数1 : 交换机名称
         * 参数2 : 交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(TEST_EXCHANGE,"fanout");

        /**
         * 声明队列
         * 参数1 : 队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列
         * 参数5:队列其它参数
         */
        channel.queueDeclare(TEST_QUEUE_1, true, false, false, null);
        channel.queueDeclare(TEST_QUEUE_2, true, false, false, null);

        //队列绑定交换机
        channel.queueBind(TEST_QUEUE_1, TEST_EXCHANGE, "");
        channel.queueBind(TEST_QUEUE_2, TEST_EXCHANGE, "");

        for(int i=0;i<20;i++){
            String message = "message[x] :"+i;

            /**
             * 推送消息
             * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
             * 参数2:路由key,简单模式可以传递队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish(TEST_EXCHANGE,"",null,message.getBytes());
            System.out.println("已发送信息:"+message);
        }
        channel.close();
        connection.close();
    }

}
4.2 消费者1
public class Custom1 {

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(send.TEST_EXCHANGE,"fanout");
        //声明队列
        channel.queueDeclare(send.TEST_QUEUE_1, true, false, false, null);
        //绑定交换机
        channel.queueBind(send.TEST_QUEUE_1, send.TEST_EXCHANGE, "");

        //创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 重写数据处理
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        //监听队列
        channel.basicConsume(send.TEST_QUEUE_1,true,consumer);
    }
}
4.3 消费者2
public class Custom2 {

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(send.TEST_EXCHANGE,"fanout");
        //声明队列
        channel.queueDeclare(send.TEST_QUEUE_2, true, false, false, null);
        //绑定交换机
        channel.queueBind(send.TEST_QUEUE_2, send.TEST_EXCHANGE, "");

        //创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /**
             * 重写数据处理
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //收到的消息
                System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };

        //监听队列
        channel.basicConsume(send.TEST_QUEUE_2,true,consumer);
    }
}
五、routing模式

不同于发布订阅模式把消息交给每一个绑定的队列,routing(路由)模式是根据消息的 Routing Key 进行判断,只有队列的Routing key 与消息的 Routing key 完全一致,才会接收到消息

交换机的类型为direct
发布信息时,需要指定Routing key

5.1 生产者

生成多个路由的消息

public class send {

    //交换机名称
    private final static String EXCHANGE_NAME       = "test_exchange_direct";

    //路由名称warning
    private final static String ROUTING_KEY_WARNING = "warning";

    //路由名称info
    private final static String ROUTING_KEY_INFO    = "info";

    //路由名称error
    private final static String ROUTING_KEY_ERROR   = "error";

    public static void main(String[] args)
    {
        try
        {
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
            //获取通道
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");


            //发送消息(warning级别日志)
            String message = "this is warning log";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));
            System.out.println("[send]:" + message);

            //发送消息(info级别日志)
            message = "this is info log";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));
            System.out.println("[send]:" + message);

            //发送消息(error级别日志)
            message = "this is error log";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));
            System.out.println("[send]:" + message);
            channel.close();
            connection.close();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
5.2 消费者1

只接收routing key为error的消息

public class custom1 {
    //交换机名称
    private final static String EXCHANGE_NAME     = "test_exchange_direct";

    //路由名称error
    private final static String ROUTING_KEY_ERROR = "error";

    //队列名称
    private static final String QUEUE_NAME        = "test_queue_handel_error";

    public static void main(String[] args)
    {
        try
        {
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
            //从连接中获取一个通道
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将队列绑定到交换机(指定路由error)
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
            //每个时刻只发送消息到一个消费者
            channel.basicQos(1);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //消息接收时
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "utf-8");
                    System.out.println("[test_queue_handel_error] Receive message:" + message);
                    try
                    {
                        //休息10ms处理业务
                        Thread.sleep(10);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        //手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //监听队列,设置手动应答
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}
5.3 消费者2

接收所有消息

public class custom2 {
    //交换机名称
    private final static String EXCHANGE_NAME       = "test_exchange_direct";

    //路由名称warning
    private final static String ROUTING_KEY_WARNING = "warning";

    //路由名称info
    private final static String ROUTING_KEY_INFO    = "info";

    //路由名称error
    private final static String ROUTING_KEY_ERROR   = "error";

    //队列名称
    private static final String QUEUE_NAME        = "test_queue_all_log";

    public static void main(String[] args){
        try{
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
            //创建通道
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将队列绑定到交换机(指定所有路由)
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_WARNING);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_INFO);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);

            //每个时刻只发送消息到一个消费者
            channel.basicQos(1);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //消息接收时
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "utf-8");
                    System.out.println("[test_queue_all_log] Receive message:" + message);
                    try
                    {
                        //休息1s处理业务
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        //手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //监听队列,设置手动应答
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }


    }
}

5.4 测试

生产者生成消息

消费者1接收到error的消息

消费者2接收到所有消息

六、topics模式

同样是消费者配置routing key匹配消息监听队列,不同于routing模式根据routing key相同匹配,topic模式匹配routingKey时采用通配符匹配

交换机的类型为topic
通配符#:匹配一个或多个词(每个词用.隔开),例如aa.#,可以匹配aa.bb、aa.cc、aa.bb.cc
通配符*:匹配一个词,比如aa.*,可以匹配aa.bb、aa.cc

6.1 生产者

生成多条信息

public class provider {
    static final String EXCHANGE_NAME = "test_exchange_topic";

    static final String TOPIC_ORANGE_KEY = "aa.orange.bb";

    static final String TOPIC_RABBIT_KEY = "aa.cc.rabbit";

    static final String TOPIC_LAZ_Y_KEY = "laz.y.bb.cc";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //发送消息:ORANGE
        String message = "ORANGE";
        channel.basicPublish(EXCHANGE_NAME,TOPIC_ORANGE_KEY,null,message.getBytes());
        System.out.println("[seng]:"+message);

        //发送消息:RABBIT
        message = "RABBIT";
        channel.basicPublish(EXCHANGE_NAME,TOPIC_RABBIT_KEY,null,message.getBytes());
        System.out.println("[seng]:"+message);

        //发送消息:LAZ_Y
        message = "LAZ_Y";
        channel.basicPublish(EXCHANGE_NAME,TOPIC_LAZ_Y_KEY,null,message.getBytes());
        System.out.println("[seng]:"+message);

        //关闭通道
        channel.close();
        //关闭连接
        connection.close();

    }
}
5.2 消费者1

根据**.orange.**匹配routing key,接收消息

public class recv1 {
    static final String EXCHANGE_NAME = "test_exchange_topic";

    static final String TOPIC_ORANGE_KEY = "*.orange.*";

    static final String QUEUE_NAME = "test_queue_one_topic";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        final Channel channel = connection.createChannel();

        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_ORANGE_KEY);

        //每个时刻只发送消息到一个消费者
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("[receive]:"+message);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        //监听(手动应答)
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
6.3 消费者2

根据*.*.rabbit或laz.y.#匹配routing key,接收消息

public class recv2 {
    static final String EXCHANGE_NAME = "test_exchange_topic";

    static final String TOPIC_RABBIT_KEY = "*.*.rabbit";

    static final String TOPIC_LAZ_Y_KEY = "laz.y.#";

    static final String QUEUE_NAME = "test_queue_more_topic";

    public static void main(String[] args) throws IOException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        final Channel channel = connection.createChannel();

        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_RABBIT_KEY);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,TOPIC_LAZ_Y_KEY);

        //每个时刻只发送消息到一个消费者
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("[receive]:"+message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        //监听(手动应答)
        channel.basicConsume(QUEUE_NAME,false,consumer);
    }
}
6.4 测试

消费者1接收到对应的消息

消费者2接收到对应的信息

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/722410.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存