
Declarative message processing applications


msgapp: declarative message-driven applications

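msgapp helps you write event-consuming applications with minimal boilerplate. It abstracts away some of the tedious details of working with message queues, such as acknowledgements, deadlines, and parsing. The design emphasizes flexibility and testability: the event backend can be swapped out (currently only PubSub is provided) and multiple parsers are supported (currently only a JSON parser built on Pydantic).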
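To make that boilerplate concrete, here is a minimal sketch of the receive, parse, handle, and acknowledge loop that a hand-written consumer tends to contain. It is not msgapp's implementation: `Payload`, `consume`, and the in-memory message list are hypothetical names used only for illustration, and a standard-library dataclass stands in for a Pydantic model so the snippet has no dependencies.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Tuple


@dataclass(frozen=True)
class Payload:
    # Hypothetical payload type; with msgapp this would be a Pydantic model.
    foo: str
    baz: int


async def consume(
    raw_messages: List[Tuple[str, bytes]],
    handler: Callable[[Payload], Awaitable[None]],
) -> Tuple[List[str], List[str]]:
    """Parse each message, call the handler, and record which ids to ack."""
    acked: List[str] = []
    failed: List[str] = []
    for message_id, body in raw_messages:
        try:
            model = Payload(**json.loads(body))  # parsing step
            await handler(model)  # user code
        except Exception:
            # Not acked: a real queue would redeliver this message later.
            failed.append(message_id)
        else:
            acked.append(message_id)
    return acked, failed


async def handler(model: Payload) -> None:
    if model.baz < 0:
        raise ValueError("baz must be non-negative")


messages = [
    ("1", b'{"foo": "a", "baz": 1}'),
    ("2", b'{"foo": "b", "baz": -1}'),  # the handler raises
    ("3", b"not json"),  # the parser raises
]
acked, failed = asyncio.run(consume(messages, handler))
print(acked, failed)
```

Only the handler is application logic here; the loop, the parsing, and the ack bookkeeping are the parts a library of this kind is meant to take over.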

Examples

Pydantic + PubSub

import anyio
from pydantic import BaseModel
from msgapp import App
from msgapp.producers.pubsub import PubSubQueue
from msgapp.parsers.json import PydanticParserFactory

class MyModel(BaseModel):
    foo: str
    baz: int

async def handler(model: MyModel) -> None:
    # do something with the model
    print(model)
    # return to ack/consume the model
    # raise an exception to signal an error
    # and let the queue handle redelivery

app = App(
    handler,
    producer=PubSubQueue(subscription="projects/demo/subscriptions/foo-bar"),
    parser=PydanticParserFactory(),
)

anyio.run(app.run)

Redis Streams + Pydantic

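We do not ship a Redis implementation because there are many ways to do messaging with Redis. For example, you can use Redis's PubSub feature for fire-and-forget messaging, or Streams for reliable, Kafka-like operation.

Below is an example implementation using Redis Streams. It may not be the implementation you want, but it should give you an idea of how to write a Redis producer.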

from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
    Any,
    AsyncContextManager,
    AsyncIterator,
    Mapping,
    Optional,
    Sequence,
    Tuple,
)

from redis.asyncio import Redis
from redis.exceptions import ResponseError

from msgapp._producer import Producer


@dataclass(frozen=True)
class RedisMessage:
    payload: Mapping[bytes, bytes]
    id: bytes


class RedisWrappedEnvelope:
    def __init__(self, message: RedisMessage, body: bytes) -> None:
        self._message = message
        self._body = body

    @property
    def body(self) -> bytes:
        return self._body

    @property
    def message(self) -> RedisMessage:
        return self._message


class RedisProducer(Producer[Any]):
    def __init__(
        self,
        client: "Redis[Any]",
        stream: str,
        group: str,
        message_key: bytes,
        consumer_name: str,
        batch_size: int = 10,
        poll_interval: int = 30,
    ) -> None:
        self._client = client
        self._stream = stream
        self._group = group
        self._batch_size = batch_size
        self._poll_interval = poll_interval
        self._message_key = message_key
        self._consumer_name = consumer_name

    async def pull(self) -> AsyncIterator[AsyncContextManager[RedisWrappedEnvelope]]:
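        # Create the consumer group (and the stream itself) if needed.
        try:
            await self._client.xgroup_create(
                name=self._stream, groupname=self._group, mkstream=True
            )
        except ResponseError as e:
            # Redis reports an error when the group already exists; ignore it.
            if "Consumer Group name already exists" not in e.args[0]:
                raise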
        last_id: Optional[bytes] = None
        items: Optional[
            Sequence[Tuple[str, Sequence[Tuple[bytes, Mapping[bytes, bytes]]]]]
        ] = None
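        while True:
            # With ">" Redis delivers messages not yet delivered to this group;
            # with an explicit id it returns this consumer's pending entries
            # that come after that id.
            items = await self._client.xreadgroup(
                groupname=self._group,
                consumername=self._consumer_name,
                streams={self._stream: last_id or ">"},
                block=1,  # milliseconds
                count=1,
            )
            if not items:
                continue
            stream_items = next(iter(items))
            if len(stream_items[1]) == 0:
                # No pending entries left; go back to reading new messages.
                last_id = None
                continue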
            _, stream_messages = stream_items
            for message_id, values in stream_messages:
                last_id = message_id

                wrapped_msg = RedisMessage(payload=values, id=message_id)
                wrapped_envelope = RedisWrappedEnvelope(
                    wrapped_msg, values[self._message_key]
                )

                @asynccontextmanager
                async def msg_wrapper(
                    envelope: RedisWrappedEnvelope = wrapped_envelope,
                ) -> AsyncIterator[RedisWrappedEnvelope]:
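                    yield envelope
                    # Reached only if the handler finished without raising,
                    # so failed messages stay pending and are not acked.
                    await self._client.xack(  # type: ignore
                        self._stream, self._group, envelope.message.id
                    )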

                yield msg_wrapper()


if __name__ == "__main__":
    import anyio
    from pydantic import BaseModel

    from msgapp import App
    from msgapp.parsers.json import PydanticParserFactory

    class MyModel(BaseModel):
        foo: str

    async def handler(message: MyModel) -> None:
        print(repr(message))

    stream = "mystream"  # str(uuid4())

    async def main() -> None:
        client = Redis.from_url("redis://localhost")
        producer = RedisProducer(client, stream, "mygroup", b"message", "consumer")

        app = App(handler, parser=PydanticParserFactory(), producer=producer)

        async with anyio.create_task_group() as tg:
            tg.start_soon(app.run)
            await client.xadd(stream, {b"message": b'{"foo": "bar"}'})
            await client.xadd(stream, {b"message": b'{"foo": "baz"}'})
            await anyio.sleep(1)
            tg.cancel_scope.cancel()

    anyio.run(main)
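The same producer shape can be reproduced without Redis, which may be handy in tests. The sketch below is an assumption-laden illustration rather than part of msgapp: `InMemoryProducer` and `drain` are hypothetical names, the class does not subclass msgapp's `Producer`, and it yields raw bytes instead of an envelope object. It only demonstrates the pattern used above, where `pull` yields async context managers and the acknowledgement happens after the `yield` inside each one.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncContextManager, AsyncIterator, List


class InMemoryProducer:
    """Hypothetical test double; it does not subclass msgapp's Producer."""

    def __init__(self, bodies: List[bytes]) -> None:
        self._bodies = bodies
        self.acked: List[bytes] = []

    async def pull(self) -> AsyncIterator[AsyncContextManager[bytes]]:
        for body in self._bodies:

            @asynccontextmanager
            async def msg_wrapper(body: bytes = body) -> AsyncIterator[bytes]:
                yield body
                # Reached only if the `async with` block exits without raising.
                self.acked.append(body)

            yield msg_wrapper()


async def drain(producer: InMemoryProducer) -> None:
    # Stand-in for the application loop that would call the handler.
    async for envelope_cm in producer.pull():
        try:
            async with envelope_cm as body:
                if body == b"bad":
                    raise ValueError("handler failed")
        except ValueError:
            pass  # the message stays un-acked


producer = InMemoryProducer([b"ok-1", b"bad", b"ok-2"])
asyncio.run(drain(producer))
print(producer.acked)
```

Because the ack sits after the `yield`, an exception raised inside the `async with` block skips it, which mirrors how the Redis example leaves failed messages pending.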

