跳转到主要内容

Apache Airflow中的经济实惠的Databricks工作流程

项目描述

弃用通知

随着 0.3.0 版本的 astro-provider-databricks 包发布,此提供者已被弃用,将不再接收更新。我们建议迁移到官方的 apache-airflow-providers-databricks>= 6.8.0 以获取最新功能和支持。对于此存储库中弃用的操作符和传感器,迁移到官方 Apache Airflow Databricks 提供者,只需按照以下示例更改 DAG 代码中的导入路径即可。

以前使用的导入路径(现在已弃用) 建议使用的导入路径
from astro_databricks.operators.notebook import DatabricksNotebookOperator from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from astro_databricks.operators.common import DatabricksTaskOperator from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from astro_databricks.plugins.plugin import AstroDatabricksPlugin from airflow.providers.airflow.providers.databricks.plugins.databricks_workflow import DatabricksWorkflowPlugin

存档

Airflow 中的 Databricks 工作流

Astro Databricks 提供者是 Apache Airflow 提供者,用于使用 Airflow 作为编写界面来编写 Databricks 工作流。将您的 Databricks 笔记本作为 Databricks 工作流运行可以导致 75% 的成本降低(通用计算 $0.40/DBU,作业计算 $0.07/DBU)。

虽然这是由 Astronomer 维护的,但任何人使用 Airflow 都可以使用它 - 您不需要成为 Astronomer 客户才能使用它。

在 Airflow 中定义您的 Databricks 工作流有以下一些优点

通过 Databricks 通过 Airflow
编写界面 通过 Databricks UI 的基于 Web 的界面 通过 Airflow DAG 代码
工作流计算定价
笔记本代码在源代码管理中
工作流结构在源代码管理中
从头开始重试
重试单个任务
工作流内的任务组
从其他 DAG 触发工作流
工作流级别的参数

示例

以下 Airflow DAG 说明了如何使用 DatabricksTaskGroupDatabricksNotebookOperator 在 Airflow 中定义 Databricks 工作流

from pendulum import datetime

from airflow.decorators import dag, task_group
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from astro_databricks import DatabricksNotebookOperator, DatabricksWorkflowTaskGroup

# define your cluster spec - can have from 1 to many clusters
job_cluster_spec = [
   {
      "job_cluster_key": "astro_databricks",
      "new_cluster": {
         "cluster_name": "",
         # ...
      },
   }
]

@dag(start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False)
def databricks_workflow_example():
   # the task group is a context manager that will create a Databricks Workflow
   with DatabricksWorkflowTaskGroup(
      group_id="example_databricks_workflow",
      databricks_conn_id="databricks_default",
      job_clusters=job_cluster_spec,
      # you can specify common fields here that get shared to all notebooks
      notebook_packages=[
         { "pypi": { "package": "pandas" } },
      ],
      # notebook_params supports templating
      notebook_params={
         "start_time": "{{ ds }}",
      }
   ) as workflow:
      notebook_1 = DatabricksNotebookOperator(
         task_id="notebook_1",
         databricks_conn_id="databricks_default",
         notebook_path="/Shared/notebook_1",
         source="WORKSPACE",
         # job_cluster_key corresponds to the job_cluster_key in the job_cluster_spec
         job_cluster_key="astro_databricks",
         # you can add to packages & params at the task level
         notebook_packages=[
            { "pypi": { "package": "scikit-learn" } },
         ],
         notebook_params={
            "end_time": "{{ macros.ds_add(ds, 1) }}",
         }
      )

      # you can embed task groups for easier dependency management
      @task_group(group_id="inner_task_group")
      def inner_task_group():
         notebook_2 = DatabricksNotebookOperator(
            task_id="notebook_2",
            databricks_conn_id="databricks_default",
            notebook_path="/Shared/notebook_2",
            source="WORKSPACE",
            job_cluster_key="astro_databricks",
         )

         notebook_3 = DatabricksNotebookOperator(
            task_id="notebook_3",
            databricks_conn_id="databricks_default",
            notebook_path="/Shared/notebook_3",
            source="WORKSPACE",
            job_cluster_key="astro_databricks",
         )

      notebook_4 = DatabricksNotebookOperator(
         task_id="notebook_4",
         databricks_conn_id="databricks_default",
         notebook_path="/Shared/notebook_4",
         source="WORKSPACE",
         job_cluster_key="astro_databricks",
      )

      notebook_1 >> inner_task_group() >> notebook_4

   trigger_workflow_2 = TriggerDagRunOperator(
      task_id="trigger_workflow_2",
      trigger_dag_id="workflow_2",
      execution_date="{{ next_execution_date }}",
   )

   workflow >> trigger_workflow_2

databricks_workflow_example_dag = databricks_workflow_example()

Airflow UI

Airflow UI

Databricks UI

Databricks UI

快速入门

查看以下快速入门指南

文档

该文档是工作正在进行中--我们旨在遵循 Diátaxis 系统

变更日志

Astro Databricks 遵循 语义版本控制 进行发布。阅读 变更日志 了解每个版本引入的更改的更多信息。

贡献指南

欢迎所有贡献、错误报告、错误修复、文档改进、增强和想法。

阅读 贡献指南 了解如何贡献的详细概述。

贡献者和维护者应遵守贡献者行为准则

许可

Apache 许可证 2.0

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

astro_provider_databricks-0.3.0.tar.gz (2.2 MB 查看哈希)

上传时间

构建分布

astro_provider_databricks-0.3.0-py3-none-any.whl (11.4 kB 查看哈希)

上传时间 Python 3

由以下提供支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面