• 周五. 3月 29th, 2024

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

python之 rabbitmq

admin

11月 28, 2021

一.发布hello world

首先我们看一个最简单的消息队列系统

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 """
 4 @author: zengchunyun
 5 """
 6 import pika
 7 
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
10 channel = connection.channel()
11 
12 channel.queue_declare(queue="hello")  # 在发送之前,我们要确保这个队列存在,如果我们发送一个消息到不存在的队列,rabbitmq会认为这个是垃圾消息
13 # 我们已经创建了一个队列名为hello,待会我们的消息都会发送到这个队列
14 
15 
16 # rabbitmq不允许我们直接将消息发送到队列,而是通过一个交换器,现在我们使用一个特殊对交换器,它是一个空对字符串标识对交换器,它能确保我们对消息该放到哪个
17 # 队列,这个队列需要特殊对rouning_key
18 channel.basic_publish(exchange="",
19                       routing_key="hello",
20                       body="hello world",)
21 
22 print(" [x] Sent 'hello world")
23 
24 # 在退出之前,我们需要确保网络缓冲区已经清空,且我们对消息的确已经发送到rabbitMQ,我们可以关闭这个连接
25 connection.close()

发布者

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 """
 4 @author: zengchunyun
 5 """
 6 import pika
 7 import time
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
10 channel = connection.channel()
11 
12 channel.queue_declare(queue="hello")  # 为了确保每次订阅的队列都存在,我们先声明一个队列
13 
14 # 接收队列消息需要通过回调函数来接收
15 
16 
17 def callback(ch, method, properties, body):
18     print(" [x] Received %r" % body)
19     time.sleep(3)
20     ch.basic_ack(delivery_tag = method.delivery_tag)
21 
22 channel.basic_consume(callback,
23                       queue="hello",
24                       no_ack=True)  # 这里我们需要告诉rabbitMQ这个回调函数会从我们的hello队列接收消息,关闭消息确认标记,
25 # 那么当worker工作中异常,如没有完成任务就关闭了连接,可能会丢失任务,使用no_ack=True默认只要rabbit分配任务给该worker了,就会将任务从队列删除
26 
27 print(" [*] Waiting for messages. To exit press CTRL+C")
28 channel.start_consuming()  # 我们这里进入了一个永不终止的循环等待数据状态

订阅者

上面这个系统只是单一的消息队列,

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注