mq 的PublishSubscribe 模式

mq 的PublishSubscribe 模式,第1张

mq 的Publish/Subscribe 模式

一个交换机绑定多个队列,使用交换机使用fanout 类型,那么会发给与之绑定的所有队列。路由key 一定要为空字符串

代码如下:
Publish

import pika
from pika.exchange_type import ExchangeType

class Producer(object):
    def __init__(self, queue_name,exchange_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }
        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 声明队列
        self.channel = self.con.channel()
        self.channel.queue_declare(queue=queue_name)
        self.channel.exchange_declare(exchange=exchange_name, exchange_type=ExchangeType.fanout)

    def send_message(self,queue_name,exchange_name,routing_key, body):
        """fanout 类型的交换机 routing_key 为空字符串,给所有绑定这个交换价的队列发送消息"""
        # 绑定交换机
        self.channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key="")
        # 发送消息
        self.channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)
        # 关闭通道
        self.channel.close()
        # 关闭连接
        self.con.close()


if __name__ == '__main__':
    p = Producer("test", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p.send_message("test", "logs","","have a good time")
    p1 = Producer("test01", "logs","tom", "tom@tom", "localhost", 5672, "/afei")
    p1.send_message("test01","logs","","good luck !")

Subscribe

import pika


class Consumer(object):

    def __init__(self, queue_name, username, password, host, port, virtual_host):
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password)
        }

        # 建立连接
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        # 创建通道
        self.channel = self.con.channel()
        self.queue_name = queue_name

    def consume_message(self):
        def callback(ch, method, properties, body):
            print("ch===%r" % ch)
            print("method===%r" % method)
            print("properties===%r" % properties)
            print("[x] Received %r" % body)

        # 消费对象
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)
        # 开始消费
        self.channel.start_consuming()
        self.channel.close()


if __name__ == '__main__':
    try:
        c = Consumer("test", "tom", "tom@tom", "localhost", 5672, "/afei")
        c.consume_message()
    except KeyboardInterrupt:
        exit(0)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5707244.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存