golang之消息队列rabbitmq

golang之消息队列rabbitmq,第1张

文章目录 消息队列的作用:收发流程docker安装官方文档消息收发模式1.p发给队列,单个c消费,这里用的默认exchange,收发模式是direct2.p发给队列,多个c消费,这里用的默认exchange,收发模式是direct3.fandout模式:p将消息发给x,x将同一个消息发给所有q,c 按 1,2方式消费q的消息4.direct(路由)模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息5.topic模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息,与4的区别是topic可以有通配符匹配 用go *** 作rabbitmq写代码的思路收发模式2示例:fanout模式示例:routing(路由)模式示例topic模式 高级 *** 作消费者确认模式:消费限流延迟消息持久化交换机持久化:队列持久化消息持久化

消息队列的作用: 异步,将同步的消息变为异步,例如我们可以使用rpc调用另一个服务,但是我们必须等待返回(同步),用mq可以变异步解耦,将单体服务拆分多个微服务,实现了分布式部署,单个服务的修改、增加或删除,不影响其他服务,不需要全部服务关闭重启抗压,由于是异步,解耦的,高并发请求到来时,我们不直接发送给服务,而是发给MQ,让服务决定什么时候接收消息,提供服务,这样就缓解了服务的压力
图示:
用户注册后发邮件和虚拟币:
异步解耦图:

抗压图:
收发流程 生产者发送消息的流程 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等生产者通过 routingKey (路由Key)将交换器和队列绑定( binding )起来生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息相应的交换器根据接收到的 routingKey 查找相匹配的队列。如果找到,则将从生产者发送过来的消息存入相应的队列中。如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者关闭信道。关闭连接 消费者接收消息的过程 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及
做一些准备工作等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。消费者确认( ack) 接收到的消息。RabbitMQ 从队列中删除相应己经被确认的消息。关闭信道。关闭连接。
docker安装

拉取image:

docker pull rabbitmq:3.8-management-alpine

启动容器:
5672进行通信,15672 ,web管理工具

docker run -d --name rmq \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:3.8-management-alpine
官方文档

官方文档

消息收发模式

明确连个概念,exchange(路由) queue(队列)
工作模式:

以下用p 代指生产者,用 c 代指消费者,用 x 代指 exchange

1.p发给队列,单个c消费,这里用的默认exchange,收发模式是direct

2.p发给队列,多个c消费,这里用的默认exchange,收发模式是direct

3.fandout模式:p将消息发给x,x将同一个消息发给所有q,c 按 1,2方式消费q的消息

4.direct(路由)模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息 5.topic模式:p 按照路由Routing Key将消息发给q,(一个消息可能发给多个q),c 按 1,2方式消费q的消息,与4的区别是topic可以有通配符匹配 用go *** 作rabbitmq 写代码的思路

在初始化中完成

声明exchange声明queue将queue与key、exchange绑定

然后用conn.Channel()和rabbitmq交互

go get github.com/rabbitmq/amqp091-go
收发模式2示例:
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	//durable 服务器重启还有queue  autoDelete 自动删除 exclusive 独占连接,这个q别人连不上 noWait 是否等待返回的一些状态结果
	//关于queue的一些设置
	q, err := ch.QueueDeclare("go_q1", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// 开启消费者
	go consume("c1",conn, q.Name)
	go consume("c2",conn, q.Name)

	i := 0
	for {
		i++
		err := ch.Publish("", q.Name, false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			panic(err)
		}
		time.Sleep(200 * time.Millisecond)
	}
}

func consume(name string,conn *amqp.Connection, q string)  {
	ch, err :=  conn.Channel()
	if err != nil {
		panic(err)
	}
	msgs, err := ch.Consume(q,name,true, false,false,false,nil)
	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Printf("%s:%s\n",name,msg.Body)
	}
}
fanout模式示例:
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"time"
)

func main() {
	conn, err := amqp.Dial("amqp://用户名:密码@IP:端口/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	err = ch.ExchangeDeclare("ex","fanout",true,false,false,false,nil)
	if err != nil {
		panic(err)
	}

	go subscribe(conn,"ex")
	go subscribe(conn,"ex")

	i := 0
	for {
		i++
		err := ch.Publish("ex", "", false, false, amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			panic(err)
		}
		time.Sleep(200 * time.Millisecond)
	}
}

func subscribe(conn *amqp.Connection, ex string) {
	ch, err :=  conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	q, err := ch.QueueDeclare("", false, true, false, false, nil)
	if err != nil {
		panic(err)
	}
	defer ch.QueueDelete(q.Name, false,false,false)
	err = ch.QueueBind(q.Name,"",ex,false,nil)
	if err != nil {
		panic(err)
	}
	consume("c3",ch,q.Name)

}

func consume(name string,ch *amqp.Channel, q string)  {
	msgs, err := ch.Consume(q,name,true, false,false,false,nil)
	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Printf("%s:%s\n",name,msg.Body)
	}
}

写代码的时候注意,收发消息,一定要在不同的channel进行,大家可以把channel认为是一个tcp连接的分割。建立exchang的channel可以进行发消息,不可以进行收消息
可以看到有一个exchange,对应2个queue。对应一条tcp连接(分成3个channel,1个向exchange发,2个从queue收)


routing(路由)模式示例
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"strconv"
	"time"
)

const (
	exchangeName = "ex_routing"
	key1     = "key1"
	key2     = "key2"
	queueBindKey1 = "queue1"
	queueBindKey2 = "queue2"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")
	conn, err := amqp.Dial(dsn)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	InitMQ(ch,queueBindKey1,key1,exchangeName)
	InitMQ(ch,queueBindKey2,key2,exchangeName)

	go subscribe(conn, key1,queueBindKey1)
	go subscribe(conn, key2,queueBindKey2)

	i := 0
	for {
		i++
		sendMessage(ch,exchangeName,key1,strconv.Itoa(i))
		sendMessage(ch,exchangeName,key2,strconv.Itoa(i))
		time.Sleep(500 * time.Millisecond)
	}
}

func InitMQ(ch *amqp.Channel, queue,key,exchange string) {
	// 声明 exchange
	err := ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}

}

func sendMessage(ch *amqp.Channel, exchange string, key string,message string) {
	err := ch.Publish(exchange, key, false, false, amqp.Publishing{
		Body: []byte(fmt.Sprintf("send to %s, message: %v", key,message)),
	})
	if err != nil {
		panic(err)
	}

}

func subscribe(conn *amqp.Connection, key string,queue string) {
	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()
	key = fmt.Sprintf("%s haha",key)
	consume(key, ch, queue)
}

func consume(name string, ch *amqp.Channel, queue string) {
	msgs, err := ch.Consume(queue, name, true, false, false, false, nil)

	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Printf("%s:%s\n", name, msg.Body)
	}
}

绑定图:

topic模式

是rabbitmq最高级模式了,没啥说的,重点就是,.匹配1个#匹配0或多个

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"time"
)

const (
	TopicExchange = "topicExchange"
	BindingKey1   = "*.*.red"
	BindingKey2   = "*.error.*"
	BindingKey3   = "shanghai.*.*"
	Queue1        = "queue1"
	Queue2        = "queue2"
	Queue3        = "queue3"
	RoutingKey1   = "beijing.error"
	RoutingKey2   = "shanghai.fatal.red"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "用户名", "密码", "ip", port)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	InitMQ(ch, Queue1, BindingKey1, TopicExchange)
	InitMQ(ch, Queue2, BindingKey2, TopicExchange)
	InitMQ(ch, Queue3, BindingKey3, TopicExchange)
	ch2 := GenChannel(conn)
	go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
		}
	})
	go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
		}
	})
	go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
		}
	})
	for {
		sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
		sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
		time.Sleep(500 * time.Millisecond)
	}
}

func GenChannel(conn *amqp.Connection) *amqp.Channel {
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	return ch
}

func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
	// 声明 exchange
	err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}

}

func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
	err := ch.Publish(exchange, key, false, false, amqp.Publishing{
		Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
	})
	if err != nil {
		panic(err)
	}

}

func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
	msgs, err := ch.Consume(queue, key, true, false, false, false, nil)

	if err != nil {
		panic(err)
	}
	callback(msgs, key)
}

高级 *** 作 消费者确认模式:

将消费消息,设置为手动确认:

成功时确认:msg.Ack(false)
失败时消息处理方式:

不进行确认,会进入unacked,当消费者重启后,或者同一队列的其他消费者可以消费

重新入列

msg.Reject(true)
丢弃
msg.Reject(false)
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"time"
)

const (
	TopicExchange = "topicExchange"
	BindingKey1   = "*.*.red"
	BindingKey2   = "*.error.*"
	BindingKey3   = "shanghai.*.*"
	Queue1        = "queue1"
	Queue2        = "queue2"
	Queue3        = "queue3"
	RoutingKey1   = "beijing.error"
	RoutingKey2   = "shanghai.fatal.red"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxxx", "xxxxx", "xxxxx", "xxxxx")
	conn, err := amqp.Dial(dsn)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	InitMQ(ch, Queue1, BindingKey1, TopicExchange)
	InitMQ(ch, Queue2, BindingKey2, TopicExchange)
	InitMQ(ch, Queue3, BindingKey3, TopicExchange)
	ch2 := GenChannel(conn)
	go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
			// false 拒绝重新入列,即丢弃
			//msg.Reject(false)
			// true 重新入列
			msg.Reject(true)
		}
	})
	go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
			msg.Ack(false)
		}
	})
	go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
			msg.Ack(false)
		}
	})
	cycleCount := 1
	for i:=0;i<cycleCount;i++ {
		sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
		sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
		time.Sleep(500 * time.Millisecond)
	}
	select {}
}

func GenChannel(conn *amqp.Connection) *amqp.Channel {
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	return ch
}

func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
	// 声明 exchange
	err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}

}

func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
	err := ch.Publish(exchange, key, false, false, amqp.Publishing{
		Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
	})
	if err != nil {
		panic(err)
	}

}

func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
	msgs, err := ch.Consume(queue, key, false, false, false, false, nil)

	if err != nil {
		panic(err)
	}
	callback(msgs, key)
}

消费限流

限制未ack的最多有5个,必须设置为手动ack才有效

示例:

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"time"
)

const (
	TopicExchange = "topicExchange"
	BindingKey1   = "*.*.red"
	BindingKey2   = "*.error.*"
	BindingKey3   = "shanghai.*.*"
	Queue1        = "queue1"
	Queue2        = "queue2"
	Queue3        = "queue3"
	RoutingKey1   = "beijing.error"
	RoutingKey2   = "shanghai.fatal.red"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", xxxx)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	InitMQ(ch, Queue1, BindingKey1, TopicExchange)
	InitMQ(ch, Queue2, BindingKey2, TopicExchange)
	InitMQ(ch, Queue3, BindingKey3, TopicExchange)
	ch2 := GenChannel(conn)
	// 限制未ack的最多有5个,必须设置为手动ack才有效
	ch2.Qos(5,0,false)
	go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			go func(msg amqp.Delivery) {
				fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1,BindingKey1,string(msg.Body))
				time.Sleep(time.Second * 5)
				msg.Ack(false)
			}(msg)
		}
	})
	go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2,BindingKey2,string(msg.Body))
			msg.Ack(false)
		}
	})
	go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3,BindingKey3,string(msg.Body))
			msg.Ack(false)
		}
	})
	cycleCount := 10
	for i:=0;i<cycleCount;i++ {
		sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
		sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
		time.Sleep(500 * time.Millisecond)
	}
	select {}
}

func GenChannel(conn *amqp.Connection) *amqp.Channel {
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	return ch
}

func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
	// 声明 exchange
	err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}

}

func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
	err := ch.Publish(exchange, key, false, false, amqp.Publishing{
		Body: []byte(fmt.Sprintf("send to %s, message: %v", key, message)),
	})
	if err != nil {
		panic(err)
	}

}

func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
	msgs, err := ch.Consume(queue, key, false, false, false, false, nil)

	if err != nil {
		panic(err)
	}
	callback(msgs, key)
}

延迟消息

借助rabbitmq-delayed-message-exchange插件实现(需要先安装好)

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"time"
)

const (
	TopicExchange = "topicExchange"
	DelayExchange = "delayExchange"
	BindingKey1   = "*.*.red"
	BindingKey2   = "*.error.#"
	BindingKey3   = "shanghai.*.*"
	Queue1        = "queue1"
	Queue2        = "queue2"
	Queue3        = "queue3"
	RoutingKey1   = "beijing.error"
	RoutingKey2   = "shanghai.fatal.red"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d/", "xxxx", "xxxx", "xxxx", 5672)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	InitMQ(ch, Queue1, BindingKey1, TopicExchange)
	InitMQ(ch, Queue2, BindingKey2, TopicExchange)
	InitMQ(ch, Queue3, BindingKey3, TopicExchange)
	InitDelayMQ(ch, Queue2, "", DelayExchange)

	ch2 := GenChannel(conn)
	// 限制未ack的最多有5个,必须设置为手动ack才有效
	ch2.Qos(5, 0, false)
	go subscribe(ch2, BindingKey1, Queue1, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			go func(msg amqp.Delivery) {
				fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue1, BindingKey1, string(msg.Body))
				msg.Ack(false)
			}(msg)
		}
	})
	go subscribe(ch2, BindingKey2, Queue2, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Println(time.Now())
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue2, BindingKey2, string(msg.Body))
			isFail := true
			// 如果失败发送延迟消息给
			if isFail {
				delay,ok := msg.Headers["x-delay"].(int32)
				if ok {
					delay = delay * 2
					fmt.Println(delay)
				}else{
					delay = 1000
				}
				sendDelayMessage(ch, DelayExchange, "", string(msg.Body), int(delay))
				msg.Reject(false)
			} else {
				msg.Ack(false)
			}
		}
	})
	go subscribe(ch2, BindingKey3, Queue3, func(msgs <-chan amqp.Delivery, s string) {
		for msg := range msgs {
			fmt.Printf("%v 绑定 %v 收到消息:%v\n", Queue3, BindingKey3, string(msg.Body))
			msg.Ack(false)
		}
	})
	// 设置confirm,发送端消息确认
	//var notifyConfirm chan amqp.Confirmation
	//SetConfirm(ch, notifyConfirm)
	//go ListenConfirm(notifyConfirm)
	//var notifyReturn chan amqp.Return
	//NotifyReturn(notifyReturn,ch)
	//go ListReturn(notifyReturn)

	cycleCount := 1
	for i := 0; i < cycleCount; i++ {
		fmt.Println(i)
		//sendDelayMessage(ch, DelayExchange, "", "beijing.error-----------------",3000)
		sendMessage(ch, TopicExchange, RoutingKey1, "beijing.error")
		//sendMessage(ch, TopicExchange, RoutingKey2, "shanghai.fatal.red")
		time.Sleep(500 * time.Millisecond)
	}
	select {}
}

func GenChannel(conn *amqp.Connection) *amqp.Channel {
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	return ch
}

func InitMQ(ch *amqp.Channel, queue, key, exchange string) {
	// 声明 exchange
	err := ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}

}

func InitDelayMQ(ch *amqp.Channel, queue, key, exchange string) {
	//申明交换机
	err := ch.ExchangeDeclare(exchange, "x-delayed-message",
		false, false, false, false,
		map[string]interface{}{"x-delayed-type": "direct"})
	if err != nil {
		log.Fatal(err)
	}
	// 声明 queue
	_, err = ch.QueueDeclare(queue, false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	// 将 queue 与 exchange 和 key 绑定
	err = ch.QueueBind(queue, key, exchange, false, nil)
	if err != nil {
		panic(err)
	}
}

func sendMessage(ch *amqp.Channel, exchange string, key string, message string) {
	err := ch.Publish(exchange, key, true, false, amqp.Publishing{
		Body: []byte(fmt.Sprintf("%v", message)),
	})
	if err != nil {
		panic(err)
	}

}

func sendDelayMessage(ch *amqp.Channel, exchange string, key string, message string, delay int) {
	err := ch.Publish(exchange, key, true, false, amqp.Publishing{
		Headers: map[string]interface{}{"x-delay": delay},
		Body:    []byte(fmt.Sprintf("%v", message)),
	})
	if err != nil {
		panic(err)
	}

}

func subscribe(ch *amqp.Channel, key string, queue string, callback func(<-chan amqp.Delivery, string)) {
	msgs, err := ch.Consume(queue, key, false, false, false, false, nil)

	if err != nil {
		panic(err)
	}
	callback(msgs, key)
}

func SetConfirm(ch *amqp.Channel, notifyConfirm chan amqp.Confirmation) {
	err := ch.Confirm(false)
	if err != nil {
		log.Println(err)
	}
	notifyConfirm = ch.NotifyPublish(make(chan amqp.Confirmation))
}

func ListenConfirm(notifyConfirm chan amqp.Confirmation) {
	for ret := range notifyConfirm {
		if ret.Ack {
			fmt.Println("消息发送成功")
		} else {
			fmt.Println("消息发送失败")
		}
	}
}

func NotifyReturn(notifyReturn chan amqp.Return, channel *amqp.Channel) {
	notifyReturn = channel.NotifyReturn(make(chan amqp.Return))
}
func ListReturn(notifyReturn chan amqp.Return) {
	ret := <-notifyReturn
	if string(ret.Body) != "" {
		fmt.Println("消息没有投递到队列:", string(ret.Body))
		panic("skfh")
	}
}

持久化 交换机持久化:

交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建,相当于一直存在。

队列持久化

如果不将队列设置为持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。

消息持久化

RabbitMQ 的消息是依附于队列存在的,所以要想消息持久化,那么前提是队列也必须设置持久化。
在创建消息的时候,添加一个持久化消息的属性(将 delivery_mode 设置为 2)。


设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧会存在;

仅设置队列持久化,重启之后消息会丢失;

仅设置消息持久化,重启之后队列会消失,因此消息也就丢失了,所以只设置消息持久化而不设置队列持久化是没有意义的;

将所有的消息都设置为持久化(写入磁盘的速度比写入内存的速度慢的多),可能会影响 RabbitMQ 的性能,对于可靠性不是那么高的消息可以不采用持久化来提高 RabbitMQ 的吞吐量。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/langs/996214.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存