跳转到主要内容

调度celery任务在可能遥远的未来运行

项目描述

使用单独的存储后端(目前仅支持redis)和cronjob组合,调度celery任务在可能遥远的未来运行。

使用方法

  • 通过添加设置如 longterm_scheduler_backend = 'redis://localhost:6739/1' 到您的celery配置来配置存储。(存储还尊重内置的celery配置设置 redis_socket_timeoutredis_socket_connect_timeoutredis_max_connections。)

  • 配置您的celery应用使用自定义的任务类 MYCELERY = celery.Celery(task_cls=celery_longterm_scheduler.Task)

  • 设置cronjob来运行 celery longterm_scheduler(例如,每5分钟运行一次)

  • 现在您可以通过调用 mytask.apply_async(args, kwargs, eta=datetime) 正常调度任务。这返回一个正常的 AsyncResult 对象,但仅支持读取 .id;任何其他方法或属性可能会明确或隐式失败。

  • 您可以通过调用 celery_longterm_scheduler.get_scheduler(MYCELERY).revoke('mytaskid') 完全删除一个已计划的作业(遗憾的是,我们无法挂钩到 celery 内置的 AsyncResult.revoke())。revoke() 在成功时返回 True,如果给定的任务在存储后端找不到(例如,因为它已经到期并执行),则返回 False。

与将正常作业发送到 celery 代理(附带时间信息)不同,这将在调度器存储后端创建一个作业条目。cronjob 然后定期检查存储中是否有到期的作业,然后才将正常的 celery 作业发送到代理。

理由

为什么不使用 celery 内置的 apply_async(eta=)?因为您永远无法真正删除一个挂起的作业。 AsyncResult('mytaskid').revoke() 只能将任务 ID 添加到 statedb 中,它必须永远存在,以便作业被识别为已撤销。对于计划在 6 个月或更长时间后运行的作业,这将会创建一个无法管理的、不断增长的 statedb。

为什么不使用 celerybeat?因为它是为周期性作业构建的,而我们需要单次作业。而且从 celerybeat 实现中获益不大,特别是既然我们想使用 redis 作为存储(因为我们已经将其用作代理和结果后端)。

实现

Redis 架构

celery_longterm_scheduler 假设它与一个专用 redis 数据库通信。它使用 SET jobid job-configuration 为每个计划作业创建一个条目(作业配置使用 JSON 序列化),并使用一个名为 scheduled_task_id_by_time 的单个排序集,其中包含按到期时的 Unix 时间戳(UTC)评分的 jobids。

运行测试

使用 toxpy.test。也许需要安装 tox(例如,通过 pip install tox),然后只需运行 tox

对于集成测试,您需要安装 redis 二进制文件(测试将启动 自己的服务器)。

celery_longterm_scheduler 变更

1.3.0 (2024-01-08)

  • 也支持 rediss:// URL

1.2.0 (2022-06-23)

  • 升级到 celery-5.x

1.1.2 (2020-05-27)

  • 添加 bw-compat 以使 py3 可以读取 py2 序列化的任务

1.1.1 (2019-12-19)

  • 升级到当前 redis 客户端库版本 3.x

1.1.0 (2019-11-28)

  • 使 Python-3 兼容。

1.0.1 (2018-01-17)

  • 不要尝试在 apply_async(eta=None) 调用上进行调度

1.0.0 (2017-09-29)

  • 初始发布

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

celery_longterm_scheduler-1.3.0.tar.gz (13.5 kB 查看哈希值

上传时间

支持者