基于PostgreSQL的后端作业队列
项目描述
pgjobq
在Postgres之上构建的作业队列。
项目状态
请不要将此用于除实验或启发以外的任何用途。在某个时候,我可能会决定长期支持此项目(此时此警告将被删除),但在那之前,这只是个试验场,可能面临破坏性变更(包括破坏性模式变更)。
目的
有时你有一个Postgres数据库,需要队列。你可以建立更多基础设施(SQS、Redis等),或者你可以使用现有的数据库。对于持久队列,有许多用例不需要无限可伸缩性、快照或其他完整队列/事件总线/作业代理的高级功能。
特性
- 尽力一次交付(作业一次只交付给一个工作者)
- 自动重传失败的作业(即使你的进程崩溃)
- 低延迟交付(近实时,使用PostgreSQL的
NOTIFY
功能) - 低延迟完成跟踪(使用
NOTIFY
) - 死信队列
- 作业属性和属性过滤
- 作业依赖关系(用于处理DAG-like工作流或使作业按FIFO处理)
- 持久化计划作业(在数据库中计划,而不是客户端应用程序中)
- 作业取消(对于队列中的作业是保证的,对于已签出的作业是尽力而为的)
- 批量发送和轮询以支持大型工作负载
- 背压/边界队列
- 全类型异步Python客户端(使用asyncpg)
- 指数退避重试
- 遥测钩子,用于采样带有EXPLAIN的查询或与OpenTelemetry集成。
可能的功能
- 回复队列和响应处理
示例
from contextlib import AsyncExitStack
import anyio
import asyncpg # type: ignore
from pgjobq import create_queue, connect_to_queue, migrate_to_latest_version
async def main() -> None:
async with AsyncExitStack() as stack:
pool: asyncpg.Pool = await stack.enter_async_context(
asyncpg.create_pool( # type: ignore
"postgres://postgres:postgres@localhost/postgres"
)
)
await migrate_to_latest_version(pool)
await create_queue("myq", pool)
queue = await stack.enter_async_context(
connect_to_queue("myq", pool)
)
async with anyio.create_task_group() as tg:
async def worker() -> None:
async with queue.receive() as msg_handle_rcv_stream:
# receive a single job
async with (await msg_handle_rcv_stream.receive()).acquire():
print("received")
# do some work
await anyio.sleep(1)
print("done processing")
print("acked")
tg.start_soon(worker)
tg.start_soon(worker)
async with queue.send(b'{"foo":"bar"}') as completion_handle:
print("sent")
await completion_handle.wait()
print("completed")
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(main)
# prints:
# "sent"
# "received"
# "done processing"
# "acked"
# "completed"
开发
- 克隆仓库
- 启动一个可丢弃的 PostgreSQL 实例(例如
docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
) - 运行
make test
在 GitHub 上查看此版本:v0.10.0
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分发
pgjobq-0.10.0.tar.gz (19.2 kB 查看哈希)
构建分发
pgjobq-0.10.0-py3-none-any.whl (24.1 kB 查看哈希)
关闭
pgjobq-0.10.0.tar.gz 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1deb0cb74b76ab1822de0f3e0a8913f1b94505268c7b65f9e950b6045d12a2a2 |
|
MD5 | 07f04d07d473652a6c570783c66f4337 |
|
BLAKE2b-256 | 9f4c580141800f827f03946483c0cc761b0c5355163293caa857488e96202db3 |
关闭
pgjobq-0.10.0-py3-none-any.whl 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 8c0421c0584dbe8ce05a6351507666c42e4bd70c22ea58121c9a4b7b7b2165c5 |
|
MD5 | 7da3268775c73bb78085e351986870b1 |
|
BLAKE2b-256 | 56572b38980cf3a1777a041bc0eb6f670240711a604c724f8d0300cc4c800f03 |