跳转到主要内容

未提供项目描述

项目描述

django-postgres-queue

django-postgres-queue 是一个由 postgres 支持的 Django 任务队列系统。

为什么是 postgres?

你以为你不应该使用 RDBMS 作为队列吗?然而,postgres 有一些特性使其不像你想象的那么糟糕,它有一些吸引人的优点。

  • 事务行为和可靠性。

    添加任务与其他数据库工作原子性。不需要使用 transaction.on_commit 钩子,也没有事务提交但任务丢失的风险。

    处理任务与其他数据库工作原子性。任务执行完成的数据库工作要么提交,要么任务不会被标记为已处理,绝无例外。如果任务只执行数据库工作,你就能实现真正的至少一次消息处理。

  • 操作简单性。

    通过重用我们无论如何都在使用的持久性、事务性存储,无需配置、监控和备份另一个有状态服务。对于小型团队和轻量级负载,这是一个正确的权衡。

  • 易于内省。

    由于任务存储在数据库表中,因此可以轻松查询和监视队列的状态。

  • 安全性。

    通过使用 postgres 事务,如果工作进程死亡,则不会留下处于锁定或模糊状态的工作。任务立即可供另一个工作进程获取。你甚至可以kill -9一个工作进程,并确保你的数据库和队列将保持一致状态。

  • 优先级队列。

    由于在选取下一个任务时明确指定了顺序,因此可以轻松确保优先级高的任务首先处理。

缺点

  • 比专用队列服务器的吞吐量低。

  • 比专用队列服务器更难扩展关系数据库。

  • 雷鸣般的群体。Postgres 没有通知单个工作进程唤醒的方法,所以我们可以在使用 LISTEN/NOTIFY 将任务入队时唤醒每个工作进程,或者工作进程必须进行短轮询。

  • 在至少一次交付的情况下,Postgres 事务必须在任务执行期间保持打开状态。对于长时间运行的任务,这可能会导致表膨胀和性能问题。

  • 当任务在至少一次交付的情况下崩溃或抛出异常时,它将立即有资格重试。如果您想实现重试延迟,您必须捕获异常并带有延迟重新入队任务。如果您的任务在未抛出异常的情况下崩溃(例如 SIGKILL),您可能会陷入无限重试循环,这会阻止其他任务的处理。

工作原理

django-postgres-queue 能够在一个查询中声明、处理和删除任务。

DELETE FROM dpq_job
WHERE id = (
    SELECT id
    FROM dpq_job
    WHERE execute_at <= now()
    ORDER BY priority DESC, created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;

一旦这个查询运行,任务就不能被其他工作进程声明。当事务提交时,任务将被删除。如果事务回滚或工作进程崩溃,任务将立即对其他工作进程可用。

为了实现至少一次交付,我们开始一个事务,处理任务,然后提交事务。对于最多一次,我们声明任务并立即提交事务,然后处理任务。对于没有任何外部影响且仅执行数据库操作的任务,至少一次的行为实际上是恰好一次(因为工作的声明和数据库工作将一起提交或回滚)。

与 Celery 的比较

django-postgres-queue 与 Celery 扮演相同的角色。除了使用 postgres 作为其后端之外,它旨在更简单,没有 Celery 所做的任何奇怪的操作(元编程、篡改日志、自动导入模块)。有一些样板代码来弥补元编程的缺乏,但我认为这比通过字符串导入事物更好。

用法

要求

django-postgres-queue 需要 Python 3、至少 postgres 9.5 和至少 Django 1.11。

安装

使用 pip 进行安装

pip install django-postgres-queue

然后添加 'dpq' 到您的 INSTALLED_APPS。运行 manage.py migrate 以创建作业表。

实例化一个队列对象。这可以放在任何地方,并且可以命名为您想要的任何名称。例如,someapp/queue.py

from dpq.queue import AtLeastOnceQueue

queue = AtLeastOnceQueue(
    tasks={
        # ...
    },
    notify_channel='my-queue',
)

您需要将此队列实例导入到队列或处理任务中。使用 AtLeastOnceQueue 进行至少一次交付,或使用 AtMostOnceQueue 进行最多一次交付。

django-postgres-queue 附带一个管理命令基类,您可以使用它来消费任务。可以按您喜欢的任何名称调用它,例如在 someapp/management/commands/worker.py

from dpq.commands import Worker

from someapp.queue import queue

class Command(Worker):
    queue = queue

然后您可以使用 manage.py worker 运行来启动您的工人。

任务函数接受两个参数 - 正在使用的队列实例以及此任务的作业实例。函数可以定义在任何地方,并按您喜欢的任何名称调用。以下是一个示例

def debug_task(queue, job):
    print(job.args)

要将它注册为任务,请将其添加到您的队列实例中

queue = AtLeastOnceQueue(tasks={
    'debug_task': debug_task,
})

关键字是任务名称,用于入队任务。它不必与函数名称匹配。

要入队任务,请使用队列实例上的 enqueue 方法

queue.enqueue('debug_task', {'some_args': 0})

假设您为该队列运行了一个工作进程,任务将立即运行。第二个参数必须是单个可序列化为 json 的值,并将作为 job.args 可用于任务。

监控

任务只是存储在 dpq_job 表中的数据库行,因此您可以使用 SQL 来监控系统。

要获取当前任务的数量

SELECT count(*) FROM dpq_job WHERE execute_at <= now()

这包括准备处理的任务和当前正在处理的任务。要查看当前正在处理的任务,我们需要对 postgres 行锁有可见性。这可以通过 pgrowlocks 扩展 提供。一旦安装,此查询将计算正在运行的任务数量

SELECT count(*)
FROM pgrowlocks('dpq_job')
WHERE 'For Update' = ANY(modes);

您可以将pgrowlocks的结果与dpq_job结合,以获取正在进行的任务的全列表。

日志记录

django-postgres-queue通过Python的日志框架进行日志记录,因此可以使用Django设置中的LOGGING字典进行配置。默认配置下不会记录任何内容,因此请确保配置某种形式的日志记录。所有内容都记录在dpq命名空间下。以下是一个示例配置,它将INFO级别的消息记录到stdout

LOGGING = {
    'version': 1,
    'root': {
        'level': 'DEBUG',
        'handlers': ['console'],
    },
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'dpq': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False,
        },
    }
}

同时,也将WARNING及以上级别的消息记录到类似Sentry的系统中也是明智的。

LOGGING = {
    'version': 1,
    'root': {
        'level': 'INFO',
        'handlers': ['sentry', 'console'],
    },
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
        'sentry': {
            'level': 'WARNING',
            'class': 'raven.contrib.django.handlers.SentryHandler',
        },
    },
    'loggers': {
        'dpq': {
            'level': 'INFO',
            'handlers': ['console', 'sentry'],
            'propagate': False,
        },
    },
}

您还可以使用内置的logging.FileHandler将日志记录到文件中。

实用技巧

这些技巧不是《django-postgres-queue》的官方支持特性。我们提供它们,以便您可以模仿其他任务队列中的一些常见功能。

测试中运行任务

在测试将任务排队到队列中的代码时,显式运行测试中的所有待处理任务可能很有用。为此,您可以

while queue.run_once(): pass

这将运行迄今为止已排队的所有任务,现在您可以断言它们做了正确的事情。

CELERY_ALWAYS_EAGER

Celery使用CELERY_ALWAYS_EAGER设置立即运行任务,而无需将其排队到工作者。它可以在测试期间以及在没有工作者关闭的开发环境中调试时使用。

class EagerAtLeastOnceQueue(AtLeastOnceQueue):
    def enqueue(self, *args, **kwargs):
        job = super().enqueue(*args, **kwargs)
        if settings.QUEUE_ALWAYS_EAGER:
            self.run_job(job)
        return job

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装软件包的信息。

源代码分发

makavafal-django-postgres-queue-1.0.5.tar.gz (16.3 kB 查看哈希)

上传时间 源代码

构建分发

makavafal_django_postgres_queue-1.0.5-py3-none-any.whl (17.3 kB 查看哈希)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面