跳转到主要内容

Apache Spark与RAPIDS和cuML的集成

项目描述

Spark Rapids ML (Python)

此PySpark兼容API利用RAPIDS cuML Python API提供许多常用ML算法的GPU加速实现。这些实现已适配使用PySpark进行分布式训练和推理。

安装

为简单起见,以下说明仅使用Spark本地模式,假设服务器至少有一个GPU。

首先,根据这些说明安装RAPIDS cuML。以CUDA Toolkit 11.8为例

conda create -n rapids-24.08 \
    -c rapidsai -c conda-forge -c nvidia \
    cuml=24.08 cuvs=24.08 python=3.9 cuda-version=11.8

注意:在测试时,我们建议使用conda或docker简化安装并隔离实验环境。一旦您有一个工作环境,如果需要,您可以尝试直接安装。

注意:您可以从rapids.ai中选择与您的环境兼容的最新版本。

一旦您有了conda环境,激活它并安装所需的软件包。

conda activate rapids-24.08

## for development access to notebooks, tests, and benchmarks
git clone --branch main https://github.com/NVIDIA/spark-rapids-ml.git
cd spark-rapids-ml/python
# install additional non-RAPIDS python dependencies for dev
pip install -r requirements_dev.txt
pip install -e .

## OPTIONAL: for package installation only
# install additional non-RAPIDS python dependencies
pip install -r https://raw.githubusercontent.com/NVIDIA/spark-rapids-ml/main/python/requirements.txt
pip install spark-rapids-ml

示例

这些示例展示了使用玩具数据集的API。然而,当使用需要大量计算的大数据集时,GPU更为有效。因此,一旦你对环境设置有信心,请使用更具代表性的数据集来评估GPU如何提高性能。

PySpark shell

线性回归

## pyspark --master local[*]
# from pyspark.ml.regression import LinearRegression
from spark_rapids_ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
     (1.0, Vectors.dense(1.0, 0.0)),
     (0.0, Vectors.dense(0.0, 1.0))], ["label", "features"])

# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)

lr = LinearRegression(regParam=0.0, solver="normal")
lr.setMaxIter(5)
lr.setRegParam(0.0)
lr.setFeaturesCol("features")
lr.setLabelCol("label")

model = lr.fit(df)

model.coefficients
# DenseVector([0.5, -0.5])

K-Means

## pyspark --master local[*]
# from pyspark.ml.clustering import KMeans
from spark_rapids_ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])

# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)

kmeans = KMeans(k=2)
kmeans.setSeed(1)
kmeans.setMaxIter(20)
kmeans.setFeaturesCol("features")

model = kmeans.fit(df)

centers = model.clusterCenters()
print(centers)
# [array([0.5, 0.5]), array([8.5, 8.5])]

model.setPredictionCol("newPrediction")
transformed = model.transform(df)
transformed.show()
# +----------+-------------+
# |  features|newPrediction|
# +----------+-------------+
# |[0.0, 0.0]|            1|
# |[1.0, 1.0]|            1|
# |[9.0, 8.0]|            0|
# |[8.0, 9.0]|            0|
# +--------+----------+-------------+
rows[0].newPrediction == rows[1].newPrediction
# True
rows[2].newPrediction == rows[3].newPrediction
# True

PCA

## pyspark --master local[*]
# from pyspark.ml.feature import PCA
from spark_rapids_ml.feature import PCA

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data,["features"])

# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)

pca = PCA(k=2, inputCol="features")
pca.setOutputCol("pca_features")

model = pca.fit(df)

model.setOutputCol("output")
model.transform(df).collect()[0].output
# [-1.6485728230896184, -4.013282697765595]
model.explainedVariance
# DenseVector([0.7944, 0.2056])
model.pc
# DenseMatrix(5, 2, [0.4486, -0.133, 0.1252, -0.2165, 0.8477, -0.2842, -0.0562, 0.7636, -0.5653, -0.1156], False)

Jupyter Notebooks

要本地运行示例笔记本,请参阅这些说明

要在Databricks中运行示例笔记本(假设您已经有了Databricks账户),请遵循这些说明

API兼容性

虽然Spark Rapids ML API试图模仿PySpark ML API以最小化最终用户代码更改,但底层实现完全不同,因此存在一些差异。

  • 不支持的ML参数 - 一些PySpark ML算法的ML参数无法直接映射到其相应的cuML实现。在这些情况下,将忽略ML参数的默认值,如果由最终用户代码明确设置
    • 将打印警告(对于非关键情况,影响应最小,例如initSteps)。
    • 将引发异常(对于关键情况,可能极大地影响结果,例如weightCol)。
  • 不支持的函数 - 一些PySpark ML函数可能无法映射到底层的cuML实现,或者对于GPU来说可能没有意义。在这些情况下,如果调用该函数,将引发错误。
  • cuML参数 - 可能存在一些cuML特定的参数,这些参数可能有助于优化GPU性能。这些参数可以通过各种类构造函数提供,但它们不是通过getter和setter公开的,以避免与PySpark ML参数混淆。如果需要,可以通过cuml_params属性来观察它们。
  • 算法结果 - 由于GPU实现与其PySpark ML CPU对应项完全不同,因此生成的结果可能会有细微差异。这可能是由各种原因造成的,包括不同的优化、随机初始化或算法设计。虽然这些差异应该是相对较小的,但它们仍应在特定用例的上下文中进行审查,以确定它们是否在可接受的范围内。

示例

# from pyspark.ml.clustering import KMeans
from spark_rapids_ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([0.0, 0.0]), 2.0), (Vectors.dense([1.0, 1.0]), 2.0),
        (Vectors.dense([9.0, 8.0]), 2.0), (Vectors.dense([8.0, 9.0]), 2.0)]
df = spark.createDataFrame(data, ["features", "weighCol"]).repartition(1)

# `k` is a Spark ML Param, `max_samples_per_batch` is a cuML parameter
kmeans = KMeans(k=3, max_samples_per_batch=16384)
kmeans.setK(2)  # setter is available for `k`, but not for `max_samples_per_batch`
kmeans.setInitSteps(10)  # non-critical unsupported param, prints a warning
# kmeans.setWeightCol("weight")  # critical unsupported param, raises an error

# show cuML-specific parameters
print(kmeans.cuml_params)
# {'n_clusters': 2, 'max_iter': 20, 'tol': 0.0001, 'verbose': False, 'random_state': 1909113551, 'init': 'scalable-k-means++', 'n_init': 1, 'oversampling_factor': 2.0, 'max_samples_per_batch': 16384}

model = kmeans.fit(df)

sample = df.head().features  # single example
# unsupported method, raises an error, since not optimal use-case for GPUs
# model.predict(sample)

centers = model.clusterCenters()
print(centers)  # slightly different results
# [[8.5, 8.5], [0.5, 0.5]]
# PySpark: [array([0.5, 0.5]), array([8.5, 8.5])]

启用无包导入更改的CLIs

使用包含在spark_rapids_ml中的某些实验性CLIs,导入来自pyspark.ml的估计器和模型并部署到pyspark应用程序脚本中,可以通过直接调用(在这种情况下,脚本中创建和配置了spark上下文和会话)或通过spark-submit进行加速,而无需更改包导入语句为spark_rapids_ml,如上述示例所示。在直接调用自包含的pyspark应用程序的情况下,可以使用以下命令

python -m spark_rapids_ml spark_enabled_application.py < application options >

如果使用spark-submit部署应用程序,可以使用以下包含的CLI(使用原始pip install spark-rapids-ml安装)

spark-rapids-submit --master < master > < other spark submit options > application.py < application options >

目前,任何不支持相应加速spark_rapids_ml对象的方法或属性都将导致错误。

API文档

项目详情


下载文件

下载适用于您的平台文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

本版本没有提供源代码分发文件。请参阅有关生成分发存档的教程。

已构建的分发

spark_rapids_ml-24.8.0-486_7f8e779-py3-none-any.whl (111.9 kB 查看哈希值)

上传时间 Python 3

支持者