跳转到主要内容

通过RabbitMQ发送和接收领域事件

项目描述

领域事件代理

这个库在RabbitMQ主题交换之上提供了一个浅层,用于发布和接收领域事件。发布者和订阅者无需了解彼此,可以按任何顺序启动和停止。每个订阅者控制自己的重试策略,是否需要在他们下线时使用持久队列,或者在没有错误的情况下在订阅者中设置死信队列。

配置

这个库需要连接到RabbitMQ。默认情况下,使用本地的RabbitMQ实例。这可以通过传递一个amqp URLpublish_domain_event或实例化PublisherSubscriber时进行更改。

from domain_event_broker import Subscriber
subscriber = Subscriber('amqp://user:password@rabbitmq-host/domain-events')

集成

Django

这个库可以通过Django设置进行配置。将domain_event_broker.django添加到您的INSTALLED_APPS,并在设置中设置DOMAIN_EVENT_BROKER

INSTALLED_APPS = (
    'domain_event_broker.django',
    )

DOMAIN_EVENT_BROKER = 'amqp://user:password@rabbitmq-host/domain-events'

更多信息可以在文档中找到。

发送事件

可以通过调用publish_domain_event发送事件。

from domain_event_broker import publish_domain_event
publish_domain_event('user.registered', {'user_id': user.id})

域事件会立即发送。当在数据库事务中发出域事件时,建议在事务提交后再进行发布。使用提交钩子可以避免在事务因错误回滚后产生虚假的域事件。

接收事件

订阅者可以监听一个或多个域事件——通过绑定键进行控制。绑定键可以包含通配符。将为每个订阅者创建一个队列。RabbitMQ 会负责将相关的事件仅路由到该队列。

此脚本将接收用户域中发送的所有事件

from domain_event_broker import Subscriber

def log_user_event(event):
    print(event)

subscriber = Subscriber()
subscriber.register(log_user_event, 'printer', ['user.*'])
subscriber.start_consuming()

重试策略

如果在消费消息时出现问题——例如,一个网络服务已关闭——订阅者可以引发错误,在给定的延迟后重试处理事件

from domain_event_broker import Subscriber

def sync_user_data(event):
    try:
        publish_to_service(event)
    except ServiceIsDown:
        raise Retry(5.0 ** event.retries) # 1s, 5s, 25s

subscriber = Subscriber()
subscriber.register(sync_user_data, 'sync_data', ['user.*'], max_retries=3)
subscriber.start_consuming()

延迟重试绑定到消费者,而不是事件。如果超过 max_retries,事件将被丢弃或死信处理。

开发

确保您已在本地上安装 RabbitMQ 以进行测试。

  • 创建虚拟环境并激活它
  • 使用 pip install -r requirements.txt -r dev_requirements.txt -e . 安装依赖项
  • 使用 pytest 运行测试

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

domain-event-broker-3.0.2.tar.gz (32.2 kB 查看哈希值)

上传时间

构建分布

domain_event_broker-3.0.2-py3-none-any.whl (21.5 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面