跳转到主要内容

Oozie到Airflow迁移工具

项目描述

Oozie到Airflow

codecov Code style: black License Python 3

一个易于在Apache Oozie工作流和Apache Airflow工作流之间进行转换的工具。

该程序针对Apache Airflow >= 2.x和Apache Oozie 1.0 XML模式。

如果您想为项目做出贡献,请查看CONTRIBUTING.md

目录

背景

Apache Airflow是AirBnB于2014年开发的工作流管理系统。它是一个平台,可以以编程方式创建、调度和监控工作流。Airflow工作流设计为Python中任务的有向无环图 (DAGs)。Airflow调度器在遵循指定依赖关系的同时,在多个工作者上执行您的任务。

Apache Oozie是一个用于管理Apache Hadoop作业的工作流调度系统。Oozie工作流也设计为XML中的有向无环图 (DAGs)。

以下有一些差异

规格 任务 依赖关系 "子工作流" 参数化 通知
Oozie XML 操作节点 控制节点 子工作流 EL函数/属性文件 基于URL的回调
Airflow Python 操作符 触发规则,set_downstream() 子DAG jinja2和宏 回调/电子邮件

运行程序

请注意,您需要Python >= 3.8来运行转换器。

从PyPi安装

您可以通过pip install o2a从PyPi安装o2a。安装后,o2ao2a-validate-workflows应该在您的路径中可用。

从源代码安装

  1. (可选) 安装virtualenv

    如果您使用o2a的源代码,可以通过virtualenv设置来设置环境(例如,您可以使用virtualenvwrapper创建一个)。

  2. 安装Oozie-to-Airflow - 您有两个选择

    1. 自动:使用pip install -e .从本地文件夹安装o2a

      这将负责,包括将bin子目录添加到PATH中。

    2. 更手动

      1. 在您的virtualenv中,您可以通过pip install -r requirements.txt安装所有要求。

      2. 您可以将bin子目录添加到您的PATH中,然后以下所有脚本都可以运行而无需添加./bin前缀。例如,您可以将类似以下行添加到.bash_profile或虚拟环境中的bin/postactivate

      export PATH=${PATH}:<INSERT_PATH_TO_YOUR_OOZIE_PROJECT>/bin
      

      否则,您需要从bin子目录运行所有脚本,例如

      ./bin/o2a --help
      

在下面的所有示例命令中,假设bin目录已添加到您的PATH中 - 要么是从PyPi安装,要么是从源代码安装。

运行转换

您可以通过以下方式运行程序: o2a -i <INPUT_APPLICATION_FOLDER> -o <OUTPUT_FOLDER_PATH>

示例: o2a -i examples/demo -o output/demo

这是完整的用法指南,通过运行 o2a -h 可获得。

usage: o2a [-h] -i INPUT_DIRECTORY_PATH -o OUTPUT_DIRECTORY_PATH [-n DAG_NAME]
           [-u USER] [-s START_DAYS_AGO] [-v SCHEDULE_INTERVAL] [-d]

Convert Apache Oozie workflows to Apache Airflow workflows.

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT_DIRECTORY_PATH, --input-directory-path INPUT_DIRECTORY_PATH
                        Path to input directory
  -o OUTPUT_DIRECTORY_PATH, --output-directory-path OUTPUT_DIRECTORY_PATH
                        Desired output directory
  -n DAG_NAME, --dag-name DAG_NAME
                        Desired DAG name [defaults to input directory name]
  -u USER, --user USER  The user to be used in place of all ${user.name}
                        [defaults to user who ran the conversion]
  -s START_DAYS_AGO, --start-days-ago START_DAYS_AGO
                        Desired DAG start as number of days ago
  -v SCHEDULE_INTERVAL, --schedule-interval SCHEDULE_INTERVAL
                        Desired DAG schedule interval as number of days
  -d, --dot             Renders workflow files in DOT format

应用程序文件夹结构

输入应用程序目录必须遵循以下结构。

<APPLICATION>/
             |- job.properties        - job properties that are used to run the job
             |- hdfs                  - folder with application - should be copied to HDFS
             |     |- workflow.xml    - Oozie workflow xml (1.0 schema)
             |     |- ...             - additional folders required to be copied to HDFS
             |- configuration.template.properties - template of configuration values used during conversion
             |- configuration.properties          - generated properties for configuration values

o2a库

转换后的Airflow DAGs使用通用库。这些库必须在PYTHONPATH上对所有Airflow组件(调度器、web服务器和工作者)可用,以便在解析DAGs时可以导入它们。

这些库位于o2a/o2a_libs文件夹中,最容易使它们对所有DAGs可用的方式是通过PyPi安装,使用pip install o2a-lib

支持的Oozie功能

控制节点

分支和连接

分支节点将执行路径分割成多个并发执行路径。

连接节点等待之前分支节点的所有并发执行到达它。分支和连接节点必须成对使用。连接节点假设并发执行路径是同一分支节点的子节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME]" />
        ...
        <path start="[NODE-NAME]" />
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    ...
</workflow-app>

决策

决策节点使工作流程能够在执行的路径上做出选择。

决策节点的行为可以看作是switch-case语句。

决策节点由一系列谓词-转换对以及一个默认转换组成。谓词按顺序或出现顺序评估,直到其中一个评估为true,并采取相应的转换。如果没有谓词评估为true,则采取默认转换。

谓词是JSP表达式语言(EL)表达式(参见本文件的第4.2节),它们解析为布尔值,true或false。例如: ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB}

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>

开始

开始节点是工作流程作业的入口点,它指示工作流程作业必须转换到的第一个工作流程节点。

当工作流程启动时,它会自动转换到start中指定的节点。

工作流程定义必须有一个开始节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
  ...
  <start to="[NODE-NAME]"/>
  ...
</workflow-app>

结束

结束节点是工作流程作业的结束,它指示工作流程作业已成功完成。

当工作流程作业达到结束时,它会成功完成(SUCCEEDED)。

如果在达到结束节点时,工作流程作业启动了一个或多个操作正在执行,这些操作将被终止。在这种情况下,工作流程作业仍然被认为是成功运行的。

工作流程定义必须有一个结束节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="[NODE-NAME]"/>
    ...
</workflow-app>

终止

终止节点允许工作流程作业以错误退出。

当工作流程作业达到终止时,它会错误完成(KILLED)。

如果在达到终止节点时,工作流程作业启动了一个或多个操作正在执行,这些操作将被终止。

工作流程定义可以有零个或多个终止节点。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>

EL函数

目前,支持一个非常小的Oozie EL函数集。它们的工作方式是将EL表达式转换为jinja模板。转换使用Lark执行。所有必需的变量都应该传递到job.properties中。EL函数的等效函数可以在o2a_libs/functions.py中找到。

例如,以下EL表达式 ${wf:user() == firstNotNull(arg1, arg2)} 被转换为以下Jinja等价表达式:{{functions.wf.user() == functions.first_not_null(arg1, arg2)}},并且它要求job.properties文件包含arg1arg2的值。

此设计允许进行自定义EL函数映射,如果需要的话。默认情况下,所有内容都映射到模块o2a_libs.functions。这意味着为了使用EL函数映射,应该将o2a_libs.functions文件夹复制到Airflow DAG文件夹中。然后,这应该会被Airflow工作进程捕获并解析,并可供所有DAG使用。

工作流和节点通知

工作流作业可以配置为在开始和结束工作流动作节点以及开始和完成工作流作业时发送HTTP GET通知。更多信息请参见Oozie文档

Oozie-to-Airflow支持此功能。《code>job.properties文件包含工作流和动作节点通知的URL - 以下为示例

oozie.wf.workflow.notification.url=http://example.com/workflow?job-id=$jobId&status=$status
oozie.wf.action.notification.url=http://example.com/action?job-id=$jobId&node-name=$nodeName&status=$status

如果存在,Oozie-to-Airflow将为要发送的每个通知插入额外的BashOperator,在适当的节点之前或之后(对于节点通知)或在工作流开始或结束时(对于工作流通知)。在BashOperator内部将使用curl向适当的URL端点发送HTTP GET请求。

无通知的示例DAG

dag without notifications

带通知的相同DAG

dag with notifications

Airflow特定优化

由于Oozie和Airflow在某些工作流运行方面存在差异,因此Airflow DAG的输出可能与Oozie XML存在一些差异。

移除不必要的控制节点

在Airflow中,您不需要像Oozie中那样多的显式控制节点。例如,您永远不需要开始节点,在大多数情况下,结束节点也不需要。

我们在O2A中引入了Transformers的概念,它修改工作流。以下是一些移除不必要的控制节点的例子

  • RemoveEndTransformer - 当End节点未连接到决策节点时,删除End节点及其所有关系
  • RemoveKillTransformer - 当Kill节点未连接到决策节点时,删除Kill节点及其所有关系
  • RemoveStartTransformer - 删除Start节点及其所有关系
  • RemoveForkTransformer - 当没有上游节点时,删除Fork节点
  • RemoveJoinTransformer - 当没有下游节点时,删除Join节点

移除不可访问的节点

在Oozie中,为了执行节点,必须能够将其追溯到开始节点。如果一个节点是“松散”的,并且以任何方式(直接或通过其“父节点”)未连接到开始节点,它将被跳过。

然而,在Airflow中,所有任务都将执行。因此,为了复制Oozie中“跳过”松散节点行为,我们需要在转换阶段删除未连接到开始节点的节点。

这是通过RemoveInaccessibleNodeTransformer实现的。

常见已知限制

在Oozie-To-Airflow转换器的实现中存在一些限制。无法编写处理Oozie复杂工作流所有情况的转换器,因为某些功能无法轻松映射到现有的Airflow Operators,或者由于当前的Dataproc + Composer限制无法进行测试。其中一些限制可能在将来被移除。以下是现在我们已知的一些常见已知限制列表。

许多这些限制不是阻止器 - 工作流仍然会被转换为Python DAG,应该可以手动(或自动)后处理DAG以添加自定义功能。因此,即使存在这些限制,您在转换许多Oozie工作流时仍可以节省大量工作。

在以下“示例”部分列出了每个操作的更具体的限制。

文件/归档功能

在撰写本文时,我们无法确定文件/归档功能是否按预期工作。当我们映射适当的文件/归档方法时,似乎Oozie对文件/归档的处理有些不稳定。这不会阻止运行大多数操作,但某些特定复杂的工作流可能会出现问题。需要进一步测试真实的、生产环境的Oozie工作流来验证我们的实现。

示例Oozie文档

并非所有全局配置方法都受支持

Oozie实现了一些将配置参数传递给动作的方法。在现有的配置选项中,以下选项不受支持(但可以按需轻松添加)

支持uber.jar功能

uber.jar功能不受支持。 Oozie文档

支持.so和.jar库文件

Oozie将lib文件夹中的.so和.jar文件添加到所有运行的作业的本地缓存中,以供LD_LIBRARY_PATH/CLASSPATH使用。目前只有Java Mapper支持它。

缺少针对终止节点的自定义消息

终止节点可能指定了自定义日志消息。这尚未实现。 Oozie文档

不支持捕获输出

在几个动作中,您可以捕获任务的输出。这尚未实现。 示例Oozie文档

子工作流DAG必须放置在示例中

目前所有子工作流DAG必须位于examples文件夹中

EL函数支持

目前实现了许多EL函数(基本函数、fs函数和wf函数的子集)。请查看此文档以获取有关当前状态的完整信息。以下wf:functions尚未实现

所有实现的功能都可以在o2a_libs模块中找到。Oozie函数的驼峰命名已被替换为蛇形等价物(例如,lastErrorNode变为last_error_node)。

此外,由于Oozie和Airflow之间的差异,一些已实现的功能可能无法保留原始EL表达式的全部逻辑。在足够通用的方式下实现它以覆盖所有可能的案例很困难,因此将这些函数的实现留给用户会更简单。如果您需要自定义这些函数,则完全有可能为每个函数提供自己的实现,并且在许多情况下,特定的实现会比通用的实现更容易。

不支持通知代理

在Oozie中,可以使用oozie.wf.workflow.notification.proxy属性配置代理,通过该代理发送通知。

这不受支持。目前通知将直接发送,而不使用代理。

Oozie到Airflow转换的云执行环境

云环境设置

在GCP中使用Cloud Composer和Dataproc运行Oozie工作流以及运行oozie-to-airflow转换的DAG是一种简单的方法。这是转换器目前支持的环境,也是它经过大量测试的环境。这些服务允许在不进行大量本地设置的情况下进行测试。以下是有关支持环境的详细信息

Cloud Composer

  • composer-2.2.0-airflow-2.5.1
  • python版本3 (3.8.10)
  • 机器n1-standard-1
  • 节点数量:3
  • 其他PyPi包
    • sshtunnel==0.1.4

带有Oozie的Cloud Dataproc集群

  • n1-standard-2,4个vCPU,20 GB内存(需要至少16 GB RAM)
  • 主磁盘大小,50 GB
  • 镜像1.3.29-debian9
  • Hadoop版本
  • 初始化操作: oozie-5.2.sh

以下是你应该遵循的设置步骤

  1. 创建一个 Dataproc 集群,请参阅下面的 创建 Dataproc 集群
  2. 创建一个至少包含 Airflow 版本 2.0 的 Cloud Composer 环境,以测试 Apache Airflow 工作流程。
  3. 在 Composer 中设置所有必需的 Airflow 连接。这对于诸如 SSHOperator 等功能是必需的。

创建 Dataproc 集群

我们准备了 Dataproc 的 初始化操作,允许在 Dataproc 上运行 Oozie 5.2.0。

请将 oozie-5.2.sh 上传到您的 GCS 存储桶,并使用以下命令创建集群

请注意,您至少需要 20GB RAM 来在集群上运行 Oozie 作业。下面的自定义机器类型有足够的 RAM 来处理 oozie。

gcloud dataproc clusters create <CLUSTER_NAME> --region europe-west1 --subnet default --zone "" \
     --single-node --master-machine-type custom-4-20480 --master-boot-disk-size 500 \
     --image-version 1.3-deb9 --project <PROJECT_NAME> --initialization-actions 'gs://<BUCKET>/<FOLDER>/oozie-5.1.sh' \
     --initialization-action-timeout=30m

注意 1:创建集群可能需要约 20 分钟 注意 2:初始化操作只能在 单节点集群 和 Dataproc 1.3 上工作

集群创建完成后,可以从 示例 MapReduce 作业 中运行步骤,在主节点上执行 Oozie 的示例 Map-Reduce 作业。

Oozie 在端口 11000 上提供 Web UI。要启用访问,请遵循 官方说明 如何连接到集群 Web 接口。

在主节点上发出 oozie jobs 命令可以显示作业及其状态的列表。

有关测试 Oozie 到 Airflow 转换过程的更多信息,请参阅 CONTRIBUTING.md

示例

所有示例都可以在 examples 目录中找到。

EL示例

运行中

Oozie 表达式语言 (EL) 示例可以按以下方式运行: o2a -i examples/el -o output/el

这将展示使用 o2a/o2a_libs 文件夹将 EL 函数映射到 Python 方法的能力。此示例假定用户已设置有效的 Apache Airflow SSH 连接,并且 o2a/o2a_libs 文件夹已复制到 dags 文件夹(保留 o2a 父目录)。

请注意,根据当前版本,只能支持单个 EL 变量或单个 EL 函数。目前不支持变量/函数链。

输出

在此示例中,输出将创建在 ./output/el/ 文件夹中。

已知限制

决策示例尚不完全功能,因为 EL 函数尚未完全实现,因此条件目前是硬编码的。一旦实现 EL 函数,示例中的条件将进行更新。

Github 问题: 实现决策节点

SSH示例

先决条件

要更改示例中的 userhost,请编辑 examples/ssh/hdfs/workflow.xml

运行中

ssh 示例可以按以下方式运行

o2a -i examples/ssh -o output/ssh

这将转换指定的 Oozie XML,并将输出写入指定的输出目录,在本例中为 output/ssh/ssh.py

在SSH规范方面,Apache Oozie和Apache Airflow存在一些差异。在Airflow中,您需要添加/编辑一个包含指定SSH操作所需凭证的SSH特定连接。例如,如果SSH节点看起来像

<action name="ssh">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
        <host>user@apache.org</host>
        <command>echo</command>
        <args>"Hello Oozie!"</args>
    </ssh>
    <ok to="end"/>
    <error to="fail"/>
</action>

那么默认的Airflow SSH连接ssh_default至少应该设置一个密码。这可以在Airflow Web UI中的管理员 > 连接下找到。从命令行无法编辑连接,因此您必须添加一个,如下所示:

airflow connections --add --conn_id <SSH_CONN_ID> --conn_type SSH --conn_password <PASSWORD>

更多信息请参阅Airflow的文档

输出

在本例中,输出将创建在./output/ssh/文件夹中。

转换后的DAG使用Airflow中的SSHOperator

已知限制

没有已知的限制。

电子邮件示例

先决条件

请首先复制/examples/email/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

运行中

电子邮件示例可以按以下方式运行:

o2a -i examples/email -o output/email

输出

在本例中,输出将创建在./output/email/文件夹中。

转换后的DAG使用Airflow中的EmailOperator

先决条件

在Oozie中,SMTP服务器配置位于oozie-site.xml中。

对于Airflow,它需要位于airflow.cfg中。示例Airflow SMTP配置

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
smtp_host = example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow_user
smtp_password = password
smtp_port = 587
smtp_mail_from = airflow_user@example.com

有关设置Airflow配置选项的更多信息,请参阅此处

已知限制

1. 不支持附件

由于从Airflow内部提取文件到HDFS的复杂性,并提供给EmailOperator,发送附件的功能尚未实现。

解决方案: 在O2A中实现从Airflow内部HDFS提取文件的机制。

GitHub问题: 在Email映射器中添加对附件的支持

2. 不支持<content_type>标签

来自Oozie文档

从uri:oozie:email-action:0.2,也可以指定邮件内容类型为<content_type>text/html</content_type>。"text/plain"是默认值。

不幸的是,当前EmailOperator只接受mime_subtype参数。然而,它仅适用于多部分子类型,因为操作员将子类型追加到multipart/前缀。因此,从Oozie传递htmlplain没有意义。

因此,电子邮件将始终使用EmailOperator的默认内容类型值发送,该值为multipart/mixed

解决方案: 修改Airflow的EmailOperator以支持更多内容类型。

GitHub问题: 在Email映射器中支持内容类型

3. ccbcc字段在EmailOperator中未进行模板化

在EmailOperator中,只有'to'、'subject'和'html_content'字段进行了模板化。在实践中,这涵盖了Oozie电子邮件操作节点除ccbcc之外的所有字段。

因此,如果在这两个字段中的任何一个动作节点中存在EL函数,则需要Airflow中的Jinja表达式,它将不会工作 - 表达式将不会执行,而会被当作普通字符串处理。

解决方案: 修改Airflow的EmailOperator以将更多字段标记为template_fields

GitHub问题: 在EmailOperator中未模板化CC:和BCC:字段

MapReduce示例

先决条件

请首先复制examples/mapreduce/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

运行中

MapReduce示例可以按以下方式运行:

o2a -i examples/mapreduce -o output/mapreduce

输出

在这个示例中,输出将在 ./output/mapreduce/ 文件夹中创建。

转换后的DAG使用了Airflow中的DataProcHadoopOperator

已知限制

1. 退出状态不可用

来自Oozie文档

Hadoop作业的计数器和作业退出状态(失败、杀死或成功)必须在Hadoop作业结束后可用。这些信息可以在决策节点和其他动作配置中使用。

目前我们使用DataProcHadoopOperator,它不会将作业退出状态存储在XCOM中供其他任务使用。

GitHub问题:在MapReduce操作中实现退出状态和计数器

2. 配置选项

来自Oozie文档(下划线为我们添加的)

Hadoop JobConf属性可以指定为config-default.xml的一部分

  • 与工作流应用程序捆绑的JobConf XML文件
  • 工作流定义中的标签

工作流定义中标签指定的OozieActionConfigurator的实现。

当前仅支持通过内联动作配置来配置map-reduce动作,即在工作流的XML文件定义中使用<configuration>标签。

处理全局配置属性

3. 流和管道

流和管道目前不支持。

FS示例

先决条件

GitHub问题:实现流支持

运行中

请首先复制examples/fs/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

FS示例可以运行为

输出

o2a -i examples/fs -o output/fs

在这个示例中,输出将在./output/fs/文件夹中创建。

已知限制

转换后的DAG使用Airflow中的BashOperator

GitHub问题:FS Mapper和幂等性

FSMapper不支持dirFiles。

GitHub问题:在FsMapper中添加对dirFiles的支持

Java示例

先决条件

GitHub问题:实现流支持

运行中

Java示例可以运行为

o2a -i examples/java -o output/java

输出

在这个示例中,输出将在./output/java/文件夹中创建。

转换后的DAG使用了Airflow中的DataProcHadoopOperator

已知限制

  1. 通过oozie.launcher.action.main.class覆盖动作的Main类尚未实现。

GitHub问题:使用属性覆盖Java主类

Pig示例

先决条件

请首先复制examples/pig/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

运行中

Pig示例可以运行为

o2a -i examples/pig -o output/pig

输出

在这个示例中,输出将在./output/pig/文件夹中创建。

转换后的DAG使用Airflow中的DataProcPigOperator

已知限制

1. 配置选项

来自Oozie文档(下划线为我们添加的)

Hadoop JobConf属性可以指定为config-default.xml的一部分

  • 与工作流应用程序捆绑的JobConf XML文件
  • 内联pig动作配置。

当前仅支持通过内联动作配置来配置pig动作,即在工作流的XML文件定义中使用<configuration>标签。

当前仅支持通过内联动作配置来配置map-reduce动作,即在工作流的XML文件定义中使用<configuration>标签。

Shell示例

先决条件

请首先复制examples/shell/configuration.template.properties,将其重命名为configuration.properties,并填写配置数据。

运行中

Shell 示例可以按以下方式运行:

o2a -i examples/shell -o output/shell

输出

在这个示例中,输出将被创建在 ./output/shell/ 文件夹中。

转换后的 DAG 使用 Airflow 中的 BashOperator,通过调用 gcloud dataproc jobs submit pig --cluster=<cluster> --region=<region> --execute 'sh <action> <args>' 来执行 Pig 的所需 Shell 操作。

已知限制

1. 退出状态不可用

来自 Oozie 文档

Shell 作业的输出(STDOUT)可以在 Shell 作业结束后提供给工作流作业。这些信息可以从决策节点内部使用。

目前我们使用的是 BashOperator,它只能将作业输出的最后一行存储在 XCOM 中。在这种情况下,该行没有帮助,因为它与 Dataproc 作业提交状态有关,而不是与 Shell 操作的结果有关。

GitHub 问题:Finalize shell mapper

2. 没有Shell启动器配置

来自 Oozie 文档

可以使用文件、使用 job-xml 元素指定 Shell 启动器配置,以及使用配置元素进行内联。

目前没有指定 Shell 启动器配置的方法(它被忽略)。

GitHub 问题:Shell Launcher Configuration

Spark示例

先决条件

请首先复制 /examples/spark/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

Spark 示例可以按以下方式运行:

o2a -i examples/spark -o output/spark

输出

在这个示例中,输出将被创建在 ./output/spark/ 文件夹中。

转换后的 DAG 使用 Airflow 中的 DataProcSparkOperator

已知限制

1. 只支持用 Java 编写的任务

来自 Oozie 文档

jar 元素表示逗号分隔的 jar 文件或 Python 文件列表。

解决方案仅与单个 jar 文件进行了测试。

2. 没有Spark启动器配置

来自 Oozie 文档

可以使用文件、使用 job-xml 元素指定 Shell 启动器配置,以及使用配置元素进行内联。

目前无法指定 Spark 启动器配置(它被忽略)。

3. 不支持所有元素

以下元素不受支持:job-trackername-nodemastermode

子工作流示例

先决条件

请首先复制 examples/subwf/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

Sub-workflow 示例可以按以下方式运行:

o2a -i examples/subwf -o output/subwf

输出

在这个示例中,输出(连同子工作流 DAG)将创建在 ./output/subwf/ 文件夹中。

转换后的 DAG 使用 Airflow 中的 SubDagOperator

已知限制

没有已知的限制。

DistCp示例

先决条件

请首先复制 examples/distcp/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

DistCp 示例可以按以下方式运行:

o2a -i examples/distcp -o output/distcp

输出

在这个示例中,输出将被创建在 ./output/distcp/ 文件夹中。

转换后的 DAG 使用 Airflow 中的 BashOperator,通过 gcloud dataproc jobs submit hadoop 命令提交 Hadoop DistCp 作业。

已知限制

由于未知原因,使用 Oozie 运行的示例系统测试失败。由 Airflow 运行的转换 DAG 成功完成。

决策示例

先决条件

请首先复制 examples/decision/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

决策示例可以按以下方式运行:

o2a -i examples/decision -o output/decision

输出

在这个示例中,输出将被创建在 ./output/decision/ 文件夹中。

转换后的 DAG 使用 Airflow 中的 BranchPythonOperator

已知限制

决策示例尚不完全功能,因为 EL 函数尚未完全实现,因此条件目前是硬编码的。一旦实现 EL 函数,示例中的条件将进行更新。

Github 问题: 实现决策节点

Hive/Hive2示例

先决条件

请首先复制 /examples/hive/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

Hive 示例可以按以下方式运行

o2a -i examples/hive -o output/hive

输出

在此示例中,输出将创建在 ./output/hive/ 文件夹中。

转换后的 DAG 使用了 Airflow 中的 DataProcHiveOperator

已知限制

1. 仅支持连接到本地 Hive 实例。

不支持连接配置选项。

2. 不支持所有元素

对于 Hive,以下元素不受支持: job-trackername-node。对于 Hive2,以下元素不受支持: job-trackername-nodejdbc-urlpassword

两个问题的 Github 问题: Hive 连接配置和其他元素

演示示例

演示示例包含多个操作节点和控制节点。控制节点包括 forkjoindecisionstartendkill。操作节点包括 fsmap-reducepig

大多数这些已经得到支持,但当程序遇到它不知道如何解析的节点时,它将执行一种“骨架转换” - 它将所有未知节点转换为虚拟节点。这将使用户能够在控制流存在的情况下手动解析节点。

先决条件

请首先复制 examples/demo/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

演示可以按以下方式运行

o2a -i examples/demo -o output/demo

这将解析并将输出写入 output/demo 目录中的输出文件。

已知限制

由于目前不支持所有 EL 函数,决策节点不完全功能。因此,为了让它在 Airflow 中运行,您可能需要编辑 Python 输出文件并更改决策节点表达式。

GitHub 上的问题: 实现决策节点

输出

在此示例中,输出(包括子工作流 DAG)将创建在 ./output/demo/ 文件夹中。

子工作流示例

先决条件

请首先复制 examples/subwf/configuration.template.properties,将其重命名为 configuration.properties 并填写配置数据。

运行中

子工作流示例是 demo 示例的子工作流。它可以按以下方式运行

o2a -i examples/childwf -o output/childwf

输出

在此示例中,输出将创建在 ./output/childwf/ 文件夹中。

已知限制

没有已知的限制。

项目详情


下载文件

下载适合您平台文件的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

o2a-2.0.1.tar.gz (96.1 kB 查看哈希值)

上传时间

构建分布

o2a-2.0.1-py3-none-any.whl (145.9 kB 查看哈希值)

上传时间 Python 3

支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面