ruby – 使用EventMachine和RabbitMQ的RPC

ruby – 使用EventMachine和RabbitMQ的RPC,第1张

概述我一直在开始使用AMQP gem doc中提供的RabbitMQ RPC sample code,尝试编写执行同步远程调用的非常简单的代码: require "amqp"module RPC class Base include EM::Deferrable def rabbit(rabbit_callback) rabbit_loop = Proc.new 我一直在开始使用AMQP gem doc中提供的RabbitMQ RPC sample code,尝试编写执行同步远程调用的非常简单的代码:

require "amqp"module RPC  class Base    include EM::Deferrable    def rabbit(rabbit_callback)      rabbit_loop = Proc.new {        AMQP.connect do |connection|          AMQP::Channel.new(connection) do |channel|            channel.queue("rpc.queue",:exclusive => false,:durable => true) do |requests_queue|              self.callback(&rabbit_callback)              self.succeed(connection,channel,requests_queue)            end # requests_queue          end # AMQP.channel        end # AMQP.connect        Signal.trap("INT")  { connection.close { EM.stop } }        Signal.trap("TERM") { connection.close { EM.stop } }      }      if !EM.reactor_running?        EM.run do          rabbit_loop.call        end      else        rabbit_loop.call      end    end  end  class Server < Base    def run      server_loop = Proc.new do |connection,requests_queue|        consumer = AMQP::Consumer.new(channel,requests_queue).consume        consumer.on_delivery do |Metadata,payload|          puts "[requests] Got a request #{Metadata.message_ID}. Sending a reply to #{Metadata.reply_to}..."          channel.default_exchange.publish(Time.Now.to_s,:routing_key    => Metadata.reply_to,:correlation_ID => Metadata.message_ID,:mandatory      => true)          Metadata.ack        end      end      rabbit(server_loop)    end  end  class ClIEnt < Base    def sync_push(request)      result = nil      sync_request = Proc.new do |connection,requests_queue|        message_ID = Kernel.rand(10101010).to_s        response_queue = channel.queue("",:exclusive => true,:auto_delete => true)        response_queue.subscribe do |headers,payload|          if headers.correlation_ID == message_ID            result = payload            connection.close { EM.stop }          end        end        EM.add_timer(0.1) do           puts "[request] Sending a request...#{request} with ID #{message_ID}"          channel.default_exchange.publish(request,:routing_key => requests_queue.name,:reply_to    => response_queue.name,:message_ID  => message_ID)        end      end      rabbit(sync_request)      result    end  endend

这个想法很简单:我想让一个消息队列始终准备好(这是由rabbit方法处理的).每当客户端想要发送请求时,它首先创建响应的临时队列以及消息ID;然后,它将请求发布到主消息队列,并等待临时队列中具有相同消息ID的响应,以便知道该特定请求的答案何时准备就绪.我想,message_ID在某种程度上与临时队列是多余的(因为队列也应该是唯一的).

我现在使用此客户端/服务器代码运行虚拟脚本

# server session>> server = RPC::Server.new=> #<RPC::Server:0x007faaa23bb5b0>>> server.runUpdating clIEnt propertIEs[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...

# clIEnt session>> clIEnt = RPC::ClIEnt.new=> #<RPC::ClIEnt:0x007ffb6be6aed8>>> clIEnt.sync_push "test 1"Updating clIEnt propertIEs[request] Sending a request...test 1 with ID 3315740=> "2012-11-02 21:58:45 +0100">> clIEnt.sync_push "test 2"AMQ::ClIEnt::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00",@channel=1>

有两点我真的不明白:

>与EventMachine相关:在客户端代码中,如果我希望我的消息实际发布,为什么我必须调用EM.add_timer?为什么使用EM.next_tick不起作用?我的理解是,在这里调用发布时,“一切”应该是“准备好”.
>与AMQP相关:为什么我的客户端因第二次请求的关闭连接而崩溃?每次推送新请求时都应创建一个全新的EM / AMQP循环.

遗憾的是,很少有在线处理EM / AMQP的代码,所以任何帮助都将深表感谢!
任何关于效率的评论也将受到高度赞赏.

解决方法 挖掘文档,我终于发现我实际上需要 once_declared回调以确保在客户端开始使用它时队列已准备就绪.

关于连接问题,似乎不知何故,使用EM :: Deferrable导致问题,因此(非常不令人满意)解决方案只包括不包括EM :: Deferrable.

require "amqp"module RPC  module Base    def rabbit(rabbit_callback)      rabbit_loop = Proc.new {        AMQP.start do |connection|          AMQP::Channel.new(connection) do |channel|            channel.queue("rpc.queue",:durable => true) do |requests_queue|              requests_queue.once_declared do                rabbit_callback.call(connection,requests_queue)              end            end          end        end        Signal.trap("INT")  { AMQP.stop { EM.stop } }        Signal.trap("TERM") { AMQP.stop { EM.stop } }      }      if !EM.reactor_running?        @do_not_stop_reactor = false        EM.run do          rabbit_loop.call        end      else        @do_not_stop_reactor = true        rabbit_loop.call      end    end  end  class Server    include Base    def run      server_loop = Proc.new do |connection,:mandatory      => true)          Metadata.ack        end      end      rabbit(server_loop)    end  end  class ClIEnt    include Base    def sync_push(request)      result = nil      sync_request = Proc.new do |connection,payload|          if headers.correlation_ID == message_ID            result = payload            AMQP.stop { EM.stop unless @do_not_stop_reactor }          end        end        response_queue.once_declared do          puts "[request] Sending a request...#{request} with ID #{message_ID}"          channel.default_exchange.publish(request,:message_ID  => message_ID)        end      end      rabbit(sync_request)      result    end  endend
总结

以上是内存溢出为你收集整理的ruby – 使用EventMachine和RabbitMQ的RPC全部内容,希望文章能够帮你解决ruby – 使用EventMachine和RabbitMQ的RPC所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/1275155.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-08
下一篇 2022-06-08

发表评论

登录后才能评论

评论列表(0条)

保存