Django中背景工作者和任务的实现和回滚
项目描述
Django Tasks
Django中背景工作者和任务的实现和回滚,如DEP 0014中定义。
警告:此软件包正在积极开发中,可能会随时发布破坏性更改。如果您在生产环境中使用此软件包,请确保将其固定到特定版本。
安装
python -m pip install django-tasks
第一步是将django_tasks
添加到您的INSTALLED_APPS
中。
INSTALLED_APPS = [
# ...
"django_tasks",
]
其次,您需要配置一个后端。这将任务连接到将要执行它们的任何东西。
如果省略,则使用以下配置
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend"
}
}
默认情况下包含了一些后端
django_tasks.backends.dummy.DummyBackend
:不执行任务,只存储它们。这对于测试特别有用。django_tasks.backends.immediate.ImmediateBackend
:在当前线程中立即执行任务django_tasks.backends.database.DatabaseBackend
:将任务存储在数据库中(通过Django的ORM),并使用db_worker
管理命令检索和执行它们
注意:DatabaseBackend
还需要将django_tasks.backends.database
添加到INSTALLED_APPS
。
使用方法
注意:此文档仍在进行中。更多详细信息也可以在DEP中找到。测试也是良好的详尽参考。
定义任务
使用task
装饰器创建任务。
from django_tasks import task
@task()
def calculate_meaning_of_life() -> int:
return 42
任务装饰器接受一些参数来自定义任务
priority
:任务的优先级(介于-100和100之间。默认为0)queue_name
:是否在特定队列上运行任务backend
:此任务应使用的后端名称(如TASKS
中定义)enqueue_on_commit
:当当前事务成功提交时是否将任务入队,或立即入队。默认情况下,这由后端处理(见下文)。enqueue_on_commit
不能使用.using
修改。
这些属性(除enqueue_on_commit
外)也可以使用.using
在运行时进行修改
modified_task = calculate_meaning_of_life.using(priority=10)
除了上述属性外,还可以传递run_after
以指定任务应运行的具体时间。可以传递时区和timedelta
。
入队任务
要执行任务,请调用其上的enqueue
方法
result = calculate_meaning_of_life.enqueue()
可以查询返回的TaskResult
以获取正在运行的任务的当前状态以及其返回值。
如果任务有参数,可以将这些参数原封不动地传递给enqueue
。
事务
默认情况下,任务在当前事务(如果有)成功提交后入队(使用Django的transaction.on_commit
方法),而不是立即入队。
可以使用ENQUEUE_ON_COMMIT
设置进行配置。True
和False
强制行为。
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend",
"ENQUEUE_ON_COMMIT": False
}
}
也可以通过将enqueue_on_commit
传递给task
装饰器来按任务进行配置。
队列名称
默认情况下,任务被入队到“default”队列。当使用多个队列时,限制允许的名称可能很有用,以免错过任务。
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend",
"QUEUES": ["default", "special"]
}
}
将任务入队到未知队列名称将引发InvalidTaskError
。
要禁用队列名称验证,将QUEUES
设置为[]
。
数据库后端工作程序
首先,您需要将django_tasks.backends.database
添加到INSTALLED_APPS
。
INSTALLED_APPS = [
# ...
"django_tasks",
"django_tasks.backends.database",
]
然后,运行迁移
./manage.py migrate
接下来,配置数据库后端
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend"
}
}
最后,您可以运行db_worker
命令来在创建任务时运行任务。查看--help
以获取更多选项。
./manage.py db_worker
修剪旧任务
过了一段时间后,任务可能会开始在您的数据库中积累。这可以使用prune_db_task_results
管理命令来管理,该命令根据给定的保留策略删除已完成的和失败的任务。查看--help
以获取可用的选项。
检索任务结果
当入队任务时,您会得到一个TaskResult
,但可能需要在其他地方(另一个请求,另一个任务等)检索该结果。这可以通过get_result
(或aget_result
)来完成
result_id = result.id
# Later, somewhere else...
calculate_meaning_of_life.get_result(result_id)
只能以这种方式检索同类型的任务。要检索任何任务的输出,可以在后端上调用get_result
。
from django_tasks import default_task_backend
default_task_backend.get_result(result_id)
返回值
如果您的任务返回了某些内容,您可以从TaskResult
对象的.return_value
属性中检索它。访问未完成的任务(即不是COMPLETE
或FAILED
)的此属性将引发ValueError
。
assert result.status == ResultStatus.COMPLETE
assert result.return_value == 42
如果结果在后台被更新,您可以调用其上的refresh
来更新其值。使用get_result
获取的结果始终是最新的。
assert result.status == ResultStatus.NEW
result.refresh()
assert result.status == ResultStatus.COMPLETE
异常
如果任务引发了异常,其.exception
将包含引发的异常。
assert isinstance(result.exception, ValueError)
作为异常序列化过程的一部分,一些信息将丢失。跟踪信息将减少到一个字符串,您可以打印它来帮助调试。
assert isinstance(result.traceback, str)
堆栈帧、globals()
和locals()
不可用。
如果异常无法序列化,则.result
为None
。
后端内省
由于django-tasks
支持多个不同的后端,这些后端可能不支持所有功能,因此在运行时确定这一点以确保所选任务队列满足要求,或者在它不满足要求时优雅地降级功能,这可能很有用。
supports_defer
:是否可以带有run_after
属性的作业入队?supports_async_task
:是否可以入队协程?supports_get_result
:是否可以在事后检索结果(从任何线程/进程)?
from django_tasks import default_task_backend
assert default_task_backend.supports_get_result
这特别适用于与Django的系统检查框架结合使用。
信号
提供了一些信号,以便更容易地响应某些任务事件。
虽然信号可用,但它们可能不是最易于维护的方法。
django_tasks.signals.task_enqueued
:当任务入队时调用。发送者是后端类。还使用入队的task_result
调用。django_tasks.signals.task_finished
:当任务完成(COMPLETE
或FAILED
)时调用。发送者是后端类。还使用完成的task_result
调用。
贡献
有关如何贡献的信息,请参阅CONTRIBUTING.md。
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。