有用的celery扩展。
项目描述
celery-heimdall
Celery Heimdall是一个基于Redis构建的Celery后台工作框架的通用工具集,它不试图处理每个用例,而是提供一个简单、现代、易于维护的即插即用解决方案,适用于90%的项目。
功能
- 全局唯一任务,允许在任意时刻或时间周期内只执行一个任务副本(例如:“不允许排队,直到一小时后”)
- 全局速率限制。Celery内置了速率限制,但它是对每个工作进程的速率限制,这使得它不适用于限制API请求等目的。
安装
pip install celery-heimdall
使用
唯一任务
假设你有一个在用户按下按钮时启动的任务。这个任务需要很长时间和大量资源来生成报告。你不想用户按下按钮10次并启动10个任务。在这种情况下,你需要Heimdall所称为的唯一任务
from celery import shared_task
from celery_heimdall import HeimdallTask
@shared_task(base=HeimdallTask, heimdall={'unique': True})
def generate_report(customer_id):
pass
我们在这里所做的只是更改Celery将用于运行任务的基类,并传递一些Heimdall将使用的选项。这个任务现在是唯一的——对于给定的参数,只有一个会在同一时间运行。
过期
如果我们的任务终止或发生错误,会发生什么?我们可能会陷入一个永远不会释放锁的情况,这被称为死锁。为了解决这个问题,我们在任务再次排队之前设置了一个最大时间。
from celery import shared_task
from celery_heimdall import HeimdallTask
@shared_task(
base=HeimdallTask,
heimdall={
'unique': True,
'unique_timeout': 60 * 60
}
)
def generate_report(customer_id):
pass
现在,即使任务卡住、工作者内存耗尽、机器着火等情况,generate_report
也将在一个小时后再次被允许运行。
自定义键
默认情况下,任务名称及其参数的哈希值用作锁键。但这种情况可能并不是你想要的。如果你只想一次处理一个报告,即使是为不同的客户呢?例如:
from celery import shared_task
from celery_heimdall import HeimdallTask
@shared_task(
base=HeimdallTask,
heimdall={
'unique': True,
'key': lambda args, kwargs: 'generate_report'
}
)
def generate_report(customer_id):
pass
通过指定自己的键函数,我们可以完全自定义判断任务是否唯一的方式。
现有任务
默认情况下,如果你尝试排队一个正在运行的唯一任务,Heimdall将返回现有任务的AsyncResult
。这让你可以编写不需要关心任务是否唯一的简单代码。想象一个简单的API端点,当它被调用时开始生成报告,但我们只想让它一次运行一次。以下是你需要的所有内容:
import time
from celery import shared_task
from celery_heimdall import HeimdallTask
@shared_task(base=HeimdallTask, heimdall={'unique': True})
def generate_report(customer_id):
time.sleep(10)
def my_api_call(customer_id: int):
return {
'status': 'RUNNING',
'task_id': generate_report.delay(customer_id).id
}
每次以相同的customer_id
调用my_api_call
时,generate_report.delay()
将返回相同的task_id
,直到原始任务完成。
有时你会在尝试再次排队任务时捕捉到任务已经在运行。我们可以告诉Heimdall在这种情况下抛出异常。
import time
from celery import shared_task
from celery_heimdall import HeimdallTask, AlreadyQueuedError
@shared_task(
base=HeimdallTask,
heimdall={
'unique': True,
'unique_raises': True
}
)
def generate_report(customer_id):
time.sleep(10)
def my_api_call(customer_id: int):
try:
task = generate_report.delay(customer_id)
return {'status': 'STARTED', 'task_id': task.id}
except AlreadyQueuedError as exc:
return {'status': 'ALREADY_RUNNING', 'task_id': exc.likely_culprit}
在定义任务时将unique_raises
设置为True
,当你尝试两次排队一个唯一的任务时,将抛出AlreadyQueuedError
异常。该AlreadyQueuedError
有两个属性:
likely_culprit
,其中包含已运行的任务的ID;expires_in
,这是已运行的任务被认为已过期的剩余时间(以秒为单位)。
唯一间隔任务
如果我们希望任务每小时只运行一次,即使它已经完成,会发生什么?在这种情况下,我们希望它运行,但完成时不要清除锁。
from celery import shared_task
from celery_heimdall import HeimdallTask
@shared_task(
base=HeimdallTask,
heimdall={
'unique': True,
'unique_timeout': 60 * 60,
'unique_wait_for_expiry': True
}
)
def generate_report(customer_id):
pass
通过将unique_wait_for_expiry
设置为True
,任务将完成,并且不允许在unique_timeout
过去之前再次排队generate_report()
。
速率限制
Celery提供了开箱即用的速率限制。然而,这种速率限制是按工作者基础进行的。没有可靠的方法来跨所有工作者限制任务的速率。Heimdall使这变得简单。
from celery import shared_task
from celery_heimdall import HeimdallTask, RateLimit
@shared_task(
base=HeimdallTask,
heimdall={
'rate_limit': RateLimit((2, 60))
}
)
def download_report_from_amazon(customer_id):
pass
这意味着“每60秒,只允许这个任务运行2次”。如果任务无法运行,因为它会违反速率限制,它将被重新安排。
需要注意的是,这并不保证你的任务将每秒正好运行两次,只是它不会每秒运行超过两次。任务将通过随机抖动重新安排,以防止暴风雨群问题。
动态速率限制
就像你可以动态提供任务的关键一样,你还可以基于该关键动态提供速率限制。
from celery import shared_task
from celery_heimdall import HeimdallTask, RateLimit
@shared_task(
base=HeimdallTask,
heimdall={
# Provide a lower rate limit for the customer with the ID 10, for everyone
# else provide a higher rate limit.
'rate_limit': RateLimit(lambda args: (1, 30) if args[0] == 10 else (2, 30)),
'key': lambda args, kwargs: f'customer_{args[0]}'
}
)
def download_report_from_amazon(customer_id):
pass
灵感
这些是受本项目启发的更成熟的项目,它们可能支持比本项目更旧的Celery和Python版本。
- celery_once,遗憾的是该项目已被废弃,这也是本项目存在的原因。
- celery_singleton
- 由Vigrond编写的此片段,以及各个贡献者的后续改进。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
构建分发
celery_heimdall-1.0.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ab95c44e156b11fb3e0d58d50d1d69af17840e86ccfbaf5dd883c46408040bc7 |
|
MD5 | c7543d65b278a10f1d0714b764e8f059 |
|
BLAKE2b-256 | 86e7162c446d346ad8282581e77238f0a139771811aef67600425fa47ce6d5c0 |
celery_heimdall-1.0.1-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f7327f39bbd0242ca45d00c56b8c9021863a44f7a392b5b7e91e09931c13ce85 |
|
MD5 | 4cb37cf681201c38f3f7ed121e8c37a0 |
|
BLAKE2b-256 | 1752e264911b21804b3dada053aa8babb04c1fd1bbef50ae90cc1340adf67b69 |