跳转到主要内容

将PySpark转换为通用的DataFrame API

项目描述

SQLFrame Logo

SQLFrame实现了PySpark DataFrame API,以便能够直接在数据库引擎上运行转换管道 - 无需Spark集群或依赖项。

SQLFrame目前支持以下引擎(开发中还有更多)

SQLFrame还有一个“独立”会话,可以用于生成SQL,而无需连接到任何数据库引擎。

SQLFrame非常适合

  • 希望利用引擎全部处理能力的用户
  • 希望在本地快速运行PySpark代码而不需要启动Spark会话的用户
  • 希望将DataFrame代码表示为SQL以便调试或与他人分享的用户
  • 希望在没有使用Spark进行处理的复杂性的情况下运行PySpark DataFrame代码的用户

安装

# BigQuery
pip install "sqlframe[bigquery]"
# DuckDB
pip install "sqlframe[duckdb]"
# Postgres
pip install "sqlframe[postgres]"
# Snowflake
pip install "sqlframe[snowflake]"
# Spark
pip install "sqlframe[spark]"
# Standalone
pip install sqlframe

请参阅特定引擎文档以获取额外的设置说明。

配置

SQLFrame为引擎执行生成一致准确且复杂的SQL。然而,当使用df.sql(optimize=True)时,它会产生更易于阅读的SQL。有关如何配置此输出以及如何利用OpenAI增强SQL的详细信息,请参阅生成SQL配置

SQLFrame默认使用Spark方言进行输入和输出。可以通过更改设置,使SQLFrame更接近您使用的引擎的原生DataFrame API。请参阅输入和输出方言配置

激活SQLFrame

SQLFrame可以替换pyspark导入或与之并行使用。要替换pyspark导入,请使用激活函数来设置要使用的引擎。

from sqlframe import activate

# Activate SQLFrame to run directly on DuckDB
activate(engine="duckdb")

from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()

SQLFrame也可以直接导入,这样既可以保留pyspark导入,又可以提供更接近引擎原生的DataFrame API。

from sqlframe.duckdb import DuckDBSession

session = DuckDBSession.builder.getOrCreate()

示例用法

from sqlframe import activate

# Activate SQLFrame to run directly on BigQuery
activate(engine="bigquery")

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

session = SparkSession.builder.getOrCreate()
table_path = '"bigquery-public-data".samples.natality'
# Top 5 years with the greatest year-over-year % change in new families with single child
df = (
  session.table(table_path)
  .where(F.col("ever_born") == 1)
  .groupBy("year")
  .agg(F.count("*").alias("num_single_child_families"))
  .withColumn(
    "last_year_num_single_child_families",
    F.lag(F.col("num_single_child_families"), 1).over(Window.orderBy("year"))
  )
  .withColumn(
    "percent_change",
    (F.col("num_single_child_families") - F.col("last_year_num_single_child_families"))
    / F.col("last_year_num_single_child_families")
  )
  .orderBy(F.abs(F.col("percent_change")).desc())
  .select(
    F.col("year").alias("year"),
    F.format_number("num_single_child_families", 0).alias("new families single child"),
    F.format_number(F.col("percent_change") * 100, 2).alias("percent change"),
  )
  .limit(5)
)
>>> df.sql(optimize=True)
WITH `t94228` AS (
  SELECT
    `natality`.`year` AS `year`,
    COUNT(*) AS `num_single_child_families`
  FROM `bigquery-public-data`.`samples`.`natality` AS `natality`
  WHERE
    `natality`.`ever_born` = 1
  GROUP BY
    `natality`.`year`
), `t39093` AS (
  SELECT
    `t94228`.`year` AS `year`,
    `t94228`.`num_single_child_families` AS `num_single_child_families`,
    LAG(`t94228`.`num_single_child_families`, 1) OVER (ORDER BY `t94228`.`year`) AS `last_year_num_single_child_families`
  FROM `t94228` AS `t94228`
)
SELECT
  `t39093`.`year` AS `year`,
  FORMAT('%\'.0f', ROUND(CAST(`t39093`.`num_single_child_families` AS FLOAT64), 0)) AS `new families single child`,
  FORMAT('%\'.2f', ROUND(CAST((((`t39093`.`num_single_child_families` - `t39093`.`last_year_num_single_child_families`) / `t39093`.`last_year_num_single_child_families`) * 100) AS FLOAT64), 2)) AS `percent change`
FROM `t39093` AS `t39093`
ORDER BY
  ABS(`percent_change`) DESC
LIMIT 5
>>> df.show()
+------+---------------------------+----------------+
| year | new families single child | percent change |
+------+---------------------------+----------------+
| 1989 |         1,650,246         |     25.02      |
| 1974 |          783,448          |     14.49      |
| 1977 |         1,057,379         |     11.38      |
| 1985 |         1,308,476         |     11.15      |
| 1975 |          868,985          |     10.92      |
+------+---------------------------+----------------+

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

sqlframe-3.3.1.tar.gz (29.0 MB 查看哈希值)

上传时间 源代码

构建分发

sqlframe-3.3.1-py3-none-any.whl (170.5 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面