博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ中RPC的实现及其通信机制
阅读量:6940 次
发布时间:2019-06-27

本文共 5162 字,大约阅读时间需要 17 分钟。

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id。

RPC调用流程:

当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到rpc_queue队列,消费者等待该队列上的请求。当一个请求出现时,它会执行该任务,将带有结果的消息发送回生产者。生产者等待回调队列上的数据,当消息出现时,它检查相关ID属性,如果它与请求中的值匹配,则返回对应用程序的响应。

 RabbitMQ斐波拉契计算的RPC,消费者实现:

"""基于RabbitMQ实现RPC通信机制 --> 服务端"""import pikaimport uuidfrom functools import lru_cacheclass RabbitServer(object):    def __init__(self):        self.conn = pika.BlockingConnection(            pika.ConnectionParameters(host='localhost', port=5672)        )        self.channel = self.conn.channel()        # 声明一个队列,并进行持久化,exclusive设置为false        self.channel.queue_declare(            exclusive=False, durable=True, queue='task_queue'        )        # 声明一个exhange交换机,类型为topic        self.channel.exchange_declare(            exchange='logs_rpc', exchange_type='topic', durable=True        )        # 将队列与交换机进行绑定        routing_keys = ['#']  # 接受所有的消息        for routing_key in routing_keys:            self.channel.queue_bind(                exchange='logs_rpc', queue='task_queue', routing_key=routing_key            )    @lru_cache()    def fib(self, n):        """        斐波那契数列.===>程序的处理逻辑        使用lru_cache 优化递归        :param n:        :return:        """        if n == 0:            return 0        elif n == 1:            return 1        else:            return self.fib(n - 1) + self.fib(n - 2)    def call_back(self, channel, method, properties, body):        print('------------------------------------------')        print('接收到的消息为(斐波那契数列的入参项为):{}'.format(str(body)))        print('消息的相关属性为:')        print(properties)        value = self.fib(int(body))        print('斐波那契数列的运行结果为:{}'.format(str(value)))        # 交换机将消息发送到队列        self.channel.basic_publish(            exchange='',            routing_key=properties.reply_to,            body=str(value),            properties=pika.BasicProperties(                delivery_mode=2,                correlation_id=properties.correlation_id,            ))        # 消费者对消息进行确认        self.channel.basic_ack(delivery_tag=method.delivery_tag)    def receive_msg(self):        print('开始接受消息...')        self.channel.basic_qos(prefetch_count=1)        self.channel.basic_consume(            consumer_callback=self.call_back,            queue='task_queue',            no_ack=False,  # 消费者对消息进行确认            consumer_tag=str(uuid.uuid4())        )    def consume(self):        self.receive_msg()        self.channel.start_consuming()if __name__ == '__main__':    rabbit_consumer = RabbitServer()    rabbit_consumer.consume()

 生产者实现:

"""基于RabbitMQ实现RPC通信机制 --> 客户端"""import pikaimport uuidimport timeclass RabbitClient(object):    def __init__(self):        # 与RabbitMq服务器建立连接        self.conn = pika.BlockingConnection(            pika.ConnectionParameters(host='localhost', port=5672)        )        self.channel = self.conn.channel()        # 声明一个exchange交换机,交换机的类型为topic        self.channel.exchange_declare(            exchange='logs_rpc', exchange_type='topic', durable=True        )        # 声明一个回调队列,用于接受RPC回调结果的运行结果        result = self.channel.queue_declare(durable=True, exclusive=False)        self.call_queue = result.method.queue        # 从回调队列当中获取运行结果.        self.channel.basic_consume(            consumer_callback=self.on_response,            queue=self.call_queue,            no_ack=False        )    def on_response(self, channel, method, properties, body):        """        对收到的消息进行确认        找到correlation_id与服务端的消息标识匹配的消息结果        :param channel:        :param method:        :param properties:        :param body:        :return:        """        if self.corr_id == properties.correlation_id:            self.response = body            print('斐波那契数列的RPC返回结果是:{}'.format(body))            print('相关属性信息:')            print(properties)        self.channel.basic_ack(delivery_tag=method.delivery_tag)    def send_msg(self, routing_key, message):        """        exchange交换机将根据消息的路由键将消息路由到对应的queue当中        :param routing_key: 消息的路由键        :param message: 生成者发送的消息        :return:        """        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(            exchange='logs_rpc',            routing_key=routing_key,            body=message,            properties=pika.BasicProperties(                delivery_mode=2,                correlation_id=self.corr_id,                reply_to=self.call_queue,            ))        while self.response is None:            print('等待远程服务端的返回结果...')            self.conn.process_data_events()  # 非阻塞式的不断获取消息.        return self.response    def close(self):        self.conn.close()if __name__ == "__main__":    rabbit_producer = RabbitClient()    routing_key = 'hello every one'    start_time = int(time.time())    for item in range(2000):        num = str(item)        print('生产者发送的消息为:{}'.format(num))        rabbit_producer.send_msg(routing_key, num)    end_time = int(time.time())    print("耗时{}s".format(str(end_time - start_time)))

计算2000以内的斐波拉契数列,执行结果如下:

 

 

转载于:https://www.cnblogs.com/FG123/p/10137411.html

你可能感兴趣的文章
[LeetCode]Maximum Product Subarray
查看>>
thinkphp 读取页面报错 说 /Runtime/Cache/Home/XXXXXX.php 错误
查看>>
第三届开源操作系统年度技术年会 --- 资料下载
查看>>
揭秘Windows10 UWP中的httpclient接口[2]
查看>>
Cookie的使用
查看>>
Jenkins和maven自动化构建java程序
查看>>
读取系统执行状态的shell脚本
查看>>
arcgis 10.1 导入数据到oracle 发布地图服务
查看>>
高吞吐高并发Java NIO服务的架构(NIO架构及应用之一)
查看>>
znujljhzolktftcc
查看>>
高流量站点NGINX与PHP-fpm配置优化
查看>>
eclipse中改变默认的workspace的方法及说明
查看>>
一个基于MVVM的TableView组件化实现方案
查看>>
教你区分LVDS屏线及屏接口定义
查看>>
C函数tolower,与toupper
查看>>
c 进程间的通信
查看>>
【张宴】PHP在金山游戏运营中的应用
查看>>
tomcat发布静态网页
查看>>
python函数参数前面单星号(*)和双星号(**)的区别
查看>>
深入理解javascript作用域系列第三篇——声明提升(hoisting)
查看>>