跳转到主要内容

从YAML文件动态构建Airflow DAGs

项目描述

dag-factory

Github Actions Coverage PyPi Code Style Downloads

dag-factory 是一个用于从YAML配置文件动态生成 Apache Airflow DAGs 的库。

安装

要安装 dag-factory,请运行 pip install dag-factory。它需要Python 3.6.0+ 和 Apache Airflow 2.0+。

用法

在您的Airflow环境中安装 dag-factory 后,创建DAGs有两个步骤。首先,我们需要创建一个YAML配置文件。例如

example_dag1:
  default_args:
    owner: 'example_owner'
    start_date: 2018-01-01  # or '2 days'
    end_date: 2018-01-05
    retries: 1
    retry_delay_sec: 300
  schedule_interval: '0 3 * * *'
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 60
  default_view: 'tree'  # or 'graph', 'duration', 'gantt', 'landing_times'
  orientation: 'LR'  # or 'TB', 'RL', 'BT'
  description: 'this is an example dag!'
  on_success_callback_name: print_hello
  on_success_callback_file: /usr/local/airflow/dags/print_hello.py
  on_failure_callback_name: print_hello
  on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 2'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 3'
      dependencies: [task_1]

然后,在您的Airflow环境中的DAGs文件夹中,您需要创建一个这样的Python文件

from airflow import DAG
import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

这个DAG将在Airflow中生成并准备好运行!

如果您有多个配置文件,可以像这样导入它们

# 'airflow' word is required for the dagbag to parse this file
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])

screenshot

注意

HttpSensor(自0.10.0版本起)

airflow.sensors.http_sensor与所有支持的Airflow版本兼容。在Airflow 2.0+中,可以在操作符值中使用新的包名:airflow.providers.http.sensors.http

以下示例展示了在Python文件中response_check逻辑

task_2:
      operator: airflow.sensors.http_sensor.HttpSensor
      http_conn_id: 'test-http'
      method: 'GET'
      response_check_name: check_sensor
      response_check_file: /path/to/example1/http_conn.py
      dependencies: [task_1]

response_check逻辑也可以作为lambda提供

task_2:
      operator: airflow.sensors.http_sensor.HttpSensor
      http_conn_id: 'test-http'
      method: 'GET'
      response_check_lambda: 'lambda response: "ok" in reponse.text'
      dependencies: [task_1]

优势

  • 无需了解Python即可构建DAG
  • 无需学习Airflow原语即可构建DAG
  • 避免重复代码
  • 每个人都喜欢YAML! ;)

贡献

欢迎贡献力量!只需提交Pull Request或Github Issue。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源分布

dag-factory-0.19.0.tar.gz (16.2 kB 查看散列)

上传

构建分布

dag_factory-0.19.0-py2.py3-none-any.whl (17.0 kB 查看散列)

上传 Python 2 Python 3

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页