遵循最佳实践的RabbitMQ实用工具
项目描述
lapinmq
作者: Madhur Tandon
lapin ~ rabbit 在法语中
通过pip install lapinmq
安装
TLDR
lapinmq 包含遵循最佳实践的RabbitMQ(或任何其他AMQP协议兼容的消息代理)实用工具,例如
- 具有后台心跳的长生存消费者和发布者
- 多线程消费者,可以与预取和手动确认并行处理消息
- 异步和同步发布者确认的选项(针对不同的网络延迟场景)-- 详细信息在这里
- 通过回调协调异步发布者确认和消费者确认的能力,当消费者也是发布者时非常有用。
- 检测不可路由的消息,即对于交换机和提供的路由键不存在队列的情况
这些实用工具构建在pika
之上 -- RabbitMQ团队官方推荐的库。
注意
- 该库提供了发送和接收消息的简单抽象。
- 支持
工作队列中的竞争消费者
、发布/订阅
、路由
、主题
等流行模式。 - 尚未实现RPC(请求/回复)模式。
- 不支持资源声明(队列、交换机等)。请确保这些在之前已经存在。
- 除非我们声明一个临时专用队列。
- 欢迎为改进代码库、示例、文档等做出贡献。
为什么还需要另一个pika包装器?
- 文档在某种意义上是原始的,因为教程没有展示如何使消费者和发布者长生存
- 将预取计数设置为> 1仅仅意味着消费者RAM中有多个消息,并且没有合适的示例说明如何利用多线程并行处理它们
- 在高网络延迟场景中,同步通道确认可能导致发送吞吐量非常低,因此,选择异步发布者选项。
用法
在使用以下工具之前,请确保设置以下环境变量
- RABBITMQ_HOST: RabbitMQ服务器可访问的主机地址
- RABBITMQ_PORT: RabbitMQ服务器的端口号(默认为5672)
- RABBITMQ_USERNAME: RabbitMQ服务器的用户名
- RABBITMQ_PASSWORD: RabbitMQ服务器的密码
发送消息
import time
from lapinmq.publisher import Publisher
from lapinmq.utils import wait_for_sigterm
p = Publisher(kind="sync") # or perhaps p = Publisher(kind="async")
p.start()
for i in range(100):
p.send_message_to_queue(queue_name='task_queue', body='...')
# if the publisher is async, can also pass callbacks i.e.
# p.send_message_to_queue(queue_name='task_queue', body='...', callbacks={})
time.sleep(300) # 5 minutes in seconds, to showcase long wait times between sending messages
# OR we can also use the `send_message` function for more advanced usage
for i in range(100):
p.send_message(exchange='', routing_key='task_queue', body='...')
# no time.sleep() here to showcase 0 wait time between sending messages
sigterm_received = wait_for_sigterm()
assert sigterm_received
p.stop()
注意:如果消息不可路由,建议配置备用交换机。这是因为 lapinmq
假设最佳实践,即让发布者崩溃而不是丢弃不可路由的消息。这是为了避免后续丢弃不可路由的消息。崩溃可以立即提供反馈以便检查问题。
接收消息
要接收消息,使用队列名称和任务函数创建一个 lapinmq.Consumer
类的实例。也可以传入工作进程的数量(默认为1)。
任务函数接收接收到的消息作为参数,并必须返回以下4种状态之一
- MessageStatus.SUCCESS: 如果消息处理成功,并且应从队列中删除消息。
- MessageStatus.FAIL_RETRY_LATER: 如果消息处理失败,并且应稍后重试消息。
- MessageStatus.FAIL_DO_NOT_RETRY: 如果消息处理失败,并且不应稍后重试消息。
- MessageStatus.HANDLED_VIA_CALLBACK: 对于高级用法,稍后介绍
如果函数抛出异常,则该情况类似于 FAIL_RETRY_LATER
from lapinmq.message import MessageStatus
from lapinmq.consumer import Consumer
from lapinmq.utils import wait_for_sigterm
def task(message):
body = message.body.decode()
# process the received message body
return MessageStatus.SUCCESS # or perhaps FAIL_RETRY_LATER or FAIL_DO_NOT_RETRY
c = Consumer(
queue_name='task_queue',
task_function=task,
worker_threads=3 # this consumer can process 3 messages in parallel
)
c.start()
sigterm_received = wait_for_sigterm()
assert sigterm_received
c.stop()
注意:也可以传入交换机和绑定键。
以下示例将使队列 task_queue
与交换机 color
绑定,绑定键为 black
和 red
。
请确保队列 task_queue
和交换机 color
在此之前存在。
c = Consumer(
queue_name='task_queue',
task_function=task,
exchange='color',
binding_keys=['black', 'red'],
worker_threads=3 # this consumer can process 3 messages in parallel
)
临时和独占队列
要处理 publish/subscribe
、routing
和 topics
等模式,通常需要创建临时独占队列。可以通过将空字符串传递给 queue_name
参数来实现。在这种情况下,服务器将选择一个随机的队列名称。
请再次确保交换机 direct_logs
在此之前存在。
c1 = Consumer(
queue_name='',
task_function=task_info,
exchange='direct_logs',
binding_keys=['info'],
worker_threads=3
)
c2 = Consumer(
queue_name='',
task_function=task_warning,
exchange='direct_logs',
binding_keys=['info', 'warning'],
worker_threads=3
)
c3 = Consumer(
queue_name='',
task_function=task_error,
exchange='direct_logs',
binding_keys=['info', 'warning', 'error'],
worker_threads=3
)
注意:发布者和消费者类的 p.start()
和 c.start()
方法分别在下面启动一个 daemon
线程。这意味着调用这些方法的脚本必须有一些等待机制。否则,如果主线程退出,发布/消费也会停止。
在上述示例中,等待机制正在监听 SIGTERM。
另一种用法可能是休眠一段时间
p.start() # OR c.start()
print("will terminate after 10 mins")
time.sleep(600)
p.stop() # OR c.stop()
但是,以下示例将不起作用
p.start() # OR c.start()
print("main thread exiting now")
因为在这里,在主线程退出后,守护线程也会退出,即我们在 stdout 上显示 main thread exiting now
后停止发布/消费消息。
最后,由于队列通常用于像 web-server 这样的长期运行的应用程序,主线程按定义永远不会退出。因此,没有等待机制实际上并不是一个问题。
高级:既是消费者又是发布者的消费者
考虑两个队列 - 一个是消费者从其中获取消息的源队列,然后处理它并将消息(相同或不同的)发送到目标队列,从而也充当发布者。
注意:源队列和目标队列可以引用同一个队列,但术语是使用来理解这个概念的。
本质上,task()
函数还负责向目标队列发送消息。
但是,我们只想在消息到达目标队列并且我们已经从代理那里收到交付确认时,才承认(删除)来自源队列的消息。
同步发布者
在这种情况下,除非收到之前发送的消息的确认,否则我们无法向代理发送消息。这提供了强大的保证,但在高网络延迟场景中可能会显著变慢。
from lapinmq.message import MessageStatus
from lapinmq.consumer import Consumer
from lapinmq.publisher import Publisher
from lapinmq.utils import wait_for_sigterm
p = Publisher(kind="sync")
p.start()
def task(message):
body = message.body.decode()
# process the received message body
# send a message to the destination queue
p.send_message_to_queue(queue_name='destination_queue', body='...')
return MessageStatus.SUCCESS # or perhaps FAIL_RETRY_LATER or FAIL_DO_NOT_RETRY
c = Consumer(
queue_name='task_queue',
task_function=task,
worker_threads=3 # this consumer can process 3 messages in parallel
)
c.start()
sigterm_received = wait_for_sigterm()
assert sigterm_received
c.stop()
p.stop()
异步发布者
在这种情况下,我们不知道代理何时会为发送给它的消息提供确认。然而,一旦我们收到来自代理的交付确认,就会触发一个回调。我们可以在回调中向源队列发送确认。这种机制有助于解决时序问题,因为我们应该在收到发送消息的交付确认后,才承认来自源队列的消息。
提供acknowledge
、reject
或reject with retry
功能的函数作为消息对象中的message.ack
、message.reject
、message.reject_retry
。
from lapinmq.message import MessageStatus
from lapinmq.consumer import Consumer
from lapinmq.publisher import Publisher
from lapinmq.utils import wait_for_sigterm
p = Publisher(kind="async")
p.start()
def task(message):
body = message.body.decode()
# process the received message body
# send a message to the destination queue
p.send_message_to_queue(
queue_name='destination_queue',
body='...',
callbacks={
# when the broker confirms that message has been sent to the destination queue,
# send an acknowledgement to the source queue
'ack_callback': message.ack,
# when the broker confirms that message has not been sent to the destination queue,
# reject the message from the source queue and try again later
'nack_callback': message.reject_retry,
})
# callbacks are responsible for various kinds of acknowlegements to the source queue
return MessageStatus.HANDLED_VIA_CALLBACK
c = Consumer(
queue_name='task_queue',
task_function=task,
worker_threads=3 # this consumer can process 3 messages in parallel
)
c.start()
sigterm_received = wait_for_sigterm()
assert sigterm_received
c.stop()
p.stop()
当然,可以在这些回调中传递任何函数,但理想的情况是向源队列发送确认。唯一条件是传递给回调的函数不接受任何参数。接受参数的函数可以使用functools.partial
模块转换为接受0个参数的函数。
import json
from functools import partial
from lapinmq.publisher import Publisher
p = Publisher(kind="async")
p.start()
def task(message):
body = message.body.decode()
value = json.loads(body).get("eta")
def custom_callback(val):
# increment some metric
counter.increment(val)
# send acknowledgement to the source queue
message.ack()
custom_fn = partial(custom_callback, value)
# send a message to the destination queue
p.send_message_to_queue(
queue_name='destination_queue',
body='...',
callbacks={
# when the broker confirms that message has been sent to the destination queue,
# call the custom_callback function that also sends the acknowledgement to the source queue
'ack_callback': custom_fn,
'nack_callback': message.reject_retry,
})
return MessageStatus.HANDLED_VIA_CALLBACK
c = Consumer(
queue_name='task_queue',
task_function=task,
worker_threads=3 # this consumer can process 3 messages in parallel
)
c.start()
sigterm_received = wait_for_sigterm()
assert sigterm_received
c.stop()
p.stop()
潜在改进
多通道的连接复用
一个潜在的改进是将单个AMQP连接与多个通道进行复用。
考虑以下代码片段:
from lapinmq.consumer import Consumer
c1 = Consumer(
queue_name='preprocessing_task_queue',
task_function=task,
worker_threads=3 # this consumer can process 3 messages in parallel
)
c2 = Consumer(
queue_name='transcoding_task_queue',
task_function=task,
worker_threads=5 # this consumer can process 5 messages in parallel
)
当前,库为上面定义的每个消费者创建1个连接和1个通道。相反,我们可以创建1个连接和2个通道...
创建连接的成本更高,因为它是实际的TCP连接。它们受到底层操作系统允许的套接字/文件描述符等的限制。此外,每个新的TCP连接都涉及大量的握手等。
影响
如果一个在kubernetes集群内部运行的pod有X个消费者(如上面代码片段中的X=2)-->当前实现中的X个连接和X个通道。
如果我们扩展到50个pod,那么我们将有50X个消费者-->50X个连接和50X个通道。
但是,通过连接复用,X个消费者将意味着1个连接和X个通道。因此,在扩展时,50个pod将意味着拥有50个连接和50X个通道。
显然,改进是从50X到50,即所需连接数。
限制
pika
-- 在底层使用的库不是线程安全的,因此,在一个进程中使用一个连接,并在不同的线程中使用不同的通道是不可能的。
FAQ 在此处 也描述了相同的内容,并建议为每个线程创建一个pika连接(已在当前实现中完成)。
因此,这个潜在的改进在pika
中是不可能的。
附录
用简单的话来说一些概念
1. 长期存在的消费者和发布者,它们在后台有心跳
消费者和发布者需要向RabbitMQ代理发送心跳以表明它们处于活动状态,从而指示保持底层TCP连接打开。然而,
- 对于消费者,如果处理消息的时间比平常长(比如10分钟),这会干扰消费者的心跳机制,因为消息的处理会阻塞主线程。
- 对于发布者,如果我们想在发送下一个后续消息之前等待一段时间(比如10分钟),这种等待会干扰发布者的心跳机制,因为它会阻塞主线程。
因此,我们的想法是在单独的线程中卸载心跳机制,使我们能够保持底层连接打开。
考虑以下场景(为了理解发送者/发布者的案例)
- 打开冰箱,放一个苹果进去,关上冰箱。重复这个过程1000次。
- 在上面的例子中,打开冰箱 ~ 打开TCP连接
- 放一个苹果进去 ~ 向RabbitMQ代理发送消息
- 关上冰箱 ~ 关闭TCP连接
- 心跳就是我们对管家喊话,不要关上冰箱,管家每隔几分钟就会这么做
显然,计算机(没有多线程或多进程)只能做一项任务,即要么保持苹果,要么喊管家提醒她不要关冰箱。如果我们花太多时间保持苹果,管家就会关上冰箱。
我们想要避免这种情况,因为TCP连接的创建成本很高(涉及多个握手),并且每次发送消息就创建一个新的连接没有意义。
因此,一个长期存在的发布者,底层TCP连接(/冰箱)保持打开(在单独的线程中)是一个更好的方法,然后我们可以向代理(/冰箱中)发送尽可能多的消息(/保持尽可能多的苹果)。当我们完成时,我们最终可以关闭连接(/冰箱)。
2. 可以通过设置prefetch > 1并行处理消息的多线程消费者
预取计数 ~ 消费者可以在RAM中保持多少条消息,即在任何时刻的内存。
显然,如果消费者在RAM(即内存)中保持>1条消息,并且一次只处理其中的一条,那么其他保存在RAM中的消息就处于闲置状态。
例如,假设有一个队列有1000条消息,2个消费者,每个消费者的预取计数分别为2和3。那么,第一个消费者在RAM中获得2条消息,第二个消费者在RAM中获得3条消息,剩下995条消息在队列中。
为了提取最大性能,第一个消费者应该启动2个线程,以便能够并行处理这2条消息。一旦其中一条消息被处理,第一个消费者就会收到另一条消息(这样在任何时刻,这个消费者RAM中的消息数最多为2),空闲的线程应该取走它(而不是为每条新消息创建一个新的线程)-- 确保线程数始终为2,一旦完成前面的消息就立即取走新消息。
结论:本质上,RabbitMQ的消费者也实现了
竞争消费者模式(线程等待消息到达)但在操作系统级别
即线程是RabbitMQ消费者内部队列的消费者,该队列在RAM中存储N条消息,由预取计数确定。
3. 同步与异步发布者确认
虽然这篇文章对主题给出了很好的概述,一个简单的类比也可以有所帮助
- 假设我们想要给一个朋友(/经纪人)1000元,100张10元的钞票。每张钞票是一条消息。
- 我们可以给一张钞票,等朋友确认收到了,然后我们才能给他另一张钞票,再次等待他的确认来处理第二张钞票,依此类推... 这是同步方法,如果我和我的朋友相隔很远(想象一下如果朋友住在另一个国家!),这个过程可能会变慢。因此,在高度网络延迟场景中,同步发布者确认可能会变得非常慢,但它们也很稳健,因为我们保证了在发送另一条消息之前,每个发送的消息都已由经纪人接收。
- 对于另一种方法,我们可以一次性将所有100张纸币给我们的朋友,然后由朋友来确认这些纸币——他可以选择逐张确认,或者批量确认,或者两者混合。例如,他可以确认收到前70张纸币,然后是1张,接着是1张,然后是25张,然后是2张,最后是1张,即总共为100张纸币提供了6个确认。唯一的问题是,我们不知道这些确认将在何时到达,因此称为异步。
该库同时提供这两种类型的发布者,并在需要使用异步发布者时提供回调帮助,同时注意解决一些时序问题(在上面的例子中,当消费者也是发布者时应该更清晰)。
项目详情
下载文件
下载适合您平台的应用程序。如果您不确定该选择哪一个,请了解有关安装包的更多信息。
源代码分发
构建分发
lapinmq-0.1.1.tar.gz的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | 6765aface0ae246c6856f1af1213c27070f044c99fc2bf36e7c181d27c913d70 |
|
MD5 | 1c79ab2cbeaf44b184f31d4999176e18 |
|
BLAKE2b-256 | de0e2f4cac6a5815675fd7dbc20bd5798224bfff0071322d99d8a0b5a775a78a |
lapinmq-0.1.1-py3-none-any.whl的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | 514eed26b3d9d6b353b634eb953831d40b20563354ded95bb45d6e3abefd0913 |
|
MD5 | e4308f811aa16d64c74b32bf7e04be5e |
|
BLAKE2b-256 | 1a839c0fa362258edea9de45c52db8e3b8b5090074ecc5288e9948e7135ccd0c |