深圳网站平台制作,外贸网站建设 佛山,凡科建设网站如何对话框,做网站年入千万文章目录 1. 概要2. TimingWheel2.1 核心参数2.2 添加任务2.3 推进时间 3. TimerTaskList3.1 添加节点3.2 删除节点3.3 刷新链表3.4 队列相关 4. 时间轮链表节点-TimerTaskEntry5. TimerTask6. Timer 和 SystemTimer - 设计降级逻辑7. 上层调用8. 小结 1. 概要
时间轮的文章
定时/延时任务-Netty时间轮的使用定时/延时任务-时间轮定时/延时任务-实现一个简单时间轮 定时/延时任务-实现一个分层时间轮定时/延时任务-Netty时间轮源码分析
上一篇文章中介绍了 Netty 时间轮的源码分析这篇文章就接着来看下 Kafka 的源码分析由于 Kafka 是使用的 Scala 语言所以可能会有点难分析不过还是会尽量说清楚的。
2. TimingWheel
2.1 核心参数
Kafka的时间轮实现比较简单主要核心参数就在 TimingWheel 里面那么下面就先看下核心参数
nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {...
}首先这里就是 TimingWheel 的类定义同时构造函数参数列表
tickMs: Long表示时间轮的每个时间刻度tick的毫秒数wheelSize: Int表示时间轮的大小即有多少个时间刻度startMs: Long表示时间轮的起始时间以毫秒为单位taskCounter: AtomicInteger用来计数的原子整数表示任务的数量queue: DelayQueue[TimerTaskList]延迟队列用于存储时间轮的任务列表
上面几个就是时间轮构造参数下面就是时间轮的几个核心参数的构造
private[this] val interval tickMs * wheelSize
private[this] val buckets Array.tabulate[TimerTaskList](wheelSize) { _ new TimerTaskList(taskCounter) }
private[this] var currentTime startMs - (startMs % tickMs)
private[this] var overflowWheel: TimingWheel null首先是 interval这个就是我们之前说的一层时间轮的时间间隔在分层时间轮下当前层的时间轮间隔就是当前时间轮的格子数wheelSize * 每一格的时间间隔tickMs buckets 就是当前时间轮中的时间格子数组从代码中也能看到其实这里做的就是创建一个 wheelSize 长度的数组然后分别初始化currentTime 就是当前时间startMs 就是时间轮的启动时间假设启动时间是 43ms一个 tick 的时间是 20ms那么当前时间就是 startMs - (startMs % tickMs)结果就是 43 - 43 % 20 40currentTime 就是控制指针跳动的时间overflowWheel 就是上层时间轮上层时间轮的时间间隔 tickMs 就是本层时间轮的 interval看上面的图就可以看懂。 2.2 添加任务
def add(timerTaskEntry: TimerTaskEntry): Boolean {// 任务的过期时间val expiration timerTaskEntry.expirationMs // 任务取消了if (timerTaskEntry.cancelled) {false} else if (expiration currentTime tickMs) {// 如果任务过期时间已经小于当前时间格子的时间就说明要执行了false} else if (expiration currentTime interval) {// 如果过期时间 currentTime 本层时间轮的时间间隔就说明// 任务可以添加到本层时间轮中val virtualId expiration / tickMs// 获取对应下标的时间格子val bucket buckets((virtualId % wheelSize.toLong).toInt)// 添加任务bucket.add(timerTaskEntry)// 然后设置这个格子的过期时间添加到任务队列里面if (bucket.setExpiration(virtualId * tickMs)) {queue.offer(bucket)}true} else {// 都不满足那就说明过期时间已经超过本层时间轮的管理范围了需要// 到上层时间轮去加入任务if (overflowWheel null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}
}其实里面的逻辑很简单
首先判断下如果任务已经取消了就直接返回 false添加失败判断下如果添加的任务的过期时间已经小于当前时间 一格时间时长那么表示这个任务已经过期了需要执行如果添加的任务小于 当前时间 本层时间轮的总时间那么这个任务还没有执行并且这个任务可以添加到本层时间轮中否则就是说本层时间轮没办法管理这个任务需要把这个任务添加到上层时间轮中
如果上层时间轮为空那么同时也会创建出上层时间轮
private[this] def addOverflowWheel(): Unit {// 加锁synchronized {if (overflowWheel null) {// 创建上层时间轮overflowWheel new TimingWheel(tickMs interval,wheelSize wheelSize,startMs currentTime,taskCounter taskCounter,queue)}}}对当前线程加锁然后创建上层时间轮注意上层时间轮的启动时间是当前时间 currentTime同时上层时间轮的 tickMs 时间间隔是本层时间轮的时间跨度 interval。注意所有时间轮使用一个延时队列。 2.3 推进时间
def advanceClock(timeMs: Long): Unit {if (timeMs currentTime tickMs) {// 设置当前时间currentTime timeMs - (timeMs % tickMs)// 同时也推进上层时间轮if (overflowWheel ! null) overflowWheel.advanceClock(currentTime)}
}设置当前时间其实所谓的推进时间就是推进当前时间在上面也说过了当前时间的计算就是timeMs - (timeMs % tickMs)同时除了推进当前时间轮还需要推进上层时间轮的时间指针。 3. TimerTaskList
下面就来看下时间轮上面的链表定义还是一样我们先看里面的参数定义因为链表其实参数并不多所以不需要一个一个拿出来介绍
private[this] val root new TimerTaskEntry(null, -1)
root.next root
root.prev rootprivate[this] val expiration new AtomicLong(-1L)def setExpiration(expirationMs: Long): Boolean {expiration.getAndSet(expirationMs) ! expirationMs}上面几个就是参数了链表肯定要有头尾节点了不过 kafka 这里是用了一个 root 节点同时作为头尾节点只有一个节点的时候就指向自己。 同时因为时间轮存放到任务队列里面是以 TimerTaskList 为单位去存储的为什么会这样呢前面的文章里面说过一个链表上面的任务延时等级是一样的所以没必要以任务节点为单位去存储这样如果一个链表上面有 100000 个任务延时队列里面就得放 100000 个节点我们直到延时队列时间复杂度是 O(logn)节点一旦比较多消耗的时间就多了。而且这 10000 个节点的过期时间是一样的所以用一个 TimerTaskList 来代替就行了。所以延时队列的节点就是一个个的 TimerTaskList。然后再看下下面的方法。
3.1 添加节点
其实添加节点就是双向链表的添加逻辑。
def add(timerTaskEntry: TimerTaskEntry): Unit {var done falsewhile (!done) {// 先删除这个任务timerTaskEntry.remove()synchronized {// 加锁timerTaskEntry.synchronized {if (timerTaskEntry.list null) {// 链表结构 tail - timerTaskEntry - root// tail rootval tail root.prevtimerTaskEntry.next roottimerTaskEntry.prev tailtimerTaskEntry.list thistail.next timerTaskEntryroot.prev timerTaskEntrytaskCounter.incrementAndGet()done true}}}}
}在添加任务到链表的时候首先会去删除一下这个任务确保这个任务没有在先前被添加到时间轮中然后加锁去添加。添加的时候其实就是形成 tail - timerTaskEntry - root 的结构双向的因为 tail root所以就是一个环形的双向链表。
3.2 删除节点
def remove(timerTaskEntry: TimerTaskEntry): Unit {synchronized {timerTaskEntry.synchronized {if (timerTaskEntry.list eq this) {timerTaskEntry.next.prev timerTaskEntry.prevtimerTaskEntry.prev.next timerTaskEntry.nexttimerTaskEntry.next nulltimerTaskEntry.prev nulltimerTaskEntry.list nulltaskCounter.decrementAndGet()}}}
}删除节点的逻辑也很简单就是两步
timerTaskEntry.next.prev timerTaskEntry.prevtimerTaskEntry.prev.next timerTaskEntry.next
最后再把当前 timerTaskEntry 的属性都置空然后让任务数量 - 1 就可以了。
3.3 刷新链表
所谓刷新链表就是把这个链表上面的所有任务都删掉然后执行传入的函数这个方法是当链表过期的时候就把上面的所有任务都删掉然后一个一个任务执行具体逻辑。
// f 类似 Java8 里面的 function其实这里就是传入一个 f 函数去处理节点
def flush(f: (TimerTaskEntry)Unit): Unit {synchronized {// 从头结点开始遍历var head root.nextwhile (head ne root) {// 调用上面的删除节点方法把节点从链表中移除掉remove(head)// 调用函数把任务添加到线程池中等待调度f(head)// 继续下一个节点head root.next}// 链表都没有任务了当然过期时间就设置成 -1 了expiration.set(-1L)}
}其实里面的 f 函数的逻辑就是把这个任务节点丢到线程池中等待线程去调度也就是具体执行任务。
3.4 队列相关
那既然 TimerTaskList 是要加入延时队列的肯定要有一个获取延时和比较的方法了
def getDelay(unit: TimeUnit): Long {unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}def compareTo(d: Delayed): Int {val other d.asInstanceOf[TimerTaskList]java.lang.Long.compare(getExpiration, other.getExpiration)
}getDelay 就是获取延时这里其实就是用任务的 过期时间 - 当前时间如果小于 0最后就会返回 0表示可以马上开始执行了compareTo 就是任务队列里面比较两个任务的延时时间
4. 时间轮链表节点-TimerTaskEntry
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {...
}这里就是链表节点了两个参数分别是任务和延时时间因为内容确实不多所以下面直接给出所有的逻辑。
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {// 所属的链表volatilevar list: TimerTaskList null// 链表前后节点var next: TimerTaskEntry nullvar prev: TimerTaskEntry null// 设置定时任务if (timerTask ! null) timerTask.setTimerTaskEntry(this)// 任务是否取消了如果任务取消了就会从当前节点中删掉所以下面就是// 判断下这个任务的所属的链表节点还是不是自己def cancelled: Boolean {timerTask.getTimerTaskEntry ! this}// 把当前节点从链表中删除掉def remove(): Unit {var currentList listwhile (currentList ! null) {currentList.remove(this)currentList list}}// 比较两个任务的延时时间override def compare(that: TimerTaskEntry): Int {java.lang.Long.compare(expirationMs, that.expirationMs)}
}说实话上面的逻辑确实难看懂没学过 scala 语言的话我也只是大概大概翻译下。核心逻辑能看懂就行了。
5. TimerTask
trait TimerTask extends Runnable {...
}在 Scala 语言中trait TimerTask extends Runnable 是一个特质trait声明表示该特质继承自 Runnable 接口。
trait 是 Scala 中的一个特性类似于 Java 中的接口interface。特质可以包含抽象方法和具体方法也可以有字段和实现。与 Java 接口不同Scala 特质可以混合mixin到类中实现多重继承的效果。
来看下里面的一些参数
val delayMs: Long // timestamp in millisecond
private[this] var timerTaskEntry: TimerTaskEntry null首先就是任务的延时时间 delayMs然后就是这个任务属于哪一个链表节点。再来看下面的几个方法
// 任务取消
def cancel(): Unit {synchronized {// 就是把任务从链表节点中移除掉if (timerTaskEntry ! null) timerTaskEntry.remove()timerTaskEntry null}
}// 设置任务到链表节点上
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit {synchronized {// 如果任务所属链表节点不等于要设置的节点说明这个任务有可能原来在另一条// 链表上现在要加入当前的链表所以需要首先把任务节点从原来的链表上移除// 因为任务都不在原来的链表上了节点肯定也带删掉if (timerTaskEntry ! null timerTaskEntry ! entry)// 就把当前节点从链表上移除掉timerTaskEntry.remove()// 然后设置所属节点为传入的节点timerTaskEntry entry}
}// 获取当前任务所属的链表节点
private[timer] def getTimerTaskEntry: TimerTaskEntry timerTaskEntry其实里面的方法并不多下面简单来说下
任务取消就是把链表节点从链表中移除掉同时把当前任务所属的链表节点置空逻辑不复杂因为任务都要删除了链表节点肯定不能继续待在链表中的设置任务到新的链表节点这里面如果发现这个任务原来已经设置过了现在要设置到一个新的链表上就需要先把当前节点从链表上移除掉然后再重新设置新的节点
6. Timer 和 SystemTimer - 设计降级逻辑
Timer 是 Scala 定义的一个接口包括几种时间轮的方法下面就来简单看下
trait Timer {/*** Add a new task to this executor. It will be executed after the tasks delay* (beginning from the time of submission)* param timerTask the task to add*/def add(timerTask: TimerTask): Unit/*** Advance the internal clock, executing any tasks whose expiration has been* reached within the duration of the passed timeout.* param timeoutMs* return whether or not any tasks were executed*/def advanceClock(timeoutMs: Long): Boolean/*** Get the number of tasks pending execution* return the number of tasks*/def size: Int/*** Shutdown the timer service, leaving pending tasks unexecuted*/def shutdown(): Unit
}
可以看到 Timer 接口里面定义的四个方法分别就是添加、推进时间轮、时间轮任务数、关闭时间轮那下面就来看下 Timer 的实现类 SystemTimerSystemTimer 也是时间轮的顶层管理类。
threadsafe
class SystemTimer(executorName: String,tickMs: Long 1,wheelSize: Int 20,startMs: Long Time.SYSTEM.hiResClockMs) extends Timer {...
}可以看到SystemTimer 接收几个参数
executorName执行任务的线程名称tickMs默认的最底层时间轮的时间间隔wheelSize每一层时间轮的大小startMs启动时间就是当前时间
下面来看下几个变量
// 执行任务的线程池
private[this] val taskExecutor Executors.newFixedThreadPool(1,(runnable: Runnable) KafkaThread.nonDaemon(executor- executorName, runnable))// 延时队列
private[this] val delayQueue new DelayQueue[TimerTaskList]()
// 任务数量
private[this] val taskCounter new AtomicInteger(0)
// 时间轮
private[this] val timingWheel new TimingWheel(tickMs tickMs,wheelSize wheelSize,startMs startMs,taskCounter taskCounter,delayQueue
)// 读写锁
private[this] val readWriteLock new ReentrantReadWriteLock()
private[this] val readLock readWriteLock.readLock()
private[this] val writeLock readWriteLock.writeLock()下面看下几个方法首先就是添加任务的方法
def add(timerTask: TimerTask): Unit {// 加锁readLock.lock()try {// 添加任务创建一个链表节点把任务放到链表节点中// 再调用 addTimerTaskEntry 把链表节点添加到链表上addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs Time.SYSTEM.hiResClockMs))} finally {// 解锁readLock.unlock()}}private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit {// 调用时间轮TimingWheel 的add方法添加if (!timingWheel.add(timerTaskEntry)) {// 添加失败的情况下要么就是过期了要么就是取消了if (!timerTaskEntry.cancelled)// 如果不是取消了那么就要执行这个过期任务taskExecutor.submit(timerTaskEntry.timerTask)}}在添加任务任务的时候如果任务已经过期了或者任务被取消了那么就会判断如果不是任务取消就会把任务丢到线程池里面去执行。上面就是添加的方法下面再看下推进时间轮的方法。
def advanceClock(timeoutMs: Long): Boolean {
// 从延时队列里面获取过期链表超时时间 timeoutMs
var bucket delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)// 如果不为空if (bucket ! null) {writeLock.lock()try {while (bucket ! null) {// 推进时间轮timingWheel.advanceClock(bucket.getExpiration)// 然后执行过期链表下面的所有任务bucket.flush(reinsert)// 继续阻塞bucket delayQueue.poll()}} finally {// 解锁writeLock.unlock()}true} else {false}
}// 把链表节点重新添加回时间轮上
private[this] val reinsert (timerTaskEntry: TimerTaskEntry)
addTimerTaskEntry(timerTaskEntry)private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit {// 重新添加回时间轮如果添加失败再执行任务if (!timingWheel.add(timerTaskEntry)) {if (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}首先推进时间轮的时候会从延时队列里面获取过期的链表第一次获取超时时间是 timeoutMs这个是上层调用者设置的。这里如果没有获取到任务就不会往下走推进时间轮其实这里就是解决了 Netty 时间轮的空轮转问题。 Netty 的时间轮是不断执行的不管有没有任务过期都会去遍历当前 tick 的链表下面的所有任务同时推进时间轮看看有没有任务需要执行所以如果 Netty 时间轮中有一个很长时间都不会执行的任务在遍历的时候就做了很多 “无用功”。 Kafka 则是通过延时队列的方式没有任务就不会去遍历推进时间轮有了任务才会去处理。所以这也算是一种精确唤醒执行了。 推进时间轮的方法已经说了当推进时间轮之后就回去调用 bucket.flush(reinsert) 方法前面我们说过 flush 会传入一个 f 函数用来执行过期链表上面的任务 这个函数就是 reinsert。 那么问题来了不是说执行过期任务吗为什么是重新把任务添加回时间轮上
这就不得不说下分层时间轮的降级逻辑了分层时间轮中上层时间轮的任务只有降级到最底层时间轮才能被执行。比如现在时间轮的层级是 1 - 2 - 3那么 3 号时间轮上面的任务要降级到 1 才能被执行。那么如何才能降级呢我们直到时间轮是不断被推进的也就是 currentTime 是不断增大的所以当链表节点重新添加回时间轮的时候原本应该添加到 3 号时间轮的节点会添加到 2 号同理 2 号的节点会添加到 1 号还是不清楚的可以去看下概要里面的时间轮介绍。
最后刷新链表完成之后继续阻塞在任务队列里面不过这里阻塞就没有超时时间了因为可以避免无意义的唤醒防止空轮转直到有任务才醒来。如果说时间轮添加了一个更快执行的任务那么在添加方法里面就会往 delay 队列添加一个更早过期的节点这里 SystemTimer 也会被更快唤醒。
7. 上层调用
上面就是时间轮的核心实现了那么你可能会好奇时间轮在哪被调用了其实就是在 DelayedOperation.scala 里面执行。我们看下这个方法里面的 advanceClock。
def advanceClock(timeoutMs: Long): Unit {timeoutTimer.advanceClock(timeoutMs)if (estimatedTotalOperations.get - numDelayed purgeInterval) {estimatedTotalOperations.getAndSet(numDelayed)debug(Begin purging watch lists)val purged watcherLists.foldLeft(0) {case (sum, watcherList) sum watcherList.allWatchers.map(_.purgeCompleted()).sum}debug(Purged %d elements from watch lists..format(purged))}
}timeoutTimer.advanceClock(timeoutMs) 推进时间轮 下面的逻辑先就不用细看了。那么这个 advanceClock 方法又是在哪被调用了呢
private class ExpiredOperationReaper extends ShutdownableThread(ExpirationReaper-%d-%s.format(brokerId, purgatoryName),false) {override def doWork(): Unit {advanceClock(200L)}}这个方法会每隔 200ms 推动一次时间轮从而推动延时任务的执行。
8. 小结
好了到这里分层时间轮 Kafka 的源码就写好了下面还会介绍下 RocketMQ 的延时任务源码的逻辑不过在这之前我会简单说下 SpringBoot 的定时延时任务毕竟 Java 的框架里面肯定少不了 SpringBoot 的身影。至于 Dubbo看了下里面的时间轮源码跟 Netty 的基本一模一样 Netty 在上一篇文章里面也介绍过了所以后面就不再介绍。 如有错误欢迎提出