跳转到主要内容

DVC中使用的可扩展任务队列。

项目描述

PyPI Status Python Version License

Tests Codecov pre-commit Black

dvc-task是一个用于从独立的Python应用程序中排队、运行和管理后台作业(进程)的库。dvc-task建立在Celery之上,但不需要完整的AMQP消息服务器(或任何其他传统上用作Celery代理的“重型”服务器)。

功能

  • dvc_task.proc模块用于在Celery任务中运行和管理后台进程

  • 预配置的Celery应用程序,适用于独立桌面应用程序的使用

    • 使用Kombu文件系统传输作为消息代理,并使用标准的文件系统Celery结果后端

    • 允许独立应用程序在不使用额外的消息和结果后端服务器的情况下使用Celery

  • 预配置的“临时”Celery工作进程,当Celery队列变空时将自动终止自身

    • 允许独立应用程序直接从Python代码中按需启动Celery工作进程(而不是需要“永远运行”的CLI celery工作进程)

需求

  • Celery 5.3或更高版本

  • Kombu 5.3或更高版本

注意:Celery官方不支持Windows,但dvc-task在Windows上已测试过(并在Windows上的DVC中使用)。

安装

您可以通过pipPyPI安装dvc-task

$ pip install dvc-task

使用

进程 (dvc_task.proc)

进程模块提供了一个简单的API来管理后台任务中的后台进程。后台进程在Celery任务中运行,但进程状态与Celery分开存储,因此可以从Celery生产者或消费者应用程序外部访问管理进程的信息。

配置好Celery应用程序后,可以通过ProcessManager.run(返回proc.tasks.run Celery任务的签名)来排队(并运行)作业

from dvc_task.proc import ProcessManager

@app.task
def my_task():
    manager = ProcessManager(wdir=".")
    manager.run(["echo", "hello world"], name="foo").delay()

ProcessManager将在wdir中为每个管理的进程创建一个子目录。

$ tree .
.
└── 25mYD6MyLNewXXdMVYCCr3
    ├── 25mYD6MyLNewXXdMVYCCr3.json
    ├── 25mYD6MyLNewXXdMVYCCr3.out
    └── 25mYD6MyLNewXXdMVYCCr3.pid
1 directory, 3 files

至少,该目录将包含<id>.pid<id>.json文件。

  • <id>.json:一个描述进程的JSON文件,包含以下字典键
    • pid:进程PID

    • stdout:进程重定向的stdout文件路径(默认重定向到<id>.out

    • stderr:进程重定向的stderr文件路径(默认将stderr重定向到stdout)

    • stdin:进程重定向的stdin文件路径(交互式进程尚不支持,stdin目前总是null

    • returncode:进程的返回码(如果进程尚未退出,则为null

  • <id>.pid:仅包含进程PID的标准pidfile

ProcessManager实例可以在Celery任务外创建,以按需管理和监控进程

>>> from dvc_task.proc import ProcessManager
>>> manager = ProcessManager(wdir=".")
>>> names = [name for name, _info in manager.processes()]
['25mYD6MyLNewXXdMVYCCr3']
>>> for line in manager.follow(names[0]):
...     print(line)
...
hello world

Celery Workers (dvc_task.worker)

dvc-task包含一个预配置的Celery工作进程(TemporaryWorker),可以从Python代码启动。当队列变空时,TemporaryWorker将消费Celery任务。一旦队列变空,工作进程将等待直到指定的超时时间,以添加新任务到队列。如果超时后队列仍然为空,工作进程将退出。

以下是如何实例化具有60秒超时,Celery工作进程名称为my-worker-1的工作进程

>>> from dvc_task.worker import TemporaryWorker
>>> worker = TemporaryWorker(my_app, timeout=60)
>>> worker.start("my-worker-1")

注意,worker.start在调用线程中运行Celery工作进程。

Celery Applications (dvc_task.app)

dvc-task包含一个预配置的Celery应用程序(FSApp),它使用Kombu文件系统传输作为Celery代理,并使用Celery文件系统结果存储后端。 FSApp旨在在无法使用传统Celery生产者/消费者设置(具有适当的消息和存储后端)的独立Python应用程序中使用。

>>> from dvc_task.app import FSApp
>>> my_app = FSApp(wdir=".")

FSApp 提供迭代器以访问 Kombu 消息,这些消息可能正在队列中等待,或者已经被处理。这允许调用者在不使用 Celery inspect API 的情况下访问 Celery 任务信息(该 API 仅在 Celery 工作进程积极运行时有效)。

>>> for msg in my_app.iter_processed():
...     msg
<Message object at 0x102e7f0d0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '0244c11a-1bcc-47fc-8587-66909a55fdc6', ...}>
<Message object at 0x1027fd4c0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '491415d1-9527-493a-a5d7-88ed355da77c', ...}>
<Message object at 0x102e6f160 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'ea6ab7a4-0398-42ab-9f12-8da1f8e12a8a', ...}>
<Message object at 0x102e6f310 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '77c4a335-2102-4bee-9cb8-ef4d8ef9713f', ...}>

贡献

欢迎贡献。要了解更多信息,请参阅贡献指南

许可证

在 Apache 2.0 许可证下分发,dvc-task 是免费和开源软件。

问题

如果您遇到任何问题,请附上详细描述提交问题

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

dvc_task-0.40.1.tar.gz (36.1 kB 查看哈希值)

上传时间

构建分发

dvc_task-0.40.1-py3-none-any.whl (21.4 kB 查看哈希值)

上传时间 Python 3

支持者

AWSAWS 云计算和安全赞助商 DatadogDatadog 监控 FastlyFastly CDN GoogleGoogle 下载分析 MicrosoftMicrosoft PSF赞助商 PingdomPingdom 监控 SentrySentry 错误记录 StatusPageStatusPage 状态页面