未提供项目描述
项目描述
django-postgres-queue
django-postgres-queue 是一个由 postgres 支持的 Django 任务队列系统。
为什么是 postgres?
你以为你不应该使用 RDBMS 作为队列吗?然而,postgres 有一些特性使其不像你想象的那么糟糕,它有一些吸引人的优点。
事务行为和可靠性。
添加任务与其他数据库工作原子性。不需要使用 transaction.on_commit 钩子,也没有事务提交但任务丢失的风险。
处理任务与其他数据库工作原子性。任务执行完成的数据库工作要么提交,要么任务不会被标记为已处理,绝无例外。如果任务只执行数据库工作,你就能实现真正的至少一次消息处理。
操作简单性。
通过重用我们无论如何都在使用的持久性、事务性存储,无需配置、监控和备份另一个有状态服务。对于小型团队和轻量级负载,这是一个正确的权衡。
易于内省。
由于任务存储在数据库表中,因此可以轻松查询和监视队列的状态。
安全性。
通过使用 postgres 事务,如果工作进程死亡,则不会留下处于锁定或模糊状态的工作。任务立即可供另一个工作进程获取。你甚至可以kill -9一个工作进程,并确保你的数据库和队列将保持一致状态。
优先级队列。
由于在选取下一个任务时明确指定了顺序,因此可以轻松确保优先级高的任务首先处理。
缺点
比专用队列服务器的吞吐量低。
比专用队列服务器更难扩展关系数据库。
雷鸣般的群体。Postgres 没有通知单个工作进程唤醒的方法,所以我们可以在使用 LISTEN/NOTIFY 将任务入队时唤醒每个工作进程,或者工作进程必须进行短轮询。
在至少一次交付的情况下,Postgres 事务必须在任务执行期间保持打开状态。对于长时间运行的任务,这可能会导致表膨胀和性能问题。
当任务在至少一次交付的情况下崩溃或抛出异常时,它将立即有资格重试。如果您想实现重试延迟,您必须捕获异常并带有延迟重新入队任务。如果您的任务在未抛出异常的情况下崩溃(例如 SIGKILL),您可能会陷入无限重试循环,这会阻止其他任务的处理。
工作原理
django-postgres-queue 能够在一个查询中声明、处理和删除任务。
DELETE FROM dpq_job
WHERE id = (
SELECT id
FROM dpq_job
WHERE execute_at <= now()
ORDER BY priority DESC, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *;
一旦这个查询运行,任务就不能被其他工作进程声明。当事务提交时,任务将被删除。如果事务回滚或工作进程崩溃,任务将立即对其他工作进程可用。
为了实现至少一次交付,我们开始一个事务,处理任务,然后提交事务。对于最多一次,我们声明任务并立即提交事务,然后处理任务。对于没有任何外部影响且仅执行数据库操作的任务,至少一次的行为实际上是恰好一次(因为工作的声明和数据库工作将一起提交或回滚)。
与 Celery 的比较
django-postgres-queue 与 Celery 扮演相同的角色。除了使用 postgres 作为其后端之外,它旨在更简单,没有 Celery 所做的任何奇怪的操作(元编程、篡改日志、自动导入模块)。有一些样板代码来弥补元编程的缺乏,但我认为这比通过字符串导入事物更好。
用法
要求
django-postgres-queue 需要 Python 3、至少 postgres 9.5 和至少 Django 1.11。
安装
使用 pip 进行安装
pip install django-postgres-queue
然后添加 'dpq' 到您的 INSTALLED_APPS。运行 manage.py migrate 以创建作业表。
实例化一个队列对象。这可以放在任何地方,并且可以命名为您想要的任何名称。例如,someapp/queue.py
from dpq.queue import AtLeastOnceQueue
queue = AtLeastOnceQueue(
tasks={
# ...
},
notify_channel='my-queue',
)
您需要将此队列实例导入到队列或处理任务中。使用 AtLeastOnceQueue 进行至少一次交付,或使用 AtMostOnceQueue 进行最多一次交付。
django-postgres-queue 附带一个管理命令基类,您可以使用它来消费任务。可以按您喜欢的任何名称调用它,例如在 someapp/management/commands/worker.py
from dpq.commands import Worker
from someapp.queue import queue
class Command(Worker):
queue = queue
然后您可以使用 manage.py worker 运行来启动您的工人。
任务函数接受两个参数 - 正在使用的队列实例以及此任务的作业实例。函数可以定义在任何地方,并按您喜欢的任何名称调用。以下是一个示例
def debug_task(queue, job):
print(job.args)
要将它注册为任务,请将其添加到您的队列实例中
queue = AtLeastOnceQueue(tasks={
'debug_task': debug_task,
})
关键字是任务名称,用于入队任务。它不必与函数名称匹配。
要入队任务,请使用队列实例上的 enqueue 方法
queue.enqueue('debug_task', {'some_args': 0})
假设您为该队列运行了一个工作进程,任务将立即运行。第二个参数必须是单个可序列化为 json 的值,并将作为 job.args 可用于任务。
监控
任务只是存储在 dpq_job 表中的数据库行,因此您可以使用 SQL 来监控系统。
要获取当前任务的数量
SELECT count(*) FROM dpq_job WHERE execute_at <= now()
这包括准备处理的任务和当前正在处理的任务。要查看当前正在处理的任务,我们需要对 postgres 行锁有可见性。这可以通过 pgrowlocks 扩展 提供。一旦安装,此查询将计算正在运行的任务数量
SELECT count(*)
FROM pgrowlocks('dpq_job')
WHERE 'For Update' = ANY(modes);
您可以将pgrowlocks的结果与dpq_job结合,以获取正在进行的任务的全列表。
日志记录
django-postgres-queue通过Python的日志框架进行日志记录,因此可以使用Django设置中的LOGGING字典进行配置。默认配置下不会记录任何内容,因此请确保配置某种形式的日志记录。所有内容都记录在dpq命名空间下。以下是一个示例配置,它将INFO级别的消息记录到stdout
LOGGING = {
'version': 1,
'root': {
'level': 'DEBUG',
'handlers': ['console'],
},
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
},
'loggers': {
'dpq': {
'handlers': ['console'],
'level': 'INFO',
'propagate': False,
},
}
}
同时,也将WARNING及以上级别的消息记录到类似Sentry的系统中也是明智的。
LOGGING = {
'version': 1,
'root': {
'level': 'INFO',
'handlers': ['sentry', 'console'],
},
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
'sentry': {
'level': 'WARNING',
'class': 'raven.contrib.django.handlers.SentryHandler',
},
},
'loggers': {
'dpq': {
'level': 'INFO',
'handlers': ['console', 'sentry'],
'propagate': False,
},
},
}
您还可以使用内置的logging.FileHandler将日志记录到文件中。
实用技巧
这些技巧不是《django-postgres-queue》的官方支持特性。我们提供它们,以便您可以模仿其他任务队列中的一些常见功能。
测试中运行任务
在测试将任务排队到队列中的代码时,显式运行测试中的所有待处理任务可能很有用。为此,您可以
while queue.run_once(): pass
这将运行迄今为止已排队的所有任务,现在您可以断言它们做了正确的事情。
CELERY_ALWAYS_EAGER
Celery使用CELERY_ALWAYS_EAGER设置立即运行任务,而无需将其排队到工作者。它可以在测试期间以及在没有工作者关闭的开发环境中调试时使用。
class EagerAtLeastOnceQueue(AtLeastOnceQueue):
def enqueue(self, *args, **kwargs):
job = super().enqueue(*args, **kwargs)
if settings.QUEUE_ALWAYS_EAGER:
self.run_job(job)
return job
项目详情
makavafal-django-postgres-queue-1.0.5.tar.gz的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 88c44759a785c0c1204c01a4c77ddcc02dfbe7fb71c95a1074de9ce1760476d8 |
|
MD5 | 27ec957f2dd2752ed8cb439a95e8160a |
|
BLAKE2b-256 | 3873d93ebfd1ae7a1552cfe69a6f07f5153a68cff7dfaf0cef8e3bdc83c0e0a7 |
makavafal_django_postgres_queue-1.0.5-py3-none-any.whl的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 409b09e0c3aff96b8e9665e780f89a73b9c1006200e050634be84f8af748b648 |
|
MD5 | eb6b9885124d5f1ac5b56560117e4fea |
|
BLAKE2b-256 | c394a1f6646cfccc6b873d408e4ef3ce396235e2b2cdce4ff49a783735482685 |