跳转到主要内容

用于bash命令DAG的cmd_queue模块

项目描述

命令队列 - cmd_queue

Pypi Downloads GitlabCIPipeline GitlabCICoverage ReadTheDocs

阅读文档

https://cmd-queue.readthedocs.io

Gitlab

https://gitlab.kitware.com/computer-vision/cmd_queue

Pypi

https://pypi.ac.cn/project/cmd_queue

幻灯片

https://docs.google.com/presentation/d/1BjJkjMx6bxu1uek-hAGpwj760u9rraVn7st8J5OsZME

这是一个简单的模块,用于“生成”一个bash脚本,可以在单个机器上调度多个作业(如果可能的话,并行执行)。它具有3个后端,复杂性递增:串行、tmux和slurm。

在串行模式下,将编写一个bash脚本,按顺序执行您的作业。没有外部依赖项

在tmux模式下,将打开多个tmux会话,每个会话都执行您的作业的独立部分。处理依赖项。

在slurm模式下,使用真正的重量级调度算法。在这种模式下,我们只需将您的作业转换为slurm命令并执行它们。

在内部,我们根据您指定的依赖关系构建一个DAG(有向无环图),并使用它来适当地排序作业。

默认情况下,执行您的作业的bash脚本会输出到控制台。如果用户只想手动运行管道的一部分,这提供了细粒度的控制。但如果被要求运行,cmd_queue将执行bash作业。

特性

  • Bash命令调度

  • 执行是可选的,也可以只打印命令

  • 总是可用的无并行性串行后端

  • 基于Tmux的轻量级后端

  • 基于Slurm的重型后端

  • Python和bash接口

  • 丰富的监控/实时控制

安装

cmd_queue软件包可在PyPI上获得。

pip install cmd_queue

串行队列后端始终可用。要访问其他后端,您必须安装其关联的依赖项。Tmux后端是最容易的,只需安装tmux即可(例如,在Debian系统上使用sudo apt install tmux)。

其他后端需要更复杂的设置。Slurm后端需要安装Slurm并且守护程序正在运行。Slurm后端是功能性的且经过测试,但仍可进行改进(需要帮助)。类似地,Airflow后端需要配置过的Airflow服务器,但尚未完全功能或测试(需要贡献使Airflow工作或更容易工作)!

Tmux队列演示

安装后,以下命令运行tmux队列的演示

# Reproduce the
INTERACTIVE_TEST=1 xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:1

这执行以下代码,创建两个并行的tmux工作进程并向具有非平凡依赖关系的多个bash作业提交。

 # xdoctest: +REQUIRES(env:INTERACTIVE_TEST)
 from cmd_queue.tmux_queue import *  # NOQA
 # Setup a lot of longer running jobs
 n = 2
 self = TMUXMultiQueue(size=n, name='demo_cmd_queue')
 first_job = None
 for i in range(n):
     prev_job = None
     for j in range(4):
        command = f'sleep 1 && echo "This is job {i}.{j}"'
        job = self.submit(command, depends=prev_job)
        prev_job = job
        first_job = first_job or job
command = f'sleep 1 && echo "this is the last job"'
job = self.submit(command, depends=[prev_job, first_job])
self.print_commands(style='rich')
self.print_graph()
if self.is_available():
    self.run(block=True, other_session_handler='kill')

当运行print_commands命令时,首先显示将分布在多个新tmux会话中的所有提交的命令。这些是将要执行的命令。这有助于在队列执行前使用run之前检查您的bash命令模板是否正确。

https://i.imgur.com/rVbyHzM.png

print_graph命令将使用网络文本渲染要执行的DAG。最后,使用带有block=Truerun调用,启动执行DAG并显示进度和作业状态在丰富的或文本监控器中。

https://i.imgur.com/4mxFIMk.gif

在运行此演示时,可以简单地附加到tmux会话(例如,使用tmux a),并检查正在运行的特定队列。我们建议在tmux会话中使用<ctrl-b>s来查看和导航tmux会话。与Slurm后端不同,DAG的整个执行过程对开发人员来说是完全透明的!以下截图显示了运行此演示时生成的tmux会话。

https://i.imgur.com/46LRK8M.png

默认情况下,如果没有错误,这些会话将在执行完成后退出,但这是可以配置的。同样,如果有错误,tmux会话将保持以允许进行调试。

动机

最近,我需要在2个GPU上的4个作业上运行几个作业,然后在他们全部完成后执行一个脚本。我本应该使用slurm或其他适当的排队系统来安排作业,但相反,我使用tmux编写了自己的hacky调度器。

这对我来说非常有效,并且能够有效地安排作业,而不需要在机器上使用像slurm这样的重型软件。

最终,我在机器上安装了slurm,并将我的tmux_queue API抽象为通用的“命令队列”,该队列可以使用以下三个后端之一:串行、tmux或slurm。

利基

有大量的DAG调度器

  • airflow

  • luigi

  • submitit

  • rq_scheduler

该功能适用于您有大量相互依赖的bash命令行时,并希望使用Python中定义的逻辑来模板化这些参数。

我们计划添加一个airflow后端。

使用方法

使用 cmd_queue 有两种方式

  1. 在Python中创建一个Queue对象,然后调用其 .submit 方法传入shell调用。它返回一个对象,您可以使用它来指定后续调用 .submit 的依赖项。这简单地将您所有的CLI调用组织到一个bash脚本中,可以检查并运行。有不同后端可以在满足依赖项的情况下并行执行作业。

  2. 您还可以通过CLI使用它,详细信息请参阅 cmd_queue –help。使用方法基本上相同。您创建一个队列,提交作业到它,您可以检查它,也可以运行它。

Python中使用示例

import cmd_queue

# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')

# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello')
job2 = self.submit('echo world', depends=[job1])
job3 = self.submit('echo foo')
job4 = self.submit('echo bar', depends=[job2, job3])
job5 = self.submit('echo spam', depends=[job1])

# Print a graph of job dependencies
self.print_graph()

# Display the simplified bash script to be executed.
self.print_commands()

# Execute the jobs
self.run()

CLI中使用示例

# Create a Queue
cmd_queue new "demo_cli_queue"

# Submit bash invocations that you want to run, and mark dependencies.
cmd_queue submit --jobname job1 "demo_cli_queue" -- echo hello
cmd_queue submit --jobname job2 --depends job1 "demo_cli_queue" -- echo world
cmd_queue submit --jobname job3 "demo_cli_queue" -- echo foo
cmd_queue submit --jobname job4 --depends job1,job2 "demo_cli_queue" -- echo bar
cmd_queue submit --jobname job5 --depends job1  "demo_cli_queue" -- echo spam

# Display the simplified bash script to be executed.
cmd_queue show "demo_cli_queue" --backend=serial

# Execute the jobs
cmd_queue run "demo_cli_queue" --backend=serial

示例

所有依赖项检查和记录逻辑都由bash本身处理。用Python编写(或更好的是模板化)您的bash脚本,然后使用cmd_queue将这些命令序列“转译”为纯bash。

import cmd_queue

# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')

# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the simplified bash script to be executed.
self.print_commands()

# Execute the jobs
self.run()

这会按适当顺序打印bash命令以解决依赖关系。

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_cc9d551e/demo_queue_2022-04-08_cc9d551e.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 10 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 10 - demo_queue-job-1
echo world && sleep 0.5
#
### Command 3 / 10 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 4 / 10 - demo_queue-job-3
echo bar && sleep 0.5
#
### Command 5 / 10 - demo_queue-job-4
echo spam && sleep 0.5
#
### Command 6 / 10 - demo_queue-job-5
echo spam && sleep 0.5
#
### Command 7 / 10 - demo_queue-job-6
echo err && false
#
### Command 8 / 10 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 9 / 10 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 10 / 10 - demo_queue-job-9
echo bazbiz && sleep 0.5

可以通过选择更强大的后端来并行运行相同的代码。tmux后端是最轻量级的并行后端。

# Need to tell the tmux queue how many processes can run at the same time
import cmd_queue
self = cmd_queue.Queue.create(size=4, name='demo_queue', backend='tmux')
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.print_commands()

# Display the real bash that gets executed under the hood
# that is independencly executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

这会打印出每个tmux会话中将要执行的bash命令序列。

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_0_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 3 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 2 / 3 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 3 / 3 - demo_queue-job-9
echo bazbiz && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_1_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-6
echo err && false

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_2_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-5
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_3_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-3
echo bar && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_4_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-4
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_5_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-1
echo world && sleep 0.5

Slurm模式是真正的解决方案。但是,您需要在机器上安装slurm才能使用它。tmux是一个更轻量级的工具。我们可以在这里指定slurm选项

import cmd_queue
self = cmd_queue.Queue.create(name='demo_queue', backend='slurm')
job1 = self.submit('echo hello && sleep 0.5', cpus=4, mem='8GB')
job2 = self.submit('echo world && sleep 0.5', depends=[job1], parition='default')
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.print_commands()

# Display the real bash that gets executed under the hood
# that is independencly executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

这会打印出非常简单的slurm提交脚本

# --- /home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/demo_queue-20220408T170615-a9e238b5.sh

mkdir -p "$HOME/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs"
JOB_000=$(sbatch --job-name="J0000-demo_queue-20220408T170615-a9e238b5" --cpus-per-task=4 --mem=8000 --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0000-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo hello && sleep 0.5' --parsable)
JOB_001=$(sbatch --job-name="J0002-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0002-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo foo && sleep 0.5' --parsable)
JOB_002=$(sbatch --job-name="J0003-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0003-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bar && sleep 0.5' --parsable)
JOB_003=$(sbatch --job-name="J0005-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0005-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_004=$(sbatch --job-name="J0006-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0006-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo err && false' --parsable)
JOB_005=$(sbatch --job-name="J0007-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0007-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_006=$(sbatch --job-name="J0001-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0001-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo world && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_007=$(sbatch --job-name="J0004-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0004-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_008=$(sbatch --job-name="J0008-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0008-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo eggs && sleep 0.5' "--dependency=afterok:${JOB_005}" --parsable)
JOB_009=$(sbatch --job-name="J0009-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0009-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bazbiz && sleep 0.5' "--dependency=afterok:${JOB_008}" --parsable)

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

此版本没有可用的源代码分发文件。请参阅有关 生成分发存档 的教程。

构建分发

cmd_queue-0.2.0-py3-none-any.whl (85.1 kB 查看哈希值)

上传时间 Python 3

由以下赞助

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面