跳转到主要内容

使用Keras在Spark上实现分布式深度学习

项目描述

Elephas:使用Keras和Spark的分布式深度学习

Elephas

Build Status license Supported Versions

Elephas是Keras的扩展,它允许您使用Spark在可扩展范围内运行分布式深度学习模型。Elephas目前支持以下应用程序:

从概念上讲,elephas的工作方式如下。

Elephas

目录

简介

Elephas 将 Keras 的深度学习引入 KerasSpark。Elephas 旨在保持 Keras 的简单性和高易用性,从而能够快速原型化分布式模型,这些模型可以在大量数据集上运行。关于一个入门示例,请参阅以下 iPython 笔记本

ἐλέφας 在希腊语中意为 象牙,与 κέρας(意为 )相伴随的项目。如果这听起来像是噩梦般的奇怪提及,你应该在 Keras 文档 中确认这一点。Elephas 还意味着 大象,就像填充的黄色大象。

Elephas 在 Keras 的基础上实现了一类数据并行算法,使用 Spark 的 RDDs 和数据框。Keras 模型在驱动程序上初始化,然后序列化并发送到工作节点,同时附带数据和广播的模型参数。Spark 工作节点反序列化模型,对其数据块进行训练,并将梯度发送回驱动程序。驱动程序上的“主”模型通过优化器更新,该优化器可以同步或异步地接受梯度。

入门指南

只需通过 PyPI 安装 elephas,Spark 会通过 pyspark 为您安装。

pip install elephas

这就完成了,你现在应该能够运行 Elephas 的示例了。

基本的Spark集成

安装完 Elephas 后,您可以按照以下步骤训练一个模型。首先,创建一个本地 pyspark 上下文

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)

然后,定义并编译一个 Keras 模型

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())

并从 NumPy 数组(或您想要创建 RDD 的任何方式)创建一个 RDD

from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)

Elephas 中的基本模型是 SparkModel。您通过传递一个编译后的 Keras 模型、更新频率和平行化模式来初始化 SparkModel。之后,您只需在您的 RDD 上 fit 模型即可。Elephas 的 fit 与 Keras 模型具有相同的选项,因此您可以将 epochsbatch_size 等等作为您从 tensorflow.keras 所熟悉的方式传递。

from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

现在,您可以使用 spark-submit 运行您的脚本

spark-submit --driver-memory 1G ./your_script.py

可能需要进一步增加驱动程序的内存,因为网络中的参数集可能非常大,而在驱动程序上收集它们会消耗大量资源。请参阅示例文件夹中的几个工作示例。

分布式推理和评估

SparkModel 也可用于分布式推理(预测)和评估。与 fit 方法类似,predictevaluate 方法遵循 Keras 模型 API。

from elephas.spark_model import SparkModel

# create/train the model, similar to the previous section (Basic Spark Integration)
model = ...
spark_model = SparkModel(model, ...)
spark_model.fit(...)

x_test, y_test = ... # load test data

predictions = spark_model.predict(x_test) # perform inference
evaluation = spark_model.evaluate(x_test, y_test) # perform evaluation/scoring

模式与训练中的数据并行性相同,因为模型被序列化并发送到工作节点,用于评估测试数据的一部分。预测方法将接受 NumPy 数组或 RDD。

Spark MLlib集成

在最后一个示例的基础上,要使用 Spark 的 MLlib 库与 Elephas 一起,请按以下方式创建一个 LabeledPoints 的 RDD 用于监督训练

from elephas.utils.rdd_utils import to_labeled_point
lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)

对给定的 LabeledPoint-RDD 进行训练与我们已经看到的方法非常相似

from elephas.spark_model import SparkMLlibModel
spark_model = SparkMLlibModel(model, frequency='batch', mode='hogwild')
spark_model.train(lp_rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1, 
                  categorical=True, nb_classes=nb_classes)

Spark ML集成

要使用 SparkML 估计器在数据框上训练模型,请使用以下语法。

df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size, frequency='batch', mode='asynchronous',
                             categorical=True, nb_classes=nb_classes)
fitted_model = estimator.fit(df)

拟合估计器会产生一个 SparkML 转换器,我们可以通过在其上调用 transform 方法来用于预测和其他评估。

prediction = fitted_model.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)
import numpy as np
prediction_and_label = pnl.rdd.map(lambda row: (row.label, float(np.argmax(row.prediction))))

metrics = MulticlassMetrics(prediction_and_label)
print(metrics.weightedPrecision)
print(metrics.weightedRecall)

如果模型使用自定义激活函数、层或损失函数,则需要使用 set_custom_objects 方法提供

def custom_activation(x):
    ...
class CustomLayer(Layer):
    ...
model = Sequential()
model.add(CustomLayer(...))

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size)
estimator.set_custom_objects({'custom_activation': custom_activation, 'CustomLayer': CustomLayer})

Hadoop 集成

除了本地保存外,模型还可以直接保存到可网络访问的 Hadoop 集群。

spark_model.save('/absolute/file/path/model.h5', to_hadoop=True)

以下是如何将保存在可网络访问 Hadoop 集群上的模型加载。

from elephas.spark_model import load_spark_model

spark_model = load_spark_model('/absolute/file/path/model.h5', from_hadoop=True)

分布式超参数优化

更新:从 3.0.0 版本开始,已删除超参数优化功能,因为 Hyperas 已不再活跃,并且导致版本兼容性问题。要使用这些功能,请安装版本 2.1 或以下版本。

使用elephas进行超参数优化基于hyperas,它是hyperopt和keras的便捷包装器。每个Spark工作节点执行一系列试验,结果被收集并返回最佳模型。由于hyperopt的分布式模式(使用MongoDB)在编写时配置困难且易于出错,我们选择自行实现并行化。目前,唯一可用的优化算法是随机搜索。

本例的第一个部分大致直接来自hyperas文档。我们将数据和模型定义为函数,超参数范围通过大括号定义。有关如何工作的更多详细信息,请参阅hyperas文档。

from hyperopt import STATUS_OK
from hyperas.distributions import choice, uniform

def data():
    from tensorflow.keras.datasets import mnist
    from tensorflow.keras.utils import to_categorical
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784)
    x_test = x_test.reshape(10000, 784)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    nb_classes = 10
    y_train = to_categorical(y_train, nb_classes)
    y_test = to_categorical(y_test, nb_classes)
    return x_train, y_train, x_test, y_test


def model(x_train, y_train, x_test, y_test):
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Activation
    from tensorflow.keras.optimizers import RMSprop

    model = Sequential()
    model.add(Dense(512, input_shape=(784,)))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense({{choice([256, 512, 1024])}}))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense(10))
    model.add(Activation('softmax'))

    rms = RMSprop()
    model.compile(loss='categorical_crossentropy', optimizer=rms)

    model.fit(x_train, y_train,
              batch_size={{choice([64, 128])}},
              nb_epoch=1,
              show_accuracy=True,
              verbose=2,
              validation_data=(x_test, y_test))
    score, acc = model.evaluate(x_test, y_test, show_accuracy=True, verbose=0)
    print('Test accuracy:', acc)
    return {'loss': -acc, 'status': STATUS_OK, 'model': model.to_json()}

定义基本设置后,只需要几行代码即可运行最小化。

from elephas.hyperparam import HyperParamModel
from pyspark import SparkContext, SparkConf

# Create Spark context
conf = SparkConf().setAppName('Elephas_Hyperparameter_Optimization').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Define hyper-parameter model and run optimization
hyperparam_model = HyperParamModel(sc)
hyperparam_model.minimize(model=model, data=data, max_evals=5)

集成模型的分布式训练

更新:从 3.0.0 版本开始,已删除超参数优化功能,因为 Hyperas 已不再活跃,并且导致版本兼容性问题。要使用这些功能,请安装版本 2.1 或以下版本。

在上一节的基础上,可以通过在大型搜索空间中运行超参数优化并在前n个表现最好的模型上定义结果投票分类器,使用elephas训练集成模型。使用上面定义的datamodel,这就像运行以下内容一样简单:

result = hyperparam_model.best_ensemble(nb_ensemble_models=10, model=model, data=data, max_evals=5)

在这个例子中,构建了一个包含10个模型的集成,基于Spark工作节点上最多5次运行的优化。

Hugging Face模型训练和推理

从6.0.0版本开始,Elephas现在支持使用HuggingFace模型进行分布式训练(和推理)(使用Tensorflow/Keras后端),目前仅限于文本分类和因果语言建模,并且处于"synchronous"训练模式。在未来的版本中,我们希望将其扩展到其他类型的模型以及"asynchronous""hogwild"训练模式。这可以通过使用SparkHFModel实现。

from elephas.spark_model import SparkHFModel
from elephas.utils.rdd_utils import to_simple_rdd
from sklearn.datasets import fetch_20newsgroups
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from tensorflow.keras.optimizers import SGD
batch_size = ...
epochs = ...
num_workers = ...

newsgroups = fetch_20newsgroups(subset='train')
x = newsgroups.data
y = newsgroups.target

encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)

x_train, x_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.2)

model_name = 'albert-base-v2'

# Note: the expectation is that text data is being supplied - tokenization is handled during training
rdd = to_simple_rdd(spark_context, x_train, y_train)

model = TFAutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(np.unique(y_encoded)))
tokenizer = AutoTokenizer.from_pretrained(model_name)
model.compile(optimizer=SGD(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
spark_model = SparkHFModel(model, num_workers=num_workers, mode="synchronous", tokenizer=tokenizer)

spark_model.fit(rdd, epochs=epochs, batch_size=batch_size)

# Run inference on trained Spark model
predictions = spark_model.predict(spark_context.parallelize(x_test))

更多示例可以在examples目录中看到,例如"hf_causal_modeling.py""hf_text_classification.py"

计算模型与Keras模型相同,只是由于HuggingFace API的不同,模型的序列化和反序列化方式不同。

要使用此功能,只需使用带有huggingface附加组件安装此包。

pip install elephas[huggingface]

讨论

过早的并行化可能不是所有问题的根源,但这样做可能不是最佳选择。请记住,更多的工人意味着每个工人拥有的数据更少,并且并行化模型并不是实际学习的借口。因此,如果您能够完美地将数据拟合到内存中,并且对模型的训练速度感到满意,那么仅使用keras即可。

这个规则的例外可能是在Spark生态系统中工作,并希望利用其中的资源。上述SparkML示例展示了如何使用Spark的评估模块,也许您希望将来进一步处理elephas模型的输出。在这种情况下,我们建议使用elephas作为简单的包装器,通过设置num_workers=1来实现。

请注意,目前elephas限制了自身仅使用数据并行算法,原因有两个。首先,Spark使得数据分发变得非常容易。其次,Spark和Theano都没有特别容易地将实际模型分成几部分,这使得模型并行化实际上难以实现。

话虽如此,我们希望您学会欣赏elephas作为数据并行深度学习算法易于设置和使用的实验平台。

文献

[1] J. Dean, G.S. Corrado, R. Monga, K. Chen, M. Devin, QV. Le, MZ. Mao, M’A. Ranzato, A. Senior, P. Tucker, K. Yang, and AY. Ng. 大规模分布式深度网络

[2] F. Niu, B. Recht, C. Re, S.J. Wright HOGWILD!: 无锁并行化随机梯度下降的方法

[3] C. Noel, S. Osindero. Dogwild! — 分布式Hogwild用于CPU & GPU

维护者 / 贡献

这个伟大的项目是由Max Pumperla发起的,目前由Daniel Cahall维护(https://github.com/danielenricocahall)。如果您有任何问题,请随时提出问题或发送电子邮件至danielenricocahall@gmail.com。如果您想贡献力量,请随时提交PR或开始关于如何实施某项内容的讨论。

星标历史

Star History Chart

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码发行版

本发布版没有提供源代码发行版文件。请参阅生成发行版存档的教程

构建发行版

elephas-6.1.0-py3-none-any.whl (33.9 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面