Postgresql作业调度
项目描述
作业
一个基于PL/PGSQL的工作队列(发布者/消费者),具有python asyncio/asyncpg api
alpha软件
功能
-
实现两层API
一个postgresql层:任务可以从PL/PGSQL函数或过程发布。也可以使用触发器扩展。
一个python层(或任何具有postgresql驱动的客户端)。默认实现基于asyncio python,使用出色的asyncpg驱动程序。
-
它与postgrest兼容。所有过程和表都在一个拥有的postgresql模式中定义,并且可以通过它暴露,使用postgrest。
-
重试逻辑、schedule_at或超时都在发布方法中实现。任务可以带有max_retries参数或特定超时来发布。
-
内部使用两个表
jobs.job_queue
(用于安排待处理和运行中的任务的表)和jobs.job
(将完成的任务移动到这里的表)。 -
默认情况下,任务会重试三次,带有回退。
-
超时作业会过期,默认情况下,任务有一个60秒的超时。
-
可以通过提供
scheduled_at
参数来在未来安排任务。 -
有视图可以监控队列统计信息:
jobs.all
(所有任务)、jobs.expired
和jobs.running
。 -
任务也可以进行优先级排序,提供优先级数字,优先级更高的任务优先级高于其他任务。
-
consumer_topic允许使用*(topic.element.%)消费任务
-
在我笔记本电脑上的基本基准测试表明,它可以处理每秒1000个任务,但无论如何,这取决于您的postgresql实例。
-
除了工作进程守护进程之外,任务还可以从cronjob、常规python或kubernetes作业中消费。(可以用于并行化k8作业)
权衡
- 所有作业都必须被肯定地确认或拒绝(ack/nack)
从postgresql使用
SELECT job_id FROM
jobs.publish(
i_task -- method or function to be executed,
i_body::jsonb = null -- arguments passed to it (on python {args:[], kwargs:{}}),
i_scheduled_at: timestamp = null, -- when the task should run
i_timeout:numeric(7,2) -- timeout in seconds for the job
i_priority:integer = null -- gretare number more priority
)
在工作进程侧
SELECT * from jobs.consume(
num: integer -- number of desired jobs
);
返回要处理的工作列表
或选择性消费一个主题
SELECT * from jobs.consume_topic('topic.xxx.%', 10)
工作被标记为正在处理,并且应该使用
SELECT FROM jobs.ack(job_id);
或返回一个失败的工作。
SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)
此外,您还可以在一次请求中批量入队多个作业,使用
SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);
其中 jobs.bulk_job 是
create type jobs.bulk_job as (
task varchar,
body jsonb,
scheduled_at timestamp,
timeout integer,
priority integer,
max_retries integer
);
从 Python 使用
在此侧,实现一个工作器,应该是这样的
db = await asyncpg.connect(dsn)
while True:
jobs = await jobs.consume(db, 1)
for job in jobs:
try:
await jobs.run(db, job["job_id"])
await jobs.ack(job["job_id"])
except Exception as e:
await jobs.nack(job["job_id"], str(e))
await asyncio.sleep(1)
在发布者侧,作业可以从 PostgreSQL 事务中入队
db = await asyncpg.connect(dsn)
async with db.transaction():
# do whatever is needed,
# queue a task
await jobs.publish("package.file.sum", args=[1,2])
安装软件包
pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db
This will create the schema on the `jobs` postgresql schema
要运行工作器
jobs-worker postgresql://dsn
目前那里没有太多实现,只有一个单线程的工作器,需要更多的关注 :) 如果您的应用程序位于 Python 软件包中,像 yourpackage.file.method
这样的任务将可以正常运行。
可观察性和监控
使用 psql 或通过 postgresql_exporter 暴露
待办事项
-
使用 pg_notify 连接通知,当任务入队、被选中、完成时。有了这个,编写一个发送通知给已连接客户的 WS 就变得非常简单。
-
改进工作器,使其在每个 asyncio 任务上运行每个作业
-
更好地处理 Python 侧的异常
-
修复 requirements 文件
-
添加 GitHub Actions 以运行 CI
-
编写更好的文档和一些示例
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。