跳转到主要内容

Airflow DAG本地测试框架。

项目描述

Airflow DAG不变性测试

注意: 欢迎贡献。我希望这个库将属于Airflow社区。

Dagcheck是一个断言DAG不变性的框架。dagcheck的用户可以定义要通过断言测试的DAG不变性,dagcheck将生成验证这些不变性的DAG运行场景。

Dagcheck的创建是为了让Airflow用户能够用这些特性编写他们的DAG测试

  • 它们易于阅读和理解
  • 它们不会协调真实的基础设施更改
  • 它们在本地开发环境中运行
  • 它们作为开发者的流程的一部分快速运行
  • 它们可以在CI/CD中运行并捕捉未来的问题

dagcheck对于复杂且随时间变化的DAG特别有用。来自dagcheck的测试允许您将复杂的依赖性检查从您的脑海中卸载到自动测试。

示例

考虑这个复杂DAG的示例。这个DAG有多个可能的执行路径 - 在失败的情况下,我们可能想确保它不会泄露资源。例如,我们可能编写一个测试来检查如果创建资源成功,我们将在任何失败场景中独立地清理它。

例如,如果任务create_entry_group成功,然后我们检查任务delete_entry_group始终运行,如下所示

example_dag = DagBag().get_dag("example_complex")

# First check: If we create an entry group, we want to make sure
# it will be cleaned up.
assert_that(
    given(example_dag)
    .when(task("create_entry_group"), succeeds())
    .then(task("delete_entry_group"), will_run())
)

通过创建这个测试并在CI中运行它,我们可以快速确保我们的DAG无论变化多大都将按预期行为。

要查看API使用的其他示例,请查看我们的单元测试和我们的小型样本DAGs

配置dagcheck

待办事项(pabloem)

注意事项和陷阱

Dagcheck通过模拟DAG执行场景来工作。

依赖于副作用的结果的DAGs

Dagcheck模拟DAG执行,但它不会编排任何更改。如果您的DAG执行的部分依赖于其他操作符的副作用,那么Dagcheck将不知道这一点。

例如,考虑一个执行数据库导出操作、检查这些文件的输出并用于其他目的的DAG。类似于

(
  DatabaseExportOperator(
    'data_warehouse_export'
    destination='database_export_file',
    ...
  ) >>
  CheckFileExistsOperator(
    'check_export_went_well'
    filename='database_export_file'
  ) >>
  ArchiveFileInColdStorageOperator(
    'save_backup_to_storage'
    ...
  )
)

在上面的代码示例中,以下语句是正确的

  • 如果数据库导出运行正常,那么文件存在检查应该成功,归档操作符将运行。
  • 这是因为有一个隐含的假设,即如果data_warehouse_export运行正常(即成功),则check_export_went_well将成功

但以下dagcheck测试将失败

# Bad test example:
assert_that(
  given(the_dag)
  .when('data_warehouse_export', succeeds())
  .then('save_backup_to_storage', will_run())
)

这个测试失败是因为Dagcheck不知道这个隐含的假设,并且假设在data_warehouse_exportsave_backup_to_storage之间的中间任务可能仍然会失败。

有几种方法可以将此测试编写为与dagcheck兼容。下面是其中之一

# Good test example:

assert_that(
  given(the_dag)
  .when('data_warehouse_export', succeeds())
  .then('check_export_went_well', will_run())

assert_that(
  given(the_dag)
  .when('check_export_went_well', succeeds())
  .then('save_backup_to_storage', will_run())
)

在首次发布之前待办事项

  • 确定库的名称(dagcheck?dagtest?flowtest?ilikedags?flowcheck?assertflow?)
  • 确定这属于Airflow还是独立库
  • 实现DAG故障检查和DAG假设检查器。

原始开发笔记

  • 2022/09/16: 再次开始开发环境

我开始将库作为airflow/的一部分进行开发,后来将其放入airflow_play/dagcheck/目录。因此,dagcheck/目录中的许多导入路径都经过了修改。

目前,dagcheck测试需要一个运行的Airflow实例。为了设置dagcheck的本地开发环境,您需要运行

# From airflow_play/

# Activate your local virtualenv
. venv/bin/activate

# Run your standalone Airflow instance that runs beside the code
export AIRFLOW_HOME=~/codes/airflow_play/home/
airflow standalone

设置好后,您可以运行测试。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

dagcheck-0.1.2.tar.gz (20.3 kB 查看哈希值)

上传时间

由以下赞助

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面