学生处网站建设招标公告,跑腿网站建设,做程序的网站,电商设计培训学校因为业务需要#xff0c;最近接到一项任务#xff0c;是如何利用pytorch实现model parallel以及distributed training。搜罗了网上很多资料#xff0c;以及阅读了pytorch官方的教程#xff0c;都没有可参考的案例。讲的比较多的是data parallel#xff0c;关于model paral…因为业务需要最近接到一项任务是如何利用pytorch实现model parallel以及distributed training。搜罗了网上很多资料以及阅读了pytorch官方的教程都没有可参考的案例。讲的比较多的是data parallel关于model parallel的研究发现不多。 通过阅读pytorch官方主页发现这个example是进行model parallel的 官方博客地址DISTRIBUTED PIPELINE PARALLELISM USING RPC 官方的example地址Distributed Pipeline Parallel Example 通过阅读代码发现这个代码以Resnet 50 model为例将model直接拆分成两部分并指定两部分在不同的worker运行代码实现了在同一台机器上创建多进程来拆分模型运行。关于这个代码的详细介绍可搜索关键词pytorch RPC 的分布式管道并行这里不多介绍。 通过在本地运行代码发现不满足多机器运行的需求。接下来是思考的心路里程。
首先通过代码发现python main.py程序运行时无法指定rank那么在跨机器运行时如何知道哪台机器是worker1worker2这个地方我们首先怀疑需要去修改worker人为在代码中指定worker的IP地址如修改main.py 代码中191行 修改前 model DistResNet50(split_size, [worker1, worker2]) 修改后 model DistResNet50(split_size, [worker1xxx.xxx.xxx.xxx, worker2xxx.xxx.xxx.xxx]) 然后很自然的就报错了这里无法识别这样的worker名不支持直接指定这条路也就走不通了。接着只能重新阅读代码到最后251行我们发现 mp.spawn(run_worker, args(world_size, num_split), nprocsworld_size, joinTrue) 尤其是这行代码中mp.spawn引起了我们的怀疑这不是多进程么这本质还是在多进程情况下来执行程序无法跨机器啊不符合我们的需求。最后的最后我们重新阅读pytorch rpc机制并通过简单测试程序让两台机器互相通信其中一台机器发起运算请求并传输原始数据另外一台机器负责接收数据并进行相关运算这个程序当时在两台物理机器上测试成功了那说明rpc实现通信这件事并不复杂。结合前面给的代码我们决定将worke1和worker2分开写代码分开执行并且在代码中需要指定这些worker所属的rank这样理论上就能够将原始代码修改成分机器的rpc通信运行了。
上面主要是我们的心理历程话不多说接下来show the code。 实验环境两台机器均是cpu环境conda安装的环境也保证了一致。 master机器代码 # https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.pyimport os
import threading
import time
import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RReffrom torchvision.models.resnet import Bottleneck
os.environ[MASTER_ADDR] XXX.XXX.XXX.XXX # 指定master ip地址
os.environ[MASTER_PORT] 7856 # 指定master 端口号#########################################################
# Define Model Parallel ResNet50 #
########################################################## In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.num_classes 1000def conv1x1(in_planes, out_planes, stride1):1x1 convolutionreturn nn.Conv2d(in_planes, out_planes, kernel_size1, stridestride, biasFalse)class ResNetBase(nn.Module):def __init__(self, block, inplanes, num_classes1000,groups1, width_per_group64, norm_layerNone):super(ResNetBase, self).__init__()self._lock threading.Lock()self._block blockself._norm_layer nn.BatchNorm2dself.inplanes inplanesself.dilation 1self.groups groupsself.base_width width_per_groupdef _make_layer(self, planes, blocks, stride1):norm_layer self._norm_layerdownsample Noneprevious_dilation self.dilationif stride ! 1 or self.inplanes ! planes * self._block.expansion:downsample nn.Sequential(conv1x1(self.inplanes, planes * self._block.expansion, stride),norm_layer(planes * self._block.expansion),)layers []layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,self.base_width, previous_dilation, norm_layer))self.inplanes planes * self._block.expansionfor _ in range(1, blocks):layers.append(self._block(self.inplanes, planes, groupsself.groups,base_widthself.base_width, dilationself.dilation,norm_layernorm_layer))return nn.Sequential(*layers)def parameter_rrefs(self):rCreate one RRef for each parameter in the given local module, and return alist of RRefs.return [RRef(p) for p in self.parameters()]class ResNetShard1(ResNetBase):The first part of ResNet.def __init__(self, device, *args, **kwargs):super(ResNetShard1, self).__init__(Bottleneck, 64, num_classesnum_classes, *args, **kwargs)self.device deviceself.seq nn.Sequential(nn.Conv2d(3, self.inplanes, kernel_size7, stride2, padding3, biasFalse),self._norm_layer(self.inplanes),nn.ReLU(inplaceTrue),nn.MaxPool2d(kernel_size3, stride2, padding1),self._make_layer(64, 3),self._make_layer(128, 4, stride2)).to(self.device)for m in self.modules():if isinstance(m, nn.Conv2d):nn.init.kaiming_normal_(m.weight, modefan_out, nonlinearityrelu)elif isinstance(m, nn.BatchNorm2d):nn.init.ones_(m.weight)nn.init.zeros_(m.bias)def forward(self, x_rref):x x_rref.to_here().to(self.device)with self._lock:out self.seq(x)return out.cpu()class ResNetShard2(ResNetBase):The second part of ResNet.def __init__(self, device, *args, **kwargs):super(ResNetShard2, self).__init__(Bottleneck, 512, num_classesnum_classes, *args, **kwargs)self.device deviceself.seq nn.Sequential(self._make_layer(256, 6, stride2),self._make_layer(512, 3, stride2),nn.AdaptiveAvgPool2d((1, 1)),).to(self.device)self.fc nn.Linear(512 * self._block.expansion, num_classes).to(self.device)def forward(self, x_rref):x x_rref.to_here().to(self.device)with self._lock:out self.fc(torch.flatten(self.seq(x), 1))return out.cpu()class DistResNet50(nn.Module):Assemble two parts as an nn.Module and define pipelining logicdef __init__(self, split_size, workers, *args, **kwargs):super(DistResNet50, self).__init__()self.split_size split_size# Put the first part of the ResNet50 on workers[0]self.p1_rref rpc.remote(workers[0],ResNetShard1,args (cuda:0,) args,kwargs kwargs)# Put the second part of the ResNet50 on workers[1]self.p2_rref rpc.remote(workers[1],ResNetShard2,args (cpu,) args,kwargs kwargs)def forward(self, xs):# Split the input batch xs into micro-batches, and collect async RPC# futures into a listout_futures []for x in iter(xs.split(self.split_size, dim0)):x_rref RRef(x)y_rref self.p1_rref.remote().forward(x_rref)print(y_rref)z_fut self.p2_rref.rpc_async().forward(y_rref)print(z_fut)out_futures.append(z_fut)# collect and cat all output tensors into one tensor.return torch.cat(torch.futures.wait_all(out_futures))def parameter_rrefs(self):remote_params []remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())return remote_params#########################################################
# Run RPC Processes #
#########################################################num_batches 3
batch_size 8
image_w 128
image_h 128if __name____main__:options rpc.TensorPipeRpcBackendOptions(num_worker_threads256, rpc_timeout300)# 初始化主节点的RPC连接rpc.init_rpc(master, rank0, world_size2, rpc_backend_optionsoptions)for num_split in [1,2]:tik time.time()model DistResNet50(num_split, [master, worker])loss_fn nn.MSELoss()opt DistributedOptimizer(optim.SGD,model.parameter_rrefs(),lr0.05,)one_hot_indices torch.LongTensor(batch_size) \.random_(0, num_classes) \.view(batch_size, 1)for i in range(num_batches):print(fProcessing batch {i})# generate random inputs and labelsinputs torch.randn(batch_size, 3, image_w, image_h)labels torch.zeros(batch_size, num_classes) \.scatter_(1, one_hot_indices, 1)with dist_autograd.context() as context_id:outputs model(inputs)dist_autograd.backward(context_id, [loss_fn(outputs, labels)])opt.step(context_id)tok time.time()print(fnumber of splits {num_split}, execution time {tok - tik})# 关闭RPC连接rpc.shutdown()
worker端的代码 # https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.pyimport os
import threading
import time
from functools import wrapsimport torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RReffrom torchvision.models.resnet import Bottleneck
os.environ[MASTER_ADDR] XXX.XXX.XXX.XXX # 指定master 端口号
os.environ[MASTER_PORT] 7856 # 指定master 端口号#########################################################
# Define Model Parallel ResNet50 #
########################################################## In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.num_classes 1000def conv1x1(in_planes, out_planes, stride1):1x1 convolutionreturn nn.Conv2d(in_planes, out_planes, kernel_size1, stridestride, biasFalse)class ResNetBase(nn.Module):def __init__(self, block, inplanes, num_classes1000,groups1, width_per_group64, norm_layerNone):super(ResNetBase, self).__init__()self._lock threading.Lock()self._block blockself._norm_layer nn.BatchNorm2dself.inplanes inplanesself.dilation 1self.groups groupsself.base_width width_per_groupdef _make_layer(self, planes, blocks, stride1):norm_layer self._norm_layerdownsample Noneprevious_dilation self.dilationif stride ! 1 or self.inplanes ! planes * self._block.expansion:downsample nn.Sequential(conv1x1(self.inplanes, planes * self._block.expansion, stride),norm_layer(planes * self._block.expansion),)layers []layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,self.base_width, previous_dilation, norm_layer))self.inplanes planes * self._block.expansionfor _ in range(1, blocks):layers.append(self._block(self.inplanes, planes, groupsself.groups,base_widthself.base_width, dilationself.dilation,norm_layernorm_layer))return nn.Sequential(*layers)def parameter_rrefs(self):rCreate one RRef for each parameter in the given local module, and return alist of RRefs.return [RRef(p) for p in self.parameters()]class ResNetShard2(ResNetBase):The second part of ResNet.def __init__(self, device, *args, **kwargs):super(ResNetShard2, self).__init__(Bottleneck, 512, num_classesnum_classes, *args, **kwargs)self.device deviceself.seq nn.Sequential(self._make_layer(256, 6, stride2),self._make_layer(512, 3, stride2),nn.AdaptiveAvgPool2d((1, 1)),).to(self.device)self.fc nn.Linear(512 * self._block.expansion, num_classes).to(self.device)def forward(self, x_rref):x x_rref.to_here().to(self.device)print(x)with self._lock:out self.fc(torch.flatten(self.seq(x), 1))return out.cpu()#########################################################
# Run RPC Processes #
#########################################################if __name____main__:options rpc.TensorPipeRpcBackendOptions(num_worker_threads256, rpc_timeout300)# 初始化工作节点的RPC连接rpc.init_rpc(worker, rank1, world_size2, rpc_backend_optionsoptions)# 等待主节点的调用rpc.shutdown()
代码中的MASTER_ADDR和port需要指定为一致分别在master机器上运行master.pyworker机器上运行worker.py这样就可以实现Resnet 50 model在两台物理机器上model parallel。 注意事项
确保物理机器能够互相ping通同时关闭防火墙两个物理机器最好都是linux环境我们的实验发现pytorch的分布式不支持在Windows环境运行两个物理机器的python运行环境要求保持一致