当前位置: 首页 > news >正文

容桂网站开发济南seo关键词优化顾问

容桂网站开发,济南seo关键词优化顾问,宁波网站开发建设,wordpress3.1.3漏洞Doris中FE主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。代码路径#xff1a;doris/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java 环境检查 在启动FE的时候#xff0c;主要做环境检查。检查一些启动时必要的环境变量以及初始化配置…Doris中FE主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。代码路径doris/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java 环境检查 在启动FE的时候主要做环境检查。检查一些启动时必要的环境变量以及初始化配置文件比如DORIS_HOME_DIR如果没有人为配置 DORIS_HOME_DIR则该变量的值就是doris的解压安装目录PID_DIR是为了判断FE进程是第一次启动还是之前启动过并创建pid文件fe.pid。解析命令行参数。初始化fe.conf、fe_custom.conf、ldap.conf。检测JDK版本是否匹配主要是检测compile的JDK和runtime的jdk版本需要要求runtimeVersion compileVersion。 检查 解析启动FE时输入的命令行参数以便进行不同的操作主要会包含这几类–version或者执行 -v 主要是打印FE的版本–helper或 -h ,主要是指定 helper node 然后加入FE的 bdb je的副本组–image:或-i主要是检查image文件–bdb或-b主要是用以运行bdbje的命令行工具具体解析逻辑如下(bdbje tool的代码逻辑过长有兴趣的可以自己去看一下 parseArgs的实现) private static void checkCommandLineOptions(CommandLineOptions cmdLineOpts) {if (cmdLineOpts.isVersion()) {System.out.println(Build version: Version.DORIS_BUILD_VERSION);System.out.println(Build time: Version.DORIS_BUILD_TIME);System.out.println(Build info: Version.DORIS_BUILD_INFO);System.out.println(Build hash: Version.DORIS_BUILD_HASH);System.out.println(Java compile version: Version.DORIS_JAVA_COMPILE_VERSION);System.exit(0);} else if (cmdLineOpts.runBdbTools()) {BDBTool bdbTool new BDBTool(Env.getCurrentEnv().getBdbDir(), cmdLineOpts.getBdbToolOpts());if (bdbTool.run()) { System.exit(0);} else { System.exit(-1);}} else if (cmdLineOpts.runImageTool()) {File imageFile new File(cmdLineOpts.getImagePath());if (!imageFile.exists()) {System.out.println(image does not exist: imageFile.getAbsolutePath() . Please put an absolute path instead); System.exit(-1);} else {System.out.println(Start to load image: );try {MetaReader.read(imageFile, Env.getCurrentEnv());System.out.println(Load image success. Image file cmdLineOpts.getImagePath() is valid);} catch (Exception e) {System.out.println(Load image failed. Image file cmdLineOpts.getImagePath() is invalid);LOG.warn(, e);} finally {System.exit(0);}}}// go on}提前介绍以下fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java的getCurrentEnv函数用于返回ENV单例。该类用于A singleton class can also be seen as an entry point of Doris. All manager classes can be obtained through this class. 类似于存放全局可见数据的全局变量比如CatalogMgr、LoadManager等。 根据输入的参数如果不是运行image tool、bdbje tool或者打印FE的version信息就继续往下执行这个时候就要准备开始启动FE了。同样启动FE时需要初始化一些操作。初始化的时候主要是检查了FE的启动IP是不是一个合法的IP。这里需要注意的就是我们在配置文件中配置的CIDR或者FQDN的配置在初始化的时候会检测。很多小伙伴在启动FE的时候没有正确配置IP的时候最后用了localhost或者本地回环IP启动导致没有使用我们想要的IP启动具体的判断逻辑就是在这 上面的逻辑看初始化的时候会遍历网卡信息拿遍历的IP地址和填写的PRIORITY_CIDR_SEPARATOR的值做匹配匹配上了就会用处于填写的CIDR范围中的ip启动匹配不上的时候会从网卡IP中拿出一个合法的IP作为FE的启动IP这个就不一定是我们想要的那个启动IP。特别是当前机器上有很多虚拟网卡的IP信息就会很大概率用排在前面的虚拟IP启动。当然这里还会根据配置文件中的信息去检查是不是FQDN是不是IPV6有兴趣的的同学都可以看一下具体的代码逻辑。Init操作其实就是获取了当前FE的启动IP获取完IP后就需要检测端口看FE的启动的需要的这些端口是否是正常的。 如上图所示Doris主要提供四个端口Edit log port、Http port、Https port、Query port和Rpc port。 开始启动 还有一个比较重要的检测就是需要根据fe.conf中的 enable_bdbje_debug_mode参数的值来决定怎么启动。这个值主要是某些时候我们的FE的leader选举出现一定问题做元数据运维的时候会走运维模式逻辑。如果是正常情况下这个值默认是FALSE就会走后续的正常启动FE的流程。 元数据环境初始化 // init catalog and wait it be readyEnv.getCurrentEnv().initialize(args);Env.getCurrentEnv().waitForReady();0 元数据目录如果不存在需要手动创建这里主要是需要手动创建最外层的metaDir内层的bdb的目录和image的目录会自己创建。获取本节点host port、获取helper节点 host port【fe/fe-core/src/main/java/org/apache/doris/deploy/DeployManager.java AmbariDeployManager.java K8sDeployManager.java LocalFileDeployManager.java】1 初始化插件管理器启动审计日志进程。2 根据当前的元数据信息获取集群ID和节点角色信息ROLE和VERSION文件的判断) 代码较长只节选了关键代码。具体逻辑可以看一下getClusterIdAndRole 的具体实现这里主要就是根据指定的helper的节点的元数据信息或者本地存在的元数据信息获取到集群的ROLE信息和VERSION信息。如果集群是非helper节点且第一次启动ROLE文件实没有这个时候需要创建这个文件。同时赋予相关值ROLEFOLLOWER将节点信息写入到元数据文件中。如果当前阶段存在这些元数据文件则会去元数据文件中获取当前节点的角色信息。 // ATTN:// If the version file and role file does not exist and the helper node is itself, this should be the very beginning startup of the cluster, so we create ROLE and VERSION file, set isFirstTimeStartUp to true, and add itself to frontends list. If ROLE and VERSION file is deleted for some reason, we may arbitrarily start this node as FOLLOWER, which may cause UNDEFINED behavior. Everything may be OK if the origin role is exactly FOLLOWER, but if not, FE process will exit somehow.Storage storage new Storage(this.imageDir);if (!roleFile.exists()) { // The very first time to start the first node of the cluster. It should became a Master node (Master nodes role is also FOLLOWER, which means electable) For compatibility. Because this is the very first time to start, so we arbitrarily choose a new name for this noderole FrontendNodeType.FOLLOWER; nodeName genFeNodeName(selfNode.getIdent(), selfNode.getPort(), false /* new style */);storage.writeFrontendRoleAndNodeName(role, nodeName);LOG.info(very first time to start this node. role: {}, node name: {}, role.name(), nodeName);} else {role storage.getRole();if (role FrontendNodeType.REPLICA) { // for compatibilityrole FrontendNodeType.FOLLOWER;}nodeName storage.getNodeName();if (Strings.isNullOrEmpty(nodeName)) {// In normal case, if ROLE file exist, role and nodeName should both exist.// But we will get a empty nodeName after upgrading.// So for forward compatibility, we use the old-style way of naming: ip_port,// and update the ROLE file.nodeName genFeNodeName(selfNode.getHost(), selfNode.getPort(), true/* old style */);storage.writeFrontendRoleAndNodeName(role, nodeName);LOG.info(forward compatibility. role: {}, node name: {}, role.name(), nodeName);}// Notice:// With the introduction of FQDN, the nodeName is no longer bound to an IP address,// so consistency is no longer checked here. Otherwise, the startup will fail.}如果我们启动了一个FE无法从给出的helper节点信息中同helper节点建立连接就会出现current node is not added to the group. please add it first. “sleep 5 seconds and retry, current helper nodes: {}”, helperNodes。的日志信息这个异常原因就是由于当前节点无法和指定的helper节点建立正常的连接信息导致的。当和helper节点构建正常连接后就会从helper节点同步 VERSION信息。如果本身节点存在VERSIN文件的信息说明不是第一次启动这个时候就会用本地的这个文件的元数据信息同HELPER节点的VERSION信息进行比对。主要是比较clusterID。如果不一致说明两个节点不是同一个集群的节点启动进程就直接退出了。 // try to get role and node name from helper node, this loop will not end until we get certain role type and namewhile (true) {if (!getFeNodeTypeAndNameFromHelpers()) {LOG.warn(current node is not added to the group. please add it first. sleep 5 seconds and retry, current helper nodes: {}, helperNodes);try { Thread.sleep(5000);continue;} catch (InterruptedException e) {LOG.warn(, e); System.exit(-1);}}if (role FrontendNodeType.REPLICA) // for compatibilityrole FrontendNodeType.FOLLOWER;break;}HostInfo rightHelperNode helperNodes.get(0);Storage storage new Storage(this.imageDir);if (roleFile.exists() (role ! storage.getRole() || !nodeName.equals(storage.getNodeName())) || !roleFile.exists()) {storage.writeFrontendRoleAndNodeName(role, nodeName);}if (!versionFile.exists()) {// If the version file doesnt exist, download it from helper nodeif (!getVersionFileFromHelper(rightHelperNode)) {throw new IOException(fail to download version file from rightHelperNode.getHost() will exit.);}// NOTE: cluster_id will be init when Storage object is constructed,// so we new one.storage new Storage(this.imageDir);clusterId storage.getClusterID();token storage.getToken();if (Strings.isNullOrEmpty(token)) { token Config.auth_token;}} else {// If the version file exist, read the cluster id and check the// id with helper node to make sure they are identicalclusterId storage.getClusterID();token storage.getToken();try {String url http:// NetUtils.getHostPortInAccessibleFormat(rightHelperNode.getHost(), Config.http_port) /check;HttpURLConnection conn HttpURLUtil.getConnectionWithNodeIdent(url);conn.setConnectTimeout(2 * 1000);conn.setReadTimeout(2 * 1000);String clusterIdString conn.getHeaderField(MetaBaseAction.CLUSTER_ID);int remoteClusterId Integer.parseInt(clusterIdString);if (remoteClusterId ! clusterId) {LOG.error(cluster id is not equal with helper node {}. will exit.,rightHelperNode.getHost());throw new IOException(cluster id is not equal with helper node rightHelperNode.getHost() . will exit.);}String remoteToken conn.getHeaderField(MetaBaseAction.TOKEN);if (token null remoteToken ! null) {LOG.info(get token from helper node. token{}., remoteToken);token remoteToken;storage.writeClusterIdAndToken();storage.reload();}if (Config.enable_token_check) {Preconditions.checkNotNull(token);Preconditions.checkNotNull(remoteToken);if (!token.equals(remoteToken)) {throw new IOException(token is not equal with helper node rightHelperNode.getHost() . will exit.);}}} catch (Exception e) {throw new IOException(fail to check cluster_id and token with helper node., e);}}getNewImage(rightHelperNode);3 经过这一步 VERSION和ROLE的元数据信息比对后确定是同一个集群内的节点也确定了这个FE的ROLE信息了就需要从image中同步editlog。editLog为bdbje[Oracle Berkeley DB Java Edition (opens new window)],在 Doris 中我们使用 bdbje 完成元数据操作日志的持久化、FE 高可用等功能。【就相当于ETCD的Raft共识模块WAL日志模块的组合】。image file就是内存checkpoint到磁盘上的文件。globalTransactionMgr是全局事务管理器。 // 3. Load image first and replay editsthis.editLog new EditLog(nodeName);loadImage(this.imageDir); // load image fileeditLog.open(); // open bdb envthis.globalTransactionMgr.setEditLog(editLog);this.idGenerator.setEditLog(editLog);456 创建一系列的cleaner 线程和监听线程 // 4. create load and export job label cleaner threadcreateLabelCleaner();// 5. create txn cleaner threadcreateTxnCleaner();// 6. start state listener threadcreateStateListener(); listener.start();if (!Config.edit_log_type.equalsIgnoreCase(bdb)) {// If not using bdb, we need to notify the FE type transfer manually.notifyNewFETypeTransfer(FrontendNodeType.MASTER);}if (statisticsCleaner ! null) {statisticsCleaner.start();}if (statisticsAutoAnalyzer ! null) {statisticsAutoAnalyzer.start();} 此时启动前初始化工作就做完了。等待catalog信息的同步完成即可进行下一步。 // wait until FE is ready.public void waitForReady() throws InterruptedException {long counter 0;while (true) {if (isReady()) {LOG.info(catalog is ready. FE type: {}, feType);break;}Thread.sleep(100);if (counter % 20 0) {LOG.info(wait catalog to be ready. FE type: {}. is ready: {}, counter: {}, feType, isReady.get(),counter);}}}启动FE的SERVER 创建 QeServer 负责与mysql client 通信创建 FeServer 由Thrift Server组成负责 FE 和 BE 通信创建 HttpServer 负责提供Rest API以及Doris FE前端页面接口。 // init and start:// 1. HttpServer for HTTP Server// 2. FeServer for Thrift Server// 3. QeService for MySQL ServerFeServer feServer new FeServer(Config.rpc_port);feServer.start();if (options.enableHttpServer) {HttpServer httpServer new HttpServer();httpServer.setPort(Config.http_port);httpServer.setHttpsPort(Config.https_port);httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size);httpServer.setAcceptors(Config.jetty_server_acceptors);httpServer.setSelectors(Config.jetty_server_selectors);httpServer.setWorkers(Config.jetty_server_workers);httpServer.setKeyStorePath(Config.key_store_path);httpServer.setKeyStorePassword(Config.key_store_password);httpServer.setKeyStoreType(Config.key_store_type);httpServer.setKeyStoreAlias(Config.key_store_alias);httpServer.setEnableHttps(Config.enable_https);httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads);httpServer.setMinThreads(Config.jetty_threadPool_minThreads);httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size);httpServer.start();Env.getCurrentEnv().setHttpReady(true);}if (options.enableQeService) {QeService qeService new QeService(Config.query_port, ExecuteEnv.getInstance().getScheduler());qeService.start();}ThreadPoolManager.registerAllThreadPoolMetric();Doris 的元数据主要存储4类数据 用户数据信息。包括数据库、表的 Schema、分片信息等。 各类作业信息。如导入作业Clone 作业、SchemaChange 作业等。 用户及权限信息。 集群及节点信息 元数据的数据流具体过程如下 只有 leader FE 可以对元数据进行写操作。写操作在修改 leader 的内存后会序列化为一条log按照 key-value 的形式写入 bdbje。其中 key 为连续的整型作为 log idvalue 即为序列化后的操作日志。 日志写入 bdbje 后bdbje 会根据策略写多数/全写将日志复制到其他 non-leader 的 FE 节点。non-leader FE 节点通过对日志回放修改自身的元数据内存镜像完成与 leader 节点的元数据同步。 leader 节点的日志条数达到阈值后默认 10w 条会启动 checkpoint 线程。checkpoint 会读取已有的 image 文件和其之后的日志重新在内存中回放出一份新的元数据镜像副本。然后将该副本写入到磁盘形成一个新的 image。之所以是重新生成一份镜像副本而不是将已有镜像写成 image主要是考虑写 image 加读锁期间会阻塞写操作。所以每次 checkpoint 会占用双倍内存空间。 image 文件生成后leader 节点会通知其他 non-leader 节点新的 image 已生成。non-leader 主动通过 http 拉取最新的 image 文件来更换本地的旧文件。 bdbje 中的日志在 image 做完后会定期删除旧的 源码解析 Doris FE启动步骤只说核心的几个部分 Doris启动的时候首先去初始化Catalog并等待Catalog完成 启动QeServer 这个是mysql client连接用的端口是9030 启动FeServer这个是Thrift Server主要是FE和BE之间通讯用的 启动HttpServer 各种rest api接口及前端web界面 这里我们分析的是元数据这块只看Catalog初始化过程中做了什么事情 PaloFe —— start() // 初始化Catalog并等待初始化完成 Catalog.getCurrentCatalog().initialize(args); Catalog.getCurrentCatalog().waitForReady(); Catalog --initialize() 第一步获取本节点和Helper节点 getSelfHostPort(); getHelperNodes(args); 第二步检查和创建元数据目录及文件 第三步获取集群ID及角色Observer和Follower getClusterIdAndRole(); 第四步首先加载image并回访editlog this.editLog new EditLog(nodeName); loadImage(this.imageDir); // load image file editLog.open(); // open bdb env this.globalTransactionMgr.setEditLog(editLog); this.idGenerator.setEditLog(editLog); 第五步创建load和导出作业标签清理线程这是一个MasterDaemon守护线程 createLabelCleaner() 第六步创建tnx清理线程 createTxnCleaner(); 第七步启动状态监听线程这个线程主要是监听MasterObserver、Follower状态转换及Observer和Follower元数据同步Leader选举 createStateListener(); listener.start(); Load Job Label清理createLabelCleaner //每个label_keep_max_second默认三天从idToLoadJob, dbToLoadJobs and dbLabelToLoadJobs删除旧的job //包括从ExportMgr删除exportjob, exportJob 默认七天清理一次控制参数history_job_keep_max_second //这个线程每个四个小时运行一次是由label_clean_interval_second参数来控制 public void createLabelCleaner() { labelCleaner new MasterDaemon(“LoadLabelCleaner”, Config.label_clean_interval_second * 1000L) { Override protected void runAfterCatalogReady() { load.removeOldLoadJobs(); loadManager.removeOldLoadJob(); exportMgr.removeOldExportJobs(); } }; } 事务(tnx)清理线程:createTxnCleaner() //定期清理过期的事务,默认30秒清理一次控制参数transaction_clean_interval_second //这里清理的是tnx状态是: //1.已过期VISIBLE(可见) 或者 ABORTED终止, 并且 expired已过期 //2.已超时事务状态是PREPARE, 但是 timeout //事务状态是COMMITTED和 VISIBLE状态的不能被清除只能成功 public void createTxnCleaner() { txnCleaner new MasterDaemon(“txnCleaner”, Config.transaction_clean_interval_second) { Override protected void runAfterCatalogReady() { globalTransactionMgr.removeExpiredAndTimeoutTxns(); } }; } FE状态监听器线程 createStateListener() 这个线程主要是监听MasterObserver、Follower状态转换及Observer和Follower元数据同步Leader选举 定期检查默认是100毫秒参数STATE_CHANGE_CHECK_INTERVAL_MS ​ public void createStateListener() { listener new Daemon(“stateListener”, STATE_CHANGE_CHECK_INTERVAL_MS) { Override protected synchronized void runOneCycle() { ​ while (true) { FrontendNodeType newType null; try { newType typeTransferQueue.take(); } catch (InterruptedException e) { LOG.error(“got exception when take FE type from queue”, e); Util.stdoutWithTime(got exception when take FE type from queue. e.getMessage()); System.exit(-1); } Preconditions.checkNotNull(newType); LOG.info(“begin to transfer FE type from {} to {}”, feType, newType); if (feType newType) { return; } ​ /* * INIT - MASTER: transferToMaster * INIT - FOLLOWER/OBSERVER: transferToNonMaster * UNKNOWN - MASTER: transferToMaster * UNKNOWN - FOLLOWER/OBSERVER: transferToNonMaster * FOLLOWER - MASTER: transferToMaster * FOLLOWER/OBSERVER - INIT/UNKNOWN: set isReady to false */ switch (feType) { case INIT: { switch (newType) { case MASTER: { transferToMaster(); break; } case FOLLOWER: case OBSERVER: { transferToNonMaster(newType); break; } case UNKNOWN: break; default: break; } break; } case UNKNOWN: { switch (newType) { case MASTER: { transferToMaster(); break; } case FOLLOWER: case OBSERVER: { transferToNonMaster(newType); break; } default: break; } break; } case FOLLOWER: { switch (newType) { case MASTER: { transferToMaster(); break; } case UNKNOWN: { transferToNonMaster(newType); break; } default: break; } break; } case OBSERVER: { switch (newType) { case UNKNOWN: { transferToNonMaster(newType); break; } default: break; } break; } case MASTER: { // exit if master changed to any other type String msg transfer FE type from MASTER to newType.name() “. exit”; LOG.error(msg); Util.stdoutWithTime(msg); System.exit(-1); } default: break; } // end switch formerFeType ​ feType newType; LOG.info(“finished to transfer FE type to {}”, feType); } } // end runOneCycle }; ​ listener.setMetaContext(metaContext); } Leader的选举通过 transferToNonMaster和transferToMaster 元数据同步方法 startMasterOnlyDaemonThreads这个方法是启动Checkpoint守护线程由Master定期朝各个Follower和Observer推送image然后在有节点本地做Image回放更新自己本节点的元数据这个线程只在Master节点启动 startNonMasterDaemonThreads 启动其他守护线程在所有FE节点启动这里包括TabletStatMgr、LabelCleaner、EsRepository、DomainResolver private void transferToNonMaster(FrontendNodeType newType) { isReady.set(false); if (feType FrontendNodeType.OBSERVER || feType FrontendNodeType.FOLLOWER) { Preconditions.checkState(newType FrontendNodeType.UNKNOWN); LOG.warn(“{} to UNKNOWN, still offer read service”, feType.name()); // not set canRead here, leave canRead as what is was. // if meta out of date, canRead will be set to false in replayer thread. metaReplayState.setTransferToUnknown(); return; } ​ // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER // add helper sockets if (Config.edit_log_type.equalsIgnoreCase(“BDB”)) { for (Frontend fe : frontends.values()) { if (fe.getRole() FrontendNodeType.FOLLOWER || fe.getRole() FrontendNodeType.REPLICA) { ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort()); } } } ​ if (replayer null) { //创建回放线程 createReplayer(); replayer.start(); } ​ // ‘isReady’ will be set to true in ‘setCanRead()’ method fixBugAfterMetadataReplayed(true); startNonMasterDaemonThreads();​ MetricRepo.init(); } ​ 创建editlog回放守护线程这里主要是将Master推送的Image日志信息在本地进行回访写到editlog中 public void createReplayer() { replayer new Daemon(“replayer”, REPLAY_INTERVAL_MS) { Override protected void runOneCycle() { boolean err false; boolean hasLog false; try { //进行image回放重写本地editlog hasLog replayJournal(-1); metaReplayState.setOk(); } catch (InsufficientLogException insufficientLogEx) { // 从以下成员中复制丢失的日志文件拥有文件的复制组 LOG.error(“catch insufficient log exception. please restart.”, insufficientLogEx); NetworkRestore restore new NetworkRestore(); NetworkRestoreConfig config new NetworkRestoreConfig(); config.setRetainLogFiles(false); restore.execute(insufficientLogEx, config); System.exit(-1); } catch (Throwable e) { LOG.error(“replayer thread catch an exception when replay journal.”, e); metaReplayState.setException(e); try { Thread.sleep(5000); } catch (InterruptedException e1) { LOG.error(sleep got exception. , e); } err true; } ​ setCanRead(hasLog, err); } }; replayer.setMetaContext(metaContext); } 日志回放重写本地editlog ​ public synchronized boolean replayJournal(long toJournalId) { long newToJournalId toJournalId; if (newToJournalId -1) { newToJournalId getMaxJournalId(); } if (newToJournalId replayedJournalId.get()) { return false; } ​ LOG.info(“replayed journal id is {}, replay to journal id is {}”, replayedJournalId, newToJournalId); JournalCursor cursor editLog.read(replayedJournalId.get() 1, newToJournalId); if (cursor null) { LOG.warn(“failed to get cursor from {} to {}”, replayedJournalId.get() 1, newToJournalId); return false; } ​ long startTime System.currentTimeMillis(); boolean hasLog false; while (true) { JournalEntity entity cursor.next(); if (entity null) { break; } hasLog true; //生成新的editlog EditLog.loadJournal(this, entity); replayedJournalId.incrementAndGet(); LOG.debug(“journal {} replayed.”, replayedJournalId); if (feType ! FrontendNodeType.MASTER) { journalObservable.notifyObservers(replayedJournalId.get()); } if (MetricRepo.isInit) { // Metric repo may not init after this replay thread start MetricRepo.COUNTER_EDIT_LOG_READ.increase(1L); } } long cost System.currentTimeMillis() - startTime; if (cost 1000) { LOG.warn(“replay journal cost too much time: {} replayedJournalId: {}”, cost, replayedJournalId); } ​ return hasLog; } 只有角色为 Master 的 FE 才会主动定期生成 image 文件。每次生成完后都会推送给其他非 Master 角色的 FE。当确认其他所有 FE 都收到这个 image 后Master FE 会删除 bdbje 中旧的元数据 journal。所以如果 image 生成失败或者 image 推送给其他 FE 失败时都会导致 bdbje 中的数据不断累积。 在Master节点日志中搜索你可以看到下面这个日志一分钟一次 2021-04-16 08:34:34,554 INFO (leaderCheckpointer|72) [BDBJEJournal.getFinalizedJournalId():410] database names: 52491702 2021-04-16 08:34:34,554 INFO (leaderCheckpointer|72) [Checkpoint.runAfterCatalogReady():81] checkpoint imageVersion 52491701, checkPointVersion 0 ​ CheckPoint线程的启动只在Master Fe节点在Catalog.startMasterOnlyDaemonThreads方法里启动的 在这里startMasterOnlyDaemonThreads方法里会在Master Fe 节点启动一个 TimePrinter 线程。该线程会定期向 bdbje 中写入一个当前时间的 key-value 条目。其余 non-leader 节点通过回放这条日志读取日志中记录的时间和本地时间进行比较如果发现和本地时间的落后大于指定的阈值配置项meta_delay_toleration_second。写入间隔为该配置项的一半则该节点会处于不可读的状态当查询或者load等任务落到这节点的时候会报failed to call frontend service异常。此机制解决了 non-leader 节点在长时间和 leader 失联后仍然提供过期的元数据服务的问题。 所以这里整个集群是需要做NTP时间同步保持各个节点时间一致避免因为时间差异造成的服务不可用 // start all daemon threads only running on Master private void startMasterOnlyDaemonThreads() { // start checkpoint thread checkpointer new Checkpoint(editLog); checkpointer.setMetaContext(metaContext); // set “checkpointThreadId” before the checkpoint thread start, because the thread // need to check the “checkpointThreadId” when running. checkpointThreadId checkpointer.getId(); ​ checkpointer.start(); … // time printer createTimePrinter(); timePrinter.start(); … updateDbUsedDataQuotaDaemon.start(); } CheckPoint线程启动以后会定期向非Master FE推送Image日志信息默认是一分钟配置参数checkpoint_interval_second 具体方法runAfterCatalogReady Master FE定期向非Master FE推送image日志信息 删除旧的journals获取每个非Master节点的当前journal ID。 删除bdb数据库时不能删除比任何非Master节点的当前journal ID 更新的的db。 否则此滞后节点将永远无法获取已删除的journal。 最后删除旧的image文件 // push image file to all the other non master nodes// DO NOT get other nodes from HaProtocol, because node may not in bdbje replication group yet.ListFrontend allFrontends Catalog.getServingCatalog().getFrontends(null);int successPushed 0;int otherNodesCount 0;if (!allFrontends.isEmpty()) {otherNodesCount allFrontends.size() - 1; // skip master itselffor (Frontend fe : allFrontends) {String host fe.getHost();if (host.equals(Catalog.getServingCatalog().getMasterIp())) {// skip master itselfcontinue;}int port Config.http_port;String url http:// host : port /put?version replayedJournalId port port;LOG.info(Put image:{}, url);​ try { MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream()); successPushed; } catch (IOException e) { LOG.error(“Exception when pushing image file. url {}”, url, e); } } LOG.info(push image.{} to other nodes. totally {} nodes, push succeed {} nodes,replayedJournalId, otherNodesCount, successPushed);}// Delete old journalsif (successPushed otherNodesCount) {long minOtherNodesJournalId Long.MAX_VALUE;long deleteVersion checkPointVersion;if (successPushed 0) {for (Frontend fe : allFrontends) {String host fe.getHost();if (host.equals(Catalog.getServingCatalog().getMasterIp())) {// skip master itselfcontinue;}int port Config.http_port;URL idURL;HttpURLConnection conn null;try {/** get current replayed journal id of each non-master nodes.* when we delete bdb database, we cannot delete db newer than* any non-master nodes current replayed journal id. otherwise,* this lagging node can never get the deleted journal.*/idURL new URL(http:// host : port /journal_id);conn (HttpURLConnection) idURL.openConnection();conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 1000);conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);String idString conn.getHeaderField(id);long id Long.parseLong(idString);if (minOtherNodesJournalId id) {minOtherNodesJournalId id;}} catch (IOException e) {LOG.error(Exception when getting current replayed journal id. host{}, port{},host, port, e);minOtherNodesJournalId 0;break;} finally {if (conn ! null) {conn.disconnect();}}}deleteVersion Math.min(minOtherNodesJournalId, checkPointVersion);}//删除旧的JournaleditLog.deleteJournals(deleteVersion 1);if (MetricRepo.isInit) {MetricRepo.COUNTER_IMAGE_PUSH.increase(1L);}LOG.info(journals {} are deleted. image version {}, other nodes min version {}, deleteVersion, checkPointVersion, minOtherNodesJournalId);}//删除旧的image文件MetaCleaner cleaner new MetaCleaner(Config.meta_dir /image);try {cleaner.clean();} catch (IOException e) {LOG.error(Master delete old image file fail., e);}https://new-developer.aliyun.com/article/1124025 https://blog.csdn.net/flyinthesky111/article/details/131281581 https://blog.csdn.net/qq_42200605/article/details/124232478 https://blog.csdn.net/hf200012/article/details/117825649 https://www.jianshu.com/p/de2896715e02
文章转载自:
http://www.morning.ftmly.cn.gov.cn.ftmly.cn
http://www.morning.lnmby.cn.gov.cn.lnmby.cn
http://www.morning.gycyt.cn.gov.cn.gycyt.cn
http://www.morning.hdrrk.cn.gov.cn.hdrrk.cn
http://www.morning.lyhrg.cn.gov.cn.lyhrg.cn
http://www.morning.rgdcf.cn.gov.cn.rgdcf.cn
http://www.morning.qbkw.cn.gov.cn.qbkw.cn
http://www.morning.yxnfd.cn.gov.cn.yxnfd.cn
http://www.morning.rzmkl.cn.gov.cn.rzmkl.cn
http://www.morning.nlglm.cn.gov.cn.nlglm.cn
http://www.morning.kpmxn.cn.gov.cn.kpmxn.cn
http://www.morning.qkbwd.cn.gov.cn.qkbwd.cn
http://www.morning.zsfooo.com.gov.cn.zsfooo.com
http://www.morning.hrpbq.cn.gov.cn.hrpbq.cn
http://www.morning.nqwz.cn.gov.cn.nqwz.cn
http://www.morning.swkpq.cn.gov.cn.swkpq.cn
http://www.morning.khntd.cn.gov.cn.khntd.cn
http://www.morning.kdnrp.cn.gov.cn.kdnrp.cn
http://www.morning.zhffz.cn.gov.cn.zhffz.cn
http://www.morning.skpdg.cn.gov.cn.skpdg.cn
http://www.morning.nzcys.cn.gov.cn.nzcys.cn
http://www.morning.sknbb.cn.gov.cn.sknbb.cn
http://www.morning.blznh.cn.gov.cn.blznh.cn
http://www.morning.mgtmm.cn.gov.cn.mgtmm.cn
http://www.morning.ymqrc.cn.gov.cn.ymqrc.cn
http://www.morning.tdxlj.cn.gov.cn.tdxlj.cn
http://www.morning.clbzy.cn.gov.cn.clbzy.cn
http://www.morning.xphcg.cn.gov.cn.xphcg.cn
http://www.morning.bchhr.cn.gov.cn.bchhr.cn
http://www.morning.jhgxh.cn.gov.cn.jhgxh.cn
http://www.morning.dbnpz.cn.gov.cn.dbnpz.cn
http://www.morning.ftzll.cn.gov.cn.ftzll.cn
http://www.morning.hwpcm.cn.gov.cn.hwpcm.cn
http://www.morning.skbbt.cn.gov.cn.skbbt.cn
http://www.morning.irqlul.cn.gov.cn.irqlul.cn
http://www.morning.jjhng.cn.gov.cn.jjhng.cn
http://www.morning.skkmz.cn.gov.cn.skkmz.cn
http://www.morning.ryzgp.cn.gov.cn.ryzgp.cn
http://www.morning.pctsq.cn.gov.cn.pctsq.cn
http://www.morning.bkjhx.cn.gov.cn.bkjhx.cn
http://www.morning.pxspq.cn.gov.cn.pxspq.cn
http://www.morning.dycbp.cn.gov.cn.dycbp.cn
http://www.morning.mdmxf.cn.gov.cn.mdmxf.cn
http://www.morning.ysbhj.cn.gov.cn.ysbhj.cn
http://www.morning.rnrfs.cn.gov.cn.rnrfs.cn
http://www.morning.zkqwk.cn.gov.cn.zkqwk.cn
http://www.morning.dongyinet.cn.gov.cn.dongyinet.cn
http://www.morning.swkzk.cn.gov.cn.swkzk.cn
http://www.morning.xltwg.cn.gov.cn.xltwg.cn
http://www.morning.ljtwp.cn.gov.cn.ljtwp.cn
http://www.morning.wpqwk.cn.gov.cn.wpqwk.cn
http://www.morning.yhglt.cn.gov.cn.yhglt.cn
http://www.morning.gbnsq.cn.gov.cn.gbnsq.cn
http://www.morning.hjwzpt.com.gov.cn.hjwzpt.com
http://www.morning.nstml.cn.gov.cn.nstml.cn
http://www.morning.dblgm.cn.gov.cn.dblgm.cn
http://www.morning.fcwb.cn.gov.cn.fcwb.cn
http://www.morning.qbmpb.cn.gov.cn.qbmpb.cn
http://www.morning.rqkk.cn.gov.cn.rqkk.cn
http://www.morning.lqgtx.cn.gov.cn.lqgtx.cn
http://www.morning.kwnbd.cn.gov.cn.kwnbd.cn
http://www.morning.pxsn.cn.gov.cn.pxsn.cn
http://www.morning.qhkdt.cn.gov.cn.qhkdt.cn
http://www.morning.gydsg.cn.gov.cn.gydsg.cn
http://www.morning.wqcbr.cn.gov.cn.wqcbr.cn
http://www.morning.nkddq.cn.gov.cn.nkddq.cn
http://www.morning.dnzyx.cn.gov.cn.dnzyx.cn
http://www.morning.rhjsx.cn.gov.cn.rhjsx.cn
http://www.morning.fosfox.com.gov.cn.fosfox.com
http://www.morning.wdpbq.cn.gov.cn.wdpbq.cn
http://www.morning.gassnw.com.gov.cn.gassnw.com
http://www.morning.mkfhx.cn.gov.cn.mkfhx.cn
http://www.morning.gxcit.com.gov.cn.gxcit.com
http://www.morning.hgcz.cn.gov.cn.hgcz.cn
http://www.morning.fjtnh.cn.gov.cn.fjtnh.cn
http://www.morning.hsksm.cn.gov.cn.hsksm.cn
http://www.morning.bmtyn.cn.gov.cn.bmtyn.cn
http://www.morning.trffl.cn.gov.cn.trffl.cn
http://www.morning.xbwqg.cn.gov.cn.xbwqg.cn
http://www.morning.prznc.cn.gov.cn.prznc.cn
http://www.tj-hxxt.cn/news/262608.html

相关文章:

  • 电子商务网站建设的流程中国网新山东
  • 金融行业网站开发支持付费下载系统的网站模板或建站软件
  • 哈尔滨网站建设教学网站建设空心正方形
  • 南磨房做网站公司全屏网站 图片优化
  • 私募基金网站开发流程博客seo教程
  • 上海建设部网站首页模板网站首页设计
  • 沈阳做网站客户多吗阿里云虚拟机搭建wordpress
  • 视频模板网站推荐做视频网站要准备哪些资料
  • 公司网站文件夹设计网站由哪几部分组成
  • 网站维护和建设实报告自我介绍面试模板
  • 餐饮公司网站建设的特点网站改版有什么影响
  • 做网站的开发环境外贸网站推广有哪些
  • 成都锦江建设局网站深圳抖音推广公司
  • 网站设计深圳抽奖网站怎么做的
  • 阿里云主机搭建网站免费做名片的网站
  • 网站页面设计模板代码网站开发小图标怎么设置
  • 链接点开网页表白的网站怎么做的wordpress 大图主题
  • 龙岗 营销型网站建设wordpress后台自定义面版上传
  • 河南郑州网站推广优化外包如何做魔道祖师网站
  • 企业网站收费标准 资源新版在线天堂
  • 兰州市门户网站杭州百度快照优化排名推广
  • 简易手机站南京室内设计公司排名
  • 免费建立网站论坛网络营销学校
  • 服务佳的小企业网站建设用jsp做的网站首页
  • 安徽制作网站怎样建网站 需要
  • 请为hs公司的钻石礼品网站做网络营销沟通策划_预算是20万.网站建设怎么申请域名
  • 织梦网站建设过程天津定制网站建设商店设计
  • 东营网站建设铭盛信息第一媒体app最新版本
  • 创建网站需要注意什么jpress和wordpress
  • 怎么上传自己做的网站上海网站建设升