跳转到主要内容

Oozie到Airflow迁移工具

项目描述

Oozie到Airflow

Build Status codecov Code style: black License Updates Python 3

一个轻松在Apache Oozie工作流和Apache Airflow工作流之间转换的工具。

该程序针对Apache Airflow >= 1.10和Apache Oozie 1.0 XML模式。

如果您想为该项目做出贡献,请参阅CONTRIBUTING.md

目录

gh-md-toc创建

背景

Apache Airflow是由AirBnB于2014年开发的一个工作流管理系统。它是一个平台,可以编程创建、安排和监控工作流。Airflow工作流设计为Python中的有向无环图 (DAG) 任务。

Apache Oozie是一个用于管理Apache Hadoop作业的工作流调度系统。Oozie工作流也设计为XML中的有向无环图 (DAG)。

以下列出了一些差异

规格。 任务 依赖项 "子工作流程" 参数化 通知
Oozie XML 动作节点 控制节点 子工作流程 EL函数/属性文件 基于URL的回调
Airflow Python 操作符 触发规则,set_downstream() 子Dag jinja2和宏 回调/电子邮件

运行程序

所需Python依赖项

此外,目录中包含的shell脚本init.sh可以执行,用于设置依赖项,并使您的本地机器准备就绪以转换示例。

# Allow init.sh to execute
$ chmod +x init.sh
# Execute init.sh
$ ./init.sh

将bin目录添加到您的PATH

您可以将bin子目录添加到您的PATH,然后以下所有脚本都可以运行而无需添加bin路径。

例如,您可以通过将类似的行添加到您的.bash_profile或虚拟环境中的bin/postactivate来执行此操作

export PATH=${PATH}:<INSERT_PATH_TO_YOUR_OOZIE_PROJECT>/bin

否则,您需要从bin子目录运行它们 - 在路径前加上路径,例如

./bin/o2a --help

在以下所有示例中,假设bin目录已添加到您的PATH中。

运行转换

您可以通过调用以下命令运行程序(至少):o2a -i <INPUT_APPLICATION_FOLDER> -o <OUTPUT_FOLDER_PATH>

示例:o2a -i examples/demo -o output/demo

这是完整的用法指南,可以通过运行o2a -h获得

usage: o2a [-h] -i INPUT_DIRECTORY_PATH -o OUTPUT_DIRECTORY_PATH [-d DAG_NAME]
           [-u USER] [-s START_DAYS_AGO] [-v SCHEDULE_INTERVAL]

Convert Apache Oozie workflows to Apache Airflow workflows.

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT_DIRECTORY_PATH, --input-directory-path INPUT_DIRECTORY_PATH
                        Path to input directory
  -o OUTPUT_DIRECTORY_PATH, --output-directory-path OUTPUT_DIRECTORY_PATH
                        Desired output directory
  -d DAG_NAME, --dag-name DAG_NAME
                        Desired DAG name [defaults to input directory name]
  -u USER, --user USER  The user to be used in place of all ${user.name}
                        [defaults to user who ran the conversion]
  -s START_DAYS_AGO, --start-days-ago START_DAYS_AGO
                        Desired DAG start as number of days ago
  -v SCHEDULE_INTERVAL, --schedule-interval SCHEDULE_INTERVAL
                        Desired DAG schedule interval as number of days

应用程序文件夹结构

应用程序文件夹必须遵循以下结构

<APPLICATION>/
             |- job.properties        - job properties that are used to run the job
             |- hdfs                  - folder with application - should be copied to HDFS
             |     |- workflow.xml    - Oozie workflow xml (1.0 schema)
             |     |- ...             - additional folders required to be copied to HDFS
             |- configuration.template.properties - template of configuration values used during conversion
             |- configuration.properties          - generated properties for configuration values

支持的Oozie功能

控制节点

分支

分支节点将执行路径分割成多个并发执行路径。

汇合

合并节点等待直到上一个分支节点的所有并发执行到达它。分支和合并节点必须成对使用。合并节点假定并发执行路径是同一分支节点的子节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME]" />
        ...
        <path start="[NODE-NAME]" />
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    ...
</workflow-app>

决策

决策节点允许工作流程根据要执行的路径进行选择。

决策节点的行为可以视为switch-case语句。

决策节点由一系列谓词-转换对以及一个默认转换组成。谓词按顺序或出现顺序评估,直到其中一个评估为true并执行相应的转换。如果没有谓词评估为true,则执行默认转换。

谓词是JSP表达式语言(EL)表达式(请参阅本文件的第4.2节),它解析为布尔值,true或false。例如:${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>

开始

开始节点是工作流程作业的入口点,它表示工作流程作业必须转换到的第一个工作流程节点。

当工作流程启动时,它自动转换为start中指定的节点。

工作流程定义必须有一个开始节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
  ...
  <start to="[NODE-NAME]"/>
  ...
</workflow-app>

结束

结束节点是工作流程作业的终点,它表示工作流程作业已成功完成。

当工作流程作业到达结束节点时,它成功完成(SUCCEEDED)。

如果在到达结束节点时有一个或多个由工作流程作业启动的操作正在执行,则这些操作将被终止。在这种情况下,工作流程作业仍然被视为成功运行。

工作流程定义必须有一个结束节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>

终止

终止节点允许工作流程作业以错误退出。

当工作流程作业到达终止节点时,它以错误状态完成(KILLED)。

如果在到达终止节点时有一个或多个由工作流程作业启动的操作正在执行,则这些操作将被终止。

工作流程定义可以有零个或多个终止节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>

已知限制

本程序的目标是模仿由Oozie工作流文件概述的动作和控制流程。不幸的是,目前关于执行流程有一些限制尚未解决。当存在4个节点A、B、C、D,并且有如下Oozie指定的执行路径时,可能存在执行路径无法正确执行的情况。

A executes ok to C
B executes error to C

A executes error to D
B executes ok to D

在这种情况下,Airflow没有足够的细粒度节点执行控制。转换器未来应该能够处理这种情况,但目前不能保证其正常工作。

这是因为如果A到C正常,B在出错时也会到C,C的触发规则必须设置为DUMMY,但这意味着如果A出错,B正常,C将执行错误。

这种限制是临时的,将在Oozie-2-Airflow转换器的未来版本中删除。

EL函数

目前,支持非常有限的Oozie EL函数。它们的工作方式是存在一个字典映射,将每个Oozie EL函数字符串映射到相应的Python函数。这在utils/el_utils.py中。这种设计允许自定义EL函数映射。默认情况下,所有内容都映射到模块o2a_libs。这意味着为了使用EL函数映射,应该将o2a_libs文件夹复制到Airflow DAG文件夹中。然后应该由Airflow工作者拾取并解析,然后对所有DAG可用。

示例

所有示例都可以在examples目录中找到。

演示示例

演示示例包含多个动作和控制节点。控制节点包括forkjoindecisionstartendkill。就动作节点而言,有fsmap-reducepig

大多数这些功能已经支持,但当程序遇到不知道如何解析的节点时,它将执行一种“骨架转换”操作 - 它将所有未知节点转换为虚拟节点。这将使用户能够手动解析节点,如果他们希望这样做的话,因为控制流程是存在的。

演示可以按以下方式运行:

o2a -i examples/demo -o output/demo

这将解析并将输出写入到output/demo目录中的输出文件。

当前限制

决策节点尚未完全实现,因为没有支持所有EL函数。因此,为了在Airflow中运行,您必须编辑Python输出文件并更改决策节点表达式。

输出

在这个例子中,输出将出现在/output/ssh/test_demo_dag.py。此外,在/output/ssh/subdag_test.py中生成子工作流。

子工作流示例

childwf示例是demo示例的子工作流。它可以按以下方式运行:

o2a -i examples/childwf -o output/childwf

请确保首先复制examples/subwf/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

输出

在这个例子中,输出将出现在output/childwf/test_childwf_dag.py

当前限制

无已知限制。

SSH示例

ssh示例可以按以下方式运行:

o2a -i examples/ssh -o output/ssh

这将转换指定的Oozie XML,并将输出写入指定的输出目录,在本例中为output/ssh/test_ssh_dag.py

在SSH规范方面,Apache Oozie与Apache Airflow存在一些差异。在Airflow中,您需要添加/编辑一个包含指定SSH操作所需凭据的SSH特定连接。例如,如果SSH节点看起来像

<action name="ssh">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
        <host>user@apache.org</host>
        <command>echo</command>
        <args>"Hello Oozie!"</args>
    </ssh>
    <ok to="end"/>
    <error to="fail"/>
</action>

那么默认的Airflow SSH连接ssh_default至少应该设置一个密码。这可以在Airflow Web UI的管理 > 连接下找到。从命令行无法编辑连接,因此您必须添加一个,如下所示

airflow connections --add --conn_id <SSH_CONN_ID> --conn_type SSH --conn_password <PASSWORD>

更多信息请参阅Airflow文档

输出

在本例中,输出将出现在/output/ssh/test_ssh_dag.py中。

转换后的DAG在Airflow中使用SSHOperator

当前限制

无已知限制。

MapReduce示例

MapReduce示例可以按以下方式运行

o2a -i examples/mapreduce -o output/mapreduce

请确保首先复制examples/mapreduce/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

输出

在本例中,输出将出现在/output/mapreduce/test_mapreduce_dag.py中。

转换后的DAG在Airflow中使用DataProcHadoopOperator

当前限制

1. 退出状态不可用

来自Oozie文档

Hadoop作业计数器和作业退出状态(失败、终止或成功)必须在Hadoop作业结束后可供工作流作业使用。这些信息可以从决策节点和其他操作配置中使用。

目前我们使用DataProcHadoopOperator,该操作不将作业退出状态存储在XCOM中以供其他任务使用。

2. 配置选项

来自Oozie文档(下划线为我们添加的)

Hadoop JobConf属性可以作为以下部分指定

  • config-default.xml的一部分
  • 与工作流应用程序捆绑的JobConf XML文件
  • 工作流定义中的标签
  • 内联map-reduce操作配置
  • 由工作流定义中的标签指定的OozieActionConfigurator的实现

目前配置map-reduce操作的唯一支持方式是使用内联操作配置,即使用工作流XML文件定义中的<configuration>标签。

3. 流和管道

流和管道目前不支持。

FS示例

FS示例可以按以下方式运行

o2a -i examples/fs -o output/fs

请确保首先复制examples/fs/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

输出

在本例中,输出将出现在/output/fs/test_fs_dag.py中。

转换后的DAG在Airflow中使用BashOperator

当前限制

目前并非所有FS操作都是幂等的。这将得到修复。

Pig示例

Pig示例可以按以下方式运行

o2a -i examples/pig -o output/pig

请确保首先复制examples/pig/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

输出

在本例中,输出将出现在output/pig/test_pig_dag.py中。

转换后的DAG在Airflow中使用DataProcPigOperator

当前限制

1. 配置选项

来自Oozie文档(下划线为我们添加的)

Hadoop JobConf属性可以作为以下部分指定

  • config-default.xml的一部分
  • 与工作流应用程序捆绑的JobConf XML文件
  • 工作流定义中的标签
  • 内联pig操作配置。

目前配置pig操作的唯一支持方式是使用内联操作配置,即使用工作流XML文件定义中的<configuration>标签。

Shell示例

Shell示例可以按以下方式运行

o2a -i examples/shell -o output/shell

请首先复制examples/shell/configuration.template.properties,重命名为configuration.properties,并填充配置数据。

输出

在这个例子中,输出将出现在output/shell/test_shell_dag.py

转换后的DAG使用Airflow中的BashOperator,通过调用gcloud dataproc jobs submit pig --cluster=<cluster> --region=<region> --execute 'sh <action> <args>'来执行所需的shell动作。

当前限制

1. 退出状态不可用

来自Oozie文档

Shell作业的输出(STDOUT)可以在Shell作业结束后提供给工作流作业。这些信息可以在决策节点中使用。

目前我们使用的是只能存储作业输出最后一行的BashOperator。在这种情况下,这一行没有帮助,因为它与Dataproc作业提交状态有关,而不是与Shell动作的结果有关。

2. 无Shell启动器配置

来自Oozie文档

可以通过文件、使用job-xml元素或内联、使用配置元素来指定Shell启动器配置。

目前无法指定Shell启动器配置(被忽略)。

Spark示例

Shell示例可以按以下方式运行

o2a -i examples/spark -o output/spark

请首先复制/examples/spark/configuration.template.properties,重命名为configuration.properties,并填充配置数据。

输出

在这个例子中,输出将出现在/output/spark/spark.py

转换后的DAG使用Airflow中的DataProcSparkOperator

当前限制

1. 仅支持用Java编写的任务

来自Oozie文档

jar元素表示由逗号分隔的jar文件或Python文件的列表。

该解决方案仅使用单个jar文件进行了测试。

2. 无Spark启动器配置

来自Oozie文档

可以通过文件、使用job-xml元素或内联、使用配置元素来指定Shell启动器配置。

目前无法指定Spark启动器配置(被忽略)。

3. 不支持所有元素

以下元素不受支持:job-trackername-nodemastermode

子工作流示例

子工作流示例可以运行为

o2a -i examples/subwf -o output/subwf

请确保首先复制examples/subwf/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

输出

在这个例子中,输出将出现在output/subwf/test_subwf_dag.py。此外,同一目录下还将生成一个名为subdag_test.py(名称将很快更改)的文件,其中包含返回实际Airflow子DAG的工厂方法sub_dag()

转换后的DAG使用Airflow中的SubDagOperator

当前限制

目前生成的子工作流名称是固定的,这意味着每个DAG文件夹只能支持一个子工作流。这很快就会得到解决。

决策示例

决策示例可以运行为

o2a -i examples/decision -o output/decision

请首先复制examples/decision/configuration.template.properties,重命名为configuration.properties,并填充配置数据。

输出

在这个例子中,输出将出现在output/decision/test_decision_dag.py

转换后的DAG使用Airflow中的BranchPythonOperator

当前限制

决策示例尚未完全实现,因为EL函数尚未完全实现,所以条件目前是硬编码的。一旦EL函数实现,示例中的条件将得到更新。

EL示例

Oozie表达式语言(EL)示例可以运行为:o2a -i examples/el -o output/el

这展示了使用o2a_libs目录将EL函数映射到Python方法的能力。此示例假定用户已设置有效的Apache Airflow SSH连接,并且已将o2a_libs目录复制到dags文件夹中。

请注意,当前版本仅支持单个EL变量或单个EL函数。目前不支持变量/函数链式调用。

输出

在这个例子中,输出将显示在output/el/test_el_dag.py中。

当前限制

决策示例尚未完全实现,因为EL函数尚未完全实现,所以条件目前是硬编码的。一旦EL函数实现,示例中的条件将得到更新。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

o2a-potiuk-0.0.1.tar.gz (61.3 kB 查看哈希值)

上传时间

构建分发

o2a_potiuk-0.0.1-py3-none-any.whl (111.4 kB 查看哈希值)

上传时间 Python 3

由以下支持