跳转到主要内容

基于文件系统的任务队列

项目描述

文件系统任务队列

这是一个使用文件系统作为消息队列的任务队列。这个项目是由难以运行持久化服务(如redis、rabbitmq或数据库)的使用场景激发的。如果您能够运行持久化服务,您应该首选这种方法。这个包的初始动机是通过登录节点上的服务来提交任务到HPC集群,并在HPC工作节点上处理任务,而不需要在登录节点上运行服务。

本项目使用 filelock。使用这个库,可以安全地说,如果底层文件系统遵循 flock 调用,则每个任务都将保证只执行一次。如果任何工作节点在非符合规定的文件系统上,至少保证执行一次。

请注意,NFS v2和v3通过 rpc.lockd 使用外部文件锁系统,这不是在所有地方都启用的,因为它是一个外部服务。当前的NFS v4内置了对文件锁的支持,但问题是许多HPC中心仍在使用v3。否则,现在可以假设您的文件系统支持锁。

请注意,任务状态是在文件系统中管理的。因此,如果任务数量巨大,请不要使用此功能。可以考虑将任务分块或将它们发送到dask(然后将其分解成许多小任务)的插件,例如file_queue.plugins.dask.DaskWorker。每次任务状态修改都会在文件系统中产生2-4次I/O操作。

安装

  • pip install fs-task-queue

API

创建队列非常简单,只需提供队列所在的目录。

from fs_task_queue import Queue

queue = Queue("path/to/queue")

通过相同的接口也支持通过SSH提交作业和监控。当前工作进程无法通过SSH连接。

from fs_task_queue.plugins import SSHQueue

queue = SSHQueue("ssh://<username>:<password>@<hostname>:<port>/<path>")

接下来,我们可以将作业提交/入队到队列中。

import operator

job = queue.enqueue(operator.add, 1, 2)

您可以直接尝试获取作业的结果或获取其状态。

print(job.get_status())
print(job.result)

您还可以等待作业完成

result = job.wait()

工作进程

启动工作进程就像提供队列所在的文件系统目录一样简单。

fs-task-queue-worker --path ./path/to/queue

支持通过fs_task_queue.plugin.dask.DaskWorker使用dask工作进程,用于将作业发送到dask集群而不是在本地执行。

工作进程在任务队列中运行一个持续循环,收集任务。工作进程创建一个文件path/to/queue/workers/<worker-id>,其中它会

  • 每30秒持续接触该文件
  • 检查该文件是否存在,如果不存在,则停止工作进程

许可

BSD-3

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

fs-task-queue-0.1.9.tar.gz (12.3 kB 查看哈希)

上传时间 源代码