当前位置: 首页 > news >正文

能不能同行网站做站长统计个人做网站 私活

能不能同行网站做站长统计,个人做网站 私活,建设工程英语网站,wordpress配置多站点选举集群状态 es中存储的数据有一下几种#xff0c;state元数据、lucene索引文件、translog事务日志 元数据信息可以分为#xff1a; 集群层面的元信息-对应着metaData数据结构#xff0c;主要是clusterUUid、settings、templates等索引层面的元信息-对应着indexMetaData数…选举集群状态 es中存储的数据有一下几种state元数据、lucene索引文件、translog事务日志 元数据信息可以分为 集群层面的元信息-对应着metaData数据结构主要是clusterUUid、settings、templates等索引层面的元信息-对应着indexMetaData数据结构主要存储分片数量、mappings索引字段映射等分片层面的元信息-对应着shardStateMetaData主要是version、indexUUid、主分片等 每个节点可能会有不同的集群状态需要选择正确的元数据作为权威源数据。状态信息的管理在gatewayService中它实现了ClusterStateListener接口当选择完主节点后会发布一个集群状态task触发回调方法clusterChanged //恢复分片分配状态 performStateRecovery(enforceRecoverAfterTime, reason);集群层和索引层元数据恢复在gateway模块完成 public void clusterChanged(final ClusterChangedEvent event) {if (lifecycle.stoppedOrClosed()) {return;}final ClusterState state event.state();//只有主节点才能执行if (state.nodes().isLocalNodeElectedMaster() false) {// not our job to recoverreturn;}//已经执行过了集群状态和索引状态恢复了if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) false) {// already recoveredreturn;}//这段省略主要是检查是否达到恢复状态条件......//恢复状态performStateRecovery(enforceRecoverAfterTime, reason); }首先判断只有主节点可以执行状态选举然后判断是否已经在执行了状态恢复任务了如果是则直接返回如果没有则执行恢复状态任务 最终会调用recoveryRunnable.run() final Gateway gateway new Gateway(settings, clusterService, listGatewayMetaState);recoveryRunnable () -gateway.performStateRecovery(new GatewayRecoveryListener());执行gateway的performStateRecovery方法 首先回去所有master资格的节点信息 //具有master资格的node节点final String[] nodesIds clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);获取其他master节点的元数据 //获取集群及信息final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState listGatewayMetaState.list(nodesIds, null).actionGet();这里我们看下TransportNodesListGatewayMetaState的构造函数 public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,ActionFilters actionFilters, GatewayMetaState metaState) {super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);this.metaState metaState; }//注册action处理类 transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,new TransportHandler());回到list方法会调用doExecute方法 public ActionFutureNodesGatewayMetaState list(String[] nodesIds, Nullable TimeValue timeout) {PlainActionFutureNodesGatewayMetaState future PlainActionFuture.newFuture();execute(new Request(nodesIds).timeout(timeout), future);return future; }protected void doExecute(Task task, NodesRequest request, ActionListenerNodesResponse listener) {//执行new AsyncAction(task, request, listener).start(); }发送所有节点获取元数据 void start() {final DiscoveryNode[] nodes request.concreteNodes();if (nodes.length 0) {//没有需要获取数据的node// nothing to notifythreadPool.generic().execute(() - listener.onResponse(newResponse(request, responses)));return;}TransportRequestOptions.Builder builder TransportRequestOptions.builder();if (request.timeout() ! null) {builder.withTimeout(request.timeout());}//循环发送请求给所有节点for (int i 0; i nodes.length; i) {final int idx i;final DiscoveryNode node nodes[i];final String nodeId node.getId();try {TransportRequest nodeRequest newNodeRequest(request);if (task ! null) {nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());}//发送请求transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),new TransportResponseHandlerNodeResponse() {Overridepublic NodeResponse read(StreamInput in) throws IOException {return newNodeResponse(in);}//处理返回Overridepublic void handleResponse(NodeResponse response) {onOperation(idx, response);}Overridepublic void handleException(TransportException exp) {onFailure(idx, node.getId(), exp);}Overridepublic String executor() {return ThreadPool.Names.SAME;}});} catch (Exception e) {onFailure(idx, nodeId, e);}} }对端接收请求后处理在上面注册的NodeTransportHandler构造每个节点元数据返回 //node请求处理class NodeTransportHandler implements TransportRequestHandlerNodeRequest {Overridepublic void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {channel.sendResponse(nodeOperation(request, task));}}protected NodeGatewayMetaState nodeOperation(NodeRequest request) {return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());}我们继续回到每个节点发送请求的返回处理 //处理返回 Override public void handleResponse(NodeResponse response) {onOperation(idx, response); }private void onOperation(int idx, NodeResponse nodeResponse) {//记录node的返回结果responses.set(idx, nodeResponse);//当所有节点都返回结果了无论是失败还是成功了if (counter.incrementAndGet() responses.length()) {finishHim();} }private void finishHim() {NodesResponse finalResponse;try {finalResponse newResponse(request, responses);} catch (Exception e) {logger.debug(failed to combine responses from nodes, e);listener.onFailure(e);return;}//触发监听回调listener.onResponse(finalResponse); }及获取到了其他节点的元数据继续回到performStateRecovery 需要获取的master角色节点数 //需要分配数量 final int requiredAllocation Math.max(1, minimumMasterNodes);开始通过版本号选择集群层元数据,比较版本号选择版本号最大的集群状态 //集群元数据 for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() null) {continue;}found;if (electedGlobalState null) {electedGlobalState nodeState.metadata();//比较版本号大的胜出} else if (nodeState.metadata().version() electedGlobalState.version()) {electedGlobalState nodeState.metadata();}for (final ObjectCursorIndexMetadata cursor : nodeState.metadata().indices().values()) {indices.addTo(cursor.value.getIndex(), 1);} }检查是否有足够数量节点返回了集群状态 //没有获取足够的节点返回消息 if (found requiredAllocation) {listener.onFailure(found [ found ] metadata states, required [ requiredAllocation ]);return; }构造集群状态删除索引信息下面会选择索引级元数据 //更新全局状态清理索引我们在下一阶段选择它们final Metadata.Builder metadataBuilder Metadata.builder(electedGlobalState).removeAllIndices();遍历所有节点选择返回的索引元数据版本最高的节点作为索引级元数据然后将索引级元数据添加到metadataBuilder中 for (int i 0; i keys.length; i) {if (keys[i] ! null) {final Index index (Index) keys[i];IndexMetadata electedIndexMetadata null;int indexMetadataCount 0;for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() null) {continue;}final IndexMetadata indexMetadata nodeState.metadata().index(index);if (indexMetadata null) {continue;}if (electedIndexMetadata null) {electedIndexMetadata indexMetadata;//比较版本号选择最大版本号} else if (indexMetadata.getVersion() electedIndexMetadata.getVersion()) {electedIndexMetadata indexMetadata;}indexMetadataCount;}if (electedIndexMetadata ! null) {if (indexMetadataCount requiredAllocation) {logger.debug([{}] found [{}], required [{}], not adding, index, indexMetadataCount, requiredAllocation);} // TODO if this logging statement is correct then we are missing an else here//设置索引级元数据metadataBuilder.put(electedIndexMetadata, false);}} }构造恢复后的集群级元数据和索引级元数据 //恢复后的集群状态 ClusterState recoveredState Function.ClusterStateidentity().andThen(state - ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings())).apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());listener.onSuccess(recoveredState);调用GatewayRecoveryListener的onSuccess向集群提交任务 class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {Overridepublic void onSuccess(final ClusterState recoveredState) {logger.trace(successful state recovery, importing cluster state...);clusterService.submitStateUpdateTask(local-gateway-elected-state,new RecoverStateUpdateTask() {Overridepublic ClusterState execute(final ClusterState currentState) {final ClusterState updatedState ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));}});}Overridepublic void onFailure(final String msg) {logger.info(state recovery failed: {}, msg);resetRecoveredFlags();}}调用RecoverStateUpdateTask的execute方法 Override public ClusterState execute(final ClusterState currentState) {if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) false) {logger.debug(cluster is already recovered);return currentState;}//状态信息恢复完成final ClusterState newState Function.ClusterStateidentity().andThen(ClusterStateUpdaters::updateRoutingTable).andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock).apply(currentState);//开始分配分片return allocationService.reroute(newState, state recovered); }集群元数据和索引级元数据恢复完成开始分配分片 元数据的持久化 具有master资格的节点和数据节点可以持久化集群状态当接收到集群状态变更时会将其持久化到磁盘GatewayClusterApplier实现了ClusterStateApplier当集群状态变更时会调用applyClusterState方法 Override public void applyClusterState(ClusterChangedEvent event) {if (event.state().blocks().disableStatePersistence()) {incrementalClusterStateWriter.setIncrementalWrite(false);return;}try {// Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term// thats higher than the last accepted term.// TODO: can we get rid of this hack?if (event.state().term() incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {incrementalClusterStateWriter.setCurrentTerm(event.state().term());}//更新磁盘上的元数据incrementalClusterStateWriter.updateClusterState(event.state());incrementalClusterStateWriter.setIncrementalWrite(true);} catch (WriteStateException e) {logger.warn(Exception occurred when storing new meta data, e);} }将集群级元数据和索引级元数据落盘 void updateClusterState(ClusterState newState) throws WriteStateException {//元数据Metadata newMetadata newState.metadata();final long startTimeMillis relativeTimeMillisSupplier.getAsLong();final AtomicClusterStateWriter writer new AtomicClusterStateWriter(metaStateService, previousManifest);//全局元数据long globalStateGeneration writeGlobalState(writer, newMetadata);//索引级元数据MapIndex, Long indexGenerations writeIndicesMetadata(writer, newState);Manifest manifest new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);writeManifest(writer, manifest);previousManifest manifest;previousClusterState newState;final long durationMillis relativeTimeMillisSupplier.getAsLong() - startTimeMillis;final TimeValue finalSlowWriteLoggingThreshold this.slowWriteLoggingThreshold;if (durationMillis finalSlowWriteLoggingThreshold.getMillis()) {logger.warn(writing cluster state took [{}ms] which is above the warn threshold of [{}]; wrote metadata for [{}] indices and skipped [{}] unchanged indices,durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());} else {logger.debug(writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices,durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());} }加载磁盘元数据 在node实例的start方法中会调用gatewayMetaState.start方法 //集群元数据 final GatewayMetaState gatewayMetaState injector.getInstance(GatewayMetaState.class); gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),injector.getInstance(PersistedClusterStateService.class));然后会调用loadFullState方法 //加载元数据 manifestClusterStateTuple metaStateService.loadFullState();public TupleManifest, Metadata loadFullState() throws IOException {//加载最新的状态文件final Manifest manifest MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());if (manifest null) {return loadFullStateBWC();}//构建元数据final Metadata.Builder metadataBuilder;if (manifest.isGlobalGenerationMissing()) {metadataBuilder Metadata.builder();} else {final Metadata globalMetadata METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),nodeEnv.nodeDataPaths());if (globalMetadata ! null) {metadataBuilder Metadata.builder(globalMetadata);} else {throw new IOException(failed to find global metadata [generation: manifest.getGlobalGeneration() ]);}}//索引级元数据for (Map.EntryIndex, Long entry : manifest.getIndexGenerations().entrySet()) {final Index index entry.getKey();final long generation entry.getValue();final String indexFolderName index.getUUID();final IndexMetadata indexMetadata INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,nodeEnv.resolveIndexFolder(indexFolderName));if (indexMetadata ! null) {metadataBuilder.put(indexMetadata, false);} else {throw new IOException(failed to find metadata for existing index index.getName() [location: indexFolderName , generation: generation ]);}}return new Tuple(manifest, metadataBuilder.build()); }从磁盘读取构建索引级元数据和集群级元数据用于构建集群状态对象ClusterState
http://www.tj-hxxt.cn/news/223917.html

相关文章:

  • 企业网站php开源系统杭州专业网站建设公司哪家好
  • 中国建设银行悦生活网站食品公司网站设计项目
  • 网站分类导航代码做旅游网站的目的和意义
  • 小米网站开发语言免费设计app的网站建设
  • 二七免费网站建设百度快照收录
  • vps可以用了做网站吗网站设计公司收费标准
  • 有网站怎么开发app视频网站开发php
  • 网页制作正版网站湖南优化公司
  • 株洲网站建设 株洲网站制作唐山企业网站模板建站
  • 沈阳企业网站优化排名方案网站加速
  • 网站建设 个人服务器seo关键词首页排名
  • 茶网站建设实训报告全国最大房产网络平台
  • 科普网站建设经验wordpress网址插件
  • 扬中网站建设公司班级网站设计
  • 谷歌seo网站运营怎么注册微网站吗
  • 电脑做网站用word网站站建设
  • 保山做网站建设网站备案 类型
  • php做的汽车销售网站网站建设模块下载
  • 网页制作免费网站德阳做网站
  • 做短裙的视频网站3705房产网
  • 自己写的网站如何添加 cnzz统计建设网站必须用dns
  • 为什么asp.net做的网站上传后不显示照片网站文件名优化
  • 牡丹江 网站建设网上服务旗舰店
  • soho没有注册公司 能建一个外贸网站吗网页设计素材网站大全
  • 运城建设厅官方网站有没有专门做团购的网站
  • 学设计网站沈阳创新网站建设报价
  • 网站建设需要集齐哪5份资料黑群晖做网站
  • 广州建设银行官方网站域名查询万网
  • 建设银行网站银行登录负责做网站的叫什么公司
  • dedecms网站模板江苏企业展厅设计公司