Go 併發控制:sync-Map 詳解

我們知道,Go 中的 map 類型是非併發安全的,所以 Go 就在 sync 包中提供了 map 的併發原語 sync.Map,允許併發操作,本文就帶大家詳細解讀下 sync.Map 的原理。

使用示例

sync.Map 提供了基礎類型 map 的常用功能,使用示例如下:

package main

import (
"fmt"
"sync"
)

func main() {
var s sync.Map

// 存儲鍵值對
 s.Store("name""江湖十年")
 s.Store("age", 20)
 s.Store("location""Beijing")

// 讀取值
if value, ok := s.Load("name"); ok {
  fmt.Println("name:", value)
 }

// 刪除一個鍵
 s.Delete("age")

// 遍歷 sync.Map
 s.Range(func(key, value interface{}) bool {
  fmt.Printf("%s: %s\n", key, value)
returntrue// 繼續遍歷
 })
}

sync.Map 是一個結構體,和其他常見的併發原語一樣零值可用。Store 方法用於存儲一個鍵值對,Load 方法根據給定的鍵讀取對應的值,Delete 方法則可以刪除一個鍵值對,Range 方法則用於遍歷 sync.Map 中存儲的所有鍵值對。

以上便是 sync.Map 的基本使用方法,更多功能將會在稍後的源碼解讀部分介紹。

源碼解讀

我們可以在 sync.Map 文檔中看到其定義和實現的 exported 方法:

https://pkg.go.dev/sync@go1.23.0#Map

type Map
    func (m *Map) Clear()
    func (m *Map) CompareAndDelete(key, old any) (deleted bool)
    func (m *Map) CompareAndSwap(key, old, new any) (swapped bool)
    func (m *Map) Delete(key any)
    func (m *Map) Load(key any) (value any, ok bool)
    func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
    func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)
    func (m *Map) Range(f func(key, value any) bool)
    func (m *Map) Store(key, value any)
    func (m *Map) Swap(key, value any) (previous any, loaded bool)

sync.Map 結構體不僅提供了 map 的基本功能,還提供瞭如 CompareAndSwapLoadOrStore 等複合操作,隨着我們對源碼的深入探究,你將學會這裏所有方法的原理和用途。

核心結構體

sync.Map 主要由以下幾個核心結構體組成:

https://github.com/golang/go/blob/go1.23.0/src/sync/map.go

type Map struct {
    mu   sync.Mutex               // 保護 dirty map 的互斥鎖
    read atomic.Pointer[readOnly] // 只讀 map,原子操作無需加鎖
    dirty map[any]*entry          // 可變 map,包含所有數據,操作時需要加鎖
    misses int                    // 記錄 read map 的 miss 計數
}

type readOnly struct {
    m       map[any]*entry // 存儲數據的只讀 map
    amended bool           // 是否有新的 key 只存在於 dirty map
}

type entry struct {
    p atomic.Pointer[any] // 存儲值的指針,支持原子操作
}

var expunged = new(any) // 用於標記從 dirty map 中被刪除的元素

sync.Map 結構體字段不多,不過 sync.Map 源碼整體上來說邏輯還是比較複雜的,並且細節很多,我先爲你大致梳理下 sync.Map 的設計思路,方便後續理解源碼。

我們可以粗略的將 sync.Map 的設計類比成緩存系統,當我們要讀取 map 中某個 key 對應的值時,sync.Map 會優先從緩存中讀取,如果緩存未命中,則繼續從 DB 中讀取。sync.Map 還會在特定條件滿足的情況下同步緩存和 DB 中的數據。

有了緩存系統的類比,我再爲你講解 sync.Map 的源碼,就更容易理解了。

在 sync.Map 結構體中,mu 字段是一個互斥鎖,用於保護對 dirty 屬性的操作;dirty 字段是一個 map 類型,可以類比爲緩存系統中的 DB,所以理論上 dirty 中的數據應該是全量的;read 字段是一個原子類型的指針,指向 readOnly 結構體,readOnly 內部其實也是使用 map 來存儲數據,你可以將 read 類比爲緩存;misses 字段則用於計數,記錄緩存未命中的次數,當我們要讀取 map 中某個 key 對應的值時,優先從 read 讀取,如果 read 中不存在,則 misses 值加 1,然後繼續從 dirty 中讀取,當 misses 值達到某個閾值時,sync.Map 就會將 dirty 提升爲 read

readOnly 結構體中 m 字段用於存儲 map 數據;amended 用於標記是否有新的 key 只存在於 dirty 中,而不在 read 中。當我們要讀取 sync.Map 中某個 key 對應的值時,會優先從 read.m 中讀取,如果 read.m 中不存在,read.amended 則決定了是否需要繼續從 dirty 中讀取,read.amended 爲 false 時,表示 read.m 和 dirty 中的數據一致,所以無需繼續從 dirty 中讀取;read.amended 爲 true 時,表示 read.m 中的數據要落後於 dirtyread.m 存在數據缺失,所以可以嘗試繼續從 dirty 中讀取,如果 dirty 中還是沒有,才最終確定 sync.Map 中確實沒有此 key

entry 結構體代表一個 map 中的 value,即在 sync.Map 中存儲的 key 對應的值(value),所有存入 sync.Map 中的值都會被包裝成 entry 對象,這樣可以方便標記鍵值對的狀態。我們可以看到這裏還有一個全局變量 expunged,值爲一個任意的指針,用於標記 sync.Map 中的某個鍵值對已經從 dirty 中被刪除。一個 entry 對象可能有 3 種狀態,當其內部的字段 p 值爲一個對象的指針時,表示正常狀態,即這個鍵值對正常存在於 sync.Map 中;當 p 的值爲 nil 時,表示這個鍵值對已經被刪除;當 p 的值爲 expunged 時,表示這個鍵值對已經被徹底刪除。關於這兩個刪除狀態,你可以簡單的將 nil 類比爲數據庫中的邏輯刪除,expunged 理解爲物理刪除,稍後閱讀完源碼,你就能理解其用意了。

接下來我將對 sync.Map 提供的方法進行一一講解。

核心方法

首先,我們來看 sync.Map 提供的幾個比較核心的方法 StoreLoadDeleteRange 和 Clear

Store

Store 方法可以將指定的鍵值對存入 sync.Map

Store 方法源碼如下:

func (m *Map) Store(key, value any) {
 _, _ = m.Swap(key, value)
}

不過 Store 方法實際上會將操作代理到 Swap 方法。

Swap 方法能夠交換給定 key 對應的值,並返回之前的舊值(如果存在的話),返回值 loaded 表示 key 是否存在。即 Swap 用於存儲鍵值對,它可以設置一個新的鍵值對,也可以更新一個已存在的鍵值對。

Swap 方法源碼如下:

func (m *Map) Swap(key, value any) (previous any, loaded bool) {
// 先嚐試從 read map 獲取 key 對應的 entry
 read := m.loadReadOnly()
if e, ok := read.m[key]; ok { // 如果 key 存在
// 嘗試交換值
if v, ok := e.trySwap(&value); ok { // 如果交換成功
   if v == nil { // 說明已被刪除,但沒有徹底刪除(expunged)
    returnnil, false// 新增鍵值對成功
   }
   return *v, true// 交換成功
  }
 }

// 未找到或需要修改 dirty map,需要加鎖處理
 m.mu.Lock()
 read = m.loadReadOnly()
if e, ok := read.m[key]; ok { // 如果 key 在 read map 中
if e.unexpungeLocked() {
   // 如果 entry 之前被 expunged,意味着 dirty map 不爲 nil 且該 entry 不在 dirty map 中
   m.dirty[key] = e
  }
// 進行值交換
if v := e.swapLocked(&value); v != nil {
   loaded = true
   previous = *v
  }
 } elseif e, ok := m.dirty[key]; ok { // 如果 key 在 dirty map 中
// 進行值交換
if v := e.swapLocked(&value); v != nil {
   loaded = true
   previous = *v
  }
 } else { // key 即不在 read map 中,也不在 dirty map 中
if !read.amended {
   // 添加第一個新 key 到 dirty map,需要先標記 read map 爲不完整
   m.dirtyLocked()
   m.read.Store(&readOnly{m: read.m, amended: true})
  }
// 在 dirty map 中創建新 entry
  m.dirty[key] = newEntry(value)
 }
 m.mu.Unlock()
return previous, loaded
}

Swap 方法會先嚐試從 read 中獲取給定 key 對應的值 entry,此時無需加鎖。

如果 key 存在,說明是更新操作,那麼調用 e.trySwap(&value) 嘗試交換 key 對應的值。如果返回的 ok 爲 true 說明交換成功,那麼此時有兩種情況,① 如果返回的 v 等於 nil 說明這個 key 以前存在過,但是已經被刪除了,不過還沒有被徹底刪除(expunged),此時返回的 loaded 值爲 false 標識 key 不存在,是新增鍵值對成功;② 如果 v 的值不爲 nil,說明 key 存在,是更新操作,返回的 loaded 值設爲 true。如果返回的 ok 爲 false,說明這個 key 以前存在過,但是已經被徹底刪除了expunged),交換失敗,程序繼續往下執行。

這裏也可以發現,read 其實並不完全是 “只讀”,在更新的情況下,其實是可以寫的(調用 e.trySwap(&value) 會直接修改 read.m 中的值,稍後我們將會看到源碼實現)。對於同一個鍵值對,read.m 和 dirty 指向的是同一個 entry 指針對象,所以,其實這裏修改 read 時 dirty 也會同步修改。read 真正代表的其實是無鎖操作,只有修改 dirty 的時候才需要加鎖。

如果 key 在 read 中未找到,則接下來會涉及對 dirty 的操作,所以需要加鎖。

接着,這裏會再次調用 m.loadReadOnly() 重新加載 read,並檢查 read 中是否存在 key,這也是保證併發安全的常見操作,雙重檢查(double-checking)。

如果這次檢查發現 key 在 read 中,那麼調用 e.swapLocked(&value) 將 entry 的值更新成新的值,並且這裏也會通過 m.dirty[key] = e 同時更新 dirty(調用 e.unexpungeLocked() 能確保 entry 未被標記爲 expunged)。

如果 key 不在 read 中,而在 dirty 中存在,那麼直接將其更新成新的值。

如果 key 即不在 read 中,也不在 dirty 中,則說明是新插入的鍵值對,在 dirty 中創建並保存新的 entry。此外,這裏有一個判斷 if !read.amended,如果成立,即 read.amended 值爲 false,意味着現在的 read 中數據是完整的,此時 dirty 有可能爲 nil(後續講解,你將知道何時會出現這種情況),調用 m.dirtyLocked() 能夠確保 dirty 不爲 nil,如果爲 nil 則將其初始化,並且這裏會將 read.amended 值標記爲 true,即標記 read.m 是不完整的(因爲這裏只將 newEntry(value) 賦值給了 dirty,並沒有賦值給 read),以後如果在 read.m 裏查詢不到 key,就會去 dirty 中繼續查找。

等這一切都做完後,鍵值對就一定被存儲到了 sync.Map 中,調用 m.mu.Unlock() 解鎖,並返回存儲鍵值對的結果。

接下來我們再看下 Swap 方法中的幾個依賴方法是如何實現的。

首先是用於加載 read 的 *Map.loadReadOnly 方法實現:

func (m *Map) loadReadOnly() readOnly {
 if p := m.read.Load(); p != nil {
  return *p
 }
 return readOnly{}
}

read 屬性是 atomic.Pointer 類型,其 Load 方法用於加載指針對象,*p 則可以獲取指針中的值。

調用 e.trySwap(&value) 方法可以嘗試將 entry 中的值更新,其實現如下:

func (e *entry) trySwap(i *any) (*any, bool) {
 for {
  p := e.p.Load()
  if p == expunged {
   return nil, false
  }
  if e.p.CompareAndSwap(p, i) {
   return p, true
  }
 }
}

這裏使用 for 循環來執行 CAS 操作,直到操作成功返回。

如果 entry 的值爲 expunged,說明此 entry 已被徹底刪除,那麼無法交換其值。**這是因爲,如果某個 entry 對象的值被標記爲 expunged,那麼它一定是在 read 中有,而在 dirty 中沒有,所以不要交換,不然 read 的數據就比 dirty 多了,這是不允許發生的(後續看到什麼情況下 entry 被賦值爲 expunged 你就更加清晰這裏我在說什麼了)。**如果允許交換,調用 CompareAndSwap 方法可以原子的交換值,交換成功返回 true,失敗則進行下一輪循環。

調用 e.unexpungeLocked() 能確保 entry 未被標記爲 expunged,其實現如下:

func (e *entry) unexpungeLocked() (wasExpunged bool) {
 return e.p.CompareAndSwap(expunged, nil)
}

這裏僅有一行代碼,如果 entry 的值爲 expunged,則將其交換爲 nil

調用 e.swapLocked(&value) 方法可以更新 entry 對象中的值,其實現如下:

func (e *entry) swapLocked(i *any) *any {
 return e.p.Swap(i)
}

調用 m.dirtyLocked() 能夠確保 dirty 被正確初始化,其實現如下:

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
 }

 read := m.loadReadOnly()
 m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
   m.dirty[k] = e
  }
 }
}

如果 dirty 不爲 nil,則無需處理,直接返回。至於 dirty 何時會爲 nil 呢?這裏留個疑問,後續閱讀 Load 方法源碼時,你就知道了。

否則,需要初始化一個新的 dirty,然後全量遍歷 read.m,如果 !e.tryExpungeLocked() 成立,則將 entry 存入 dirty。這裏使用遍歷的方式將 read.m 拷貝到 dirty,而沒有直接全量賦值,目的是爲了遍歷每一個對象,從而過濾出已經標記爲刪除的數據(值爲 nil 和 expunged 的都會被清理),防止 read 無限增長下去(因爲 dirty 會在特定情況下提升爲 read,所以這裏 dirty 過濾掉了被刪除的數據,下次提升爲 read 時,這個新的 read 就比舊的 read 中垃圾數據更少)。

這裏用到的 e.tryExpungeLocked() 實現如下:

func (e *entry) tryExpungeLocked() (isExpunged bool) {
 p := e.p.Load()
 for p == nil {
  if e.p.CompareAndSwap(nil, expunged) {
   return true
  }
  p = e.p.Load()
 }
 return p == expunged
}

*entry.tryExpungeLocked 方法的作用是過濾掉值爲 nil 或 expunged 的 entry,即已經標記爲刪除的鍵值對。

這裏同樣使用了 CAS 操作,將 entry 值爲 nil 的, 轉換爲 expunged,表示徹底刪除。

如果操作成功,直接返回,所以我們可以發現, sync.Map 中一個值的刪除過程是,如果在 read 中先置爲了 nil,之後在某次從 read 複製全量數據到 dirty 時,會將其置爲 expunged 狀態,並且不會被拷貝到 dirty。所以此時 read 和 dirty 中就存在數據不一致了,在 read 中這個值被標記爲徹底刪除 expunged,在 dirty 沒有此鍵值對。所以我們在前文說:如果某個 entry 對象的值被標記爲 expunged,那麼它一定是在 read 中有,而在 dirty 中沒有。

你可以回顧一下前文中講的 e.unexpungeLocked() 方法能確保 entry 未被標記爲 expunged,如果 entry 的值爲 expunged,則將其改回 nil。並通過 m.dirty[key] = e 將這個 entry 賦值給 dirty,然後修改其值爲最新值。

所以 e.unexpungeLocked() 方法和 e.tryExpungeLocked() 方法其實是有連繫的,二者是相反操作,它們通過修改 entry 的值來標識一個鍵值對的狀態。其實 sync.Map 之所以這麼設計,也是爲了減少 entry 被徹底刪除的可能性,我們可以認爲,某個鍵值對被加入進來,之後刪除掉,那麼很有可能下次還會被重新加入進來,所以才搞了個 nil 作爲邏輯刪除,而非徹底從 map 中刪除,也許可以救一下。而 expunged 則用來處理某個被刪除的對象在 read 中存在,而在 dirty 中不存在的情況,當讀取 read 發現其已經被標記爲徹底刪除,那麼就沒比較加鎖再去 dirty 中查找了。

並且,我們還可以發現,因爲調用 dirtyLocked 可能全量遍歷 read.m,所以就可能會導致 sync.Map 的性能退化。

調用 newEntry(value) 可以創建一個新的 entry 對象,其實現如下:

func newEntry(i any) *entry {
 e := &entry{}
 e.p.Store(&i)
 return e
}

至此,Store 方法涉及的所有源碼,就都講解清楚了。

需要特別強調的是,以上提到的方法中,帶有 Locked 結尾的方法,都需要外部持有鎖纔可以調用,以保證併發安全。

Load

Load 方法用於從 sync.Map 中讀取給定 key 對應的值,如果 key 不存在,則返回值爲 nil,且第二個返回值 ok 爲 false

Load 方法源碼如下:

func (m *Map) Load(key any) (value any, ok bool) {
 read := m.loadReadOnly() // 加載只讀 map
 e, ok := read.m[key]     // 嘗試從 read map 中查找 key

// 如果 read map 中找不到 key,並且 dirty map 可能包含新鍵(amended 爲 true),則進入慢路徑
if !ok && read.amended {
  m.mu.Lock() // 操作 dirty 需要加鎖
// 在獲取鎖的過程中,dirty 可能已經被提升爲 read,因此需要重新加載 read map 進行 double-checking
  read = m.loadReadOnly()
  e, ok = read.m[key]
if !ok && read.amended {
   // 如果在 read map 仍然找不到,則嘗試從 dirty map 查找
   e, ok = m.dirty[key] // 從 dirty map 中查找 key
   // 無論 key 是否存在,記錄一次 miss:
   // 在 dirty map 提升爲 read map 之前,這個 key 的查詢都會走慢路徑
   m.missLocked() // miss 計數值 + 1
  }
  m.mu.Unlock()
 }

// 如果最終 key 仍然不存在,則返回 nil
if !ok {
returnnil, false
 }
return e.load() // 返回找到的值
}

同 Store 方法一樣,Load 方法也會優先加載只讀的 map 對象 read,並嘗試從 read 中查找 key

如果 ok 說明找到了,那麼可以直接調用 e.load() 加載並返回找到的值。這是一個快速路徑,不需要加鎖。

如果 !ok 說明沒找到,並且 read.amended 爲 true 時,說明 dirty 中可能包含新的鍵值對,則需要加鎖,進入到慢路徑查找。

先進行 double-checking,在獲取鎖的過程中,dirty 可能已經被提升爲 read,因此需要重新加載 read。如果這次檢查 read 還不滿足,則嘗試從 dirty 中查找,並且 misses 計數值加 1。

查找結束後進行解鎖操作,然後根據 ok 的值返回結果,如果 !ok 說明沒找到,返回 nil。否則說明查找到了 key,調用 e.load() 返回找到的值。

接下來我們同樣列出 Load 方法中幾個依賴方法的具體實現。

調用 m.missLocked() 方法可以將 misses 計數值加 1,其實現如下:

func (m *Map) missLocked() {
 m.misses++
 if m.misses < len(m.dirty) {
  return
 }
 m.read.Store(&readOnly{m: m.dirty})
 m.dirty = nil
 m.misses = 0
}

可以發現,當 misses 的次數達到 dirty 中全量數據的長度時(所以 misses 的閾值就是 dirty 中全量數據的長度),dirty 會被提升爲 read(這裏直接使用 dirty 作爲 readOnly 的 m 屬性值),這一步相當於刷新了緩存,更新成最新數據,並且 dirty 被置爲了 nilmisses 計數值清零。 所以這裏也解釋了前文中的疑問,dirty 何時會爲 nil 呢?就是在此時。那麼這個目的又是什麼呢?還記得前文中講過調用 m.dirtyLocked() 方法時,如果 dirty 值爲 nil,則會遍歷 read.m 的數據,然後過濾掉被刪除的值,將正常值存入 dirty 嗎,這就是爲了有一個機會,能夠清理被標記刪除的對象,以防止 sync.Map 無限增長。

這裏有個細節,構造 readOnly 對象時 amended 沒有賦值,默認值爲 false,標識 read 和 dirty 現在數據是等價的,read 是全量數據(其實 dirty 中此時數據爲 nil,所以此時以 read 數據爲準)。後續查找 key 時,如果 read 中沒找到,就無需再加鎖繼續查找 dirty 了。

調用 e.load() 能夠加載 entry 對象中記錄的具體值,其實現如下:

func (e *entry) load() (value any, ok bool) {
 p := e.p.Load()
 if p == nil || p == expunged {
  return nil, false
 }
 return *p, true
}

這裏無需過多解釋,就是從 atomic.Pointer 類型中獲取包含的值。

那麼現在 Load 方法也講解完成了。

其實到這裏我們可以總結出,sync.Map 之所以設計出 read + dirty 兩層結構,其實就是以空間換時間的思路,做了兩層的數據冗餘,對於 read 的操作都是原子性的,無需加鎖,而 read 中數據存在滯後的情況下,再加鎖訪問 dirty

Delete

Delete 方法用於從 sync.Map 中刪除指定的鍵值對。

Delete 方法源碼如下:

func (m *Map) Delete(key any) {
 m.LoadAndDelete(key) // 直接調用 LoadAndDelete 進行刪除,忽略其返回值
}

在 Delete 方法內部,直接調用了 LoadAndDelete 方法。LoadAndDelete 方法會刪除 key 對應的值,並返回刪除前的值(如果存在),第二個返回值 loaded 標識該 key 是否存在。

LoadAndDelete 方法源碼如下:

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
// 先從 read map 讀取,避免加鎖
 read := m.loadReadOnly() // 加載只讀 map
 e, ok := read.m[key]
if !ok && read.amended {
// 若 read map 沒有 key,且 dirty map 可能包含該 key,則加鎖檢查 dirty map
  m.mu.Lock()
  read = m.loadReadOnly() // double-checking
  e, ok = read.m[key]
if !ok && read.amended {
   // 在 dirty map 查找 key
   e, ok = m.dirty[key]
   delete(m.dirty, key) // 刪除 dirty map 中的 key
   // 記錄一次 miss,表示該 key 走了慢路徑,直到 dirty map 提升爲 read map
   m.missLocked()
  }
  m.mu.Unlock()
 }
if ok {
// 若 key 存在,調用 entry.delete() 刪除值並返回
return e.delete()
 }
returnnil, false// read map 和 dirty map 都無此 key
}

有了前面兩個方法的講解,現在閱讀 LoadAndDelete 方法的源碼就輕鬆多了。

同樣的套路,優先加載只讀 map,先嚐試從 read 中查找 key。若 read 中沒有該 key,且 dirty 中可能包含該 key,則加鎖檢查 dirty。同樣做了 double-checking 處理,再次查詢 read 依然不滿足,才嘗試從 dirty 中查找。這裏的操作是先查找再刪除 dirty 中的 key,這也正是 Load 和 Delete 兩個操作。此外,這裏還會記錄一次 misses

Load 和 Delete 操作都結束後進行解鎖操作,之後根據 ok 的值,返回結果,如果 ok 則說明查找到了 key,調用 e.delete() 刪除值並返回,否則說明未查到該 key,返回 nil

LoadAndDelete 方法中依賴的 *entry.delete 方法實現如下:

func (e *entry) delete() (value any, ok bool) {
 for {
  p := e.p.Load()
  if p == nil || p == expunged {
   return nil, false
  }
  if e.p.CompareAndSwap(p, nil) {
   return *p, true
  }
 }
}

當 p 的值爲 nil 或 expunged 時,說明該 key 已經被刪除了,所以返回的 ok 值爲 false,否則通過 CAS 操作對其進行刪除,刪除成功返回 true

注意這裏的刪除只會將 entry 中存儲的值置爲 nil,並不會徹底刪除 entry 對象。

Range

Range 方法可以遍歷 sync.Map 中所有的鍵值對,並依次調用給定的 f(key, value) 函數,如果 f 返回 false,則停止遍歷。

Range 方法源碼如下:

func (m *Map) Range(f func(key, value any) bool) {
// 讀取當前的 read map
 read := m.loadReadOnly()

// 如果 read map 是完整的(未修改),直接遍歷
if read.amended {
// 若 read.amended == true,說明 dirty map 存在新增 key,需要提升爲 read
  m.mu.Lock()
  read = m.loadReadOnly()
if read.amended {
   // 提升 dirty map 爲 read
   read = readOnly{m: m.dirty}
   copyRead := read
   m.read.Store(©Read)
   // 清空 dirty map 並重置 miss 計數
   m.dirty = nil
   m.misses = 0
  }
  m.mu.Unlock()
 }

// 遍歷 read map
for k, e := range read.m {
  v, ok := e.load()
if !ok {
   continue
  }
// 調用用戶提供的函數 f
if !f(k, v) {
   break// f 返回 false 時停止遍歷
  }
 }
}

對於 Range 操作來說,只需要遍歷 map,即這是一個讀取操作,所以會讀取當前的 read 值,如果 read 是完整的(read.amended == false),那麼可以直接遍歷 read,並且每次 for 循環中都會調用用戶提供的函數 f,如果 f 返回 false,則停止遍歷。

如果 read.amended 值爲 true,說明 dirty 中存在新增的 key,那麼就將其提升爲 read,然後再進行 read 遍歷。因爲需要操作 dirty,所以會加鎖,並且進行了 double-checking,在提升 dirty 爲 read 後,會清空 dirty 並重置 misses 計數。所以說調用 Range 方法也是一個可能導致後面 sync.Map 會出現性能退化的原因。

這裏將 dirty 提升爲 read 的邏輯跟在 Load 方法中調用 m.missLocked() 的邏輯類似。

Clear

Clear 方法會刪除所有鍵值對,使 sync.Map 變爲空。

Clear 方法源碼如下:

func (m *Map) Clear() {
 read := m.loadReadOnly() // 加載只讀 map
// 如果 read map 爲空且 dirty map 也未修改(amended 爲 false),則無需執行清理
iflen(read.m) == 0 && !read.amended {
// 如果 map 已經爲空,則避免分配新的 readOnly。
return
 }

 m.mu.Lock() // 加鎖
defer m.mu.Unlock()

// double-checking
// 重新加載 read map,避免在獲取鎖的過程中發生變化
 read = m.loadReadOnly()
iflen(read.m) > 0 || read.amended {
// 清空 read map,存儲一個新的空 readOnly 結構
  m.read.Store(&readOnly{})
 }

// 清空 dirty map
 clear(m.dirty)
// 復位 miss 計數,防止下一次操作時剛清空的 dirty map 立即被提升到 read map
 m.misses = 0
}

Clear 方法首先會加載 read,並判斷 read.m 是否爲空,如果爲空且 dirty 中也不存在新的 key,則無需進一步處理,直接返回。否則,需要加鎖後進一步處理。在 double-checking 後會再次判斷 read.m 是否爲空,如果不爲空或 read.amended 爲 true,則清空 read。最終,再清空 dirty 並重置 misses 計數。

複合操作方法

sync.Map 不僅提供了 map 的常規操作方法,它還實現了幾個複合操作方法 LoadAndDeleteLoadOrStoreCompareAndSwap 和 CompareAndDelete。所謂複合方法,就是一個方法同時支持兩個以上的操作,而且這幾個複合方法都能夠見名知意。

其中 LoadAndDelete 方法前文中已經講解,那麼接下來就介紹下其他幾個方法。

LoadOrStore

LoadOrStore 方法用來獲取或保存一個鍵值對,如果 key 存在,則返回其當前值,否則存儲並返回給定的 valueloaded 返回值表示是否是從 map 中加載的值(true 表示 key 已存在,false 表示存儲了新值)。

LoadOrStore 方法源碼如下:

func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 優先檢查只讀 map,避免加鎖,提高併發性能
 read := m.loadReadOnly() // 加載只讀 map
if e, ok := read.m[key]; ok {
// 嘗試加載或存儲值
  actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
   return actual, loaded
  }
 }

// 加鎖處理
 m.mu.Lock()
defer m.mu.Unlock()

// 進入慢路徑

// 重新加載 read map 以防數據變化
 read = m.loadReadOnly()
if e, ok := read.m[key]; ok { // key 在 read map 中
// 解除 expunged 狀態並放入 dirty map
if e.unexpungeLocked() {
   m.dirty[key] = e
  }
// 嘗試加載或存儲
  actual, loaded, _ = e.tryLoadOrStore(value)
 } elseif e, ok := m.dirty[key]; ok { // key 僅存在於 dirty map 中
  actual, loaded, _ = e.tryLoadOrStore(value)
  m.missLocked() // 記錄一次 miss
 } else { // 該 key 完全不存在
if !read.amended {
   // 這是 dirty map 第一次存入新 key,需要確保它已分配,並標記 read map 爲不完整
   m.dirtyLocked()
   m.read.Store(&readOnly{m: read.m, amended: true})
  }
// 在 dirty map 中存儲新值
  m.dirty[key] = newEntry(value)
  actual, loaded = value, false
 }

return actual, loaded
}

LoadOrStore 方法先嚐試從 read 中獲取給定 key 對應的值,如果存在,則調用 e.tryLoadOrStore(value) 嘗試加載或存儲值,並且根據返回值 ok 來決定是否返回。

其中 *entry.tryLoadOrStore 方法實現如下:

func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
 p := e.p.Load()
if p == expunged {
// entry 被標記爲 expunged,不可修改
returnnil, false, false
 }
if p != nil {
// entry 已經存在,loaded 爲 true,返回當前值
return *p, true, true
 }

 ic := i
// 循環執行 CAS 操作,直到成功返回
for {
// 如果當前值爲 nil,則嘗試存儲新值
if e.p.CompareAndSwap(nil, &ic) {
   return i, false, true// 存儲成功,loaded 爲 false
  }

// 每次循環都要執行一次檢查
  p = e.p.Load()
if p == expunged {
   // entry 被 expunged,返回失敗
   returnnil, false, false
  }
if p != nil {
   // entry 已經存在,loaded 爲 true,返回當前值
   return *p, true, true
  }
 }
}

如果 entry 已經被標記爲 expungedtryLoadOrStore 不會修改 entry 的值,並且返回值 ok 置爲 false。如果 entry 中的值不爲 nil,說明是正常狀態的值,直接返回其 value。否則 entry 中的值被標記爲 niltryLoadOrStore 會繼續通過 CAS 操作原子地存儲或加載一個值。

如果當前 entry 中的值爲 nil,那麼 CAS 操作就會成功,返回 loaded 爲 false。在併發情況下,當前 entry 中的值可能會被其他 goroutine 修改成功,所以 CAS 操作如果失敗,則還需要對 entry 中的值狀態再次做檢查。for 循環最終會在某個 case 操作成功後返回。

然後我們再回到 LoadOrStore 方法中的邏輯繼續向下看。

如果 LoadOrStore 方法在 read 中沒有找到給定的 key,或者 e.tryLoadOrStore(value) 失敗,則需要加鎖後做進一步檢查。如果這一次在 read 中找到了給定的 key,則將 entry 記錄到 dirty 中,並調用 e.tryLoadOrStore(value) 嘗試加載或存儲值。值得注意的是,這裏會先調用 e.unexpungeLocked() 確保 dirty 未被標記爲 expunged。如果 key 不在 read 中,而在 dirty 中存在,那麼直接將其更新成新的值,並記錄一次 misses。如果 key 即不在 read 中,也不在 dirty 中,則說明是新插入的鍵值對,在 dirty 中創建並保存新的 entry

這段邏輯與 Swap 方法中的邏輯非常相似,你可以對比學習。

CompareAndSwap

CompareAndSwap 方法僅在給定的 key 存在且對應的值等於傳入的 old 時(old 必須是可比較的類型),將其替換爲 new

CompareAndSwap 方法源碼如下:

func (m *Map) CompareAndSwap(key, old, new any) (swapped bool) {
// 先嚐試從 read map 查找 key
 read := m.loadReadOnly()
if e, ok := read.m[key]; ok { // 如果 key 在 read map 中
// 直接在 entry 上嘗試 CompareAndSwap
return e.tryCompareAndSwap(old, new)
 } elseif !read.amended {
// 如果 key 不存在,並且 read map 沒有標記爲 amended,說明 dirty map 也不會有 key
returnfalse
 }

// 進入 dirty map 處理,需要加鎖
 m.mu.Lock()
defer m.mu.Unlock()
 read = m.loadReadOnly()

if e, ok := read.m[key]; ok { // 如果 key 在 read map 中
  swapped = e.tryCompareAndSwap(old, new)
 } elseif e, ok := m.dirty[key]; ok { // 如果 key 在 dirty map 中
  swapped = e.tryCompareAndSwap(old, new)
// 這裏雖然 CompareAndSwap 不會修改 map 的 key 集合,
// 但因爲涉及鎖的操作,需要增加一次 miss 計數,
// 這樣 dirty map 之後會被提升爲 read,提高訪問效率。
  m.missLocked()
 }
return swapped
}

如果給定的 key 在 read 中存在,則直接調用 e.tryCompareAndSwap(old, new) 嘗試比較並交換。

*entry.tryCompareAndSwap 方法實現如下:

func (e *entry) tryCompareAndSwap(old, new any) bool {
// 加載當前 entry 存儲的值
 p := e.p.Load()
// 如果值爲 nil(條目已刪除)或 expunged(條目已被徹底刪除)或當前值不等於 old,則返回 false
if p == nil || p == expunged || *p != old {
returnfalse
 }

// 複製 new 以優化逃逸分析,避免在比較失敗時不必要的堆分配
 nc := new
// 循環執行 CAS 操作,直到成功返回
for {
// 嘗試使用原子操作 CompareAndSwap 替換值
if e.p.CompareAndSwap(p, &nc) {
   returntrue
  }

// 每次循環都要執行一次檢查
// 重新加載最新的值,確保併發情況下仍滿足條件
  p = e.p.Load()
// 如果值發生變化,或者條目已刪除,則返回 false
if p == nil || p == expunged || *p != old {
   returnfalse
  }
 }
}

tryCompareAndSwap 方法會比較 entry 中的值是否等於給定的 old 值,如果相等且它的值未被刪除(未被標記爲 nil 或 expunged),則將其替換爲 new 值。如果它的值已被刪除或不等於 old 值,則會返回 false,且不對 entry 做任何修改。

回到 CompareAndSwap 方法邏輯中,如果給定的 key 不在 read 中且 read.amended 爲 true,則需要加鎖繼續到 dirty 中查找,且 misses 計數值加 1。

這個方法的整體邏輯還是比較簡單的。

CompareAndDelete

CompareAndDelete 方法僅在給定的 key 存在且對應的值等於 old 時刪除該鍵值對。

CompareAndDelete 方法源碼如下:

func (m *Map) CompareAndDelete(key, old any) (deleted bool) {
// 先嚐試從 read map 查找 key
 read := m.loadReadOnly()
 e, ok := read.m[key]

// 如果 key 不在 read map 中,但 read.amended == true,說明可能在 dirty map
if !ok && read.amended {
  m.mu.Lock() // 進入 dirty map 處理,需要加鎖
  read = m.loadReadOnly()
  e, ok = read.m[key]
if !ok && read.amended {
   e, ok = m.dirty[key]
   // 不直接從 m.dirty 刪除 key,而是僅執行“compare”邏輯。
   // 當 dirty map 未來被提升爲 read 時,該 entry 會被標記 expunged。
   //
   // 記錄一次 miss,這樣該 key 未來會走慢路徑,直到 dirty map 變爲 read。
   m.missLocked()
  }
  m.mu.Unlock()
 }

// 如果 key 存在於 read map 或 dirty map
for ok {
  p := e.p.Load()
// 如果值已被刪除(nil)、expunged(徹底刪除)或者值不等於 old,則返回 false
if p == nil || p == expunged || *p != old {
   returnfalse
  }
// CAS 交換值爲 nil,成功刪除
if e.p.CompareAndSwap(p, nil) {
   returntrue
  }
 }
returnfalse
}

這個方法主要邏輯也比較簡單,我就不一行一行的講解了。值得一提的是,這裏的刪除操作與 Delete 方法一樣也是通過 CAS 操作將 entry 中的值改爲 nil 來實現的。

至此,sync.Map 中的所有源碼咱們就都講解完成了。

總結

sync.Map 的用法非常簡單,它提供了 map 的基礎功能,並且還擴展了幾個複合功能。

要讀懂 sync.Map 的源碼,可以將其類比爲緩存系統,先對其設計思路做到心中有數,再閱讀源碼,不然很容易陷入到細節而不知所措。

你需要記住 sync.Map 的主體邏輯是,所有方法都優先嚐試無鎖操作 read,如果 read 條件不滿足,再嘗試加鎖操作 dirtyread 和 dirty 存儲鍵值對時,指向的是同一個 entry 對象。對於 entry 實例,你需要理解其 3 種狀態,nilexpunged 和正常指針對象,它們之間是如何轉換的。

所有對 read 的操作都無需加鎖,對 dirty 的操作才需要加鎖,再結合 entry 的 nil 和 expunged 兩種狀態,sync.Map 可以儘可能實現無鎖操作。不過,Range 方法和 m.missLocked() 操作都有可能使 dirty 爲 nil,這會導致 sync.Map 後續操作出現性能退化,需要格外小心。此外,其實嚴格來講,read 並不代表只讀 map,部分更新、刪除動作也都會操作 read,只有新插入一個鍵值對時才一定要操作 ditry

這篇文章內容有點長,有些文字寫的也很囉嗦,但這都是筆者有意而爲之,目的是讓你加深印象,多讀幾遍,理解更加深刻。只有明白了 sync.Map 的原理,才能將其用好。最後強調一點,謹慎起見,必要時還是要進行性能測試來驗證 sync.Map 是否能滿足我們的業務需求。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

延伸閱讀

聯繫我

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Bh3j9zGccXjm6jfQL6JxmA