声明式消息处理应用程序
项目描述
msgapp: 声明式消息驱动应用程序
msgapp
帮助您以最少的样板代码编写事件消费应用程序。它封装了处理消息队列时的一些繁琐细节,例如消息确认(ack)、截止时间和解析。其设计注重灵活性和可测试性:事件后端可以替换(目前仅支持PubSub),并支持多种解析器(目前仅提供基于Pydantic的JSON解析器)。
示例
Pydantic + PubSub
import anyio
from pydantic import BaseModel
from msgapp import App
from msgapp.producers.pubsub import PubSubQueue
from msgapp.parsers.json import PydanticParserFactory
class MyModel(BaseModel):
    """Shape of the parsed message that ``handler`` receives."""

    foo: str
    baz: int
async def handler(model: MyModel) -> None:
    """Process one parsed message.

    Return normally to ack/consume the message. Raise an exception to
    signal an error and let the queue handle redelivery.
    """
    # do something with the model
    print(model)
# Wire the handler to a Pub/Sub subscription and the Pydantic parser factory.
app = App(
    handler,
    producer=PubSubQueue(subscription="projects/demo/subscriptions/foo-bar"),
    parser=PydanticParserFactory(),
)

# Run the application.
anyio.run(app.run)
Redis Streams + Pydantic
我们没有内置Redis实现,因为用Redis进行消息传递的方式有很多种。例如,您可以使用Redis的PubSub功能实现"发后即忘"(fire-and-forget)的消息传递,也可以使用Streams实现类似Kafka的可靠消息处理。
以下是一个使用Redis streams的示例实现。虽然这可能不是您想要的实现方式,但它应该能给您一些如何编写Redis生产者的思路。
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
Any,
AsyncContextManager,
AsyncIterator,
Mapping,
Optional,
Sequence,
Tuple,
)
from redis.asyncio import Redis
from redis.exceptions import ResponseError
from msgapp._producer import Producer
@dataclass(frozen=True)
class RedisMessage:
    """Immutable record of a single Redis stream entry."""

    # Field/value pairs of the stream entry.
    payload: Mapping[bytes, bytes]
    # ID of the stream entry.
    id: bytes
class RedisWrappedEnvelope:
    """Pairs a Redis stream message with the body bytes taken from it."""

    def __init__(self, message: RedisMessage, body: bytes) -> None:
        """Store the source message and its body.

        Args:
            message: The Redis message this envelope wraps.
            body: The raw body bytes associated with the message.
        """
        self._message = message
        self._body = body

    @property
    def body(self) -> bytes:
        """The raw body bytes given at construction."""
        return self._body

    @property
    def message(self) -> RedisMessage:
        """The Redis message given at construction."""
        return self._message
class RedisProducer(Producer[Any]):
    """Producer that pulls messages from a Redis stream via a consumer group.

    Every pulled entry is handed out inside an async context manager; when
    the ``async with`` body finishes without raising, the entry is acked
    with ``XACK``.
    """

    def __init__(
        self,
        client: "Redis[Any]",
        stream: str,
        group: str,
        message_key: bytes,
        consumer_name: str,
        batch_size: int = 10,
        poll_interval: int = 30,
    ) -> None:
        """Store the client and stream settings.

        Args:
            client: Async Redis client used for every stream command.
            stream: Name of the stream to read from.
            group: Consumer group name; ``pull`` creates it if needed.
            message_key: Field of each stream entry that holds the body.
            consumer_name: Consumer name used when reading from the group.
            batch_size: Stored on the instance (see note below).
            poll_interval: Stored on the instance (see note below).
        """
        self._client = client
        self._stream = stream
        self._group = group
        # NOTE(review): batch_size and poll_interval are stored but pull()
        # reads with fixed block=1 / count=1 -- confirm whether they should
        # be wired into xreadgroup.
        self._batch_size = batch_size
        self._poll_interval = poll_interval
        self._message_key = message_key
        self._consumer_name = consumer_name

    async def pull(self) -> AsyncIterator[AsyncContextManager[RedisWrappedEnvelope]]:
        """Yield one ack-on-success context manager per stream entry, forever.

        Yields:
            An async context manager whose value is the wrapped envelope.
            The entry is acked only if the ``async with`` body does not raise.

        Raises:
            ResponseError: If creating the consumer group fails for a reason
                other than the group already existing.
            KeyError: If an entry has no ``message_key`` field.
        """
        try:
            await self._client.xgroup_create(
                name=self._stream, groupname=self._group, mkstream=True
            )
        except ResponseError as e:
            # A group that already exists is fine; anything else is a real
            # error. str(e) avoids assuming e.args[0] exists and is a string.
            if "Consumer Group name already exists" not in str(e):
                raise

        last_id: Optional[bytes] = None
        items: Optional[
            Sequence[Tuple[str, Sequence[Tuple[bytes, Mapping[bytes, bytes]]]]]
        ] = None
        while True:
            # With no last_id, ">" is used as the stream ID; otherwise the
            # read continues from the last entry ID seen.
            items = await self._client.xreadgroup(
                groupname=self._group,
                consumername=self._consumer_name,
                streams={self._stream: last_id or ">"},
                block=1,
                count=1,
            )
            if not items:
                continue
            _, stream_messages = next(iter(items))
            if not stream_messages:
                # Nothing after last_id: fall back to ">" on the next read.
                last_id = None
                continue
            for message_id, values in stream_messages:
                last_id = message_id
                wrapped_msg = RedisMessage(payload=values, id=message_id)
                wrapped_envelope = RedisWrappedEnvelope(
                    wrapped_msg, values[self._message_key]
                )

                # The default argument binds this iteration's envelope at
                # definition time instead of closing over the loop variable.
                @asynccontextmanager
                async def msg_wrapper(
                    envelope: RedisWrappedEnvelope = wrapped_envelope,
                ) -> AsyncIterator[RedisWrappedEnvelope]:
                    yield envelope
                    # Reached only when the with-body did not raise.
                    await self._client.xack(  # type: ignore
                        self._stream, self._group, envelope.message.id
                    )

                yield msg_wrapper()
if __name__ == "__main__":
    import anyio
    from pydantic import BaseModel

    from msgapp import App
    from msgapp.parsers.json import PydanticParserFactory

    class MyModel(BaseModel):
        """Shape of the demo messages."""

        foo: str

    async def handler(message: MyModel) -> None:
        """Print each parsed message."""
        print(repr(message))

    stream = "mystream"  # str(uuid4())

    async def main() -> None:
        """Run the app against a local Redis and publish two test messages."""
        client = Redis.from_url("redis://localhost")
        producer = RedisProducer(client, stream, "mygroup", b"message", "consumer")

        app = App(handler, parser=PydanticParserFactory(), producer=producer)

        async with anyio.create_task_group() as tg:
            # Start the consumer in the background, then feed it messages.
            tg.start_soon(app.run)

            await client.xadd(stream, {b"message": b'{"foo": "bar"}'})
            await client.xadd(stream, {b"message": b'{"foo": "baz"}'})

            # Let the app run for a second, then cancel the task group.
            await anyio.sleep(1)
            tg.cancel_scope.cancel()

    anyio.run(main)
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分发
msgapp-0.1.5.tar.gz (9.3 kB 查看哈希值)
构建分发
msgapp-0.1.5-py3-none-any.whl (9.3 kB 查看哈希值)
关闭
msgapp-0.1.5.tar.gz 的哈希值
| 算法 | 哈希摘要 |
|---|---|
| SHA256 | 1faac8c6ef98a18d6327e8e4b8061292ba2fb36709bb4a779789f828fbf93ac2 |
| MD5 | 1fc68a30c6881bd766757f66757d64c5 |
| BLAKE2b-256 | 2543d0625f80d3c836ca3e0a80306bf51975b7f88ab2c3f36bc2bd678db2cdd9 |
关闭
msgapp-0.1.5-py3-none-any.whl 的哈希值
| 算法 | 哈希摘要 |
|---|---|
| SHA256 | 1aaa4d415ab8488566f5b199e21bf8818e40d0b08d0fc27b74e75cce68f73efc |
| MD5 | edb881a40a055b8c5e289f6a316c5e6a |
| BLAKE2b-256 | 1b900e1b637425d31043381269f4bbf4cf3e52567c4418afd6fcb8c8d7228f61 |