用于在dask集群上设置torch DDP的库
项目描述
dask-pytorch-ddp
dask-pytorch-ddp 是一个Python包,它使使用分布式数据并行在Dask集群上训练PyTorch模型变得简单。项目的预期范围是
- 在Dask集群上启动PyTorch工作进程
- 使用分布式数据存储(例如,S3)作为常规PyTorch数据集
- 跟踪和记录中间结果、训练统计信息和检查点的机制。
目前,这个库和提供的示例主要针对计算机视觉任务,但这个库旨在对任何类型的PyTorch任务都很有用。唯一真正特定于图像处理的是 S3ImageFolder 数据集类。在图像之外实现PyTorch数据集(假设映射样式随机访问)需要实现 __getitem__(self, idx: int): 和 __len__(self):。我们计划在未来添加更多其他用例的示例,并欢迎提供扩展功能的PR。
典型的非dask工作流程
非dask PyTorch使用的典型示例如下
加载数据
创建一个数据集(ImageFolder),并用DataLoader包装它
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(250),
    transforms.ToTensor()
])
whole_dataset = ImageFolder(path, transform=transform)
batch_size = 100
num_workers = 64
indices = list(range(len(data)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]
train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(data, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)
训练模型
遍历数据集,并通过优化器步进来训练模型
device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]
criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        # zero the parameter gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        count += 1
现在使用Dask
使用dask_pytorch_ddp和PyTorch分布式数据并行,我们可以在多个工作者上训练,如下所示
加载数据
从S3加载数据集,并显式设置多进程上下文(Dask默认为spawn,但PyTorch通常配置为使用fork)
from dask_pytorch_ddp.data import S3ImageFolder
whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
    whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers, multiprocessing_context=mp.get_context('fork')
)
并行训练
将训练循环包装在函数中(并添加指标记录。这不是必需的,但非常有用)。将模型转换为PyTorch分布式数据并行(DDP)模型,该模型知道如何在工作者之间同步梯度。
import uuid
import pickle
import logging
import json
key = uuid.uuid4().hex
rh = DaskResultsHandler(key)
def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
    worker_rank = int(dist.get_rank())
    device = torch.device(0)
    net = models.resnet18(pretrained=False)
    model = net.to(device)
    model = DDP(model, device_ids=[0])
    criterion = nn.CrossEntropyLoss().cuda()
    lr = 0.001
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
    train_loader = torch.utils.data.DataLoader(
        whole_dataset,
        sampler=train_sampler,
        batch_size=batch_size,
        num_workers=num_workers,
        multiprocessing_context=mp.get_context('fork')
    )
    count = 0
    for epoch in range(n_epochs):
        # Each epoch has a training and validation phase
        model.train()  # Set model to training mode
        for inputs, labels in train_loader:
            dt = datetime.datetime.now().isoformat()
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            # zero the parameter gradients
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1
            # statistics
            rh.submit_result(
                f"worker/{worker_rank}/data-{dt}.json",
                json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
            )
            if (count % 100) == 0 and worker_rank == 0:
                rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))
它是如何工作的?
dask-pytorch-ddp主要是对现有pytorch功能的封装。pytorch.distributed提供了分布式数据并行(DDP)的基础设施。
在DDP中,你创建N个工作者,0号工作者是“主”,负责协调缓冲区和梯度的同步。在SGD中,梯度通常在批量的所有数据点之间平均。通过在多个工作者上运行批次并平均梯度,DDP可以使你以更大的批次大小((N * batch_size))运行SGD。
dask-pytorch-ddp设置了一些环境变量来配置“主”主机和端口,然后在训练之前调用init_process_group,训练后调用destroy_process_group。这是数据科学家通常手动完成的过程。
多GPU机器
dask_cuda_worker会自动为每个工作者旋转CUDA_VISIBLE_DEVICES(通常每个GPU一个)。因此,你的PyTorch代码应该始终从0号GPU开始。
例如,如果我有8个GPU的机器,第3个工作者将CUDA_VISIBLE_DEVICES设置为2,3,4,5,6,7,0,1。在那个工作者上,如果调用torch.device(0),我会得到GPU 2。
还有其他什么?
dask-pytorch-ddp还实现了一个基于S3的ImageFolder。计划实现更多分布式友好的数据集。dask-pytorch-ddp还实现了一个基本的成果聚合框架,这样就可以轻松地跨不同工作者收集训练指标。目前,只实现了利用Dask pub-sub通信协议的DaskResultsHandler,但计划实现基于S3的结果处理器。
一些注意事项
Dask通常生成进程。PyTorch通常采用Fork。当使用具有多进程支持的数据加载器时,最好传递Fork多进程上下文,以强制在数据加载器中使用Forking。
一些Dask部署不允许生成进程。要覆盖此限制,可以更改distributed.worker.daemon设置。
环境变量是这样做的一种方便方式
DASK_DISTRIBUTED__WORKER__DAEMON=False
项目详情
dask-pytorch-ddp-0.2.2.tar.gz 的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 1a4d9e1df1e0c0f26f7fea64451de90381574a9aa89acc299b1a29c7d930537a | |
| MD5 | 40e238a4620d3eadacd6a882da18c3eb | |
| BLAKE2b-256 | 58744390ccac984b68f23dedb477e1724ed7988b748bb8ff77925c87b5004aad | 
dask_pytorch_ddp-0.2.2-py3-none-any.whl 的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | cc8038f1efd1545c3f00b9abcfb6b41a0706f686315238d55072df0546659cba | |
| MD5 | f13faa27f4831b7afa31dc21a72f69f3 | |
| BLAKE2b-256 | 8b26c89e04f695433fa95c8e914d614bc94c03b7624617c3004c51bbe7ec198a |