跳转到主要内容

基于PostgreSQL的后端作业队列

项目描述

pgjobq

在Postgres之上构建的作业队列。

项目状态

请不要将此用于除实验或启发以外的任何用途。在某个时候,我可能会决定长期支持此项目(此时此警告将被删除),但在那之前,这只是个试验场,可能面临破坏性变更(包括破坏性模式变更)。

目的

有时你有一个Postgres数据库,需要队列。你可以建立更多基础设施(SQS、Redis等),或者你可以使用现有的数据库。对于持久队列,有许多用例不需要无限可伸缩性、快照或其他完整队列/事件总线/作业代理的高级功能。

特性

  • 尽力一次交付(作业一次只交付给一个工作者)
  • 自动重传失败的作业(即使你的进程崩溃)
  • 低延迟交付(近实时,使用PostgreSQL的NOTIFY功能)
  • 低延迟完成跟踪(使用NOTIFY
  • 死信队列
  • 作业属性和属性过滤
  • 作业依赖关系(用于处理DAG-like工作流或使作业按FIFO处理)
  • 持久化计划作业(在数据库中计划,而不是客户端应用程序中)
  • 作业取消(对于队列中的作业是保证的,对于已签出的作业是尽力而为的)
  • 批量发送和轮询以支持大型工作负载
  • 背压/边界队列
  • 全类型异步Python客户端(使用asyncpg
  • 指数退避重试
  • 遥测钩子,用于采样带有EXPLAIN的查询或与OpenTelemetry集成。

可能的功能

  • 回复队列和响应处理

示例

from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore
from pgjobq import create_queue, connect_to_queue, migrate_to_latest_version

async def main() -> None:

    async with AsyncExitStack() as stack:
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single job
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                        print("acked")

            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                await completion_handle.wait()
                print("completed")
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
    # prints:
    # "sent"
    # "received"
    # "done processing"
    # "acked"
    # "completed"

开发

  1. 克隆仓库
  2. 启动一个可丢弃的 PostgreSQL 实例(例如 docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
  3. 运行 make test

在 GitHub 上查看此版本:v0.10.0

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

pgjobq-0.10.0.tar.gz (19.2 kB 查看哈希)

上传时间

构建分发

pgjobq-0.10.0-py3-none-any.whl (24.1 kB 查看哈希)

上传时间 Python 3

由以下组织支持