一个交换机可以绑定多个队列;当交换机使用 fanout 类型时,消息会发给与之绑定的所有队列。fanout 类型的交换机会忽略路由 key,按惯例将其设为空字符串即可。
代码如下:
Publish
import pika
from pika.exchange_type import ExchangeType


class Producer(object):
    """One-shot publisher that sends a message through a fanout exchange.

    The constructor opens a blocking connection, opens a channel, declares
    the queue and declares the exchange as ``fanout``.  ``send_message``
    binds the queue, publishes one message and then closes the channel and
    the connection, so each instance can publish exactly once.
    """

    def __init__(self, queue_name, exchange_name, username, password, host,
                 port, virtual_host):
        """Connect to the broker and declare the queue and the exchange.

        Args:
            queue_name: Queue to declare (default queue arguments).
            exchange_name: Exchange to declare with the fanout type.
            username: User name for plain credentials.
            password: Password for plain credentials.
            host: Broker host.
            port: Broker port.
            virtual_host: Virtual host to connect to.

        Raises:
            Exception: Whatever pika raises while connecting or declaring;
                the connection is closed first if it was already open.
        """
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password),
        }
        # Establish the connection.
        self.con = pika.BlockingConnection(
            pika.ConnectionParameters(**con_param))
        try:
            # Open a channel, then declare the queue and the exchange.
            self.channel = self.con.channel()
            self.channel.queue_declare(queue=queue_name)
            self.channel.exchange_declare(
                exchange=exchange_name, exchange_type=ExchangeType.fanout)
        except Exception:
            # Do not leave a half-initialised connection open.
            self._close()
            raise

    def _close(self):
        """Close the channel, then the connection, if they are still open."""
        channel = getattr(self, "channel", None)
        if channel is not None and channel.is_open:
            channel.close()
        if self.con.is_open:
            self.con.close()

    def send_message(self, queue_name, exchange_name, routing_key, body):
        """Bind the queue to the exchange, publish ``body`` and disconnect.

        The queue is always bound with an empty routing key.  ``routing_key``
        is forwarded unchanged to ``basic_publish``; callers in this script
        pass ``""``.  NOTE(review): per the RabbitMQ documentation a fanout
        exchange ignores the routing key, so its value should not affect
        delivery -- confirm against the broker in use.

        Args:
            queue_name: Queue to bind to the exchange.
            exchange_name: Exchange to bind to and publish on.
            routing_key: Routing key passed to ``basic_publish``.
            body: Message payload.

        Raises:
            Exception: Whatever pika raises while binding or publishing; the
                channel and connection are closed before it propagates.
        """
        try:
            # Bind the queue to the exchange.
            self.channel.queue_bind(
                queue=queue_name, exchange=exchange_name, routing_key="")
            # Publish the message.
            self.channel.basic_publish(
                exchange=exchange_name, routing_key=routing_key, body=body)
        finally:
            # Always release the channel and connection, even on failure.
            self._close()


if __name__ == '__main__':
    p = Producer("test", "logs", "tom", "tom@tom", "localhost", 5672, "/afei")
    p.send_message("test", "logs", "", "have a good time")
    p1 = Producer("test01", "logs", "tom", "tom@tom", "localhost", 5672,
                  "/afei")
    p1.send_message("test01", "logs", "", "good luck !")
Subscribe
import sys

import pika


class Consumer(object):
    """Blocking consumer that prints every message received from one queue."""

    def __init__(self, queue_name, username, password, host, port,
                 virtual_host):
        """Connect to the broker and open a channel.

        The queue is not declared or bound here; it is only remembered for
        ``consume_message``.

        Args:
            queue_name: Queue to consume from.
            username: User name for plain credentials.
            password: Password for plain credentials.
            host: Broker host.
            port: Broker port.
            virtual_host: Virtual host to connect to.
        """
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(
                username, password),
        }
        # Establish the connection.
        self.con = pika.BlockingConnection(
            pika.ConnectionParameters(**con_param))
        # Open a channel.
        self.channel = self.con.channel()
        self.queue_name = queue_name

    def consume_message(self):
        """Consume from the queue until stopped, then disconnect.

        Messages are consumed with ``auto_ack=True`` and each delivery is
        printed.  The channel and the connection are closed when consuming
        ends, including when it ends with an exception such as
        ``KeyboardInterrupt``.
        """

        def callback(ch, method, properties, body):
            print("ch===%r" % ch)
            print("method===%r" % method)
            print("properties===%r" % properties)
            print("[x] Received %r" % body)

        try:
            # Register the consumer callback.
            self.channel.basic_consume(queue=self.queue_name,
                                       on_message_callback=callback,
                                       auto_ack=True)
            # Start consuming; blocks until consuming is stopped.
            self.channel.start_consuming()
        finally:
            # Release the channel first, then the underlying connection.
            if self.channel.is_open:
                self.channel.close()
            if self.con.is_open:
                self.con.close()


if __name__ == '__main__':
    try:
        c = Consumer("test", "tom", "tom@tom", "localhost", 5672, "/afei")
        c.consume_message()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop this script.
        sys.exit(0)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)