简化DAG构建
项目描述
gusty允许您更轻松地控制Airflow的DAGs、任务组和任务。gusty管理任务集合,表示为任意数量的YAML、Python、SQL、Jupyter Notebook或R Markdown文件。将任务文件目录传递给gusty的create_dag
函数后,它会立即渲染为DAG。
gusty还为您定义的每个任务文件管理依赖关系(一个DAG内的依赖关系)和外部依赖关系(其他DAG中任务的依赖关系)。您只需在任务文件内部提供一个dependencies
或external_dependencies
列表,gusty就会自动设置每个任务的依赖关系并为任何列出的外部依赖关系创建外部任务传感器。
gusty与Airflow 1.x和Airflow 2.x兼容,并提供更多功能,所有这些功能旨在使DAG的创建、管理和迭代更加流畅,从而使您可以直观地设计DAG并构建任务。
文档
gusty的官方文档托管在此: https://pipeline-tools.github.io/gusty-docs/
gusty中有什么?
五种创建任务的方法
gusty将DAG目录中的每个文件都转换为任务。默认情况下,gusty支持五种不同的文件类型,这些类型为任务创建提供了方便的方法来指定操作符和操作符参数。
文件类型 | 如何工作 |
---|---|
.yml | 使用YAML声明operator 并传递任何操作符参数 |
.py | 只需编写Python代码,默认情况下,gusty将使用PythonOperator 执行您的文件。其他选项可用 |
.sql | 在YAML头部声明一个operator ,然后在主.sql文件中编写SQL。SQL会自动发送给operator |
.ipynb | 在notebook顶部放置一个YAML块,并指定一个可以渲染你的Jupyter Notebook的operator |
.Rmd | 使用notebook顶部的YAML块,并指定一个可以渲染你的R Markdown文档的operator |
以下是一个快速的YAML任务文件示例,可能被称为hello_world.yml
operator: airflow.operators.bash.BashOperator
bash_command: echo hello world
生成的任务将是一个具有任务id hello_world
的BashOperator
。
以下是用Python文件实现的相同方法,命名为hello_world.py
,gusty会将其自动转换为默认的PythonOperator
phrase = "hello world"
print(phrase)
最后,这里有一个略微不同的.sql
示例
---
operator: airflow.providers.sqlite.operators.sqlite.SqliteOperator
---
SELECT
column_1,
column_2
FROM your_table
易于依赖
声明性依赖
每个任务文件类型都支持dependencies
和external_dependencies
参数,gusty将使用这些参数来自动分配任务之间的依赖关系,并为给定任务中列出的任何外部依赖项创建外部任务传感器。
对于.yml、.ipynb和.Rmd任务文件类型,依赖关系和external_dependencies将使用YAML语法定义
operator: airflow.operators.bash.BashOperator
bash_command: echo hello world
dependencies:
- same_dag_task
external_dependencies:
- another_dag: another_task
- a_whole_dag: all
对于外部依赖项,当任务应该等待整个外部DAG成功运行时,可以使用关键字all
对于.py任务文件类型,我们可以在文件的顶部使用一些原始的markdown来定义这些依赖关系
# ---
# dependencies:
# - same_dag_task
# external_dependencies:
# - another_dag: another_task
# - a_whole_dag: all
# python_callable: say_hello
# ---
def say_hello():
phrase = "hello world"
print(phrase)
您还会注意到,我们将之前的Python代码包装在一个名为say_hello
的函数中,并将这个函数的名称传递给python_callable
参数。默认情况下,如果没有指定operator
和没有指定python_callable
,gusty将传递一个简单的函数来运行.py文件到PythonOperator
。如果您通过名称显式传递一个python_callable
,gusty将在.py文件中搜索该函数,并将其传递给PythonOperator
。
.py文件可以接受与任何其他任务文件类型相同的operator
参数,这意味着您可以使用任何其他相关的operator(例如,PythonVirtualenvOperator
)来按需执行Python代码。
动态依赖
gusty还可以通过任务对象的dependencies
属性检测和生成依赖关系。这意味着您也可以动态设置依赖关系。这个选项的一个流行例子是,如果您的operator运行SQL,您可以解析那个SQL中的表名,并将那些表名的列表附加到operator的dependencies
属性中。如果dependencies
属性中列出的表名也是DAG中的任务id,gusty将能够自动为您设置这些依赖关系!
DAG和TaskGroup控制
DAG和TaskGroup对象是通过目录和子目录创建的,分别是。您提供给gusty的create_dag
函数的目录路径将成为您的DAG(和DAG名称),在该DAG中的任何子目录默认将转换为TaskGroup。
gusty提供了一些配置DAG和TaskGroup对象的兼容方法,我们将在下面介绍。
元数据
任何目录或子目录中的特殊文件名是METADATA.yml
,gusty将使用它来确定如何配置该DAG或TaskGroup对象。
以下是一个您可能放置在DAG目录中的METADATA.yml
文件示例
description: "An example of a DAG created using METADATA.yml"
schedule_interval: "1 0 * * *"
default_args:
owner: airflow
depends_on_past: False
start_date: !days_ago 1
email: airflow@example.com
email_on_failure: False
email_on_retry: False
retries: 1
retry_delay: !timedelta 'minutes: 5'
以下是一个您可能放置在TaskGroup子目录中的METADATA
.yml文件示例
tooltip: "This is a task group tooltip"
prefix_group_id: True
dependencies:
- hello_world
如上例所示,gusty还会接受TaskGroup的METADATA.yml
中的dependencies
和external_dependencies
。这意味着gusty也可以连接您的TaskGroup依赖关系!
请注意,gusty 默认禁用了 TaskGroup 的 prefix_group_id
参数,因为这是 gusty 的少数观点之一,即除非你明确说明,否则任务应该有明确的名称。gusty 还为 Task Groups 提供了 suffix_group_id
参数!
create_dag
虽然 METADATA.yml
将始终是 DAG 或 TaskGroup 配置的主要信息来源,但 gusty 的 create_dag
函数也接受可以传递给 Airflow 的 DAG 类的任何参数,以及一个 task_group_defaults
字典,用于设置 gusty 创建的任何 TaskGroup 的默认行为。
以下是一个使用 create_dag
的示例,其中我们使用 create_dag
参数而不是元数据
from datetime import timedelta
from airflow.utils.dates import days_ago
from gusty import create_dag
dag = create_dag(
'/usr/local/airflow/dags/hello_world',
description="A DAG created without any metadata",
schedule_interval="1 0 * * *",
default_args={
"owner": "airflow",
"depends_on_past": False,
"start_date": days_ago(1),
"email": "airflow@example.com",
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
task_group_defaults={
"tooltip": "This is a task group tooltip",
"prefix_group_id": True
}
)
你可能注意到 task_group_defaults
不包括依赖关系。对于 Task Groups,必须使用 TaskGroup 特定的元数据设置依赖关系。
在 create_dag
和 DAG 或 TaskGroup 的 METADATA.yml
中的默认参数可以混合使用。METADATA.yml
将始终覆盖 create_dag
中设置的默认值。
create_dags
如果你有多个位于单个目录中的 gusty DAG,你可以方便地使用复数的 create_dags
函数。
create_dags
的工作方式与 create_dag
相同,但有两大例外
-
create_dags
的第一个参数是包含许多 gusty DAG 的目录的路径。 -
create_dags
的第二个参数是globals()
。globals()
实际上是你的 DAG 分配的命名空间。
让我们调整上面的 create_dag
示例以使用 create_dags
from datetime import timedelta
from airflow.utils.dates import days_ago
from gusty import create_dags
create_dags(
'/usr/local/airflow/my_gusty_dags',
globals(),
description="A default description for my DAGs.",
schedule_interval="1 0 * * *",
default_args={
"owner": "airflow",
"depends_on_past": False,
"start_date": days_ago(1),
"email": "airflow@example.com",
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
task_group_defaults={
"tooltip": "This is a task group tooltip",
"prefix_group_id": True
}
)
上面的代码将在 /usr/local/airflow/my_gusty_dags
目录中创建许多 gusty DAG。
DAG 级别功能
gusty 在 DAG 级别提供了额外的有用参数,以帮助您轻松地设计 DAG。
root_tasks
- 应该代表 DAG 根的任务 ID 列表。例如,HTTP 传感器可能必须成功,然后 DAG 中的下游任务才能运行。leaf_tasks
- 应该代表 DAG 叶的任务 ID 列表。例如,在 DAG 运行结束时,你可能需要将报告保存到 S3。external_dependencies
- 你还可以在 DAG 级别设置外部依赖关系!使 DAG 等待其他 DAG 的操作与上面的外部依赖关系示例中的操作一样。ignore_subfolders
- 如果你不想子目录生成 Task Groups,将其设置为True
。latest_only
- 默认启用,在 DAG 的绝对根处安装一个LatestOnlyOperator
,如果 DAG 运行不是当前运行,则跳过 DAG 中的所有任务。你可以在 Airflow 的文档中了解更多关于 LatestOnlyOperator 的信息:Airflow 文档。
这些参数可以放置在 create_dag
或 METADATA.yml
中!
本地操作符支持
虽然你可以将你的本地操作符存储在 Airflow 的 plugins
目录中,并相应地引用操作符的 plugins
路径,但 gusty 还允许你将本地操作符存储在位于你的 AIRFLOW_HOME
中的 operators
文件夹内。
为了使 gusty 能够按预期支持你的操作符,你的操作符名称必须是驼峰式,且包含该操作符的文件必须是蛇形命名。
例如,如果我们想使用 HelloOperator
,这个操作符需要存储在位于你的 AIRFLOW_HOME
中的 operators
文件夹内的 hello_operator.py
文件中。
来自你的操作符的 __init__
方法的任何字段都将从 gusty 传递到你的操作符。所以如果你的 HelloOperator
有一个 name
字段,你可以使用类似以下这样的 YAML 任务文件调用这个操作符
operator: local.HelloOperator
name: World
local.
语法是 gusty 用于知道在本地操作符文件夹中查找操作符的。
多任务生成
有时任务定义可能重复。为了解决这个问题,gusty 允许在前置文件中添加一个 multi_task_spec
块。这样,您可以使用单个任务定义文件生成多个类似任务!例如,假设您想创建两个 bash 任务,每个任务包含不同的 bash_command
。您可以在单个任务定义文件中这样定义这两个任务:
operator: airflow.operators.bash.BashOperator
multi_task_spec:
bash_task_1:
bash_command: echo first_task
bash_task_2:
bash_command: echo second_task
gusty 会将上述内容转换为两个任务实例,分别为 bash_task_1
和 bash_task_2
,每个实例都有一个独特的 bash_command
。
此外,对于 .py 文件中的特殊情况 python_callable
,您可以指定 python_callable_partials
。
# ---
# python_callable: main
# python_callable_partials:
# python_task_1:
# my_kwarg: a
# python_task_2:
# my_kwarg: b
# ---
def main(my_kwarg):
return my_kwarg
gusty 会将上述内容转换为两个任务实例,分别为 python_task_1
和 python_task_2
。其中,python_task_1
将返回 "a"
,而 python_task_2
将返回 "b"
。
multi_task_spec
和 python_callable_partials
是非互斥的,因此您可以根据需要混合配置。
一种方法,但不是唯一的方法
gusty 的一个优点是,如果您选择使用此包,gusty 不必是创建 DAG 的唯一方式。您可以使用 gusty 的 create_dag
从目录生成 DAG,然后在更传统的方法感觉更合适的情况下实现 Airflow DAG 的创建。
因此,您可以随意尝试 gusty 的方法,因为您不必在所有地方都坚持使用它。但尝试过后,您可能会惊讶地发现自己开始在每个地方都使用它!
容器化演示
作为一个额外的资源,您可以查看 gusty 和 Airflow 的容器化演示,在 gusty-demo 仓库 中,它展示了如何使用 gusty 和一些自定义操作员将 SQL 查询、Jupyter 笔记本和 RMarkdown 文档集成到同一个数据管道中。
开发
以下假设您已安装 Docker。
首次启动
首先,使用 git clone
克隆此仓库。
然后
export GUSTY_DEV_HOME="~/path/to/this/project"
cd $GUSTY_DEV_HOME
make build-image
make run-image
上述命令将在名为 gusty-testing
的名称下构建开发镜像,并运行一个名为 gusty-testing
的容器。
从这里,您可以
make exec
- 在运行容器中的终端中执行。make test
- 在临时容器中运行pytest
。make coverage
- 运行pytest
并生成覆盖率报告。make browse-coverage
- 在您的浏览器中打开覆盖率报告。make stop-container
- 停止正在运行的容器。make start-container
- 启动已停止的容器。
重新构建镜像
make stop-container # if you have a running container
make remove-container # if you have a stopped container
make build-image
make run-image
项目详情
下载文件
下载适合您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。