跳转到主要内容

dask-pytorch已被重命名为dask-pytorch-ddp

项目描述

dask-pytorch-ddp

注意:dask-pytorch已被重命名为dask-pytorch-ddp!

我们发布了dask-pytorch v0.1.2的最终版本,该版本与dask-pytorch-ddp v0.2.0相同,以简化此过渡。您可以在PyPI上找到dask-pytorch-ddp:[https://pypi.python.org/pypi/dask-pytorch-ddp/](https://pypi.python.org/pypi/dask-pytorch-ddp/)

dask-pytorch-ddp是一个Python包,它使用分布式数据并行使在Dask集群上训练PyTorch模型变得简单。项目的预期范围包括

  • 在Dask集群上启动PyTorch工作节点
  • 使用分布式数据存储(例如,S3)作为常规PyTorch数据集
  • 跟踪和记录中间结果、训练统计数据和检查点的机制。

目前,这个库和提供的示例针对计算机视觉任务进行了优化,但这个库旨在适用于任何类型的PyTorch任务。唯一真正特定于图像处理的是S3ImageFolder数据集类。在图像处理之外实现PyTorch数据集(假设为映射样式随机访问)需要实现__getitem__(self, idx: int):__len__(self):。我们计划在将来添加更多其他用例的示例,并欢迎PR扩展功能。

典型的非Dask工作流程

非Dask PyTorch使用的典型示例如下

加载数据

创建数据集(ImageFolder),并用DataLoader包装它

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(250),
    transforms.ToTensor()
])

whole_dataset = ImageFolder(path, transform=transform)

batch_size = 100
num_workers = 64
indices = list(range(len(data)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]

train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(data, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)

训练模型

遍历数据集,并通过优化器步进来训练模型

device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]

criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        # zero the parameter gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        count += 1

现在在Dask上

使用dask_pytorch_ddp和PyTorch分布式数据并行,我们可以如下进行训练

加载数据

从S3加载数据集,并显式设置多进程上下文(Dask默认为spawn,但PyTorch通常配置为使用fork)

from dask_pytorch_ddp.data import S3ImageFolder

whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
    whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers, multiprocessing_context=mp.get_context('fork')
)

并行训练

将训练循环包裹在一个函数中(并添加指标记录。不是必需的,但非常有用)。将模型转换为PyTorch分布式数据并行(DDP)模型,该模型知道如何在工作者之间同步梯度。

import uuid
import pickle
import logging
import json


key = uuid.uuid4().hex
rh = DaskResultsHandler(key)

def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
    worker_rank = int(dist.get_rank())
    device = torch.device(0)
    net = models.resnet18(pretrained=False)
    model = net.to(device)
    model = DDP(model, device_ids=[0])

    criterion = nn.CrossEntropyLoss().cuda()
    lr = 0.001
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)

    train_loader = torch.utils.data.DataLoader(
        whole_dataset,
        sampler=train_sampler,
        batch_size=batch_size,
        num_workers=num_workers,
        multiprocessing_context=mp.get_context('fork')
    )

    count = 0
    for epoch in range(n_epochs):
        # Each epoch has a training and validation phase
        model.train()  # Set model to training mode
        for inputs, labels in train_loader:
            dt = datetime.datetime.now().isoformat()
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # zero the parameter gradients
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1

            # statistics
            rh.submit_result(
                f"worker/{worker_rank}/data-{dt}.json",
                json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
            )
            if (count % 100) == 0 and worker_rank == 0:
                rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))

它是如何工作的?

dask-pytorch-ddp主要是在现有的pytorch功能周围的一个包装。 pytorch.distributed提供了分布式数据并行(DDP)的基础设施。

在DDP中,你创建N个工作者,0号工作者是“主”,并协调缓冲区和梯度的同步。在SGD中,梯度通常在批次中的所有数据点之间平均。通过在多个工作者上运行批次并平均梯度,DDP使你能够使用更大的批次大小(N * batch_size)来运行SGD。

dask-pytorch-ddp设置了一些环境变量来配置“主”主机和端口,然后在训练之前调用init_process_group,在训练之后调用destroy_process_group。这是数据科学家通常手动执行的过程。

多GPU机器

dask_cuda_worker自动为每个工作者旋转CUDA_VISIBLE_DEVICES(通常每个GPU一个)。因此,你的PyTorch代码应该始终从0号GPU开始。

例如,如果我有8个GPU的机器,第3个工作者将CUDA_VISIBLE_DEVICES设置为2,3,4,5,6,7,0,1。在该工作者上,如果我调用torch.device(0),我将得到GPU 2。

还有什么?

dask-pytorch-ddp还实现了基于S3的ImageFolder。还计划实现更多面向分布式友好的数据集。dask-pytorch-ddp还实现了一个基本的成果汇总框架,以便轻松收集不同工作者之间的训练指标。目前,仅实现了利用Dask pub-sub通信协议DaskResultsHandler,但计划实现基于S3的结果处理程序。

一些注意事项

Dask通常生成进程。PyTorch通常分叉。当使用支持多进程的数据加载器时,传递Fork多进程上下文以强制在数据加载器中使用分叉是一个好主意。

某些Dask部署不允许生成进程。要覆盖此设置,你可以更改distributed.worker.daemon设置。

环境变量是一种方便的执行方式

DASK_DISTRIBUTED__WORKER__DAEMON=False

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

dask-pytorch-0.1.2.tar.gz (8.7 kB 查看哈希值)

上传时间 源代码

构建版本

dask_pytorch-0.1.2-py3-none-any.whl (9.6 kB 查看哈希值)

上传时间 Python 3

支持者