跳转到主要内容

Apache Airflow的Great Expectations提供程序

项目描述

Apache Airflow Great Expectations 提供商

一组针对Great Expectations的Airflow操作符,Great Expectations是一个用于测试和验证数据的Python库。

版本警告

由于apply_default装饰器的删除,此版本的提供商需要Airflow 2.1.0+。如果您的Airflow版本小于2.1.0,并且想要安装此提供商版本,请首先将Airflow升级到至少版本2.1.0。否则,您的Airflow包版本将自动升级,您必须手动运行airflow upgrade db以完成迁移。

兼容性说明

  • 此操作符目前仅与Great Expectations V3批请求API一起工作。如果您想结合V2批Kwargs API使用此操作符,则必须使用版本低于0.1.0的版本。
  • 此操作符使用Great Expectations检查点而不是以前的ValidationOperators。
  • 因此,此操作符需要Great Expectations >=v0.13.9,这从发布0.0.5开始被固定在requirements.txt中。
  • Great Expectations版本0.13.8中存在一个会导致此操作符无法工作的错误。
  • Great Expectations版本0.13.7及以下版本将与此操作符的版本0.0.4及以下版本兼容。

此软件包最近使用apache-airflow=2.4.3great-expectation=0.15.34进行了单元测试。

以前,有一个单独的BigQuery操作符,用于促进GCP存储的使用。此功能现在已集成到核心Great Expectations库中,因此通用操作符可以与任何具有工作数据上下文和数据源的底层和SQL方言一起使用。

安装

先决条件:运行great-expectationsapache-airflow的环境 - 这些是此包的依赖项要求。

pip install airflow-provider-great-expectations

根据您的用例,您可能需要在Dockerfile中添加ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true以启用XCOM在任务之间传递数据。

用法

该操作符需要一个DataContext来运行,可以指定为

  1. 一个包含基于yaml的DataContext配置的目录的路径
  2. 一个Great Expectations DataContextConfig对象

此外,还可以提供检查点,可以指定为

  1. 指定DataContext的检查点存储中已存在的检查点的名称
  2. 一个Great Expectations CheckpointConfig对象

尽管如果没有提供检查点,将构建一个默认的检查点。

该操作符还允许您传递一个包含kwargs的Python字典,这些kwargs将在运行时添加/替换到检查点中。

模块

Great Expectations Base Operator:Great Expectations的基础操作符。通过

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

之前可用的电子邮件警报功能

版本0.0.7中可用的电子邮件警报功能已被删除,以便使操作符的用途更加集中,并与运行Great Expectations验证等更相关。现在,基操作符的构造函数中有一个validation_failure_callback参数,可以在Great Expectations框架本身提供的通知机制不足的情况下用于任何类型的失败通知。

示例

请参阅example_dags目录,以获取一个示例DAG,其中包含一些演示操作符功能的示例任务。

示例DAG可以通过两种方式之一进行测试

使用开源的Astro CLI(推荐)

  1. 使用Astro CLI初始化项目

  2. 将示例DAG复制到您的Astro项目的dags/文件夹

  3. 将此存储库中include文件夹中的目录复制到您的Astro项目的include目录

  4. 将您的GCP credentials.json文件复制到Astro项目的根目录

  5. 在您的Dockerfile中添加以下内容,以安装airflow-provider-great-expectations包,启用xcom pickling,并添加运行示例DAG所需的Airflow变量和连接

    RUN pip install --user airflow_provider_great_expectations
    ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
    ENV GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/credentials.json
    ENV AIRFLOW_VAR_MY_PROJECT=<YOUR_GCP_PROJECT_ID>
    ENV AIRFLOW_VAR_MY_BUCKET=<YOUR_GCS_BUCKET>
    ENV AIRFLOW_VAR_MY_DATASET=<YOUR_BQ_DATASET>
    ENV AIRFLOW_VAR_MY_TABLE=<YOUR_BQ_TABLE>
    ENV AIRFLOW_CONN_MY_BIGQUERY_CONN_ID='google-cloud-platform://?extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&extra__google_cloud_platform__project=bombora-dev&extra__google_cloud_platform__key_path=%2Fusr%2Flocal%2Fairflow%2Fairflow-gcp.bombora-dev.iam.gserviceaccount.com.json'
    
  6. 运行astro dev start在本地Airflow实例上查看DAG(您需要运行Docker)

使用纯Airflow安装:

  1. 将示例DAG添加到您的dags/文件夹
  2. 使include/中的great_expectationsdata目录在您的环境中可用
  3. 将您的DAG文件中的data_filege_root_dir路径更改为指向适当的目录
  4. great-expectations/checkpoints/*.yml中的路径更改为指向您数据文件的绝对路径
  5. 在您的airflow.cfg中将enable_xcom_pickling的值更改为true
  6. 设置适当的Airflow变量和连接,如上所述使用astro CLI

开发

设置虚拟环境

可以使用任何虚拟环境工具,但最简单的方法可能是使用Python标准库中包含的venv工具。

例如,可以使用以下命令为针对此包的开发创建虚拟环境(假设使用bash

# Create the virtual environment using venv:
$ python -m venv --prompt my-af-ge-venv .venv

# Activate the virtual environment:
$ . .venv/bin/activate

# Install the package and testing dependencies:
(my-af-ge-venv) $ pip install -e '.[tests]'

运行单元、集成和功能测试

完成上述步骤后,可以通过以下两种方法之一运行单元和集成测试。

使用pytest

pytest库和CLI因其丰富的API以及它为您提供的对测试输出、测试标记等内容的额外控制而受到此项目和许多Python开发者的青睐。它作为requirements.txt中的依赖项包含在内。

当在上述过程中创建的虚拟环境中运行简单的命令pytest -p no:warnings时,如果所有测试通过,则提供简洁的输出,过滤掉可能由Airflow发出的弃用警告;如果不通过,则提供详细到必要的输出

(my-af-ge-venv) $ pytest -p no:warnings
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.4, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/jpayne/repos-bombora/bombora-airflow-provider-great-expectations, configfile: pytest.ini, testpaths: tests
plugins: anyio-3.3.0
collected 7 items

tests/operators/test_great_expectations.py .......                                                                                                                                                   [100%]

============================================================================================ 7 passed in 11.99s ============================================================================================

功能测试

功能测试涉及简单地使用上述方法之一运行示例DAG,只是本地开发包需要安装到目标Airflow环境中。

再次强调,推荐的方法是使用Astro CLI

**此操作符处于开发初期阶段!请随时提交问题、PR或加入Great Expectations Slack中的#integration-airflow频道以获取反馈。感谢Pete DeJoyAstronomer.io团队的支持。

项目详情


下载文件

下载适用于您平台的应用文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

构建分布

支持