跳转到主要内容

Django Channels的分布式任务。

项目描述

Django-CQ

PyPI version

描述

尝试为Django channels实现一个分布式任务队列。在RQ和Celery的基础上建模,可以实现复杂的任务工作流,所有这些都利用了Channels机制。

原因

有三个原因

  1. 目标是更容错的任务。有许多场合需要存储有关测试进展的信息。对于重要的任务,即使在Redis故障或工作进程关闭的情况下,也应将其存储。

  2. 倾向于利用与channels相同的机制。

  3. 希望能够有一些围绕子任务的额外功能,这些功能在Celery或RQ中似乎不可用。

限制

有两个限制

  • 必须使用REDIS作为Django缓存。

  • asgi_redis必须用作通道后端。

正在进行工作以取消这些限制。

安装

如果可以,请使用pip

pip install django-cq

或者生活在你自己的边缘

pip install -e https://github.com/furious-luke/django-cq.git#egg=django-cq

将包添加到您的设置文件中

INSTALLED_APPS = [
   'cq',
   ...
]

并在您的通道路由列表中包含路由信息

channel_routing = [
    include('cq.routing.channel_routing'),
    ...
]

您需要迁移以包括模型

./manage.py migrate

您可能希望为您的CQ任务创建一个新的通道层。默认层在通道消息上的存活时间很短,这会导致运行时间较长的任务杀死任何排队中的消息。更新您的设置文件以包括以下内容

CHANNEL_LAYERS = {
    'default': {
        ...
    },
    'long': {
        'BACKEND': 'asgi_redis.RedisChannelLayer',
        'CONFIG': {
            'hosts': [REDIS_URL],
            'expiry': 1800,
            'channel_capacity': {
                'cq-tasks': 1000
            }
        },
        'ROUTING': 'path.to.your.channels.channel_routing',
    },
}

CQ_CHANNEL_LAYER = 'long'

为了处理“cq-tasks”通道上发送的消息,需要启动一个工作进程

./manage.py cq_runworker

任务

基本任务使用很简单

@task
def send_email(cqt, addr):
    ...
    return 'OK'

task = send_emails.delay('dummy@dummy.org')
task.wait()
print(task.result)  # "OK"

在这里,cqtsend_email任务的表示。这可以用来启动子任务、链式后续任务等。

也可以通过直接调用它们来按顺序运行任务

result = send_email('dummy@dummy.org')
print(result)  # "OK"

子任务

对于更复杂的流程,可以从父任务中启动子任务

@task
def send_emails(cqt):
    ...
    for addr in email_addresses:
        cqt.subtask(send_email, addr)
    ...
    return 'OK'

task = send_emails.delay()
task.wait()
print(task.result)  # "OK"

子任务与其他使用delay在任务内部启动的任务之间的区别在于,子任务的父任务在所有子任务都完成后才会标记为完成。

from cq.models import Task

@task
def parent(cqt):
    task_a.delay()  # not a subtask
    cqt.subtask(task_b)  # subtask

parent.delay()
parent.status == Task.STATUS_WAITING  # True
# once task_b completes
parent.wait()
parent.status == Task.STATUS_COMPLETE  # True

链式任务

待办事项

@task
def calculate_something(cqt):
    return calc_a.delay(3).chain(add_a_to_4, (4,))

非原子任务

默认情况下,每个CQ任务都是原子的;除非任务在没有异常的情况下完成,否则对数据库的更改将不会持久化。如果您需要即使在发生错误的情况下也保留对数据库的更改,请使用atomic标志

@task(atomic=False)
def unsafe_task(cqt):
    pass

日志记录

对于长时间运行的任务,能够访问任务进展的持续日志非常有用。CQ任务有一个log方法,可以将日志消息发送到标准Django日志流,并将其缓存在正在运行的任务中。

@task
def long_task(cqt):
    cqt.log('standard old log')
    cqt.log('debugging log', logging.DEBUG)

如果当前任务是子任务,日志将发送到父任务。这样,有一个中心任务(顶级任务),可以用来监控子任务和链式任务网络的进度和状态。

性能

由于日志的处理方式,在有很多频繁的日志消息时可能会出现性能问题。有两种方法可以防止这种情况。

通过尽可能将publish设置为False来减少日志频率。这将缓存日志本地并存储在下一次publish=True调用中。

@task
def long_task(cqt):
    for ii in range(100):
        cqt.log('iteration %d' % ii, publish=False)
    cqt.log('done')  # publish=True

其次,通过限制保留的日志行数来减少日志量。limit选项指定了这一点。以下示例将只保留10个日志迭代

@task
def long_task(cqt):
    for ii in range(100):
        cqt.log('iteration %d' % ii, publish=False)
    cqt.log('done', limit=10)

存活时间

待办事项

重复任务

CQ 搭载了强大的重复任务功能。创建重复任务有两种方法:

  1. 从 Django 管理界面。

  2. 使用数据迁移。

从管理界面进入 cq重复任务。在那里您可以创建一个新的重复任务,指定要调用的后台任务和重复的 CRON 时间。

要从迁移中创建重复任务,请使用辅助函数 schedule_task

from django.db import migrations
from cq.models import schedule_task

from myapp.tasks import a_task


def add_repeating(apps, scema_editor):
    RepeatingTask = apps.get_model('cq.RepeatingTask')
    schedule_task(
        RepeatingTask,
        '* * * * *',
        a_task
    )


class Migration(migrations.Migration):
    operations = [
        migrations.RunPython(add_repeating, reverse_code=migrations.RunPython.noop)
    ]

合并

合并任务的挂起或排队实例将阻止其他任务实例运行。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

此版本没有可用的源分布文件。请参阅 生成分布存档 的教程。

构建分布

django_cq-0.3.3-py3-none-any.whl (32.7 kB 查看哈希值)

上传时间 Python 3

支持者: