室内设计网站配色app,广州的服装网站建设,做网站先学美工,自己做网站服务器堆作为必须掌握的数据结构之一#xff0c;在众多场景中也得到了广泛的应用。 比较典型的#xff0c;如java中的优先队列PriorityQueue、算法中的TOP-K问题、最短路径Dijkstra算法等#xff0c;在这些经典应用中堆都担任着灵魂般的角色。
理论基础
binary heap
再一起回忆…堆作为必须掌握的数据结构之一在众多场景中也得到了广泛的应用。 比较典型的如java中的优先队列PriorityQueue、算法中的TOP-K问题、最短路径Dijkstra算法等在这些经典应用中堆都担任着灵魂般的角色。
理论基础
binary heap
再一起回忆一下堆的一个性质堆总是一棵完全二叉树。有些文章中也将堆称为二叉堆(binary heap)。 在堆中再根据堆顶点为最大值与最小值分为大顶堆与小顶堆。
新增一个元素需要进行sift-up操作其时间复杂度为O(logN)
构造二叉堆有两种方式
一种是比较简单的方式遍历每个元素进行sift-up其时间复杂度为O(N*logN)另一种是将元素以完全二叉树进行存储遍历每个非叶子节点自下而上构建子堆其时间负载度为O(N)
删除堆顶元素需要对堆末尾元素进行sift-down其时间复杂度也为O(logN)。
堆排序的过程是在构建好堆后再逐个删除堆顶元素其时间复杂度O(N(N-1)*logN)约为O(NlogN)
堆排序整体运行过程动画如下
d-ary deap
除了二叉堆外还有三叉堆、四叉堆、五叉堆这些N叉堆即维基百科中的d-ary heap。 The d-ary heap or d-heap is a priority queue data structure, a generalization of the binary heap in which the nodes have d children instead of 2. N叉堆与二叉堆进行对比由于N叉堆树的高度更低上推(sift-up)过程的时间复杂度是二叉堆的O(logN2)倍即新增元素时则会更快。
删除堆顶元素时进行sift-down操作时间复杂度为O(N * log s / log N)。(N为维度s为堆中节点个数)
在N叉堆中四叉堆由于综合性能相对稳定在N叉堆中脱颖而出。
测试数据可参考https://vearne.cc/archives/39627
GO中的应用(time.ticker源码分析)
在有了理论基础后再看下四叉堆在GO中的应用-timer(定时任务)。
ticker用法
在go项目中可以使用go自带的time.ticker进行简单的定时任务。示例代码如下
// 新建一个ticker定设置周期为1秒
ticker : time.NewTicker(time.Second * 3)
// 在一个协程接收ticker的channel回调
go func() {for {-ticker.C// 周期到达后输出当前时间fmt.Println(tick--, time.Now().String())}
}()
time.Sleep(time.Hour)输出示例为:
……
tick-- 2023-10-08 21:01:30.1830277 0800 CST m3.009288301
tick-- 2023-10-08 21:01:33.1811243 0800 CST m6.007384901
tick-- 2023-10-08 21:01:36.179331 0800 CST m9.005591601
……以上一个定时任务代码就完成了
ticker结构
以上简短的代码便完成一个定时任务的功能再来探究一下它的原理。
一个Ticker由两部分组成:
一个接收消息的channel一个runtimeTimer结构体
type Ticker struct {C -chan Time // The channel on which the ticks are delivered.r runtimeTimer
}type runtimeTimer struct {pp uintptrwhen int64period int64f func(any, uintptr) // NOTE: must not be closurearg anyseq uintptrnextwhen int64status uint32
}从NewTicker方法入手
func NewTicker(d Duration) *Ticker {if d 0 {panic(errors.New(non-positive interval for NewTicker))}c : make(chan Time, 1)t : Ticker{C: c,r: runtimeTimer{when: when(d), //下次触发时间period: int64(d),//运行周期f: sendTime,//触发时执行的动作arg: c,},}startTimer(t.r)//启动Timerreturn t
}// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {select {case c.(chan Time) - Now()://将当前时间发送给等待的channeldefault://channel缓存区满了不执行任何操作}
}以上代码在NewTicker方法中创建了一个Ticker并调用了startTimer方法。 且runtimeTimer与一个ticker是一对一的关系用一个堆来存储所有的定时任务则一个ticker是一个节点。
startTimer方法
startTimer在time包下无法找到实现代码需要在go源码的runtime下查看。 如上图所示源码在src/runtime/time.go文件中。
// startTimer adds t to the timer heap.
//
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {if raceenabled {racerelease(unsafe.Pointer(t))}addtimer(t)
}// Note: this changes some unsynchronized operations to synchronized operations
// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some Ps heap,
// which could cause the heap to become unsorted.
func addtimer(t *timer) {// when must be positive. A negative value will cause runtimer to// overflow during its delta calculation and never expire other runtime// timers. Zero will cause checkTimers to fail to notice the timer.if t.when 0 {throw(timer when must be positive)}if t.period 0 {throw(timer period must be non-negative)}if t.status.Load() ! timerNoStatus {throw(addtimer called with initialized timer)}t.status.Store(timerWaiting)when : t.when// Disable preemption while using pp to avoid changing another Ps heap.mp : acquirem()pp : getg().m.p.ptr()lock(pp.timersLock)cleantimers(pp)doaddtimer(pp, t)unlock(pp.timersLock)wakeNetPoller(when)releasem(mp)
}addtimer方法为关键方法。看懂addtimer的整体方法需要对go中的GMP模型有一定的了解。 G(gorountine协程)M(thread线程)P(processor处理器) 咱们这里仅看主流程直接看doaddtimer方法。
doaddtimer方法(新增节点)
// doaddtimer adds t to the current Ps heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {// Timers rely on the network poller, so make sure the poller// has started.if netpollInited.Load() 0 {// netpool如未初始化则进行初始化netpollGenericInit()}if t.pp ! 0 {throw(doaddtimer: P already set in timer)}// 给timer绑定pt.pp.set(pp)i : len(pp.timers)// 将此timer添加到p的timer集合中放到堆的末尾pp.timers append(pp.timers, t)// 堆内新增了元素进行上推操作在保持堆的特性siftupTimer(pp.timers, i)if t pp.timers[0] {pp.timer0When.Store(t.when)}// p的timer计数器加1pp.numTimers.Add(1)
}再来详细学习下4叉堆的siftup具体是如何操作的。 siftupTimer方法中的t为堆的所有元素i为要进行siftup元素的索引也就是新增的元素索引。
siftupTimer方法如下
func siftupTimer(t []*timer, i int) int {// 判断新增元素的正确性if i len(t) {badTimer()}// 获取出新增元素的具体运行时间when : t[i].whenif when 0 {badTimer()}// 新增元素的值tmp : t[i]for i 0 {// 获取出新增元素的父节点索引四叉堆时父节点索引为(i-1)/4p : (i - 1) / 4 // parentif when t[p].when {// 新增元素的运行时间晚于父节点则无需继续siftupbreak}// 将原父节点位置往下降一级t[i] t[p]// 新增元素的位置往上提升一级i p}if tmp ! t[i] {// 新增元素的值在最后确定了位置后才赋值而不是每次都进行交换t[i] tmp}return i
}runtimer方法(执行/删除节点)
当timer堆中维护好后就可以准备执行timer堆中的timer了。 此过程为持续从堆顶取出timer判断timer是否达到了执行的条件时间、状态如果条件满足就执行此timer。
执行timer的方法为time中的runtimer方法执行时主要关注runOneTimer方法。源码如下
func runtimer(pp *p, now int64) int64 {for {// 获取出当前p的堆顶timert : pp.timers[0]if t.pp.ptr() ! pp {throw(runtimer: bad p)}// 对堆顶timer的状态进行判断switch s : t.status.Load(); s {case timerWaiting:if t.when now {// Not ready to run.return t.when}if !t.status.CompareAndSwap(s, timerRunning) {// 已在运行不重复运行continue}// Note that runOneTimer may temporarily unlock// pp.timersLock.runOneTimer(pp, t, now)return 0case timerDeleted:if !t.status.CompareAndSwap(s, timerRemoving) {continue}dodeltimer0(pp)if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {badTimer()}pp.deletedTimers.Add(-1)if len(pp.timers) 0 {return -1}case timerModifiedEarlier, timerModifiedLater:if !t.status.CompareAndSwap(s, timerMoving) {continue}t.when t.nextwhendodeltimer0(pp)doaddtimer(pp, t)if !t.status.CompareAndSwap(timerMoving, timerWaiting) {badTimer()}case timerModifying:// Wait for modification to complete.osyield()case timerNoStatus, timerRemoved:// Should not see a new or inactive timer on the heap.badTimer()case timerRunning, timerRemoving, timerMoving:// These should only be set when timers are locked,// and we didnt do it.badTimer()default:badTimer()}}
}// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {if raceenabled {ppcur : getg().m.p.ptr()if ppcur.timerRaceCtx 0 {ppcur.timerRaceCtx racegostart(abi.FuncPCABIInternal(runtimer) sys.PCQuantum)}raceacquirectx(ppcur.timerRaceCtx, unsafe.Pointer(t))}// 取出timer中的function和参数f : t.farg : t.argseq : t.seqif t.period 0 {// tick类型的timer以实际运行时间和固定周期计算出下次运行时间// Leave in heap but adjust next time to fire.delta : t.when - nowt.when t.period * (1 -delta/t.period)if t.when 0 { // check for overflow.t.when maxWhen}// siftdown堆顶节点重新调整堆siftdownTimer(pp.timers, 0)if !t.status.CompareAndSwap(timerRunning, timerWaiting) {badTimer()}updateTimer0When(pp)} else {// 非tick类型的timer执行删除// Remove from heap.dodeltimer0(pp)if !t.status.CompareAndSwap(timerRunning, timerNoStatus) {badTimer()}}if raceenabled {// Temporarily use the current Ps racectx for g0.gp : getg()if gp.racectx ! 0 {throw(runOneTimer: unexpected racectx)}gp.racectx gp.m.p.ptr().timerRaceCtx}unlock(pp.timersLock)// 执行timer的function和参数f(arg, seq)lock(pp.timersLock)if raceenabled {gp : getg()gp.racectx 0}
}在删除堆顶节点时执行的是siftdownTimer方法。其源码如下
// siftdownTimer puts the timer at position i in the right place
// in the heap by moving it down toward the bottom of the heap.
func siftdownTimer(t []*timer, i int) {n : len(t)if i n {badTimer()}// 获取出要调整节点的执行时间when : t[i].whenif when 0 {badTimer()}tmp : t[i]for {// c为调整节点的最左子节点从左往右第1个c : i*4 1 // left child// c3为调整节点的中间节点从左往右第3个c3 : c 2 // mid childif c n {break}// 最左子节点的下次执行时间w : t[c].when// 左边第2个节点的执行时间比最左子节点执行时间更先执行if c1 n t[c1].when w {// 左边部分timer排序交换最先执行的排左边w t[c1].whenc}// 判断中间节点是否存在if c3 n {// 中间子节点的timer执行时间w3 : t[c3].whenif c31 n t[c31].when w3 {// 同上将最先执行的往左排w3 t[c31].whenc3}// 子节点整体做对比左侧与右侧对比if w3 w {// 将最先执行的放在左边w w3c c3}}if w when {// 堆已调整完毕break}// 将最左的子节点向上升一级t[i] t[c]// 原i向下降一级i c}if tmp ! t[i] {// 将siftdown节点调整到最终确定的位置t[i] tmp}
}某一个timer运行时会判断此timer是否为周期性timer周期性timer会将堆顶节点进行移除再计算出下次执行时间并使用sift-down将此timer下沉到适当的位置以整体满足堆的特性。
dodeltimer0(临时性timer)
从runOneTimer方法中可以看到有两个分支分别为
timer有period(周期性定时任务类型)timer无period(仅计时类型)
前面看的siftdownTimer是周期性定时任务会执行的方法。如果为临时性定时任务如倒计时或time.sleep场景中则最终运行的为dodeltimer0方法。
源码如下
// dodeltimer0 removes timer 0 from the current Ps heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer0(pp *p) {if t : pp.timers[0]; t.pp.ptr() ! pp {throw(dodeltimer0: wrong P)} else {t.pp 0}// 获取到堆中的最后一个节点last : len(pp.timers) - 1if last 0 {// 最后一个节点放到堆顶pp.timers[0] pp.timers[last]}// 删除堆中的原末尾节点pp.timers[last] nilpp.timers pp.timers[:last]if last 0 {// 对放到堆顶的原末尾节点进行siftdown操作siftdownTimer(pp.timers, 0)}// 更新timer集合updateTimer0When(pp)n : pp.numTimers.Add(-1)if n 0 {// If there are no timers, then clearly none are modified.pp.timerModifiedEarliest.Store(0)}
}从源码可以看出当临时性timer触发后会将此节点删除不会再次入堆。这个过程咱们所了解的常规堆排序的过程是一致的只是这里用的是四叉堆堆排序中用的是二叉堆。
proc.checkTimers(运行检测)
前面所提到的持续从堆顶取timer并判断是否满足执行条件的步骤在proc.checkTimers方法中也就是它才是timer执行的入口。此方法的上层调度可通过跟踪源码查看到后期再详细深入探究。
timer与robfig/cron对比
由于参与的GO项目中有常看到另一个框架https://github.com/robfig/cron看着源码不太多就浅浅看了下总结出以下几点
cron是基于timer开发的底层内部仍是使用的timercron支持的任务最小周期为秒timer的最小周期无限制cron中的某一任务是可能并行运行的而timer.tick中的同一任务不会出现同时运行的情况
比较关键的点为第3点具体选择时看具体的应用场景
总结
在数据量不太大的情况下四叉堆的综合性能比二叉堆更优GO中time.timer和time.tick是使用四叉堆实现的time.tick的任务每次运行后会重新入堆time.timer的任务每次运行后会从堆顶删除