Django Channels的分布式任务。
项目描述
Django-CQ
描述
尝试为Django channels实现一个分布式任务队列。在RQ和Celery的基础上建模,可以实现复杂的任务工作流,所有这些都利用了Channels机制。
原因
有三个原因
-
目标是更容错的任务。有许多场合需要存储有关测试进展的信息。对于重要的任务,即使在Redis故障或工作进程关闭的情况下,也应将其存储。
-
倾向于利用与channels相同的机制。
-
希望能够有一些围绕子任务的额外功能,这些功能在Celery或RQ中似乎不可用。
限制
有两个限制
-
必须使用REDIS作为Django缓存。
-
asgi_redis
必须用作通道后端。
正在进行工作以取消这些限制。
安装
如果可以,请使用pip
pip install django-cq
或者生活在你自己的边缘
pip install -e https://github.com/furious-luke/django-cq.git#egg=django-cq
将包添加到您的设置文件中
INSTALLED_APPS = [
'cq',
...
]
并在您的通道路由列表中包含路由信息
channel_routing = [
include('cq.routing.channel_routing'),
...
]
您需要迁移以包括模型
./manage.py migrate
您可能希望为您的CQ任务创建一个新的通道层。默认层在通道消息上的存活时间很短,这会导致运行时间较长的任务杀死任何排队中的消息。更新您的设置文件以包括以下内容
CHANNEL_LAYERS = {
'default': {
...
},
'long': {
'BACKEND': 'asgi_redis.RedisChannelLayer',
'CONFIG': {
'hosts': [REDIS_URL],
'expiry': 1800,
'channel_capacity': {
'cq-tasks': 1000
}
},
'ROUTING': 'path.to.your.channels.channel_routing',
},
}
CQ_CHANNEL_LAYER = 'long'
为了处理“cq-tasks”通道上发送的消息,需要启动一个工作进程
./manage.py cq_runworker
任务
基本任务使用很简单
@task
def send_email(cqt, addr):
...
return 'OK'
task = send_emails.delay('dummy@dummy.org')
task.wait()
print(task.result) # "OK"
在这里,cqt
是send_email
任务的表示。这可以用来启动子任务、链式后续任务等。
也可以通过直接调用它们来按顺序运行任务
result = send_email('dummy@dummy.org')
print(result) # "OK"
子任务
对于更复杂的流程,可以从父任务中启动子任务
@task
def send_emails(cqt):
...
for addr in email_addresses:
cqt.subtask(send_email, addr)
...
return 'OK'
task = send_emails.delay()
task.wait()
print(task.result) # "OK"
子任务与其他使用delay
在任务内部启动的任务之间的区别在于,子任务的父任务在所有子任务都完成后才会标记为完成。
from cq.models import Task
@task
def parent(cqt):
task_a.delay() # not a subtask
cqt.subtask(task_b) # subtask
parent.delay()
parent.status == Task.STATUS_WAITING # True
# once task_b completes
parent.wait()
parent.status == Task.STATUS_COMPLETE # True
链式任务
待办事项
@task
def calculate_something(cqt):
return calc_a.delay(3).chain(add_a_to_4, (4,))
非原子任务
默认情况下,每个CQ任务都是原子的;除非任务在没有异常的情况下完成,否则对数据库的更改将不会持久化。如果您需要即使在发生错误的情况下也保留对数据库的更改,请使用atomic
标志
@task(atomic=False)
def unsafe_task(cqt):
pass
日志记录
对于长时间运行的任务,能够访问任务进展的持续日志非常有用。CQ任务有一个log
方法,可以将日志消息发送到标准Django日志流,并将其缓存在正在运行的任务中。
@task
def long_task(cqt):
cqt.log('standard old log')
cqt.log('debugging log', logging.DEBUG)
如果当前任务是子任务,日志将发送到父任务。这样,有一个中心任务(顶级任务),可以用来监控子任务和链式任务网络的进度和状态。
性能
由于日志的处理方式,在有很多频繁的日志消息时可能会出现性能问题。有两种方法可以防止这种情况。
通过尽可能将publish
设置为False
来减少日志频率。这将缓存日志本地并存储在下一次publish=True
调用中。
@task
def long_task(cqt):
for ii in range(100):
cqt.log('iteration %d' % ii, publish=False)
cqt.log('done') # publish=True
其次,通过限制保留的日志行数来减少日志量。limit
选项指定了这一点。以下示例将只保留10个日志迭代
@task
def long_task(cqt):
for ii in range(100):
cqt.log('iteration %d' % ii, publish=False)
cqt.log('done', limit=10)
存活时间
待办事项
重复任务
CQ 搭载了强大的重复任务功能。创建重复任务有两种方法:
-
从 Django 管理界面。
-
使用数据迁移。
从管理界面进入 cq
和 重复任务
。在那里您可以创建一个新的重复任务,指定要调用的后台任务和重复的 CRON 时间。
要从迁移中创建重复任务,请使用辅助函数 schedule_task
。
from django.db import migrations
from cq.models import schedule_task
from myapp.tasks import a_task
def add_repeating(apps, scema_editor):
RepeatingTask = apps.get_model('cq.RepeatingTask')
schedule_task(
RepeatingTask,
'* * * * *',
a_task
)
class Migration(migrations.Migration):
operations = [
migrations.RunPython(add_repeating, reverse_code=migrations.RunPython.noop)
]
合并
合并任务的挂起或排队实例将阻止其他任务实例运行。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源分布
构建分布
django_cq-0.3.3-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 93dcf2ea6dccb85fc5e00b6454fc8fb0d715aa94c4f91d797396f387e743caea |
|
MD5 | e15089339bade72295fa6b6f1cfb8e93 |
|
BLAKE2b-256 | 1bc2091e8837e10d90087605ca122465a6b112d3a54ab99a236e5c9bcbbb9af7 |