一个用于连接RabbitMQ的包装器,限制客户端仅使用单一用途通道(生产者或消费者),并支持间歇性连接的恢复。
项目描述
talus (名词) - ta·lus | ˈtā-ləs: 由岩石碎屑堆积形成的斜坡;兔子的偶尔栖息地。
一个用于连接RabbitMQ的包装器,限制客户端仅使用单一用途通道(生产者或消费者),并支持间歇性连接的恢复。
功能
引导分离生产者和消费者的连接
丢失连接时重新建立到服务器的连接
受限制的接口,以支持直接交换的简单生产/消费用例
安装
pip install talus
示例
创建一个监听队列、处理有效消息并在处理过程中发布消息的消费者
使用默认连接参数和连接重试器,期望RabbitMQ服务器在其默认配置下运行。
from talus import DurableConsumer
from talus import DurableProducer
from talus import ConnectionRetryerFactory
from talus import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus import MessageProcessorBase
from talus import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus import Queue
from talus import Exchange
from talus import Binding
from typing import Type
##########################
# Consumer Configurations#
##########################
# Configure messages that will be consumed
class ConsumeMessageBody(MessageBodyBase):
objectName: str
bucket: str
class ConsumeMessage(ConsumeMessageBase):
message_body_cls: Type[ConsumeMessageBody] = ConsumeMessageBody
# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")
###########################
# Producer Configurations #
###########################
# Configure messages that will be produced
class ProducerMessageBody(MessageBodyBase):
key: str
code: str
class PublishMessage(PublishMessageBase):
message_body_cls: Type[ProducerMessageBody] = ProducerMessageBody
default_routing_key: str = "outbound.message.m"
# Configure the queues the message should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")
# Configure the exchange and queue bindings for publishing (Publish Message -> Outbound Queues)
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.
############################
# Processor Configurations #
############################
# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
def process_message(self, message: ConsumeMessage):
print(message)
outbound_message = PublishMessage(
body=ProducerMessageBody(
key=message.body.objectName,
code="newBucket",
conversationId=message.body.conversationId,
)
) # crosswalk the values from the consumed message to the produced message
self.producer.publish(outbound_message)
print(outbound_message)
# Actually Connect and run the consumer
def main():
"""Starts a listener which will consume messages from the inbound queue and publish messages to the outbound queues."""
with DurableProducer(
queue_bindings=bindings,
publish_exchange=publish_exchange,
connection_parameters=ProducerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as producer:
with DurableConsumer(
consume_queue=inbound_queue,
connection_parameters=ConsumerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as consumer:
message_processor = MessageProcessor(message_cls=ConsumeMessage, producer=producer)
consumer.listen(message_processor)
if __name__ == "__main__":
# First message to consume
class InitialMessage(PublishMessageBase):
message_body_cls: Type[
ConsumeMessageBody] = ConsumeMessageBody
default_routing_key: str = "inbound.message.m"
initial_message_bindings = [Binding(queue=inbound_queue, message=InitialMessage)]
with DurableProducer(
queue_bindings=initial_message_bindings,
publish_exchange=publish_exchange,
connection_parameters=ProducerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as producer:
producer.publish(InitialMessage(body={"objectName": "object", "bucket": "bucket"}))
# Consume the message and process it
main()
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪一个,请了解有关安装包的更多信息。
源分发
talus-1.2.1.tar.gz (21.6 kB 查看哈希值)
构建分发
talus-1.2.1-py3-none-any.whl (25.1 kB 查看哈希值)