跳转到主要内容

运行动态DAG工作流

项目描述

Adage - DAG执行器

CI Code Health PyPI Coverage Status Documentation Status

这是一个小型实验性软件包,用于了解如何在定义时间不知道所有内容的情况下描述工作流。任务应在多进程池中或在多个celery工作进程或IPython集群中使用。

示例

example image

问题

工作流可以舒适地表示为有向无环图(DAG)。但有时在处理开始之前,图的确切结构是未知的。相反,通常只有关于可能边的一些部分信息,并且根据节点中的某个结果,DAG可能会附加更多节点和边。

例如,一个节点(称为“节点A”)可能正在下载文件列表,这些文件可以并行处理。因此,DAG将针对每个文件处理(让我们称它们为“节点_file_1”到“节点_file_n”)为“节点A”创建一个节点。由于确切文件数量在运行时才知道,因此我们无法提前绘制DAG。此外,在“映射”步骤之后,可能还需要一个“归约”步骤来合并单个结果。这只能在知道“映射”节点数量后才能安排。

另一个例子是,可能有一组节点运行某种类型的任务(例如,生成PDF文件)。可以想象需要有一个“归约”类型任务来合并所有这些单个PDF文件。虽然任何给定节点都不知道其他PDF生成任务在哪里调度,但可以在没有更多指向PDF生成任务的边的情况下等待,然后向DAG添加PDF合并节点。

解决方案

总的来说,我们希望各个节点对其所属的DAG能够执行有限的操作。具体来说,我们只能允许查询DAG的结构以及追加操作,节点不能删除节点。我们实现这一点的办法是,我们有一个仅追加的已安排规则记录。规则是一对函数(谓词,主体),它们在DAG上操作。谓词是一个查询函数,它检查图以确定DAG是否有足够的信息来应用主体(例如,是否还可以追加某种类型的边?)。如果DAG确实有足够的信息,则主体(即对DAG的仅追加操作)被应用,即添加节点。定期遍历规则列表,以便在可能的情况下扩展DAG。

规则的规则

有一些规则规则需要遵守,以便

  • 谓词的责任是通知所有必要的节点都已存在于DAG中。例如

    • 等待直到无法将特定类型的节点添加到DAG中。这要求我们知道在全局级别上有效的哪种类型的边。
      • 等待直到DAG中存在一定数量的节点(比如说)
    • 通过它们的唯一ID选择一组节点(用于将子DAG附加到节点内部)
  • 唯一可以动态添加的有效边是指向现有节点的边。指向现有节点的边会引入以前不存在的依赖关系,因此该作业可能已经运行或正在运行

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪一个,请了解有关安装包的更多信息。

源分发

adage-0.11.0.tar.gz (18.1 kB 查看哈希值)

上传时间 源码

构建分发版

adage-0.11.0-py3-none-any.whl (18.8 kB 查看哈希值)

上传时间 Python 3

支持者