Apache Airflow的Great Expectations提供程序
项目描述
Apache Airflow Great Expectations 提供商
一组针对Great Expectations的Airflow操作符,Great Expectations是一个用于测试和验证数据的Python库。
版本警告
由于apply_default装饰器的删除,此版本的提供商需要Airflow 2.1.0+。如果您的Airflow版本小于2.1.0,并且想要安装此提供商版本,请首先将Airflow升级到至少版本2.1.0。否则,您的Airflow包版本将自动升级,您必须手动运行airflow upgrade db以完成迁移。
兼容性说明
- 此操作符目前仅与Great Expectations V3批请求API一起工作。如果您想结合V2批Kwargs API使用此操作符,则必须使用版本低于0.1.0的版本。
- 此操作符使用Great Expectations检查点而不是以前的ValidationOperators。
- 因此,此操作符需要Great Expectations >=v0.13.9,这从发布0.0.5开始被固定在requirements.txt中。
- Great Expectations版本0.13.8中存在一个会导致此操作符无法工作的错误。
- Great Expectations版本0.13.7及以下版本将与此操作符的版本0.0.4及以下版本兼容。
此软件包最近使用apache-airflow=2.4.3
和great-expectation=0.15.34
进行了单元测试。
以前,有一个单独的BigQuery操作符,用于促进GCP存储的使用。此功能现在已集成到核心Great Expectations库中,因此通用操作符可以与任何具有工作数据上下文和数据源的底层和SQL方言一起使用。
安装
先决条件:运行great-expectations
和apache-airflow
的环境 - 这些是此包的依赖项要求。
pip install airflow-provider-great-expectations
根据您的用例,您可能需要在Dockerfile中添加ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
以启用XCOM在任务之间传递数据。
用法
该操作符需要一个DataContext来运行,可以指定为
- 一个包含基于yaml的DataContext配置的目录的路径
- 一个Great Expectations DataContextConfig对象
此外,还可以提供检查点,可以指定为
- 指定DataContext的检查点存储中已存在的检查点的名称
- 一个Great Expectations CheckpointConfig对象
尽管如果没有提供检查点,将构建一个默认的检查点。
该操作符还允许您传递一个包含kwargs的Python字典,这些kwargs将在运行时添加/替换到检查点中。
模块
Great Expectations Base Operator:Great Expectations的基础操作符。通过
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
之前可用的电子邮件警报功能
版本0.0.7
中可用的电子邮件警报功能已被删除,以便使操作符的用途更加集中,并与运行Great Expectations验证等更相关。现在,基操作符的构造函数中有一个validation_failure_callback
参数,可以在Great Expectations框架本身提供的通知机制不足的情况下用于任何类型的失败通知。
示例
请参阅example_dags目录,以获取一个示例DAG,其中包含一些演示操作符功能的示例任务。
示例DAG可以通过两种方式之一进行测试
使用开源的Astro CLI(推荐)
-
使用Astro CLI初始化项目
-
将示例DAG复制到您的Astro项目的
dags/
文件夹 -
将此存储库中
include
文件夹中的目录复制到您的Astro项目的include
目录 -
将您的GCP
credentials.json
文件复制到Astro项目的根目录 -
在您的
Dockerfile
中添加以下内容,以安装airflow-provider-great-expectations
包,启用xcom pickling,并添加运行示例DAG所需的Airflow变量和连接RUN pip install --user airflow_provider_great_expectations ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True ENV GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/credentials.json ENV AIRFLOW_VAR_MY_PROJECT=<YOUR_GCP_PROJECT_ID> ENV AIRFLOW_VAR_MY_BUCKET=<YOUR_GCS_BUCKET> ENV AIRFLOW_VAR_MY_DATASET=<YOUR_BQ_DATASET> ENV AIRFLOW_VAR_MY_TABLE=<YOUR_BQ_TABLE> ENV AIRFLOW_CONN_MY_BIGQUERY_CONN_ID='google-cloud-platform://?extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&extra__google_cloud_platform__project=bombora-dev&extra__google_cloud_platform__key_path=%2Fusr%2Flocal%2Fairflow%2Fairflow-gcp.bombora-dev.iam.gserviceaccount.com.json'
-
运行
astro dev start
在本地Airflow实例上查看DAG(您需要运行Docker)
使用纯Airflow安装:
- 将示例DAG添加到您的
dags/
文件夹 - 使
include/
中的great_expectations
和data
目录在您的环境中可用 - 将您的DAG文件中的
data_file
和ge_root_dir
路径更改为指向适当的目录 - 将
great-expectations/checkpoints/*.yml
中的路径更改为指向您数据文件的绝对路径 - 在您的
airflow.cfg
中将enable_xcom_pickling
的值更改为true
- 设置适当的Airflow变量和连接,如上所述使用
astro
CLI
开发
设置虚拟环境
可以使用任何虚拟环境工具,但最简单的方法可能是使用Python标准库中包含的venv
工具。
例如,可以使用以下命令为针对此包的开发创建虚拟环境(假设使用bash
)
# Create the virtual environment using venv:
$ python -m venv --prompt my-af-ge-venv .venv
# Activate the virtual environment:
$ . .venv/bin/activate
# Install the package and testing dependencies:
(my-af-ge-venv) $ pip install -e '.[tests]'
运行单元、集成和功能测试
完成上述步骤后,可以通过以下两种方法之一运行单元和集成测试。
使用pytest
pytest
库和CLI因其丰富的API以及它为您提供的对测试输出、测试标记等内容的额外控制而受到此项目和许多Python开发者的青睐。它作为requirements.txt
中的依赖项包含在内。
当在上述过程中创建的虚拟环境中运行简单的命令pytest -p no:warnings
时,如果所有测试通过,则提供简洁的输出,过滤掉可能由Airflow发出的弃用警告;如果不通过,则提供详细到必要的输出
(my-af-ge-venv) $ pytest -p no:warnings
=========================================================================================== test session starts ============================================================================================
platform darwin -- Python 3.7.4, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/jpayne/repos-bombora/bombora-airflow-provider-great-expectations, configfile: pytest.ini, testpaths: tests
plugins: anyio-3.3.0
collected 7 items
tests/operators/test_great_expectations.py ....... [100%]
============================================================================================ 7 passed in 11.99s ============================================================================================
功能测试
功能测试涉及简单地使用上述方法之一运行示例DAG,只是本地开发包需要安装到目标Airflow环境中。
再次强调,推荐的方法是使用Astro CLI
**此操作符处于开发初期阶段!请随时提交问题、PR或加入Great Expectations Slack中的#integration-airflow频道以获取反馈。感谢Pete DeJoy和Astronomer.io团队的支持。
项目详情
哈希值 for airflow_provider_great_expectations-0.2.9.tar.gz
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ee1bca143e53716f37951e8abe2956be313a608cdd980e8b6e8ad45ca047adbf |
|
MD5 | 181be0aca10a882db02734e2aa5b6657 |
|
BLAKE2b-256 | 92e141e73da99d2641071436acf44e5792d385bb7e091e314d3934a844067055 |
哈希值 for airflow_provider_great_expectations-0.2.9-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 52dd0205b049fb68f1da5d104d442bf01b2c1ec2afab88ece69eff72b4cc6ad2 |
|
MD5 | 25b3fadeb6881f1f1ca2ffee2210b886 |
|
BLAKE2b-256 | 1162c332072c426268395e2236fb241c1d6b350f0943e5882c09e14954fe5264 |