跳转到主要内容

有用的celery扩展。

项目描述

celery-heimdall

codecov GitHub PyPI - Python Version

Celery Heimdall是一个基于Redis构建的Celery后台工作框架的通用工具集,它不试图处理每个用例,而是提供一个简单、现代、易于维护的即插即用解决方案,适用于90%的项目。

功能

  • 全局唯一任务,允许在任意时刻或时间周期内只执行一个任务副本(例如:“不允许排队,直到一小时后”)
  • 全局速率限制。Celery内置了速率限制,但它是对每个工作进程的速率限制,这使得它不适用于限制API请求等目的。

安装

pip install celery-heimdall

使用

唯一任务

假设你有一个在用户按下按钮时启动的任务。这个任务需要很长时间和大量资源来生成报告。你不想用户按下按钮10次并启动10个任务。在这种情况下,你需要Heimdall所称为的唯一任务

from celery import shared_task
from celery_heimdall import HeimdallTask

@shared_task(base=HeimdallTask, heimdall={'unique': True})
def generate_report(customer_id):
    pass

我们在这里所做的只是更改Celery将用于运行任务的基类,并传递一些Heimdall将使用的选项。这个任务现在是唯一的——对于给定的参数,只有一个会在同一时间运行。

过期

如果我们的任务终止或发生错误,会发生什么?我们可能会陷入一个永远不会释放锁的情况,这被称为死锁。为了解决这个问题,我们在任务再次排队之前设置了一个最大时间。

from celery import shared_task
from celery_heimdall import HeimdallTask

@shared_task(
  base=HeimdallTask,
  heimdall={
    'unique': True,
    'unique_timeout': 60 * 60
  }
)
def generate_report(customer_id):
  pass

现在,即使任务卡住、工作者内存耗尽、机器着火等情况,generate_report也将在一个小时后再次被允许运行。

自定义键

默认情况下,任务名称及其参数的哈希值用作锁键。但这种情况可能并不是你想要的。如果你只想一次处理一个报告,即使是为不同的客户呢?例如:

from celery import shared_task
from celery_heimdall import HeimdallTask

@shared_task(
  base=HeimdallTask,
  heimdall={
    'unique': True,
    'key': lambda args, kwargs: 'generate_report'
  }
)
def generate_report(customer_id):
  pass

通过指定自己的键函数,我们可以完全自定义判断任务是否唯一的方式。

现有任务

默认情况下,如果你尝试排队一个正在运行的唯一任务,Heimdall将返回现有任务的AsyncResult。这让你可以编写不需要关心任务是否唯一的简单代码。想象一个简单的API端点,当它被调用时开始生成报告,但我们只想让它一次运行一次。以下是你需要的所有内容:

import time
from celery import shared_task
from celery_heimdall import HeimdallTask

@shared_task(base=HeimdallTask, heimdall={'unique': True})
def generate_report(customer_id):
  time.sleep(10)

def my_api_call(customer_id: int):
  return {
    'status': 'RUNNING',
    'task_id': generate_report.delay(customer_id).id
  }

每次以相同的customer_id调用my_api_call时,generate_report.delay()将返回相同的task_id,直到原始任务完成。

有时你会在尝试再次排队任务时捕捉到任务已经在运行。我们可以告诉Heimdall在这种情况下抛出异常。

import time
from celery import shared_task
from celery_heimdall import HeimdallTask, AlreadyQueuedError


@shared_task(
  base=HeimdallTask,
  heimdall={
    'unique': True,
    'unique_raises': True
  }
)
def generate_report(customer_id):
  time.sleep(10)


def my_api_call(customer_id: int):
  try:
    task = generate_report.delay(customer_id)
    return {'status': 'STARTED', 'task_id': task.id}
  except AlreadyQueuedError as exc:
    return {'status': 'ALREADY_RUNNING', 'task_id': exc.likely_culprit}

在定义任务时将unique_raises设置为True,当你尝试两次排队一个唯一的任务时,将抛出AlreadyQueuedError异常。该AlreadyQueuedError有两个属性:

  • likely_culprit,其中包含已运行的任务的ID;
  • expires_in,这是已运行的任务被认为已过期的剩余时间(以秒为单位)。

唯一间隔任务

如果我们希望任务每小时只运行一次,即使它已经完成,会发生什么?在这种情况下,我们希望它运行,但完成时不要清除锁。

from celery import shared_task
from celery_heimdall import HeimdallTask

@shared_task(
  base=HeimdallTask,
  heimdall={
    'unique': True,
    'unique_timeout': 60 * 60,
    'unique_wait_for_expiry': True
  }
)
def generate_report(customer_id):
  pass

通过将unique_wait_for_expiry设置为True,任务将完成,并且不允许在unique_timeout过去之前再次排队generate_report()

速率限制

Celery提供了开箱即用的速率限制。然而,这种速率限制是按工作者基础进行的。没有可靠的方法来跨所有工作者限制任务的速率。Heimdall使这变得简单。

from celery import shared_task
from celery_heimdall import HeimdallTask, RateLimit

@shared_task(
  base=HeimdallTask,
  heimdall={
    'rate_limit': RateLimit((2, 60))
  }
)
def download_report_from_amazon(customer_id):
  pass

这意味着“每60秒,只允许这个任务运行2次”。如果任务无法运行,因为它会违反速率限制,它将被重新安排。

需要注意的是,这并不保证你的任务将每秒正好运行两次,只是它不会每秒运行超过两次。任务将通过随机抖动重新安排,以防止暴风雨群问题

动态速率限制

就像你可以动态提供任务的关键一样,你还可以基于该关键动态提供速率限制。

from celery import shared_task
from celery_heimdall import HeimdallTask, RateLimit


@shared_task(
  base=HeimdallTask,
  heimdall={
    # Provide a lower rate limit for the customer with the ID 10, for everyone
    # else provide a higher rate limit.
    'rate_limit': RateLimit(lambda args: (1, 30) if args[0] == 10 else (2, 30)),
    'key': lambda args, kwargs: f'customer_{args[0]}'
  }
)
def download_report_from_amazon(customer_id):
  pass

灵感

这些是受本项目启发的更成熟的项目,它们可能支持比本项目更旧的Celery和Python版本。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

celery_heimdall-1.0.1.tar.gz (14.0 kB 查看哈希值)

上传时间: 源代码

构建分发

celery_heimdall-1.0.1-py3-none-any.whl (14.7 kB 查看哈希值)

上传时间: Python 3

支持