跳转到主要内容

Pyspark测试辅助库

项目描述

chispa

image PyPI - Downloads PyPI version

chispa提供快速PySpark测试辅助方法,可以输出描述性错误消息。

此库可以轻松编写高质量的PySpark代码。

有趣的事实:“chispa”在西班牙语中意味着Spark ;)

安装

使用pip install chispa安装最新版本。

如果您使用Poetry,请使用poetry add chispa -G dev将此库添加为开发依赖项。

列相等性

假设您有一个用于删除字符串中非单词字符的函数。

def remove_non_word_characters(col):
    return F.regexp_replace(col, "[^\\w\\s]+", "")

创建一个SparkSession,以便您可以创建DataFrame。

from pyspark.sql import SparkSession

spark = (SparkSession.builder
  .master("local")
  .appName("chispa")
  .getOrCreate())

创建一个包含包含非单词字符的字符串的列的DataFrame,运行remove_non_word_characters函数,并使用chispa的assert_column_equality方法检查是否已删除所有这些字符。

import pytest

from chispa.column_comparer import assert_column_equality
import pyspark.sql.functions as F

def test_remove_non_word_characters_short():
    data = [
        ("jo&&se", "jose"),
        ("**li**", "li"),
        ("#::luisa", "luisa"),
        (None, None)
    ]
    df = (spark.createDataFrame(data, ["name", "expected_name"])
        .withColumn("clean_name", remove_non_word_characters(F.col("name"))))
    assert_column_equality(df, "clean_name", "expected_name")

让我们编写另一个测试,以了解描述性错误消息如何帮助您轻松调试底层问题。

以下是失败的测试

def test_remove_non_word_characters_nice_error():
    data = [
        ("matt7", "matt"),
        ("bill&", "bill"),
        ("isabela*", "isabela"),
        (None, None)
    ]
    df = (spark.createDataFrame(data, ["name", "expected_name"])
        .withColumn("clean_name", remove_non_word_characters(F.col("name"))))
    assert_column_equality(df, "clean_name", "expected_name")

以下是格式化的错误消息

ColumnsNotEqualError

您可以看到,matt7 / matt 行的数据导致了错误(注意它被高亮显示为红色)。其他行被涂成蓝色,因为它们是相等的。

DataFrame相等性

我们还可以通过创建两个DataFrame并验证它们是否相等来测试remove_non_word_characters方法。

创建两个DataFrame较慢且需要更多的代码,但比较整个DataFrame对于某些测试是必要的。

from chispa.dataframe_comparer import *

def test_remove_non_word_characters_long():
    source_data = [
        ("jo&&se",),
        ("**li**",),
        ("#::luisa",),
        (None,)
    ]
    source_df = spark.createDataFrame(source_data, ["name"])

    actual_df = source_df.withColumn(
        "clean_name",
        remove_non_word_characters(F.col("name"))
    )

    expected_data = [
        ("jo&&se", "jose"),
        ("**li**", "li"),
        ("#::luisa", "luisa"),
        (None, None)
    ]
    expected_df = spark.createDataFrame(expected_data, ["name", "clean_name"])

    assert_df_equality(actual_df, expected_df)

让我们编写另一个会导致错误的测试,这样您就可以看到描述性的错误消息。

def test_remove_non_word_characters_long_error():
    source_data = [
        ("matt7",),
        ("bill&",),
        ("isabela*",),
        (None,)
    ]
    source_df = spark.createDataFrame(source_data, ["name"])

    actual_df = source_df.withColumn(
        "clean_name",
        remove_non_word_characters(F.col("name"))
    )

    expected_data = [
        ("matt7", "matt"),
        ("bill&", "bill"),
        ("isabela*", "isabela"),
        (None, None)
    ]
    expected_df = spark.createDataFrame(expected_data, ["name", "clean_name"])

    assert_df_equality(actual_df, expected_df)

以下是格式化的错误消息

DataFramesNotEqualError

忽略行顺序

您可以轻松地比较DataFrame,忽略行的顺序。DataFrame的内容通常更重要,而不是行的顺序。

以下是df1的内容

+--------+
|some_num|
+--------+
|       1|
|       2|
|       3|
+--------+

以下是df2的内容

+--------+
|some_num|
+--------+
|       2|
|       1|
|       3|
+--------+

以下是忽略行顺序时确认df1df2相等的步骤。

assert_df_equality(df1, df2, ignore_row_order=True)

如果您没有指定ignore_row_order,则测试将因以下消息而失败

ignore_row_order_false

默认情况下,行不按顺序排列,因为排序会减慢函数的执行速度。

忽略列顺序

本节解释了如何比较DataFrame,忽略列的顺序。

假设您有以下df1

+----+----+
|num1|num2|
+----+----+
|   1|   7|
|   2|   8|
|   3|   9|
+----+----+

以下是df2的内容

+----+----+
|num2|num1|
+----+----+
|   7|   1|
|   8|   2|
|   9|   3|
+----+----+

以下是忽略列顺序比较df1df2相等性的步骤

assert_df_equality(df1, df2, ignore_column_order=True)

如果您在没有忽略列顺序的情况下运行assert_df_equality(df1, df2),则会看到以下错误消息

ignore_column_order_false

忽略可空性

模式中的每一列都有三个属性:名称、数据类型和可空属性。如果将nullable设置为true,则列可以接受null值。

在比较DataFrame时,有时您想忽略可空属性。

假设您有以下df1

+-----+---+
| name|age|
+-----+---+
| juan|  7|
|bruna|  8|
+-----+---+

这是另一个df2

+-----+---+
| name|age|
+-----+---+
| juan|  7|
|bruna|  8|
+-----+---+

您可能会惊讶地发现,在这个例子中,df1df2不相等,并且会因以下消息而失败

nullable_off_error

请查看这个人为的例子中的代码,以更好地理解错误

def ignore_nullable_property():
    s1 = StructType([
       StructField("name", StringType(), True),
       StructField("age", IntegerType(), True)])
    df1 = spark.createDataFrame([("juan", 7), ("bruna", 8)], s1)
    s2 = StructType([
       StructField("name", StringType(), True),
       StructField("age", IntegerType(), False)])
    df2 = spark.createDataFrame([("juan", 7), ("bruna", 8)], s2)
    assert_df_equality(df1, df2)

通过添加一个标志,您可以在评估相等性时忽略可空属性

assert_df_equality(df1, df2, ignore_nullable=True)

ArrayType()中包含的元素也有一个可空属性,以及列模式的可空属性。在传递ignore_nullable=True时,这些属性也被忽略。

再次,请查看以下代码,以了解ignore_nullable=True绕过的错误

def ignore_nullable_property_array():
    s1 = StructType([
        StructField("name", StringType(), True),
        StructField("coords", ArrayType(DoubleType(), True), True),])
    df1 = spark.createDataFrame([("juan", [1.42, 3.5]), ("bruna", [2.76, 3.2])], s1)
    s2 = StructType([
        StructField("name", StringType(), True),
        StructField("coords", ArrayType(DoubleType(), False), True),])
    df2 = spark.createDataFrame([("juan", [1.42, 3.5]), ("bruna", [2.76, 3.2])], s2)
    assert_df_equality(df1, df2)

允许NaN相等

Python有NaN(不是一个数字)值,并且默认情况下,两个NaN值被认为不相等。创建两个NaN值,比较它们,并确认它们默认情况下被认为不相等。

nan1 = float('nan')
nan2 = float('nan')
nan1 == nan2 # False

pandas将NaN值视为默认相等,但此库需要您设置标志才能将两个NaN值视为相等。

assert_df_equality(df1, df2, allow_nan_equality=True)

自定义格式化

您可以指定以下打印错误消息的自定义格式

from chispa import FormattingConfig

formats = FormattingConfig(
        mismatched_rows={"color": "light_yellow"},
        matched_rows={"color": "cyan", "style": "bold"},
        mismatched_cells={"color": "purple"},
        matched_cells={"color": "blue"},
    )

assert_basic_rows_equality(df1.collect(), df2.collect(), formats=formats)

或类似地

from chispa import FormattingConfig, Color, Style

formats = FormattingConfig(
        mismatched_rows={"color": Color.LIGHT_YELLOW},
        matched_rows={"color": Color.CYAN, "style": Style.BOLD},
        mismatched_cells={"color": Color.PURPLE},
        matched_cells={"color": Color.BLUE},
    )

assert_basic_rows_equality(df1.collect(), df2.collect(), formats=formats)

您还可以在conftest.py中定义这些格式,并通过固定装置注入它们

@pytest.fixture()
def chispa_formats():
    return FormattingConfig(
        mismatched_rows={"color": "light_yellow"},
        matched_rows={"color": "cyan", "style": "bold"},
        mismatched_cells={"color": "purple"},
        matched_cells={"color": "blue"},
    )

def test_shows_assert_basic_rows_equality(chispa_formats):
  ...
  assert_basic_rows_equality(df1.collect(), df2.collect(), formats=chispa_formats)

custom_formats

近似列相等

我们可以检查列是否近似相等,这对于浮点数比较特别有用。

这是一个创建具有两个浮点列的DataFrame并验证这些列近似相等的测试。在这个例子中,如果差异小于0.1,则认为值是近似相等的。

def test_approx_col_equality_same():
    data = [
        (1.1, 1.1),
        (2.2, 2.15),
        (3.3, 3.37),
        (None, None)
    ]
    df = spark.createDataFrame(data, ["num1", "num2"])
    assert_approx_column_equality(df, "num1", "num2", 0.1)

这是一个列不相近的测试示例。

def test_approx_col_equality_different():
    data = [
        (1.1, 1.1),
        (2.2, 2.15),
        (3.3, 5.0),
        (None, None)
    ]
    df = spark.createDataFrame(data, ["num1", "num2"])
    assert_approx_column_equality(df, "num1", "num2", 0.1)

这个失败的测试将输出可读的错误消息,以便轻松调试问题。

ColumnsNotEqualError

近似DataFrame相等

让我们创建两个DataFrame并确认它们近似相等。

def test_approx_df_equality_same():
    data1 = [
        (1.1, "a"),
        (2.2, "b"),
        (3.3, "c"),
        (None, None)
    ]
    df1 = spark.createDataFrame(data1, ["num", "letter"])

    data2 = [
        (1.05, "a"),
        (2.13, "b"),
        (3.3, "c"),
        (None, None)
    ]
    df2 = spark.createDataFrame(data2, ["num", "letter"])

    assert_approx_df_equality(df1, df2, 0.1)

assert_approx_df_equality方法很智能,它只为DataFrame中的浮点数执行近似相等性操作。对于字符串和其他类型,它将执行常规相等性。

让我们对两个不相等的DataFrame执行近似相等性比较。

def test_approx_df_equality_different():
    data1 = [
        (1.1, "a"),
        (2.2, "b"),
        (3.3, "c"),
        (None, None)
    ]
    df1 = spark.createDataFrame(data1, ["num", "letter"])

    data2 = [
        (1.1, "a"),
        (5.0, "b"),
        (3.3, "z"),
        (None, None)
    ]
    df2 = spark.createDataFrame(data2, ["num", "letter"])

    assert_approx_df_equality(df1, df2, 0.1)

以下是输出的漂亮错误消息

DataFramesNotEqualError

模式不匹配消息

DataFrame等式消息在分析DataFrame的实际内容之前先执行模式比较。具有不同模式的DataFrame应该尽可能快速地报错。

让我们比较一个具有字符串列和整数列的DataFrame与一个具有两个整数列的DataFrame,以观察模式不匹配消息。

def test_schema_mismatch_message():
    data1 = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
        (None, None)
    ]
    df1 = spark.createDataFrame(data1, ["num", "letter"])

    data2 = [
        (1, 6),
        (2, 7),
        (3, 8),
        (None, None)
    ]
    df2 = spark.createDataFrame(data2, ["num", "num2"])

    assert_df_equality(df1, df2)

以下是错误消息

SchemasNotEqualError

支持的PySpark / Python版本

chispa目前支持PySpark 2.4+和Python 3.5+

如果您使用的是较旧的Python版本,请使用chispa v0.8.2。

当chispa 1.x发布时,将停止支持PySpark 2。

基准测试

待办事项:需要将这些方法与spark-testing-base方法进行比较基准测试

在您的本地机器上开发chispa

鼓励您克隆和/或分支此存储库。

此项目使用Poetry进行打包和依赖关系管理。

  • 使用poetry install设置虚拟环境
  • 使用poetry run pytest tests运行测试

研究代码库是了解PySpark的绝佳方式!

贡献

鼓励任何人提交拉取请求、打开问题或提交错误报告。

如果他们做出了良好的贡献,我们将很乐意将人们提升为库维护者。

项目详情


下载文件

下载适合您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

chispa-0.10.1.tar.gz (16.4 kB 查看散列)

上传时间:

构建分布

chispa-0.10.1-py3-none-any.whl (16.5 kB 查看散列)

上传时间: Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面