Kedro-Airflow使将Kedro项目部署到Airflow变得容易
项目描述
Kedro-Airflow
Apache Airflow 是一个用于编排复杂工作流和数据处理管道的工具。Kedro-Airflow插件可用于
- 在原型设计阶段快速创建管道。您可以在Kedro中编写Python函数,而无需担心调度器、守护进程、服务或需要重新创建Airflow DAG文件。
- 在Kedro中进行自动依赖关系解析。这允许您绕过Airflow中指定任务顺序的需求。
- 在多个工作者之间分发Kedro任务。您还可以启用对任务运行时间的监控和调度。
安装
kedro-airflow
是一个Python插件。要安装它
pip install kedro-airflow
使用方法
您可以使用 kedro-airflow
将Kedro管道作为Airflow DAG部署,步骤如下
步骤1:生成DAG文件
在Kedro项目的根目录中运行
kedro airflow create
此命令将在您项目的 airflow_dags/
目录中生成一个 Airflow DAG 文件。您可以通过传递 --pipeline
标志来为特定的 Kedro 管道生成 DAG 文件,并通过 --env
标志来为特定的 Kedro 环境生成 DAG 文件。传递 --all
将将所有注册的 Kedro 管道转换为 Airflow DAG。
步骤 2:将 DAG 文件复制到 Airflow DAG 文件夹。
有关 DAG 文件夹的更多信息,请访问 Airflow 文档。可以通过编辑此文件来自定义 Airflow DAG 配置。
步骤 3:在 Airflow 执行器环境中打包和安装 Kedro 管道
生成和部署 DAG 文件后,您需要将 Kedro 管道打包并安装到 Airflow 执行器环境中。有关更多详细信息,请访问Apache Airflow 部署指南。
常见问题解答(FAQ)
如果我的 DAG 文件位于项目文件夹以外的目录中怎么办?
默认情况下,生成的 DAG 文件配置为与您的项目位于同一目录,如此模板所示。如果您的 DAG 文件位于项目目录之外,您需要在运行 kedro airflow create
命令后手动调整此设置。
如果我想要使用不同的 Jinja2 模板怎么办?
您可以使用额外的命令行参数 --jinja-file
(别名 -j
)来提供 Jinja2 模板的替代路径。请注意,这些文件必须接受与默认 Jinja2 模板相同的变量。
kedro airflow create --jinja-file=./custom/template.j2
如何动态地传递参数到 Airflow DAG 中?
kedro-airflow
从 Kedro 项目的 conf/base
或 conf/local
中的 airflow.yml
中获取配置。或者它可以在以 airflow
开头的文件夹中。这些参数被 Kedro 读取。参数可以全局指定,也可以按管道指定。
# Global parameters
default:
start_date: [2023, 1, 1]
max_active_runs: 3
# https://airflow.org.cn/docs/stable/scheduler.html#dag-runs
schedule_interval: "@once"
catchup: false
# Default settings applied to all tasks
owner: "airflow"
depends_on_past: false
email_on_failure: false
email_on_retry: false
retries: 1
retry_delay: 5
# Arguments specific to the pipeline (overrides the parameters above)
data_science:
owner: "airflow-ds"
参数也可以通过命令行中的 --params
传递
kedro airflow create --params "schedule_interval='@weekly'"
这些变量会被传递到从您的管道创建 Airflow DAG 的 Jinja2 模板。
如果我想使用不同于 airflow*
和 airflow**
的配置模式怎么办?
为了配置配置加载器,更新您 Kedro 项目中的 settings.py
文件。例如,如果您想使用 scheduler
名称,则按如下方式更改文件:
CONFIG_LOADER_ARGS = {"config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}}
请参考 Kedro 的官方文档,了解如何添加模板、自定义解析器等。
如果我想传递不同的参数怎么办?
要传递默认模板中未指定的参数,只需传递一个自定义模板(见:"如果我想使用不同的 Jinja2 模板?")。
参数的语法为
{{ argument_name }}
为了使参数可选,可以使用
{{ argument_name | default("default_value") }}
有关示例,请参阅默认模板(airflow_dag_template.j2
)。
如果我想使用除了 airflow.yml
之外的配置文件怎么办?
默认配置模式为 ["airflow*", "airflow/**"]
。要配置 OmegaConfigLoader
,请按如下方式更新您 Kedro 项目中的 settings.py
文件:
from kedro.config import OmegaConfigLoader
CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
# other args
"config_patterns": { # configure the pattern for configuration files
"airflow": ["airflow*", "airflow/**"]
}
}
请参考 Kedro 的官方文档,了解如何添加模板、自定义解析器等。(https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader)
如何使用 Airflow 运行时参数?
在用户界面触发 Airflow DAG 时,可以传递参数。为了使用此功能,请使用 Params 语法 创建自定义模板。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"。
如果我想使用不同的 Airflow Operator 呢?
要使用的 Airflow Operator 取决于项目运行的环境。您可以通过提供自定义模板来设置要使用的 Operator。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"。丰富的 Operator 提供意味着 kedro-airflow
插件为特定 Operator 提供模板。由 kedro-airflow
提供的默认模板使用 BaseOperator
。
我可以将节点分组在一起吗?
当使用 Airflow 运行 Kedro 节点时,MemoryDatasets 通常不会在 Operator 之间共享。这将导致 DAG 运行失败。
MemoryDatasets 可以用于在 Kedro 中的节点之间提供逻辑分离,而不需要写入磁盘(在分布式运行时需要多个执行器)的开销。
通过 MemoryDatasets 连接的节点可以通过 --group-in-memory
标志进行分组。这保留了在 Kedro 中进行逻辑分离的选项,同时计算开销很小。
可以通过更改模板来使用 任务组。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"。
我可以贡献吗?
是的!想帮助构建 Kedro-Airflow 吗?请查看我们关于 贡献 的指南。
您使用什么许可证?
Kedro-Airflow 在 Apache 2.0 许可证下授权。
Python 版本支持策略
- Kedro-Airflow 支持由 CPython 核心团队积极维护的所有 Python 版本。当一个 Python 版本达到生命周期的尽头 时,该版本的
kedro-airflow
将不再提供支持。这不是一个破坏性变更。
项目详细信息
下载文件
下载适用于您的平台文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分发
构建分发
kedro_airflow-0.9.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3e0d9832ba41e17f5255e8ac1fb0d12b05d38d96d7af3184c86d0bcd443dcc50 |
|
MD5 | f738cda1807fc37b0a20e2172979e9a4 |
|
BLAKE2b-256 | 7a8a71fc769c2939511f57fada5619c056bb241cf982151d72d80d6e58ef0884 |
kedro_airflow-0.9.1-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c6db75e134cb277cdc94d37b96b5d9b31da9b619562ac6b65c541c5cde6e081f |
|
MD5 | 300b1f0e486aa1c95a61080c1e27db93 |
|
BLAKE2b-256 | c25bae1bcc21a4e9d08f70dc0c077adc3ca6ede4f7fd593b0c861cd1a5f57dd2 |