Apache Airflow的Fivetran异步提供程序
项目描述
Fivetran异步提供程序用于Apache Airflow
此软件包提供了一个异步操作符、传感器和钩子,将Fivetran集成到Apache Airflow中。FivetranSensor
允许您在运行下游进程之前监视Fivetran同步作业的完成情况。FivetranOperator
提交Fivetran同步作业,并在触发器上轮询其状态。由于异步传感器或操作符在触发器上进行轮询时可以释放工作槽位,因此它们在资源消耗方面比传统的“同步”传感器和操作符要少。
Fivetran自动化您的数据管道,Airflow自动化您的数据处理。
安装
先决条件:运行apache-airflow
的环境。
pip install airflow-provider-fivetran-async
配置
在Airflow用户界面中,为Fivetran配置一个连接。大多数连接配置字段将被留空。配置以下字段
连接ID
:fivetran
连接类型
:Fivetran
登录
:Fivetran API密钥密码
:Fivetran API密钥
在Fivetran账户设置的API配置部分中查找Fivetran API密钥和密钥。有关Fivetran API认证的更多信息,请参阅我们的文档:Fivetran API认证。
传感器假定连接ID
设置为fivetran
,但是如果您正在管理多个Fivetran账户,您可以将其设置为任何您喜欢的名称。请参阅示例DAG以了解如何指定自定义的连接ID
。
模块
Fivetran Operator Async
from fivetran_provider_async.operators import FivetranOperator
FivetranOperator
提交Fivetran同步作业并在触发时监控其完成。
FivetranOperator
要求您指定您要触发的Fivetran连接器的connector_id
。您可以在Fivetran仪表板中配置的连接器的设置页面中找到connector_id
。
只要wait_for_completion=True
(这是默认值),FivetranOperator
将等待同步完成。建议您以可延迟的模式运行(这也是默认值)。如果wait_for_completion=False
,则操作员将返回上次同步的时间戳。
通过以下方式将导入到您的DAG中
Fivetran Sensor Async
from fivetran_provider_async.sensors import FivetranSensor
FivetranSensor
监控Fivetran同步作业的完成情况。使用FivetranSensor
进行监控,确保仅在Fivetran同步作业完成后触发下游过程,从而确保数据一致性。
FivetranSensor
要求您指定您要等待的Fivetran连接器的connector_id
。您可以在Fivetran仪表板中配置的连接器的设置页面中找到connector_id
。
您可以使用多个FivetranSensor
实例来监控多个Fivetran连接器。
FivetranSensor
在以下两种情况下最有用
- Fivetran使用与Airflow调度器不同的调度器。
- 您在
FivetranOperator
中设置了wait_for_completion=False
,并且您需要在以后等待FivetranOperator
任务。(您可能想这样安排您的DAG,以便某些任务依赖于启动同步,而其他任务依赖于完成同步)。
如果您正在执行第1种模式,您可以将completed_after_time
设置为data_interval_end
或带有一些缓冲的data_interval_end
fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_externally_scheduled_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)
如果您正在执行第2种模式,您可以使用XCom将目标完成时间传递给传感器
fivetran_op = FivetranOperator(
task_id="fivetran_sync_my_db",
connector_id="bronzing_largely",
wait_for_completion=False,
)
fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_db_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)
fivetran_op >> fivetran_sensor
您还可以指定不带completed_after_time
的FivetranSensor
。在这种情况下,传感器将记录上次完成时间,并等待新的完成时间。
示例
请参阅示例目录中的示例DAG。
问题
请在我们的官方仓库https://github.com/astronomer/airflow-provider-fivetran-async中提交问题和拉取请求
我们很高兴收到您的反馈。请将任何反馈发送到作者邮箱humans@astronomer.io。
项目详情
哈希值 for airflow_provider_fivetran_async-2.1.0-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 4dbba63b2b93114b2d0a84869b9eaa2e5da5225c9fa1c407417cffc4bba35b6f |
|
MD5 | d3283d8272d9f204e92615283e687ba3 |
|
BLAKE2b-256 | eefa08c0c977f6224630c72ac8e77620e7d16b5bd166f4777a232b853c17ebd5 |