跳转到主要内容

消息交换引擎,用于使用像RabbitMQ这样的代理构建管道

项目描述

BarterDude

Build Status Coverage Status

消息交换引擎,用于使用 RabbitMQ 等代理构建管道。该项目建立在优秀的 async-worker 之上。

barter

安装

使用 Python 3.6+

pip install barterdude

用法

使用此完整示例构建您的消费者

import logging
from barterdude import BarterDude
from barterdude.monitor import Monitor
from barterdude.hooks.healthcheck import Healthcheck
from barterdude.hooks.logging import Logging
from barterdude.hooks.metrics.prometheus import Prometheus
from barterdude.message import Message
from barterdude.conf import getLogger

# from my_project import MyHook # (you can build your own hooks)


# configure RabbitMQ
barterdude = BarterDude(
    hostname="localhost",
    username="guest",
    password="guest",
    prefetch=256,
)

# Prometheus labels for automatic metrics
labels = dict(
    app_name="my_app",
    team_name="my_team"
)
healthcheck = Healthcheck(barterdude) # automatic and customizable healthcheck
prometheus = Prometheus(labels) # automatic and customizable Prometheus integration

self.logger = getLogger("my_app", logging.DEBUG) # automatic json log with barterdude namespace
# BARTERDUDE_DEFAULT_APP_NAME is an env var to control your project namespace
# BARTERDUDE_DEFAULT_LOG_LEVEL is an env var to control loglevel by number 0, 10, 20, etc...

monitor = Monitor(
    healthcheck,
    prometheus,
    # MyHook(barterdude, "/path"), # (will make localhost:8080/path url)
    Logging() # automatic and customizable logging
)

my_metric = prometheus.metrics.counter(name="fail", description="fail again")  # It's the same as https://github.com/prometheus/client_python


@barterdude.consume_amqp(
    ["queue1", "queue2"],
    monitor,
    coroutines = 10,  # number of coroutines spawned to consume messages (1 per message)
    bulk_flush_interval = 60.0,  #  max waiting time for messages to start process n_coroutines
    requeue_on_fail = True  # should retry or not the message
)
async def your_consumer(msg: Message): # you receive only one message and we parallelize processing for you
    await barterdude.publish_amqp(
        exchange="my_exchange",
        data=msg.body
    )
    if msg.body == "fail":

        my_metric.inc() # you can use prometheus metrics
        healthcheck.force_fail() # you can use your hooks inside consumer too
        msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage

    if msg.body == "exception":
        raise Exception() # this will reject the message and requeue

    # if everything is fine, than message automatically is accepted


barterdude.run() # you will start consume and start a server on http://localhost:8080
# Change host and port with ASYNCWORKER_HTTP_HOST and ASYNCWORKER_HTTP_PORT env vars

构建您自己的钩子

基础钩子(简单的一个)

这些钩子在消息检索成功和失败时被调用。

from barterdude.hooks import BaseHook
from asyncworker.rabbitmq.message import RabbitMQMessage

class MyCounterHook(BaseHook):
    _consume = _fail = _success = 0

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1

Http Hook (开放路由)

这些钩子可以完成简单钩子所能完成的一切,但响应路由。

from aiohttp import web
from barterdude.hooks import HttpHook
from asyncworker.rabbitmq.message import RabbitMQMessage

class MyCounterHttpHook(HttpHook):
    _consume = _fail = _success = 0

    async def __call__(self, req: web.Request):
        return web.json_response(dict(
            consumed=self._consume,
            success=self._success,
            fail=self._fail
        ))

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1

数据共享

遵循async-workeraiohttp中的方法,BarterDude反对使用全局变量,即单例。

为了在应用程序中全局共享数据状态,BarterDude的行为类似于一个dict。例如,可以在一个BarterDude实例中保存一个全局变量

from barterdude import BarterDude

barterdude = BarterDude()
baterdude["my_variable"] = data

并在消费者中获取它

async def consumer_access_storage(msg):
    data = baterdude["my_variable"]

在Healthcheck上监控额外模块

如果您在应用程序中运行额外模块,如工作进程或服务,可以将它们包含在健康检查中。

首先,您需要更新您的模块以实现HealthcheckMonitored接口

from barterdude.hooks.healthcheck import HealthcheckMonitored

class ExtraService(HealthcheckMonitored):

实现该接口需要在您的模块中定义healthcheck方法。它应该返回一个布尔值,表示您的模块是否健康

    def healthcheck(self):
        return self._thread.is_alive()

最后,您需要让BarterDude和Healthcheck模块知道您的模块。为此,您将使用数据共享功能

from barterdude import BarterDude
from app.extra_service import ExtraService

barterdude = BarterDude()
barterdude["extra_service"] = ExtraService()

如果您已经在BartedDude启动时使用数据共享模型运行了您的额外模块,那么就完成了

@app.run_on_startup
async def startup(app):
    app["client_session"] = ClientSession()
    app["extra_service"] = ExtraService()

健康检查模块将识别所有实现HealthcheckMonitored接口的共享模块,并自动运行其健康检查方法。所有监控模块的结果将包含在健康检查端点的响应体中,如果任何模块失败,健康检查端点将指示这一点

{
    "extra_service": "ok",
    "message": "Success rate: 1.0 (expected: 0.9)",
    "fail": 0,
    "success": 1,
    "status": "ok"
}

模式验证

可以由JSON模式验证消费的消息

@barterdude.consume_amqp(
    ["queue1", "queue2"],
    monitor,
    validation_schema={
            "$schema": "https://json-schema.fullstack.org.cn/draft-04/schema#",
            "$id": "http://example.com/example.json",
            "type": "object",
            "title": "Message Schema",
            "description": "The root schema comprises the entire JSON document.",
            "additionalProperties": True,
            "required": [
                "key"
            ],
            "properties": {
                "key": {
                    "$id": "#/properties/key",
                    "type": "string",
                    "title": "The Key Schema",
                    "description": "An explanation about message.",
                    "default": ""
                }
            }
        },
    requeue_on_validation_fail=False # invalid messages are removed without requeue
)

您仍然可以在生产消息之前或在需要时验证消息

from barterdude.message import MessageValidation

validator = MessageValidation(json_schema)
validator.validate(message)

数据保护

Barterdude考虑了GDPR数据保护,并且默认情况下不记录消息体,但您可以使用环境变量BARTERDUDE_LOG_REDACTED=0来禁用

现在消息体将由日志钩子记录

此配置仅控制BarterDude的默认日志钩子,并且不会影响用户自定义日志。如果您想使用此配置控制日志,请使用

from baterdude.conf import BARTERDUDE_LOG_REDACTED

HTTP端点

简单端点

如果您想公开HTTP端点,可以通过添加一个映射到钩子的路由轻松地在Barterdude工作进程中实现这一点。

barterdude.add_endpoint(
    routes=["/some_hook"],
    methods=["GET"],
    hook=some_hook,
)

Barterdude的回调端点

您还可以公开一个HTTP端点,该端点调用工作进程的回调以模拟从队列中消费和处理消息。这样,您可以通过传递消息体和头信息来发起请求,此请求的响应将包含工作进程将执行的所有操作信息,而无需真正发布消息。

barterdude.add_callback_endpoint(
    routes=["/execute"],
    hook=execute,
)

为了使用Barterdude对象的模拟实例,您还需要修改回调方法的签名以接收一个可选的Barterdude模拟参数。然后,您将必须选择使用哪个。只有回调端点调用会将Barterdude对象传递给您的回调。

async def execute(rabbitmq_message: RabbitMQMessage, barterdude_arg=None):
    bd = barterdude_arg if barterdude_arg is not None else barterdude

请求和响应示例

# Request
{
	"body": {
		"list_id": 105515152
	},
	"headers": {
		"trace_id": "random_id"
	}
}

# Response
{
	"message_calls": [
		{
			"method": "accept",
			"args": [],
			"kwargs": {}
		}
	],
	"barterdude_calls": [
		{
			"method": "publish_amqp",
			"args": [],
			"kwargs": {
				"exchange": "NEXT_EXCHANGE_TO_BE_CALLED",
				"data": {
					"list_id": 1055151521,
					"subject": "vendo samsung galaxy s21",
					"timestamp": 1657231231000
				},
				"properties": {
					"headers": {
						"has_failed": false,
						"trace_id": "random_id"
					}
				}
			}
		}
	]
}

副作用

如果您的回调具有具有副作用的服务,例如在数据库中插入行或更新API,您可以传递这些服务的模拟实例,这些实例将被注入以防止发生副作用。

barterdude.add_callback_endpoint(
    routes=["/execute"],
    hook=execute,
    mock_dependencies=[
        (
            fake_database_service,  # fake service instance to be used by the worker
            "database_service",     # name used in the data sharing/dependency injection
        ),
    ]
)

强制副作用

如果您在调用回调端点时希望发布消息,可以传递参数should_mock_barterdude: false。这样,消息就会被发布。此外,您不需要模拟工作进程使用的服务,所有副作用都会发生,并且您的工怍将像从队列中消费消息一样处理您的消息。

请求和响应示例

# Request
{
	"body": {
		"list_id": 105515152
	},
	"headers": {
		"trace_id": "random_id"
	},
        "should_mock_barterdude": false
}

# Response
{
	"message_calls": [
		{
			"method": "accept",
			"args": [],
			"kwargs": {}
		}
	]
    # message will be published, so we won't have information about publish method's calls
}

测试

为了测试异步消费者,我们推荐使用asynctest

from asynctest import TestCase


class TestMain(TestCase):
    def test_should_pass(self):
        self.assertTrue(True)

希望您喜欢! :wink

贡献

关于开发和贡献,请遵循贡献指南并始终尊重行为准则

项目详情


下载文件

下载您平台上的文件。如果您不确定该选择哪个,请了解有关安装包的更多信息。

源分发

barterdude-1.3.2.tar.gz (35.8 kB 查看哈希值)

上传时间

由以下支持