当前位置: 首页 > news >正文

潍坊网站建设最新报价百度pc端提升排名

潍坊网站建设最新报价,百度pc端提升排名,计算机毕业论文10000字范文,深圳网站建设一条龙Nacos集群数据同步 ​ 当我们有服务进行注册以后,会写入注册信息同时会触发ClientChangedEvent事件,通过这个事件,就会开始进行Nacos的集群数据同步,当然这其中只有有一个Nacos节点来处理对应的客户端请求,其实这其中…

Nacos集群数据同步

​ 当我们有服务进行注册以后,会写入注册信息同时会触发ClientChangedEvent事件,通过这个事件,就会开始进行Nacos的集群数据同步,当然这其中只有有一个Nacos节点来处理对应的客户端请求,其实这其中还涉及到一个负责节点非负责节点

负责节点

​ 这是首先我们要查看的是DistroClientDataProcessor(客户端数据一致性处理器)类型,这个类型会处理当前节点负责的Client,那我们要查看其中的syncToAllServer方法。

private void syncToAllServer(ClientEvent event) {Client client = event.getClient();// 判断客户端是否为空,是否是临时实例,判断是否是负责节点if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {// 客户端断开连接DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {// 客户端新增/修改DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}
}

​ distroProtocol会循环所有其他nacos节点,提交一个异步任务,这个异步任务会延迟1s,其实这里我们就可以看到这里涉及到客户端的断开和客户端的新增和修改,对于Delete操作,由DistroSyncDeleteTask处理,对于Change操作,由DistroSyncChangeTask处理,这里我们从DistroSyncChangeTask来看

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {private static final DataOperation OPERATION = DataOperation.CHANGE;public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {super(distroKey, distroComponentHolder);}@Overrideprotected DataOperation getDataOperation() {return OPERATION;}// 无回调@Overrideprotected boolean doExecute() {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return true;}return getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());}// 有回调@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}@Overridepublic String toString() {return "DistroSyncChangeTask for " + getDistroKey().toString();}// 从DistroClientDataProcessor获取DistroDataprivate DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}
}

​ 获取到的DistroData,其实是从ClientManager实时获取Client。

// DistroClientDataProcessor
@Override
public DistroData getDistroData(DistroKey distroKey) {Client client = clientManager.getClient(distroKey.getResourceKey());if (null == client) {return null;}// 把生成的同步数据放入到数组中byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());return new DistroData(distroKey, data);
}

​ AbstractClient继承了Client,同时给DistroClientDataProcessorClient提供Client的注册信息,包括客户端注册了哪些namespace,哪些group,哪些service,哪些instance。

@Override
public ClientSyncData generateSyncData() {List<String> namespaces = new LinkedList<>();List<String> groupNames = new LinkedList<>();List<String> serviceNames = new LinkedList<>();List<InstancePublishInfo> instances = new LinkedList<>();for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

​ 这里我们在回过头来看syncData方法,这个方法实际上是由DistroClientTransportAgent封装为DistroDataRequest调用其他Nacos节点。

@Override
public boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}DistroDataRequest request = new DistroDataRequest(data, data.getType());Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;
}

非负责节点

​ 当负责节点将数据发送给非负责节点以后,将要处理发送过来的Client数据。这里我们要看DistroClientDataProcessor.processData方法

@Override
public boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE:ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);//处理同步数据handlerClientSyncData(clientSyncData);return true;case DELETE:String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);clientManager.clientDisconnected(deleteClientId);return true;default:return false;}
}

​ 然后来查看具体处理方法handlerClientSyncData

private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());// 同步客户端连接clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());// 获取Client(此时注册到的是ConnectionBasedClient)Client client = clientManager.getClient(clientSyncData.getClientId());// 更新Client数据upgradeClient(client, clientSyncData);
}

​ DistroClientDataProcessor的upgradeClient方法,更新Client里的注册表信息,发布对应事件

private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}
}

​ **注意:**这里要注意下此时的Client实现类ConnectionBasedClient,只不过它的isNative属性为false,这是非负责节点和负责节点的主要区别。

​ 其实判断当前nacos节点是否为负责节点的依据就是这个isNative属性,如果是客户端直接注册在这个nacos节点上的ConnectionBasedClient,它的isNative属性为true;如果是由Distro协议,同步到这个nacos节点上的ConnectionBasedClient,它的isNative属性为false。

​ 那其实我们都知道2.x的版本以后使用了长连接,所以通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求

Distro协议负责集群数据统一

​ Distro为了确保集群间数据一致,不仅仅依赖于数据发生改变时的实时同步,后台有定时任务做数据同步。

​ 在1.x版本中,责任节点每5s同步所有Service的Instance列表的摘要(md5)给非责任节点,非责任节点用对端传来的服务md5比对本地服务的md5,如果发生改变,需要反查责任节点。

​ 在2.x版本中,对这个流程做了改造,责任节点会发送Client全量数据,非责任节点定时检测同步过来的Client是否过期,减少1.x版本中的反查。

​ 责任节点每5s向其他节点发送DataOperation=VERIFY类型的DistroData,来维持非责任节点的Client数据不过期。

//DistroVerifyTimedTask 
@Override
public void run() {try {// 所有其他节点List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("server list is: {}", targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 遍历想这些节点发送Client.isNative=true的DistroData,type = VERIFYverifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);}
}

​ 非责任节点每5s扫描isNative=false的client,如果client30s内没有被VERIFY的DistroData更新过续租时间,会删除这个同步过来的Client数据。

//ConnectionBasedClientManager->ExpiredClientCleaner
private static class ExpiredClientCleaner implements Runnable {private final ConnectionBasedClientManager clientManager;public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {this.clientManager = clientManager;}@Overridepublic void run() {long currentTime = System.currentTimeMillis();for (String each : clientManager.allClientId()) {ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);if (null != client && client.isExpire(currentTime)) {clientManager.clientDisconnected(each);}}}
} 
-------------------------------------------------------------------------------------------
@Override
public boolean isExpire(long currentTime) {// 判断30s内没有续租 认为过期return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}
http://www.tj-hxxt.cn/news/2027.html

相关文章:

  • 做简历网站百度公司全称叫什么
  • 微博白菜网站怎么做如何在各种网站投放广告
  • 建筑网站 国外seo专家招聘
  • 自助网站系统百度关键词查询工具免费
  • 二级域名网站建设引擎优化seo是什么
  • 网站的架设百度seo排名优化
  • 怎么优化网站的单个关键词排名网站百度推广
  • 苏州做网站推广哪家好对网站外部的搜索引擎优化
  • 国际网站开发客户的技巧中文搜索引擎排行榜
  • 国外有哪些做服装的网站有哪些深圳网络整合营销公司
  • 免费推广引流百度官网优化
  • 阿三做网站免费文件外链网站
  • 推广游戏网站怎么做世界十大搜索引擎排名
  • 河南建设资格执业网站营销方案策划书
  • wordpress无插件对接公众号网站关键词优化网站推广
  • 长春做网站设计新媒体平台
  • 长沙58同城招聘网最新招聘而的跟地seo排名点击软件
  • 企业网站设计怎么做新乡seo网络推广费用
  • 兰溪建设网站威海百度seo
  • 温州哪里有网站如何自己免费制作网站
  • 华为仓颉编程语言杭州百度人工优化
  • ae做的动效怎么放在网站上有了域名如何建立网站
  • 建搜索型网站查询网
  • 网站开发饼图样式做一个电商平台大概需要多少钱
  • 制作关于灯的网站抖音搜索seo软件
  • 中国网站建设公司自动推广引流app
  • 找人做时时彩网站正规推广平台
  • 网站进行中英文转换怎么做深圳网站开发技术
  • 展示型网站php今天的三个新闻
  • 网页设计入门模板德阳seo优化