建筑模板种类,连云港seo公司,做网上营销怎样推广,中心网站建设管理工作1.概述
前面一章分析了集群下启动阶段选举过程#xff0c;一旦完成选举#xff0c;通过执行QuorumPeer的setPeerState将设置好选举结束后自身的状态。然后#xff0c;将再次执行QuorumPeer的run的新的一轮循环#xff0c;
QuorumPeer的run的每一轮循环#xff0c;先判断…1.概述
前面一章分析了集群下启动阶段选举过程一旦完成选举通过执行QuorumPeer的setPeerState将设置好选举结束后自身的状态。然后将再次执行QuorumPeer的run的新的一轮循环
QuorumPeer的run的每一轮循环先判断自身当前状态 (1). 自身为LOOKING 则需在本轮循环开启选举并完成选举。 (2). 自身为FOLLOWING
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}try {LOG.info(FOLLOWING);setFollower(makeFollower(logFactory));follower.followLeader();
} catch (Exception e) {LOG.warn(Unexpected exception, e);
} finally {follower.shutdown();setFollower(null);updateServerState();
}则应转换为从节点先通过follower.followLeader();与主同步再履行起从节点的角色任务。 (3). 自身为LEADING
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}try {setLeader(makeLeader(logFactory));leader.lead();setLeader(null);
} catch (Exception e) {LOG.warn(Unexpected exception, e);
} finally {if (leader ! null) {leader.shutdown(Forcing shutdown);setLeader(null);}updateServerState();
}则应转换为主节点先通过leader.lead();完成集群同步后再履行起主节点的角色任务。
本部分讨论集群同步过程。集群同步是一个完成选举的主和从相互协作最终大家达成一致的过程。
2.集群同步过程前的两次同步
2.1.主节点的leader.lead()
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
zk.loadData();
leaderStateSummary new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
cnxAcceptor new LearnerCnxAcceptor();
cnxAcceptor.start();上述动作里先是设置ZabState为DISCOVERY。启动节点一开始是ELECTION来着。 其中zk.loadData();由于在启动节点选举前已经执行过了一次基于快照redo的数据实体恢复所以这里啥也不用做。 构建一个StateSummary实例这个实例包含了主节点下数据实体阶段性的一个反映。对应的那个集群轮次最后一个zxid是啥。 通过cnxAcceptor.start();使得主节点开始作为一个服务端允许其他集群成员来连接以便执行集群同步及后续的请求处理。 对每个接入的集群成员在主节点方面将通过accept得到通信套接字并分配一个LearnerHandler来维护和集群成员的通信。
LearnerHandler fh new LearnerHandler(socket, is, Leader.this);
fh.start();fh.start();将开启一个线程在此线程中将接收来自连接对端的包并对其执行处理。 接下来主节点执行的是
long epoch getEpochToPropose(self.getId(), self.getAcceptedEpoch());主节点执行getEpochToPropose的目的是为了获得新产生的集群的轮次要设置为何值 此处将产生同步等待直到算上主节点自身有半数以上集群成员连接到的主节点并执行了getEpochToPropose才能获得继续。 对主节点此处的同步等待超时下将引发主节点停止。并设置自身状态为LOOKING。这样在QuorumPeer的run的新一轮循环里将重新开始选举过程。
2.2.从节点的follower.followLeader()
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer findLeader();
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime System.currentTimeMillis(); 上述动作里先是设置ZabState为DISCOVERY。启动节点一开始是ELECTION来着。 然后执行findLeader找到主节点。借助自身的投票和集群全局配置很容易定位出来。 connectToLeader将同步方式发起到服务端连接。 若指定时间或指定次数内连接未建立将引发从节点停止。并设置自身状态为LOOKING。这样在QuorumPeer的run的新一轮循环里将重新开始选举过程。 若连接成功在异步发包下会启动一个线程用于异步发包。 从节点接下来执行
long newEpochZxid registerWithLeader(Leader.FOLLOWERINFO);这里面从节点先向主节点发一个包
long lastLoggedZxid self.getLastLoggedZxid();
QuorumPacket qp new QuorumPacket();
qp.setType(Leader.FOLLOWERINFO);// 包类型
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));// 包里的zxid
LearnerInfo li new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid new ByteArrayOutputStream();
BinaryOutputArchive boa BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, LearnerInfo);
qp.setData(bsid.toByteArray());// 包的数据体是一个LearnerInfo实例。包含从节点sid0x10000集群全局信息。
writePacket(qp, true);现在从节点等待主节点的回复。
2.3.主节点实现从节点接入及收包处理 主节点会为每个接入到其的从节点分配一个LearnerHandler实例此实例将占据一个独立的线程来处理来自对应从节点的数据包收取和处理。
LeaderHandler的run一开始执行
learnerMaster.addLearnerHandler(this);这样将向leader的learners集合中加入此LearnerHandler实例。 然后主将开始收取首个包。并对首个包进行合法性检测对从节点必须是Leader.FOLLOWERINFO。 若检测失败服务端方面会停止线程关闭被动连接从leader的learnersforwardingFollowersobservingLearners集合移除此LearnerHandler实例。客户端方面则将引发从节点停止。并设置自身状态为LOOKING。这样在QuorumPeer的run的新一轮循环里将重新开始选举过程。后续分析过程失败过程处理类似。我们将按成功的流程去分析。
我们分析主节点这边对首个包的解析
QuorumPacket qp new QuorumPacket();
// 反向序列化获得包
ia.readRecord(qp, packet);
byte[] learnerInfoData qp.getData();// 获得包中数据体
ByteBuffer bbsid ByteBuffer.wrap(learnerInfoData);
this.sid bbsid.getLong();
this.version bbsid.getInt(); // protocolVersion--0x10000
String followerInfo learnerMaster.getPeerInfo(this.sid);
// 这样将获得从节点首个包里的zxid从中提取出epoch
long lastAcceptedEpoch ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss null;
long zxid qp.getZxid();
long newEpoch learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);2.4. 集群同步阶段确定新集群的epoch 通过2.1到2.3我们知道。 选举结束后主节点作为服务端允许集群从节点的接入。 然后集群主节点将阻塞等待。 对每个结束选举的从节点将连接主节点成功后发出首个Leader.FOLLOWERINFO包。然后等待回复。 主节点为每个接入的从节点分配LearnerHandler实例并启动线程处理收包和收包处理。 每个LearnerHandler的线程收取首个包Leader.FOLLOWERINFO后通过learnerMaster.getEpochToPropose陷入与主节点一样的阻塞等待。
当算上主节点自身有半数以上成员陷入上述阻塞等待后将基于所有等待成员中acceptedEpoch最大值1作为新集群的epoch设置到leader的epoch。 若上述过程某个成员连接上出现错误或等待超时则将执行连接断开。角色的集群同步停止。重新变为LOOKING状态并进入选举过程这样的处理。 主节点自己等待超时或出错则主节点的集群同步停止。重新变为LOOKING状态并进入选举流程。主节点集群同步停止时会引发每个连到主节点的从节点和主节点的连接断开。使得从节点停止。重新变为LOOKING状态。
2.5.我们分别针对主节点从节点分析完成getEpochToPropose等待后的处理流程 2.5.1.对主节点
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized (this) {lastProposed zk.getZxid();
}
newLeaderProposal.packet new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
waitForEpochAck(self.getId(), leaderStateSummary);主节点后续执行waitForEpochAck。 主节点执行此步骤会阻塞在waitForEpochAck中收集到半数以上的成员及其StateSummary后在满足 (1). 完成收集未引发超时 (2). 不存在从节点StateSummary领先主节点的情况。 满足上述两个条件主节点将继续。并设置electionFinished为true。 若某个条件不满足主节点将停止并断开与连到其的从的连接这样将引发从节点也停止。最终主从均停止。并发起下一轮选举。
2.5.2.对从节点
long newLeaderZxid ZxidUtils.makeZxid(newEpoch, 0);
byte[] ver new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, packet);
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();// 立即发送
QuorumPacket ackEpochPacket new QuorumPacket();
ia.readRecord(ackEpochPacket, packet);主这边会向其回复一个Leader.LEADERINFO包其中包含基于新集群epoch得到的zxid版本信息(0x10000)。 从节点这边
readPacket(qp);
final long newEpoch ZxidUtils.getEpochFromZxid(qp.getZxid());// 取得回复包里的zxid
leaderProtocolVersion ByteBuffer.wrap(qp.getData()).getInt();// 取得ver
byte[] epochBytes new byte[4];
final ByteBuffer wrappedEpochBytes ByteBuffer.wrap(epochBytes);
wrappedEpochBytes.putInt((int) self.getCurrentEpoch());// 放入自身currentEpoch
self.setAcceptedEpoch(newEpoch);// 用回复包里zxid导出的epoch设置自身acceptedEpoch
QuorumPacket ackNewEpoch new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);// 向主节点发包
return ZxidUtils.makeZxid(newEpoch, 0);再来看主节点这边
QuorumPacket ackEpochPacket new QuorumPacket();
ia.readRecord(ackEpochPacket, packet);// 等待连主成员对回复包1的回复
messageTracker.trackReceived(ackEpochPacket.getType());
ByteBuffer bbepoch ByteBuffer.wrap(ackEpochPacket.getData());//
ss new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());// 从节点的currentEpoch从节点的zxid
learnerMaster.waitForEpochAck(this.getSid(), ss);// 连主的多数一起同步等待然后继续运行。waitForEpochAck这个执行中将陷入等待。直到满足 (1). waitForEpochAck使得主节点收集到多数以上成员的sidss。 (2). 收集达到半数以上未超时。 (3). 未出现从节点的ss领先主节点。 上述三者满足时等待者和主节点继续运行。 对LearnerHandler中执行waitForEpochAck超时将断开与从节点连接。引发从节点停止并重新进入下一轮选举。 这一同步达成后主节点中electionFinished设置为true。
到目前为止我们分析了成员完成选举确认自身身份后经历过了两个由主节点这边主导的阻塞等待。 第一个阻塞等待是getEpochToPropose通过此等待基于所有连主成员中acceptedEpoch最大值1作为新集群epoch。 此阶段对从节点等待超时或出错会使得其停止作为从并重新进入选举。 此节点对主节点等待超时或出错会使得其自身和所有从均停止自身角色并重新进入选举。 成功则使得我们获得新集群的epoch及基于epoch的zxid。
第二个阻塞等待是waitForEpochAck通过此等待主要是再一次确认主节点的合法性。即主节点自身的进度确实是参与选举的各个节点中最靠前的。 比如一个123三个成员的集群。 (1). 123启动并运行很长时间。 (2). 全部停止。 (3). 手动删除12的快照redo。 (4). 启动12。使得12各自完成选举预期2为主1为从。 (5). 启动3假设此时12各自处于waitForEpochAck。3选举结束依据收到的Notification确定2为主。并连接2也进入waitForEpochAck。此时就会产生某个从节点的状态领先主节点的情况。这时就要使得主和从全部停止角色并再次选举。再次选举将选择3为主。此时waitForEpochAck将获得通过。一旦主和从的waitForEpochAck结束。此时及时有新的从连到主且领先于主也不会被考虑了。
下面将进入真正的集群同步阶段。
3. 集群同步
我们继续分别从主节点从节点角度分析其后续流程。 3.1. 主节点
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
waitForNewLeaderAck(self.getId(), zk.getZxid());只有完成前述两个节点的同步且未出错。主节点采用将新集群的epoch设置到其currentEpoch中。 注意对主节点和从节点只要其完成了前述中第一个同步就会各自设置自己的acceptedEpoch。 所以accepted反映的是集群已经完成的集群选举次数。一个完成的集群选举的集群并不一定可以成为对外服务的集群。必须继续完成前述第二个同步及完成后续集群同步才可以。
主节点执行waitForNewLeaderAck是我们遇到的新集群形成中主节点主导的第三次同步。 先分析从节点方面在第二个同步完成后的后续动作再分析第三次同步等待的意义。
3.2.从节点 先分析LearnerHandler中后续处理
peerLastZxid ss.getLastZxid();
// 这个函数完成的同步请求及主节点中尚未提交请求的排队
boolean needSnap syncFollower(peerLastZxid, learnerMaster);
// 在需要快照的情况下
if (needSnap) {try {long zxidToSend learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();// 立即发一个快照包。这样保证快照DIFFTRUNC三种同步方式下均是先处理同步。再继续处理主节点中尚未提交请求。oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), packet);messageTracker.trackSent(Leader.SNAP);bufferedOutput.flush()learnerMaster.getZKDatabase().serializeSnapshot(oa);oa.writeString(BenWasHere, signature);bufferedOutput.flush();} finally {ServerMetrics.getMetrics().SNAP_COUNT.add(1);}
} else {ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
// 同步尚未提交包Leader.NEWLEADER
QuorumPacket newLeaderQP new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
bufferedOutput.flush();
startSendingPackets(); 我们分析上述过程现在我们已经针对新集群完成了两个同步。 且知道从节点的lastzxid也知道主节点的lastzxid。 那么我们要做的就是让从节点和主节点同步同步的结果就是从节点的lastzxid变成和主节点的一样。 具体分析之前可以直观想到同步的几种情况 (1). 从节点落后于主节点 落后的不多。 落后的较多。 (2). 从节点领先于主节点
我们继续分析syncFollower
private void queueOpPacket(int type, long zxid) {QuorumPacket packet new QuorumPacket(type, zxid, null, null);queuePacket(packet);
}// 若是差异化同步或截断同步将在此函数内得到处理
// 若是快照同步得外部处理。// 进入是
// needOpPacket 为true
// needSnap 也为true// 这样的话从节点方面的lastZxid中序号为0表示过对于epoch下没处理过具体事务。
boolean isPeerNewEpochZxid (peerLastZxid 0xffffffffL) 0;
long currentZxid peerLastZxid;
boolean needSnap true;
ZKDatabase db learnerMaster.getZKDatabase();
boolean txnLogSyncEnabled db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock db.getLogLock();
ReadLock rl lock.readLock();
try {rl.lock();// 快照恢复后主节点后续落地的集群类型的请求最近N个请求中// 最大者的zxidlong maxCommittedLog db.getmaxCommittedLog();// 最小者的zxidlong minCommittedLog db.getminCommittedLog();// 这个是主节点最后落地的zxid。一般可认为就是maxCommittedLog。long lastProcessedZxid db.getDataTreeLastProcessedZxid();if (db.getCommittedLog().isEmpty()) {minCommittedLog lastProcessedZxid;maxCommittedLog lastProcessedZxid;}if (lastProcessedZxid peerLastZxid) { // 如果从节点的lastZxid和主节点的一致// 使用一个Leader.DIFF完成同步queueOpPacket(Leader.DIFF, peerLastZxid);// 差异化同步needOpPacket false;needSnap false;} else if (peerLastZxid maxCommittedLog !isPeerNewEpochZxid) {// 这里的意思是从节点领先于主节点// 通过一个TRUNC完成同步queueOpPacket(Leader.TRUNC, maxCommittedLog);// 同步节点结束时从节点的lastZxidcurrentZxid maxCommittedLog;needOpPacket false;needSnap false;} else if ((maxCommittedLog peerLastZxid) (minCommittedLog peerLastZxid)) {// 这意味着从节点落后于主节点。但落后的不算多。// 情况1当peerLastZxid在迭代范围内出现时同步过程为// DIFFpeerLastZxid后的每个包{packetCOMMIT}// 情况2当peerLastZxid在迭代范围内没出现且peerLastZxid对应轮次下没事务请求被处理// DIFF首个大于peerLastZxid及其后的每个包{packetCOMMIT}// 情况3当peerLastZxid在迭代范围内没出现从节点处理了某些请求首个大于peerLastZxid的packetZxid属于后续轮次// 最终将采用快照完成同步阶段// 情况4当peerLastZxid在迭代范围内没出现从节点处理了某些请求首个大与peerLastZxid的packetZxid属于相同轮次// TRUNC{截掉从节点中不在主节点中的请求}对主节点中大于peerLastZxid的每个包{packetCOMMIT}IteratorProposal itr db.getCommittedLog().iterator();currentZxid queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);needSnap false;} else if (peerLastZxid minCommittedLog txnLogSyncEnabled) {// 这意味着从节点落后主节点比较多且设置了需要较多redo下优先选择快照long sizeLimit db.calculateTxnLogSizeLimit();// 计算redo尺寸限制IteratorProposal txnLogItr db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);// 在估算所需redo尺寸达到限制时hasNext为false。// 此时采用快照来同步相应的currentZxid db.getDataTreeLastProcessedZxid();if (txnLogItr.hasNext()) {currentZxid queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);if (currentZxid minCommittedLog) {currentZxid peerLastZxid;queuedPackets.clear();needOpPacket true;} else {IteratorProposal committedLogItr db.getCommittedLog().iterator();currentZxid queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);needSnap false;}}if (txnLogItr instanceof TxnLogProposalIterator) {TxnLogProposalIterator txnProposalItr (TxnLogProposalIterator) txnLogItr;txnProposalItr.close();}}if (needSnap) {currentZxid db.getDataTreeLastProcessedZxid();}// 主节点继续将一些尚未落地的包发给从节点leaderLastZxid learnerMaster.startForwarding(this, currentZxid);
} finally {rl.unlock();
}
if (needOpPacket !needSnap) {needSnap true;// 采用快照来同步
}
return needSnap;关于queueCommittedProposals的分析
protected long queueCommittedProposals(IteratorProposal itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {boolean isPeerNewEpochZxid (peerLastZxid 0xffffffffL) 0;long queuedZxid peerLastZxid;long prevProposalZxid -1;while (itr.hasNext()) {Proposal propose itr.next();long packetZxid propose.packet.getZxid();if ((maxZxid ! null) (packetZxid maxZxid)) {break;}// 若maxZxid存在至多处理到maxZxidif (packetZxid peerLastZxid) {prevProposalZxid packetZxid;// zxid过小对端已经处理过。略过。continue;}// 对迭代范围内首个大于或等于peerLastZxid的packetZxidif (needOpPacket) {// 情况1这个packetZxid等于peerLastZxidif (packetZxid peerLastZxid) {queueOpPacket(Leader.DIFF, lastCommittedZxid);needOpPacket false;continue;}// 情况这个packetZxid大于peerLastZxid。// 这样迭代范围前一部分均小于peerLastZxid后一部分均大于peerLastZxidif (isPeerNewEpochZxid) {// 情况2peerLastZxid对应轮次下从节点没处理过事务// 此时忽略此peerLastZxidqueueOpPacket(Leader.DIFF, lastCommittedZxid);needOpPacket false;// 不再需要发op包了} else {// peerLastZxid对应轮次是处理过事务请求。// 情况3主节点中首个大于peerLastZxid的包已经是后续轮次的包了。// 这样最终会用快照方式来处理同步if (ZxidUtils.getEpochFromZxid(packetZxid) ! ZxidUtils.getEpochFromZxid(peerLastZxid)) {return queuedZxid;// 直接返回peerLastZxid} // 情况4主节点处理的请求中跳过了某些从节点轮次下处理过的请求queueOpPacket(Leader.TRUNC, prevProposalZxid);needOpPacket false;}}// 情况1当peerLastZxid在迭代范围内出现时同步过程为// DIFFpeerLastZxid后的每个包{packetCOMMIT}// 情况2当peerLastZxid在迭代范围内没出现且peerLastZxid对应轮次下没事务请求被处理// DIFF首个大于peerLastZxid及其后的每个包{packetCOMMIT}// 情况3当peerLastZxid在迭代范围内没出现从节点处理了某些请求首个大与peerLastZxid的packetZxid属于后续轮次// 最终将采用快照完成同步阶段// 情况4当peerLastZxid在迭代范围内没出现从节点处理了某些请求首个大与peerLastZxid的packetZxid属于相同轮次// TRUNC{截掉从节点中不在主节点中的请求}对主节点中大于peerLastZxid的每个包{packetCOMMIT}if (packetZxid queuedZxid) {continue;}queuePacket(propose.packet);queueOpPacket(Leader.COMMIT, packetZxid);queuedZxid packetZxid;}return queuedZxid;}分析下来结论为 (1). 情况1当peerLastZxid在迭代范围内出现 同步策略为DIFFpeerLastZxid后的每个包{packetCOMMIT} currentZxid为maxCommittedLog (2). 情况2当peerLastZxid在迭代范围内没出现且peerLastZxid对应轮次下没事务请求被处理 同步策略为DIFF首个大于peerLastZxid及其后的每个包{packetCOMMIT} currentZxid为maxCommittedLog (3). 情况3当peerLastZxid在迭代范围内没出现peerLastZxid所在轮次下从节点处理了某些请求首个大于peerLastZxid的packetZxid属于后续轮次 同步策略为采用快照完成同步阶段 currentZxid为peerLastZxid (4). 情况4当peerLastZxid在迭代范围内没出现从节点处理了某些请求首个大与peerLastZxid的packetZxid属于相同轮次 同步策略为TRUNC{截掉从节点中不在主节点中的请求}对主节点中大于peerLastZxid的每个包{packetCOMMIT} currentZxid为maxCommittedLog
关于queueCommittedProposals的分析 针对peerLastZxid minCommittedLog txnLogSyncEnabled来分析 此场景可认为是从节点和主节点差的比较多且指定了在差的达到一定量时优先采用快照来同步。 此情况下在估算所需redo尺寸达到限制时hasNext将为false。 此时采用快照来同步相应的currentZxid db.getDataTreeLastProcessedZxid();
估算所需redo尺寸没达到限制时hasNext将为true
currentZxid queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
// 只有选择快照同步策略下返回值才会小于minCommittedLog
if (currentZxid minCommittedLog) {// 此时将采用快照来同步currentZxid db.getDataTreeLastProcessedZxid();currentZxid peerLastZxid;queuedPackets.clear();needOpPacket true;
} else {// 此时采用DIFF同步currentZxid maxCommittedLog;IteratorProposal committedLogItr db.getCommittedLog().iterator();currentZxid queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);needSnap false;
}结合前述分析。这里只有在queueCommittedProposals返回值为peerLastZxid时会采用快照同步。且此时currentZxid db.getDataTreeLastProcessedZxid(); 其他情况下将采用DIFF/TRUNC主节点后续包方式来同步此时currentZxid maxCommittedLog;
syncFollower中先是同步同步后执行leaderLastZxid learnerMaster.startForwarding(this, currentZxid);以继续向从节点发一些需要确认的尚未提交的包。
public synchronized long startForwarding(LearnerHandler handler, long lastSeenZxid) {if (lastProposed lastSeenZxid) {for (Proposal p : toBeApplied) {if (p.packet.getZxid() lastSeenZxid) {continue;}// 这里继续向从节点发一些主节点这边需要应用应用到整个集群且zxid大于同步后从节点这边zxid的包handler.queuePacket(p.packet);QuorumPacket qp new QuorumPacket(Leader.COMMIT, p.packet.getZxid(), null, null);handler.queuePacket(qp);}if (handler.getLearnerType() LearnerType.PARTICIPANT) {// outstandingProposals含义后续分析ListLong zxids new ArrayListLong(outstandingProposals.keySet());Collections.sort(zxids);for (Long zxid : zxids) {if (zxid lastSeenZxid) {continue;}handler.queuePacket(outstandingProposals.get(zxid).packet);}}}if (handler.getLearnerType() LearnerType.PARTICIPANT) {// 将从节点加入主节点的forwardingFollowers集合addForwardingFollower(handler);// } else {addObserverLearnerHandler(handler);}return lastProposed;
}前面分析中LearnerHandler这边执行集群同步的三个阶段为 (1). 与主同步 (2). 继续向从发送主节点这边需要应用的包 (3). 发NEWLEADER包
QuorumPacket newLeaderQP new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);然后执行
qp new QuorumPacket();
ia.readRecord(qp, packet);
messageTracker.trackReceived(qp.getType());
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());我们接着分析从节点对集群同步节点收到的这些包的处理。
3.3.从节点对集群同步包的处理 从节点在registerWithLeader后执行的流程为
// 取得新集群epoch
long newEpoch ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch self.getAcceptedEpoch()) {throw new IOException(Error: Epoch of leader is lower);
}
long startTime Time.currentElapsedTime();
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);我们分析syncWithLeader 从节点点在syncWithLeader里依次处理同步主节点同步后的包NEWLEADER。 其中处理NewLeader包将向主发一个writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
3.4.主节点这边收到Leader.ACK时处理 由于前述可知主节点这边主节点自己和负责与从节点通信的LearnerHandler此时均执行waitForNewLeaderAck。 当达到半数以上的Leader.ACK收集而未发生超时时将一起继续后续动作。
对LearnerHandler为
learnerMaster.waitForStartup();
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));即先等待主节点作为服务端对外提供服务后再向从节点发送Leader.UPTODATE。 我们继续分析从节点这边收到Leader.UPTODATE的处理
// 向主节点发送Leader.ACK
QuorumPacket ack new QuorumPacket(Leader.ACK, 0, null, null);
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
// 启动对外服务端
zk.startServing();
// 更新自身选票中集群的epoch
self.updateElectionVote(newEpoch);
if (zk instanceof FollowerZooKeeperServer) {FollowerZooKeeperServer fzk (FollowerZooKeeperServer) zk;// 对未提交包for (PacketInFlight p : packetsNotCommitted) {fzk.logRequest(p.hdr, p.rec, p.digest);// 日志}for (Long zxid : packetsCommitted) {fzk.commit(zxid);// 提交}
} 此后进入从节点常规运行阶段
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync true;
QuorumPacket qp new QuorumPacket();
while (this.isRunning()) {// 接收来自主节点的包readPacket(qp);// 处理processPacket(qp);
}3.5.主节点waitForNewLeaderAck的后续处理
startZkServer();// 起点服务端开启外部服务
if (!System.getProperty(zookeeper.leaderServes, yes).equals(no)) {self.setZooKeeperServer(zk);
}
self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);
boolean tickSkip true;
String shutdownMessage null;
while (true) {synchronized (this) {long start Time.currentElapsedTime();long cur start;long end start self.tickTime / 2;while (cur end) {wait(end - cur);cur Time.currentElapsedTime();}if (!tickSkip) {self.tick.incrementAndGet();}SyncedLearnerTracker syncedAckSet new SyncedLearnerTracker();syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());syncedAckSet.addAck(self.getId());for (LearnerHandler f : getLearners()) {if (f.synced()) {syncedAckSet.addAck(f.getSid());}}if (!this.isRunning()) {shutdownMessage Unexpected internal error;break;}if (!tickSkip !syncedAckSet.hasAllQuorums() !(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers()) self.getQuorumVerifier().revalidateOutstandingProp(this, new ArrayList(outstandingProposals.values()), lastCommitted))) {shutdownMessage Not sufficient followers synced, only synced with sids: [ syncedAckSet.ackSetsToString() ];break;}tickSkip !tickSkip;}for (LearnerHandler f : getLearners()) {f.ping();}
}
if (shutdownMessage ! null) {shutdown(shutdownMessage);
}上述可认为是集群正常运行中主节点不断检测各个从节点的连接情况一旦出现连接人数不足等异常情况。及时停止主节点运行以便重新选举出合适集群。