
DGP UI

This library and app provide a wrapper around Airflow: a configuration-based web UI for adding and removing DAGs (pipelines). The configuration defines the pipeline 'kinds' and the parameters each kind requires.

Pipeline Dashboard

Edit/New Pipeline

Pipeline Status

Quickstart

  1. Create a folder containing the following:
  • A configuration.json file with the details of your pipeline kinds, e.g.
{
    "kinds": [
        {
            "name": "kind1",
            "display": "Kind 1",
            "fields": [
                {
                    "name": "param1",
                    "display": "Parameter 1"
                },
                {
                    "name": "param2",
                    "display": "Parameter 2"
                }
            ]
        },
        {
            "name": "kind2",
            "display": "Kind 2",
            "fields": [
                {
                    "name": "param3",
                    "display": "Parameter 3"
                },
                {
                    "name": "param4",
                    "display": "Parameter 4"
                }
            ]
        }
    ],
    "schedules": [
        {
            "name": "monthly",
            "display": "Monthly"
        },
        {
            "name": "daily",
            "display": "Daily"
        }
    ]

}

(If no schedules are specified, a default list of schedules is used.)
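As a sketch of how code might consume this configuration, the snippet below parses it with the standard `json` module and indexes the kinds by name. The `load_kinds` helper is illustrative only, not part of the DGP UI API; in practice the text would be read from the configuration file rather than embedded as a string.

```python
import json

# The configuration from step 1, inlined as a string for illustration;
# in a real setup this would be read from the configuration file.
CONFIG = """
{
    "kinds": [
        {"name": "kind1", "display": "Kind 1",
         "fields": [{"name": "param1", "display": "Parameter 1"},
                    {"name": "param2", "display": "Parameter 2"}]}
    ],
    "schedules": [{"name": "daily", "display": "Daily"}]
}
"""

def load_kinds(config_text):
    # Illustrative helper (not a library function): map kind name -> kind dict.
    config = json.loads(config_text)
    return {kind["name"]: kind for kind in config.get("kinds", [])}

kinds = load_kinds(CONFIG)
# Parameter names a pipeline of this kind must supply:
param_names = [field["name"] for field in kinds["kind1"]["fields"]]
print(param_names)  # ['param1', 'param2']
```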

  • Airflow DAGs Creator - a Python file which reads the pipeline configuration and creates your Airflow DAGs. Sample code:
import datetime
import logging
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import dates
from etl_server.models import Models

etl_models = Models()

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': dates.days_ago(1),
}

for pipeline in etl_models.all_pipelines():
    # pipeline looks like this:
    # {
    #   "id": "<identifier>",
    #   "name": "<English Name of Pipeline>",
    #   "kind": "<kind-name>",
    #   "schedule": "<schedule>",
    #   "params": {
    #      "field1": "value1",
    #      .. other fields, based on kind's fields in configuration
    #   }
    # }
    dag_id = pipeline['id']
    logging.info('Initializing DAG %s', dag_id)
    dag = DAG(dag_id, default_args=default_args, schedule_interval=datetime.timedelta(days=1))
    task = BashOperator(task_id=dag_id,
                        bash_command='echo "%s"; sleep 10 ; echo done' % pipeline['name'],
                        dag=dag)
    globals()[dag_id] = dag
  2. Run the server using a docker-compose setup; sample docker-compose.yaml file:
version: "3"

services:

  db:
    image: postgres:12
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_USER: postgres
      POSTGRES_DB: etls
    expose:
      - 5432
    volumes: 
      - /var/lib/postgresql/data

  server:
    build: .
    image: akariv/airflow-config-ui
    environment:
      DATABASE_URL: postgresql://postgres:postgres@db/etls
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://postgres:postgres@db/etls
    expose:
      - 5000
    ports:
      - 5000:5000
    depends_on: 
      - db
    volumes: 
      - /path/to/local/dags/folder/:/app/dags

Once running (docker-compose up -d server), point your browser at http://localhost:5000 to see the web UI.

Another option is to create a new Docker image which inherits from akariv/airflow-config-ui, replacing the contents of /app/dags/ with your configuration.json file and your DAG-creating Python file.
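A minimal Dockerfile for that approach might look like the sketch below. The file name my_dags.py is an assumption for illustration; use whatever your DAG-creating Python file is called.

```dockerfile
FROM akariv/airflow-config-ui

# Replace the bundled examples with your own configuration and DAG creator.
COPY configuration.json /app/dags/
COPY my_dags.py /app/dags/
```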
