
DSL for Argo Workflows

Project description

argo-python-dsl

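Python DSL for Argo Workflows

If you are new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive, and the Argo examples provide an exhaustive explanation.

For more experienced users, this DSL lets you define Argo Workflows programmatically in Python; the definition is then translated to the Argo YAML specification.

The DSL makes use of the Argo models defined in the Argo Python client repository. Combining the two approaches gives full low-level control over Argo Workflows.

Getting started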

Hello World

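This example demonstrates the simplest functionality. A Workflow is defined by subclassing the Workflow class and adding a single template with the @template decorator.

The entrypoint to the workflow is defined as the entrypoint class attribute.

The Argo YAML is shown first, followed by the equivalent Argo Python: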

# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]

from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template

from argo.workflows.dsl.templates import V1Container


class HelloWorld(Workflow):

    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container
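
To make the mechanism above more concrete, here is a minimal stdlib-only sketch of how a decorator can tag methods so that a base class collects them into a spec. It is not the DSL's implementation: ToyWorkflow, its compile method, and this toy template decorator are hypothetical names invented for illustration, and plain dicts stand in for the Argo models.

```python
from typing import Callable, Dict, List


def template(func: Callable) -> Callable:
    """Toy stand-in for a template decorator: it only tags the method."""
    func.is_template = True
    return func


class ToyWorkflow:
    """Hypothetical base class for illustration, not the DSL's Workflow."""

    entrypoint = "main"

    def compile(self) -> Dict:
        templates: List[Dict] = []
        for name, attr in vars(type(self)).items():
            if getattr(attr, "is_template", False):
                # Call the tagged method and file its result under the method name.
                templates.append({"name": name, "container": attr(self)})
        return {"spec": {"entrypoint": self.entrypoint, "templates": templates}}


class HelloWorld(ToyWorkflow):
    entrypoint = "whalesay"

    @template
    def whalesay(self) -> Dict:
        return {
            "image": "docker/whalesay:latest",
            "command": ["cowsay"],
            "args": ["hello world"],
        }


spec = HelloWorld().compile()["spec"]
print(spec["entrypoint"], [t["name"] for t in spec["templates"]])
```

The point of the sketch is only the shape of the idea: the class attribute names the entrypoint, and each decorated method contributes one entry to the templates list.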

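DAG: Tasks

This example demonstrates tasks arranged in a diamond structure through their dependencies. Tasks are defined with the @task decorator, and they must return a valid template.

The entrypoint is created automatically as main for the top-level tasks of the Workflow.

The Argo YAML is shown first, followed by the equivalent Argo Python: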

# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
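
As a rough illustration of what the dependencies above imply for execution order, the following sketch feeds the same diamond into the standard library's graphlib.TopologicalSorter (Python 3.9+). It only models the ordering; it is not how Argo schedules tasks, and the dependencies dict is written by hand for this example.

```python
from graphlib import TopologicalSorter

# Task name -> names of the tasks it depends on, copied from the diamond example.
dependencies = {
    "A": set(),
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['A'], ['B', 'C'], ['D']]
```

B and C land in the same wave because neither depends on the other, which is the parallelism the diamond is meant to show.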

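Artifacts

Artifacts can be passed in three forms, similarly to parameters: arguments, inputs, and outputs, where arguments is the default (simply @artifact or @parameter).

For example: inputs.artifact(...)

Both artifacts and parameters are passed one by one, which means that for multiple artifacts (parameters), one should call: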

@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_b: V1alpha1Parameter, ...): pass
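
The "one decorator per parameter" pattern can be sketched with plain Python. The input_parameter decorator and the input_names attribute below are hypothetical and only show how stacked decorators can each register a single entry; they are not the DSL's decorators.

```python
from typing import Callable, List


def input_parameter(name: str) -> Callable:
    """Hypothetical decorator that registers exactly one input parameter."""

    def decorator(func: Callable) -> Callable:
        registered: List[str] = getattr(func, "input_names", [])
        # Decorators apply bottom-up, so prepend to keep the written order.
        func.input_names = [name] + registered
        return func

    return decorator


@input_parameter(name="parameter_a")
@input_parameter(name="parameter_b")
def foo(parameter_a: str, parameter_b: str) -> str:
    return f"{parameter_a} {parameter_b}"


print(foo.input_names)  # ['parameter_a', 'parameter_b']
```

Because Python applies the decorator closest to the function first, the sketch prepends each name so that the final list reads in the same order as the decorators are written.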

Complete example

The Argo YAML is shown first, followed by the equivalent Argo Python:

# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"

  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *

class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat /tmp/message"],
        )

        return container
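
The _from expression in the example is easy to mistype, so a small helper can build it. This is only a sketch: task_name and output_artifact_ref are hypothetical helpers, and the underscore-to-hyphen mapping is inferred from the example above, where the method generate_artifact appears as the task generate-artifact.

```python
def task_name(method_name: str) -> str:
    """Map a Python method name to the hyphenated task name seen in the YAML."""
    return method_name.replace("_", "-")


def output_artifact_ref(method_name: str, artifact: str) -> str:
    """Build the template expression that points at a task's output artifact."""
    return "{{tasks.%s.outputs.artifacts.%s}}" % (task_name(method_name), artifact)


ref = output_artifact_ref("generate_artifact", "hello-art")
print(ref)  # {{tasks.generate-artifact.outputs.artifacts.hello-art}}
```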


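Going further: closure and scope

This is where it gets quite interesting. So far, we have only scratched the surface of the benefits that the Python implementation provides.

What if we want to write native Python code and execute it as a step in the Workflow? What are our options?

Option A) is to reuse the existing mindset: put the code in a string, pass it as the source to the V1alpha1ScriptTemplate model, and wrap it with the template decorator. The following code block illustrates this: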

import textwrap

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.templates import *

class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
          import random
          i = random.randint(1, 100)
          print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template

This results in:

api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main

  ...

  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'
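
The role of textwrap.dedent in Option A can be checked in isolation. The sketch below uses only the standard library and shows that the dedented string is a valid script on its own; running it with exec here is just a local stand-in for what the python command would do inside the container.

```python
import textwrap

source = textwrap.dedent("""\
    import random
    i = random.randint(1, 100)
    print(i)
""")

# The common leading whitespace is gone, so the interpreter accepts the script.
print(repr(source))

namespace = {}
exec(source, namespace)  # local stand-in for `python` running the script
```

Without the dedent call the script would start with indented lines and fail with an IndentationError.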

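Not bad, but it does not live up to the full potential either. Since we are already writing Python, why would we wrap the code in a string? This is where we introduce closures.

Closures

The logic of closures is quite simple. Just wrap the function you want to execute in a container with the @closure decorator. The closure then takes care of the rest and returns a template (just as the @template decorator does).

The only thing we need to take care of is to provide an image that has the necessary Python dependencies installed and is present in the cluster.

There is a plan to eliminate this step in the future, but for now it is unavoidable.

Following the previous example: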

class ScriptsPython(Workflow):

    ...

    @closure(
        image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import random

        i = random.randint(1, 100)
        print(i)

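Closures implement the V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc.

Also, make sure that you import whatever library you are using; the context is not preserved. A closure behaves as a staticmethod and is sandboxed from the module scope.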
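
The isolation described above can be imitated locally. In this sketch the closure bodies are plain source strings and run_isolated is a hypothetical helper that executes them in a fresh namespace; it mimics a separate interpreter and says nothing about how the DSL extracts the function body.

```python
# Two candidate closure bodies, written as source strings for this sketch.
body_without_import = "i = random.randint(1, 100)\n"
body_with_import = "import random\ni = random.randint(1, 100)\n"


def run_isolated(source: str) -> dict:
    """Execute source in a fresh namespace, as a separate interpreter would."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace


import random  # a module-level import does not reach the isolated namespace

try:
    run_isolated(body_without_import)
    outcome = "ran"
except NameError as error:
    outcome = f"NameError: {error}"

print(outcome)  # NameError: name 'random' is not defined
print(1 <= run_isolated(body_with_import)["i"] <= 100)  # True
```

The first body fails even though the surrounding module imported random, which is why every import has to live inside the closure itself.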

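Scopes

Now, what if we had a function (or a whole script) that is quite big? Wrapping it in a single Python function is not very Pythonic and it gets tedious. This is where we can make use of scopes.

Say that we wanted, for example, to initialize logging before running our gen_random_int function.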

    ...

    @closure(
        scope="main",
        image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        main.init_logging()

        i = random.randint(1, 100)
        print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)

Notice the 3 changes that we made:

    @closure(
        scope="main",  # <--- provide the closure a scope
        image="python:alpine3.6"
    )
    def gen_random_int(main):  # <--- use the scope name
        ...

    @scope(name="main")  # <--- add function to a scope
    def init_logging(level="DEBUG"):
        ...

Each function in the given scope is then namespaced by the scope name and injected into the closure.

That is, the resulting YAML looks like this:

...
spec:
  ...
  templates:
    - name: gen-random-int
      script:
        command:
        - python
        image: python:alpine3.6
        name: gen-random-int
        source: |-
          import logging
          import random

          class main:
            """Scoped objects injected from scope 'main'."""

            @staticmethod
            def init_logging(level="DEBUG"):
              logging_level = getattr(logging, level, "INFO")
              logging.getLogger("__main__").setLevel(logging_level)


          main.init_logging()

          i = random.randint(1, 100)
          print(i)
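
The generated source above can be approximated with a few lines of string handling. The sketch below is an assumption about the general approach, not the DSL's compiler: compile_scope is a hypothetical function, it works on source strings rather than real functions, and it only recognises plain `import x` lines (not `from x import y`).

```python
import textwrap
from typing import List


def compile_scope(scope: str, functions: List[str], body: str) -> str:
    """Assemble a script: hoisted imports, a namespace class, then the body."""
    imports, blocks = set(), []
    for text in functions + [body]:
        kept = []
        for line in textwrap.dedent(text).splitlines():
            if line.strip().startswith("import "):
                imports.add(line.strip())  # hoist and deduplicate
            else:
                kept.append(line)
        blocks.append("\n".join(kept).strip("\n"))
    *scoped, main_body = blocks
    # Each scoped function becomes a static method of a class named after the scope.
    methods = "\n\n".join(
        textwrap.indent("@staticmethod\n" + block, "    ") for block in scoped
    )
    header = "\n".join(sorted(imports))
    return f"{header}\n\nclass {scope}:\n{methods}\n\n\n{main_body}\n"


init_logging = '''
def init_logging(level="DEBUG"):
    import logging

    logging_level = getattr(logging, level, "INFO")
    logging.getLogger("__main__").setLevel(logging_level)
'''

closure_body = '''
import random

main.init_logging()

i = random.randint(1, 100)
print(i)
'''

script = compile_scope("main", [init_logging], closure_body)
print(script)
exec(script, {})  # the assembled script runs on its own
```

Using a class with static methods as the namespace is what lets the closure body call main.init_logging() without any module or package existing inside the container.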

Submitting with the DSL

Assuming we are running kubectl -n argo port-forward deployment/argo-server 2746:2746

Workflow

from argo.workflows.client import ApiClient, Configuration

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container

if __name__ == "__main__":
    wf = DagDiamond()
    # Assumption: the port-forward above exposes argo-server on localhost.
    config = Configuration(host="https://localhost:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, 'argo')

CronWorkflow

from argo.workflows.client import Configuration, ApiClient
from argo.workflows.dsl import template
from argo.workflows.dsl import CronWorkflow
from argo.workflows.dsl.templates import V1Container


class HelloCron(CronWorkflow):

    entrypoint = "whalesay"
    schedule = "0 0 1 1 *"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"],
        )
        return container


if __name__ == "__main__":
    wf = HelloCron()
    print(wf)
    # Assumption: the port-forward above exposes argo-server on localhost.
    config = Configuration(host="https://localhost:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, "argo")
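
The schedule attribute in HelloCron is a standard five-field cron expression; "0 0 1 1 *" means 00:00 on 1 January. As a quick sanity check before submitting, the fields can be named with a small helper. describe_schedule and FIELD_NAMES are hypothetical and not part of the DSL, and the helper does not validate the field values themselves.

```python
from typing import Dict

FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_schedule(schedule: str) -> Dict[str, str]:
    """Split a five-field cron expression into named fields."""
    fields = schedule.split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected 5 cron fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))


described = describe_schedule("0 0 1 1 *")
print(described)
```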

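Compilation of closures and scopes also moves all imports to the top and removes duplicates, so the generated YAML looks more natural and is easier on the eyes.


For more examples, see the examples folder.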




Download files

Source distribution

argo-workflows-dsl-0.4.0.tar.gz (24.6 kB)

Built distribution

argo_workflows_dsl-0.4.0-py3-none-any.whl (29.4 kB)
