Pytorch DP原理

DP(DataParallel)=数据并行,DP框架会将数据切片,然后模型拷贝,并在不同的backend上并行执行。官方手册说明

DP使用

1
2
3
4
5
6
7
8
9
10
11
12
model = LinearModel()
model = torch.nn.DataParallel(model,device_ids=[0,1,2])
criterion = torch.nn.MSELoss(size_average=False)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

for epoch in range(10):
y_pred = model(x_data)
loss = criterion(y_pred, y_data)

optimizer.zero_grad()
loss.backward()
optimizer.step()

从代码中看到,DP仅支持cuda,xpu或者昇腾,并不支持cpu。

DP原理

相关文件

文件 功能
torch/nn/parallel/data_parallel.py DP类实现
torch/nn/parallel/_functions.py DP中使用的自定义autograd Function
torch/nn/parallel/comm.py 线程间数据传递接口,例如,gather,scatter等
torch/nn/parallel/replicate.py 模型拷贝相关
torch/nn/parallel/parallel_apply.py 多线程执行模型
torch/nn/parallel/scatter_gather.py scatter和gather的封装
torch/csrc/cuda/python_comm.cpp C语言实现的数据传递函数extension注册
torch/csrc/cuda/comm.cpp C语言的数据传递实现

基本原理

DP基于单机多卡,所有的卡都参与训练。相对于非数据并行训练来说,DP会将一个batch数据切分为更小的batch,将数据复制到每一张计算的卡上,然后复制模型到所有的卡上,进行多线程执行,Forward计算完成后,会在device[0]上收集到一组预测值。device[0]计算loss,然后执行Backward计算梯度。(计算Backward没有看到创建线程的过程,单线程执行??

DP原理图

DP wapper

回顾DP使用方法,参与数据并行的模型进需要使用DataParalle包一层即可。

1
model = torch.nn.DataParallel(model,device_ids=[0,1,2])

可以理解,下图就是上述代码的实现,DP就是在原本的模型执行之前增加了数据切分(Scatter),模型复制(Broadcast),然后将模型的并行执行结果进行合并(Gather)。

image-20230621152641246

数据传递

DP的Forward函数核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
def forward(self, *inputs: Any, **kwargs: Any) -> Any:
...
# 训练数据切分,并拷贝到每一个卡上
inputs, module_kwargs = self.scatter(inputs, kwargs, self.device_ids)

...
# 模型数据拷贝到每一个卡上
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 每张卡启动一个线程执行,每个线程执行模型的forward函数
outputs = self.parallel_apply(replicas, inputs, module_kwargs)
# 集合所有线程的预测结果
return self.gather(outputs, self.output_device)

由于是单机单进程,所以数据的拷贝过程相对简单,以上述代码的scatter和broadcast为例,数据切分,然后将切分后的数据移动到指定设备上。但是Tensor还是在同一个数组中。

image-20230626101410741
image-20230621152441395

自定义自动梯度方法

Pytorch实现了自动梯度计算,除了官方实现的梯度计算之外,还能够自定义梯度计算方法,具体描述参考官方说明。以下是代码中对Function类的使用举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Examples::

>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD)
>>> class Exp(Function):
>>> @staticmethod
>>> def forward(ctx, i):
>>> result = i.exp()
>>> ctx.save_for_backward(result)
>>> return result
>>>
>>> @staticmethod
>>> def backward(ctx, grad_output):
>>> result, = ctx.saved_tensors
>>> return grad_output * result
>>>
>>> # Use it by calling the apply method:
>>> # xdoctest: +SKIP
>>> output = Exp.apply(input)
"""

实现自定义自动梯度方法,需要继承Function类,并实现forward和backward方法,并且这两个方法均是静态的。使用自定义自动梯度方法的时候,不应该调用forward,而是调用Exp.apply,这样会返回forward的结果。相当于自己定义了算子的前向和后向的处理方法。当在执行反向传播的时候,会按反顺序调用自定义自动梯度方法的backward方法。

DP实现了三个自定义自动梯度方法,分别是Broadcast,Gather和Scatter,其反向传播方法内分别调用了Reduce_add,Scatter和Gather,即forward和backward中的动作是成对的。

训练流程

正向

  • ①:将输入的一个batch切分成三个更小的batch
  • ②:将模型的参数复制三份,分别发送到三个device上
  • ③:启动三个线程,每个线程在不同的device上进行原始模型的前向计算
  • ④:将三个线程的输出结果拼接到一起

Loss

  • 在device[0]上计算Loss

反向

  • ④:将Loss切分,分别发送给对应的模型
  • ③:执行三个模型的backward方法,更新梯度(Backward Engine多线程处理
  • ②:将三个模型上的梯度相加,并储存到Original Module中(直接相加?
  • ①:将三个模型的梯度信息拼接到一起
image-20230621144922423

官方建议

  1. 不建议使用DP来进行数据并行,因为这不是一个真正的并发执行(python GIL),并且,由于数据的转移,反而会减慢模型的执行,建议使用DDP。
  2. 所有拷贝来的数据都是一次性的,每次迭代后都会销毁重建,所以这些模型上如果有数据修改,将不会持久化。(Input最后做的gather是为了下一层网络的输入

参考

Pytorch DATAPARALLEL

Extending torch.autograd

PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析

[原创][深度][PyTorch] DDP系列第二篇:实现原理与源代码解析