Due to business needs, we were recently handed a task: figure out how to implement model parallelism and distributed training with PyTorch. We combed through a lot of material online and read the official PyTorch tutorials, but found no directly usable reference. Most of what is written covers data parallelism; there is far less on model parallelism.

Browsing the official PyTorch site, we did find one example that performs model parallelism:

Official tutorial: DISTRIBUTED PIPELINE PARALLELISM USING RPC
Official example: Distributed Pipeline Parallel Example

Reading the code shows that it takes the ResNet-50 model as an example, splits the model into two parts, and assigns the two parts to run on different workers. However, the example runs the split model by spawning multiple processes on a single machine. For a detailed walkthrough, search for the keywords "pytorch RPC 的分布式管道并行" (distributed pipeline parallelism with PyTorch RPC); we will not repeat it here.

Running the code locally confirmed that it does not satisfy the multi-machine requirement. What follows is the reasoning we went through.

First, the code shows that python main.py offers no way to specify a rank at launch, so when running across machines, how would each machine know whether it is worker1 or worker2? Our first guess was that the worker names had to be edited by hand to carry the workers' IP addresses, e.g. changing line 191 of main.py:

Before:
model = DistResNet50(split_size, ["worker1", "worker2"])

After:
model = DistResNet50(split_size, ["worker1=xxx.xxx.xxx.xxx", "worker2=xxx.xxx.xxx.xxx"])

This failed immediately, as one might expect: worker names in that form are not recognized, so addresses cannot be specified this way, and that path was a dead end. We went back to the code, and at line 251, near the end, we found:

mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)

The call to mp.spawn was the giveaway: this is plain multiprocessing. The example fundamentally executes everything as multiple processes on one machine, which cannot span machines and therefore does not meet our needs.

In the end we went back to PyTorch's RPC mechanism and wrote a simple test program that lets two machines talk to each other: one machine issues a computation request and ships the raw data, the other receives the data and performs the computation (a minimal sketch of such a test is shown just before the master code below). That test ran successfully on two physical machines, which showed that getting RPC communication working is not the hard part. Combining this with the example code above, we decided to write separate scripts for worker1 and worker2, run them separately, and specify each worker's rank explicitly in the code. In theory, this turns the original single-machine example into RPC communication across machines.

That was the thought process; now for the code.

Experimental environment: both machines are CPU-only, and the conda environments on the two machines are identical.
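Before the full scripts, here is a minimal sketch of the kind of two-machine RPC smoke test described above. The file name rpc_smoke_test.py, the function remote_add and the port number are made up for illustration; the only assumptions are two machines that can reach each other, with the caller (rank 0) running on the machine whose IP is used as MASTER_ADDR.

# rpc_smoke_test.py -- illustrative two-machine RPC test
# Run "python rpc_smoke_test.py caller" on one machine and
# "python rpc_smoke_test.py callee" on the other.
import os
import sys

import torch
import torch.distributed.rpc as rpc

os.environ["MASTER_ADDR"] = "XXX.XXX.XXX.XXX"  # IP of the machine that runs rank 0
os.environ["MASTER_PORT"] = "29500"            # any free port, identical on both machines


def remote_add(x, y):
    # Executed on the callee: receive two tensors and return their sum.
    return x + y


if __name__ == "__main__":
    role = sys.argv[1]  # "caller" or "callee"
    if role == "caller":
        rpc.init_rpc("caller", rank=0, world_size=2)
        # Ship the raw data to the other machine and wait for the result.
        result = rpc.rpc_sync("callee", remote_add,
                              args=(torch.ones(2, 2), torch.ones(2, 2)))
        print(result)
    else:
        rpc.init_rpc("callee", rank=1, world_size=2)
        # Nothing else to do here; this process serves incoming RPCs until shutdown.
    rpc.shutdown()

If the caller prints a 2x2 tensor of twos, basic cross-machine RPC is working.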
Code for the master machine:

# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py

import os
import threading
import time

import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef

from torchvision.models.resnet import Bottleneck

os.environ["MASTER_ADDR"] = "XXX.XXX.XXX.XXX"  # master IP address
os.environ["MASTER_PORT"] = "7856"             # master port


#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class ResNetShard1(ResNetBase):
    """The first part of ResNet."""

    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.seq(x)
        return out.cpu()


class ResNetShard2(ResNetBase):
    """The second part of ResNet."""

    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()


class DistResNet50(nn.Module):
    """Assemble two parts as an nn.Module and define pipelining logic"""

    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()

        self.split_size = split_size

        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args=("cuda:0",) + args,
            kwargs=kwargs
        )

        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args=("cpu",) + args,
            kwargs=kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            print(y_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            print(z_fut)
            out_futures.append(z_fut)

        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params


#########################################################
#                   Run RPC Processes                   #
#########################################################

num_batches = 3
batch_size = 8
image_w = 128
image_h = 128


if __name__ == "__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)

    # Initialize the RPC connection for the master node
    rpc.init_rpc("master", rank=0, world_size=2, rpc_backend_options=options)

    for num_split in [1, 2]:
        tik = time.time()
        model = DistResNet50(num_split, ["master", "worker"])
        loss_fn = nn.MSELoss()
        opt = DistributedOptimizer(
            optim.SGD,
            model.parameter_rrefs(),
            lr=0.05,
        )

        one_hot_indices = torch.LongTensor(batch_size) \
                               .random_(0, num_classes) \
                               .view(batch_size, 1)

        for i in range(num_batches):
            print(f"Processing batch {i}")
            # generate random inputs and labels
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                          .scatter_(1, one_hot_indices, 1)

            with dist_autograd.context() as context_id:
                outputs = model(inputs)
                dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
                opt.step(context_id)

        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")

    # Shut down the RPC connection
    rpc.shutdown()
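A detail worth spelling out about the master script: the strings "master" and "worker" passed to DistResNet50 are RPC worker names, i.e. the names registered by rpc.init_rpc on each machine, not hostnames or IP addresses. Because workers[0] is "master", ResNetShard1 is created inside the master process itself, which is why the master script defines both shards while the worker script below only needs ResNetShard2. Note also that ResNetShard1 is constructed with device "cuda:0" in this listing; on CPU-only machines such as ours, that argument would presumably have to be "cpu" as well.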
Code for the worker machine:

# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py

import os
import threading
import time
from functools import wraps

import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

from torchvision.models.resnet import Bottleneck

os.environ["MASTER_ADDR"] = "XXX.XXX.XXX.XXX"  # master IP address (must match master.py)
os.environ["MASTER_PORT"] = "7856"             # master port (must match master.py)


#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.

num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class ResNetShard2(ResNetBase):
    """The second part of ResNet."""

    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc = nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        print(x)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()


#########################################################
#                   Run RPC Processes                   #
#########################################################

if __name__ == "__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)

    # Initialize the RPC connection for the worker node
    rpc.init_rpc("worker", rank=1, world_size=2, rpc_backend_options=options)

    # Wait for calls from the master node
    rpc.shutdown()

MASTER_ADDR and MASTER_PORT must be set to the same values in both scripts. Run master.py on the master machine and worker.py on the worker machine, and the ResNet-50 model then runs model-parallel across the two physical machines.

Notes:

  • Make sure the two machines can ping each other, and turn off their firewalls.
  • Both machines should preferably run Linux; in our experiments, PyTorch's distributed features did not run on Windows.
  • The Python environments on the two machines must be identical.
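The two scripts above hard-code the worker name and rank. A natural follow-up, which we have not tried as part of this experiment, is to keep a single script and pass the rank and the master address on the command line; the sketch below illustrates the idea (the script name run_pipeline.py and the argument names are ours, not part of the original example):

# run_pipeline.py -- illustrative single-script variant (untested here)
import argparse
import os

import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser()
parser.add_argument("--rank", type=int, required=True)  # 0 = master, 1 = worker
parser.add_argument("--master-addr", required=True)
parser.add_argument("--master-port", default="7856")
args = parser.parse_args()

os.environ["MASTER_ADDR"] = args.master_addr
os.environ["MASTER_PORT"] = args.master_port

options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)

if args.rank == 0:
    rpc.init_rpc("master", rank=0, world_size=2, rpc_backend_options=options)
    # ... build DistResNet50 and run the training loop exactly as in the master code above ...
else:
    rpc.init_rpc("worker", rank=1, world_size=2, rpc_backend_options=options)
    # The worker only needs the shard definitions; it idles until the master calls it.

rpc.shutdown()

On the master machine this would be launched as python run_pipeline.py --rank 0 --master-addr XXX.XXX.XXX.XXX, and on the worker machine with the same command but --rank 1.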