Apache Spark与RAPIDS和cuML的集成
项目描述
Spark Rapids ML (Python)
此PySpark兼容API利用RAPIDS cuML Python API提供许多常用ML算法的GPU加速实现。这些实现已适配使用PySpark进行分布式训练和推理。
安装
为简单起见,以下说明仅使用Spark本地模式,假设服务器至少有一个GPU。
首先,根据这些说明安装RAPIDS cuML。以CUDA Toolkit 11.8为例
conda create -n rapids-24.08 \
-c rapidsai -c conda-forge -c nvidia \
cuml=24.08 cuvs=24.08 python=3.9 cuda-version=11.8
注意:在测试时,我们建议使用conda或docker简化安装并隔离实验环境。一旦您有一个工作环境,如果需要,您可以尝试直接安装。
注意:您可以从rapids.ai中选择与您的环境兼容的最新版本。
一旦您有了conda环境,激活它并安装所需的软件包。
conda activate rapids-24.08
## for development access to notebooks, tests, and benchmarks
git clone --branch main https://github.com/NVIDIA/spark-rapids-ml.git
cd spark-rapids-ml/python
# install additional non-RAPIDS python dependencies for dev
pip install -r requirements_dev.txt
pip install -e .
## OPTIONAL: for package installation only
# install additional non-RAPIDS python dependencies
pip install -r https://raw.githubusercontent.com/NVIDIA/spark-rapids-ml/main/python/requirements.txt
pip install spark-rapids-ml
示例
这些示例展示了使用玩具数据集的API。然而,当使用需要大量计算的大数据集时,GPU更为有效。因此,一旦你对环境设置有信心,请使用更具代表性的数据集来评估GPU如何提高性能。
PySpark shell
线性回归
## pyspark --master local[*]
# from pyspark.ml.regression import LinearRegression
from spark_rapids_ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1.0, Vectors.dense(1.0, 0.0)),
(0.0, Vectors.dense(0.0, 1.0))], ["label", "features"])
# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)
lr = LinearRegression(regParam=0.0, solver="normal")
lr.setMaxIter(5)
lr.setRegParam(0.0)
lr.setFeaturesCol("features")
lr.setLabelCol("label")
model = lr.fit(df)
model.coefficients
# DenseVector([0.5, -0.5])
K-Means
## pyspark --master local[*]
# from pyspark.ml.clustering import KMeans
from spark_rapids_ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
(Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])
# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)
kmeans = KMeans(k=2)
kmeans.setSeed(1)
kmeans.setMaxIter(20)
kmeans.setFeaturesCol("features")
model = kmeans.fit(df)
centers = model.clusterCenters()
print(centers)
# [array([0.5, 0.5]), array([8.5, 8.5])]
model.setPredictionCol("newPrediction")
transformed = model.transform(df)
transformed.show()
# +----------+-------------+
# | features|newPrediction|
# +----------+-------------+
# |[0.0, 0.0]| 1|
# |[1.0, 1.0]| 1|
# |[9.0, 8.0]| 0|
# |[8.0, 9.0]| 0|
# +--------+----------+-------------+
rows[0].newPrediction == rows[1].newPrediction
# True
rows[2].newPrediction == rows[3].newPrediction
# True
PCA
## pyspark --master local[*]
# from pyspark.ml.feature import PCA
from spark_rapids_ml.feature import PCA
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data,["features"])
# number of partitions should match number of GPUs in Spark cluster
df = df.repartition(1)
pca = PCA(k=2, inputCol="features")
pca.setOutputCol("pca_features")
model = pca.fit(df)
model.setOutputCol("output")
model.transform(df).collect()[0].output
# [-1.6485728230896184, -4.013282697765595]
model.explainedVariance
# DenseVector([0.7944, 0.2056])
model.pc
# DenseMatrix(5, 2, [0.4486, -0.133, 0.1252, -0.2165, 0.8477, -0.2842, -0.0562, 0.7636, -0.5653, -0.1156], False)
Jupyter Notebooks
要本地运行示例笔记本,请参阅这些说明。
要在Databricks中运行示例笔记本(假设您已经有了Databricks账户),请遵循这些说明。
API兼容性
虽然Spark Rapids ML API试图模仿PySpark ML API以最小化最终用户代码更改,但底层实现完全不同,因此存在一些差异。
- 不支持的ML参数 - 一些PySpark ML算法的ML参数无法直接映射到其相应的cuML实现。在这些情况下,将忽略ML参数的默认值,如果由最终用户代码明确设置
- 将打印警告(对于非关键情况,影响应最小,例如
initSteps
)。 - 将引发异常(对于关键情况,可能极大地影响结果,例如
weightCol
)。
- 将打印警告(对于非关键情况,影响应最小,例如
- 不支持的函数 - 一些PySpark ML函数可能无法映射到底层的cuML实现,或者对于GPU来说可能没有意义。在这些情况下,如果调用该函数,将引发错误。
- cuML参数 - 可能存在一些cuML特定的参数,这些参数可能有助于优化GPU性能。这些参数可以通过各种类构造函数提供,但它们不是通过getter和setter公开的,以避免与PySpark ML参数混淆。如果需要,可以通过
cuml_params
属性来观察它们。 - 算法结果 - 由于GPU实现与其PySpark ML CPU对应项完全不同,因此生成的结果可能会有细微差异。这可能是由各种原因造成的,包括不同的优化、随机初始化或算法设计。虽然这些差异应该是相对较小的,但它们仍应在特定用例的上下文中进行审查,以确定它们是否在可接受的范围内。
示例
# from pyspark.ml.clustering import KMeans
from spark_rapids_ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([0.0, 0.0]), 2.0), (Vectors.dense([1.0, 1.0]), 2.0),
(Vectors.dense([9.0, 8.0]), 2.0), (Vectors.dense([8.0, 9.0]), 2.0)]
df = spark.createDataFrame(data, ["features", "weighCol"]).repartition(1)
# `k` is a Spark ML Param, `max_samples_per_batch` is a cuML parameter
kmeans = KMeans(k=3, max_samples_per_batch=16384)
kmeans.setK(2) # setter is available for `k`, but not for `max_samples_per_batch`
kmeans.setInitSteps(10) # non-critical unsupported param, prints a warning
# kmeans.setWeightCol("weight") # critical unsupported param, raises an error
# show cuML-specific parameters
print(kmeans.cuml_params)
# {'n_clusters': 2, 'max_iter': 20, 'tol': 0.0001, 'verbose': False, 'random_state': 1909113551, 'init': 'scalable-k-means++', 'n_init': 1, 'oversampling_factor': 2.0, 'max_samples_per_batch': 16384}
model = kmeans.fit(df)
sample = df.head().features # single example
# unsupported method, raises an error, since not optimal use-case for GPUs
# model.predict(sample)
centers = model.clusterCenters()
print(centers) # slightly different results
# [[8.5, 8.5], [0.5, 0.5]]
# PySpark: [array([0.5, 0.5]), array([8.5, 8.5])]
启用无包导入更改的CLIs
使用包含在spark_rapids_ml
中的某些实验性CLIs,导入来自pyspark.ml
的估计器和模型并部署到pyspark应用程序脚本中,可以通过直接调用(在这种情况下,脚本中创建和配置了spark上下文和会话)或通过spark-submit
进行加速,而无需更改包导入语句为spark_rapids_ml
,如上述示例所示。在直接调用自包含的pyspark应用程序的情况下,可以使用以下命令
python -m spark_rapids_ml spark_enabled_application.py < application options >
如果使用spark-submit
部署应用程序,可以使用以下包含的CLI(使用原始pip install spark-rapids-ml
安装)
spark-rapids-submit --master < master > < other spark submit options > application.py < application options >
目前,任何不支持相应加速spark_rapids_ml
对象的方法或属性都将导致错误。
API文档
项目详情
哈希值 for spark_rapids_ml-24.8.0-486_7f8e779-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 6bb61a2d4428630fd84b17837f1d6b4e05c4cf3bd0a54a021f1928b006684854 |
|
MD5 | 5d2ded9230df5b730c0b3ada2375506e |
|
BLAKE2b-256 | 0f69e7d56f4e3a37f74028989b0616582e053909d4bbc39b9865ae3b3dcd1db2 |