跳转到主要内容

Kedro-Airflow使将Kedro项目部署到Airflow变得容易

项目描述

Kedro-Airflow

License Python Version PyPI Version Code Style: Black

Apache Airflow 是一个用于编排复杂工作流和数据处理管道的工具。Kedro-Airflow插件可用于

  • 在原型设计阶段快速创建管道。您可以在Kedro中编写Python函数,而无需担心调度器、守护进程、服务或需要重新创建Airflow DAG文件。
  • 在Kedro中进行自动依赖关系解析。这允许您绕过Airflow中指定任务顺序的需求。
  • 在多个工作者之间分发Kedro任务。您还可以启用对任务运行时间的监控和调度。

安装

kedro-airflow 是一个Python插件。要安装它

pip install kedro-airflow

使用方法

您可以使用 kedro-airflow 将Kedro管道作为Airflow DAG部署,步骤如下

步骤1:生成DAG文件

在Kedro项目的根目录中运行

kedro airflow create

此命令将在您项目的 airflow_dags/ 目录中生成一个 Airflow DAG 文件。您可以通过传递 --pipeline 标志来为特定的 Kedro 管道生成 DAG 文件,并通过 --env 标志来为特定的 Kedro 环境生成 DAG 文件。传递 --all 将将所有注册的 Kedro 管道转换为 Airflow DAG。

步骤 2:将 DAG 文件复制到 Airflow DAG 文件夹。

有关 DAG 文件夹的更多信息,请访问 Airflow 文档。可以通过编辑此文件来自定义 Airflow DAG 配置。

步骤 3:在 Airflow 执行器环境中打包和安装 Kedro 管道

生成和部署 DAG 文件后,您需要将 Kedro 管道打包并安装到 Airflow 执行器环境中。有关更多详细信息,请访问Apache Airflow 部署指南

常见问题解答(FAQ)

如果我的 DAG 文件位于项目文件夹以外的目录中怎么办?

默认情况下,生成的 DAG 文件配置为与您的项目位于同一目录,如此模板所示。如果您的 DAG 文件位于项目目录之外,您需要在运行 kedro airflow create 命令后手动调整此设置。

如果我想要使用不同的 Jinja2 模板怎么办?

您可以使用额外的命令行参数 --jinja-file(别名 -j)来提供 Jinja2 模板的替代路径。请注意,这些文件必须接受与默认 Jinja2 模板相同的变量。

kedro airflow create --jinja-file=./custom/template.j2

如何动态地传递参数到 Airflow DAG 中?

kedro-airflow 从 Kedro 项目的 conf/baseconf/local 中的 airflow.yml 中获取配置。或者它可以在以 airflow 开头的文件夹中。这些参数被 Kedro 读取。参数可以全局指定,也可以按管道指定。

# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.org.cn/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"

参数也可以通过命令行中的 --params 传递

kedro airflow create --params "schedule_interval='@weekly'"

这些变量会被传递到从您的管道创建 Airflow DAG 的 Jinja2 模板。

如果我想使用不同于 airflow*airflow** 的配置模式怎么办?

为了配置配置加载器,更新您 Kedro 项目中的 settings.py 文件。例如,如果您想使用 scheduler 名称,则按如下方式更改文件:

CONFIG_LOADER_ARGS = {"config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}}

请参考 Kedro 的官方文档,了解如何添加模板、自定义解析器等。

如果我想传递不同的参数怎么办?

要传递默认模板中未指定的参数,只需传递一个自定义模板(见:"如果我想使用不同的 Jinja2 模板?")。

参数的语法为

{{ argument_name }}

为了使参数可选,可以使用

{{ argument_name | default("default_value") }}

有关示例,请参阅默认模板(airflow_dag_template.j2)。

如果我想使用除了 airflow.yml 之外的配置文件怎么办?

默认配置模式为 ["airflow*", "airflow/**"]。要配置 OmegaConfigLoader,请按如下方式更新您 Kedro 项目中的 settings.py 文件:

from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    # other args
    "config_patterns": {  # configure the pattern for configuration files
        "airflow": ["airflow*", "airflow/**"]
    }
}

请参考 Kedro 的官方文档,了解如何添加模板、自定义解析器等。(https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader

如何使用 Airflow 运行时参数?

在用户界面触发 Airflow DAG 时,可以传递参数。为了使用此功能,请使用 Params 语法 创建自定义模板。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"

如果我想使用不同的 Airflow Operator 呢?

要使用的 Airflow Operator 取决于项目运行的环境。您可以通过提供自定义模板来设置要使用的 Operator。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"。丰富的 Operator 提供意味着 kedro-airflow 插件为特定 Operator 提供模板。由 kedro-airflow 提供的默认模板使用 BaseOperator

我可以将节点分组在一起吗?

当使用 Airflow 运行 Kedro 节点时,MemoryDatasets 通常不会在 Operator 之间共享。这将导致 DAG 运行失败。

MemoryDatasets 可以用于在 Kedro 中的节点之间提供逻辑分离,而不需要写入磁盘(在分布式运行时需要多个执行器)的开销。

通过 MemoryDatasets 连接的节点可以通过 --group-in-memory 标志进行分组。这保留了在 Kedro 中进行逻辑分离的选项,同时计算开销很小。

可以通过更改模板来使用 任务组。有关使用自定义模板的说明,请参阅 "如果我想使用不同的 Jinja2 模板怎么办?"

我可以贡献吗?

是的!想帮助构建 Kedro-Airflow 吗?请查看我们关于 贡献 的指南。

您使用什么许可证?

Kedro-Airflow 在 Apache 2.0 许可证下授权。

Python 版本支持策略

项目详细信息


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

kedro_airflow-0.9.1.tar.gz (16.0 kB 查看哈希值)

上传

构建分发

kedro_airflow-0.9.1-py3-none-any.whl (10.5 kB 查看哈希值)

上传时间 Python 3

由以下提供支持