将PySpark转换为通用的DataFrame API
项目描述
SQLFrame实现了PySpark DataFrame API,以便能够直接在数据库引擎上运行转换管道 - 无需Spark集群或依赖项。
SQLFrame目前支持以下引擎(开发中还有更多)
SQLFrame还有一个“独立”会话,可以用于生成SQL,而无需连接到任何数据库引擎。
SQLFrame非常适合
- 希望利用引擎全部处理能力的用户
- 希望在本地快速运行PySpark代码而不需要启动Spark会话的用户
- 希望将DataFrame代码表示为SQL以便调试或与他人分享的用户
- 希望在没有使用Spark进行处理的复杂性的情况下运行PySpark DataFrame代码的用户
安装
# BigQuery
pip install "sqlframe[bigquery]"
# DuckDB
pip install "sqlframe[duckdb]"
# Postgres
pip install "sqlframe[postgres]"
# Snowflake
pip install "sqlframe[snowflake]"
# Spark
pip install "sqlframe[spark]"
# Standalone
pip install sqlframe
请参阅特定引擎文档以获取额外的设置说明。
配置
SQLFrame为引擎执行生成一致准确且复杂的SQL。然而,当使用df.sql(optimize=True)时,它会产生更易于阅读的SQL。有关如何配置此输出以及如何利用OpenAI增强SQL的详细信息,请参阅生成SQL配置。
SQLFrame默认使用Spark方言进行输入和输出。可以通过更改设置,使SQLFrame更接近您使用的引擎的原生DataFrame API。请参阅输入和输出方言配置。
激活SQLFrame
SQLFrame可以替换pyspark导入或与之并行使用。要替换pyspark导入,请使用激活函数来设置要使用的引擎。
from sqlframe import activate
# Activate SQLFrame to run directly on DuckDB
activate(engine="duckdb")
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()
SQLFrame也可以直接导入,这样既可以保留pyspark导入,又可以提供更接近引擎原生的DataFrame API。
from sqlframe.duckdb import DuckDBSession
session = DuckDBSession.builder.getOrCreate()
示例用法
from sqlframe import activate
# Activate SQLFrame to run directly on BigQuery
activate(engine="bigquery")
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
session = SparkSession.builder.getOrCreate()
table_path = '"bigquery-public-data".samples.natality'
# Top 5 years with the greatest year-over-year % change in new families with single child
df = (
session.table(table_path)
.where(F.col("ever_born") == 1)
.groupBy("year")
.agg(F.count("*").alias("num_single_child_families"))
.withColumn(
"last_year_num_single_child_families",
F.lag(F.col("num_single_child_families"), 1).over(Window.orderBy("year"))
)
.withColumn(
"percent_change",
(F.col("num_single_child_families") - F.col("last_year_num_single_child_families"))
/ F.col("last_year_num_single_child_families")
)
.orderBy(F.abs(F.col("percent_change")).desc())
.select(
F.col("year").alias("year"),
F.format_number("num_single_child_families", 0).alias("new families single child"),
F.format_number(F.col("percent_change") * 100, 2).alias("percent change"),
)
.limit(5)
)
>>> df.sql(optimize=True)
WITH `t94228` AS (
SELECT
`natality`.`year` AS `year`,
COUNT(*) AS `num_single_child_families`
FROM `bigquery-public-data`.`samples`.`natality` AS `natality`
WHERE
`natality`.`ever_born` = 1
GROUP BY
`natality`.`year`
), `t39093` AS (
SELECT
`t94228`.`year` AS `year`,
`t94228`.`num_single_child_families` AS `num_single_child_families`,
LAG(`t94228`.`num_single_child_families`, 1) OVER (ORDER BY `t94228`.`year`) AS `last_year_num_single_child_families`
FROM `t94228` AS `t94228`
)
SELECT
`t39093`.`year` AS `year`,
FORMAT('%\'.0f', ROUND(CAST(`t39093`.`num_single_child_families` AS FLOAT64), 0)) AS `new families single child`,
FORMAT('%\'.2f', ROUND(CAST((((`t39093`.`num_single_child_families` - `t39093`.`last_year_num_single_child_families`) / `t39093`.`last_year_num_single_child_families`) * 100) AS FLOAT64), 2)) AS `percent change`
FROM `t39093` AS `t39093`
ORDER BY
ABS(`percent_change`) DESC
LIMIT 5
>>> df.show()
+------+---------------------------+----------------+
| year | new families single child | percent change |
+------+---------------------------+----------------+
| 1989 | 1,650,246 | 25.02 |
| 1974 | 783,448 | 14.49 |
| 1977 | 1,057,379 | 11.38 |
| 1985 | 1,308,476 | 11.15 |
| 1975 | 868,985 | 10.92 |
+------+---------------------------+----------------+
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
sqlframe-3.3.1.tar.gz (29.0 MB 查看哈希值)
构建分发
sqlframe-3.3.1-py3-none-any.whl (170.5 kB 查看哈希值)
关闭
sqlframe-3.3.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 73ea5e4944071c0949fcc7d7773a0906609e0e1556c36b6b46f4f01848818b71 |
|
MD5 | 58fb10e062d609b434d36616b37e8937 |
|
BLAKE2b-256 | c6472c151021e9d16636c452af6a9c414ee4b7ecfc127deffdcf112d33f21c2c |
关闭
sqlframe-3.3.1-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a7a467b06e190ddc8f315994d933b0fadebb8291ffee39a73c69200ad5b19755 |
|
MD5 | 9c34b9acbf8686b9d003ab4a2a32ed72 |
|
BLAKE2b-256 | bd2013457bda9a0b4bc713e59435a7629f22bd73d3d9e4de3c7363dc2e067a1a |