跳转到主要内容

PQ是一个PostgreSQL的事务队列。

项目描述

这是一个用Python编写的PostgreSQL事务队列系统。

PQ does the job!

它允许您以各种方式将项目推入队列或从队列中取出,同时也提供两种调度选项:延迟处理和优先级。

该系统使用一个单独的表来存储所有队列中的作业;具体细节很容易自定义。

该系统目前仅支持psycopg2数据库驱动程序 - 或者为PyPy的psycopg2cffi

基本的队列实现类似于Ryan Smith用Ruby编写的queue_classic库,但使用SKIP LOCKED进行并发控制。

在性能方面,该实现每秒大约有1000次操作。使用PyPy解释器,这会与可用核心的数量成线性比例。

入门

所有功能都封装在单个类PQ中。

class PQ(conn=None, pool=None, table="queue", schema=None)

可选的schema参数可以用来在必要时对表进行模式限定。

示例用法

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn)

对于多线程操作,可以使用连接池,例如 psycopg2.pool.ThreadedConnectionPool

你可能想要确保数据库使用 utf-8 编码创建。

要创建和配置队列表,请调用 create() 方法。

pq.create()

队列

pq 对象通过 Python 的字典接口暴露队列。

queue = pq['apples']

queue 对象提供 getput 方法,如下所述,并且它还作为一个上下文管理器工作,它管理一个事务。

with queue as cursor:
    ...

上下文管理器内的语句要么作为事务提交,要么原子性地拒绝。这在使用队列管理作业时很有用,因为它允许您从队列中检索作业,执行作业并写入结果,具有事务语义。

方法

使用 put(data) 方法将项目插入队列。它接受与 JSON 兼容的对象,如 Python 字典。

queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})

使用 get(block=True) 从队列中提取项目。默认行为是阻塞,直到有项目可用,默认超时为一秒,之后返回 None

def eat(kind):
    print 'umm, %s apples taste good.' % kind

job = queue.get()
eat(**job.data)

job 对象除了 data 属性外还提供额外的元数据,如字符串表示所示。

>>> job
<pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

获取操作也可以通过迭代访问。

for job in queue:
    if job is None:
        break

    eat(**job.data)

迭代器在无项目可用时将阻塞。同样,默认超时为一秒,之后迭代器返回 None

然后应用程序可以选择跳出循环,或再次等待项目准备好。

for job in queue:
    if job is not None:
        eat(**job.data)

    # This is an infinite loop!

调度

项目可以调度,以便它们在稍后的时间才被提取。

queue.put({'kind': 'Cox'}, '5m')

在此示例中,项目将在五分钟后准备好。该方法还接受 datetimetimedelta 对象。

优先级

如果某些项目比其他项目更重要,可以表达时间期望。

queue.put({'kind': 'Cox'}, expected_at='5m')

这告诉队列处理器优先处理此项目,而不是在较晚时间期望的项目,反之亦然,优先处理期望时间较早的项目。注意,未设置优先级的项目将最后被提取。

可以组合调度和优先级选项。

queue.put({'kind': 'Cox'}, '1h', '2h')

此项目将在一小时后提取,即使如此,它也仅根据其两小时的优先级进行处理。

编码和解码

使用内置的 json 模块将任务数据编码和解码为 JSON。如果您想使用不同的实现或需要配置此,请将 encode 和/或 decode 参数传递给 PQ 构造函数。

Pickles

如果提供队列名称为 <name>/pickle(例如,'jobs/pickle'),则项目将自动使用 Python 内置的 cPickle 模块进行序列化和反序列化。

queue = pq['apples/pickle']

class Apple(object):
    def __init__(self, kind):
       self.kind = kind

queue.put(Apple('Cox'))

这允许您存储大多数对象,而无需添加任何额外的序列化代码。

使用旧 pickle 协议 0 来确保序列化数据被编码为 ascii,这应该与任何数据库编码兼容。请注意,pickle 数据在数据库级别仍然被包装为 JSON 字符串。

虽然使用pickle协议是一种简单的对象序列化方法,但对于高级用户来说,直接在对象上使用JSON序列化可能更好,例如,可以使用内置的json模块中的对象钩子机制或子类化JSONEncoder。

任务

pq提供了一个高级API,帮助管理任务

from pq.tasks import PQ

pq = PQ(...)

queue = pq['default']

@queue.task(schedule_at='1h')
def eat(job_id, kind):
    print 'umm, %s apples taste good.' % kind

eat('Cox')

queue.work()

tasksjobs在失败时可以选择重新调度

@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(job_id, kind):
    # ...

可以在调用task时覆盖时间期望

eat('Cox', _expected_at='2m', _schedule_at='1m')

** 注意 ** 第一个位置参数是作业的id。它是PostgreSQL中记录的PK。

线程安全

只要提供连接池,其中每个线程都获得自己的数据库连接,所有对象都是线程安全的。

变更

1.9.1 (2023-04-04)

1.9.0 (2020-09-29)

  • 任务执行器现在将job_id作为第一个参数接收。

1.8.2 (2020-08-14)

  • 添加了对超过63个字符的队列名称的支持。

    如果使用超过此限制的名称,则需要数据库迁移(删除并重新创建pq_notify触发器)。如果不使用,则不需要迁移。

  • 如果在检索过程中抛出异常,则将连接返回到池中

1.8.1 (2019-07-30)

  • 添加可覆盖的encodedecode方法,它们负责将任务数据转换为JSON及其相反过程。

1.8.0 (2019-07-03)

  • 更改任务优先级的策略。现在,具有nullexpected_at值的任务将在设置值的项之后处理。

1.7.0 (2019-04-07)

  • 使用SKIP LOCKED而不是咨询锁机制(PostgreSQL 9.5+)。

1.6.1 (2018-11-14)

  • 修复队列类工厂模式。

1.6 (2018-11-12)

  • 修复与NamedTupleCursor的兼容性。

  • 修复重复列名问题。

  • 添加了指定自己的队列类的选项。

1.5 (2017-04-18)

  • 修复了Python 2的兼容性问题。

1.4 (2016-03-25)

  • 添加了工作类和处理辅助装饰器。[jeanphix]

1.3 (2015-05-11)

  • Python 3兼容性。[migurski]

  • 修复时区问题。

1.2 (2014-10-21)

改进

  • 修复了并发问题,其中随着队列大小的增加,会保留大量锁。

  • 修复数据库连接资源问题。

1.1 (2014-02-27)

特性

  • 队列现在也是上下文管理器,提供了事务语义。

  • 队列现在返回任务对象,这些对象提供元数据并允许读取和写入任务数据。

改进

  • 现在可以使用相同的连接池与不同的队列一起使用。

错误

  • Literal字符串包装器与psycopg2不正确地工作。

  • 事务管理器现在正确地将连接返回到池中。

1.0 (2013-11-20)

  • 初始公共发布。

项目详情


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

pq-1.9.1.tar.gz (15.8 kB 查看哈希值)

上传时间:

支持者