跳转到主要内容

Apache Airflow的Fivetran异步提供程序

项目描述

Fivetran异步提供程序用于Apache Airflow

此软件包提供了一个异步操作符、传感器和钩子,将Fivetran集成到Apache Airflow中。FivetranSensor允许您在运行下游进程之前监视Fivetran同步作业的完成情况。FivetranOperator提交Fivetran同步作业,并在触发器上轮询其状态。由于异步传感器或操作符在触发器上进行轮询时可以释放工作槽位,因此它们在资源消耗方面比传统的“同步”传感器和操作符要少。

Fivetran自动化您的数据管道,Airflow自动化您的数据处理。

安装

先决条件:运行apache-airflow的环境。

pip install airflow-provider-fivetran-async

配置

在Airflow用户界面中,为Fivetran配置一个连接。大多数连接配置字段将被留空。配置以下字段

  • 连接IDfivetran
  • 连接类型Fivetran
  • 登录:Fivetran API密钥
  • 密码:Fivetran API密钥

Fivetran账户设置API配置部分中查找Fivetran API密钥和密钥。有关Fivetran API认证的更多信息,请参阅我们的文档:Fivetran API认证

传感器假定连接ID设置为fivetran,但是如果您正在管理多个Fivetran账户,您可以将其设置为任何您喜欢的名称。请参阅示例DAG以了解如何指定自定义的连接ID

模块

Fivetran Operator Async

from fivetran_provider_async.operators import FivetranOperator

FivetranOperator提交Fivetran同步作业并在触发时监控其完成。

FivetranOperator要求您指定您要触发的Fivetran连接器的connector_id。您可以在Fivetran仪表板中配置的连接器的设置页面中找到connector_id

只要wait_for_completion=True(这是默认值),FivetranOperator将等待同步完成。建议您以可延迟的模式运行(这也是默认值)。如果wait_for_completion=False,则操作员将返回上次同步的时间戳。

通过以下方式将导入到您的DAG中

Fivetran Sensor Async

from fivetran_provider_async.sensors import FivetranSensor

FivetranSensor监控Fivetran同步作业的完成情况。使用FivetranSensor进行监控,确保仅在Fivetran同步作业完成后触发下游过程,从而确保数据一致性。

FivetranSensor要求您指定您要等待的Fivetran连接器的connector_id。您可以在Fivetran仪表板中配置的连接器的设置页面中找到connector_id

您可以使用多个FivetranSensor实例来监控多个Fivetran连接器。

FivetranSensor在以下两种情况下最有用

  1. Fivetran使用与Airflow调度器不同的调度器。
  2. 您在FivetranOperator中设置了wait_for_completion=False,并且您需要在以后等待FivetranOperator任务。(您可能想这样安排您的DAG,以便某些任务依赖于启动同步,而其他任务依赖于完成同步)。

如果您正在执行第1种模式,您可以将completed_after_time设置为data_interval_end或带有一些缓冲的data_interval_end

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_externally_scheduled_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)

如果您正在执行第2种模式,您可以使用XCom将目标完成时间传递给传感器

fivetran_op = FivetranOperator(
    task_id="fivetran_sync_my_db",
    connector_id="bronzing_largely",
    wait_for_completion=False,
)

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_db_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)

fivetran_op >> fivetran_sensor

您还可以指定不带completed_after_timeFivetranSensor。在这种情况下,传感器将记录上次完成时间,并等待新的完成时间。

示例

请参阅示例目录中的示例DAG。

问题

请在我们的官方仓库https://github.com/astronomer/airflow-provider-fivetran-async中提交问题拉取请求

我们很高兴收到您的反馈。请将任何反馈发送到作者邮箱humans@astronomer.io

项目详情


下载文件

下载适用于您平台的应用文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分布

airflow_provider_fivetran_async-2.1.0.tar.gz (29.8 kB 查看哈希值)

上传时间 源代码

构建分布

airflow_provider_fivetran_async-2.1.0-py3-none-any.whl (41.0 kB 查看哈希值)

上传时间 Python 3

支持者