Argo Workflows DSL
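argo-python-dsl
Python DSL for Argo Workflows
If you are new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive, and the Argo examples provide an exhaustive explanation.
For more experienced users, this DSL lets you define Argo Workflows programmatically in Python; the definition is then translated into the Argo YAML specification.
The DSL uses the Argo models defined in the Argo Python client repository. Combining the two gives full low-level control over Argo Workflows.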
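Getting started
Hello World
This example demonstrates the most basic functionality. Define a Workflow by subclassing the Workflow class and adding a single template with the @template decorator.
The entrypoint of the workflow is defined as the entrypoint class property.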
Argo YAML | Argo Python |
---|---|
# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]
|
from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
from argo.workflows.dsl.templates import V1Container
class HelloWorld(Workflow):
    entrypoint = "whalesay"
    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )
        return container
|
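To make the mapping between the two columns above more concrete, here is a minimal, standard-library-only sketch of how a class with decorated methods could be collected into a workflow specification. It is an illustration under stated assumptions, not the implementation used by argo-python-dsl: MiniWorkflow, to_spec and the toy template decorator are hypothetical names, and plain dicts stand in for the Argo models.

```python
import re


def template(func):
    """Toy stand-in for a template decorator: it only tags the method."""
    func.is_template = True
    return func


class MiniWorkflow:
    """Toy base class; NOT the real argo.workflows.dsl.Workflow."""

    entrypoint = "main"

    def to_spec(self):
        cls = type(self)
        # CamelCase class name -> hyphenated workflow name (HelloWorld -> hello-world).
        name = re.sub(r"(?<!^)(?=[A-Z])", "-", cls.__name__).lower()
        # Call every tagged method to obtain the body of its template.
        templates = [
            {"name": attr, "container": member(self)}
            for attr, member in vars(cls).items()
            if getattr(member, "is_template", False)
        ]
        return {
            "apiVersion": "argoproj.io/v1alpha1",
            "kind": "Workflow",
            "metadata": {"name": name, "generateName": name + "-"},
            "spec": {"entrypoint": self.entrypoint, "templates": templates},
        }


class HelloWorld(MiniWorkflow):
    entrypoint = "whalesay"

    @template
    def whalesay(self):
        # A plain dict stands in for the V1Container model.
        return {
            "name": "whalesay",
            "image": "docker/whalesay:latest",
            "command": ["cowsay"],
            "args": ["hello world"],
        }


spec = HelloWorld().to_spec()
print(spec["metadata"]["name"], [t["name"] for t in spec["spec"]["templates"]])
```

The resulting dict has the same shape as the YAML column, which is the point of the sketch: the class attributes and decorated methods carry all the information needed to emit the specification.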
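DAG: Tasks
This example demonstrates tasks whose dependencies form a diamond structure. Tasks are defined using the @task decorator, and they must return a valid template.
The entrypoint is created automatically as main for the top-level tasks of the Workflow.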
Argo YAML | Argo Python |
---|---|
# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]
  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
|
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *
class DagDiamond(Workflow):
    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)
    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )
        return container
|
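The dependencies declared in the diamond example determine which tasks may run together. The sketch below reproduces that ordering with the standard library's graphlib module (Python 3.9 or newer). It only illustrates the dependency semantics; scheduling in a real cluster is done by the Argo controller, and execution_waves is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

# Task name -> names of the tasks it depends on (the diamond from the example).
dependencies = {"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]}


def execution_waves(deps):
    """Group tasks into waves; every task in a wave has all dependencies done."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['A'], ['B', 'C'], ['D']]
```

B and C land in the same wave because both depend only on A, which is what lets them run in parallel before D.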
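Artifacts
Artifacts can be passed, similarly to parameters, in three forms: arguments, inputs and outputs, where arguments is the default one (plain @artifact or @parameter).
I.e.: inputs.artifact(...)
Both artifacts and parameters are passed one by one, which means that for multiple artifacts (parameters), one decorator should be applied per item:
@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_a: V1alpha1Parameter, ...): pass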
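The one-decorator-per-item pattern can be sketched with plain Python. In the toy version below, each decorator application records exactly one input on the function. input_parameter and the input_names attribute are hypothetical names used only for illustration; they are not part of argo-python-dsl.

```python
def input_parameter(name):
    """Hypothetical decorator: records one input parameter on the function."""

    def decorator(func):
        # Decorators are applied bottom-up, so prepend to keep the written order.
        func.input_names = [name] + getattr(func, "input_names", [])
        return func

    return decorator


@input_parameter(name="parameter_a")
@input_parameter(name="parameter_b")
def foo(parameter_a, parameter_b):
    return parameter_a, parameter_b


print(foo.input_names)  # ['parameter_a', 'parameter_b']
```

Because each decorator handles a single item, the list of inputs is simply the stack of decorators read from top to bottom.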
Complete example
Argo YAML | Argo Python |
---|---|
# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt
  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      # sh -c takes the whole command line as a single string
      args: ["cat /tmp/message"]
|
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *
class ArtifactPassing(Workflow):
    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()
    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)
    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )
        return container
    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            # sh -c takes the whole command line as a single string
            args=["cat /tmp/message"],
        )
        return container
|
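In the example above, the Python method generate_artifact appears in the YAML as the task generate-artifact, and the artifact is referenced through a templated string. The helper below sketches how such a reference could be assembled. The underscore-to-hyphen rule is inferred from the example only, and task_name and artifact_ref are hypothetical helpers, not functions provided by the DSL.

```python
def task_name(method_name):
    """Map a Python method name to the hyphenated task name seen in the YAML."""
    return method_name.replace("_", "-")


def artifact_ref(method_name, artifact):
    """Build the reference to an output artifact of another task."""
    # %-formatting keeps the double curly braces literal.
    return "{{tasks.%s.outputs.artifacts.%s}}" % (task_name(method_name), artifact)


reference = artifact_ref("generate_artifact", "hello-art")
print(reference)
```

Building the string in one place avoids typos in the task or artifact name, which would otherwise only surface when the workflow is submitted.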
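Going further: closures and scopes
This is where it gets interesting. So far, we have only scratched the surface of the benefits that the Python implementation provides.
What if we want to use native Python code and execute it as a step in the Workflow? What are our options?
Option A) is to reuse the existing mindset: put the code in a string, pass it as the source to the V1alpha1ScriptTemplate model, and wrap it with the template decorator. This is illustrated in the following code block: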
import textwrap

class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
        import random
        i = random.randint(1, 100)
        print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template
This would result in:
api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main
  ...
  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'
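Not bad, but it does not live up to the full potential either. Since we are already writing Python, why wrap the code in a string? This is where closures come in.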
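The string handling in option A can be checked in isolation. The sketch below uses only textwrap from the standard library to show what the dedented source looks like before it is handed to a script template; the compile call is an extra sanity check added here, not something the DSL is known to do.

```python
import textwrap

# The leading backslash suppresses the first newline; dedent strips the
# common indentation so the script starts at column zero.
source = textwrap.dedent("""\
    import random
    i = random.randint(1, 100)
    print(i)
""")

print(repr(source))

# Optional sanity check: the dedented text must be valid Python.
compile(source, "<gen-random-int>", "exec")
```

If the lines inside the string are indented inconsistently, dedent leaves residual indentation and the script fails at runtime, which is one of the practical drawbacks of keeping code in strings.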
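Closures
The logic of closures is quite simple. Wrap the function you want to execute in a container with the @closure decorator. The closure then takes care of the rest and returns a template (just as the @template decorator does).
The only thing we need to take care of is to provide an image that has the necessary Python dependencies installed and is available in the cluster.
There are plans to eliminate this step in the future, but for now it is unavoidable.
Following the previous example: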
class ScriptsPython(Workflow):

    ...

    @closure(
        image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import random

        i = random.randint(1, 100)
        print(i)
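Closures implement V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc.
Also, make sure that you import every library you use inside the function; the surrounding context is not preserved. A closure behaves as a static method and is isolated from the module scope.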
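Why the imports have to live inside the closure can be demonstrated without a cluster. The sketch below runs two small scripts in a fresh namespace with exec, which roughly mimics a script starting in a separate container process. run_isolated is a hypothetical helper and this is not how the DSL executes closures; it only illustrates that module-level imports do not travel with the code.

```python
import random  # imported at module level, but NOT visible to the scripts below

script_without_import = "value = random.randint(1, 100)"
script_with_import = "import random\nvalue = random.randint(1, 100)"


def run_isolated(source):
    """Execute source in a fresh namespace and return that namespace."""
    namespace = {}
    exec(source, namespace)
    return namespace


try:
    run_isolated(script_without_import)
    outcome = "ok"
except NameError as error:
    outcome = f"NameError: {error}"

print(outcome)
print(run_isolated(script_with_import)["value"])
```

The first script fails because random is unknown in the fresh namespace, while the second succeeds because it brings its own import.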
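Scopes
Now, what if we have a function (or a whole script) that is quite big? Wrapping it in a single Python function is not very Pythonic and gets tedious. This is where scopes come in.
Say, for example, that we want to initialize logging before running our gen_random_int function.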
    ...

    @closure(
        scope="main",
        image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        main.init_logging()

        i = random.randint(1, 100)
        print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)
Notice the 3 changes that we have made:
    @closure(
        scope="main",  # <--- provide the closure a scope
        image="python:alpine3.6"
    )
    def gen_random_int(main):  # <--- use the scope name

    @scope(name="main")  # <--- add function to a scope
    def init_logging(level="DEBUG"):
Each function in the given scope is namespaced by the scope name and injected into the closure.
I.e., the resulting YAML looks like this:
...
spec:
  ...
  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: |-
        import logging
        import random

        class main:
          """Scoped objects injected from scope 'main'."""

          @staticmethod
          def init_logging(level="DEBUG"):
            logging_level = getattr(logging, level, "INFO")
            logging.getLogger("__main__").setLevel(logging_level)

        main.init_logging()

        i = random.randint(1, 100)
        print(i)
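The namespacing shown in the generated source can be imitated at runtime. The DSL emits source text, whereas the sketch below builds an equivalent class object directly, purely to show how functions end up as static methods under the scope name. build_scope is a hypothetical helper, and init_logging returns the level only so that the effect is easy to inspect.

```python
import logging


def init_logging(level="DEBUG"):
    logging_level = getattr(logging, level, "INFO")
    logging.getLogger("__main__").setLevel(logging_level)
    return logging_level


def build_scope(name, functions):
    """Bundle plain functions as static methods of a class named after the scope."""
    members = {func.__name__: staticmethod(func) for func in functions}
    return type(name, (), members)


main = build_scope("main", [init_logging])
level = main.init_logging("WARNING")
print(main.__name__, level)
```

Calling main.init_logging() here works the same way as in the generated script: the scope name acts as a namespace, so several scoped helpers can coexist without clashing with names in the closure body.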
Submitting with the DSL
Assume we are running kubectl -n argo port-forward deployment/argo-server 2746:2746
Workflow
from argo.workflows.client import (
    ApiClient,
    Configuration,
    WorkflowServiceApi,
    V1alpha1WorkflowCreateRequest)

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )
        return container


if __name__ == "__main__":
    wf = DagDiamond()
    config = Configuration(host="https://:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, 'argo')
Cron Workflow
from argo.workflows.client import Configuration, ApiClient
from argo.workflows.dsl import template
from argo.workflows.dsl import CronWorkflow
from argo.workflows.dsl.templates import V1Container


class HelloCron(CronWorkflow):

    entrypoint = "whalesay"
    schedule = "0 0 1 1 *"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"],
        )
        return container


if __name__ == "__main__":
    wf = HelloCron()
    print(wf)
    config = Configuration(host="https://:2746")
    client = ApiClient(configuration=config)
    wf.submit(client, "argo")
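The schedule attribute in the cron example is a standard five-field cron expression. As a reading aid, the sketch below splits such an expression into named fields. describe_schedule is a hypothetical helper and does no validation beyond counting fields.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_schedule(schedule):
    """Split a five-field cron expression into a field-name -> value mapping."""
    parts = schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_schedule("0 0 1 1 *")
print(fields)
```

Read field by field, "0 0 1 1 *" means minute 0 of hour 0 on day 1 of month 1, on any day of the week, i.e. once a year at midnight on January 1st.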
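Compilation also moves all imports to the top and removes duplicates, which is more convenient and gives the generated YAML a more natural look, so it is easier on the eyes.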
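Moving imports to the top and removing duplicates can be sketched as a simple line-based pass. This is a naive illustration under stated assumptions, not the compiler used by the DSL: hoist_imports is a hypothetical helper, it sorts the imports alphabetically, and it would misfire on multi-line imports or on string literals whose lines start with import or from.

```python
def hoist_imports(source):
    """Move import lines to the top of the script and drop duplicates."""
    imports, body = [], []
    for line in source.splitlines():
        stripped = line.strip()
        if stripped.startswith(("import ", "from ")):
            if stripped not in imports:
                imports.append(stripped)
        else:
            body.append(line)
    return "\n".join(sorted(imports) + body)


script = '''\
def init_logging(level="DEBUG"):
    import logging
    logging.getLogger("__main__").setLevel(level)

import random
import logging
init_logging()
print(random.randint(1, 100))
'''

hoisted = hoist_imports(script)
print(hoisted)
```

After the pass, the import of logging appears once at the top even though the input contained it twice, once inside the function and once at module level.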
For more examples, see the examples folder.
Authors
- [ Maintainer ] Yudi Xue binarycrayon@gmail.com
- [ Past Maintainer ] Marek Cermak macermak@redhat.com, prace.mcermak@gmail.com
Hashes for argo-workflows-dsl-0.4.0.tar.gz
Algorithm | Hash digest |
---|---|
SHA256 | 134b1ba784874a4bd909181413ee5d598bcb89251114c8cea0125e3c597916f4 |
MD5 | 6206e32bbdda3cea1001b353076f0a99 |
BLAKE2b-256 | 7c4fb56bec1ed7952aa882cfe7bff28a2921a13de6585a12771197e894058502 |
Hashes for argo_workflows_dsl-0.4.0-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 16b36df85020c85c542d04138b84825af07bfe7973ff1cdb0491d33a62831197 |
MD5 | b4b5567bd830ca9e6a5d857018ee9bc5 |
BLAKE2b-256 | c1c266a9fb28945ac7fc0a6e672dd27036dac1f27d14888654034f20801ed6bc |