高可用性 (HA) DAG工具
项目描述
airflow-ha
高可用性 (HA) DAG工具
概述
此库提供了一个名为HighAvailabilityOperator
的操作符,它继承自PythonSensor
并运行用户提供的python_callable
。返回值可以触发以下操作
返回 | 结果 | 当前DAG运行结束状态 |
---|---|---|
(PASS, RETRIGGER) |
重新触发相同的DAG再次运行 | pass |
(PASS, STOP) |
完成DAG,直到下一次计划运行 | pass |
(FAIL, RETRIGGER) |
重新触发相同的DAG再次运行 | fail |
(FAIL, STOP) |
完成DAG,直到下一次计划运行 | fail |
(*, CONTINUE) |
继续运行传感器 | N/A |
注意:如果传感器超时,行为将与(Result.PASS, Action.RETRIGGER)
匹配。
示例 - 恒定运行
考虑以下DAG
with DAG(
dag_id="test-high-availability",
description="Test HA Operator",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
):
ha = HighAvailabilityOperator(
task_id="ha",
timeout=30,
poke_interval=5,
python_callable=lambda **kwargs: choice(
(
(Result.PASS, Action.CONTINUE),
(Result.PASS, Action.RETRIGGER),
(Result.PASS, Action.STOP),
(Result.FAIL, Action.CONTINUE),
(Result.FAIL, Action.RETRIGGER),
(Result.FAIL, Action.STOP),
)
),
)
pre = PythonOperator(task_id="pre", python_callable=lambda **kwargs: "test")
pre >> ha
retrigger_fail = PythonOperator(task_id="retrigger_fail", python_callable=lambda **kwargs: "test")
ha.retrigger_fail >> retrigger_fail
stop_fail = PythonOperator(task_id="stop_fail", python_callable=lambda **kwargs: "test")
ha.stop_fail >> stop_fail
retrigger_pass = PythonOperator(task_id="retrigger_pass", python_callable=lambda **kwargs: "test")
ha.retrigger_pass >> retrigger_pass
stop_pass = PythonOperator(task_id="stop_pass", python_callable=lambda **kwargs: "test")
ha.stop_pass >> stop_pass
这将生成以下拓扑结构的DAG
此DAG表现出酷炫的行为。如果检查返回CONTINUE
,则DAG将继续运行传感器。如果检查返回RETRIGGER
或间隔过期,则DAG将重新触发自己并完成。如果检查返回STOP
,则DAG将完成而不会重新触发自己。如果检查返回PASS
,则当前DAG运行将以成功状态结束。如果检查返回FAIL
,则当前DAG运行将以失败状态结束。
这允许人们构建“始终运行”的DAG,而不需要单个长时间阻塞的任务。
这个库用于构建airflow-supervisor,该库使用supervisor作为进程监控器,通过airflow-ha
检查和重启作业。
示例 - 递归
您也可以使用这个库构建递归DAGs——或者称为“循环DAGs”,尽管这个名字有些矛盾。
以下代码创建一个DAG,使用递减计数器触发自身,起始值为3。
with DAG(
dag_id="test-ha-counter",
description="Test HA Countdown",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
):
def _get_count(**kwargs):
# The default is 3
return kwargs['dag_run'].conf.get('counter', 3) - 1
get_count = PythonOperator(task_id="get-count", python_callable=_get_count)
def _keep_counting(**kwargs):
count = kwargs["task_instance"].xcom_pull(key="return_value", task_ids="get-count")
return (Result.PASS, Action.RETRIGGER) if count > 0 else (Result.PASS, Action.STOP) if count == 0 else (Result.FAIL, Action.STOP)
keep_counting = HighAvailabilityOperator(
task_id="ha",
timeout=30,
poke_interval=5,
python_callable=_keep_counting,
pass_trigger_kwargs={"conf": '''{"counter": {{ ti.xcom_pull(key="return_value", task_ids="get-count") }}}'''},
)
get_count >> keep_counting
许可证
本软件采用Apache 2.0许可证。有关详细信息,请参阅LICENSE文件。
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
airflow_ha-0.1.2.tar.gz (9.5 kB 查看哈希)
构建分布
airflow_ha-0.1.2-py3-none-any.whl (9.3 kB 查看哈希)