PQ是一个PostgreSQL的事务队列。
项目描述
这是一个用Python编写的PostgreSQL事务队列系统。
它允许您以各种方式将项目推入队列或从队列中取出,同时也提供两种调度选项:延迟处理和优先级。
该系统使用一个单独的表来存储所有队列中的作业;具体细节很容易自定义。
该系统目前仅支持psycopg2数据库驱动程序 - 或者为PyPy的psycopg2cffi。
基本的队列实现类似于Ryan Smith用Ruby编写的queue_classic库,但使用SKIP LOCKED进行并发控制。
在性能方面,该实现每秒大约有1000次操作。使用PyPy解释器,这会与可用核心的数量成线性比例。
入门
所有功能都封装在单个类PQ中。
class PQ(conn=None, pool=None, table="queue", schema=None)
可选的schema参数可以用来在必要时对表进行模式限定。
示例用法
from psycopg2 import connect
from pq import PQ
conn = connect('dbname=example user=postgres')
pq = PQ(conn)
对于多线程操作,可以使用连接池,例如 psycopg2.pool.ThreadedConnectionPool。
你可能想要确保数据库使用 utf-8 编码创建。
要创建和配置队列表,请调用 create() 方法。
pq.create()
队列
pq 对象通过 Python 的字典接口暴露队列。
queue = pq['apples']
queue 对象提供 get 和 put 方法,如下所述,并且它还作为一个上下文管理器工作,它管理一个事务。
with queue as cursor:
...
上下文管理器内的语句要么作为事务提交,要么原子性地拒绝。这在使用队列管理作业时很有用,因为它允许您从队列中检索作业,执行作业并写入结果,具有事务语义。
方法
使用 put(data) 方法将项目插入队列。它接受与 JSON 兼容的对象,如 Python 字典。
queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})
使用 get(block=True) 从队列中提取项目。默认行为是阻塞,直到有项目可用,默认超时为一秒,之后返回 None。
def eat(kind):
print 'umm, %s apples taste good.' % kind
job = queue.get()
eat(**job.data)
job 对象除了 data 属性外还提供额外的元数据,如字符串表示所示。
>>> job <pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>
获取操作也可以通过迭代访问。
for job in queue:
if job is None:
break
eat(**job.data)
迭代器在无项目可用时将阻塞。同样,默认超时为一秒,之后迭代器返回 None。
然后应用程序可以选择跳出循环,或再次等待项目准备好。
for job in queue:
if job is not None:
eat(**job.data)
# This is an infinite loop!
调度
项目可以调度,以便它们在稍后的时间才被提取。
queue.put({'kind': 'Cox'}, '5m')
在此示例中,项目将在五分钟后准备好。该方法还接受 datetime 和 timedelta 对象。
优先级
如果某些项目比其他项目更重要,可以表达时间期望。
queue.put({'kind': 'Cox'}, expected_at='5m')
这告诉队列处理器优先处理此项目,而不是在较晚时间期望的项目,反之亦然,优先处理期望时间较早的项目。注意,未设置优先级的项目将最后被提取。
可以组合调度和优先级选项。
queue.put({'kind': 'Cox'}, '1h', '2h')
此项目将在一小时后提取,即使如此,它也仅根据其两小时的优先级进行处理。
编码和解码
使用内置的 json 模块将任务数据编码和解码为 JSON。如果您想使用不同的实现或需要配置此,请将 encode 和/或 decode 参数传递给 PQ 构造函数。
Pickles
如果提供队列名称为 <name>/pickle(例如,'jobs/pickle'),则项目将自动使用 Python 内置的 cPickle 模块进行序列化和反序列化。
queue = pq['apples/pickle']
class Apple(object):
def __init__(self, kind):
self.kind = kind
queue.put(Apple('Cox'))
这允许您存储大多数对象,而无需添加任何额外的序列化代码。
使用旧 pickle 协议 0 来确保序列化数据被编码为 ascii,这应该与任何数据库编码兼容。请注意,pickle 数据在数据库级别仍然被包装为 JSON 字符串。
虽然使用pickle协议是一种简单的对象序列化方法,但对于高级用户来说,直接在对象上使用JSON序列化可能更好,例如,可以使用内置的json模块中的对象钩子机制或子类化JSONEncoder。
任务
pq提供了一个高级API,帮助管理任务。
from pq.tasks import PQ
pq = PQ(...)
queue = pq['default']
@queue.task(schedule_at='1h')
def eat(job_id, kind):
print 'umm, %s apples taste good.' % kind
eat('Cox')
queue.work()
tasks的jobs在失败时可以选择重新调度
@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(job_id, kind):
# ...
可以在调用task时覆盖时间期望
eat('Cox', _expected_at='2m', _schedule_at='1m')
** 注意 ** 第一个位置参数是作业的id。它是PostgreSQL中记录的PK。
线程安全
只要提供连接池,其中每个线程都获得自己的数据库连接,所有对象都是线程安全的。
变更
1.9.1 (2023-04-04)
添加对PostgreSQL 14的支持 [kalekseev]。
请参阅https://postgresql.ac.cn/docs/14/release-14.html#id-1.11.6.11.4。
添加使用自定义模式的支持(问题#35)。
1.9.0 (2020-09-29)
任务执行器现在将job_id作为第一个参数接收。
1.8.2 (2020-08-14)
添加了对超过63个字符的队列名称的支持。
如果使用超过此限制的名称,则需要数据库迁移(删除并重新创建pq_notify触发器)。如果不使用,则不需要迁移。
如果在检索过程中抛出异常,则将连接返回到池中
1.8.1 (2019-07-30)
添加可覆盖的encode和decode方法,它们负责将任务数据转换为JSON及其相反过程。
1.8.0 (2019-07-03)
更改任务优先级的策略。现在,具有nullexpected_at值的任务将在设置值的项之后处理。
1.7.0 (2019-04-07)
使用SKIP LOCKED而不是咨询锁机制(PostgreSQL 9.5+)。
1.6.1 (2018-11-14)
修复队列类工厂模式。
1.6 (2018-11-12)
修复与NamedTupleCursor的兼容性。
修复重复列名问题。
添加了指定自己的队列类的选项。
1.5 (2017-04-18)
修复了Python 2的兼容性问题。
1.4 (2016-03-25)
添加了工作类和处理辅助装饰器。[jeanphix]
1.3 (2015-05-11)
Python 3兼容性。[migurski]
修复时区问题。
1.2 (2014-10-21)
改进
修复了并发问题,其中随着队列大小的增加,会保留大量锁。
修复数据库连接资源问题。
1.1 (2014-02-27)
特性
队列现在也是上下文管理器,提供了事务语义。
队列现在返回任务对象,这些对象提供元数据并允许读取和写入任务数据。
改进
现在可以使用相同的连接池与不同的队列一起使用。
错误
Literal字符串包装器与psycopg2不正确地工作。
事务管理器现在正确地将连接返回到池中。
1.0 (2013-11-20)
初始公共发布。
项目详情
pq-1.9.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d64af0efb8a3ebd11b2e0e662a6426c5e1e99f7f2e446f4439ba0699193e4ad6 |
|
MD5 | 646c9654bcd997136a2592a2d4372d3d |
|
BLAKE2b-256 | a1797babafd0ec7e279078f03a6a402e0160e158028b0afdede436c4efef2004 |