使用Keras在Spark上实现分布式深度学习
项目描述
Elephas:使用Keras和Spark的分布式深度学习
Elephas是Keras的扩展,它允许您使用Spark在可扩展范围内运行分布式深度学习模型。Elephas目前支持以下应用程序:
- 深度学习模型的并行训练
- 分布式深度学习模型的推理和评估
集成模型的分布式训练(自3.0.0版本已移除)分布式超参数优化(自3.0.0版本已移除)- 使用Hugging Face模型的分布式训练和推理(自6.0.0版本添加)
从概念上讲,elephas的工作方式如下。
目录
简介
Elephas 将 Keras 的深度学习引入 Keras 和 Spark。Elephas 旨在保持 Keras 的简单性和高易用性,从而能够快速原型化分布式模型,这些模型可以在大量数据集上运行。关于一个入门示例,请参阅以下 iPython 笔记本。
ἐλέφας 在希腊语中意为 象牙,与 κέρας(意为 角)相伴随的项目。如果这听起来像是噩梦般的奇怪提及,你应该在 Keras 文档 中确认这一点。Elephas 还意味着 大象,就像填充的黄色大象。
Elephas 在 Keras 的基础上实现了一类数据并行算法,使用 Spark 的 RDDs 和数据框。Keras 模型在驱动程序上初始化,然后序列化并发送到工作节点,同时附带数据和广播的模型参数。Spark 工作节点反序列化模型,对其数据块进行训练,并将梯度发送回驱动程序。驱动程序上的“主”模型通过优化器更新,该优化器可以同步或异步地接受梯度。
入门指南
只需通过 PyPI 安装 elephas,Spark 会通过 pyspark
为您安装。
pip install elephas
这就完成了,你现在应该能够运行 Elephas 的示例了。
基本的Spark集成
安装完 Elephas 后,您可以按照以下步骤训练一个模型。首先,创建一个本地 pyspark 上下文
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)
然后,定义并编译一个 Keras 模型
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())
并从 NumPy 数组(或您想要创建 RDD 的任何方式)创建一个 RDD
from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)
Elephas 中的基本模型是 SparkModel
。您通过传递一个编译后的 Keras 模型、更新频率和平行化模式来初始化 SparkModel
。之后,您只需在您的 RDD 上 fit
模型即可。Elephas 的 fit
与 Keras 模型具有相同的选项,因此您可以将 epochs
、batch_size
等等作为您从 tensorflow.keras 所熟悉的方式传递。
from elephas.spark_model import SparkModel
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
现在,您可以使用 spark-submit 运行您的脚本
spark-submit --driver-memory 1G ./your_script.py
可能需要进一步增加驱动程序的内存,因为网络中的参数集可能非常大,而在驱动程序上收集它们会消耗大量资源。请参阅示例文件夹中的几个工作示例。
分布式推理和评估
SparkModel
也可用于分布式推理(预测)和评估。与 fit
方法类似,predict
和 evaluate
方法遵循 Keras 模型 API。
from elephas.spark_model import SparkModel
# create/train the model, similar to the previous section (Basic Spark Integration)
model = ...
spark_model = SparkModel(model, ...)
spark_model.fit(...)
x_test, y_test = ... # load test data
predictions = spark_model.predict(x_test) # perform inference
evaluation = spark_model.evaluate(x_test, y_test) # perform evaluation/scoring
模式与训练中的数据并行性相同,因为模型被序列化并发送到工作节点,用于评估测试数据的一部分。预测方法将接受 NumPy 数组或 RDD。
Spark MLlib集成
在最后一个示例的基础上,要使用 Spark 的 MLlib 库与 Elephas 一起,请按以下方式创建一个 LabeledPoints 的 RDD 用于监督训练
from elephas.utils.rdd_utils import to_labeled_point
lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)
对给定的 LabeledPoint-RDD 进行训练与我们已经看到的方法非常相似
from elephas.spark_model import SparkMLlibModel
spark_model = SparkMLlibModel(model, frequency='batch', mode='hogwild')
spark_model.train(lp_rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1,
categorical=True, nb_classes=nb_classes)
Spark ML集成
要使用 SparkML 估计器在数据框上训练模型,请使用以下语法。
df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)
estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size, frequency='batch', mode='asynchronous',
categorical=True, nb_classes=nb_classes)
fitted_model = estimator.fit(df)
拟合估计器会产生一个 SparkML 转换器,我们可以通过在其上调用 transform 方法来用于预测和其他评估。
prediction = fitted_model.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)
import numpy as np
prediction_and_label = pnl.rdd.map(lambda row: (row.label, float(np.argmax(row.prediction))))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.weightedPrecision)
print(metrics.weightedRecall)
如果模型使用自定义激活函数、层或损失函数,则需要使用 set_custom_objects
方法提供
def custom_activation(x):
...
class CustomLayer(Layer):
...
model = Sequential()
model.add(CustomLayer(...))
estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size)
estimator.set_custom_objects({'custom_activation': custom_activation, 'CustomLayer': CustomLayer})
Hadoop 集成
除了本地保存外,模型还可以直接保存到可网络访问的 Hadoop 集群。
spark_model.save('/absolute/file/path/model.h5', to_hadoop=True)
以下是如何将保存在可网络访问 Hadoop 集群上的模型加载。
from elephas.spark_model import load_spark_model
spark_model = load_spark_model('/absolute/file/path/model.h5', from_hadoop=True)
分布式超参数优化
更新:从 3.0.0 版本开始,已删除超参数优化功能,因为 Hyperas 已不再活跃,并且导致版本兼容性问题。要使用这些功能,请安装版本 2.1 或以下版本。
使用elephas进行超参数优化基于hyperas,它是hyperopt和keras的便捷包装器。每个Spark工作节点执行一系列试验,结果被收集并返回最佳模型。由于hyperopt的分布式模式(使用MongoDB)在编写时配置困难且易于出错,我们选择自行实现并行化。目前,唯一可用的优化算法是随机搜索。
本例的第一个部分大致直接来自hyperas文档。我们将数据和模型定义为函数,超参数范围通过大括号定义。有关如何工作的更多详细信息,请参阅hyperas文档。
from hyperopt import STATUS_OK
from hyperas.distributions import choice, uniform
def data():
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(60000, 784)
x_test = x_test.reshape(10000, 784)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
nb_classes = 10
y_train = to_categorical(y_train, nb_classes)
y_test = to_categorical(y_test, nb_classes)
return x_train, y_train, x_test, y_test
def model(x_train, y_train, x_test, y_test):
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import RMSprop
model = Sequential()
model.add(Dense(512, input_shape=(784,)))
model.add(Activation('relu'))
model.add(Dropout({{uniform(0, 1)}}))
model.add(Dense({{choice([256, 512, 1024])}}))
model.add(Activation('relu'))
model.add(Dropout({{uniform(0, 1)}}))
model.add(Dense(10))
model.add(Activation('softmax'))
rms = RMSprop()
model.compile(loss='categorical_crossentropy', optimizer=rms)
model.fit(x_train, y_train,
batch_size={{choice([64, 128])}},
nb_epoch=1,
show_accuracy=True,
verbose=2,
validation_data=(x_test, y_test))
score, acc = model.evaluate(x_test, y_test, show_accuracy=True, verbose=0)
print('Test accuracy:', acc)
return {'loss': -acc, 'status': STATUS_OK, 'model': model.to_json()}
定义基本设置后,只需要几行代码即可运行最小化。
from elephas.hyperparam import HyperParamModel
from pyspark import SparkContext, SparkConf
# Create Spark context
conf = SparkConf().setAppName('Elephas_Hyperparameter_Optimization').setMaster('local[8]')
sc = SparkContext(conf=conf)
# Define hyper-parameter model and run optimization
hyperparam_model = HyperParamModel(sc)
hyperparam_model.minimize(model=model, data=data, max_evals=5)
集成模型的分布式训练
更新:从 3.0.0 版本开始,已删除超参数优化功能,因为 Hyperas 已不再活跃,并且导致版本兼容性问题。要使用这些功能,请安装版本 2.1 或以下版本。
在上一节的基础上,可以通过在大型搜索空间中运行超参数优化并在前n个表现最好的模型上定义结果投票分类器,使用elephas训练集成模型。使用上面定义的data
和model
,这就像运行以下内容一样简单:
result = hyperparam_model.best_ensemble(nb_ensemble_models=10, model=model, data=data, max_evals=5)
在这个例子中,构建了一个包含10个模型的集成,基于Spark工作节点上最多5次运行的优化。
Hugging Face模型训练和推理
从6.0.0版本开始,Elephas现在支持使用HuggingFace模型进行分布式训练(和推理)(使用Tensorflow/Keras后端),目前仅限于文本分类和因果语言建模,并且处于"synchronous"
训练模式。在未来的版本中,我们希望将其扩展到其他类型的模型以及"asynchronous"
和"hogwild"
训练模式。这可以通过使用SparkHFModel
实现。
from elephas.spark_model import SparkHFModel
from elephas.utils.rdd_utils import to_simple_rdd
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from tensorflow.keras.optimizers import SGD
batch_size = ...
epochs = ...
num_workers = ...
newsgroups = fetch_20newsgroups(subset='train')
x = newsgroups.data
y = newsgroups.target
encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)
x_train, x_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.2)
model_name = 'albert-base-v2'
# Note: the expectation is that text data is being supplied - tokenization is handled during training
rdd = to_simple_rdd(spark_context, x_train, y_train)
model = TFAutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(np.unique(y_encoded)))
tokenizer = AutoTokenizer.from_pretrained(model_name)
model.compile(optimizer=SGD(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
spark_model = SparkHFModel(model, num_workers=num_workers, mode="synchronous", tokenizer=tokenizer)
spark_model.fit(rdd, epochs=epochs, batch_size=batch_size)
# Run inference on trained Spark model
predictions = spark_model.predict(spark_context.parallelize(x_test))
更多示例可以在examples
目录中看到,例如"hf_causal_modeling.py"
和"hf_text_classification.py"
。
计算模型与Keras模型相同,只是由于HuggingFace API的不同,模型的序列化和反序列化方式不同。
要使用此功能,只需使用带有huggingface
附加组件安装此包。
pip install elephas[huggingface]
讨论
过早的并行化可能不是所有问题的根源,但这样做可能不是最佳选择。请记住,更多的工人意味着每个工人拥有的数据更少,并且并行化模型并不是实际学习的借口。因此,如果您能够完美地将数据拟合到内存中,并且对模型的训练速度感到满意,那么仅使用keras即可。
这个规则的例外可能是在Spark生态系统中工作,并希望利用其中的资源。上述SparkML示例展示了如何使用Spark的评估模块,也许您希望将来进一步处理elephas模型的输出。在这种情况下,我们建议使用elephas作为简单的包装器,通过设置num_workers=1来实现。
请注意,目前elephas限制了自身仅使用数据并行算法,原因有两个。首先,Spark使得数据分发变得非常容易。其次,Spark和Theano都没有特别容易地将实际模型分成几部分,这使得模型并行化实际上难以实现。
话虽如此,我们希望您学会欣赏elephas作为数据并行深度学习算法易于设置和使用的实验平台。
文献
[1] J. Dean, G.S. Corrado, R. Monga, K. Chen, M. Devin, QV. Le, MZ. Mao, M’A. Ranzato, A. Senior, P. Tucker, K. Yang, and AY. Ng. 大规模分布式深度网络。
[2] F. Niu, B. Recht, C. Re, S.J. Wright HOGWILD!: 无锁并行化随机梯度下降的方法
[3] C. Noel, S. Osindero. Dogwild! — 分布式Hogwild用于CPU & GPU
维护者 / 贡献
这个伟大的项目是由Max Pumperla发起的,目前由Daniel Cahall维护(https://github.com/danielenricocahall)。如果您有任何问题,请随时提出问题或发送电子邮件至danielenricocahall@gmail.com。如果您想贡献力量,请随时提交PR或开始关于如何实施某项内容的讨论。
星标历史
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源代码发行版
构建发行版
elephas-6.1.0-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 13316710473f69a084fd8a189f58d2660258e79d356a704ca8210cb3bcc638b9 |
|
MD5 | 53e8a1782e8c21fe8674ff5249c3defa |
|
BLAKE2b-256 | 9a62f375663be4e276e9b2d289b739369c07a6783d1b5ab8989932bb139470e2 |