跳转到主要内容

高可用性 (HA) DAG工具

项目描述

airflow-ha

高可用性 (HA) DAG工具

Build Status codecov License PyPI

概述

此库提供了一个名为HighAvailabilityOperator的操作符,它继承自PythonSensor并运行用户提供的python_callable。返回值可以触发以下操作

返回 结果 当前DAG运行结束状态
(PASS, RETRIGGER) 重新触发相同的DAG再次运行 pass
(PASS, STOP) 完成DAG,直到下一次计划运行 pass
(FAIL, RETRIGGER) 重新触发相同的DAG再次运行 fail
(FAIL, STOP) 完成DAG,直到下一次计划运行 fail
(*, CONTINUE) 继续运行传感器 N/A

注意:如果传感器超时,行为将与(Result.PASS, Action.RETRIGGER)匹配。

示例 - 恒定运行

考虑以下DAG

with DAG(
    dag_id="test-high-availability",
    description="Test HA Operator",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    ha = HighAvailabilityOperator(
        task_id="ha",
        timeout=30,
        poke_interval=5,
        python_callable=lambda **kwargs: choice(
            (
                (Result.PASS, Action.CONTINUE),
                (Result.PASS, Action.RETRIGGER),
                (Result.PASS, Action.STOP),
                (Result.FAIL, Action.CONTINUE),
                (Result.FAIL, Action.RETRIGGER),
                (Result.FAIL, Action.STOP),
            )
        ),
    )
    
    pre = PythonOperator(task_id="pre", python_callable=lambda **kwargs: "test")
    pre >> ha
    
    retrigger_fail = PythonOperator(task_id="retrigger_fail", python_callable=lambda **kwargs: "test")
    ha.retrigger_fail >> retrigger_fail

    stop_fail = PythonOperator(task_id="stop_fail", python_callable=lambda **kwargs: "test")
    ha.stop_fail >> stop_fail
    
    retrigger_pass = PythonOperator(task_id="retrigger_pass", python_callable=lambda **kwargs: "test")
    ha.retrigger_pass >> retrigger_pass

    stop_pass = PythonOperator(task_id="stop_pass", python_callable=lambda **kwargs: "test")
    ha.stop_pass >> stop_pass

这将生成以下拓扑结构的DAG

此DAG表现出酷炫的行为。如果检查返回CONTINUE,则DAG将继续运行传感器。如果检查返回RETRIGGER或间隔过期,则DAG将重新触发自己并完成。如果检查返回STOP,则DAG将完成而不会重新触发自己。如果检查返回PASS,则当前DAG运行将以成功状态结束。如果检查返回FAIL,则当前DAG运行将以失败状态结束。

这允许人们构建“始终运行”的DAG,而不需要单个长时间阻塞的任务。

这个库用于构建airflow-supervisor,该库使用supervisor作为进程监控器,通过airflow-ha检查和重启作业。

示例 - 递归

您也可以使用这个库构建递归DAGs——或者称为“循环DAGs”,尽管这个名字有些矛盾。

以下代码创建一个DAG,使用递减计数器触发自身,起始值为3。

with DAG(
    dag_id="test-ha-counter",
    description="Test HA Countdown",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    
    def _get_count(**kwargs):
        # The default is 3
        return kwargs['dag_run'].conf.get('counter', 3) - 1

    get_count = PythonOperator(task_id="get-count", python_callable=_get_count)

    def _keep_counting(**kwargs):
        count = kwargs["task_instance"].xcom_pull(key="return_value", task_ids="get-count")
        return (Result.PASS, Action.RETRIGGER) if count > 0 else (Result.PASS, Action.STOP) if count == 0 else (Result.FAIL, Action.STOP)

    keep_counting = HighAvailabilityOperator(
        task_id="ha",
        timeout=30,
        poke_interval=5,
        python_callable=_keep_counting,
        pass_trigger_kwargs={"conf": '''{"counter": {{ ti.xcom_pull(key="return_value", task_ids="get-count") }}}'''},
    )

    get_count >> keep_counting

许可证

本软件采用Apache 2.0许可证。有关详细信息,请参阅LICENSE文件。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

airflow_ha-0.1.2.tar.gz (9.5 kB 查看哈希)

上传时间

构建分布

airflow_ha-0.1.2-py3-none-any.whl (9.3 kB 查看哈希)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面