跳转到主要内容

PostgreSQL支持的消息队列

项目描述

pgmq

基于Postgres构建的消息队列。

目的

有时您有一个Postgres数据库,需要队列。您可以选择建立更多的基础设施(SQS、Redis等),或者使用现有的数据库。有许多持久队列的使用场景,这些场景不需要无限的扩展性、快照或其他完整队列/事件总线/消息代理的先进功能。

特性

  • 尽力一次交付(消息一次只发送给一个工作者)
  • 自动重发失败的消息
  • 低延迟交付(近实时,使用PostgreSQL的NOTIFY功能)
  • 低延迟完成跟踪(使用NOTIFY
  • 批量发送和接收
  • 全类型异步Python客户端(使用asyncpg
  • 持久计划消息(在数据库中计划,而不是在客户端应用程序中)

可能的功能

  • 指数退避
  • 死信队列
  • 自定义交付排序键
  • 响应/完成消息
  • 消息属性和属性过滤
  • FIFO交付
  • 背压/边界队列

未计划的功能

  • 发送响应数据(目前需要发送到带外)
  • 支持“订阅”(这是一个简单的队列,不是消息代理)

示例

from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore
from pgmq import create_queue, connect_to_queue, migrate_to_latest_version

async def main() -> None:

    async with AsyncExitStack() as stack:
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single message
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                        print("acked")

            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                await completion_handle()
                print("completed")
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
    # prints:
    # "sent"
    # "received"
    # "done processing"
    # "acked"
    # "completed"

开发

  1. 克隆仓库
  2. 启动可丢弃的PostgreSQL实例(例如 docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
  3. 运行 make test

在GitHub上查看此版本:v0.6.1

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

pgmq-0.6.1.tar.gz (12.7 kB 查看哈希值)

上传时间 源代码

构建分发

pgmq-0.6.1-py3-none-any.whl (13.5 kB 查看哈希值)

上传时间 Python 3

由以下机构支持