码力全开 / PyTorch分布式数据并行入门

Created Mon, 04 Aug 2025 10:45:50 +0800 Modified Tue, 05 Aug 2025 09:52:18 +0800
1242 Words 2 min

当要训练的模型数据比较大时,可以通过并行技术加速训练。在PyTorch的distributed包中提供了分布式数据并行的实现。

首先先介绍下DataParallel(缩写为DP)与DistributedDataParallel(缩写为DDP)的区别。其中前者是单进程多线程的,其只能在单机上进行运行。相反后者是多进程并支持单/多机训练的。

为了创建DDP模型,首先需要正确设置进程组。下面是一个简单的示例代码:

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

其中关于DistributedDataParallel的内容可以参考

需要注意的是,在Windows系统上torch.distributed包只支持Gloo、FileStore和TcpStore后端。通过init_process_group初始化进程组,因为分布式数据并行整个还是需要进行通信的,否则也不知道其完成进度。

之后创建1个简单的模块,并使用DDP进行装饰并使用一些虚拟的输入数据。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    # 初始化GPU进程组
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    
    # 对进程组进行清理
    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

而点对点通信主要通过阻塞的sendrecv函数来实现,或者使用立即计数的isendirecv函数。关于其集体的通信的方式主要有如下几种:

  1. Scatter,平均划分
  2. Gather,汇聚
  3. Reduce
  4. All-Reduce
  5. Broadcast,广播
  6. All-Gather

Gather与Reduce都将数据汇总在一个节点上,但是Reduce会对这些数据进行相加得到1个值,而Gather是得到1个列表。

与点对点通信相反,集体的方式允许组内所有进程进行通信。可以通过dist.new_group方法创建一个组。

而数据的采样可以使用DistributedSampler。这样可以确保分布式中数据量保持不变。

下面是在单机双GPU上训练的模型代码,比如在T4 x 2的服务器上:

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

# 1. 定义简单模型
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(10, 100),
            nn.ReLU(),
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, 2)
        )
    
    def forward(self, x):
        return self.net(x)

# 2. 定义虚拟数据集
class DummyDataset(Dataset):
    def __init__(self, size=10000):
        self.data = torch.randn(size, 10)
        self.targets = torch.randint(0, 2, (size,))
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]

# 3. 训练函数
def train(rank, world_size):
    # 初始化进程组
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    
    # 设置当前GPU设备
    torch.cuda.set_device(rank)
    
    # 创建模型并移至GPU
    model = SimpleModel().to(rank)
    model = DDP(model, device_ids=[rank])
    
    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    
    # 准备数据加载器
    dataset = DummyDataset()
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
    
    # 训练循环
    for epoch in range(10):
        sampler.set_epoch(epoch)  # 确保每个epoch有不同的shuffle
        model.train()
        
        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(rank), target.to(rank)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if batch_idx % 100 == 0:
                print(f"Rank {rank}, Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")
    
    # 清理进程组
    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = 2  # 使用2个GPU
    torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size, join=True)

将上述代码保存在ddp_train.py中,之后在单机双GPU的Linux系统上进行运行:

python ddp_train.py

之后就可以看到其输出了。

最后,还可以借助其他一些框架,如DeepSpeed简化分布式数据的训练过程。

参考文章:

https://docs.pytorch.org/tutorials/intermediate/ddp_tutorial.html

https://docs.pytorch.org/tutorials/intermediate/dist_tuto.html

https://www.cnblogs.com/liyier/p/18136458

如果喜欢这篇文章或对您有帮助,可以:[☕] 请我喝杯咖啡 | [💓] 小额赞助