Kafka 3.5 source walkthrough: how the ISR shrinks and expands, and what isr vs. maximalIsr means

ISR (In-Sync Replicas): the replicas that are keeping up with the leader.
OSR (Out-of-Sync Replicas): the replicas that are not. At the start, all replicas are in the ISR; while Kafka runs, a replica whose replication falls behind for longer than the replica.lag.time.max.ms threshold is kicked out of the ISR into the OSR, and can return to the ISR once its speed recovers.
AR (Assigned Replicas): all replicas of the partition. AR = ISR + OSR.

If these concepts are unfamiliar, see the earlier post Kafka——副本Replica机制.

Contents:

I. At topic-partition initialization, every replica of the partition goes into the leader replica's maximalIsr
  1. Take the leaderIsrUpdateLock write lock; everything happens inside it
  2. Initialize the ISR (all replica ids are saved into maximalIsr — this is when maximalIsr is at its largest)
II. A scheduled task shrinks the ISR
  1. Two ways the task gets started: (1) zk mode (2) kraft mode
  2. What the task does: (1) under the leaderIsrUpdateLock read lock, decide whether a shrink is needed; (2) under the write lock, apply the change; (3) during a shrink, isr and maximalIsr both keep the pre-shrink set — the shrunk ISR only travels in sentLeaderAndIsr
III. When a follower fetches data from the leader, the leader checks whether it can join the ISR
  1. After taking the leaderIsrUpdateLock read lock, check whether the follower qualifies for the ISR
  2. Take the leaderIsrUpdateLock write lock, then perform the write
  3. The write adds the new follower replica to maximalIsr first; isr keeps the pre-expand set
IV. After every maximalIsr change, the information must be sent to the other replicas
  1. zk mode: a scheduled task modifies a zk node to propagate the change
  2. kraft mode: notification via a request through controllerChannelManager
V. maximalIsr and isr
  1. How PartitionState defines the isr and maximalIsr fields, and why the code above only modifies maximalIsr
  2. When maximalIsr is finally assigned to isr

I. At topic-partition initialization, every replica of the partition goes into the leader replica's maximalIsr

If the becomeLeaderOrFollower method is new to you, see the earlier post kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码.

```scala
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  // ... (omitted)
  val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
    makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
      highWatermarkCheckpoints, topicIdFromRequest)
  // ... (omitted)
}

private def makeLeaders(controllerId: Int,
                        controllerEpoch: Int,
                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Errors],
                        highWatermarkCheckpoints: OffsetCheckpoints,
                        topicIds: String => Option[Uuid]): Set[Partition] = {
  // ... (omitted)
  // Update the partition state to become leader; returns true on success
  if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
    // Collect the partitions that successfully became leader
    partitionsToMakeLeaders += partition
  }
  // ... (omitted)
}
```

1. Take the leaderIsrUpdateLock write lock; everything happens inside it

```scala
def makeLeader(partitionState: LeaderAndIsrPartitionState,
               highWatermarkCheckpoints: OffsetCheckpoints,
               topicId: Option[Uuid]): Boolean = {
  // Take the leaderIsrUpdateLock write lock to serialize concurrent modifications
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    // ... (omitted)
    controllerEpoch = partitionState.controllerEpoch
    // ... (omitted)
    val currentTimeMs = time.milliseconds
    // If isLeader is currently false, this broker is becoming a new leader
    val isNewLeader = !isLeader
    // Convert the request's fields into Scala collections, then try to update the assignment and ISR state
    val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
    val replicas = partitionState.replicas.asScala.map(_.toInt)
    // The ISR carried in the request contains every replica of the partition, leader and followers alike
    val isr = partitionState.isr.asScala.map(_.toInt).toSet
    val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
    val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
    // ... (omitted)
    // If the partition epoch is greater than or equal to the current one, update the assignment and ISR
    updateAssignmentAndIsr(
      replicas = replicas,
      isLeader = true,
      isr = isr,
      addingReplicas = addingReplicas,
      removingReplicas = removingReplicas,
      LeaderRecoveryState.RECOVERED
    )
    // ... (omitted)
    isNewLeader
  }
  // ... (omitted)
}
```

updateAssignmentAndIsr is what initializes the ISR:

```scala
def updateAssignmentAndIsr(replicas: Seq[Int],
                           isLeader: Boolean,
                           isr: Set[Int],
                           addingReplicas: Seq[Int],
                           removingReplicas: Seq[Int],
                           leaderRecoveryState: LeaderRecoveryState): Unit = {
  if (isLeader) {
    // Every assigned replica other than the local broker is a follower
    val followers = replicas.filter(_ != localBrokerId)
    // Replicas currently tracked in remoteReplicasMap but no longer assigned must be removed
    val removedReplicas = remoteReplicasMap.keys.filterNot(followers.contains(_))
    // Add new followers to remoteReplicasMap; if an entry already exists, leave it untouched
    followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
    remoteReplicasMap.removeAll(removedReplicas)
  } else {
    // A follower does not track remote replicas
    remoteReplicasMap.clear()
  }

  assignmentState =
    if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
      OngoingReassignmentState(addingReplicas, removingReplicas, replicas)
    else
      SimpleAssignmentState(replicas)

  partitionState = CommittedPartitionState(isr, leaderRecoveryState)
}
```

The call to CommittedPartitionState at the end is what seeds the ISR — and the field it lands in is maximalIsr.

2. Initialize the ISR (all replica ids are saved into maximalIsr — this is when maximalIsr is at its largest)

```scala
case class CommittedPartitionState(
  isr: Set[Int],
  leaderRecoveryState: LeaderRecoveryState
) extends PartitionState {
  val maximalIsr = isr
  val isInflight = false

  override def toString: String = {
    s"CommittedPartitionState(isr=$isr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      ")"
  }
}
```

As for why the value ends up in maximalIsr: look at the PartitionState definition in section V.1 below, and it becomes clear that the ISR has not formally taken effect at this point.
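Before moving on: the bookkeeping inside updateAssignmentAndIsr is pure set arithmetic, and the three derived sets are easy to mix up. The runnable sketch below replays it with concrete values; localBrokerId, replicas and currentRemote are invented inputs for illustration, not Kafka state.

```scala
// Illustrative sketch of the set arithmetic in updateAssignmentAndIsr (not Kafka code).
object AssignmentSketch extends App {
  val localBrokerId = 0
  val replicas = Seq(0, 1, 2)   // the new assignment carried by the LeaderAndIsr request
  val currentRemote = Set(1, 3) // stand-in for remoteReplicasMap.keys before the update

  // Followers are all assigned replicas except the local broker itself
  val followers = replicas.filter(_ != localBrokerId)
  // Tracked replicas that are no longer assigned get dropped
  val removedReplicas = currentRemote.filterNot(followers.contains(_))
  // Newly assigned followers get a fresh Replica entry
  val addedReplicas = followers.filterNot(currentRemote.contains(_))

  println(s"followers=$followers, removed=$removedReplicas, added=$addedReplicas")
  // followers=List(1, 2), removed=Set(3), added=List(2)
}
```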
II. A scheduled task shrinks the ISR

1. Two ways the task gets started

(1) zk mode

The startup method in KafkaServer.scala calls replicaManager.startup().

(2) kraft mode

In BrokerServer.scala the chain is: startup → sharedServer.loader.installPublishers(metadataPublishers) → scheduleInitializeNewPublishers(0) → initializeNewPublishers → publisher.onMetadataUpdate(delta, image, manifest), whose implementation is onMetadataUpdate in BrokerMetadataPublisher.scala → initializeManagers() → replicaManager.startup().

2. What the scheduled task does

Start with the task itself, in ReplicaManager.scala:

```scala
def startup(): Unit = {
  // Start the ISR expiration thread.
  // A follower can lag behind the leader for at most replicaLagTimeMaxMs x 1.5
  // before it is removed from the ISR.
  scheduler.schedule("isr-expiration", () => maybeShrinkIsr(), 0L, config.replicaLagTimeMaxMs / 2)
}

private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

  // Shrink the ISRs of non-offline partitions, i.e. walk every online partition
  allPartitions.keys.foreach { topicPartition =>
    onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
  }
}
```

(1) Under the leaderIsrUpdateLock read lock, decide whether the ISR needs shrinking

```scala
// Check whether the ISR (In-Sync Replica) list needs to be updated, and perform the update if so.
def maybeShrinkIsr(): Unit = {
  def needsIsrUpdate: Boolean = {
    // Require that no AlterPartition request is in flight, then call needsShrinkIsr()
    // while holding the leaderIsrUpdateLock read lock
    !partitionState.isInflight && inReadLock(leaderIsrUpdateLock) {
      needsShrinkIsr()
    }
  }

  if (needsIsrUpdate) {
    val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
      leaderLogIfLocal.flatMap { leaderLog =>
        // The ids of the replicas that have been out of sync for longer than the allowed lag
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
        partitionState match {
          case currentState: CommittedPartitionState if outOfSyncReplicaIds.nonEmpty =>
            // ... (omitted)
            // Prepare the ISR shrink
            Some(prepareIsrShrink(currentState, outOfSyncReplicaIds))
          case _ => None
        }
      }
    }
    // submitAlterPartition sends the AlterPartition request outside of the LeaderAndIsr lock,
    // because the completion logic may advance the high watermark and complete delayed operations.
    alterIsrUpdateOpt.foreach(submitAlterPartition)
  }
}
```

The result of needsShrinkIsr decides whether the ISR modification below runs at all:

```scala
private def needsShrinkIsr(): Boolean = {
  leaderLogIfLocal.exists { _ => getOutOfSyncReplicas(replicaLagTimeMaxMs).nonEmpty }
}

/**
 * A follower that has the same LEO as the leader is never considered out of sync. Otherwise:
 * 1. Stuck follower: if the replica's LEO has not been updated for maxLagMs ms, the follower
 *    is stuck and should be removed from the ISR.
 * 2. Slow follower: if the replica has not read up to the LEO within the last maxLagMs ms,
 *    the follower is lagging and should be removed from the ISR.
 * Both cases are handled by checking lastCaughtUpTimeMs, the last time the replica was fully
 * caught up. If either condition is violated, the replica is considered out of sync.
 * If an ISR update is already in flight, an empty set is returned.
 */
def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
  val current = partitionState
  if (!current.isInflight) {
    val candidateReplicaIds = current.isr - localBrokerId
    val currentTimeMs = time.milliseconds()
    val leaderEndOffset = localLogOrException.logEndOffset
    candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
  } else {
    Set.empty
  }
}

private def isFollowerOutOfSync(replicaId: Int,
                                leaderEndOffset: Long,
                                currentTimeMs: Long,
                                maxLagMs: Long): Boolean = {
  getReplica(replicaId).fold(true) { followerReplica =>
    // Note the negation here: "out of sync" is the inverse of "caught up"
    !followerReplica.stateSnapshot.isCaughtUp(leaderEndOffset, currentTimeMs, maxLagMs)
  }
}

def isCaughtUp(leaderEndOffset: Long,
               currentTimeMs: Long,
               replicaMaxLagMs: Long): Boolean = {
  // Caught up if the replica's LEO equals the leader's, or the time since its last
  // full catch-up is no more than replicaMaxLagMs
  leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
}
```

(2) Take the leaderIsrUpdateLock write lock and start modifying the ISR

The operation performed is the prepareIsrShrink method:

```scala
// When shrinking the ISR we cannot assume the update will succeed: if "AlterPartition" fails,
// an optimistic update could incorrectly advance the HW.
// The "maximal ISR" of "PendingShrinkIsr" is therefore the current ISR.
private[cluster] def prepareIsrShrink(currentState: CommittedPartitionState,
                                      outOfSyncReplicaIds: Set[Int]): PendingShrinkIsr = {
  // Drop the out-of-sync replicas from the ISR
  val isrToSend = partitionState.isr -- outOfSyncReplicaIds
  // Build the new ISR, tagging each replica with its broker epoch
  val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
  val newLeaderAndIsr = LeaderAndIsr(
    localBrokerId,
    leaderEpoch,
    partitionState.leaderRecoveryState,
    isrWithBrokerEpoch,
    partitionEpoch
  )
  val updatedState = PendingShrinkIsr(
    outOfSyncReplicaIds,
    newLeaderAndIsr,
    currentState
  )
  partitionState = updatedState
  updatedState
}
```

(3) During a shrink, isr and maximalIsr both keep the pre-shrink set; the shrunk ISR only travels in sentLeaderAndIsr

PendingShrinkIsr holds the ISR values while the change is pending. Note that, unlike the expand case in section III, it does not put the shrunk set into maximalIsr: both isr and maximalIsr stay at the last committed (pre-shrink) ISR, and the shrunk set is only carried in sentLeaderAndIsr, the payload sent to the controller — precisely because a rejected shrink must not have advanced the HW.

```scala
case class PendingShrinkIsr(
  outOfSyncReplicaIds: Set[Int],
  sentLeaderAndIsr: LeaderAndIsr,
  lastCommittedState: CommittedPartitionState
) extends PendingPartitionChange {
  val isr = lastCommittedState.isr
  val maximalIsr = isr
  val isInflight = true

  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
    alterPartitionListener.markIsrShrink()
  }

  override def toString: String = {
    s"PendingShrinkIsr(outOfSyncReplicaIds=$outOfSyncReplicaIds" +
      s", sentLeaderAndIsr=$sentLeaderAndIsr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      s", lastCommittedState=$lastCommittedState" +
      ")"
  }
}
```
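To see the stuck-versus-slow classification performed by isCaughtUp in action, here is a self-contained sketch; FollowerSnapshot and all of the numbers are invented for illustration and are not Kafka types.

```scala
// Standalone sketch of the isCaughtUp rule (illustrative types and values, not Kafka code):
// a follower is out of sync when it neither matches the leader's LEO nor has been fully
// caught up within replica.lag.time.max.ms. Stuck and slow followers both fail this test.
object OutOfSyncSketch extends App {
  case class FollowerSnapshot(id: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  def isCaughtUp(f: FollowerSnapshot, leaderEndOffset: Long, nowMs: Long, maxLagMs: Long): Boolean =
    leaderEndOffset == f.logEndOffset || nowMs - f.lastCaughtUpTimeMs <= maxLagMs

  val nowMs = 100000L
  val maxLagMs = 30000L // the replica.lag.time.max.ms default
  val leaderLeo = 500L
  val followers = List(
    FollowerSnapshot(1, logEndOffset = 500L, lastCaughtUpTimeMs = 5000L),  // at the LEO: in sync
    FollowerSnapshot(2, logEndOffset = 300L, lastCaughtUpTimeMs = 90000L), // behind, but caught up recently: in sync
    FollowerSnapshot(3, logEndOffset = 300L, lastCaughtUpTimeMs = 5000L)   // stuck or slow: out of sync
  )
  val outOfSync = followers.filterNot(isCaughtUp(_, leaderLeo, nowMs, maxLagMs)).map(_.id)
  println(s"out-of-sync replicas: $outOfSync") // List(3)
}
```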
III. When a follower fetches data from the leader, the leader checks whether it can join the ISR

The fetch handling in KafkaApis.scala distinguishes whether a Fetch request comes from a follower or from a consumer; for that part, see the earlier post kakfa 3.5 kafka服务端处理消费者客户端拉取数据请求源码.

```scala
def fetchRecords(fetchParams: FetchParams,
                 fetchPartitionData: FetchRequest.PartitionData,
                 fetchTimeMs: Long,
                 maxBytes: Int,
                 minOneMessage: Boolean,
                 updateFetchState: Boolean): LogReadInfo = {
  def readFromLocalLog(log: UnifiedLog): LogReadInfo = {
    readRecords(
      log,
      fetchPartitionData.lastFetchedEpoch,
      fetchPartitionData.fetchOffset,
      fetchPartitionData.currentLeaderEpoch,
      maxBytes,
      fetchParams.isolation,
      minOneMessage
    )
  }

  // Does the fetch come from a follower (rather than a consumer)?
  if (fetchParams.isFromFollower) {
    // Check that the request is from a valid replica before doing the read
    val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      val localLog = localLogWithEpochOrThrow(
        fetchPartitionData.currentLeaderEpoch,
        fetchParams.fetchOnlyLeader
      )
      val replica = followerReplicaOrThrow(
        fetchParams.replicaId,
        fetchPartitionData
      )
      val logReadInfo = readFromLocalLog(localLog)
      (replica, logReadInfo)
    }

    // After a follower fetch, some follower state needs updating
    if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      // The fetch comes from a broker's replica fetcher, so update the corresponding log end offset
      updateFollowerFetchState(
        replica,
        followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
        followerStartOffset = fetchPartitionData.logStartOffset,
        followerFetchTimeMs = fetchTimeMs,
        leaderEndOffset = logReadInfo.logEndOffset,
        fetchParams.replicaEpoch
      )
    }
    logReadInfo
  } else {
    // ... (omitted: the consumer read path)
  }
}
```

updateFollowerFetchState is the post-fetch bookkeeping:

```scala
def updateFollowerFetchState(replica: Replica,
                             followerFetchOffsetMetadata: LogOffsetMetadata,
                             followerStartOffset: Long,
                             followerFetchTimeMs: Long,
                             leaderEndOffset: Long,
                             brokerEpoch: Long): Unit = {
  // The low watermark (lowWatermarkIfLeader) only needs computing when there are delayed
  // DeleteRecordsRequests; otherwise oldLeaderLW stays at -1
  val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
  // The follower's previous log end offset
  val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
  // Update the replica's fetch state: fetch offset metadata, start offset, fetch time,
  // leader end offset and broker epoch
  replica.updateFetchState(
    followerFetchOffsetMetadata,
    followerStartOffset,
    followerFetchTimeMs,
    leaderEndOffset,
    brokerEpoch
  )
  // Re-check for delayed DeleteRecordsRequests; without any, newLeaderLW stays at -1
  val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
  // Did the partition's low watermark advance, i.e. is newLeaderLW greater than oldLeaderLW?
  val leaderLWIncremented = newLeaderLW > oldLeaderLW

  // Check whether this in-sync replica needs to be added to the ISR (In-Sync Replicas)
  maybeExpandIsr(replica)

  // Check whether the partition's high watermark can advance, i.e. whether the replica's
  // log end offset (replica.stateSnapshot.logEndOffset) changed
  val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
    // Try to advance the high watermark (maybeIncrementLeaderHW), guarded by the
    // leaderIsrUpdateLock read lock
    inReadLock(leaderIsrUpdateLock) {
      leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
    }
  } else {
    false
  }

  // If the low or high watermark moved, try to complete delayed requests
  if (leaderLWIncremented || leaderHWIncremented)
    tryCompleteDelayedRequests()
}
```

1. After taking the leaderIsrUpdateLock read lock, check whether the follower qualifies for the ISR

maybeExpandIsr tries to add the current replica to the ISR, mirroring the maybeShrinkIsr flow triggered by the scheduled task above:

```scala
/**
 * Check and possibly expand the partition's ISR.
 * A replica is added to the ISR if its LEO >= the current HW and it has caught up to an offset
 * within the current leader epoch. A replica must catch up to the current leader epoch before
 * it can join the ISR: otherwise, if there is committed data between the current leader's HW
 * and LEO, the replica could become leader before fetching that committed data, and the data
 * would be lost.
 */
private def maybeExpandIsr(followerReplica: Replica): Unit = {
  // Pre-conditions: partitionState is not in flight and the ISR does not already contain
  // this follower; then take the leaderIsrUpdateLock read lock
  val needsIsrUpdate = !partitionState.isInflight && canAddReplicaToIsr(followerReplica.brokerId) &&
    inReadLock(leaderIsrUpdateLock) {
      // Re-check the ISR admission conditions under the lock
      needsExpandIsr(followerReplica)
    }
  if (needsIsrUpdate) {
    // The follower passed the needsIsrUpdate check, so take the leaderIsrUpdateLock
    // write lock and apply the change
    val alterIsrUpdateOpt = inWriteLock(leaderIsrUpdateLock) {
      // check if this replica needs to be added to the ISR
      partitionState match {
        case currentState: CommittedPartitionState if needsExpandIsr(followerReplica) =>
          // prepareIsrExpand performs the addition
          Some(prepareIsrExpand(currentState, followerReplica.brokerId))
        case _ => None
      }
    }
    // Send the AlterPartition request outside of the LeaderAndIsr lock since the completion logic
    // may increment the high watermark (and consequently complete delayed operations).
    alterIsrUpdateOpt.foreach(submitAlterPartition)
  }
}

private def needsExpandIsr(followerReplica: Replica): Boolean = {
  // isFollowerInSync is true when the follower's LEO has reached the current leader's HW
  canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
}

// Condition 1
private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
  val current = partitionState
  !current.isInflight && !current.isr.contains(followerReplicaId) && isReplicaIsrEligible(followerReplicaId)
}

// Is the replica eligible to become an In-Sync Replica?
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
  metadataCache match {
    // For the KRaft metadata cache, the replica is eligible when:
    // 1. it is not fenced;
    // 2. it is not in controlled shutdown;
    // 3. its broker epoch in the metadata cache matches the broker epoch of its Fetch request,
    //    or the Fetch request's broker epoch is -1, which bypasses the epoch verification.
    case kRaftMetadataCache: KRaftMetadataCache =>
      val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
      val cachedBrokerEpoch = kRaftMetadataCache.getAliveBrokerEpoch(followerReplicaId)
      !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
        !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) &&
        isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)

    // For the ZK metadata cache, it suffices that the replica is an alive broker. Shutting-down
    // brokers are not checked here, but the controller prevents them from joining the ISR.
    case zkMetadataCache: ZkMetadataCache =>
      zkMetadataCache.hasAliveBroker(followerReplicaId)

    case _ => true
  }
}

// Condition 2
private def isFollowerInSync(followerReplica: Replica): Boolean = {
  leaderLogIfLocal.exists { leaderLog =>
    val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
    followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
  }
}
```
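isFollowerInSync packs two separate thresholds into one expression: the follower must reach the leader's high watermark and the start offset of the current leader epoch. Pulling the predicate out as a standalone function makes both visible (the offsets below are illustrative, not from Kafka):

```scala
// Sketch of the two admission thresholds checked by isFollowerInSync (not Kafka code).
object ExpandConditionSketch extends App {
  def isFollowerInSync(followerEndOffset: Long,
                       leaderHighWatermark: Long,
                       leaderEpochStartOffset: Long): Boolean =
    followerEndOffset >= leaderHighWatermark && followerEndOffset >= leaderEpochStartOffset

  // Caught up to the HW but not yet into the current leader epoch: must keep fetching,
  // otherwise it could be elected leader while still missing committed data
  println(isFollowerInSync(followerEndOffset = 80, leaderHighWatermark = 80, leaderEpochStartOffset = 90)) // false
  // Past both thresholds: eligible to rejoin the ISR
  println(isFollowerInSync(followerEndOffset = 95, leaderHighWatermark = 80, leaderEpochStartOffset = 90)) // true
}
```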
2. Take the leaderIsrUpdateLock write lock, then perform the write

The method is prepareIsrExpand:

```scala
// When expanding the ISR we can be optimistic and assume the new replica is already in the ISR
// before receiving the acknowledgement. This ensures the HW already reflects the updated ISR
// even if there is a delay before we receive the acknowledgement.
// Alternatively, if the update fails, no harm is done: the expanded ISR merely places a
// stricter requirement on the advancement of the HW.
private def prepareIsrExpand(currentState: CommittedPartitionState,
                             newInSyncReplicaId: Int): PendingExpandIsr = {
  // The ISR to send (isrToSend) is the current ISR plus the new In-Sync Replica id
  val isrToSend = partitionState.isr + newInSyncReplicaId
  // addBrokerEpochToIsr tags every replica in the ISR with its broker epoch
  val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList)
  // Build the new LeaderAndIsr with localBrokerId as the leader; the other parameters come
  // from the current partition state
  val newLeaderAndIsr = LeaderAndIsr(
    localBrokerId,
    leaderEpoch,
    partitionState.leaderRecoveryState,
    isrWithBrokerEpoch,
    partitionEpoch
  )
  // Wrap the new In-Sync Replica id, the new LeaderAndIsr and the current state into a
  // PendingExpandIsr, install it as the partition state and return it
  val updatedState = PendingExpandIsr(
    newInSyncReplicaId,
    newLeaderAndIsr,
    currentState
  )
  partitionState = updatedState
  updatedState
}
```

3. The write adds the new follower replica to maximalIsr first; isr keeps the pre-expand set

```scala
case class PendingExpandIsr(
  newInSyncReplicaId: Int,
  sentLeaderAndIsr: LeaderAndIsr,
  lastCommittedState: CommittedPartitionState
) extends PendingPartitionChange {
  // The ISR currently in effect
  val isr = lastCommittedState.isr
  // maximalIsr additionally holds the not-yet-committed member, so a failed update
  // cannot corrupt the committed ISR
  val maximalIsr = isr + newInSyncReplicaId
  val isInflight = true

  def notifyListener(alterPartitionListener: AlterPartitionListener): Unit = {
    alterPartitionListener.markIsrExpand()
  }

  override def toString: String = {
    s"PendingExpandIsr(newInSyncReplicaId=$newInSyncReplicaId" +
      s", sentLeaderAndIsr=$sentLeaderAndIsr" +
      s", leaderRecoveryState=$leaderRecoveryState" +
      s", lastCommittedState=$lastCommittedState" +
      ")"
  }
}
```
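The comment on prepareIsrExpand argues the optimistic update is safe because a larger maximalIsr can only make high-watermark advancement stricter. A toy calculation shows why; the HW rule is simplified here to "minimum LEO across maximalIsr", whereas the real maybeIncrementLeaderHW checks more conditions.

```scala
// Toy model only: HW = min LEO over maximalIsr (the real rule in maybeIncrementLeaderHW is stricter).
object HwOverMaximalIsrSketch extends App {
  def highWatermark(leoByReplica: Map[Int, Long], maximalIsr: Set[Int]): Long =
    maximalIsr.map(leoByReplica).min

  val leo = Map(0 -> 100L, 1 -> 100L, 2 -> 90L) // replica id -> log end offset

  println(highWatermark(leo, Set(0, 1)))    // 100: before the expand
  // While the expand is pending, replica 2 is in maximalIsr, so the HW is held back to 90:
  // the unconfirmed member can delay the HW but never advance it incorrectly.
  println(highWatermark(leo, Set(0, 1, 2))) // 90
}
```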
IV. After every maximalIsr change, the information must be sent to the other replicas

Whether it is maybeShrinkIsr from the scheduled task or maybeExpandIsr from the fetch path, both end with the same call:

```scala
alterIsrUpdateOpt.foreach(submitAlterPartition)
```

```scala
private def submitAlterPartition(proposedIsrState: PendingPartitionChange): CompletableFuture[LeaderAndIsr] = {
  debug(s"Submitting ISR state change $proposedIsrState")
  // alterIsrManager.submit submits the ISR state change; zk mode and kraft mode run
  // different implementations:
  //   zk mode    -> ZkAlterPartitionManager.submit
  //   kraft mode -> DefaultAlterPartitionManager.submit
  val future = alterIsrManager.submit(
    new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
    proposedIsrState.sentLeaderAndIsr,
    controllerEpoch
  )
  future.whenComplete { (leaderAndIsr, e) =>
    var hwIncremented = false
    var shouldRetry = false

    inWriteLock(leaderIsrUpdateLock) {
      if (partitionState != proposedIsrState) {
        // This means partitionState was updated through leader election or some other mechanism
        // before we got the AlterPartition response. We don't know exactly what happened on the
        // controller, but we do know this response is out of date, so we ignore it.
        // ... (omitted)
      } else if (leaderAndIsr != null) {
        // Commit the ISR change and report whether the high watermark advanced
        hwIncremented = handleAlterPartitionUpdate(proposedIsrState, leaderAndIsr)
      } else {
        shouldRetry = handleAlterPartitionError(proposedIsrState, Errors.forException(e))
      }
    }

    // Did the high watermark advance?
    if (hwIncremented) {
      tryCompleteDelayedRequests()
    }
    if (shouldRetry) {
      // On a retriable error, resubmit the same proposed state
      submitAlterPartition(proposedIsrState)
    }
  }
}
```
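The future.whenComplete call above is worth remembering for section V: it only registers a callback on the future returned by alterIsrManager.submit; the callback runs whenever anyone later completes that future. A minimal, Kafka-free demonstration of this CompletableFuture behaviour:

```scala
import java.util.concurrent.CompletableFuture

// Plain CompletableFuture semantics, no Kafka involved.
object WhenCompleteSketch extends App {
  val future = new CompletableFuture[String]() // stands in for the result of submit(...)

  // Registering the callback does not run it yet (compare submitAlterPartition)
  future.whenComplete { (value, error) =>
    if (error == null) println(s"callback ran with: $value") // stands in for handleAlterPartitionUpdate
  }

  // Much later, e.g. in a response handler or a test, someone completes the future,
  // and only then does the registered callback fire:
  future.complete("new LeaderAndIsr")
}
```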
1. zk mode

```scala
// Write the given leaderAndIsr to ZooKeeper and return a LeaderAndIsr (as a future).
override def submit(topicIdPartition: TopicIdPartition,
                    leaderAndIsr: LeaderAndIsr,
                    controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
  debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
    s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
  // ReplicationUtils.updateLeaderAndIsr updates the leaderAndIsr information in ZooKeeper,
  // returning whether the update succeeded (updateSucceeded) and the new version (newVersion)
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient,
    topicIdPartition.topicPartition, leaderAndIsr, controllerEpoch)

  val future = new CompletableFuture[LeaderAndIsr]()
  if (updateSucceeded) {
    // Track which partitions need to be propagated to the controller.
    // isrChangeSet is drained by a scheduled task; access is synchronized.
    isrChangeSet synchronized {
      // Record this partition as changed
      isrChangeSet += topicIdPartition.topicPartition
      // Remember the time of the last ISR change
      lastIsrChangeMs.set(time.milliseconds())
    }
    // Complete the future with the leaderAndIsr stamped with the new partition epoch
    future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
  } else {
    // ... (omitted)
  }
  future
}
```

A scheduled task then modifies a zk node to propagate the change. The startup code in KafkaServer.scala runs:

```scala
alterPartitionManager.start()
```

Here alterPartitionManager is a ZkAlterPartitionManager, so what actually runs is the code below, which creates the scheduled task:

```scala
override def start(): Unit = {
  scheduler.schedule("isr-change-propagation", () => maybePropagateIsrChanges(), 0L,
    isrChangeNotificationConfig.checkIntervalMs)
}

/**
 * This function runs periodically to check whether ISR changes need to be propagated.
 * It propagates them when:
 * 1. there are ISR changes that have not been propagated yet, and
 * 2. there has been no ISR change in the last 5 seconds, or more than 60 seconds have passed
 *    since the last ISR propagation.
 * This lets occasional ISR changes propagate within a few seconds while avoiding overwhelming
 * the controller and other brokers when a large number of ISR changes happen at once.
 */
private[server] def maybePropagateIsrChanges(): Unit = {
  val now = time.milliseconds()
  isrChangeSet synchronized {
    if (isrChangeSet.nonEmpty &&
      (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
        lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
      zkClient.propagateIsrChanges(isrChangeSet)
      isrChangeSet.clear()
      lastIsrPropagationMs.set(now)
    }
  }
}
```
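The propagation condition in maybePropagateIsrChanges is easy to misread, so here it is isolated as a pure function with worked examples. The 5s/60s defaults come from the comment above; the parameter names are mine.

```scala
// Sketch of the zk-mode propagation debounce (illustrative, not Kafka code).
object IsrPropagationRuleSketch extends App {
  def shouldPropagate(hasPending: Boolean,
                      nowMs: Long,
                      lastChangeMs: Long,
                      lastPropagationMs: Long,
                      lingerMs: Long = 5000L,
                      maxDelayMs: Long = 60000L): Boolean =
    hasPending && (lastChangeMs + lingerMs < nowMs || lastPropagationMs + maxDelayMs < nowMs)

  // A change arrived 1s ago: hold back and keep batching
  println(shouldPropagate(hasPending = true, nowMs = 100000, lastChangeMs = 99000, lastPropagationMs = 50000))  // false
  // Quiet for 6s: propagate
  println(shouldPropagate(hasPending = true, nowMs = 100000, lastChangeMs = 94000, lastPropagationMs = 50000))  // true
  // Still changing constantly, but 61s since the last propagation: propagate anyway
  println(shouldPropagate(hasPending = true, nowMs = 120000, lastChangeMs = 119500, lastPropagationMs = 59000)) // true
}
```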
2. kraft mode

```scala
override def submit(topicIdPartition: TopicIdPartition,
                    leaderAndIsr: LeaderAndIsr,
                    controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
  val future = new CompletableFuture[LeaderAndIsr]()
  val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
  // Queue the LeaderAndIsr change to be sent; at most one pending item per partition
  val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition,
    alterPartitionItem) == null
  if (enqueued) {
    maybePropagateIsrChanges()
  } else {
    future.completeExceptionally(new OperationNotAttemptedException(
      s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicIdPartition"))
  }
  future
}

private[server] def maybePropagateIsrChanges(): Unit = {
  // Send all pending items if there is no request in flight yet.
  if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
    // Copy the current unsent ISRs, but don't remove them from the map:
    // they are cleared in the response handler
    val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
    unsentIsrUpdates.values.forEach(item => inflightAlterPartitionItems.append(item))
    sendRequest(inflightAlterPartitionItems.toSeq)
  }
}
```
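Two small concurrency tricks carry this manager: putIfAbsent keeps at most one pending update per partition, and compareAndSet ensures at most one AlterPartition request is in flight. Isolated into a sketch (names and types are simplified stand-ins, not the real Kafka classes):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

// Simplified stand-in for DefaultAlterPartitionManager's queueing and gating (not Kafka code).
object InflightGatingSketch extends App {
  val unsent = new ConcurrentHashMap[String, String]() // partition -> pending update
  val inflight = new AtomicBoolean(false)

  // putIfAbsent returns null only for the first writer, so later updates for the
  // same partition are rejected until the pending one is resolved
  def enqueue(partition: String, update: String): Boolean =
    unsent.putIfAbsent(partition, update) == null

  // compareAndSet lets exactly one caller win the right to send the batch
  def tryBeginSend(): Boolean =
    !unsent.isEmpty && inflight.compareAndSet(false, true)

  println(enqueue("topic-0", "isr=[1,2]")) // true: queued
  println(enqueue("topic-0", "isr=[1]"))   // false: one is already pending
  println(tryBeginSend())                  // true: this caller sends the batch
  println(tryBeginSend())                  // false: a request is already in flight
}
```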
The notification goes out as a request through controllerChannelManager, which is set up when BrokerServer.scala runs alterPartitionManager.start() during initialization; the implementation class is DefaultAlterPartitionManager, whose start method boils down to controllerChannelManager.start().

```scala
private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
  val brokerEpoch = brokerEpochSupplier()
  // Build the AlterPartition request, plus a topicNamesByIds mapping for decoding the response
  val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
  debug(s"Sending AlterPartition to controller $request")

  // We will not time out the AlterPartition request; instead we let it retry indefinitely
  // until a response is received, or until a new LeaderAndIsr overwrites the existing
  // isrState, which causes the response for those partitions to be ignored.
  // controllerChannelManager.sendRequest sends the request to the controller, with a
  // ControllerRequestCompletionHandler as the callback.
  controllerChannelManager.sendRequest(request,
    new ControllerRequestCompletionHandler {
      override def onComplete(response: ClientResponse): Unit = {
        debug(s"Received AlterPartition response $response")
        val error = try {
          if (response.authenticationException != null) {
            // For now we treat authentication errors as retriable. We use the
            // NETWORK_EXCEPTION error code for lack of a good alternative.
            // Note that BrokerToControllerChannelManager will still log the
            // authentication errors so that users have a chance to fix the problem.
            Errors.NETWORK_EXCEPTION
          } else if (response.versionMismatch != null) {
            Errors.UNSUPPORTED_VERSION
          } else {
            // Process the response body
            handleAlterPartitionResponse(
              response.requestHeader,
              response.responseBody.asInstanceOf[AlterPartitionResponse],
              brokerEpoch,
              inflightAlterPartitionItems,
              topicNamesByIds
            )
          }
        } finally {
          // clear the flag so future requests can proceed
          clearInFlightRequest()
        }
        // ... (omitted)
      }
      // ... (omitted)
    })
}
```

handleAlterPartitionResponse processes the result of the request:

```scala
def handleAlterPartitionResponse(requestHeader: RequestHeader,
                                 alterPartitionResp: AlterPartitionResponse,
                                 sentBrokerEpoch: Long,
                                 inflightAlterPartitionItems: Seq[AlterPartitionItem],
                                 topicNamesByIds: mutable.Map[Uuid, String]): Errors = {
  val data = alterPartitionResp.data

  Errors.forCode(data.errorCode) match {
    // ... (omitted)
    case Errors.NONE =>
      // Collect the partition-level responses in a mutable hash map
      val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
      data.topics.forEach { topic =>
        // ... (omitted: resolve topicName from topicNamesByIds)
        topic.partitions.forEach { partition =>
          // The topic name plus the partition index identifies the TopicPartition
          val tp = new TopicPartition(topicName, partition.partitionIndex)
          val apiError = Errors.forCode(partition.errorCode)
          debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
          if (apiError == Errors.NONE) {
            // Parse the partition's leaderRecoveryState; if it is valid, record the
            // partition-level response in partitionResponses
            LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
              case Some(leaderRecoveryState) =>
                partitionResponses(tp) = Right(
                  LeaderAndIsr(
                    partition.leaderId,
                    partition.leaderEpoch,
                    partition.isr.asScala.toList.map(_.toInt),
                    leaderRecoveryState,
                    partition.partitionEpoch
                  )
                )
              // ... (omitted)
            }
          } else {
            partitionResponses(tp) = Left(apiError)
          }
        }
      }

      // Walk the inflightAlterPartitionItems passed in, matching each item with its response
      inflightAlterPartitionItems.foreach { inflightAlterPartition =>
        partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
          case Some(leaderAndIsrOrError) =>
            // A response was found: remove the item from unsentIsrUpdates and complete its
            // future according to the kind of response
            unsentIsrUpdates.remove(inflightAlterPartition.topicIdPartition.topicPartition)
            leaderAndIsrOrError match {
              case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
              case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
            }
          // ... (omitted)
        }
      }
    // ... (omitted)
  }
  // ... (omitted)
}
```
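The dispatch pattern at the end of handleAlterPartitionResponse — an Either per partition, completing the matching in-flight future normally or exceptionally — is compact enough to replay in isolation. Everything below (LeaderAndIsr, the maps, the error string) is a simplified stand-in, not the real types:

```scala
import java.util.concurrent.CompletableFuture

// Simplified replay of the per-partition response dispatch (not Kafka code).
object ResponseDispatchSketch extends App {
  case class LeaderAndIsr(leader: Int, isr: List[Int])

  val inflight = Map( // partition -> the future handed back by submit(...)
    "topic-0" -> new CompletableFuture[LeaderAndIsr](),
    "topic-1" -> new CompletableFuture[LeaderAndIsr]()
  )
  val responses: Map[String, Either[String, LeaderAndIsr]] = Map(
    "topic-0" -> Right(LeaderAndIsr(1, List(1, 2))), // success: the controller's view
    "topic-1" -> Left("INELIGIBLE_REPLICA")          // partition-level error
  )

  responses.foreach { case (partition, outcome) =>
    outcome match {
      case Right(leaderAndIsr) => inflight(partition).complete(leaderAndIsr)
      case Left(error)         => inflight(partition).completeExceptionally(new RuntimeException(error))
    }
  }

  println(inflight("topic-0").join())                   // LeaderAndIsr(1,List(1, 2))
  println(inflight("topic-1").isCompletedExceptionally) // true
}
```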
V. maximalIsr and isr

1. How PartitionState defines the isr and maximalIsr fields, and why the code above only ever modifies maximalIsr

```scala
sealed trait PartitionState {
  /**
   * Includes only the in-sync replicas which have been committed to ZK.
   */
  def isr: Set[Int]

  /**
   * This set may include un-committed ISR members following an expansion. This "effective" ISR
   * is used for advancing the high watermark as well as determining which replicas are required
   * for acks=all produce requests.
   */
  def maximalIsr: Set[Int]

  /**
   * The leader recovery state. See the description for LeaderRecoveryState for details on
   * the different values.
   */
  def leaderRecoveryState: LeaderRecoveryState

  /**
   * Indicates if we have an AlterPartition request in flight.
   */
  def isInflight: Boolean
}
```

Why does every code path above modify only the pending state's maximalIsr and leave isr untouched? Take maybeShrinkIsr as the example. An AlterPartition update can fail, so until the controller confirms the change, isr must keep the last committed value: it is the only set that is guaranteed to be agreed upon cluster-wide, and concurrent readers always see a consistent committed ISR. maximalIsr is the "effective" set used for advancing the high watermark and for deciding which replicas an acks=all produce request has to wait for, and the two pending states choose it so that an unconfirmed change can never advance the high watermark incorrectly: PendingExpandIsr sets maximalIsr = isr + the candidate (a stricter requirement, so optimism is safe), while PendingShrinkIsr keeps maximalIsr equal to the old isr (the smaller, shrunk set is only sent to the controller, so a rejected shrink cannot have advanced the HW).
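One further consequence of the pending states: the isInflight flag makes shrink and expand mutually exclusive per partition — getOutOfSyncReplicas returns an empty set and maybeExpandIsr bails out while a change is pending. A sketch of the two guards (types and values invented, not Kafka code):

```scala
// Sketch of the isInflight guards seen in getOutOfSyncReplicas and maybeExpandIsr (not Kafka code).
object InflightBackoffSketch extends App {
  case class State(isr: Set[Int], maximalIsr: Set[Int], isInflight: Boolean)

  val localBrokerId = 0

  // Mirrors getOutOfSyncReplicas: no shrink candidates while a change is in flight
  def outOfSyncCandidates(state: State, lagging: Set[Int]): Set[Int] =
    if (state.isInflight) Set.empty
    else (state.isr - localBrokerId).intersect(lagging)

  // Mirrors the pre-check in maybeExpandIsr
  def mayExpand(state: State, replicaId: Int): Boolean =
    !state.isInflight && !state.isr.contains(replicaId)

  val pending = State(isr = Set(0, 1), maximalIsr = Set(0, 1, 2), isInflight = true)
  println(outOfSyncCandidates(pending, lagging = Set(1))) // Set(): shrink deferred
  println(mayExpand(pending, replicaId = 3))              // false: expand deferred
}
```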
2. When maximalIsr is finally assigned to isr

This is the part that tormented me for two days: I could not find where the data in isr gets updated from maximalIsr. Online material either says "at the appropriate time" without ever pointing at the place, or walks up to the maximalIsr update and declares the ISR modified. I even went through the unit tests; the one analyzed below, testIsrNotExpandedIfReplicaIsFencedOrShutdown, finally makes the mechanism visible.

```scala
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
  val kraft = quorum == "kraft"

  val log = logManager.getOrCreateLog(topicPartition, topicId = None)
  seedLogData(log, numRecords = 10, leaderEpoch = 4)

  val controllerEpoch = 0
  val leaderEpoch = 5
  val remoteBrokerId = brokerId + 1
  val replicas = List(brokerId, remoteBrokerId)
  val isr = Set(brokerId)

  val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
  if (kraft) {
    addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
  }

  // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
  // When using kraft, we can make the broker ineligible by fencing it.
  // In ZK mode, we must mark the broker as alive for it to be eligible.
  def markRemoteReplicaEligible(eligible: Boolean): Unit = {
    if (kraft) {
      when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
    } else {
      when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
    }
  }

  // Initialize the partition
  val partition = new Partition(
    topicPartition,
    replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
    interBrokerProtocolVersion = MetadataVersion.latest,
    localBrokerId = brokerId,
    () => defaultBrokerEpoch(brokerId),
    time,
    alterPartitionListener,
    delayedOperations,
    metadataCache,
    logManager,
    alterPartitionManager
  )

  partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
  assertTrue(partition.makeLeader(
    new LeaderAndIsrPartitionState()
      .setControllerEpoch(controllerEpoch)
      .setLeader(brokerId)
      .setLeaderEpoch(leaderEpoch)
      .setIsr(isr.toList.map(Int.box).asJava)
      .setPartitionEpoch(1)
      .setReplicas(replicas.map(Int.box).asJava)
      .setIsNew(true),
    offsetCheckpoints, None), "Expected become leader transition to succeed")
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)

  markRemoteReplicaEligible(true)

  // Fetch to let the follower catch up to the log end offset and
  // to check if an expansion is possible.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Follower fetches and catches up to the log end offset.
  assertReplicaState(partition, remoteBrokerId,
    lastCaughtUpTimeMs = time.milliseconds(),
    logStartOffset = 0L,
    logEndOffset = log.logEndOffset
  )

  // Expansion is triggered.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertEquals(1, alterPartitionManager.isrUpdates.size)

  // Controller rejects the expansion because the broker is fenced or offline.
  alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)

  // The leader reverts back to the previous ISR.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)

  // The leader eventually learns about the fenced or offline broker.
  markRemoteReplicaEligible(false)

  // The follower fetches again.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Expansion is not triggered because the follower is fenced.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(isr, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)

  // The broker is eventually unfenced or brought back online.
  markRemoteReplicaEligible(true)

  // The follower fetches again.
  fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

  // Expansion is triggered.
  assertEquals(isr, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertTrue(partition.partitionState.isInflight)
  assertEquals(1, alterPartitionManager.isrUpdates.size)

  // Expansion succeeds.
  alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)

  // ISR is committed.
  assertEquals(replicas.toSet, partition.partitionState.isr)
  assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
  assertFalse(partition.partitionState.isInflight)
  assertEquals(0, alterPartitionManager.isrUpdates.size)
}
```

Note the line alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1): before this call, maximalIsr is already up to date while isr is still the old set; after it, isr and maximalIsr are identical and both current.

alterPartitionManager.completeIsrUpdate executes the following method of MockAlterPartitionManager in the TestUtils class:

```scala
class MockAlterPartitionManager extends AlterPartitionManager {
  val isrUpdates: mutable.Queue[AlterPartitionItem] = new mutable.Queue[AlterPartitionItem]()
  val inFlight: AtomicBoolean = new AtomicBoolean(false)

  // This is reached from the fetchFollower call in the test, via the chain
  // fetchFollower -> fetchRecords -> updateFollowerFetchState -> maybeExpandIsr
  //   -> submitAlterPartition -> submit
  // and it simply queues the update in isrUpdates
  override def submit(topicPartition: TopicIdPartition,
                      leaderAndIsr: LeaderAndIsr,
                      controllerEpoch: Int): CompletableFuture[LeaderAndIsr] = {
    val future = new CompletableFuture[LeaderAndIsr]()
    if (inFlight.compareAndSet(false, true)) {
      isrUpdates += AlterPartitionItem(
        topicPartition,
        leaderAndIsr,
        future,
        controllerEpoch
      )
    } else {
      future.completeExceptionally(new OperationNotAttemptedException(
        s"Failed to enqueue AlterIsr request for $topicPartition since there is already an inflight request"))
    }
    future
  }

  def completeIsrUpdate(newPartitionEpoch: Int): Unit = {
    if (inFlight.compareAndSet(true, false)) {
      val item = isrUpdates.dequeue()
      // Compare section IV: kraft mode completes via inflightAlterPartition.future.complete,
      // zk mode via future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
      item.future.complete(item.leaderAndIsr.withPartitionEpoch(newPartitionEpoch))
    } else {
      fail("Expected an in-flight ISR update, but there was none")
    }
  }
}
```

What isrUpdates.dequeue() yields is the AlterPartitionItem queued by submit, and item.future.complete then runs — after which isr is suddenly up to date. The missing link, as far as I can trace it, is that this future is the very future MockAlterPartitionManager.submit handed back to Partition.submitAlterPartition (see the call chain in the comment above), and submitAlterPartition had already registered future.whenComplete { ... } on it back in section IV. Completing a CompletableFuture runs its registered callbacks, so item.future.complete(...) immediately triggers that callback; the callback calls handleAlterPartitionUpdate, which commits the change by installing a new CommittedPartitionState built from the returned LeaderAndIsr — and in a committed state, isr and maximalIsr are equal again and isInflight is false. So there is no literal "copy maximalIsr into isr" anywhere: the pending state is simply swapped out for a committed one. The same reasoning applies to the zk- and kraft-mode completions in section IV. If I have misread this, corrections in the comments are welcome — a runnable sketch of the round trip follows.
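To tie it together, here is a runnable sketch of the whole round trip: a pending expand, the whenComplete callback registered by submitAlterPartition, and the commit that happens when the future is completed. The types are simplified stand-ins for the real ones, and the callback body is my reconstruction of what handleAlterPartitionUpdate effectively does (install a committed state built from the returned LeaderAndIsr) — treat it as an assumption to verify against Partition.scala, not as the literal source.

```scala
import java.util.concurrent.CompletableFuture

// Simplified replay of the expand -> complete -> commit round trip (stand-in types;
// the commit step is an assumed reconstruction of handleAlterPartitionUpdate).
object CommitIsrSketch extends App {
  case class LeaderAndIsr(leader: Int, isr: List[Int])

  sealed trait State { def isr: Set[Int]; def maximalIsr: Set[Int]; def isInflight: Boolean }
  case class Committed(isr: Set[Int]) extends State {
    val maximalIsr: Set[Int] = isr
    val isInflight = false
  }
  case class PendingExpand(last: Committed, newReplica: Int) extends State {
    val isr: Set[Int] = last.isr                     // still the committed ISR
    val maximalIsr: Set[Int] = last.isr + newReplica // the optimistic "effective" ISR
    val isInflight = true
  }

  // prepareIsrExpand has run: isr is old, maximalIsr already contains replica 2
  var partitionState: State = PendingExpand(Committed(Set(0, 1)), newReplica = 2)

  // submitAlterPartition registers the commit as a whenComplete callback on the
  // future returned by submit(...)
  val future = new CompletableFuture[LeaderAndIsr]()
  future.whenComplete { (leaderAndIsr, e) =>
    if (e == null)
      // The commit: a fresh committed state from the controller's response. From here on,
      // isr == maximalIsr and isInflight == false — isr has "caught up" with maximalIsr.
      partitionState = Committed(leaderAndIsr.isr.toSet)
  }

  // The mock's completeIsrUpdate (or the real response handler) completes the future,
  // which synchronously runs the callback above:
  future.complete(LeaderAndIsr(leader = 0, isr = List(0, 1, 2)))
  println(partitionState) // Committed(Set(0, 1, 2))
}
```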