通过RabbitMQ发送和接收领域事件
项目描述
领域事件代理
这个库在RabbitMQ主题交换之上提供了一个浅层,用于发布和接收领域事件。发布者和订阅者无需了解彼此,可以按任何顺序启动和停止。每个订阅者控制自己的重试策略,是否需要在他们下线时使用持久队列,或者在没有错误的情况下在订阅者中设置死信队列。
配置
这个库需要连接到RabbitMQ。默认情况下,使用本地的RabbitMQ实例。这可以通过传递一个amqp URL给publish_domain_event
或实例化Publisher
或Subscriber
时进行更改。
from domain_event_broker import Subscriber
subscriber = Subscriber('amqp://user:password@rabbitmq-host/domain-events')
集成
Django
这个库可以通过Django设置进行配置。将domain_event_broker.django添加到您的INSTALLED_APPS
,并在设置中设置DOMAIN_EVENT_BROKER
。
INSTALLED_APPS = (
'domain_event_broker.django',
)
DOMAIN_EVENT_BROKER = 'amqp://user:password@rabbitmq-host/domain-events'
更多信息可以在文档中找到。
发送事件
可以通过调用publish_domain_event
发送事件。
from domain_event_broker import publish_domain_event
publish_domain_event('user.registered', {'user_id': user.id})
域事件会立即发送。当在数据库事务中发出域事件时,建议在事务提交后再进行发布。使用提交钩子可以避免在事务因错误回滚后产生虚假的域事件。
接收事件
订阅者可以监听一个或多个域事件——通过绑定键进行控制。绑定键可以包含通配符。将为每个订阅者创建一个队列。RabbitMQ 会负责将相关的事件仅路由到该队列。
此脚本将接收用户域中发送的所有事件
from domain_event_broker import Subscriber
def log_user_event(event):
print(event)
subscriber = Subscriber()
subscriber.register(log_user_event, 'printer', ['user.*'])
subscriber.start_consuming()
重试策略
如果在消费消息时出现问题——例如,一个网络服务已关闭——订阅者可以引发错误,在给定的延迟后重试处理事件
from domain_event_broker import Subscriber
def sync_user_data(event):
try:
publish_to_service(event)
except ServiceIsDown:
raise Retry(5.0 ** event.retries) # 1s, 5s, 25s
subscriber = Subscriber()
subscriber.register(sync_user_data, 'sync_data', ['user.*'], max_retries=3)
subscriber.start_consuming()
延迟重试绑定到消费者,而不是事件。如果超过 max_retries
,事件将被丢弃或死信处理。
开发
确保您已在本地上安装 RabbitMQ 以进行测试。
- 创建虚拟环境并激活它
- 使用
pip install -r requirements.txt -r dev_requirements.txt -e .
安装依赖项 - 使用
pytest
运行测试
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分布
domain-event-broker-3.0.2.tar.gz (32.2 kB 查看哈希值)
构建分布
关闭
domain-event-broker-3.0.2.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c1a0894fb1038d0590901115bb5a5ce72ce1cf71ba9ed598552388f1fc87de64 |
|
MD5 | 77961f7d896f68ea86d9cf851516850f |
|
BLAKE2b-256 | a73f7ecb9c82d2efe793856d631413b75f0ef140b8bdcd0c8eaa7ecc2b26b5c5 |
关闭
domain_event_broker-3.0.2-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | fb804658dab811d417d3b802450659b9b52343fd4b9b35e7578b532bf7f55de5 |
|
MD5 | 31d0ae09f2a0a4a02ff65cb760aa55f8 |
|
BLAKE2b-256 | 104820198c2a6aaa284bbf8430b2bcbe6f8d3cf040eaaa0f298797d670f1363a |