rabbitMQ之消息确认

rabbitMQ之消息确认,第1张

rabbitMQ之消息确认 RabbitMQ生产者之消息确认

1.通过事务机制
RabbitMQ的事务机制 *** 作过程与事务型数据库有些类似:

1.channel.txSelect()用于开启事务
2.channel.txCommit()用于提交事务
3.channel.txRollback()用于回滚事务
--------------------------------------------------------------------------------
示例:
try {
 channel.txSelect();
 String message = "";
 channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
 channel.txCommite();
}catch (Exception e) {
e.printStackTrace();
channel.toRollback();
}
-----------------------------------------------------详细例子--------------------------------------------
package com.dfyang.rabbitmq.tx;

import com.dfyang.rabbitmq.RabbitConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

public class TXProducer {
    private static final String EXCHANGE_NAME = "tx.exchange";
    private static final String QUEUE_NAME = "tx.queue";
    private static final String ROUTING_KEY = "tx";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnectionFactory.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        String message = "test!";
        try {
            channel.txSelect();
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
        }
        channel.close();
        connection.close();
    }
}


1.客户端发送Tx.Select 将信道置为事务
2.Broker回复Tx.Select-Ok 确认已将信道置为事务模式
3.在发送完消息之后,客户端发送Tx.Commit提交事务
4.Broker回复Tx.Commit-Ok确认事务提交。

2.生产者/confirm/i机制

1.单条/confirm/i(发送一条等待确认一条)
channel./confirm/iSelect();//将信道置为/confirm/i
String message = "";
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAN,message.getBytes());
if(!channel.waitFor/confirm/is()){
System.out.println("消息发送失败");
}
System.out.println("消息发送成功");
单条/confirm/i模式的效率仅仅比事务模式高一点,这种模式是阻塞的。
2.批量confirm
批量/confirm/i模式就是先开启/confirm/i模式,发送多条之后再调用waitFor/confirm/is()方法确认,这样发送多条之后才会等待一次确认消息,效率比单条/confirm/i模式高了许多。但是如果返回false或者超时,这一批次的消息就要全部重发,如果经常丢消息,效率并不比单条/confirm/i高。。
-----------------------------------详细代码--------------------------------
package com.dfyang.rabbitmq.tx;

import com.dfyang.rabbitmq.RabbitConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.util.Queue;

public class TXProducer {
    private static final String EXCHANGE_NAME = "tx.exchange";
    private static final String QUEUE_NAME = "tx.queue";
    private static final String ROUTING_KEY = "tx";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnectionFactory.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        String message = "test!";
        channel./confirm/iSelect();
        for (int i = 0; i < 10000; i++) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        }
        channel.waitFor/confirm/isOrDie();
        channel.close();
        connection.close();
    }
}

3.异步/confirm/i模式:采用异步模式将不用阻塞等待borker服务器确认接收到消息就可以继续发送消息
package com.springrabbitmq.comfirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iListener;
import com.rabbitmq.client.Connection;
import com.springrabbitmq.util.RabbitMQConnectionUtil;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeoutException;


public class Send3 {

    private static final String QUEUE_NAME="test_queue_/confirm/i1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = RabbitMQConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启/confirm/i模式
        //注意已经定义队列为AMQP的事务机制的话,就不能再改成/confirm/i
        channel./confirm/iSelect();

        //未确认的消息标识
        final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());

        ///confirm/i监听
        channel.add/confirm/iListener(new /confirm/iListener() {
            //没有问题的handleAck,从un/confirm/ied集合里移除元素表示确认收到了
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    System.out.println("------handleAck----multiple------");
                    //multiple为true时,移除(deliveryTag+1)之前的多个元素
                    /confirm/iSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("------handleAck----multiple------false");
                    //multiple为false时,移除一个
                    /confirm/iSet.remove(deliveryTag);
                }
            }

            //RabbitMQ异常时没有收到消息,/confirm/i会回执一条Nack给生产者
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    System.out.println("------handleNack----multiple------");
                    //multiple为true时,移除(deliveryTag+1)之前的多个元素
                    /confirm/iSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("------handleNack----multiple------false");
                    //multiple为false时,移除一个
                    /confirm/iSet.remove(deliveryTag);
                }
            }
        });

        String msg="Hello Confirm Message!![异步]";

        for (int i=0;i<20;i++){
            //channel为每次发布的消息指派一个ID
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("Message Send Success!"+i);
            //每次发布后将ID添加到un/confirm/ied未确定是否发送成功的集合Set中
            /confirm/iSet.add(seqNo);
            System.out.println(/confirm/iSet.size());
        }

    }
}


欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5690279.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存