Airflow DAG本地测试框架。
项目描述
Airflow DAG不变性测试
注意: 欢迎贡献。我希望这个库将属于Airflow社区。
Dagcheck是一个断言DAG不变性的框架。dagcheck的用户可以定义要通过断言测试的DAG不变性,dagcheck将生成验证这些不变性的DAG运行场景。
Dagcheck的创建是为了让Airflow用户能够用这些特性编写他们的DAG测试
- 它们易于阅读和理解
- 它们不会协调真实的基础设施更改
- 它们在本地开发环境中运行
- 它们作为开发者的流程的一部分快速运行
- 它们可以在CI/CD中运行并捕捉未来的问题
dagcheck
对于复杂且随时间变化的DAG特别有用。来自dagcheck
的测试允许您将复杂的依赖性检查从您的脑海中卸载到自动测试。
示例
考虑这个复杂DAG的示例。这个DAG有多个可能的执行路径 - 在失败的情况下,我们可能想确保它不会泄露资源。例如,我们可能编写一个测试来检查如果创建资源成功,我们将在任何失败场景中独立地清理它。
例如,如果任务create_entry_group
成功,然后我们检查任务delete_entry_group
将始终运行,如下所示
example_dag = DagBag().get_dag("example_complex")
# First check: If we create an entry group, we want to make sure
# it will be cleaned up.
assert_that(
given(example_dag)
.when(task("create_entry_group"), succeeds())
.then(task("delete_entry_group"), will_run())
)
通过创建这个测试并在CI中运行它,我们可以快速确保我们的DAG无论变化多大都将按预期行为。
要查看API使用的其他示例,请查看我们的单元测试和我们的小型样本DAGs。
配置dagcheck
待办事项(pabloem)
注意事项和陷阱
Dagcheck通过模拟DAG执行场景来工作。
依赖于副作用的结果的DAGs
Dagcheck模拟DAG执行,但它不会编排任何更改。如果您的DAG执行的部分依赖于其他操作符的副作用,那么Dagcheck将不知道这一点。
例如,考虑一个执行数据库导出操作、检查这些文件的输出并用于其他目的的DAG。类似于
(
DatabaseExportOperator(
'data_warehouse_export'
destination='database_export_file',
...
) >>
CheckFileExistsOperator(
'check_export_went_well'
filename='database_export_file'
) >>
ArchiveFileInColdStorageOperator(
'save_backup_to_storage'
...
)
)
在上面的代码示例中,以下语句是正确的
- 如果数据库导出运行正常,那么文件存在检查应该成功,归档操作符将运行。
- 这是因为有一个隐含的假设,即如果
data_warehouse_export
运行正常(即成功),则check_export_went_well
将成功。
但以下dagcheck测试将失败
# Bad test example:
assert_that(
given(the_dag)
.when('data_warehouse_export', succeeds())
.then('save_backup_to_storage', will_run())
)
这个测试失败是因为Dagcheck不知道这个隐含的假设,并且假设在data_warehouse_export
和save_backup_to_storage
之间的中间任务可能仍然会失败。
有几种方法可以将此测试编写为与dagcheck兼容。下面是其中之一
# Good test example:
assert_that(
given(the_dag)
.when('data_warehouse_export', succeeds())
.then('check_export_went_well', will_run())
assert_that(
given(the_dag)
.when('check_export_went_well', succeeds())
.then('save_backup_to_storage', will_run())
)
在首次发布之前待办事项
- 确定库的名称(dagcheck?dagtest?flowtest?ilikedags?flowcheck?assertflow?)
- 确定这属于Airflow还是独立库
- 实现DAG故障检查和DAG假设检查器。
原始开发笔记
- 2022/09/16: 再次开始开发环境
我开始将库作为airflow/的一部分进行开发,后来将其放入airflow_play/dagcheck/
目录。因此,dagcheck/
目录中的许多导入路径都经过了修改。
目前,dagcheck测试需要一个运行的Airflow实例。为了设置dagcheck的本地开发环境,您需要运行
# From airflow_play/
# Activate your local virtualenv
. venv/bin/activate
# Run your standalone Airflow instance that runs beside the code
export AIRFLOW_HOME=~/codes/airflow_play/home/
airflow standalone
设置好后,您可以运行测试。
项目详情
dagcheck-0.1.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a4de65440e24eb35aa49b4cb0ab16cef82bbc0507b14d7dd98261952fd0fdd97 |
|
MD5 | ac2fa292dcb11784afea3d73dd73dbe9 |
|
BLAKE2b-256 | 429c3354fe85995f8ca6dd56972c404ade7fbc2dede38d083752327e910c68f4 |