跳转到主要内容

此软件包帮助用户在他们Spark集群上使用TensorFlow进行分布式训练。

项目描述

Spark TensorFlow Distributor

此软件包帮助用户在他们Spark集群上使用TensorFlow进行分布式训练。

安装

此软件包需要Python 3.6+,tensorflow>=2.1.0pyspark>=3.0.0 运行。要安装 spark-tensorflow-distributor,请运行

pip install spark-tensorflow-distributor

安装不会安装PySpark,因为对于大多数用户,PySpark已经安装。此外,未安装 tensorflow,因此用户可以选择通过 pip install tensorflowpip install tensorflow-cpu 之间的常规和仅CPU安装。如果您没有安装PySpark,您可以直接安装它

pip install pyspark>=3.0.*

注意,为了使用此软件包的许多功能,您必须在您的集群上设置Spark自定义资源调度以用于GPU。请参阅Spark文档。

运行测试

对于集成测试,首先构建主和工人镜像,然后运行测试脚本。

docker-compose build --build-arg PYTHON_INSTALL_VERSION=3.7
./tests/integration/run.sh

对于代码风格检查,运行以下命令。

./tests/lint.sh

要使用自动格式化工具,运行以下命令。

yapf --recursive --in-place spark_tensorflow_distributor

示例

pyspark 脚本中运行以下示例代码

from spark_tensorflow_distributor import MirroredStrategyRunner

# Adapted from https://tensorflowcn.cn/tutorials/distribute/multi_worker_with_keras
def train():
    import tensorflow as tf
    import uuid

    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets():
        (mnist_images, mnist_labels), _ = \
            tf.keras.datasets.mnist.load_data(path=str(uuid.uuid4())+'mnist.npz')

        dataset = tf.data.Dataset.from_tensor_slices((
            tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
            tf.cast(mnist_labels, tf.int64))
        )
        dataset = dataset.repeat().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
        return dataset

    def build_and_compile_cnn_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax'),
        ])
        model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
            metrics=['accuracy'],
        )
        return model

    train_datasets = make_datasets()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_datasets = train_datasets.with_options(options)
    multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5)

MirroredStrategyRunner(num_slots=8).run(train)

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码发行版

spark_tensorflow_distributor-1.0.0.tar.gz (9.2 kB 查看哈希值)

上传时间 源代码

构建分发版

spark_tensorflow_distributor-1.0.0-py3-none-any.whl (8.9 kB 查看哈希值)

上传时间 Python 3

支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面