跳转到主要内容

围绕PostgreSQL编写的消息队列。

项目描述

https://img.shields.io/pypi/v/psycopg2_mq.svg main CI Status

psycopg2_mq 是一个基于 PostgreSQLSQLAlchemypsycopg2 的消息队列。

目前,该库仅提供用于构建多线程工作系统的底层构造。它分为两个组件

  • psycopg2_mq.MQWorker - 一个可重用的工作对象,它管理一个单线程工作进程,可以接受任务并执行它们。应用程序应该为每个线程创建一个工作进程。它支持线程安全的优雅关闭API。

  • psycopg2_mq.MQSource - 一个源对象,提供客户端API用于调用和查询任务状态。

预期这些核心组件将被封装到您自己的应用中,您可以根据需要以任何方式实现,而不指定特定的CLI或框架。

数据模型

队列

工作进程运行队列中定义的任务。目前,每个队列将并发运行任务,而未来的版本可能支持每个队列的序列执行。每个注册的队列应包含一个 execute_job(job) 方法。

任务

队列的 execute_job 方法将传递一个包含以下属性的 Job 对象

  • id

  • queue

  • method

  • args

  • cursor

为了方便,提供了一个 extend(**kw) 方法,可用于向对象添加额外属性。这在定义队列及其方法之间的合同时很有用。

游标

可以使用 cursor_key 对象来安排一个 Job。对于任何游标,只能有一个挂起的作业和一个正在运行的作业。当有另一个作业挂起时,新安排的作业将被忽略,并返回挂起的作业。

提供给工作者一个包含游标数据的 job.cursor 字典,并在作业完成后将其保存回数据库。这有效地给作业提供了一些持久的共享状态,并序列化了给定游标下的所有作业。

延迟作业

可以通过向 when 参数提供一个 datetime 对象,将 Job 延迟到将来运行。结合游标键,这可以提供对作业运行频率的良好节流。例如,使用 cursor_key 延迟30秒运行作业,在此期间安排的任何作业都将被丢弃。这里假设参数是常数,执行数据在游标或另一个表中。作为最后的手段,可以使用 conflict_resolver 回调在参数不能是常数时修改作业的属性。

调度

可以定义支持 RFC 5545 RRULE 调度的 JobSchedule。这些功能强大,可以支持时区、基于日历的频率以及使用 DTSTART 的锚点时间的简单重复规则。可以将 Cron 作业转换为这种语法以简化场景。

psycopg2-mq 工作者将自动协商哪个工作者负责管理调度,因此集群工作者应按预期运行。

示例工作者

from psycopg2_mq import (
    MQWorker,
    make_default_model,
)
from sqlalchemy import (
    MetaData,
    create_engine,
)
import sys

class EchoQueue:
    def execute_job(self, job):
        return f'hello, {job.args["name"]} from method="{job.method}"'

if __name__ == '__main__':
    engine = create_engine(sys.argv[1])
    metadata = MetaData()
    model = make_default_model(metadata)
    worker = MQWorker(
        engine=engine,
        queues={
            'echo': EchoQueue(),
        },
        model=model,
    )
    worker.run()

示例源

engine = create_engine()
metadata = MetaData()
model = make_default_model(metadata)
session_factory = sessionmaker()
session_factory.configure(bind=engine)

dbsession = session_factory()
with dbsession.begin():
    mq = MQSource(
        dbsession=dbsession,
        model=model,
    )
    job = mq.call('echo', 'hello', {'name': 'Andy'})
    print(f'queued job={job.id}')

更改

0.10 (2023-08-06)

  • 添加对 Python 3.12 的支持。

  • 取消对 Python 3.7 和 3.8 的支持。

  • 修复了关闭时的竞态条件,其中作业在触发器消失而池仍在关闭时失败。

0.9 (2023-04-21)

  • 添加对 Python 3.10 和 3.11 的支持。

  • [重大更改] 防止可折叠作业的重试。要求使用 call 来调用它们,以便有机会指定 conflict_resolver

  • 修复了默认模型模式中的错误,其中可折叠数据库索引未标记为唯一。

  • 在重试作业时复制跟踪信息。

  • 将字符串化的异常捕获到作业结果中的 message 键,与现有的 tbexcargs 键一起。

  • 工作者没有识别 capture_signals=False,导致在其他线程中运行事件循环时出现问题。

  • 优化代码库并添加一些真正的测试。太好了!

0.8.3 (2022-04-15)

  • [重大更改] 删除 MQWorker.make_job_context

0.8.2 (2022-04-15)

  • 取消对 Python 3.6 的支持。

  • [重大更改] 要求 SQLAlchemy 1.4+ 并解决与 SQLAlchemy 2.0 相关的弃用警告。

  • [重大更改] 将 update_job_id 重命名为 updated_job_idJobCursor 模型中。

0.8.1 (2022-04-15)

  • 确保在 JobContext 上填充 trace 属性。

  • 添加 MQWorker.make_job_context,该函数可以用于完全覆盖使用 Job 对象和打开数据库会话来创建的 JobContext 工厂。

0.8.0 (2022-04-15)

  • [重大变更] 将 update_job_id 外键添加到 JobCursor 模型,以便能够知道哪个作业最后更新了游标中的值。

  • [重大变更] 将 trace JSON 块添加到 Job 模型。

  • 在创建新作业时支持 trace JSON 块。此值在运行中的作业上下文中可用,可以在创建子作业或向外部系统发出请求时传递跟踪元数据。

    创建作业时查看 MQSource.call 的新 trace 参数。处理作业时查看 JobContext.trace 属性。

  • 添加一个标准的 FailedJobError 异常,作业可以抛出此异常以使用自定义结果对象标记失败。这与导致调用 MQWorker.result_from_error 方法的未处理的异常不同。

0.7.0 (2022-03-03)

  • 修复了与游标关联的丢失作业的边缘情况。在多个工作者运行的情况下,如果一个工作者丢失了数据库连接,则另一个工作者被设计为注意到并标记作业丢失。然而,作业实际上可能并未丢失,并且工作者在恢复其连接后可以恢复,并将作业重新标记为运行。在这种情况下,我们不希望另一个作业在相同的游标上开始。为了修复此问题,如果同一游标上标记为丢失的另一个作业,则不会运行新作业。您需要首先将作业标记为未丢失(可能是失败)以解除对游标上其余作业的阻塞。

0.6.2 (2022-03-01)

  • 将维护工作优先级高于运行新作业。存在一个“先有鸡还是先有蛋”的问题,即作业会被标记为正在运行,但需要被标记为丢失。然而,标记为丢失的优先级低于尝试启动新作业。在许多作业同时安排的情况下,工作者总是尝试启动新作业,而不运行维护工作,因此作业永远不会被标记为丢失,从而有效地阻塞了队列。

0.6.1 (2022-01-15)

  • 修复了在 0.6.0 版本中安排新作业时引入的错误。

0.6.0 (2022-01-14)

  • [重大变更] 对模型进行更改以标记作业为可折叠。

  • [重大变更] 对游标索引进行模型更改。

  • 如果以下任一条件成立,则允许在相同的游标上安排多个挂起的作业:

    1. 队列或方法与游标上现有的挂起作业不同。

    2. 在安排作业时将 collapse_on_cursor 设置为 False

0.5.7 (2021-03-07)

  • 为作业上下文添加一个 schedule_id 属性,以便作业可以知道它们是否是从安排中执行的。

0.5.6 (2021-02-28)

  • 某些从作业中抛出的 UnicodeDecodeError 异常可能会触发序列化失败(UntranslatableCharacter),因为它们可能包含序列 \u0000`,虽然这在 Python 中是有效的,但 Postgres 不允许。因此,在处理原始字节时,我们将使用替换字符对其进行解码,以便可以正确存储。虽然不是理想的情况,但比完全无法存储错误要好。

0.5.5 (2021-01-22)

  • 修复了一些旧代码,导致作业完成后释放工作者锁。

0.5.4 (2021-01-20)

  • 在标记作业为丢失时记录错误级别的日志。

0.5.3 (2021-01-11)

  • schedule_id 信息复制到重试作业。

0.5.2 (2021-01-11)

  • [重大变更] 要求 call_schedule 接受一个 ID 而不是对象。

0.5.1 (2021-01-09)

  • 删除背景作业 lock_id 列的 UNIQUE 约束。

0.5 (2021-01-09)

0.4.5 (2020-12-22)

  • 在插入语句中使用列对象以支持ORM级别的同义词,使模式可以具有不同名称的列。

0.4.4 (2019-11-07)

  • 确保在作业完成时释放advisory锁。

0.4.3 (2019-10-31)

  • 确保维护(查找丢失的作业)始终在由timeout参数定义的固定间隔内运行。

0.4.2 (2019-10-30)

  • 当连接丢失时,通过重新锁定它们并确保它们被标记为运行来恢复活动作业。

0.4.1 (2019-10-30)

  • 尝试重新连接到数据库,如果重新连接尝试失败,则崩溃。

0.4 (2019-10-28)

  • 向Job模型添加一个worker列,以跟踪处理作业的工作进程。

  • 向MQWorker添加一个可选的name参数,以命名工作进程 - 该值将被记录在每个作业中。

  • 向MQWorker添加一个threads参数(默认为1),以支持从同一工作进程实例处理多个作业,而不是为每个线程创建一个工作进程。

  • 向MQWorker添加一个capture_signals参数(默认为True),它将捕获SIGTERM、SIGINT和SIGUSR1。前两者将触发优雅关闭 - 它们将使进程停止处理新作业,同时完成活动作业。后者将向stderr输出当前工作进程状态的JSON转储。

0.3.3 (2019-10-23)

  • 只有当作业成功完成时才保存游标更新。

0.3.2 (2019-10-22)

  • 在超时时标记丢失的作业,而不仅仅是当工作进程启动时,以便更早地捕获它们。

0.3.1 (2019-10-17)

  • 当尝试使用游标和早于同一游标上的挂起作业的scheduled_time安排作业时,作业将更新为在较早的时间运行。

  • 当尝试使用游标和同一游标上已经存在挂起作业时,可以向MQSource.call提供conflict_resolver函数来更新作业属性,合并参数的方式由用户决定。

0.3 (2019-10-15)

  • 向Job模型添加一个新的cursor_snapshot列,它将包含作业开始时游标的值。

0.2 (2019-10-09)

  • 为作业添加游标支持。这需要模式迁移来添加cursor_key列、一个新的JobCursor模型和一些新的索引。

0.1.6 (2019-10-07)

  • 在psycopg2_mq.MQSource.call中支持传递自定义kwargs到作业,以允许作业表上有自定义列。

0.1.5 (2019-05-17)

  • 修复了序列化字符串或循环时的回归。

0.1.4 (2019-05-09)

  • 当作业失败时更安全地序列化异常对象。

0.1.3 (2018-09-04)

  • 将线程重命名为在处理作业时包含作业id。

0.1.2 (2018-09-04)

  • 将Job.params重命名为Job.args。

0.1.1 (2018-09-04)

  • 使psycopg2成为一个可选依赖项,以便允许应用程序如果愿意可以依赖于psycopg2-binary。

0.1 (2018-09-04)

  • 初始版本。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

psycopg2_mq-0.10.tar.gz (28.7 kB 查看哈希值)

上传时间

构建分布

psycopg2_mq-0.10-py3-none-any.whl (20.8 kB 查看哈希值)

上传时间 Python 3

支持