调度celery任务在可能遥远的未来运行
项目描述
使用单独的存储后端(目前仅支持redis)和cronjob组合,调度celery任务在可能遥远的未来运行。
使用方法
通过添加设置如 longterm_scheduler_backend = 'redis://localhost:6739/1' 到您的celery配置来配置存储。(存储还尊重内置的celery配置设置 redis_socket_timeout,redis_socket_connect_timeout 和 redis_max_connections。)
配置您的celery应用使用自定义的任务类 MYCELERY = celery.Celery(task_cls=celery_longterm_scheduler.Task)
设置cronjob来运行 celery longterm_scheduler(例如,每5分钟运行一次)
现在您可以通过调用 mytask.apply_async(args, kwargs, eta=datetime) 正常调度任务。这返回一个正常的 AsyncResult 对象,但仅支持读取 .id;任何其他方法或属性可能会明确或隐式失败。
您可以通过调用 celery_longterm_scheduler.get_scheduler(MYCELERY).revoke('mytaskid') 完全删除一个已计划的作业(遗憾的是,我们无法挂钩到 celery 内置的 AsyncResult.revoke())。revoke() 在成功时返回 True,如果给定的任务在存储后端找不到(例如,因为它已经到期并执行),则返回 False。
与将正常作业发送到 celery 代理(附带时间信息)不同,这将在调度器存储后端创建一个作业条目。cronjob 然后定期检查存储中是否有到期的作业,然后才将正常的 celery 作业发送到代理。
理由
为什么不使用 celery 内置的 apply_async(eta=)?因为您永远无法真正删除一个挂起的作业。 AsyncResult('mytaskid').revoke() 只能将任务 ID 添加到 statedb 中,它必须永远存在,以便作业被识别为已撤销。对于计划在 6 个月或更长时间后运行的作业,这将会创建一个无法管理的、不断增长的 statedb。
为什么不使用 celerybeat?因为它是为周期性作业构建的,而我们需要单次作业。而且从 celerybeat 实现中获益不大,特别是既然我们想使用 redis 作为存储(因为我们已经将其用作代理和结果后端)。
实现
Redis 架构
celery_longterm_scheduler 假设它与一个专用 redis 数据库通信。它使用 SET jobid job-configuration 为每个计划作业创建一个条目(作业配置使用 JSON 序列化),并使用一个名为 scheduled_task_id_by_time 的单个排序集,其中包含按到期时的 Unix 时间戳(UTC)评分的 jobids。
运行测试
使用 tox 和 py.test。也许需要安装 tox(例如,通过 pip install tox),然后只需运行 tox。
对于集成测试,您需要安装 redis 二进制文件(测试将启动 自己的服务器)。
celery_longterm_scheduler 变更
1.3.0 (2024-01-08)
也支持 rediss:// URL
1.2.0 (2022-06-23)
升级到 celery-5.x
1.1.2 (2020-05-27)
添加 bw-compat 以使 py3 可以读取 py2 序列化的任务
1.1.1 (2019-12-19)
升级到当前 redis 客户端库版本 3.x
1.1.0 (2019-11-28)
使 Python-3 兼容。
1.0.1 (2018-01-17)
不要尝试在 apply_async(eta=None) 调用上进行调度
1.0.0 (2017-09-29)
初始发布
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源分布
celery_longterm_scheduler-1.3.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d19a9d0f9d598cf8b13de35c2c2fdcd647b8c279c632ba959b0b1bc84ece86ea |
|
MD5 | 6feea098d09bf98593db89425064e3ab |
|
BLAKE2b-256 | ed418f9f78e698ecd769a08121acf5a27e62d365b8b630778c88cb64ce991eb7 |