Pytorch DDP原理

DDP=分布式数据并行(DISTRIBUTED DATA PARALLEL),和DP一样也是一种数据并行的方法,详细文档可以参考官方手册

DDP使用

以下是使用NPU进行2个节点并行的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os
import torch
import torch_npu
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
torch.npu.set_device(rank)
device = torch.device('npu')
# create default process group
dist.init_process_group("hccl", rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(device)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
# forward pass
outputs = ddp_model(torch.randn(20, 10).to(device))
labels = torch.randn(20, 10).to(device)

# backward pass
loss = loss_fn(outputs, labels)
loss.backward()

# update parameters
optimizer.step()

def main():
world_size = 2
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)

if __name__=="__main__":
# Environment variables which need to be
# set when using c10d's default "env"
# initialization mode.
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
main()

DDP原理

相关文件

文件 功能
torch/nn/parallel/distributed.py DistributedDataParallel类实现
torch/csrc/distributed/c10d/reducer.cpp DDP reduce类实现文件
torch/csrc/distributed/c10d/reducer.hpp DDP reduce接口文件
torch/csrc/distributed/c10d/init.cpp python C插件注册文件

进程交互

image-20230629163126602
  • 主线程通过torch.multiprocessing,fork出新的进程,满足word_size的要求,然后各个线程独立运行,主进程将自己的host和port放到环境变量中共子进程使用。
  • 各个进程初始化process group,这些进程属于同一个进程组。
  • 每个进程分别创建模型,DDP对象,定义loss函数和优化器。
  • 创建DDP对象时,主进程会同步模型参数,这一步完成后,各个进程的模型一致。
  • 每个模型输入部分数据,这些数据需要调用者自行分割,可以使用DistributedSampler。
  • 然后执行模型的训练过程,每次backward执行完后,调用注册的reducer hook函数,做梯度聚合

Reducer

普通模型的DDP,除了每次反向执行后的梯度同步之外,进程之间没有其他通信的需求。DDP构造时,会在每个模型参数上配置回调函数(autograd_hook),当参数的梯度计算完成后,会调用这个回调函数。为了能够一边计算,一边同步梯度数据,DDP将相邻参数分组,每个组叫一个bucket。

ddp_grad_sync.png

这是参数分组和allreduce的官方示意图,回调函数的执行流程如下:

image-20230629170221846
  • 每个参数梯度计算完成后,均会调用回调函数,该回调函数会将参数标记为ready。
  • 然后检查bucket中的pending是否为0,也就是bucket的参数是否全部完成梯度计算,如果完成,则准备进行一步all reduce。
  • all reduce是一个异步方法,任务执行后不阻塞,继续执行。(如果某个进程的当前bucket没有执行完成,dist backend会等待,执行完成后异步返回)。
  • 当所有的bucket都发起all reduce后,等待每个bucket的all reduce结果,所有结果都返回后,完成本次backward。

参考

DISTRIBUTED DATA PARALLEL

原创 深度 PyTorch DDP系列第二篇:实现原理与源代码解析