PostgreSQL支持的消息队列
项目描述
pgmq
基于Postgres构建的消息队列。
目的
有时您有一个Postgres数据库,需要队列。您可以选择建立更多的基础设施(SQS、Redis等),或者使用现有的数据库。有许多持久队列的使用场景,这些场景不需要无限的扩展性、快照或其他完整队列/事件总线/消息代理的先进功能。
特性
- 尽力一次交付(消息一次只发送给一个工作者)
- 自动重发失败的消息
- 低延迟交付(近实时,使用PostgreSQL的
NOTIFY
功能) - 低延迟完成跟踪(使用
NOTIFY
) - 批量发送和接收
- 全类型异步Python客户端(使用asyncpg)
- 持久计划消息(在数据库中计划,而不是在客户端应用程序中)
可能的功能
- 指数退避
- 死信队列
- 自定义交付排序键
- 响应/完成消息
- 消息属性和属性过滤
- FIFO交付
- 背压/边界队列
未计划的功能
- 发送响应数据(目前需要发送到带外)
- 支持“订阅”(这是一个简单的队列,不是消息代理)
示例
from contextlib import AsyncExitStack
import anyio
import asyncpg # type: ignore
from pgmq import create_queue, connect_to_queue, migrate_to_latest_version
async def main() -> None:
async with AsyncExitStack() as stack:
pool: asyncpg.Pool = await stack.enter_async_context(
asyncpg.create_pool( # type: ignore
"postgres://postgres:postgres@localhost/postgres"
)
)
await migrate_to_latest_version(pool)
await create_queue("myq", pool)
queue = await stack.enter_async_context(
connect_to_queue("myq", pool)
)
async with anyio.create_task_group() as tg:
async def worker() -> None:
async with queue.receive() as msg_handle_rcv_stream:
# receive a single message
async with (await msg_handle_rcv_stream.receive()).acquire():
print("received")
# do some work
await anyio.sleep(1)
print("done processing")
print("acked")
tg.start_soon(worker)
tg.start_soon(worker)
async with queue.send(b'{"foo":"bar"}') as completion_handle:
print("sent")
await completion_handle()
print("completed")
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(main)
# prints:
# "sent"
# "received"
# "done processing"
# "acked"
# "completed"
开发
- 克隆仓库
- 启动可丢弃的PostgreSQL实例(例如
docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
) - 运行
make test
在GitHub上查看此版本:v0.6.1
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
pgmq-0.6.1.tar.gz (12.7 kB 查看哈希值)
构建分发
pgmq-0.6.1-py3-none-any.whl (13.5 kB 查看哈希值)
关闭
pgmq-0.6.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 2469a8c1f32ff7342b980eb0614e37e188be413df036fc9b6b06c800c77d4420 |
|
MD5 | 28dbb48c8c284911cb9312a637297611 |
|
BLAKE2b-256 | 91a4dd62df6b1356c749435f690177bda063bde2be60595318c6203845f732f3 |
关闭
pgmq-0.6.1-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 5cca8234063478a76f86c4f591e96312d7465d8ce237215e1088321dc9e68b7a |
|
MD5 | e4f43877fcfa8a65f67e65b81ecabfb0 |
|
BLAKE2b-256 | b5a68dd6fb557594a6e0ae498daf879a29bfd6757c2e0da68885d410725a17d3 |