Interactions between Dask and TensorFlow

Project description

Start TensorFlow clusters from Dask

Example

Given a Dask cluster

from dask.distributed import Client
client = Client('scheduler-address:8786')

Get a TensorFlow cluster, specifying groups by name

from dask_tensorflow import start_tensorflow
tf_spec, dask_spec = start_tensorflow(client, ps=2, worker=4)

>>> tf_spec
{'worker': ['192.168.1.100:2222', '192.168.1.101:2222',
            '192.168.1.102:2222', '192.168.1.103:2222'],
 'ps': ['192.168.1.104:2222', '192.168.1.105:2222']}

This creates a tensorflow.train.Server on each Dask worker and sets up a Queue for data transfer on each worker. Both are accessible directly on the workers as the tensorflow_server and tensorflow_queue attributes.
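
To make the shape of the two returned specs concrete, here is a minimal pure-Python sketch of how worker addresses might be split into named groups. The helper build_spec is hypothetical and is not part of dask_tensorflow; it also assumes that dask_spec maps each group name to Dask worker addresses, which is how dask_spec is used in the task submission code on this page. The port 2222 is taken from the example output above.

```python
def build_spec(addresses, port=2222, **groups):
    """Hypothetical helper: assign Dask worker addresses to named groups.

    This is NOT dask_tensorflow's implementation; it only illustrates the
    shape of the (tf_spec, dask_spec) pair under the assumptions above.
    """
    needed = sum(groups.values())
    if needed > len(addresses):
        raise ValueError(f"need {needed} workers, only {len(addresses)} available")

    remaining = iter(addresses)
    tf_spec, dask_spec = {}, {}
    for name, count in groups.items():
        chosen = [next(remaining) for _ in range(count)]
        dask_spec[name] = chosen
        # Drop the scheme and the Dask port, then attach the TensorFlow port.
        hosts = [addr.split("://")[-1].rsplit(":", 1)[0] for addr in chosen]
        tf_spec[name] = [f"{host}:{port}" for host in hosts]
    return tf_spec, dask_spec


# Made-up Dask worker addresses, for illustration only.
example_addresses = [f"tcp://192.168.1.{100 + i}:40000" for i in range(6)]
example_tf_spec, example_dask_spec = build_spec(example_addresses, worker=4, ps=2)
```

Each group name passed as a keyword (worker=4, ps=2) becomes a key in both dictionaries, so the same name can be used to look up TensorFlow server addresses in one and Dask workers in the other.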

More Complex Workflow

Typically we then set up long-running Dask tasks that get these servers and participate in general TensorFlow computations.

from dask.distributed import worker_client

# The tasks are submitted without arguments, so the functions take none.
def ps_function():
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        tf_server.join()  # block so the server keeps running

# workers= pins each task to one worker; pure=False gives each call its own task
ps_tasks = [client.submit(ps_function, workers=worker, pure=False)
            for worker in dask_spec['ps']]

def worker_function():
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server

        # ... use tensorflow as desired ...

worker_tasks = [client.submit(worker_function, workers=worker, pure=False)
                for worker in dask_spec['worker']]

One simple and flexible approach is to have these functions block on queues and feed data to them from Dask arrays, dataframes, etc.

def worker_function():
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        queue = c.worker.tensorflow_queue

        # stopping_condition is a placeholder for user-defined logic
        while not stopping_condition():
            batch = queue.get()  # blocks until a batch arrives
            # train with batch
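
The block-on-a-queue pattern above can be tried without Dask or TensorFlow. The following is a minimal sketch using only the standard library; it assumes the per-worker queue behaves like queue.Queue, and it swaps the stopping_condition() check for a sentinel object (STOP, a name introduced here for illustration). The sum() call is a stand-in for a training step.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel marking the end of the data stream


def consume(q, seen):
    """Block on the queue and handle batches until the sentinel arrives."""
    while True:
        batch = q.get()  # blocks until a batch is available
        if batch is STOP:
            break
        seen.append(sum(batch))  # stand-in for "train with batch"


def run_demo():
    # A bounded queue makes the producer wait if the consumer falls behind.
    q = queue.Queue(maxsize=2)
    seen = []
    consumer = threading.Thread(target=consume, args=(q, seen))
    consumer.start()
    for batch in ([1, 2], [3, 4], [5, 6]):
        q.put(batch)
    q.put(STOP)  # tell the consumer to finish
    consumer.join()
    return seen
```

A sentinel is one common way to end such a loop cleanly, because the consumer would otherwise stay blocked in get() once the data runs out.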

Then dump blocks of NumPy arrays and pandas dataframes into these queues

from dask.distributed import get_worker

def dump_batch(batch):
    # Runs on a worker: push the batch onto that worker's local queue.
    worker = get_worker()
    worker.tensorflow_queue.put(batch)


import dask.dataframe as dd
df = dd.read_csv('hdfs:///path/to/*.csv')
# clean up dataframe as necessary
partitions = df.to_delayed()  # delayed pandas dataframes
batches = client.compute(partitions)  # futures of pandas dataframes
dumps = client.map(dump_batch, batches)
