消息交换引擎,用于使用像RabbitMQ这样的代理构建管道
项目描述
BarterDude
消息交换引擎,用于使用 RabbitMQ 等代理构建管道。该项目建立在优秀的 async-worker 之上。
安装
使用 Python 3.6+
pip install barterdude
用法
使用此完整示例构建您的消费者
import logging
from barterdude import BarterDude
from barterdude.monitor import Monitor
from barterdude.hooks.healthcheck import Healthcheck
from barterdude.hooks.logging import Logging
from barterdude.hooks.metrics.prometheus import Prometheus
from barterdude.message import Message
from barterdude.conf import getLogger
# from my_project import MyHook # (you can build your own hooks)
# configure RabbitMQ
barterdude = BarterDude(
hostname="localhost",
username="guest",
password="guest",
prefetch=256,
)
# Prometheus labels for automatic metrics
labels = dict(
app_name="my_app",
team_name="my_team"
)
healthcheck = Healthcheck(barterdude) # automatic and customizable healthcheck
prometheus = Prometheus(labels) # automatic and customizable Prometheus integration
self.logger = getLogger("my_app", logging.DEBUG) # automatic json log with barterdude namespace
# BARTERDUDE_DEFAULT_APP_NAME is an env var to control your project namespace
# BARTERDUDE_DEFAULT_LOG_LEVEL is an env var to control loglevel by number 0, 10, 20, etc...
monitor = Monitor(
healthcheck,
prometheus,
# MyHook(barterdude, "/path"), # (will make localhost:8080/path url)
Logging() # automatic and customizable logging
)
my_metric = prometheus.metrics.counter(name="fail", description="fail again") # It's the same as https://github.com/prometheus/client_python
@barterdude.consume_amqp(
["queue1", "queue2"],
monitor,
coroutines = 10, # number of coroutines spawned to consume messages (1 per message)
bulk_flush_interval = 60.0, # max waiting time for messages to start process n_coroutines
requeue_on_fail = True # should retry or not the message
)
async def your_consumer(msg: Message): # you receive only one message and we parallelize processing for you
await barterdude.publish_amqp(
exchange="my_exchange",
data=msg.body
)
if msg.body == "fail":
my_metric.inc() # you can use prometheus metrics
healthcheck.force_fail() # you can use your hooks inside consumer too
msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage
if msg.body == "exception":
raise Exception() # this will reject the message and requeue
# if everything is fine, than message automatically is accepted
barterdude.run() # you will start consume and start a server on http://localhost:8080
# Change host and port with ASYNCWORKER_HTTP_HOST and ASYNCWORKER_HTTP_PORT env vars
构建您自己的钩子
基础钩子(简单的一个)
这些钩子在消息检索成功和失败时被调用。
from barterdude.hooks import BaseHook
from asyncworker.rabbitmq.message import RabbitMQMessage
class MyCounterHook(BaseHook):
_consume = _fail = _success = 0
async def on_success(self, message: RabbitMQMessage):
self._success += 1
async def on_fail(self, message: RabbitMQMessage, error: Exception):
self._fail += 1
async def before_consume(self, message: RabbitMQMessage):
self._consume += 1
Http Hook (开放路由)
这些钩子可以完成简单钩子所能完成的一切,但响应路由。
from aiohttp import web
from barterdude.hooks import HttpHook
from asyncworker.rabbitmq.message import RabbitMQMessage
class MyCounterHttpHook(HttpHook):
_consume = _fail = _success = 0
async def __call__(self, req: web.Request):
return web.json_response(dict(
consumed=self._consume,
success=self._success,
fail=self._fail
))
async def on_success(self, message: RabbitMQMessage):
self._success += 1
async def on_fail(self, message: RabbitMQMessage, error: Exception):
self._fail += 1
async def before_consume(self, message: RabbitMQMessage):
self._consume += 1
数据共享
遵循async-worker和aiohttp中的方法,BarterDude
反对使用全局变量,即单例。
为了在应用程序中全局共享数据状态,BarterDude
的行为类似于一个dict
。例如,可以在一个BarterDude
实例中保存一个全局变量
from barterdude import BarterDude
barterdude = BarterDude()
baterdude["my_variable"] = data
并在消费者中获取它
async def consumer_access_storage(msg):
data = baterdude["my_variable"]
在Healthcheck上监控额外模块
如果您在应用程序中运行额外模块,如工作进程或服务,可以将它们包含在健康检查中。
首先,您需要更新您的模块以实现HealthcheckMonitored
接口
from barterdude.hooks.healthcheck import HealthcheckMonitored
class ExtraService(HealthcheckMonitored):
实现该接口需要在您的模块中定义healthcheck
方法。它应该返回一个布尔值,表示您的模块是否健康
def healthcheck(self):
return self._thread.is_alive()
最后,您需要让BarterDude和Healthcheck模块知道您的模块。为此,您将使用数据共享功能
from barterdude import BarterDude
from app.extra_service import ExtraService
barterdude = BarterDude()
barterdude["extra_service"] = ExtraService()
如果您已经在BartedDude启动时使用数据共享模型运行了您的额外模块,那么就完成了
@app.run_on_startup
async def startup(app):
app["client_session"] = ClientSession()
app["extra_service"] = ExtraService()
健康检查模块将识别所有实现HealthcheckMonitored
接口的共享模块,并自动运行其健康检查方法。所有监控模块的结果将包含在健康检查端点的响应体中,如果任何模块失败,健康检查端点将指示这一点
{
"extra_service": "ok",
"message": "Success rate: 1.0 (expected: 0.9)",
"fail": 0,
"success": 1,
"status": "ok"
}
模式验证
可以由JSON模式验证消费的消息
@barterdude.consume_amqp(
["queue1", "queue2"],
monitor,
validation_schema={
"$schema": "https://json-schema.fullstack.org.cn/draft-04/schema#",
"$id": "http://example.com/example.json",
"type": "object",
"title": "Message Schema",
"description": "The root schema comprises the entire JSON document.",
"additionalProperties": True,
"required": [
"key"
],
"properties": {
"key": {
"$id": "#/properties/key",
"type": "string",
"title": "The Key Schema",
"description": "An explanation about message.",
"default": ""
}
}
},
requeue_on_validation_fail=False # invalid messages are removed without requeue
)
您仍然可以在生产消息之前或在需要时验证消息
from barterdude.message import MessageValidation
validator = MessageValidation(json_schema)
validator.validate(message)
数据保护
Barterdude考虑了GDPR数据保护,并且默认情况下不记录消息体,但您可以使用环境变量BARTERDUDE_LOG_REDACTED=0
来禁用
现在消息体将由日志钩子记录
此配置仅控制BarterDude的默认日志钩子,并且不会影响用户自定义日志。如果您想使用此配置控制日志,请使用
from baterdude.conf import BARTERDUDE_LOG_REDACTED
HTTP端点
简单端点
如果您想公开HTTP端点,可以通过添加一个映射到钩子的路由轻松地在Barterdude工作进程中实现这一点。
barterdude.add_endpoint(
routes=["/some_hook"],
methods=["GET"],
hook=some_hook,
)
Barterdude的回调端点
您还可以公开一个HTTP端点,该端点调用工作进程的回调以模拟从队列中消费和处理消息。这样,您可以通过传递消息体和头信息来发起请求,此请求的响应将包含工作进程将执行的所有操作信息,而无需真正发布消息。
barterdude.add_callback_endpoint(
routes=["/execute"],
hook=execute,
)
为了使用Barterdude对象的模拟实例,您还需要修改回调方法的签名以接收一个可选的Barterdude模拟参数。然后,您将必须选择使用哪个。只有回调端点调用会将Barterdude对象传递给您的回调。
async def execute(rabbitmq_message: RabbitMQMessage, barterdude_arg=None):
bd = barterdude_arg if barterdude_arg is not None else barterdude
请求和响应示例
# Request
{
"body": {
"list_id": 105515152
},
"headers": {
"trace_id": "random_id"
}
}
# Response
{
"message_calls": [
{
"method": "accept",
"args": [],
"kwargs": {}
}
],
"barterdude_calls": [
{
"method": "publish_amqp",
"args": [],
"kwargs": {
"exchange": "NEXT_EXCHANGE_TO_BE_CALLED",
"data": {
"list_id": 1055151521,
"subject": "vendo samsung galaxy s21",
"timestamp": 1657231231000
},
"properties": {
"headers": {
"has_failed": false,
"trace_id": "random_id"
}
}
}
}
]
}
副作用
如果您的回调具有具有副作用的服务,例如在数据库中插入行或更新API,您可以传递这些服务的模拟实例,这些实例将被注入以防止发生副作用。
barterdude.add_callback_endpoint(
routes=["/execute"],
hook=execute,
mock_dependencies=[
(
fake_database_service, # fake service instance to be used by the worker
"database_service", # name used in the data sharing/dependency injection
),
]
)
强制副作用
如果您在调用回调端点时希望发布消息,可以传递参数should_mock_barterdude: false
。这样,消息就会被发布。此外,您不需要模拟工作进程使用的服务,所有副作用都会发生,并且您的工怍将像从队列中消费消息一样处理您的消息。
请求和响应示例
# Request
{
"body": {
"list_id": 105515152
},
"headers": {
"trace_id": "random_id"
},
"should_mock_barterdude": false
}
# Response
{
"message_calls": [
{
"method": "accept",
"args": [],
"kwargs": {}
}
]
# message will be published, so we won't have information about publish method's calls
}
测试
为了测试异步消费者,我们推荐使用asynctest
库
from asynctest import TestCase
class TestMain(TestCase):
def test_should_pass(self):
self.assertTrue(True)
希望您喜欢! :wink
贡献
项目详情
barterdude-1.3.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 68ee4de80ab079a951fdbe400411004ab16932f953c93016b4db72262e4edc7f |
|
MD5 | bdcb4efa68c11c307628b3cbe6aef211 |
|
BLAKE2b-256 | 5645077217b25074d4028dfba3348cae849b41c648518e2a6467e9a331fffcdd |