Go timer Code Review

概述

Go 語言中的計時器常用於定期執行某個操作或者在指定時間內執行某個操作,主要通過標準庫的 time 包中的相關方法來實現,如 time.After(), time.Tick() 等。 相信讀者已經可以熟練掌握應用層面,本文主要分析計時器功能的內部代碼實現。

K 叉堆

在正式分析源代碼之前,我們首先簡單瞭解下 K 叉堆 這種數據結構。

K 叉堆(k-ary heap)是一種基於二叉樹 (K=2) 的變體堆數據結構,在 K 叉堆中,每個節點最多有 K 個子節點。

K 叉堆和二叉堆一樣,單個節點只和其父節點、子節點之間有約束關係,鄰居節點之間沒有任何約束關係。

K 叉堆 滿足如下條件:

  1. K 叉堆是一棵完全二叉樹結構,除了葉子節點外,所有非葉子節點層都完全填充 (如果葉子節點也被完全填充,那就是一顆滿二叉樹)

  2. 和二叉堆一樣,也可以分爲最小 K 叉堆和最大 K 叉堆

# 示例: 三叉最大堆

         10
     /    |   \
   7      9     8
 / |   /
4  6  5 7


# 示例: 三叉最小堆

         10
      /   |  \
    12    11  13
  / | \
14 15 18

使用 K 叉堆 可以實現類似於二叉堆的功能,快速取出最小(或最大)元素,並維護堆的元素有序性,時間複雜度都是 O(logK N), 但是 K 叉堆的層級更低 (因爲底數 K 更大),所以 K 叉堆的插入和刪除操作時間複雜度實際上更低一些。 此外,二叉堆對數組的訪問範圍更大也更加隨機,但是 K 叉堆更集中於數組的前部,對局部性緩存更友好,有助於提升性能。

源碼路徑

計時器的源代碼文件路徑爲 $GOROOT/src/runtime/time.go,筆者的 Go 版本爲 go1.19 linux/amd64

數據結構

四叉堆

Go 的計時器實現選擇了 四叉堆 數據結構 (因爲時間涉及先後,所以是最小四叉堆),每個節點保存 4 個 timer 對象,緩存數據局部性比二叉堆好 (極端情況下性能可以翻倍), 沒有選擇更多的叉樹堆,應該是在性能和實際應用場景方面做的折衷 (畢竟大多數業務中定時器數量不會太多)。

四叉堆數據結構示意圖

下面是一個典型的的計時器堆結構示意圖 (時間單位部分只保留 分 - 秒):

計時器四叉堆示意圖

計時器對象

timer 對象是計時器的運行時表示,存儲在處理器 P 的 四叉堆 結構中,然後由處理器中的 goroutine 負責執行計時器的回調函數。

type timer struct {
 // 計時器關聯的處理器
 pp puintptr

 // 計時器下次被喚醒的時間
 when   int64
 // 計時器被喚醒的週期時間
 period int64
 // 計時器被喚醒時的回調函數
 f      func(any, uintptr)
 // 計時器被喚醒時的回調函數參數1
 arg    any
 // 計時器被喚醒時的回調函數參數2
 seq    uintptr

 // 當計時器處於 timerModifiedXX 狀態時,設置 when 字段
 // 詳情見 modtimer 函數
 nextwhen int64

 // 計時器狀態
 status atomic.Uint32
}

P 的計時器字段

每個處理器 P 中有一個用於存儲計時器的四叉堆數據字段 (timers),這樣可以提高數據局部性性能,並且避免了不同處理器之間的鎖爭用。

type p struct {
 // 處理器中第一個時間器被喚醒的時間
 // 也就是時間最早的計時器
 timer0When atomic.Int64

 // 計時器操作鎖
 timersLock mutex

 // 存儲計時器的四叉堆
 timers []*timer

 // 計時器數量
 numTimers atomic.Uint32

 // 處於 timerDeleted 狀態的計時器數量
 deletedTimers atomic.Uint32
}

G 的計時器字段

每個 goroutine 中引用了一個計時器,畢竟 goroutine 是負責具體幹活的。

type g struct {
 timer *timer
}

關係示意圖

每個處理器中的計時器最終都需要分配給 goroutine 來負責具體的執行。

處理器和計時器結構示意圖

10 種狀態

定時器共有 10 種狀態來表示整個生命週期中的相關操作和狀態變化,關聯的是 timer 對象的 status 字段。

const (
 // 初始狀態
 timerNoStatus = iota

  // 等待啓動
 timerWaiting

 // 運行
 timerRunning

 // 刪除 (不會再次運行)
 timerDeleted

 // 正在刪除中
 timerRemoving

 // 已經從堆中刪除
 timerRemoved

 // 正在修改中
 timerModifying

 // 被修改到了更早的時間 (具體的值關聯到了 nextwhen 字段)
 timerModifiedEarlier

 // 被修改到了更晚的時間 (具體的值關聯到了 nextwhen 字段)
 timerModifiedLater

 // 已經被修改並正在移動中 (四叉堆數據變化)
 timerMoving
)

狀態機

根據標準庫中源代碼的註釋,我們可以畫出如下狀態機圖示。

計時器狀態機

算法

現在我們有了定時器數據結構和對應的示意圖,剩下的就是基於數據結構形成的算法部分,開始愉快地閱讀源代碼。:-)

新增計時器

函數 addtimer 用於新增計時器。

func addtimer(t *timer) {
 // 參數合法性檢測
  ...

 // 計時器狀態修改爲等待啓動
 t.status.Store(timerWaiting)

 when := t.when

 // 禁止搶佔
 mp := acquirem()

 pp := getg().m.p.ptr()
 // 獲取計時器操作鎖
 lock(&pp.timersLock)
 // 清理處理器中的定時器
 cleantimers(pp)
 // 將參數計時器加入處理器的四叉堆中
 doaddtimer(pp, t)
 // 釋放計時器操作鎖
 unlock(&pp.timersLock)

 // 觸發調度: 喚醒網絡輪詢器中休眠的線程
 wakeNetPoller(when)

 releasem(mp)
}

函數 doaddtimer 用於將計時器添加到指定的處理器中。

func doaddtimer(pp *p, t *timer) {
 // 計時器的喚醒和執行依賴於網絡輪詢器
 if netpollInited.Load() == 0 {
  // 如果網絡輪詢器未初始化
  // 那就先進行其初始化
  netpollGenericInit()
 }

  // 計時器關聯處理器
 t.pp.set(pp)
 // 先將計時器添加到四叉堆尾部
 i := len(pp.timers)
 pp.timers = append(pp.timers, t)
 // 然後通過向上調整 (排序) 操作將計時器放到對應的位置
 siftupTimer(pp.timers, i)
 // 如果計時器調整後位於四叉堆第一個元素
 // 更新處理器的第一個計時器的喚醒時間
 if t == pp.timers[0] {
  pp.timer0When.Store(t.when)
 }
 // 更新計時器數量
 pp.numTimers.Add(1)
}

修改計時器

函數 modtimer 用於修改計時器,該函數會被網絡輪訓器、Ticker.ResetTimer.Reset 方法調用。

func modtimer(t *timer, when, period int64, f func(any, uintptr), arg any, seq uintptr) bool {
  // 參數合法性檢測
  ...

 status := uint32(timerNoStatus)
 wasRemoved := false
 var pending bool
 var mp *m
loop:
 for {
  // 狀態機內部變化
  // 這裏可以對照着上面的狀態機圖示查看源代碼
  switch status = t.status.Load(); status {
  case timerWaiting, timerModifiedEarlier, timerModifiedLater:
            ...
  case timerNoStatus, timerRemoved:
   ...
  case timerDeleted:
            ...
  case timerRunning, timerRemoving, timerMoving:
            ...
  case timerModifying:
            ...
  default:
   badTimer()
  }
 }

 t.period = period
 t.f = f
 t.arg = arg
 t.seq = seq

 if wasRemoved {
  // 如果計時器已經被刪除
  // 創建新的計時器
  // 這部分代碼和 addtimer 函數內部代碼幾乎一致,這裏直接省略 ...
        ...
 } else {
  // 計時器可能位於另一個處理器的四叉堆中
  //    讀者可以思考一下爲什麼會出現這種情況
  // 如果直接修改 when 字段,另一個處理器的四叉堆就亂序了
  // 因此將新的 when 值放入 nextwhen 字段
  // 並讓另一個處理器在四叉堆調整時設置 when 字段
  t.nextwhen = when

  // 默認計時器被修改到了更晚的時間
  newStatus := uint32(timerModifiedLater)
  if when < t.when {
   // 如果修改後時間小於修改前時間
   // 將狀態設置爲 timerModifiedEarlier
   newStatus = timerModifiedEarlier
  }

  // 如果修改後時間小於修改前時間
  // 觸發調度: 喚醒網絡輪詢器中休眠的線程
  if newStatus == timerModifiedEarlier {
   wakeNetPoller(when)
  }
 }

 return pending
}

刪除計時器

函數 deltimer 用於刪除計時器。

// 計時器可能位於另一個處理器的四叉堆中
// 所以不能直接將其刪除,否則另一個處理器的四叉堆就亂序了
// 只能將其狀態標記爲 “刪除” (類似軟刪除機制)
// 最後由計時器所在處理器刪除
func deltimer(t *timer) bool {
 for {
  // 狀態機內部變化
  // 這裏可以對照着上面的狀態機圖示查看源代碼
  switch s := t.status.Load(); s {
  case timerWaiting, timerModifiedLater:
   ...
  case timerModifiedEarlier:
            ...
  case timerDeleted, timerRemoving, timerRemoved:
            ...
  case timerRunning, timerMoving:
   ...
  case timerNoStatus:
   ...
  case timerModifying:
            ...
  default:
   badTimer()
  }
 }
}

函數 dodeltimer0 用於刪除當前處理器四叉堆的堆頂計時器。

func dodeltimer0(pp *p) {
 // 刪除堆頂計時器
 // 通過將堆頂計時器替換爲堆中最後一個計時器實現
 last := len(pp.timers) - 1
 if last > 0 {
  pp.timers[0] = pp.timers[last]
 }
 pp.timers[last] = nil
 pp.timers = pp.timers[:last]

 // 刪除完成後,重新進行堆排序
 if last > 0 {
  siftdownTimer(pp.timers, 0)
 }
 updateTimer0When(pp)
 n := pp.numTimers.Add(-1)
}

清除定時器

函數 cleantimers 用於清除計時器。

func cleantimers(pp *p) {
 gp := getg()
 for {
  // 四叉堆爲空時直接返回
  if len(pp.timers) == 0 {
   return
  }

  // 取出四叉堆頂的計時器
  t := pp.timers[0]

  // 只處理三種狀態的計時器
  switch s := t.status.Load(); s {
  case timerDeleted:
   // 將計時器狀態修改爲正在刪除中
   if !t.status.CompareAndSwap(s, timerRemoving) {
    continue
   }
   // 刪除計時器
   dodeltimer0(pp)
   // 將計時器狀態修改爲已刪除
   if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {
    badTimer()
   }
   // 計時器數量減 1
   pp.deletedTimers.Add(-1)
  case timerModifiedEarlier, timerModifiedLater:
   // 將計時器狀態修改爲正在移動中
   if !t.status.CompareAndSwap(s, timerMoving) {
    continue
   }
   // 更新計時器下次被喚醒時間
   t.when = t.nextwhen
   // 首先刪除計時器
   dodeltimer0(pp)
   // 然後將計時器加入四叉堆中
   doaddtimer(pp, t)
   // 將計時器狀態修改爲等待啓動
   if !t.status.CompareAndSwap(timerMoving, timerWaiting) {
    badTimer()
   }
  default:
   return
  }
 }
}

調整計時器

函數 adjusttimers 用於調整計時器,內部主要處理三種狀態的計時器:

  1. timerDeleted

  2. timerModifiedEarlier

  3. timerModifiedLater

func adjusttimers(pp *p, now int64) {
 ...

 var moved []*timer
 for i := 0; i < len(pp.timers); i++ {
  // 遍歷四叉堆,逐個處理計時器
  t := pp.timers[i]

  switch s := t.status.Load(); s {
  case timerDeleted:
   if t.status.CompareAndSwap(s, timerRemoving) {
    // 刪除計時器
    changed := dodeltimer(pp, i)
    // 將計時器狀態修改爲已刪除
    if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {
     badTimer()
    }
    // 計時器數量減 1
    pp.deletedTimers.Add(-1)
   }
  case timerModifiedEarlier, timerModifiedLater:
   if t.status.CompareAndSwap(s, timerMoving) {
    // 更新計時器下次被喚醒時間
    t.when = t.nextwhen
                // 首先刪除計時器
    changed := dodeltimer(pp, i)
    // 然後將計數器放入一個單獨的隊列
    moved = append(moved, t)
   }

      ...
  }
 }

 if len(moved) > 0 {
  //
  addAdjustedTimers(pp, moved)
 }

 if verifyTimers {
  // 檢測四叉堆中的計時器是否存在異常
  verifyTimerHeap(pp)
 }
}

函數 dodeltimer 用於刪除處理器中指定索引的計時器,並返回四叉堆排序中受到影響的計時器索引最小值,其內部實現和 dodeltimer0 函數差不多,這裏不在贅述。

func dodeltimer(pp *p, i int) int {
    ...
}

函數 addAdjustedTimers 將參數計時器隊列中的所有計時器加入四叉堆中。

func addAdjustedTimers(pp *p, moved []*timer) {
 for _, t := range moved {
  doaddtimer(pp, t)
  // 將計時器狀態修改爲等待喚醒
  if !t.status.CompareAndSwap(timerMoving, timerWaiting) {
   badTimer()
  }
 }
}

運行計時器

函數 runtimer 用於計時器運行,內部會檢查四叉堆的堆頂計時器,如果堆頂計時器準備就緒可以啓動運行,那就直接運行並在運行完成後刪除或更新該計時器 (取決於計時器的具體類型)。 如果有計時器運行,返回 0,如果沒有計時器,返回 -1, 如果沒有可運行的計時器,返回堆頂計時器的下次啓動時間。

func runtimer(pp *p, now int64) int64 {
 for {
  // 四叉堆頂元素
  t := pp.timers[0]

  // 狀態機內部變化
  // 這裏可以對照着上面的狀態機圖示查看源代碼
  switch s := t.status.Load(); s {
  case timerWaiting:
   if t.when > now {
    // 沒有可運行的計時器,返回堆頂計時器的下次啓動時間
    return t.when
   }

   // 運行計時器
   runOneTimer(pp, t, now)
   // 有計時器運行,返回 0
   return 0

  case timerDeleted:
   // 刪除堆頂計時器
   dodeltimer0(pp)
   // 計時器數量減 1
   pp.deletedTimers.Add(-1)
   if len(pp.timers) == 0 {
    // 如果沒有計時器,返回 -1
    return -1
   }

  ...
 }
}

函數 runOneTimer 用於運行單個計時器。

func runOneTimer(pp *p, t *timer, now int64) {
 f := t.f
 arg := t.arg
 seq := t.seq

 // 如果計時器喚醒週期大於 0
 if t.period > 0 {
  // 修改計時器下一次運行時間
  delta := t.when - now
  t.when += t.period * (1 + -delta/t.period)
  if t.when < 0 {
   t.when = maxWhen
  }
  // 更新計時器在四叉堆中的排序 (位置)
  siftdownTimer(pp.timers, 0)
  // 修改計時器狀態爲等待啓動
  if !t.status.CompareAndSwap(timerRunning, timerWaiting) {
   badTimer()
  }
  updateTimer0When(pp)
 } else {
  // 如果計時器喚醒週期小於 0
  // 將計時器從堆中刪除
  dodeltimer0(pp)
  // 將計時器狀態改爲初始狀態
  if !t.status.CompareAndSwap(timerRunning, timerNoStatus) {
   badTimer()
  }
 }

 // 執行計時器回調函數
 f(arg, seq)

 ...
}

調度器觸發條件

調度器

函數 checkTimers 用於運行指定處理器中的計時器,有兩個調用方:

  1. runtime.findRunnable 函數用於獲取可運行的 goroutine

  2. runtime.stealWork 函數用於從其他處理器竊取 goroutine 或者計時器

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
 // 選出離當前時間最近的計時器
 next := pp.timer0When.Load()
 nextAdj := pp.timerModifiedEarliest.Load()
 if next == 0 || (nextAdj != 0 && nextAdj < next) {
  next = nextAdj
 }

 if next == 0 {
  // 沒有可以啓動運行的計時器
  // 直接返回
  return now, 0, false
 }

 if now == 0 {
   // 如果參數 now 爲 0,將其設置爲當前時間
  now = nanotime()
 }

 // 沒有可以啓動的計時器
 if now < next {
  // 如果參數處理器和當前處理器不一致,直接返回
  // 如果需要刪除的計時器數量比例低於 25%,直接返回
  if pp != getg().m.p.ptr() || int(pp.deletedTimers.Load()) <= int(pp.numTimers.Load()/4) {
   return now, next, false
  }
 }


 // 如果處理器中還有計時器
 // 首先進行一次調整,詳情見: adjusttimers 函數
 if len(pp.timers) > 0 {
  adjusttimers(pp, now)
  for len(pp.timers) > 0 {
   // 計時器運行
   if tw := runtimer(pp, now); tw != 0 {
    if tw > 0 {
     // 如果沒有可運行的計時器,使用堆頂計時器的下次啓動時間
     pollUntil = tw
    }
    break
   }
   ran = true
  }
 }

 // 如果參數處理器和當前處理器一致 (避免鎖爭用)
 // 並且需要刪除的計時器數量比例高於 25%
 // 刪除處理器的四叉堆中狀態被標記爲 timerDeleted 的所有計時器
 if pp == getg().m.p.ptr() && int(pp.deletedTimers.Load()) > len(pp.timers)/4 {
  clearDeletedTimers(pp)
 }

 return now, pollUntil, ran
}

系統監控

獲取最近的計時器

函數 timeSleepUntil 返回所有處理器中離當前時間最近的計時器的啓動時間,如果沒有任何計時器,就返回常量 maxWhen

該函數只能被 sysmon 函數 (監控線程方法) 和 checkdead (檢測死鎖) 函數調用。

func timeSleepUntil() int64 {
 next := int64(maxWhen)

 // 遍歷所有處理器
 for _, pp := range allp {
  w := pp.timer0When.Load()
  if w != 0 && w < next {
   next = w
  }

  // 獲取處於臨時轉換狀態的計時器時間
  w = pp.timerModifiedEarliest.Load()
  if w != 0 && w < next {
   next = w
  }
 }

 return next
}

sysmon 監控

sysmon 監控在之前的 GMP 調度器 一文中已經提到過,這裏不再贅述,着重看一下內部調用 timeSleepUntil 函數的部分。

func sysmon() {
 ...

 for {
  ...

  now := nanotime()
  if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) {
   if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs {
    // 獲取最近的計時器的啓動時間
    // 根據計時器的啓動時間調整休眠時間,便於第一時間啓動定時器
    next := timeSleepUntil()
    if next > now {
        ...
    }
   }
  }

  ...
 }
}

小結

本文着重介紹了 Go 中計時器的數據結構和狀態操作相關的算法,Go 的計時器採用 四叉堆 數據結構並且直接綁定到處理器 P 上, 通過 GMP 全局調度體系可以直接管理所有處理器中的計時器,並且使單個計時器的啓動延遲最小化。文中沒有摘錄計時器四叉堆的算法相關代碼, 感興趣的讀者可以自行閱讀 runtime.siftupTimer, runtime.siftdownTimer 兩個函數。

Reference

鏈接

[1]

d-ary heap: https://en.wikipedia.org/wiki/D-ary_heap

[2]

Quadradic heap: https://en.wikipedia.org/wiki/Quadradic_heap

[3]

K-ary Heap: https://www.geeksforgeeks.org/k-ary-heap/

[4]

Go 語言設計與實現: https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-timer/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BMMZX4F2oatPyksZtbczuQ