Author: Yan Tingshuai @ Zhihu (republished with permission)
Source: https://zhuanlan.zhihu.com/p/489892744
Edited by the Jishi Platform (极市平台)
For work reasons I recently had to fill in my knowledge of distributed training. After a round of theoretical study I still felt unsatisfied: many points would not quite click (for example, what the distributed primitives scatter and all-reduce look like at the code level, how the ring all-reduce algorithm is applied to gradient synchronization, and how a parameter server performs partial parameter updates).
The blackboard in the office of the famous physicist and Nobel laureate Richard Feynman read: "What I cannot create, I do not understand." Programmers express the same idea with the slogan "show me the code." So I am writing a series of articles on distributed training that turns the usual abstract concepts into code, making sure every example can be executed, verified, and reproduced, and publishing the source so readers can discuss and build on it.
After some investigation I found that PyTorch provides a fairly complete abstraction and set of interfaces for distributed training, so this series uses PyTorch as its main framework; many of the examples come from the PyTorch documentation and have been debugged and extended on that basis.
Finally, since there are already plenty of theoretical introductions to distributed training online, theory will not be the focus of this series; I will concentrate on the code.
Earlier articles in this series:
PyTorch - A minimal hands-on look at distributed training: https://zhuanlan.zhihu.com/p/477073906
PyTorch - Distributed communication primitives (with source code): https://zhuanlan.zhihu.com/p/478953028
PyTorch - Hand-written all-reduce distributed training (with source code): https://zhuanlan.zhihu.com/p/482557067
PyTorch - A minimal implementation of inter-operator parallelism (with source code): https://zhuanlan.zhihu.com/p/483640235
PyTorch - A minimal multi-node multi-GPU example (with source code): https://zhuanlan.zhihu.com/p/486130584
1. Introduction
PyTorch 1.9.0 introduced torchrun as the replacement for the torch.distributed.launch used in earlier versions. On top of the functionality of torch.distributed.launch, torchrun mainly adds two features:
Failover: when a worker fails during training, all workers are automatically restarted and training continues;
Elastic: nodes can be added to or removed from the job dynamically. This article uses an example to show how to use elastic training.
The example first starts a 4-GPU worker group on Node0; after training for a while, 4 more GPU workers are started on Node1 and join the workers on Node0 to form a new worker group, turning the job into a 2-node, 8-GPU distributed training run.

2. Building the model
The neural network is a simple fully connected model:
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))
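The snippets in this article leave out imports. For reference, a script assembled from them would need roughly the following at the top (a sketch inferred from the identifiers used below; the original excerpts do not show it):
import os
import time

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP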
3. Checkpoint handling
Because all workers are killed and then restarted whenever a node is added or removed, the training code has to save its training state so that training can pick up where it left off after a restart.
The information that needs to be saved generally includes:
model: the model parameters
optimizer: the optimizer state
epoch: the epoch the job has reached
The save and load code is shown below. The APIs involved:
torch.save: serializes a Python object with pickle and writes it to a local file;
torch.load: deserializes a file produced by torch.save and loads it into memory;
model.state_dict(): holds each layer of the model together with its parameters;
optimizer.state_dict(): holds the optimizer's state.
def save_checkpoint(epoch, model, optimizer, path):
    torch.save({
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimize_state_dict": optimizer.state_dict(),
    }, path)

def load_checkpoint(path):
    checkpoint = torch.load(path)
    return checkpoint
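One detail worth calling out (my addition, not part of the original code): in the snippet above every rank writes to and reads from the same checkpoint.pt. A common refinement is to let only rank 0 write the file, and to pass map_location when loading so tensors saved from one GPU are placed on the local worker's GPU. A minimal sketch, assuming the process group is already initialized and local_rank identifies this worker's GPU:
def save_checkpoint_rank0(epoch, model, optimizer, path):
    # Only one process writes the file, avoiding concurrent writes to the same path.
    if dist.get_rank() == 0:
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimize_state_dict": optimizer.state_dict(),
        }, path)

def load_checkpoint_on(path, local_rank):
    # map_location places the saved tensors on this worker's device.
    return torch.load(path, map_location=f"cuda:{local_rank}")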
4. Training code
The initialization logic:
Lines 1~3: read and print the key environment variables of the current worker (they show up in the output later);
Lines 5~8: create the model (wrapped in DDP), the loss function, and the optimizer;
Lines 10~12: initialize the training parameters (max_epoch, first_epoch, and the checkpoint path);
Lines 14~19: if a checkpoint exists, load it and restore model, optimizer, and first_epoch from it.
local_rank = int(os.environ["LOCAL_RANK"])
rank = int(os.environ["RANK"])
print(f"[{os.getpid()}] (rank = {rank}, local_rank = {local_rank}) train worker starting...")

model = ToyModel().cuda(local_rank)
ddp_model = DDP(model, [local_rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
max_epoch = 100
first_epoch = 0
ckp_path = "checkpoint.pt"

if os.path.exists(ckp_path):
    print(f"load checkpoint from {ckp_path}")
    checkpoint = load_checkpoint(ckp_path)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimize_state_dict"])
    first_epoch = checkpoint["epoch"]
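A small side note (my observation, not a change made by the article): save_checkpoint stores the index of the epoch that has just finished, so resuming with first_epoch = checkpoint["epoch"] re-runs that epoch once after a restart. If that matters, resume from the next epoch instead:
    first_epoch = checkpoint["epoch"] + 1  # hypothetical tweak: skip the epoch that already completed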
The training logic:
Line 1: the epoch loop runs from first_epoch to max_epoch, so that a restarted worker continues from the epoch it had reached;
Line 2: to make the effect of dynamically adding a node easy to observe, a sleep call slows training down;
Lines 3~8: the model training steps;
Line 9: for simplicity, a checkpoint is saved once per epoch, storing the current epoch, model, and optimizer.
for i in range(first_epoch, max_epoch):
    time.sleep(1)  # slow training down so the effect of dynamically adding a node is visible
    outputs = ddp_model(torch.randn(20, 10).to(local_rank))
    labels = torch.randn(20, 5).to(local_rank)
    loss = loss_fn(outputs, labels)
    loss.backward()
    print(f"[{os.getpid()}] epoch {i} (rank = {rank}, local_rank = {local_rank}) loss = {loss.item()}\n")
    optimizer.step()
    save_checkpoint(i, model, optimizer, ckp_path)
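One more note, which is my addition rather than something the article changes: the loop above never clears gradients inside the loop (optimizer.zero_grad() is only called once before training starts), so gradients accumulate across iterations. A typical loop zeroes them every step, along these lines (sleep and print dropped for brevity):
for i in range(first_epoch, max_epoch):
    optimizer.zero_grad()  # clear gradients accumulated by the previous iteration
    outputs = ddp_model(torch.randn(20, 10).to(local_rank))
    labels = torch.randn(20, 5).to(local_rank)
    loss = loss_fn(outputs, labels)
    loss.backward()
    optimizer.step()
    save_checkpoint(i, model, optimizer, ckp_path)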
5. Launching the job
Because we use torchrun to launch the multi-node, multi-GPU job, there is no need to start multiple processes through the spawn API (torchrun takes care of launching our Python script as a process). We simply call the train function written above, adding the distributed process-group initialization before it and the teardown after it.
The code below shows how the train interface is invoked.
def run():
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "LOCAL_WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    train()
    dist.destroy_process_group()

if __name__ == "__main__":
    run()
This example uses torchrun to run the multi-node, multi-GPU distributed training job (note: torch.distributed.launch has been deprecated by PyTorch, so avoid using it). The launch script is described below (note: both node0 and node1 are started with this script). The torchrun flags used:
--nnodes=1:3: the job accepts a minimum of 1 and a maximum of 3 nodes in the distributed training run;
--nproc_per_node=4: each node runs 4 worker processes;
--max_restarts=3: the maximum number of worker group restarts; note that a node failure, a node scale-down, and a node scale-up all trigger a restart;
--rdzv_id=1: a unique job id; all nodes must use the same job id;
--rdzv_backend: the rendezvous backend implementation, with c10d and etcd supported by default; rendezvous handles the communication and coordination between nodes;
--rdzv_endpoint: the rendezvous address, which should be the host IP and port of one of the nodes.
torchrun \
    --nnodes=1:3 \
    --nproc_per_node=4 \
    --max_restarts=3 \
    --rdzv_id=1 \
    --rdzv_backend=c10d \
    --rdzv_endpoint="192.0.0.1:1234" \
    train_elastic.py
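As a quick aside (my addition, not from the original article): if you just want to smoke-test the script on a single machine without setting up a rendezvous endpoint, torchrun also has a standalone mode that spins up a local rendezvous:
torchrun \
    --standalone \
    --nnodes=1 \
    --nproc_per_node=4 \
    train_elastic.py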
6. Analyzing the results
Code: BetterDL - train_elastic.py: https://github.com/tingshua-yts/BetterDL/blob/master/test/pytorch/DDP/train_elastic.py
Environment: two machines with 4 V100 GPUs each
image: pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime
GPU: V100
First run the launch script on node0:
torchrun \
    --nnodes=1:3 \
    --nproc_per_node=4 \
    --max_restarts=3 \
    --rdzv_id=1 \
    --rdzv_backend=c10d \
    --rdzv_endpoint="192.0.0.1:1234" \
    train_elastic.py
The output is as follows:
Lines 2~5: the job launched so far is a single-node, 4-GPU training run, so WORLD_SIZE is 4 and LOCAL_WORLD_SIZE is also 4;
Lines 6~9: four ranks, rank0~rank3, take part in the distributed training;
Lines 10~18: rank0~rank3 all start training from epoch 0.
/workspace/DDP# sh run_elastic.sh
[4031] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'}
[4029] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'}
[4030] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'}
[4032] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'}
[4029] (rank = 0, local_rank = 0) train worker starting...
[4030] (rank = 1, local_rank = 1) train worker starting...
[4032] (rank = 3, local_rank = 3) train worker starting...
[4031] (rank = 2, local_rank = 2) train worker starting...
[4101] epoch 0 (rank = 1, local_rank = 1) loss = 0.9288564920425415
[4103] epoch 0 (rank = 3, local_rank = 3) loss = 0.9711472988128662
[4102] epoch 0 (rank = 2, local_rank = 2) loss = 1.0727070569992065
[4100] epoch 0 (rank = 0, local_rank = 0) loss = 0.9402943253517151
[4100] epoch 1 (rank = 0, local_rank = 0) loss = 1.0327017307281494
[4101] epoch 1 (rank = 1, local_rank = 1) loss = 1.4485043287277222
[4103] epoch 1 (rank = 3, local_rank = 3) loss = 1.0959293842315674
[4102] epoch 1 (rank = 2, local_rank = 2) loss = 1.0669530630111694
...
Run the same script on node1:
torchrun \
    --nnodes=1:3 \
    --nproc_per_node=4 \
    --max_restarts=3 \
    --rdzv_id=1 \
    --rdzv_backend=c10d \
    --rdzv_endpoint="192.0.0.1:1234" \
    train_elastic.py
The output on node1 is as follows:
Lines 2~5: because node1 has been added, the job is now a 2-node, 8-GPU distributed training run, so WORLD_SIZE=8 and LOCAL_WORLD_SIZE=4;
Lines 6~9: the workers on node1 get ranks rank4~rank7;
Lines 13~20: node1 joined when the workers on node0 had trained to epoch 35, so it continues training from epoch 35.
/workspace/DDP# sh run_elastic.sh
[696] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[697] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[695] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[694] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[697] (rank = 7, local_rank = 3) train worker starting...
[695] (rank = 5, local_rank = 1) train worker starting...
[694] (rank = 4, local_rank = 0) train worker starting...
[696] (rank = 6, local_rank = 2) train worker starting...
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
[697] epoch 35 (rank = 7, local_rank = 3) loss = 1.1888569593429565
[694] epoch 35 (rank = 4, local_rank = 0) loss = 0.8916441202163696
[695] epoch 35 (rank = 5, local_rank = 1) loss = 1.5685604810714722
[696] epoch 35 (rank = 6, local_rank = 2) loss = 1.11683189868927
[696] epoch 36 (rank = 6, local_rank = 2) loss = 1.3724170923233032
[694] epoch 36 (rank = 4, local_rank = 0) loss = 1.061527967453003
[695] epoch 36 (rank = 5, local_rank = 1) loss = 0.96876460313797
[697] epoch 36 (rank = 7, local_rank = 3) loss = 0.8060566782951355
...
The output on node0 is as follows:
Lines 6~9: when the workers on node0 reach epoch 35, the training script is run on node1 and requests to join the training job;
Lines 10~13: all workers are restarted; because node1 has been added, the job is now a 2-node, 8-GPU distributed training run, so WORLD_SIZE=8 and LOCAL_WORLD_SIZE=4;
Lines 14~17: the workers on node0 now get ranks rank0~rank3;
Lines 18~21: the checkpoint is loaded;
Lines 22~30: training continues from the model, optimizer, and epoch stored in the checkpoint.
...
[4100] epoch 35 (rank = 0, local_rank = 0) loss = 1.0746158361434937
[4101] epoch 35 (rank = 1, local_rank = 1) loss = 1.1712706089019775
[4103] epoch 35 (rank = 3, local_rank = 3) loss = 1.1774182319641113
[4102] epoch 35 (rank = 2, local_rank = 2) loss = 1.0898035764694214
WARNING:torch.distributed.elastic.multiprocessing.api:Sending process 4100 closing signal SIGTERM
WARNING:torch.distributed.elastic.multiprocessing.api:Sending process 4101 closing signal SIGTERM
WARNING:torch.distributed.elastic.multiprocessing.api:Sending process 4102 closing signal SIGTERM
WARNING:torch.distributed.elastic.multiprocessing.api:Sending process 4103 closing signal SIGTERM
[4164] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[4165] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[4162] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[4163] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '42913', 'WORLD_SIZE': '8', 'LOCAL_WORLD_SIZE': '4'}
[4162] (rank = 0, local_rank = 0) train worker starting...
[4163] (rank = 1, local_rank = 1) train worker starting...
[4164] (rank = 2, local_rank = 2) train worker starting...
[4165] (rank = 3, local_rank = 3) train worker starting...
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
load checkpoint from checkpoint.pt
[4165] epoch 35 (rank = 3, local_rank = 3) loss = 1.3437936305999756
[4162] epoch 35 (rank = 0, local_rank = 0) loss = 1.5693414211273193
[4163] epoch 35 (rank = 1, local_rank = 1) loss = 1.199862003326416
[4164] epoch 35 (rank = 2, local_rank = 2) loss = 1.0465545654296875
[4163] epoch 36 (rank = 1, local_rank = 1) loss = 0.9741991758346558
[4162] epoch 36 (rank = 0, local_rank = 0) loss = 1.3609280586242676
[4164] epoch 36 (rank = 2, local_rank = 2) loss = 0.9585908055305481
[4165] epoch 36 (rank = 3, local_rank = 3) loss = 0.9169824123382568
...