能不能同行网站做站长统计,个人做网站 私活,建设工程英语网站,wordpress配置多站点选举集群状态
es中存储的数据有一下几种#xff0c;state元数据、lucene索引文件、translog事务日志 元数据信息可以分为#xff1a;
集群层面的元信息-对应着metaData数据结构#xff0c;主要是clusterUUid、settings、templates等索引层面的元信息-对应着indexMetaData数…选举集群状态
es中存储的数据有一下几种state元数据、lucene索引文件、translog事务日志 元数据信息可以分为
集群层面的元信息-对应着metaData数据结构主要是clusterUUid、settings、templates等索引层面的元信息-对应着indexMetaData数据结构主要存储分片数量、mappings索引字段映射等分片层面的元信息-对应着shardStateMetaData主要是version、indexUUid、主分片等 每个节点可能会有不同的集群状态需要选择正确的元数据作为权威源数据。状态信息的管理在gatewayService中它实现了ClusterStateListener接口当选择完主节点后会发布一个集群状态task触发回调方法clusterChanged
//恢复分片分配状态
performStateRecovery(enforceRecoverAfterTime, reason);集群层和索引层元数据恢复在gateway模块完成
public void clusterChanged(final ClusterChangedEvent event) {if (lifecycle.stoppedOrClosed()) {return;}final ClusterState state event.state();//只有主节点才能执行if (state.nodes().isLocalNodeElectedMaster() false) {// not our job to recoverreturn;}//已经执行过了集群状态和索引状态恢复了if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) false) {// already recoveredreturn;}//这段省略主要是检查是否达到恢复状态条件......//恢复状态performStateRecovery(enforceRecoverAfterTime, reason);
}首先判断只有主节点可以执行状态选举然后判断是否已经在执行了状态恢复任务了如果是则直接返回如果没有则执行恢复状态任务 最终会调用recoveryRunnable.run()
final Gateway gateway new Gateway(settings, clusterService, listGatewayMetaState);recoveryRunnable () -gateway.performStateRecovery(new GatewayRecoveryListener());执行gateway的performStateRecovery方法 首先回去所有master资格的节点信息
//具有master资格的node节点final String[] nodesIds clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);获取其他master节点的元数据
//获取集群及信息final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState listGatewayMetaState.list(nodesIds, null).actionGet();这里我们看下TransportNodesListGatewayMetaState的构造函数
public TransportNodesListGatewayMetaState(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,ActionFilters actionFilters, GatewayMetaState metaState) {super(ACTION_NAME, threadPool, clusterService, transportService, actionFilters,Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);this.metaState metaState;
}//注册action处理类
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,new TransportHandler());回到list方法会调用doExecute方法
public ActionFutureNodesGatewayMetaState list(String[] nodesIds, Nullable TimeValue timeout) {PlainActionFutureNodesGatewayMetaState future PlainActionFuture.newFuture();execute(new Request(nodesIds).timeout(timeout), future);return future;
}protected void doExecute(Task task, NodesRequest request, ActionListenerNodesResponse listener) {//执行new AsyncAction(task, request, listener).start();
}发送所有节点获取元数据
void start() {final DiscoveryNode[] nodes request.concreteNodes();if (nodes.length 0) {//没有需要获取数据的node// nothing to notifythreadPool.generic().execute(() - listener.onResponse(newResponse(request, responses)));return;}TransportRequestOptions.Builder builder TransportRequestOptions.builder();if (request.timeout() ! null) {builder.withTimeout(request.timeout());}//循环发送请求给所有节点for (int i 0; i nodes.length; i) {final int idx i;final DiscoveryNode node nodes[i];final String nodeId node.getId();try {TransportRequest nodeRequest newNodeRequest(request);if (task ! null) {nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());}//发送请求transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),new TransportResponseHandlerNodeResponse() {Overridepublic NodeResponse read(StreamInput in) throws IOException {return newNodeResponse(in);}//处理返回Overridepublic void handleResponse(NodeResponse response) {onOperation(idx, response);}Overridepublic void handleException(TransportException exp) {onFailure(idx, node.getId(), exp);}Overridepublic String executor() {return ThreadPool.Names.SAME;}});} catch (Exception e) {onFailure(idx, nodeId, e);}}
}对端接收请求后处理在上面注册的NodeTransportHandler构造每个节点元数据返回
//node请求处理class NodeTransportHandler implements TransportRequestHandlerNodeRequest {Overridepublic void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {channel.sendResponse(nodeOperation(request, task));}}protected NodeGatewayMetaState nodeOperation(NodeRequest request) {return new NodeGatewayMetaState(clusterService.localNode(), metaState.getMetadata());}我们继续回到每个节点发送请求的返回处理
//处理返回
Override
public void handleResponse(NodeResponse response) {onOperation(idx, response);
}private void onOperation(int idx, NodeResponse nodeResponse) {//记录node的返回结果responses.set(idx, nodeResponse);//当所有节点都返回结果了无论是失败还是成功了if (counter.incrementAndGet() responses.length()) {finishHim();}
}private void finishHim() {NodesResponse finalResponse;try {finalResponse newResponse(request, responses);} catch (Exception e) {logger.debug(failed to combine responses from nodes, e);listener.onFailure(e);return;}//触发监听回调listener.onResponse(finalResponse);
}及获取到了其他节点的元数据继续回到performStateRecovery 需要获取的master角色节点数
//需要分配数量
final int requiredAllocation Math.max(1, minimumMasterNodes);开始通过版本号选择集群层元数据,比较版本号选择版本号最大的集群状态
//集群元数据
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() null) {continue;}found;if (electedGlobalState null) {electedGlobalState nodeState.metadata();//比较版本号大的胜出} else if (nodeState.metadata().version() electedGlobalState.version()) {electedGlobalState nodeState.metadata();}for (final ObjectCursorIndexMetadata cursor : nodeState.metadata().indices().values()) {indices.addTo(cursor.value.getIndex(), 1);}
}检查是否有足够数量节点返回了集群状态 //没有获取足够的节点返回消息
if (found requiredAllocation) {listener.onFailure(found [ found ] metadata states, required [ requiredAllocation ]);return;
}构造集群状态删除索引信息下面会选择索引级元数据
//更新全局状态清理索引我们在下一阶段选择它们final Metadata.Builder metadataBuilder Metadata.builder(electedGlobalState).removeAllIndices();遍历所有节点选择返回的索引元数据版本最高的节点作为索引级元数据然后将索引级元数据添加到metadataBuilder中
for (int i 0; i keys.length; i) {if (keys[i] ! null) {final Index index (Index) keys[i];IndexMetadata electedIndexMetadata null;int indexMetadataCount 0;for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {if (nodeState.metadata() null) {continue;}final IndexMetadata indexMetadata nodeState.metadata().index(index);if (indexMetadata null) {continue;}if (electedIndexMetadata null) {electedIndexMetadata indexMetadata;//比较版本号选择最大版本号} else if (indexMetadata.getVersion() electedIndexMetadata.getVersion()) {electedIndexMetadata indexMetadata;}indexMetadataCount;}if (electedIndexMetadata ! null) {if (indexMetadataCount requiredAllocation) {logger.debug([{}] found [{}], required [{}], not adding, index, indexMetadataCount, requiredAllocation);} // TODO if this logging statement is correct then we are missing an else here//设置索引级元数据metadataBuilder.put(electedIndexMetadata, false);}}
}构造恢复后的集群级元数据和索引级元数据
//恢复后的集群状态
ClusterState recoveredState Function.ClusterStateidentity().andThen(state - ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings())).apply(ClusterState.builder(clusterService.getClusterName()).metadata(metadataBuilder).build());listener.onSuccess(recoveredState);调用GatewayRecoveryListener的onSuccess向集群提交任务
class GatewayRecoveryListener implements Gateway.GatewayStateRecoveredListener {Overridepublic void onSuccess(final ClusterState recoveredState) {logger.trace(successful state recovery, importing cluster state...);clusterService.submitStateUpdateTask(local-gateway-elected-state,new RecoverStateUpdateTask() {Overridepublic ClusterState execute(final ClusterState currentState) {final ClusterState updatedState ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));}});}Overridepublic void onFailure(final String msg) {logger.info(state recovery failed: {}, msg);resetRecoveredFlags();}}调用RecoverStateUpdateTask的execute方法
Override
public ClusterState execute(final ClusterState currentState) {if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) false) {logger.debug(cluster is already recovered);return currentState;}//状态信息恢复完成final ClusterState newState Function.ClusterStateidentity().andThen(ClusterStateUpdaters::updateRoutingTable).andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock).apply(currentState);//开始分配分片return allocationService.reroute(newState, state recovered);
}集群元数据和索引级元数据恢复完成开始分配分片
元数据的持久化 具有master资格的节点和数据节点可以持久化集群状态当接收到集群状态变更时会将其持久化到磁盘GatewayClusterApplier实现了ClusterStateApplier当集群状态变更时会调用applyClusterState方法
Override
public void applyClusterState(ClusterChangedEvent event) {if (event.state().blocks().disableStatePersistence()) {incrementalClusterStateWriter.setIncrementalWrite(false);return;}try {// Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term// thats higher than the last accepted term.// TODO: can we get rid of this hack?if (event.state().term() incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {incrementalClusterStateWriter.setCurrentTerm(event.state().term());}//更新磁盘上的元数据incrementalClusterStateWriter.updateClusterState(event.state());incrementalClusterStateWriter.setIncrementalWrite(true);} catch (WriteStateException e) {logger.warn(Exception occurred when storing new meta data, e);}
}将集群级元数据和索引级元数据落盘
void updateClusterState(ClusterState newState) throws WriteStateException {//元数据Metadata newMetadata newState.metadata();final long startTimeMillis relativeTimeMillisSupplier.getAsLong();final AtomicClusterStateWriter writer new AtomicClusterStateWriter(metaStateService, previousManifest);//全局元数据long globalStateGeneration writeGlobalState(writer, newMetadata);//索引级元数据MapIndex, Long indexGenerations writeIndicesMetadata(writer, newState);Manifest manifest new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);writeManifest(writer, manifest);previousManifest manifest;previousClusterState newState;final long durationMillis relativeTimeMillisSupplier.getAsLong() - startTimeMillis;final TimeValue finalSlowWriteLoggingThreshold this.slowWriteLoggingThreshold;if (durationMillis finalSlowWriteLoggingThreshold.getMillis()) {logger.warn(writing cluster state took [{}ms] which is above the warn threshold of [{}]; wrote metadata for [{}] indices and skipped [{}] unchanged indices,durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());} else {logger.debug(writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices,durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());}
}加载磁盘元数据 在node实例的start方法中会调用gatewayMetaState.start方法
//集群元数据
final GatewayMetaState gatewayMetaState injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class),injector.getInstance(PersistedClusterStateService.class));然后会调用loadFullState方法
//加载元数据
manifestClusterStateTuple metaStateService.loadFullState();public TupleManifest, Metadata loadFullState() throws IOException {//加载最新的状态文件final Manifest manifest MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());if (manifest null) {return loadFullStateBWC();}//构建元数据final Metadata.Builder metadataBuilder;if (manifest.isGlobalGenerationMissing()) {metadataBuilder Metadata.builder();} else {final Metadata globalMetadata METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),nodeEnv.nodeDataPaths());if (globalMetadata ! null) {metadataBuilder Metadata.builder(globalMetadata);} else {throw new IOException(failed to find global metadata [generation: manifest.getGlobalGeneration() ]);}}//索引级元数据for (Map.EntryIndex, Long entry : manifest.getIndexGenerations().entrySet()) {final Index index entry.getKey();final long generation entry.getValue();final String indexFolderName index.getUUID();final IndexMetadata indexMetadata INDEX_METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, generation,nodeEnv.resolveIndexFolder(indexFolderName));if (indexMetadata ! null) {metadataBuilder.put(indexMetadata, false);} else {throw new IOException(failed to find metadata for existing index index.getName() [location: indexFolderName , generation: generation ]);}}return new Tuple(manifest, metadataBuilder.build());
}从磁盘读取构建索引级元数据和集群级元数据用于构建集群状态对象ClusterState