用于bash命令DAG的cmd_queue模块
项目描述
命令队列 - cmd_queue
阅读文档 |
|
Gitlab |
|
Pypi |
|
幻灯片 |
https://docs.google.com/presentation/d/1BjJkjMx6bxu1uek-hAGpwj760u9rraVn7st8J5OsZME |
这是一个简单的模块,用于“生成”一个bash脚本,可以在单个机器上调度多个作业(如果可能的话,并行执行)。它具有3个后端,复杂性递增:串行、tmux和slurm。
在串行模式下,将编写一个bash脚本,按顺序执行您的作业。没有外部依赖项
在tmux模式下,将打开多个tmux会话,每个会话都执行您的作业的独立部分。处理依赖项。
在slurm模式下,使用真正的重量级调度算法。在这种模式下,我们只需将您的作业转换为slurm命令并执行它们。
在内部,我们根据您指定的依赖关系构建一个DAG(有向无环图),并使用它来适当地排序作业。
默认情况下,执行您的作业的bash脚本会输出到控制台。如果用户只想手动运行管道的一部分,这提供了细粒度的控制。但如果被要求运行,cmd_queue将执行bash作业。
特性
Bash命令调度
执行是可选的,也可以只打印命令
总是可用的无并行性串行后端
基于Tmux的轻量级后端
基于Slurm的重型后端
Python和bash接口
丰富的监控/实时控制
安装
cmd_queue软件包可在PyPI上获得。
pip install cmd_queue
串行队列后端始终可用。要访问其他后端,您必须安装其关联的依赖项。Tmux后端是最容易的,只需安装tmux即可(例如,在Debian系统上使用sudo apt install tmux)。
其他后端需要更复杂的设置。Slurm后端需要安装Slurm并且守护程序正在运行。Slurm后端是功能性的且经过测试,但仍可进行改进(需要帮助)。类似地,Airflow后端需要配置过的Airflow服务器,但尚未完全功能或测试(需要贡献使Airflow工作或更容易工作)!
Tmux队列演示
安装后,以下命令运行tmux队列的演示
# Reproduce the
INTERACTIVE_TEST=1 xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:1
这执行以下代码,创建两个并行的tmux工作进程并向具有非平凡依赖关系的多个bash作业提交。
# xdoctest: +REQUIRES(env:INTERACTIVE_TEST)
from cmd_queue.tmux_queue import * # NOQA
# Setup a lot of longer running jobs
n = 2
self = TMUXMultiQueue(size=n, name='demo_cmd_queue')
first_job = None
for i in range(n):
prev_job = None
for j in range(4):
command = f'sleep 1 && echo "This is job {i}.{j}"'
job = self.submit(command, depends=prev_job)
prev_job = job
first_job = first_job or job
command = f'sleep 1 && echo "this is the last job"'
job = self.submit(command, depends=[prev_job, first_job])
self.print_commands(style='rich')
self.print_graph()
if self.is_available():
self.run(block=True, other_session_handler='kill')
当运行print_commands命令时,首先显示将分布在多个新tmux会话中的所有提交的命令。这些是将要执行的命令。这有助于在队列执行前使用run之前检查您的bash命令模板是否正确。
print_graph命令将使用网络文本渲染要执行的DAG。最后,使用带有block=True的run调用,启动执行DAG并显示进度和作业状态在丰富的或文本监控器中。
在运行此演示时,可以简单地附加到tmux会话(例如,使用tmux a),并检查正在运行的特定队列。我们建议在tmux会话中使用<ctrl-b>s来查看和导航tmux会话。与Slurm后端不同,DAG的整个执行过程对开发人员来说是完全透明的!以下截图显示了运行此演示时生成的tmux会话。
默认情况下,如果没有错误,这些会话将在执行完成后退出,但这是可以配置的。同样,如果有错误,tmux会话将保持以允许进行调试。
动机
最近,我需要在2个GPU上的4个作业上运行几个作业,然后在他们全部完成后执行一个脚本。我本应该使用slurm或其他适当的排队系统来安排作业,但相反,我使用tmux编写了自己的hacky调度器。
这对我来说非常有效,并且能够有效地安排作业,而不需要在机器上使用像slurm这样的重型软件。
最终,我在机器上安装了slurm,并将我的tmux_queue API抽象为通用的“命令队列”,该队列可以使用以下三个后端之一:串行、tmux或slurm。
利基
有大量的DAG调度器
airflow
luigi
submitit
rq_scheduler
该功能适用于您有大量相互依赖的bash命令行时,并希望使用Python中定义的逻辑来模板化这些参数。
我们计划添加一个airflow后端。
使用方法
使用 cmd_queue 有两种方式
在Python中创建一个Queue对象,然后调用其 .submit 方法传入shell调用。它返回一个对象,您可以使用它来指定后续调用 .submit 的依赖项。这简单地将您所有的CLI调用组织到一个bash脚本中,可以检查并运行。有不同后端可以在满足依赖项的情况下并行执行作业。
您还可以通过CLI使用它,详细信息请参阅 cmd_queue –help。使用方法基本上相同。您创建一个队列,提交作业到它,您可以检查它,也可以运行它。
Python中使用示例
import cmd_queue
# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')
# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello')
job2 = self.submit('echo world', depends=[job1])
job3 = self.submit('echo foo')
job4 = self.submit('echo bar', depends=[job2, job3])
job5 = self.submit('echo spam', depends=[job1])
# Print a graph of job dependencies
self.print_graph()
# Display the simplified bash script to be executed.
self.print_commands()
# Execute the jobs
self.run()
CLI中使用示例
# Create a Queue
cmd_queue new "demo_cli_queue"
# Submit bash invocations that you want to run, and mark dependencies.
cmd_queue submit --jobname job1 "demo_cli_queue" -- echo hello
cmd_queue submit --jobname job2 --depends job1 "demo_cli_queue" -- echo world
cmd_queue submit --jobname job3 "demo_cli_queue" -- echo foo
cmd_queue submit --jobname job4 --depends job1,job2 "demo_cli_queue" -- echo bar
cmd_queue submit --jobname job5 --depends job1 "demo_cli_queue" -- echo spam
# Display the simplified bash script to be executed.
cmd_queue show "demo_cli_queue" --backend=serial
# Execute the jobs
cmd_queue run "demo_cli_queue" --backend=serial
示例
所有依赖项检查和记录逻辑都由bash本身处理。用Python编写(或更好的是模板化)您的bash脚本,然后使用cmd_queue将这些命令序列“转译”为纯bash。
import cmd_queue
# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')
# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])
# Display the simplified bash script to be executed.
self.print_commands()
# Execute the jobs
self.run()
这会按适当顺序打印bash命令以解决依赖关系。
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_cc9d551e/demo_queue_2022-04-08_cc9d551e.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 10 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 10 - demo_queue-job-1
echo world && sleep 0.5
#
### Command 3 / 10 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 4 / 10 - demo_queue-job-3
echo bar && sleep 0.5
#
### Command 5 / 10 - demo_queue-job-4
echo spam && sleep 0.5
#
### Command 6 / 10 - demo_queue-job-5
echo spam && sleep 0.5
#
### Command 7 / 10 - demo_queue-job-6
echo err && false
#
### Command 8 / 10 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 9 / 10 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 10 / 10 - demo_queue-job-9
echo bazbiz && sleep 0.5
可以通过选择更强大的后端来并行运行相同的代码。tmux后端是最轻量级的并行后端。
# Need to tell the tmux queue how many processes can run at the same time
import cmd_queue
self = cmd_queue.Queue.create(size=4, name='demo_queue', backend='tmux')
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])
# Display the "user-friendly" pure bash
self.print_commands()
# Display the real bash that gets executed under the hood
# that is independencly executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)
# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)
这会打印出每个tmux会话中将要执行的bash命令序列。
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_0_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 3 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 2 / 3 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 3 / 3 - demo_queue-job-9
echo bazbiz && sleep 0.5
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_1_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-6
echo err && false
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_2_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-5
echo spam && sleep 0.5
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_3_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-3
echo bar && sleep 0.5
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_4_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-4
echo spam && sleep 0.5
# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_5_2022-04-08_a1ef7600.sh
#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-1
echo world && sleep 0.5
Slurm模式是真正的解决方案。但是,您需要在机器上安装slurm才能使用它。tmux是一个更轻量级的工具。我们可以在这里指定slurm选项
import cmd_queue
self = cmd_queue.Queue.create(name='demo_queue', backend='slurm')
job1 = self.submit('echo hello && sleep 0.5', cpus=4, mem='8GB')
job2 = self.submit('echo world && sleep 0.5', depends=[job1], parition='default')
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])
# Display the "user-friendly" pure bash
self.print_commands()
# Display the real bash that gets executed under the hood
# that is independencly executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)
# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)
这会打印出非常简单的slurm提交脚本
# --- /home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/demo_queue-20220408T170615-a9e238b5.sh
mkdir -p "$HOME/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs"
JOB_000=$(sbatch --job-name="J0000-demo_queue-20220408T170615-a9e238b5" --cpus-per-task=4 --mem=8000 --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0000-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo hello && sleep 0.5' --parsable)
JOB_001=$(sbatch --job-name="J0002-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0002-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo foo && sleep 0.5' --parsable)
JOB_002=$(sbatch --job-name="J0003-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0003-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bar && sleep 0.5' --parsable)
JOB_003=$(sbatch --job-name="J0005-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0005-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_004=$(sbatch --job-name="J0006-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0006-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo err && false' --parsable)
JOB_005=$(sbatch --job-name="J0007-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0007-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_006=$(sbatch --job-name="J0001-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0001-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo world && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_007=$(sbatch --job-name="J0004-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0004-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_008=$(sbatch --job-name="J0008-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0008-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo eggs && sleep 0.5' "--dependency=afterok:${JOB_005}" --parsable)
JOB_009=$(sbatch --job-name="J0009-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0009-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bazbiz && sleep 0.5' "--dependency=afterok:${JOB_008}" --parsable)
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源代码分发
构建分发
cmd_queue-0.2.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 5581cc426492d08b696eac4023178681d17f470fce0b8cc44242105cbc5a7429 |
|
MD5 | 3a320c630d6bd1986fe8a1f01bd8aaef |
|
BLAKE2b-256 | c304e8bd5a397d797786a2eb0c29c88011594cf592642eace34337f87e2a0245 |