跳转到主要内容

Postgresql作业调度

项目描述

作业

一个基于PL/PGSQL的工作队列(发布者/消费者),具有python asyncio/asyncpg api

alpha软件

功能

  • 实现两层API

    一个postgresql层:任务可以从PL/PGSQL函数或过程发布。也可以使用触发器扩展。

    一个python层(或任何具有postgresql驱动的客户端)。默认实现基于asyncio python,使用出色的asyncpg驱动程序。

  • 它与postgrest兼容。所有过程和表都在一个拥有的postgresql模式中定义,并且可以通过它暴露,使用postgrest。

  • 重试逻辑、schedule_at或超时都在发布方法中实现。任务可以带有max_retries参数或特定超时来发布。

  • 内部使用两个表 jobs.job_queue(用于安排待处理和运行中的任务的表)和jobs.job(将完成的任务移动到这里的表)。

  • 默认情况下,任务会重试三次,带有回退。

  • 超时作业会过期,默认情况下,任务有一个60秒的超时。

  • 可以通过提供scheduled_at参数来在未来安排任务。

  • 有视图可以监控队列统计信息:jobs.all(所有任务)、jobs.expiredjobs.running

  • 任务也可以进行优先级排序,提供优先级数字,优先级更高的任务优先级高于其他任务。

  • consumer_topic允许使用*(topic.element.%)消费任务

  • 在我笔记本电脑上的基本基准测试表明,它可以处理每秒1000个任务,但无论如何,这取决于您的postgresql实例。

  • 除了工作进程守护进程之外,任务还可以从cronjob、常规python或kubernetes作业中消费。(可以用于并行化k8作业)

权衡

  • 所有作业都必须被肯定地确认或拒绝(ack/nack)

从postgresql使用

SELECT job_id FROM
    jobs.publish(
        i_task -- method or function to be executed,
        i_body::jsonb = null -- arguments passed to it (on python {args:[], kwargs:{}}),
        i_scheduled_at: timestamp = null, -- when the task should run
        i_timeout:numeric(7,2) -- timeout in seconds for the job
        i_priority:integer = null -- gretare number more priority
    )

在工作进程侧

SELECT * from jobs.consume(
    num: integer -- number of desired jobs
);

返回要处理的工作列表

或选择性消费一个主题

SELECT * from jobs.consume_topic('topic.xxx.%', 10)

工作被标记为正在处理,并且应该使用

SELECT FROM jobs.ack(job_id);

或返回一个失败的工作。

SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)

此外,您还可以在一次请求中批量入队多个作业,使用

SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);

其中 jobs.bulk_job 是

create type jobs.bulk_job as (
    task varchar,
    body jsonb,
    scheduled_at timestamp,
    timeout integer,
    priority integer,
    max_retries integer
);

从 Python 使用

在此侧,实现一个工作器,应该是这样的

    db = await asyncpg.connect(dsn)
    while True:
        jobs = await jobs.consume(db, 1)
        for job in jobs:
            try:
                await jobs.run(db, job["job_id"])
                await jobs.ack(job["job_id"])
            except Exception as e:
                await jobs.nack(job["job_id"], str(e))
        await asyncio.sleep(1)

在发布者侧,作业可以从 PostgreSQL 事务中入队

db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed,
    # queue a task
    await jobs.publish("package.file.sum", args=[1,2])

安装软件包

pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db

This will create the schema on the `jobs` postgresql schema

要运行工作器

jobs-worker postgresql://dsn

目前那里没有太多实现,只有一个单线程的工作器,需要更多的关注 :) 如果您的应用程序位于 Python 软件包中,像 yourpackage.file.method 这样的任务将可以正常运行。

可观察性和监控

使用 psql 或通过 postgresql_exporter 暴露

待办事项

  • 使用 pg_notify 连接通知,当任务入队、被选中、完成时。有了这个,编写一个发送通知给已连接客户的 WS 就变得非常简单。

  • 改进工作器,使其在每个 asyncio 任务上运行每个作业

  • 更好地处理 Python 侧的异常

  • 修复 requirements 文件

  • 添加 GitHub Actions 以运行 CI

  • 编写更好的文档和一些示例

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。

源分布

pgjobs-0.2.1.tar.gz (12.1 kB 查看散列)

上传时间

构建分布

pgjobs-0.2.1-py3-none-any.whl (13.5 kB 查看散列)

上传时间 Python 3

由以下支持