Flake8插件,用于检查循环中PySpark withColumn的使用
项目描述
Flake8-pyspark-with-column
入门
pip install flake8-pyspark-with-column
flake8 --select PSRPK001,PSPRT002,PSPRK003,PSPRK004
或者您可以将以下 tox.ini 文件添加到项目的根目录中
[flake8]
select =
PSPRK001,
PSPRK002,
PSPRK003,
PSPRK004
关于
A flake8插件,用于检测在循环或内部使用 withColumn。根据PySpark文档中关于 withColumn 方法的说明
此方法在内部引入了投影。因此,多次调用它,例如,通过循环添加多个列,可以生成大的计划,这可能导致性能问题,甚至引发 StackOverflowException。为了避免这种情况,请使用 select() 一次添加多个列。
幕后发生了什么?
当您运行PySpark应用程序时,以下情况会发生
- Spark创建
Unresolved Logical Plan,它是解析SQL的结果 - Spark 对此计划进行分析以创建一个
已分析逻辑计划 - Spark 应用优化规则以创建一个
优化逻辑计划
withColumn 存在什么问题?它会在未解析的计划中创建一个单独的节点。因此,调用 withColumn 500 次将在未解析的计划中创建具有 500 个节点的计划。在分析期间,Spark 应该访问每个节点以检查该列是否存在并具有正确的数据类型。之后,Spark 将开始应用规则,但规则是递归地按计划每次应用一次,因此 500 次对 withColumn 的连续调用将需要应用相应的规则 500 次。所有这些可能会显着增加从 未解析逻辑计划 到 优化逻辑计划 的时间。
另一方面,无论我们想要添加多少列,withColumns 和 select(*cols) 都会在计划中创建单个节点。
规则
此插件包含以下规则
PSPRK001:检测到循环中使用withColumnPSPRK002:检测到reduce内部使用withColumnPSPRK003:检测到循环中使用withColumnRenamedPSPRK004:检测到reduce内部使用withColumnRenamed
示例
让我们假设我们想要将一个 ML 模型应用于我们的数据,但我们的模型需要双精度值,而我们的表中包含十进制值。目标是将所有 Decimal 列转换为 Double。
withColumn 实现(不良示例)
def cast_to_double(df: DataFrame) -> DataFrame:
for field in df.schema.fields:
if isinstance(field.dataType, DecimalType):
df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
return df
无 withColumn 实现(良好示例)
def cast_to_double(df: DataFrame) -> DataFrame:
cols_to_select = []
for field in df.schema.fields:
if isinstance(field.dataType, DecimalType):
cols_to_select.append(col(field.name).cast(DoubleType()).alias(field.name))
else:
cols_to_select.append(col(field.name))
return df.select(*cols_to_select)
使用方法
flake8 %your-code-here%
项目详情
关闭
flake8_pyspark_with_column-0.0.4.tar.gz 的哈希
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 14e221e267aaaa570a302c05111944985e9f69669f0a974afb8594468dcc117a |
|
| MD5 | dc9fd1d85c2cb4b7ba1892d3680c520d |
|
| BLAKE2b-256 | 27fed64091f77768945bc16d281bb261a261a78968a0ecdb661ac8ca0f296e2e |
关闭
flake8_pyspark_with_column-0.0.4-py2.py3-none-any.whl 的哈希
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | c3ca754f8fe3888244b73c837ccdafecefef2fbdd6f002983cde5c7a5d9013f1 |
|
| MD5 | 5c7eb9fcc39525908583bc5d5c7a34ed |
|
| BLAKE2b-256 | 8a63af4e5ec2d63b93ad6ac3c1a89396dde995cadd1916394af01512a691f97c |