跳转到主要内容

创建数据验证工作流程的简单框架。

项目描述

Version Build status Coverage License Documentation status

数据验证框架

本项目提供创建数据验证工作流程的简单工具。这些工作流程基于luigi库。

该框架的主要目标是将数据必须遵循的规范与实际测试数据的代码汇总到同一位置。这避免了存储规范的多份文档和存储代码的仓库。

安装

应使用pip安装此包

pip install data-validation-framework

使用

构建工作流程

构建新的工作流程很简单,如下面的示例所示

import luigi
import data_validation_framework as dvf


class ValidationTask1(dvf.task.ElementValidationTask):
    """Use the class docstring to describe the specifications of the ValidationTask1."""

    output_columns = {"col_name": None}

    @staticmethod
    def validation_function(row, output_path, *args, **kwargs):
        # Return the validation result for one row of the dataset
        if row["col_name"] <= 10:
            return dvf.result.ValidationResult(is_valid=True)
        else:
            return dvf.result.ValidationResult(
                is_valid=False,
                ret_code=1,
                comment="The value should always be <= 10"
            )


def external_validation_function(df, output_path, *args, **kwargs):
    # Update the dataset inplace here by setting values to the 'is_valid' column.
    # The 'ret_code' and 'comment' values are optional, they will be added to the report
    # in order to help the user to understand why the dataset did not pass the validation.

    # We can use the value from kwargs["param_value"] here.
    if int(kwargs["param_value"]) <= 10:
        df["is_valid"] = True
    else:
        df["is_valid"] = False
        df["ret_code"] = 1
        df["comment"] = "The value should always be <= 10"


class ValidationTask2(dvf.task.SetValidationTask):
    """In some cases you might want to keep the docstring to describe what a developer
    needs to know, not the end-user. In this case, you can use the ``__specifications__``
    attribute to store the specifications."""

    a_parameter = luigi.Parameter()

    __specifications__ = """Use the __specifications__ to describe the specifications of the
    ValidationTask2."""

    def inputs(self):
        return {ValidationTask1: {"col_name": "new_col_name_in_current_task"}}

    def kwargs(self):
        return {"param_value": self.a_parameter}

    validation_function = staticmethod(external_validation_function)


class ValidationWorkflow(dvf.task.ValidationWorkflow):
    """Use the global workflow specifications to give general context to the end-user."""

    def inputs(self):
        return {
            ValidationTask1: {},
            ValidationTask2: {},
        }

ValidationWorkflow类中只定义了需要进行验证的子任务。子任务可以是dvf.task.ElementValidationTaskdvf.task.SetValidationTask。在两种情况下,都可以定义这些子任务之间的关系,因为一个子任务可能需要另一个子任务的结果才能正常运行。这分为两步定义

  1. 在所需的任务中,应该定义一个output_columns属性,以便后续任务知道可用哪些数据,如上一个示例中的ValidationTask1所示。
  2. 在需要另一个任务的任务中,应该定义一个inputs方法,如上一个示例中的ValidationTask2所示。

dvf.task.ElementValidationTask的子类应返回一个dvf.result.ValidationResult对象。而dvf.task.SetValidationTask的子类应返回一个包含至少以下列["is_valid", "ret_code", "comment", "exception"]Pandas.DataFrame对象,并且与输入数据集具有相同的索引。

生成工作流的规范

可以使用以下luigi命令生成数据应遵循的规范

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --result-path out --ValidationTask2-a-parameter 15 --specifications-only

运行工作流

可以使用以下luigi命令运行工作流(注意,模块test_validation必须包含在您的sys.path中)

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --dataset-df dataset.csv --result-path out --ValidationTask2-a-parameter 15

此工作流将生成以下文件

  • out/report_ValidationWorkflow.pdf:PDF验证报告。
  • out/ValidationTask1/report.csv:包含ValidationTask1有效性值的CSV文件。
  • out/ValidationTask2/report.csv:包含ValidationTask2有效性值的CSV文件。
  • out/ValidationWorkflow/report.csv:包含完整工作流有效性值的CSV文件。

.. 注意:

As any `luigi <https://luigi.readthedocs.io/en/stable>`_ workflow, the values can be stored
into a `luigi.cfg` file instead of being passed to the CLI.

高级功能

需要常规Luigi任务

在某些情况下,可能需要在验证工作流中执行常规Luigi任务。在这种情况下,可以使用extra_requires()方法传递这些额外要求。然后在验证任务中,可以使用extra_input()方法获取这些额外要求的目标。

class TestTaskA(luigi.Task):

    def run(self):
        # Do something and write the 'target.file'

    def output(self):
        return target.OutputLocalTarget("target.file")

class TestTaskB(task.SetValidationTask):

    output_columns = {"extra_target_path": None}

    def kwargs(self):
        return {"extra_task_target_path": self.extra_input().path}

    def extra_requires(self):
        return TestTaskA()

    @staticmethod
    def validation_function(df, output_path, *args, **kwargs):
        df["is_valid"] = True
        df["extra_target_path"] = kwargs["extra_task_target_path"]

资助与致谢

本软件的开发得到了瑞士联邦理工学院(EPFL)洛桑联邦理工学院(EPFL)的研究中心蓝脑项目的资金支持。

有关许可证和作者,请分别参见LICENSE.txtAUTHORS.md

版权所有 © 2022-2023 蓝脑项目/EPFL

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

data_validation_framework-0.8.0.tar.gz (196.7 kB 查看哈希值)

上传时间

构建分布

data_validation_framework-0.8.0-py3-none-any.whl (26.1 kB 查看哈希值)

上传时间 Python 3

支持