dask-pytorch已被重命名为dask-pytorch-ddp
项目描述
dask-pytorch-ddp
注意:dask-pytorch已被重命名为dask-pytorch-ddp!
我们发布了dask-pytorch v0.1.2的最终版本,该版本与dask-pytorch-ddp v0.2.0相同,以简化此过渡。您可以在PyPI上找到dask-pytorch-ddp:[https://pypi.python.org/pypi/dask-pytorch-ddp/](https://pypi.python.org/pypi/dask-pytorch-ddp/)
dask-pytorch-ddp
是一个Python包,它使用分布式数据并行使在Dask集群上训练PyTorch模型变得简单。项目的预期范围包括
- 在Dask集群上启动PyTorch工作节点
- 使用分布式数据存储(例如,S3)作为常规PyTorch数据集
- 跟踪和记录中间结果、训练统计数据和检查点的机制。
目前,这个库和提供的示例针对计算机视觉任务进行了优化,但这个库旨在适用于任何类型的PyTorch任务。唯一真正特定于图像处理的是S3ImageFolder
数据集类。在图像处理之外实现PyTorch数据集(假设为映射样式随机访问)需要实现__getitem__(self, idx: int):
和__len__(self):
。我们计划在将来添加更多其他用例的示例,并欢迎PR扩展功能。
典型的非Dask工作流程
非Dask PyTorch使用的典型示例如下
加载数据
创建数据集(ImageFolder
),并用DataLoader
包装它
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(250),
transforms.ToTensor()
])
whole_dataset = ImageFolder(path, transform=transform)
batch_size = 100
num_workers = 64
indices = list(range(len(data)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]
train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(data, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)
训练模型
遍历数据集,并通过优化器步进来训练模型
device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]
criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
for epoch in range(n_epochs):
model.train() # Set model to training mode
for inputs, labels in train_loader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# zero the parameter gradients
optimizer.zero_grad()
loss.backward()
optimizer.step()
count += 1
现在在Dask上
使用dask_pytorch_ddp和PyTorch分布式数据并行,我们可以如下进行训练
加载数据
从S3加载数据集,并显式设置多进程上下文(Dask默认为spawn,但PyTorch通常配置为使用fork)
from dask_pytorch_ddp.data import S3ImageFolder
whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers, multiprocessing_context=mp.get_context('fork')
)
并行训练
将训练循环包裹在一个函数中(并添加指标记录。不是必需的,但非常有用)。将模型转换为PyTorch分布式数据并行(DDP
)模型,该模型知道如何在工作者之间同步梯度。
import uuid
import pickle
import logging
import json
key = uuid.uuid4().hex
rh = DaskResultsHandler(key)
def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
worker_rank = int(dist.get_rank())
device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
model = DDP(model, device_ids=[0])
criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
whole_dataset,
sampler=train_sampler,
batch_size=batch_size,
num_workers=num_workers,
multiprocessing_context=mp.get_context('fork')
)
count = 0
for epoch in range(n_epochs):
# Each epoch has a training and validation phase
model.train() # Set model to training mode
for inputs, labels in train_loader:
dt = datetime.datetime.now().isoformat()
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# zero the parameter gradients
optimizer.zero_grad()
loss.backward()
optimizer.step()
count += 1
# statistics
rh.submit_result(
f"worker/{worker_rank}/data-{dt}.json",
json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
)
if (count % 100) == 0 and worker_rank == 0:
rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))
它是如何工作的?
dask-pytorch-ddp
主要是在现有的pytorch
功能周围的一个包装。 pytorch.distributed
提供了分布式数据并行(DDP)的基础设施。
在DDP中,你创建N个工作者,0号工作者是“主”,并协调缓冲区和梯度的同步。在SGD中,梯度通常在批次中的所有数据点之间平均。通过在多个工作者上运行批次并平均梯度,DDP使你能够使用更大的批次大小(N * batch_size)
来运行SGD。
dask-pytorch-ddp
设置了一些环境变量来配置“主”主机和端口,然后在训练之前调用init_process_group
,在训练之后调用destroy_process_group
。这是数据科学家通常手动执行的过程。
多GPU机器
dask_cuda_worker
自动为每个工作者旋转CUDA_VISIBLE_DEVICES
(通常每个GPU一个)。因此,你的PyTorch代码应该始终从0号GPU开始。
例如,如果我有8个GPU的机器,第3个工作者将CUDA_VISIBLE_DEVICES
设置为2,3,4,5,6,7,0,1
。在该工作者上,如果我调用torch.device(0)
,我将得到GPU 2。
还有什么?
dask-pytorch-ddp
还实现了基于S3的ImageFolder
。还计划实现更多面向分布式友好的数据集。dask-pytorch-ddp
还实现了一个基本的成果汇总框架,以便轻松收集不同工作者之间的训练指标。目前,仅实现了利用Dask pub-sub通信协议的DaskResultsHandler
,但计划实现基于S3的结果处理程序。
一些注意事项
Dask通常生成进程。PyTorch通常分叉。当使用支持多进程的数据加载器时,传递Fork
多进程上下文以强制在数据加载器中使用分叉是一个好主意。
某些Dask部署不允许生成进程。要覆盖此设置,你可以更改distributed.worker.daemon设置。
环境变量是一种方便的执行方式
DASK_DISTRIBUTED__WORKER__DAEMON=False
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建版本
dask-pytorch-0.1.2.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f367ffbbb69c23614c76f959e6db3cc040bf1bfeaa3a944437e8fbb79d2549b8 |
|
MD5 | 920a73492e9c0c0c49fc62ca896efcd1 |
|
BLAKE2b-256 | 41a543338e6bfb66013d1bc614f722ea9aa7a27a98f4bc5c28380b44a34e767b |
dask_pytorch-0.1.2-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 05d00005bf55d3b32bf9dcfcb6a86dda3e176e666192aa0ebf3b58b80247f9b6 |
|
MD5 | c0b1f797da88ae55cf9070690ee5ff78 |
|
BLAKE2b-256 | 4b15ccf5cca2ea2bea33431b12341f2f65fca7c1cf488a5d32bc6024c6bb132b |