跳转到主要内容

用于在dask集群上设置torch DDP的库

项目描述

dask-pytorch-ddp

dask-pytorch-ddp 是一个Python包,它使使用分布式数据并行在Dask集群上训练PyTorch模型变得简单。项目的预期范围是

  • 在Dask集群上启动PyTorch工作进程
  • 使用分布式数据存储(例如,S3)作为常规PyTorch数据集
  • 跟踪和记录中间结果、训练统计信息和检查点的机制。

目前,这个库和提供的示例主要针对计算机视觉任务,但这个库旨在对任何类型的PyTorch任务都很有用。唯一真正特定于图像处理的是 S3ImageFolder 数据集类。在图像之外实现PyTorch数据集(假设映射样式随机访问)需要实现 __getitem__(self, idx: int):__len__(self):。我们计划在未来添加更多其他用例的示例,并欢迎提供扩展功能的PR。

典型的非dask工作流程

非dask PyTorch使用的典型示例如下

加载数据

创建一个数据集(ImageFolder),并用DataLoader包装它

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(250),
    transforms.ToTensor()
])

whole_dataset = ImageFolder(path, transform=transform)

batch_size = 100
num_workers = 64
indices = list(range(len(data)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]

train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(data, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)

训练模型

遍历数据集,并通过优化器步进来训练模型

device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]

criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        # zero the parameter gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        count += 1

现在使用Dask

使用dask_pytorch_ddp和PyTorch分布式数据并行,我们可以在多个工作者上训练,如下所示

加载数据

从S3加载数据集,并显式设置多进程上下文(Dask默认为spawn,但PyTorch通常配置为使用fork)

from dask_pytorch_ddp.data import S3ImageFolder

whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
    whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers, multiprocessing_context=mp.get_context('fork')
)

并行训练

将训练循环包装在函数中(并添加指标记录。这不是必需的,但非常有用)。将模型转换为PyTorch分布式数据并行(DDP)模型,该模型知道如何在工作者之间同步梯度。

import uuid
import pickle
import logging
import json


key = uuid.uuid4().hex
rh = DaskResultsHandler(key)

def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
    worker_rank = int(dist.get_rank())
    device = torch.device(0)
    net = models.resnet18(pretrained=False)
    model = net.to(device)
    model = DDP(model, device_ids=[0])

    criterion = nn.CrossEntropyLoss().cuda()
    lr = 0.001
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)

    train_loader = torch.utils.data.DataLoader(
        whole_dataset,
        sampler=train_sampler,
        batch_size=batch_size,
        num_workers=num_workers,
        multiprocessing_context=mp.get_context('fork')
    )

    count = 0
    for epoch in range(n_epochs):
        # Each epoch has a training and validation phase
        model.train()  # Set model to training mode
        for inputs, labels in train_loader:
            dt = datetime.datetime.now().isoformat()
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # zero the parameter gradients
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1

            # statistics
            rh.submit_result(
                f"worker/{worker_rank}/data-{dt}.json",
                json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
            )
            if (count % 100) == 0 and worker_rank == 0:
                rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))

它是如何工作的?

dask-pytorch-ddp主要是对现有pytorch功能的封装。pytorch.distributed提供了分布式数据并行(DDP)的基础设施。

在DDP中,你创建N个工作者,0号工作者是“主”,负责协调缓冲区和梯度的同步。在SGD中,梯度通常在批量的所有数据点之间平均。通过在多个工作者上运行批次并平均梯度,DDP可以使你以更大的批次大小((N * batch_size))运行SGD。

dask-pytorch-ddp设置了一些环境变量来配置“主”主机和端口,然后在训练之前调用init_process_group,训练后调用destroy_process_group。这是数据科学家通常手动完成的过程。

多GPU机器

dask_cuda_worker会自动为每个工作者旋转CUDA_VISIBLE_DEVICES(通常每个GPU一个)。因此,你的PyTorch代码应该始终从0号GPU开始。

例如,如果我有8个GPU的机器,第3个工作者将CUDA_VISIBLE_DEVICES设置为2,3,4,5,6,7,0,1。在那个工作者上,如果调用torch.device(0),我会得到GPU 2。

还有其他什么?

dask-pytorch-ddp还实现了一个基于S3的ImageFolder。计划实现更多分布式友好的数据集。dask-pytorch-ddp还实现了一个基本的成果聚合框架,这样就可以轻松地跨不同工作者收集训练指标。目前,只实现了利用Dask pub-sub通信协议DaskResultsHandler,但计划实现基于S3的结果处理器。

一些注意事项

Dask通常生成进程。PyTorch通常采用Fork。当使用具有多进程支持的数据加载器时,最好传递Fork多进程上下文,以强制在数据加载器中使用Forking。

一些Dask部署不允许生成进程。要覆盖此限制,可以更改distributed.worker.daemon设置。

环境变量是这样做的一种方便方式

DASK_DISTRIBUTED__WORKER__DAEMON=False

项目详情


下载文件

下载适合您平台版本的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。

源分布

dask-pytorch-ddp-0.2.2.tar.gz (9.4 kB 查看哈希)

上传时间

构建分布

dask_pytorch_ddp-0.2.2-py3-none-any.whl (9.9 kB 查看哈希值)

上传时间 Python 3

支持