Interactions between Dask and TensorFlow
Start TensorFlow clusters from Dask
Example
Given a Dask cluster:
from dask.distributed import Client
client = Client('scheduler-address:8786')
Get a TensorFlow cluster, specifying groups by name:
from dask_tensorflow import start_tensorflow
tf_spec, dask_spec = start_tensorflow(client, ps=2, worker=4)
>>> tf_spec
{'worker': ['192.168.1.100:2222', '192.168.1.101:2222',
            '192.168.1.102:2222', '192.168.1.103:2222'],
 'ps': ['192.168.1.104:2222', '192.168.1.105:2222']}
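In TensorFlow's distributed runtime, each process is identified by a job name plus a task index within that job (the job_name and task_index arguments of tf.train.Server), so a spec like tf_spec above is enough to look up any task's address. The sketch below shows that lookup in both directions using only the dictionary returned above. The helper names task_address and task_for_address are hypothetical and are not part of dask_tensorflow.

```python
from typing import Dict, List, Tuple

# The cluster spec shown above: job name -> ordered list of "host:port" addresses
tf_spec: Dict[str, List[str]] = {
    'worker': ['192.168.1.100:2222', '192.168.1.101:2222',
               '192.168.1.102:2222', '192.168.1.103:2222'],
    'ps': ['192.168.1.104:2222', '192.168.1.105:2222'],
}


def task_address(spec: Dict[str, List[str]], job: str, task_index: int) -> str:
    """Hypothetical helper: address of task `task_index` in job `job`."""
    return spec[job][task_index]


def task_for_address(spec: Dict[str, List[str]], address: str) -> Tuple[str, int]:
    """Hypothetical helper: find the (job, task_index) pair that owns `address`."""
    for job, addresses in spec.items():
        if address in addresses:
            return job, addresses.index(address)
    raise KeyError(address)


print(task_address(tf_spec, 'ps', 1))                   # 192.168.1.105:2222
print(task_for_address(tf_spec, '192.168.1.102:2222'))  # ('worker', 2)
```

The list order within each job is what defines the task index, which is why the spec maps each job name to a list rather than a set.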
This creates a tensorflow.train.Server on each Dask worker and sets up a queue for data transfer on each worker. These are accessible directly on the workers as the tensorflow_server and tensorflow_queue attributes.
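As a rough illustration of the per-worker setup, here is a minimal sketch of how a queue could be attached to every worker. It is not dask_tensorflow's actual implementation. It assumes a standard-library queue.Queue (thread-safe, which matters because Dask tasks run in worker threads), and the helper name attach_queue is hypothetical. Dask's Client.run passes the Worker object to any function that takes a dask_worker argument. The runnable part below uses local stand-in objects so it works without a cluster.

```python
import queue
from types import SimpleNamespace


def attach_queue(dask_worker):
    """Hypothetical helper: give a worker its own thread-safe batch queue.

    Meant to be run on every worker via Client.run, which fills in
    `dask_worker` with the Worker object.
    """
    dask_worker.tensorflow_queue = queue.Queue()
    return dask_worker.address


# Local stand-ins for two Dask workers (assumption: only `address` is needed here)
workers = [SimpleNamespace(address='tcp://10.0.0.1:40000'),
           SimpleNamespace(address='tcp://10.0.0.2:40000')]
for w in workers:
    attach_queue(w)

# Each worker's queue is independent: data put on one is not visible on the other
workers[0].tensorflow_queue.put('batch-0')
print(workers[0].tensorflow_queue.qsize(), workers[1].tensorflow_queue.qsize())  # 1 0
```

On a real cluster, client.run(attach_queue) would execute the function on each worker and return a dict keyed by worker address. Because the queues are per worker, a batch only reaches the TensorFlow task running on the same worker that received it.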
More complex workflow
Typically, we then set up long-running Dask tasks that fetch these servers and take part in general TensorFlow computations.
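from dask.distributed import worker_client

def ps_function():
    # Parameter-server task: keep this worker's TensorFlow server running
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        tf_server.join()  # blocks for the lifetime of the cluster

# One long-running task pinned to each Dask worker in the 'ps' group
ps_tasks = [client.submit(ps_function, workers=worker, pure=False)
            for worker in dask_spec['ps']]

def worker_function():
    # Training task: use this worker's TensorFlow server
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        # ... use tensorflow as desired ...

# One long-running task pinned to each Dask worker in the 'worker' group
worker_tasks = [client.submit(worker_function, workers=worker, pure=False)
                for worker in dask_spec['worker']]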
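The key property of these tasks is that they never return on their own: tf.train.Server.join() blocks until the server shuts down, which in practice is forever. The standard-library sketch below imitates that pattern with a thread pool. It is an analogy, not Dask code: ThreadPoolExecutor stands in for a worker's thread pool, and a threading.Event replaces join() so the example can terminate, which a real TensorFlow server does not do. The function names ps_task and worker_task are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def ps_task(shutdown: threading.Event) -> str:
    # Stand-in for tf_server.join(): block until told to stop
    shutdown.wait()
    return 'ps stopped'


def worker_task(i: int) -> int:
    # Stand-in for a unit of training work
    return i * i


shutdown = threading.Event()
# 6 threads: 2 are held by the blocking "ps" tasks, leaving room for the others
with ThreadPoolExecutor(max_workers=6) as pool:
    ps_futures = [pool.submit(ps_task, shutdown) for _ in range(2)]
    worker_futures = [pool.submit(worker_task, i) for i in range(4)]
    results = [f.result() for f in worker_futures]
    shutdown.set()  # release the blocking tasks so the pool can shut down
    ps_results = [f.result() for f in ps_futures]

print(results, ps_results)  # [0, 1, 4, 9] ['ps stopped', 'ps stopped']
```

A blocking task holds one thread for its whole lifetime. In this sketch, with max_workers=2 both threads would be taken by the ps tasks and the worker tasks would never run, which is why long-running tasks should be planned against each worker's thread count.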
One simple and flexible approach is to have these functions block on queues and feed them data from Dask arrays, dataframes, and so on.
def worker_function():
    with worker_client() as c:
        tf_server = c.worker.tensorflow_server
        queue = c.worker.tensorflow_queue
        # stopping_condition() is a placeholder for your own termination test
        while not stopping_condition():
            batch = queue.get()  # blocks until a batch arrives
            # train with batch
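The stopping_condition() placeholder leaves open how the loop ends. One common choice, shown here as an assumption rather than what dask_tensorflow prescribes, is a sentinel object placed on the queue: a loop that only checks a condition before queue.get() can block forever once data stops arriving, whereas a sentinel wakes the consumer up and tells it to exit. STOP and consume are hypothetical names, and train_step stands in for a real training step.

```python
import queue
from typing import Any, Callable

STOP = object()  # hypothetical sentinel standing in for stopping_condition()


def consume(q: queue.Queue, train_step: Callable[[Any], None]) -> int:
    """Block on q, pass each batch to train_step, and stop at the sentinel."""
    n_batches = 0
    while True:
        batch = q.get()  # blocks until a producer puts something
        if batch is STOP:
            break
        train_step(batch)
        n_batches += 1
    return n_batches


seen = []
q = queue.Queue()
for batch in ([1, 2], [3, 4], [5]):
    q.put(batch)
q.put(STOP)
print(consume(q, seen.append), seen)  # 3 [[1, 2], [3, 4], [5]]
```

With several consumers reading one queue, the producer would put one sentinel per consumer.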
And then dump blocks of NumPy arrays and pandas dataframes into these queues:
from dask.distributed import get_worker

def dump_batch(batch):
    # Runs on a Dask worker: hand the batch to that worker's queue
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

import dask.dataframe as dd
df = dd.read_csv('hdfs:///path/to/*.csv')
# clean up dataframe as necessary
partitions = df.to_delayed()  # delayed pandas dataframes
# Keep a reference to the futures so the tasks are not released early
futures = client.map(dump_batch, partitions)
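The producer side can be imitated locally to see the data flow: split the data into partitions, then map a function that puts each partition onto a queue. This is a single-machine sketch under stated assumptions, not Dask code. The helper partition is hypothetical and stands in for dataframe partitions, ThreadPoolExecutor.map stands in for client.map, and one shared queue replaces the per-worker queues.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def partition(rows, n_parts):
    """Hypothetical helper: split rows into n_parts contiguous chunks."""
    size, extra = divmod(len(rows), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


batch_queue = queue.Queue()


def dump_batch(batch):
    # Local analogue of dump_batch: write the batch to one shared queue
    batch_queue.put(batch)
    return len(batch)


rows = list(range(10))
parts = partition(rows, 4)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
with ThreadPoolExecutor(max_workers=4) as pool:
    # Consuming the map results waits until every put has finished
    sizes = list(pool.map(dump_batch, parts))

print(sizes, batch_queue.qsize())  # [3, 3, 2, 2] 4
```

On a real cluster, each dump_batch call runs on whichever worker the scheduler chooses, so each batch lands in that worker's own queue rather than in a single shared one.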
Hashes for dask-tensorflow-0.0.2.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 9687ca632ca9769ccc2c8f99300ebfc675b19ef6369a57e33c00e7178f412d2e
MD5 | 0a27b2144c4f550cafd6ddb98a29d018
BLAKE2b-256 | 9c6eb09e3fa80760aef4bb77ed5cee6df94ef21dec347da6be0fea9505f9d792
Hashes for dask_tensorflow-0.0.2-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | b88b6f330f846bb1a9d58cfdd612ca5a3d3ef4874e3e81405395e6104d8ad2d9
MD5 | 944c7b4c6cde97705bc54036fff531ff
BLAKE2b-256 | 00a56cd58713aacf16fc8ef801e3020894a1faba7710c19c047c3e9582081b20