用于在dask集群上设置torch DDP的库
项目描述
dask-pytorch-ddp
dask-pytorch-ddp
是一个Python包,它使使用分布式数据并行在Dask集群上训练PyTorch模型变得简单。项目的预期范围是
- 在Dask集群上启动PyTorch工作进程
- 使用分布式数据存储(例如,S3)作为常规PyTorch数据集
- 跟踪和记录中间结果、训练统计信息和检查点的机制。
目前,这个库和提供的示例主要针对计算机视觉任务,但这个库旨在对任何类型的PyTorch任务都很有用。唯一真正特定于图像处理的是 S3ImageFolder
数据集类。在图像之外实现PyTorch数据集(假设映射样式随机访问)需要实现 __getitem__(self, idx: int):
和 __len__(self):
。我们计划在未来添加更多其他用例的示例,并欢迎提供扩展功能的PR。
典型的非dask工作流程
非dask PyTorch使用的典型示例如下
加载数据
创建一个数据集(ImageFolder
),并用DataLoader
包装它
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(250),
transforms.ToTensor()
])
whole_dataset = ImageFolder(path, transform=transform)
batch_size = 100
num_workers = 64
indices = list(range(len(data)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]
train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(data, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)
训练模型
遍历数据集,并通过优化器步进来训练模型
device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]
criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
for epoch in range(n_epochs):
model.train() # Set model to training mode
for inputs, labels in train_loader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# zero the parameter gradients
optimizer.zero_grad()
loss.backward()
optimizer.step()
count += 1
现在使用Dask
使用dask_pytorch_ddp和PyTorch分布式数据并行,我们可以在多个工作者上训练,如下所示
加载数据
从S3加载数据集,并显式设置多进程上下文(Dask默认为spawn,但PyTorch通常配置为使用fork)
from dask_pytorch_ddp.data import S3ImageFolder
whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers, multiprocessing_context=mp.get_context('fork')
)
并行训练
将训练循环包装在函数中(并添加指标记录。这不是必需的,但非常有用)。将模型转换为PyTorch分布式数据并行(DDP
)模型,该模型知道如何在工作者之间同步梯度。
import uuid
import pickle
import logging
import json
key = uuid.uuid4().hex
rh = DaskResultsHandler(key)
def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
worker_rank = int(dist.get_rank())
device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
model = DDP(model, device_ids=[0])
criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
whole_dataset,
sampler=train_sampler,
batch_size=batch_size,
num_workers=num_workers,
multiprocessing_context=mp.get_context('fork')
)
count = 0
for epoch in range(n_epochs):
# Each epoch has a training and validation phase
model.train() # Set model to training mode
for inputs, labels in train_loader:
dt = datetime.datetime.now().isoformat()
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# zero the parameter gradients
optimizer.zero_grad()
loss.backward()
optimizer.step()
count += 1
# statistics
rh.submit_result(
f"worker/{worker_rank}/data-{dt}.json",
json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
)
if (count % 100) == 0 and worker_rank == 0:
rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))
它是如何工作的?
dask-pytorch-ddp
主要是对现有pytorch
功能的封装。pytorch.distributed
提供了分布式数据并行(DDP)的基础设施。
在DDP中,你创建N个工作者,0号工作者是“主”,负责协调缓冲区和梯度的同步。在SGD中,梯度通常在批量的所有数据点之间平均。通过在多个工作者上运行批次并平均梯度,DDP可以使你以更大的批次大小((N * batch_size)
)运行SGD。
dask-pytorch-ddp
设置了一些环境变量来配置“主”主机和端口,然后在训练之前调用init_process_group
,训练后调用destroy_process_group
。这是数据科学家通常手动完成的过程。
多GPU机器
dask_cuda_worker
会自动为每个工作者旋转CUDA_VISIBLE_DEVICES
(通常每个GPU一个)。因此,你的PyTorch代码应该始终从0号GPU开始。
例如,如果我有8个GPU的机器,第3个工作者将CUDA_VISIBLE_DEVICES
设置为2,3,4,5,6,7,0,1
。在那个工作者上,如果调用torch.device(0)
,我会得到GPU 2。
还有其他什么?
dask-pytorch-ddp
还实现了一个基于S3的ImageFolder
。计划实现更多分布式友好的数据集。dask-pytorch-ddp
还实现了一个基本的成果聚合框架,这样就可以轻松地跨不同工作者收集训练指标。目前,只实现了利用Dask pub-sub通信协议的DaskResultsHandler
,但计划实现基于S3的结果处理器。
一些注意事项
Dask通常生成进程。PyTorch通常采用Fork。当使用具有多进程支持的数据加载器时,最好传递Fork
多进程上下文,以强制在数据加载器中使用Forking。
一些Dask部署不允许生成进程。要覆盖此限制,可以更改distributed.worker.daemon设置。
环境变量是这样做的一种方便方式
DASK_DISTRIBUTED__WORKER__DAEMON=False
项目详情
dask-pytorch-ddp-0.2.2.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1a4d9e1df1e0c0f26f7fea64451de90381574a9aa89acc299b1a29c7d930537a |
|
MD5 | 40e238a4620d3eadacd6a882da18c3eb |
|
BLAKE2b-256 | 58744390ccac984b68f23dedb477e1724ed7988b748bb8ff77925c87b5004aad |
dask_pytorch_ddp-0.2.2-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | cc8038f1efd1545c3f00b9abcfb6b41a0706f686315238d55072df0546659cba |
|
MD5 | f13faa27f4831b7afa31dc21a72f69f3 |
|
BLAKE2b-256 | 8b26c89e04f695433fa95c8e914d614bc94c03b7624617c3004c51bbe7ec198a |