跳转到主要内容

Flake8插件,用于检查循环中PySpark withColumn的使用

项目描述

Flake8-pyspark-with-column

Upload Python Package PyPI - Downloads

入门

pip install flake8-pyspark-with-column
flake8 --select PSRPK001,PSPRT002,PSPRK003,PSPRK004

或者您可以将以下 tox.ini 文件添加到项目的根目录中

[flake8]
select = 
    PSPRK001,
    PSPRK002,
    PSPRK003,
    PSPRK004

关于

A flake8插件,用于检测在循环或内部使用 withColumn。根据PySpark文档中关于 withColumn 方法的说明

此方法在内部引入了投影。因此,多次调用它,例如,通过循环添加多个列,可以生成大的计划,这可能导致性能问题,甚至引发 StackOverflowException。为了避免这种情况,请使用 select() 一次添加多个列。

幕后发生了什么?

当您运行PySpark应用程序时,以下情况会发生

  1. Spark创建 Unresolved Logical Plan,它是解析SQL的结果
  2. Spark 对此计划进行分析以创建一个 已分析逻辑计划
  3. Spark 应用优化规则以创建一个 优化逻辑计划

spark-flow

withColumn 存在什么问题?它会在未解析的计划中创建一个单独的节点。因此,调用 withColumn 500 次将在未解析的计划中创建具有 500 个节点的计划。在分析期间,Spark 应该访问每个节点以检查该列是否存在并具有正确的数据类型。之后,Spark 将开始应用规则,但规则是递归地按计划每次应用一次,因此 500 次对 withColumn 的连续调用将需要应用相应的规则 500 次。所有这些可能会显着增加从 未解析逻辑计划优化逻辑计划 的时间。

bechmark

另一方面,无论我们想要添加多少列,withColumnsselect(*cols) 都会在计划中创建单个节点。

规则

此插件包含以下规则

  • PSPRK001:检测到循环中使用 withColumn
  • PSPRK002:检测到 reduce 内部使用 withColumn
  • PSPRK003:检测到循环中使用 withColumnRenamed
  • PSPRK004:检测到 reduce 内部使用 withColumnRenamed

示例

让我们假设我们想要将一个 ML 模型应用于我们的数据,但我们的模型需要双精度值,而我们的表中包含十进制值。目标是将所有 Decimal 列转换为 Double

withColumn 实现(不良示例)

def cast_to_double(df: DataFrame) -> DataFrame:
  for field in df.schema.fields:
    if isinstance(field.dataType, DecimalType):
      df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
  return df

withColumn 实现(良好示例)

def cast_to_double(df: DataFrame) -> DataFrame:
  cols_to_select = []
  for field in df.schema.fields:
    if isinstance(field.dataType, DecimalType):
      cols_to_select.append(col(field.name).cast(DoubleType()).alias(field.name))
    else:
      cols_to_select.append(col(field.name))
  return df.select(*cols_to_select)

使用方法

flake8 %your-code-here%

screenshot of how it works

项目详情


下载文件

下载适合您平台的应用程序。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

flake8_pyspark_with_column-0.0.4.tar.gz (8.6 kB 查看哈希)

上传时间

构建分发

flake8_pyspark_with_column-0.0.4-py2.py3-none-any.whl (8.2 kB 查看哈希)

上传时间 Python 2 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面