基于文件系统的任务队列
项目描述
文件系统任务队列
这是一个使用文件系统作为消息队列的任务队列。这个项目是由难以运行持久化服务(如redis、rabbitmq或数据库)的使用场景激发的。如果您能够运行持久化服务,您应该首选这种方法。这个包的初始动机是通过登录节点上的服务来提交任务到HPC集群,并在HPC工作节点上处理任务,而不需要在登录节点上运行服务。
本项目使用 filelock。使用这个库,可以安全地说,如果底层文件系统遵循 flock 调用,则每个任务都将保证只执行一次。如果任何工作节点在非符合规定的文件系统上,至少保证执行一次。
请注意,NFS v2和v3通过 rpc.lockd
使用外部文件锁系统,这不是在所有地方都启用的,因为它是一个外部服务。当前的NFS v4内置了对文件锁的支持,但问题是许多HPC中心仍在使用v3。否则,现在可以假设您的文件系统支持锁。
请注意,任务状态是在文件系统中管理的。因此,如果任务数量巨大,请不要使用此功能。可以考虑将任务分块或将它们发送到dask(然后将其分解成许多小任务)的插件,例如file_queue.plugins.dask.DaskWorker
。每次任务状态修改都会在文件系统中产生2-4次I/O操作。
安装
pip install fs-task-queue
API
创建队列非常简单,只需提供队列所在的目录。
from fs_task_queue import Queue
queue = Queue("path/to/queue")
通过相同的接口也支持通过SSH提交作业和监控。当前工作进程无法通过SSH连接。
from fs_task_queue.plugins import SSHQueue
queue = SSHQueue("ssh://<username>:<password>@<hostname>:<port>/<path>")
接下来,我们可以将作业提交/入队到队列中。
import operator
job = queue.enqueue(operator.add, 1, 2)
您可以直接尝试获取作业的结果或获取其状态。
print(job.get_status())
print(job.result)
您还可以等待作业完成
result = job.wait()
工作进程
启动工作进程就像提供队列所在的文件系统目录一样简单。
fs-task-queue-worker --path ./path/to/queue
支持通过fs_task_queue.plugin.dask.DaskWorker
使用dask
工作进程,用于将作业发送到dask集群而不是在本地执行。
工作进程在任务队列中运行一个持续循环,收集任务。工作进程创建一个文件path/to/queue/workers/<worker-id>
,其中它会
- 每30秒持续接触该文件
- 检查该文件是否存在,如果不存在,则停止工作进程
许可
项目详情
fs-task-queue-0.1.9.tar.gz的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 38b11255440ea871098ee28db4dc227c8c46bc3f4b70a6fe7237bf4273e04d6e |
|
MD5 | 8f737f86011741b6d536a5a88fa4238b |
|
BLAKE2b-256 | 0f6ae48d43f54c83b83d23821a644966842c3ebcb2ce049b60a6b675b6615718 |