在springboot上刨rabbitmq和mqtt的祖坟

在springboot上刨rabbitmq和mqtt的祖坟,第1张

在springboot上刨rabbitmq和mqtt的祖坟

文章目录
  • 1.rabbit mq和mqtt
    • 1.1mq
    • 1.2mqtt
  • 2.集成rabbitmq收发消息
    • 2.1引入pom文件增加配置文件
    • 2.2参数说明
      • 2.2.1交换机
      • 2.2.2队列
      • 2.2.3消息
    • 2.3发送消息
    • 2.4接收消息
  • 3.集成mqtt收发消息
    • 3.1引入pom文件增加配置文件
    • 3.2发送消息
    • 3.3接收消息
  • 4.github源码地址

1.rabbit mq和mqtt 1.1mq

之前介绍过rabbitmq的安装和底层实现有兴趣可以查阅之前的博客。

重要的说明:

  • 生产者 producer:生产者向队列发送消息
  • 消费者 consumer:消费者从队列获取消息
  • 交换机 exchange:生产者将消息发给交换机,由交换机分发消息
  • 通道 channel:信道,用于通信
  • 队列 queue:储存消息的地方
  • 路由键 RoutingKey:指定当前消息被谁接受
  • 绑定key BindingKey:指定当前Exchange下,什么样的路由键会被下派到当前绑定的Queue中

今天抽象型的说一下mq各部分的运行机制:

交换机队列交换机绑定,然后指定路由键,这样消息发送到交换机,交换机根据路由键去投递到匹配的队列中。

  • Direct Exchange 直连交换机
    需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
  • Fanout Exchange 扇形交换机
    一种不使用路由键的交换机,收到的消息直接投递到绑定的全部队列上,一个队列一份。
  • Topic Exchange 主题交换机
    路由键匹配,他的路由键支持通配符比如"test/",可以匹配到多个路由键,假设交换机收到两条消息路由键是“test/1”“test/2”,那么交换机会发送给路由键是"test/"的交换机一份。
    “#”会匹配之后多级的目录,“”只能匹配后一级目录。
    test/
    : test/1
    test/# : test/1/2/3/4/5/6/7/8/9/10

队列:

  • 简单队列:Hello World
    一个生产者对应一个消费者没有中间商赚差价(交换机)。
  • work模式:Work queus
    一个生产者多个消费者,消息只会被一个消费者处理,同一条消息不会被重复消费,也没有中间商赚差价(交换机)。
  • 订阅模式:Publish/Subscribe
    类似于mqtt协议的主题订阅,消息先到交换机,交换机再将消息分配给已经绑定的消费者。
  • 路由模式:Routing
    和订阅模式极度相同,队列可以设置多个key绑定交换机任何一个匹配上了都可以接收消息,比如key为 test/get、test/put,test/get和test/put任何一个来消息了路由器都会投递到队列中。
  • 通配符模式:Topics
    和交换机的同通配符差不多,只是这次通配符用在了队列绑定交换机的key上
  • RPC(没啥用)
1.2mqtt

MQTT是当前物联网使用比较多的一个协议,他是以主题订阅的形式进行工作,类似于八婆传瞎话,某一个八婆在某个小团体中说别人的坏话,然后你不在它的小团里不行,它说别人坏话的时候你不在也听不见。
(mqttclient订阅某一个主题,然后所有订阅这个主题的用户都可以收发消息,某个客户端发送消息其他所有在线用户会同步收到,即时消息不会存储,不在线收不到消息,即使下线又上线之前不在线的消息也不会收到)

2.集成rabbitmq收发消息

注意:一定要现有队列才可以发送消息,发送消息并不会创建交换机和队列,可以先让消费者启动也就是RabbitListener他会根据配置创建。

2.1引入pom文件增加配置文件

POM文件

 

    org.springframework.boot
    spring-boot-starter-amqp
    2.1.6.RELEASE

porperties

spring:
  #rabbitmq
  rabbitmq:
    port: 5672
    username: 账号
    password: 密码
    virtual-host: /
    host: ip
    listener:
      simple:
        ## auto表示自动完成这次消费,manual表示需要手动告知完成消费
        acknowledge-mode: manual
        ## listener开几个线程处理数据
        concurrency: 1
        ## linstener 最大开几个线程
        max-concurrency: 1
        ## 一次拿几条数据
        prefetch: 1
        # 开启重试,重试5次 间隔1秒
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试间隔时间(毫秒)
          initial-interval: 1000
        # 是否进入死信队列 true是 false不是
        default-requeue-rejected: false
2.2参数说明 2.2.1交换机
@AliasFor("name")
String value() default ""; // 交换机名称(两个都是)
@AliasFor("value")
String name() default ""; // 交换机名称(两个都是)
String type() default "direct"; // 交换机类型
String durable() default "true"; // 是否是持久化的,即使rabbitmq重启,交换机是否存在
String autoDelete() default "false"; // 当没有队列绑定交换机自动销毁
String internal() default "false"; // 是否为内部交换机,内部交换机只能路由交换机到交换机
String ignoreDeclarationExceptions() default "false"; // 忽略声明异常
String delayed() default "false";  // 是否开启延迟消息,需要使用延迟消息插件
Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息)
String declare() default "true"; // 是否有管理员
String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它
2.2.2队列
@AliasFor("name")
String value() default ""; // 队列名称(两个都是)
@AliasFor("value")
String name() default ""; // 队列名称(两个都是)
String durable() default ""; // 是否是持久化的,即使rabbitmq重启,队列是否存在
String exclusive() default ""; // 是否是排他队列,是否只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除
String autoDelete() default ""; // 当没有消费者自动销毁队列
String ignoreDeclarationExceptions() default "false"; // 忽略声明异常
Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息)
String declare() default "true"; // 是否有管理员
String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它
2.2.3消息
  • 设置exchange为持久化之后,并不能保证消息不丢失,因为此时发送往exchange中的消息并不是持久化的,需要配置delivery_mode=2指明message为持久的。FanoutExchange 发送的消息默认就是持久化
2.3发送消息
@Component
public class SendMessage implements CommandLineRunner {

    private int index = 0;

    private RabbitTemplate rabbitTemplate;

    public SendMessage(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws InterruptedException {
        //测试使用代码
        Boolean where = true;
        while (where) {
            System.out.println("Sending message...");
            try {
                rabbitTemplate.convertAndSend("test_exchange", "", "发送消息" + index);
            } catch (Exception e) {
                e.printStackTrace();
            }
            index++;
            Thread.sleep(1000);
        }
    }
}
2.4接收消息
@Component
public class ReceiveMessage {

    @Autowired
    private MqttClient mqttClient;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "test_queue", durable = "true"),
            exchange = @Exchange(name = "test_exchange", type = "fanout"),
            key = ""))
    @RabbitHandler
    private void receive1(Message message, Channel channel) {
        //消费者 *** 作
        try {
            receiveMessage(new String(message.getBody()));
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 正常消费队列,第二个参数的意思是小于该ack的消息是否等待全部完成后再一次性签收
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 将消息重新放回队列
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receiveMessage(String message) {
        // 1.接收AMQP消息
        Thread t = Thread.currentThread();
        String name = t.getName();
        System.out.println("name=" + name);
        System.out.println("-------------------接收消息-------------------");
        System.out.println(message);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 2.发送mqtt 消息
        String mqttTopic = "test";
        MqttMessage msg = new MqttMessage();
        String msgStr = message;
        msg.setPayload(msgStr.getBytes());//设置消息内容
        msg.setQos(0);//设置消息发送质量,可为0,1,2.
        msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。
        try {
            mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
3.集成mqtt收发消息 3.1引入pom文件增加配置文件
  
    
        org.eclipse.paho
        org.eclipse.paho.client.mqttv3
        ${mqttv3.version}
    
#mqtt
mqtt:
  clientid: mqtt-system
  host: 182.92.101.122
  username: root
  pwd: hydf@8888
  completionTimeout: 30000
@Configuration
@RefreshScope
public class MqttConfig {

    @Value("${mqtt.host}")
    private String mqttHost;

    @Value("${mqtt.username}")
    private String mqttUserName;

    @Value("${mqtt.clientid}")
    private String clientId;

    @Value("${mqtt.pwd}")
    private String mqttPwd;

    @Value("${mqtt.completionTimeout}")
    private Integer completionTimeout;

    
    @Bean
    public MqttClient mqttClient() throws MqttException {
        //1.初始化mqtt参数
        MqttConnectOptions mOptions = new MqttConnectOptions();
        mOptions.setAutomaticReconnect(true);//断开后,是否自动连接
        mOptions.setCleanSession(false);//是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息
        mOptions.setConnectionTimeout(completionTimeout);//设置超时时间,单位为秒
        mOptions.setUserName(mqttUserName);//设置用户名。跟Client ID不同。用户名可以看做权限等级
        mOptions.setPassword(mqttPwd.toCharArray());//设置登录密码
        mOptions.setKeepAliveInterval(60);//心跳时间,单位为秒。即多长时间确认一次Client端是否在线
        mOptions.setMaxInflight(10);//允许同时发送几条消息(未收到broker确认信息)
        //2.创建mqtt客户端
        MqttClient client = null;
        try {
            client = new MqttClient("tcp://"+mqttHost, clientId,null);
            client.connect(mOptions);//连接broker
            client.setCallback(mqttCallback);//设置回调
        } catch (MqttException e) {
            e.printStackTrace();
        }
        // 订阅主题
        client.subscribe("test");
        return client;
    }

    
    static MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("MQTT Lost");
        }
        @Override
        public void messageArrived(String topic, MqttMessage message){
            System.out.println("收到消息!");
            System.out.println(new String(message.getPayload()));
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("MQTT delivery Complete ");
        }
    };
}
3.2发送消息
 // 2.发送mqtt 消息
        String mqttTopic = "test";
        MqttMessage msg = new MqttMessage();
        String msgStr = message;
        msg.setPayload(msgStr.getBytes());//设置消息内容
        msg.setQos(0);//设置消息发送质量,可为0,1,2.
        msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。
        try {
            mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送
        } catch (MqttException e) {
            e.printStackTrace();
        }
3.3接收消息

接收消息代码,在3.1中初始化client的时候绑定的回调

4.github源码地址

https://github.com/1142235090/frame

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5624038.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存