Pyspark测试辅助库
项目描述
chispa
chispa提供快速PySpark测试辅助方法,可以输出描述性错误消息。
此库可以轻松编写高质量的PySpark代码。
有趣的事实:“chispa”在西班牙语中意味着Spark ;)
安装
使用pip install chispa
安装最新版本。
如果您使用Poetry,请使用poetry add chispa -G dev
将此库添加为开发依赖项。
列相等性
假设您有一个用于删除字符串中非单词字符的函数。
def remove_non_word_characters(col):
return F.regexp_replace(col, "[^\\w\\s]+", "")
创建一个SparkSession
,以便您可以创建DataFrame。
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.master("local")
.appName("chispa")
.getOrCreate())
创建一个包含包含非单词字符的字符串的列的DataFrame,运行remove_non_word_characters
函数,并使用chispa的assert_column_equality
方法检查是否已删除所有这些字符。
import pytest
from chispa.column_comparer import assert_column_equality
import pyspark.sql.functions as F
def test_remove_non_word_characters_short():
data = [
("jo&&se", "jose"),
("**li**", "li"),
("#::luisa", "luisa"),
(None, None)
]
df = (spark.createDataFrame(data, ["name", "expected_name"])
.withColumn("clean_name", remove_non_word_characters(F.col("name"))))
assert_column_equality(df, "clean_name", "expected_name")
让我们编写另一个测试,以了解描述性错误消息如何帮助您轻松调试底层问题。
以下是失败的测试
def test_remove_non_word_characters_nice_error():
data = [
("matt7", "matt"),
("bill&", "bill"),
("isabela*", "isabela"),
(None, None)
]
df = (spark.createDataFrame(data, ["name", "expected_name"])
.withColumn("clean_name", remove_non_word_characters(F.col("name"))))
assert_column_equality(df, "clean_name", "expected_name")
以下是格式化的错误消息
您可以看到,matt7
/ matt
行的数据导致了错误(注意它被高亮显示为红色)。其他行被涂成蓝色,因为它们是相等的。
DataFrame相等性
我们还可以通过创建两个DataFrame并验证它们是否相等来测试remove_non_word_characters
方法。
创建两个DataFrame较慢且需要更多的代码,但比较整个DataFrame对于某些测试是必要的。
from chispa.dataframe_comparer import *
def test_remove_non_word_characters_long():
source_data = [
("jo&&se",),
("**li**",),
("#::luisa",),
(None,)
]
source_df = spark.createDataFrame(source_data, ["name"])
actual_df = source_df.withColumn(
"clean_name",
remove_non_word_characters(F.col("name"))
)
expected_data = [
("jo&&se", "jose"),
("**li**", "li"),
("#::luisa", "luisa"),
(None, None)
]
expected_df = spark.createDataFrame(expected_data, ["name", "clean_name"])
assert_df_equality(actual_df, expected_df)
让我们编写另一个会导致错误的测试,这样您就可以看到描述性的错误消息。
def test_remove_non_word_characters_long_error():
source_data = [
("matt7",),
("bill&",),
("isabela*",),
(None,)
]
source_df = spark.createDataFrame(source_data, ["name"])
actual_df = source_df.withColumn(
"clean_name",
remove_non_word_characters(F.col("name"))
)
expected_data = [
("matt7", "matt"),
("bill&", "bill"),
("isabela*", "isabela"),
(None, None)
]
expected_df = spark.createDataFrame(expected_data, ["name", "clean_name"])
assert_df_equality(actual_df, expected_df)
以下是格式化的错误消息
忽略行顺序
您可以轻松地比较DataFrame,忽略行的顺序。DataFrame的内容通常更重要,而不是行的顺序。
以下是df1
的内容
+--------+
|some_num|
+--------+
| 1|
| 2|
| 3|
+--------+
以下是df2
的内容
+--------+
|some_num|
+--------+
| 2|
| 1|
| 3|
+--------+
以下是忽略行顺序时确认df1
和df2
相等的步骤。
assert_df_equality(df1, df2, ignore_row_order=True)
如果您没有指定ignore_row_order
,则测试将因以下消息而失败
默认情况下,行不按顺序排列,因为排序会减慢函数的执行速度。
忽略列顺序
本节解释了如何比较DataFrame,忽略列的顺序。
假设您有以下df1
+----+----+
|num1|num2|
+----+----+
| 1| 7|
| 2| 8|
| 3| 9|
+----+----+
以下是df2
的内容
+----+----+
|num2|num1|
+----+----+
| 7| 1|
| 8| 2|
| 9| 3|
+----+----+
以下是忽略列顺序比较df1
和df2
相等性的步骤
assert_df_equality(df1, df2, ignore_column_order=True)
如果您在没有忽略列顺序的情况下运行assert_df_equality(df1, df2)
,则会看到以下错误消息
忽略可空性
模式中的每一列都有三个属性:名称、数据类型和可空属性。如果将nullable
设置为true,则列可以接受null值。
在比较DataFrame时,有时您想忽略可空属性。
假设您有以下df1
+-----+---+
| name|age|
+-----+---+
| juan| 7|
|bruna| 8|
+-----+---+
这是另一个df2
+-----+---+
| name|age|
+-----+---+
| juan| 7|
|bruna| 8|
+-----+---+
您可能会惊讶地发现,在这个例子中,df1
和df2
不相等,并且会因以下消息而失败
请查看这个人为的例子中的代码,以更好地理解错误
def ignore_nullable_property():
s1 = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
df1 = spark.createDataFrame([("juan", 7), ("bruna", 8)], s1)
s2 = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), False)])
df2 = spark.createDataFrame([("juan", 7), ("bruna", 8)], s2)
assert_df_equality(df1, df2)
通过添加一个标志,您可以在评估相等性时忽略可空属性
assert_df_equality(df1, df2, ignore_nullable=True)
在ArrayType()
中包含的元素也有一个可空属性,以及列模式的可空属性。在传递ignore_nullable=True
时,这些属性也被忽略。
再次,请查看以下代码,以了解ignore_nullable=True
绕过的错误
def ignore_nullable_property_array():
s1 = StructType([
StructField("name", StringType(), True),
StructField("coords", ArrayType(DoubleType(), True), True),])
df1 = spark.createDataFrame([("juan", [1.42, 3.5]), ("bruna", [2.76, 3.2])], s1)
s2 = StructType([
StructField("name", StringType(), True),
StructField("coords", ArrayType(DoubleType(), False), True),])
df2 = spark.createDataFrame([("juan", [1.42, 3.5]), ("bruna", [2.76, 3.2])], s2)
assert_df_equality(df1, df2)
允许NaN相等
Python有NaN(不是一个数字)值,并且默认情况下,两个NaN值被认为不相等。创建两个NaN值,比较它们,并确认它们默认情况下被认为不相等。
nan1 = float('nan')
nan2 = float('nan')
nan1 == nan2 # False
pandas将NaN值视为默认相等,但此库需要您设置标志才能将两个NaN值视为相等。
assert_df_equality(df1, df2, allow_nan_equality=True)
自定义格式化
您可以指定以下打印错误消息的自定义格式
from chispa import FormattingConfig
formats = FormattingConfig(
mismatched_rows={"color": "light_yellow"},
matched_rows={"color": "cyan", "style": "bold"},
mismatched_cells={"color": "purple"},
matched_cells={"color": "blue"},
)
assert_basic_rows_equality(df1.collect(), df2.collect(), formats=formats)
或类似地
from chispa import FormattingConfig, Color, Style
formats = FormattingConfig(
mismatched_rows={"color": Color.LIGHT_YELLOW},
matched_rows={"color": Color.CYAN, "style": Style.BOLD},
mismatched_cells={"color": Color.PURPLE},
matched_cells={"color": Color.BLUE},
)
assert_basic_rows_equality(df1.collect(), df2.collect(), formats=formats)
您还可以在conftest.py
中定义这些格式,并通过固定装置注入它们
@pytest.fixture()
def chispa_formats():
return FormattingConfig(
mismatched_rows={"color": "light_yellow"},
matched_rows={"color": "cyan", "style": "bold"},
mismatched_cells={"color": "purple"},
matched_cells={"color": "blue"},
)
def test_shows_assert_basic_rows_equality(chispa_formats):
...
assert_basic_rows_equality(df1.collect(), df2.collect(), formats=chispa_formats)
近似列相等
我们可以检查列是否近似相等,这对于浮点数比较特别有用。
这是一个创建具有两个浮点列的DataFrame并验证这些列近似相等的测试。在这个例子中,如果差异小于0.1,则认为值是近似相等的。
def test_approx_col_equality_same():
data = [
(1.1, 1.1),
(2.2, 2.15),
(3.3, 3.37),
(None, None)
]
df = spark.createDataFrame(data, ["num1", "num2"])
assert_approx_column_equality(df, "num1", "num2", 0.1)
这是一个列不相近的测试示例。
def test_approx_col_equality_different():
data = [
(1.1, 1.1),
(2.2, 2.15),
(3.3, 5.0),
(None, None)
]
df = spark.createDataFrame(data, ["num1", "num2"])
assert_approx_column_equality(df, "num1", "num2", 0.1)
这个失败的测试将输出可读的错误消息,以便轻松调试问题。
近似DataFrame相等
让我们创建两个DataFrame并确认它们近似相等。
def test_approx_df_equality_same():
data1 = [
(1.1, "a"),
(2.2, "b"),
(3.3, "c"),
(None, None)
]
df1 = spark.createDataFrame(data1, ["num", "letter"])
data2 = [
(1.05, "a"),
(2.13, "b"),
(3.3, "c"),
(None, None)
]
df2 = spark.createDataFrame(data2, ["num", "letter"])
assert_approx_df_equality(df1, df2, 0.1)
assert_approx_df_equality
方法很智能,它只为DataFrame中的浮点数执行近似相等性操作。对于字符串和其他类型,它将执行常规相等性。
让我们对两个不相等的DataFrame执行近似相等性比较。
def test_approx_df_equality_different():
data1 = [
(1.1, "a"),
(2.2, "b"),
(3.3, "c"),
(None, None)
]
df1 = spark.createDataFrame(data1, ["num", "letter"])
data2 = [
(1.1, "a"),
(5.0, "b"),
(3.3, "z"),
(None, None)
]
df2 = spark.createDataFrame(data2, ["num", "letter"])
assert_approx_df_equality(df1, df2, 0.1)
以下是输出的漂亮错误消息
模式不匹配消息
DataFrame等式消息在分析DataFrame的实际内容之前先执行模式比较。具有不同模式的DataFrame应该尽可能快速地报错。
让我们比较一个具有字符串列和整数列的DataFrame与一个具有两个整数列的DataFrame,以观察模式不匹配消息。
def test_schema_mismatch_message():
data1 = [
(1, "a"),
(2, "b"),
(3, "c"),
(None, None)
]
df1 = spark.createDataFrame(data1, ["num", "letter"])
data2 = [
(1, 6),
(2, 7),
(3, 8),
(None, None)
]
df2 = spark.createDataFrame(data2, ["num", "num2"])
assert_df_equality(df1, df2)
以下是错误消息
支持的PySpark / Python版本
chispa目前支持PySpark 2.4+和Python 3.5+
如果您使用的是较旧的Python版本,请使用chispa v0.8.2。
当chispa 1.x发布时,将停止支持PySpark 2。
基准测试
待办事项:需要将这些方法与spark-testing-base方法进行比较基准测试
在您的本地机器上开发chispa
鼓励您克隆和/或分支此存储库。
此项目使用Poetry进行打包和依赖关系管理。
- 使用
poetry install
设置虚拟环境 - 使用
poetry run pytest tests
运行测试
研究代码库是了解PySpark的绝佳方式!
贡献
鼓励任何人提交拉取请求、打开问题或提交错误报告。
如果他们做出了良好的贡献,我们将很乐意将人们提升为库维护者。
项目详情
下载文件
下载适合您平台的应用程序。如果您不确定选择哪个,请了解更多关于安装包的信息。