跳转到主要内容

一个简单的基于asyncio的作业编排引擎

项目描述

README

一个简单的asyncio编排引擎

此库的主要和唯一目的是允许对涉及asyncio兼容作业的场景进行静态描述,这些作业之间存在依赖关系,即给定的作业在其要求未完成之前不能启动。

简而言之,您将

  • 定义一组Job对象,
  • 以及它们的requires关系;也就是说,对于每一个,在触发之前需要完成哪些其他作业,
  • 并通过一个Scheduler对象运行此逻辑,该对象将编排整个场景。

其他功能允许您

  • 定义作业为关键或不;一个引发异常的关键作业会导致编排突然终止;
  • 定义作业为永久运行,在这种情况下,调度程序当然不会等待它,而是当所有其他作业完成后终止它;
  • 为整个调度程序定义一个全局的timeout
  • 定义一个窗口,即允许同时运行的作业的最大数量;
  • 定义嵌套调度程序:一个Scheduler实例也是一个作业,一个调度程序可以像普通作业一样插入另一个调度程序;嵌套调度程序允许重用,因为工作流程部分可以是普通Python函数返回的。

可以创建作业对象

  • 要么作为一个从常规asyncio协程创建的Job实例;
  • 或者通过特殊化AbstractJob类,并定义其co_run()方法;例如,apssh库中的SshJob就是这样实现的。apssh

为了方便,Sequence类是一个辅助类,可以帮助您从手动管理必须按顺序运行的长时间作业字符串中的requires依赖中解放出来。

完整文档

本文档、asynciojobs的API参考文档和变更日志可在http://asynciojobs.readthedocs.io找到。

联系作者: thierry dot parmentelat at inria dot fr

许可证: CC BY-NC-ND

先决条件

asynciojobs需要asyncio和python-3.5或更高版本。

import sys
major, minor = sys.version_info[:2]
if (major, minor) < (3, 5):
    print("asynciojobs won't work in this environment")

安装

asynciojobs需要python-3.5,并可以从pypi仓库安装。

pip3 install asynciojobs

额外依赖graphviz

此安装方法将不会尝试安装graphviz Python包,该包难以安装,并且在运行时对调度器来说并非绝对必要。

尽管如此,我们仍然建议在开发场景中安装它,因为我们将会看到,asynciojobs提供了调度器的图形表示,这对于调试非常有用。

示例

import asyncio

在我们的所有示例中,我们将使用Watch类,它是一个类似秒表的辅助类;我们更愿意显示从开始到现在的运行时间,这对应于手表实例被创建或重置的时间。

import time
from asynciojobs import Watch

watch = Watch()
time.sleep(0.5)
watch.print_elapsed('some')
000.000  
000.505 some

现在我们可以编写一个简单的协程,通过小示例来展示调度器。

import time

watch = Watch()

# just print a message when entering and exiting, and sleep in the middle
async def in_out(timeout):
    global watch
    watch.print_elapsed("-> in_out({})\n".format(timeout))
    await asyncio.sleep(timeout)
    watch.print_elapsed("<- in_out({})\n".format(timeout))
    # return something easy to recognize: the number of milliseconds
    return 1000 * timeout
000.000  

示例A:并行运行

并行运行一系列协程(类似于gather)可以这样实现:

from asynciojobs import Job, Scheduler
a1, a2, a3 = Job(in_out(0.1)), Job(in_out(0.2)), Job(in_out(0.25)),

这里我们说的是有三个工作,它们之间没有关系。

所以当我们运行它们时,我们将同时启动所有3个协程,并在它们全部完成后返回。

# this is required in our case because our coroutines
# use watch to show time elapsed since reset()
watch.reset()

sa = Scheduler(a1, a2, a3)
sa
Scheduler with 0 done + 0 ongoing + 3 idle = 3 job(s)
sa.run()
000.016 -> in_out(0.1)
000.016 -> in_out(0.2)
000.016 -> in_out(0.25)
000.119 <- in_out(0.1)
000.219 <- in_out(0.2)
000.269 <- in_out(0.25)





True

注意run()方法是一个普通的Python函数,这在README中更容易展示,但实际上它只是co_run()协程方法的包装器。

多种编程风格

该库为创建调度器和作业提供了很大的灵活性。特别是上面的例子中,我们首先创建了作业,然后将它们添加到调度器中;也可以反过来这样做,如下所示的结构是完全等价的

# if it is more to your taste, you can as well
# create the scheduler first
sa2 = Scheduler()

# and then add jobs in it as you create them
a1, a2 = Job(in_out(0.1), scheduler=sa2), Job(in_out(0.2), scheduler=sa2)

# or add them later on
a3 = Job(in_out(0.25))
sa2.add(a3)
Scheduler with 0 done + 0 ongoing + 3 idle = 3 job(s)
watch.reset()
sa2.run()
000.000 -> in_out(0.1)
000.000 -> in_out(0.2)
000.000 -> in_out(0.25)
000.105 <- in_out(0.1)
000.205 <- in_out(0.2)
000.254 <- in_out(0.25)





True

获取单个结果

我们可以立即看到如何获取各种作业的结果。

a1.result()
100.0

示例B:添加需求(依赖项)

现在我们可以在作业之间添加需求依赖项,如下所示。这里我们想要运行

  • 作业1后跟作业2
  • 所有这些都与作业3并行运行

我们借此机会展示作业可以标记一个标签,这对于更友好的显示可能很有用。

b1, b2, b3 = (Job(in_out(0.1), label="b1"),
              Job(in_out(0.2), label="b2"),
              Job(in_out(0.25)))

b2.requires(b1)

现在b2需要b1完成才能开始。因此,只有前两个协程在开始时启动,只有当b1完成时,b2才开始。

watch.reset()

# with this setup we are certain that b3 ends in the middle of b2
sb = Scheduler(b1, b2, b3)
sb.run()
000.000 -> in_out(0.1)
000.000 -> in_out(0.25)
000.103 <- in_out(0.1)
000.104 -> in_out(0.2)
000.255 <- in_out(0.25)
000.309 <- in_out(0.2)





True

示例B':使用Sequence实现完全相同的效果

示例B中的上述代码与以下代码完全相同

from asynciojobs import Sequence

sb2 = Scheduler(
    Sequence(Job(in_out(0.1), label="bp1"),
             Job(in_out(0.2), label="bp2")),
    Job(in_out(0.25)))
watch.reset()

sb2.run()
000.000 -> in_out(0.1)
000.000 -> in_out(0.25)
000.104 <- in_out(0.1)
000.104 -> in_out(0.2)
000.255 <- in_out(0.25)
000.309 <- in_out(0.2)





True

Scheduler.run()的返回值

注意,由于sb.run()返回了True,我们可以推断出所有作业都已完成。实际上,run()返回True 仅当

  • 如果指定了超时,则所有作业在分配的超时时间内完成,并且
  • 没有关键作业引发异常。

注意:

如果这两个条件不满足会发生什么取决于调度器对象上的critical属性

  • 如果调度器不是critical:那么如果这些条件中的任何一项未满足,run()将返回False
  • 如果调度器自身是关键的,则run()将根据失败的原因抛出异常,具体请参见co_run()

选择这种行为是为了让嵌套调度器能够正确地执行:它允许异常从内部调度器向上冒泡到顶级调度器,并触发其突然终止。

还可以参考failed_critical()failed_time_out()debrief()why()

检查调度器和结果 - Scheduler.list()

在看到更多示例之前,让我们看看更多关于在run完成之后如何获取发生情况的示例。例如,检查作业b1是否已完成

print(b1.is_done())
True

检查作业b3是否未引发异常

print(b3.raised_exception())
None

要查看调度器的概述,只需使用list()方法,该方法将总结调度器的内容

sb.list()
1 ⚠ ☉ ☓   <Job `b1`> [[ -> 100.0]] 
2 ⚠ ☉ ☓   <Job `b2`> [[ -> 200.0]] requires={1}
3 ⚠ ☉ ☓   <Job `Job[in_out (...)]`> [[ -> 250.0]] 

list()显示的文本表示显示了所有作业,包括

  • 其在图拓扑顺序中的排名(具有环的图需要使用list_safe()
  • 其相对于作业生命周期的进度
  • 其标签,或未指定时的一些计算得到的标签
  • 如果作业已运行,则为其结果或异常
  • 其要求

作业实例的单独生命周期如下

空闲 → 调度 → 运行 → 完成

其中,“调度”状态是针对已达到最大同时作业数的案例 - 查看jobs_window - 因此作业实际上已经满足所有要求,但仍然等待其轮到它

考虑到这一点,以下是对使用的符号及其含义的完整列表

  • :空闲(读作:要求未满足)
  • :调度(读作:在作业窗口中等待一个空位)
  • :运行
  • :完成
  • :引发异常
  • :顺利通过(未引发异常)
  • :定义为关键
  • :定义为永远

以下是list()的输出示例,包括所有可能的作业组合

01 ⚠   ⚐ ∞ <J `forever=True crit.=True status=idle boom=False`>
02 ⚠   ⚐   <J `forever=False crit.=True status=idle boom=False`> - requires {01}
03     ⚐ ∞ <J `forever=True crit.=False status=idle boom=False`> - requires {02}
04     ⚐   <J `forever=False crit.=False status=idle boom=False`> - requires {03}
05 ⚠   ⚑ ∞ <J `forever=True crit.=True status=scheduled boom=False`> - requires {04}
06 ⚠   ⚑   <J `forever=False crit.=True status=scheduled boom=False`> - requires {05}
07     ⚑ ∞ <J `forever=True crit.=False status=scheduled boom=False`> - requires {06}
08     ⚑   <J `forever=False crit.=False status=scheduled boom=False`> - requires {07}
09 ⚠ ☉ ↺ ∞ <J `forever=True crit.=True status=running boom=False`> - requires {08}
10 ⚠ ☉ ↺   <J `forever=False crit.=True status=running boom=False`> - requires {09}
11   ☉ ↺ ∞ <J `forever=True crit.=False status=running boom=False`> - requires {10}
12   ☉ ↺   <J `forever=False crit.=False status=running boom=False`> - requires {11}
13 ⚠ ★ ☓ ∞ <J `forever=True crit.=True status=done boom=True`>!! CRIT. EXC. => bool:True!! - requires {12}
14 ⚠ ★ ☓   <J `forever=False crit.=True status=done boom=True`>!! CRIT. EXC. => bool:True!! - requires {13}
15   ★ ☓ ∞ <J `forever=True crit.=False status=done boom=True`>!! exception => bool:True!! - requires {14}
16   ★ ☓   <J `forever=False crit.=False status=done boom=True`>!! exception => bool:True!! - requires {15}
17 ⚠ ☉ ☓ ∞ <J `forever=True crit.=True status=done boom=False`>[[ -> 0]] - requires {16}
18 ⚠ ☉ ☓   <J `forever=False crit.=True status=done boom=False`>[[ -> 0]] - requires {17}
19   ☉ ☓ ∞ <J `forever=True crit.=False status=done boom=False`>[[ -> 0]] - requires {18}
20   ☉ ☓   <J `forever=False crit.=False status=done boom=False`>[[ -> 0]] - requires {19}

请注意,如果您的区域/终端无法输出这些,则代码将尝试使用纯ASCII输出。

图形表示

获取调度器的图形表示很容易。在Jupyter笔记本内部,您只需执行例如

sb2.graph()

然而,在readthedocs的上下文中,这个笔记本被转换为静态的markdown文件,因此我们无法使用这种优雅的方法。相反,我们使用更原始的工作流程,首先创建dot文件作为图,然后使用外部工具生成png文件

# this can always be done, it does not require graphviz to be installed
sb2.export_as_dotfile("readme-example-b.dot")
'(Over)wrote readme-example-b.dot'
# assuming you have the 'dot' program installed (it ships with graphviz)

import os

os.system("dot -Tpng readme-example-b.dot -o readme-example-b.png")
0

现在我们可以查看结果,您可以在这里识别示例B的逻辑

example B as a png file

示例C:无限循环或未返回的协程

有时处理无限循环是有用的;例如,如果我们想要完全分离动作和打印,我们可以使用asyncio.Queue来实现简单的消息总线,如下所示

message_bus = asyncio.Queue()

async def monitor_loop(bus):
    while True:
        message = await bus.get()
        print("BUS: {}".format(message))

现在我们需要对in_out协程进行修改版本,使其与这个消息总线交互而不是打印任何内容

async def in_out_bus(timeout, bus):
    global watch
    await bus.put("{} -> in_out({})".format(watch.elapsed(), timeout))
    await asyncio.sleep(timeout)
    await bus.put("{} <- in_out({})".format(watch.elapsed(), timeout))
    # return something easy to recognize
    return 10 * timeout

我们可以重新播放先前的场景,将监控循环作为单独的作业添加

然而,我们需要使用forever=True声明这个额外的作业,这样调度器就知道它不需要等待监控循环,因为我们事先知道这个监控循环将按设计永远不返回。

c1, c2, c3, c4 = (Job(in_out_bus(0.2, message_bus), label="c1"),
                  Job(in_out_bus(0.4, message_bus), label="c2"), 
                  Job(in_out_bus(0.3, message_bus), label="c3"),
                  Job(monitor_loop(message_bus), forever=True, label="monitor"))

c3.requires(c1)
watch.reset()

sc = Scheduler(c1, c2, c3, c4)
sc.run()
BUS: 000.000 -> in_out(0.2)
BUS: 000.000 -> in_out(0.4)
BUS: 000.205 <- in_out(0.2)
BUS: 000.206 -> in_out(0.3)
BUS: 000.406 <- in_out(0.4)
BUS: 000.509 <- in_out(0.3)





True

请注意,run() 总是在所有非 forever 任务完成后立即终止。另一方面,forever 任务会被取消,因此在场景结束时当然没有返回值。

sc.list()
1 ⚠ ☉ ☓   <Job `c1`> [[ -> 2.0]] 
2 ⚠ ☉ ☓   <Job `c2`> [[ -> 4.0]] 
3 ⚠ ☉ ☓   <Job `c3`> [[ -> 3.0]] requires={1}
4 ⚠ ☉ ↺ ∞ <Job `monitor`> [not done] 

在图形表示中,无限期任务会以虚线边框显示。

# a function to materialize the rustic way of producing a graphical representation
def make_png(scheduler, prefix):
    dotname = "{}.dot".format(prefix)
    pngname = "{}.png".format(prefix)
    scheduler.export_as_dotfile(dotname)
    os.system("dot -Tpng {dotname} -o {pngname}".format(**locals()))
    print(pngname)
make_png(sc, "readme-example-c")
readme-example-c.png

example-c

注意:调度器本质上是一组任务,因此调度器中任务的创建顺序在内存中不会被保留。

示例D:指定全局超时

一个 Scheduler 对象有一个 timeout 属性,可以设置为一个持续时间(以秒为单位)。当提供时,run() 将确保其全局持续时间不超过此值,并在超时触发时返回 False 或抛出 TimeoutError

当然,这可以与任何数量和依赖关系的任务一起使用,但为了简单起见,让我们看看只有一个无限循环任务的情况。

async def forever():
    global watch
    for i in range(100000):
        print("{}: forever {}".format(watch.elapsed(), i))
        await asyncio.sleep(.1)
        
j = Job(forever(), forever=True)
watch.reset()
sd = Scheduler(j, timeout=0.25, critical=False)
sd.run()
000.000: forever 0
000.104: forever 1
000.209: forever 2
11:34:57.169 SCHEDULER(None): PureScheduler.co_run: TIMEOUT occurred





False

如您所见,在这种情况下 run() 的结果是 False,因为并非所有任务都已完成。除此之外,任务现在处于此状态。

j
⚠ ☉ ↺ ∞ <Job `Job[forever (...)]`> [not done] 

处理异常

一个任务实例可以是 关键的 或不是;这意味着如下:

  • 如果关键任务引发异常,整个调度器将立即中止并返回 False
  • 如果非关键任务引发异常,整个调度器将继续进行。

在这两种情况下,异常都可以通过相应的 Job 对象中的 raised_exception() 方法检索。

示例E:非关键任务

async def boom(n):
    await asyncio.sleep(n)
    raise Exception("boom after {}s".format(n))
# by default everything is non critical
e1 = Job(in_out(0.2), label='begin')
e2 = Job(boom(0.2), label="boom", critical=False)
e3 = Job(in_out(0.3), label='end')

se = Scheduler(Sequence(e1, e2, e3), critical=False)
# with these settings, jobs 'end' is not hindered 
# by the middle job raising an exception
watch.reset()
se.run()
000.000 -> in_out(0.2)
000.205 <- in_out(0.2)
000.409 -> in_out(0.3)
000.710 <- in_out(0.3)





True
# in this listing you can see that job 'end' 
# has been running and has returned '300' as expected
se.list()
1 ⚠ ☉ ☓   <Job `begin`> [[ -> 200.0]] 
2   ★ ☓   <Job `boom`> !! exception => Exception:boom after 0.2s!! requires={1}
3 ⚠ ☉ ☓   <Job `end`> [[ -> 300.0]] requires={2}

非关键任务和调度器会以细黑边框显示。

make_png(se, "readme-example-e")
readme-example-e.png

example E

示例F:关键任务

boom 任务设置为关键任务会导致调度器退出。

f1 = Job(in_out(0.2), label="begin")
f2 = Job(boom(0.2), label="boom", critical=True)
f3 = Job(in_out(0.3), label="end")

sf = Scheduler(Sequence(f1, f2, f3), critical=False)
# with this setup, orchestration stops immediately
# when the exception triggers in boom()
# and the last job does not run at all
watch.reset()
sf.run()
000.000 -> in_out(0.2)
000.202 <- in_out(0.2)
11:34:58.383 SCHEDULER(None): Emergency exit upon exception in critical job





False
# as you can see, job 'end' has not even started here
sf.list()
1 ⚠ ☉ ☓   <Job `begin`> [[ -> 200.0]] 
2 ⚠ ★ ☓   <Job `boom`> !! CRIT. EXC. => Exception:boom after 0.2s!! requires={1}
3 ⚠   ⚐   <Job `end`> [not done] requires={2}

关键任务和调度器会以粗红边框显示。

make_png(sf, "readme-example-f")
readme-example-f.png

example F

限制同时运行的任务数量

Scheduler 有一个 jobs_window 属性,允许指定同时运行的最大任务数量。

jobs_windows 未指定或为 0 时,表示不对正在运行的任务施加限制。

# let's define a simple coroutine
async def aprint(message, delay=0.5):
     print(message)
     await asyncio.sleep(delay)
# let us now add 8 jobs that take 0.5 second each
s = Scheduler(jobs_window=4)

for i in range(1, 9):
    s.add(Job(aprint("{} {}-th job".format(watch.elapsed(), i), 0.5)))
# so running them with a window of 4 means approx. 1 second
watch.reset()
s.run()
# expect around 1 second
print("total duration = {}s".format(watch.elapsed()))
000.469 1-th job
000.469 2-th job
000.469 3-th job
000.469 4-th job
000.469 5-th job
000.469 6-th job
000.469 7-th job
000.469 8-th job
total duration = 001.004s

嵌套调度器

如介绍中所述,一个 Scheduler 实例本身可以用作任务。这使得将复杂场景分解成部分并以模块化方式组合它们变得容易。

让我们考虑以下示例

# we start with the creation of an internal scheduler
# that has a simple diamond structure

sub_sched = Scheduler(label="critical nested", critical=True)
subj1 = Job(aprint("subj1"), label='subj1', scheduler=sub_sched)
subj2 = Job(aprint("subj2"), label='subj2', required=subj1, scheduler=sub_sched)
subj3 = Job(aprint("subj3"), label='subj3', required=subj1, scheduler=sub_sched)
subj4 = Job(aprint("subj4"), label='subj4', required=(subj2, subj3), scheduler=sub_sched)
make_png(sub_sched, "readme-subscheduler")
readme-subscheduler.png

internal scheduler

现在我们可以创建一个主调度器,其中 一个任务是这个低级调度器

# the main scheduler
main_sched = Scheduler(
    Sequence(
        Job(aprint("main-start"), label="main-start"),
        # the way to graft the low-level logic in this main workflow
        # is to just use the ShcdulerJob instance as a job
        sub_sched,
        Job(aprint("main-end"), label="main-end"),
    )
)

这种嵌套结构由 list()graph() 两种方式渲染。

# list() shows the contents of sub-schedulers implemented as Scheduler instances
main_sched.list()
1 ⚠   ⚐   <Job `main-start`> [not done] 
2 ⚠   ⚐   <Scheduler `critical nested`> [not done] requires={1} -> entries={3}
3 ⚠   ⚐   > <Job `subj1`> [not done] 
4 ⚠   ⚐   > <Job `subj2`> [not done] requires={3}
5 ⚠   ⚐   > <Job `subj3`> [not done] requires={3}
6 ⚠   ⚐   > <Job `subj4`> [not done] requires={4, 5}
2 --end-- < <Scheduler `critical nested`> exits={6}
7 ⚠   ⚐   <Job `main-end`> [not done] requires={2}

当使用 Scheduler 来描述嵌套调度器时,asynciojobs 还将生成图形输出,正确显示整体结构。

让我们再次以另一种方式做这件事,以便在 readthedocs 中正确显示。

make_png(main_sched, "readme-nested")
readme-nested.png

这是主调度器通过 graph() 渲染的方式

执行后产生以下输出

main_sched.run()
main-start
subj1
subj2
subj3
subj4
main-end





True

嵌套调度器的优点

此功能可以用来处理以下问题

  • 您希望能够重用代码——例如编写库——而嵌套调度器是一种方便的方式来解决这个问题;函数可以返回作为调度器实现的 workflow 的部分,这些部分可以很容易地混合在一个更大的场景中;

  • 在另一个维度上,嵌套调度器可以是解决方案,如果

    • 您希望 jobs_window 属性仅应用于您的任务子集;
    • 或您需要 timeout 属性仅应用于您的任务子集;
    • 您有需要在整个场景结束之前更早终止的 forever 任务。

历史说明

在内部,asynciojobs 配有 PureScheduler 类。

一个 PureScheduler 实例是一个完全功能的调度器,但不能用作嵌套调度器。

在实现方面,Scheduler 是一个混合类,继承自 PureSchedulerAbstractJob

在此库的先前版本中,Scheduler 类不能嵌套,创建可嵌套调度器需要特定的类,如表格所示

版本仅调度可嵌套调度器
<= 0.8Scheduler无可用类
== 0.9SchedulerSchedulerJob
>= 0.10PureSchedulerScheduler

总之,从版本 0.10 开始,用户主要不需要担心这个问题,仅创建可嵌套的 Scheduler 对象是推荐的方法。

Scheduler 类上的其他有用功能

检查/故障排除:Scheduler.debrief()

Scheduler.debrief()是为已经运行并返回 False 的调度器设计的,它输出的列表与 list() 相同,但增加了关于作业数量的统计信息,最重要的是,增加了抛出异常的作业堆栈的统计信息。

清理:Scheduler.sanitize()

在某些情况下,例如测试场景中,可能需要向不在调度器中的作业添加要求。 sanitize 方法移除此类额外要求,除非你确定这不是你的情况,否则在编排之前显式调用它可能是个好主意。

早期检查:Scheduler.check_cycles()

check_cycles 将检查要求图中的循环。它返回一个布尔值。在运行编排之前调用它是个好主意。

需要一个协程代替吗?:Scheduler.co_run()

run() 是一个常规的 def 函数(即不是 async def),但实际上只是原生协程 co_run() 的包装器。

def run(self, *args, **kwds):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(self.co_run(*args, **kwds))

清理 - shutdown() 方法。

调度器对象公开 shutdown() 方法。

当资源附加到各种作业时,该方法应由用户显式调用,并且可以释放这些资源。

与较旧的 asynciojobs 版本的做法相反,当时嵌套调度器还不是非常有用,这个调用现在需要显式调用,它不再在编排结束后自动由 run() 调用。

尽管这种清理在本地 Job 实例的情况下实际上并不很有用,但一些应用程序库,如 apssh,定义了附加到网络连接的作业,例如 apssh 中的 ssh 连接,能够显式地终止这些连接是很方便的。

可视化 - 在笔记本中:Scheduler.graph()

如果您已安装 graphviz 软件包,您可以使用 graph() 方法在 Jupyter 笔记本中检查调度器实例,该方法返回一个 graphviz.Digraph 实例;这样就可以在笔记本中交互式地显示调度器图 - 参见http://graphviz.readthedocs.io/en/stable/manual.html#jupyter-notebooks

这是一个简单的示例

# and a simple scheduler with an initialization and 2 concurrent tasks
s = Scheduler()
j1 = Job(aprint("j1"), label="init", critical=False, scheduler=s)
j2 = Job(aprint("j2"), label="critical job", critical=True, scheduler=s, required=j1)
j3 = Job(aprint("j3"), label="forever job", critical=False, forever=True, scheduler=s, required=j1)
s.graph()

svg

在一个常规笔记本中,这就是您需要做的全部工作来查看调度器的图。然而,在这个 README 中,一旦在 readthedocs.io 上渲染,图表在转换中丢失了,所以请继续阅读以查看该图。

可视化 - 长方法:Scheduler.export_as_dotfile()

如果不在笔记本中可视化,或者如果您没有安装 graphviz,您仍然可以从调度器对象生成一个 dot 文件。

s.export_as_dotfile('readme-dotfile.dot')
'(Over)wrote readme-dotfile.dot'

然后稍后 - 可能是在另一台主机上 - 您可以使用此 dot 文件作为输入,使用 dot 程序(它是 graphviz 的一部分)生成 .png 图像,例如:

import os
os.system("dot -Tpng readme-dotfile.dot -o readme-dotfile.png")
0

现在允许我们将最后调度器的图形渲染为 png 文件

manually inserted readme

图例:在这个小例子中,我们可以看到

  • 关键任务带有粗的红边;
  • 而普通任务有更细的黑边;
  • 永久任务有虚线边;
  • 而常规任务有实线边。

尽管这里没有说明,但相同的图例也适用于嵌套调度器。

注意,如果您已经安装了 graphviz,则可以更简单地生成 png 文件,即不需要创建 dot 文件,如下所示

# a trick to produce a png file on a box that has graphviz pip-installed
g = s.graph()
g.format = 'png'
g.render('readme')
'readme.png'

故障排除

一般来说,特别是当处理嵌套调度器时,重要的是要注意以下限制。

不要在多个调度器中插入任务

一个特定的任务应该只插入 一个 调度器。请注意,代码并不检查这一点,强制执行此规则是程序员的职责。

未插入任何调度器的任务当然永远不会运行。

在多个调度器中插入的任务可能会表现得非常奇怪,因为每个调度器都有可能移动它。

您只能在同一调度器中的任务之间创建需求

对于嵌套调度器,可能会诱人创建不属于同一调度器但属于兄弟或堂兄弟调度器的任务之间的依赖关系。

目前不支持这种情况,任务只能有对 同一调度器中的其他任务 的需求。

如前所述,目前代码中没有提供强制执行此规则的机制,违反此规则会导致出现意外行为。

创建所需数量的任务实例

另一个常见的错误是尝试在调度器中的多个位置重用任务实例。每个实例都携带任务进度的状态,因此需要创建与任务数量相同的实例/副本,并且不要尝试共享任务对象。

特别是,如果您将已完成的任务实例取出来,并尝试将其插入到新的调度器中,它将被视为已完成,并且不会再次运行。

不能两次运行相同的调度器

类似地,一旦调度器完成(假设一切顺利)- 实质上所有任务都被标记为完成,再次尝试运行它要么什么也不做,要么引发异常。

自定义任务

自定义 Job

Job 实际上是 AbstractJob 的特殊化,规范是 co_run() 方法本身应该是一个协程,因为这是当 Scheduler 运行该任务时被触发的。

AbstractJob.co_shutdown()

调度器上的 shutdown() 方法向所有(可能嵌套的)任务发送 co_shutdown() 方法。在 Job 类中的默认行为是不做任何事情,但可以通过 AbstractJob 的子类在相关的情况下重新定义。通常,SshJob 的实现将允许多个 SshJob 实例共享给定的 SSH 连接,因此 co_shutdown() 可以用于关闭底层的 SSH 连接。

apssh 库和 SshJob

您可以通过特殊化 job.AbstractJob 轻松定义自己的 Job 类。作为一个例子,这是在开发 asynciojobs 时的主要目标,您可以在 apssh 库中找到一个 SshJob 类,它允许您轻松地编排涉及多个主机(您使用 ssh 与之交互)的场景。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

asynciojobs-0.18.1.tar.gz (51.9 kB 查看哈希值)

上传时间 源代码

构建分发

asynciojobs-0.18.1-py3-none-any.whl (44.6 kB 查看哈希值)

上传时间 Python 3

支持者