Go sync-Pool 性能優化的利器

sync.Pool 簡介

sync.Pool 是什麼

sync.Pool 是 Go 語言標準庫中提供的一個用於對象複用的工具,它具有以下特點:

  1. 對象緩存:使用 Get、Put 方法可以獲取和歸還 sync.Pool 中的數據,從而減輕內存分配與垃圾回收的壓力;

  2. 自動回收:sync.Pool 中的對象可能會被自動回收。這意味着即使你將對象放入池中,也不能保證該對象會一直存在於池中。當內存緊張或者垃圾回收器運行時,sync.Pool 中的對象可能會被回收,所以 sync.Pool 適用於存儲那些臨時使用、生命週期較短的對象;

  3. 併發安全:sync.Pool 對於併發訪問是安全的。多個 goroutine 可以同時從池中獲取和放回對象,而不需要額外的同步機制;

  4. 狀態不可靠:由於 sync.Pool 中的對象可能會被自動回收,所以不能依賴池中對象的狀態。在從池中獲取對象後,應該根據具體的使用場景對對象進行初始化,以確保對象處於正確的狀態。

sync.Pool 適用場景

  1. 頻繁創建和銷燬對象的場景
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // 從 Pool 中獲取 Context
    c := engine.pool.Get().(*Context)
    // 重置 writermem
    c.writermem.reset(w)
    // 重置 Request
    c.Request = req
    // 重置其他字段
    c.reset()
    
    engine.handleHTTPRequest(c)
    
    // 將 Context 對象放回 Pool
    engine.pool.Put(c)
}
func Println(a ...any) (n int, err error) {
    return Fprintln(os.Stdout, a...)
}

func Fprintln(w io.Writer, a ...any) (n int, err error) {
    // 從 Pool 中獲取 pp 對象,並初始化
    p := newPrinter()
    p.doPrintln(a)
    n, err = w.Write(p.buf)
    
    // 清理 pp 對象狀態,並向 Pool 歸還 pp 對象
    p.free()
    return
}


func newPrinter() *pp {
    p := ppFree.Get().(*pp)
    p.panicking = false
    p.erroring = false
    p.wrapErrs = false
    p.fmt.init(&p.buf)
    return p
}

func (p *pp) free() {
    if cap(p.buf) > 64*1024 {
       p.buf = nil
    } else {
       p.buf = p.buf[:0]
    }
    if cap(p.wrappedErrs) > 8 {
       p.wrappedErrs = nil
    }

    p.arg = nil
    p.value = reflect.Value{}
    p.wrappedErrs = p.wrappedErrs[:0]
    ppFree.Put(p)
}
  1. 減少內存分配壓力的場景

sync.Pool 用法

基礎用法

package main

import (
    "fmt"
    "sync"
)

type Person struct {
    Name string
    Age  int
}

type personPool struct {
    pool sync.Pool
}

func (pp *personPool) Get(name string, age int) (p *Person, err error) {
    // 從池中獲取一個對象
    p, ok := pp.pool.Get().(*Person)
    if !ok {
       return nil, err
    }

    // 初始化
    p.Name = name
    p.Age = age

    return p, nil
}

func (pp *personPool) Put(p *Person) {
    // 清理狀態
    p.Name = ""
    p.Age = 0

    // 歸還
    pp.pool.Put(p)
}

var PersonPool = &personPool{
    pool: sync.Pool{
       New: func() interface{} {
          return new(Person)
       },
    },
}

func main() {
    // 從池中獲取一個對象
    p1, err := PersonPool.Get("tom", 23)
    if err != nil {
       fmt.Println(err)
    }

    fmt.Println("Got person from pool:", p1.Name)

    // 使用完畢後放回池中
    PersonPool.Put(p1)
}
  1. 通常會把 sync.Pool 對象聲明成全局變量,因爲 sync.Pool 對象作爲緩存池通常不需要頻繁創建和回收;

  2. 在需要臨時對象時使用 Get 方法獲取,通常 Get 之後需要進行斷言和一些初始化操作,Get 之後的數據狀態是不確定的。

  3. 使用完臨時對象後需要使用 Put 方法將臨時對象重新放入緩存池中,比較好的做法是在 Put 前要對臨時對象做一些清理工作,以免影響下一次複用。

性能對比

package main

import (
    "sync"
    "testing"
)

type MyObject struct {
    Data []byte
}

var objectPool = sync.Pool{
    New: func() interface{} {
       return &MyObject{
          Data: make([]byte, 0, 1024),
       }
    },
}

func withoutPool() {
    obj := &MyObject{
       Data: make([]byte, 0, 1024),
    }
    // 模擬對對象的使用
    _ = obj.Data
}

func withPool() {
    obj := objectPool.Get().(*MyObject)
    // 模擬對對象的使用
    _ = obj.Data
    objectPool.Put(obj)
}

func BenchmarkWithoutPool(b *testing.B) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
       withoutPool()
    }
}

func BenchmarkWithPool(b *testing.B) {
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
       withPool()
    }
}

>>> go test -bench=.
BenchmarkWithoutPool-12         74913964                14.67 ns/op            0 B/op          0 allocs/op
BenchmarkWithPool-12            135602400                8.755 ns/op           0 B/op          0 allocs/op
PASS
ok      pool    3.962s

通過基準測試可以看出,使用 sync.Pool 可以提高大概 40% 的性能,在實際開發中我們也可以使用 sync.Pool 提高我們服務的性能。

注意事項

在使用 sync.Pool 時,有以下幾點需要注意:

對象狀態不可預期

  1. 由於 sync.Pool 中的對象可能會被隨時回收和複用,不能依賴池中對象的初始狀態。當從池中獲取一個對象時,它可能處於任何狀態,不能假定其字段已經被正確初始化。

  2. 例如,如果你從池中獲取一個結構體對象用於存儲數據,在使用前必須明確地對其進行初始化操作,以確保數據的準確性和一致性。否則,可能會出現不可預測的錯誤結果。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

type Person1 struct {
    Name string
}

var personPool1 = sync.Pool{
    New: func() interface{} {
       return &Person1{
          Name: "default_name",
       }
    },
}

func main() {
    // 從池中獲取一個對象
    p1 := personPool1.Get().(*Person1)
    p1.Name = "Alice"
    fmt.Println("Got person from pool:", p1.Name)

    // 使用完畢後放回池中
    personPool1.Put(p1)

    // 再次從池中獲取,可能會獲取到之前放回去的對象
    p2 := personPool1.Get().(*Person1)
    fmt.Println("沒有 GC 的對象:", p2.Name)

    // 執行 GC
    runtime.GC()

    // GC 後,可能會獲取到一個全新的對象
    p3 := personPool1.Get().(*Person1)
    fmt.Println("GC 後的對象:", p3.Name)
}

>>> go run main2.go
Got person from pool: Alice
沒有 GC 的對象: Alice
GC 後的對象: default_name

由於 GC 的執行時機和 Put 的內容我們很難掌控,所以通過 Get 得到數據的狀態是不確定的。

併發安全但仍需謹慎

  1. sync.Pool 的併發安全只能保證多個 goroutine 可以同時從池中獲取和放回對象,無需額外的同步機制。然而,在實際使用中,如果對從池中獲取的對象進行復雜的操作,且這些操作涉及到多個 goroutine 之間的交互,仍需要謹慎考慮是否需要額外的同步措施。

  2. 比如,多個 goroutine 同時獲取到同一個對象並嘗試修改其狀態,如果不加以適當的同步控制,可能會導致數據競爭和不一致的問題。

func main() {
    // 從池中獲取一個對象,併發安全
    p1, err := PersonPool.Get("tom", 23)
    if err != nil {
       fmt.Println(err)
    }

    wg := sync.WaitGroup{}
    mtx := sync.Mutex{}
    for i := 0; i < 100; i++ {
       wg.Add(1)
       go func() {
          defer wg.Done()
          // 非併發安全,需要其他同步機制
          mtx.Lock()
          p1.Score += 1
          mtx.Unlock()
       }()
    }
    wg.Wait()

    fmt.Println("Got person from pool:", p1.Name, p1.Score)

    // 使用完畢後放回池中,併發安全
    PersonPool.Put(p1)
}

適用場景特定

  1. sync.Pool 適用於存儲那些臨時使用、生命週期較短的對象。對於需要長期保存狀態或者具有複雜生命週期管理需求的對象,不適合使用 sync.Pool。

  2. 例如,對於需要在多個不同的操作階段都保持一致狀態的業務對象,使用 sync.Pool 可能會導致狀態丟失和不可預測的行爲。而對於一些純粹的臨時計算結果的存儲對象,sync.Pool 則可以有效地提高性能和減少內存分配壓力。

sync.Pool 的工作原理

數據結構

Pool 結構體

type Pool struct {
    // noCopy 用於防止 Pool 被複制(可以使用 go vet 檢測)
    noCopy noCopy
    
    // local 的主要作用是,多個 goroutine 同時訪問 Pool 時,可以減少競爭,提升性能。
    // 實際類型是 [P]poolLocal。長度是 localSize。
    local unsafe.Pointer
    // []poolLocal 的長度。也就是 local 數組的長度
    localSize uintptr
    
    // 存放的是上一輪 GC 時的 local 字段的值。
    victim unsafe.Pointer
    // victim 數組的長度
    victimSize uintptr
    
    // 新建對象的方法。
    // Get 的時候如果 Pool 中沒有對象可用,會調用這個方法來新建一個對象。
    New func() any
}

字段說明:

爲了提高性能 sync.Pool 爲每個 P 都分配了一個 poolLocal ,這樣就避免了競態問題:

poolLocal 結構體

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocal 是爲每個 P 分配的緩存池,poolLocal 有兩個字段,其中 pad 是一個填充字段,是防止出現僞共享的問題,poolLocalInternal 字段是真正的存儲數據的。

poolLocalInternal 結構體

type poolLocalInternal struct {
    private any       // Can be used only by the respective P.
    shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}

type poolChain struct {
    head *poolChainElt
    tail *poolChainElt
}

poolLocalInternal 是真正存儲數據的,private 字段是當前 P 私有的,保存的數據只有當前 P 才能獲取,shared 字段是一個雙向鏈表,長度會動態調整,當前 P 和 其他 P 都能從 shared 裏面獲取數據,區別是當前 P 是使用 popHead()、pushHead() 獲取或填充數據, 其他 P 只能使用 popTail() 獲取數據。

poolChainElt 結構體

type poolChainElt struct {
    poolDequeue
    next, prev *poolChainElt
}

type poolDequeue struct {
    headTail uint64
    vals []eface
}

poolChainElt 是鏈表的一個節點,prev 和 next 是指向前後節點的指針,poolDequeue 是一個環形隊列,vals 是環形隊列保存數據的地方,headTail 的高 32 位保存 head 節點的索引,低 32 位保存 tail 節點的索引。

整體數據結構

Put 方法

Put 源碼

func (p *Pool) Put(x any) {
    // 如果 Put 的值是 nil,則直接返回
    if x == nil {
       return
    }
    
    // ...
    
    // 將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 的本地緩存(poolLocal)和 P 的 ID
    l, _ := p.pin()
    
    
    if l.private == nil {
       // 如果本地 private 爲空,則將 x 放入本地 private
       l.private = x
    } else {
       // 如果本地 private 不爲空,則將 x 放入本地 shared 的頭部
       l.shared.pushHead(x)
    }
    
    // 將當前的 goroutine 和 P 解綁,允許被搶佔
    runtime_procUnpin()
    // ...
}

Pool Put 的流程:

  1. 如果 Put 的值是 nil,則直接返回。

  2. 將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 的本地緩存(poolLocal)和 P 的 ID。

  3. 如果本地 private 爲空,則將 x 放入本地 private。

  4. 如果本地 private 不爲空,則將 x 放入本地 shared 的頭部。

  5. 將當前的 goroutine 和 P 解綁,允許被搶佔。

pin()

// 將當前的 goroutine 和 P 綁定,禁止被搶。
func (p *Pool) pin() (*poolLocal, int) {
    // procPin 函數的目的是爲了當前 G 綁定到 P 上。
    pid := runtime_procPin() // 返回當前 P 的 id。
    
    // 在 pinSlow 中,我們會存儲 local,然後再存儲 localSize,
    // 這裏我們以相反的順序讀取。 由於我們禁用了搶佔,
    // 因此 GC 不能在兩者之間發生。
    s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    l := p.local                              // load-consume
    if uintptr(pid) < s {                     // pid < s,說明當前 P 已經初始化過了。
        return indexLocal(l, pid), pid // 返回當前 P 的 poolLocal。
    }
    
    return p.pinSlow() // 如果當前 P 沒有初始化過,那麼就調用 pinSlow()}
  1. 調用 runtime_procPin 將當前協程與 P 綁定,禁用搶佔,並返回當前 P 的 id,完成後需要調用 runtime_procUnpin() 進行解綁,禁止搶佔的目的是防止發生 GC 導致 p.local 的長度和 p.localSize 不一致;

  2. pid < s 時說明當前 P 已經初始化過了,調用 indexLocal 返回當前 P 的 poolLocal。

  3. 如果當前 P 沒有初始化過,那麼就調用 pinSlow()

indexLocal 邏輯比較簡單,通過偏移量來獲取對應 pid 的 poolLocal,之前的文章《深入理解 go unsafe》介紹過如何通過偏移量來獲取數組的元素。

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

pinSlow()

func (p *Pool) pinSlow() (*poolLocal, int) {
    // 在互斥鎖下重試。
    // 在固定時無法鎖定互斥鎖。
    runtime_procUnpin()       // 解除當前 P 的綁定。
    allPoolsMu.Lock()         // 加全局鎖。
    defer allPoolsMu.Unlock() // 解鎖。
    pid := runtime_procPin()  // 重新綁定當前 P。
    // 在綁定時不會調用 poolCleanup。(無法被搶佔,GC 不會發生)
    s := p.localSize
    l := p.local
    
    if uintptr(pid) < s { // 這其實是一個 double-checking,如果在加鎖期間,其他 goroutine 已經初始化過了,就直接返回。
        return indexLocal(l, pid), pid
    }
    
    // p.local == nil 說明 pool 還沒有初始化過。
    if p.local == nil { // 如果當前 P 沒有初始化過,那麼就將當前 P 添加到 allPools 中。
        allPools = append(allPools, p)
    }
    
    // 當 local 數組爲空,或者和當前的 runtime.GOMAXPROCS 不一致時,
    // 將觸發重新創建 local 數組,以和 P 的個數保持一致。
    // 如果在 GC 之間更改了 GOMAXPROCS,我們將重新分配數組並丟棄舊數組。
    size := runtime.GOMAXPROCS(0)                            // 獲取當前 GOMAXPROCS(也就是 P 的個數)
    local := make([]poolLocal, size)                         // 創建一個 poolLocal 數組
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 原子操作:將數組的地址賦值給 p.local, &local[0] 的地址就是數組的起始地址
    runtime_StoreReluintptr(&p.localSize, uintptr(size))     // 原子操作:將 local 的長度賦值給 p.localSize
    return &local[pid], pid                                  // 返回當前 P 關聯的 poolLocal,以及當前 P 的 id。
}

pinSlow 的流程:

  1. 解除當前 P 的綁定。

  2. 加全局 Pool 的鎖。

  3. 重新綁定當前 P,重新綁定的目的是防止搶佔,以免發生 GC 時 localSize 和 local 的長度不一致。

  4. 如果當前 P 的 id 小於 localSize,那麼就返回當前 P 的 poolLocal。(典型的 double-checking)

  5. 如果 local 還沒初始化,那麼將當前 P 的 poolLocal 添加到 allPools 中。

  6. 初始化 local。最後返回當前 P 的 poolLocal。

shared.pushHead()

// 添加一個元素到隊列頭部
func (c *poolChain) pushHead(val any) {
    // 鏈表頭
    d := c.head
    if d == nil { // 鏈表爲空
        // 初始化鏈表。
        // 新建 poolChainElt,然後 c 的 head 和 tail 都指向這個新建的元素。
        const initSize = 8 // 初始化大小爲 8
        // 新建一個節點,類型爲 poolChainElt
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        // 將 c 的 head 和 tail 都指向這個新建的元素
        c.head = d
        // 使用原子操作保存 c.tail,因爲其他 goroutine 也可能會修改 c.tail。
        storePoolChainElt(&c.tail, d)
    }
    
    // poolQueue 還沒滿的時候可以成功 push,pushHead 會返回 true。
    // poolQueue 滿的時候 pushHead 返回 false。
    if d.pushHead(val) {
        return
    }
    
    // 當前 dequeue 已滿。分配一個兩倍大小的新 dequeue。
    newSize := len(d.vals) * 2
    if newSize >= dequeueLimit { // 限制單個 dequeue 的最大大小
        newSize = dequeueLimit
    }
    
    // 新建 poolChainElt,然後 c 的 head 指向這個新建的元素。
    // 同時,d 的 next 指向這個新建的元素。
    d2 := &poolChainElt{prev: d} // 因爲是加到隊列頭,所以 prev 指向 d
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    d2.pushHead(val)
}
  1. 如果鏈表爲空時,初始化鏈表,新建一個 poolChainElt 的鏈表節點並放入鏈表中,鏈表節點內部的 poolQueue 是一個環形隊列。

  1. 然後嘗試將 val 添加到鏈表頭部,頭部節點的 poolQueue 還沒滿的時候可以成功 push,pushHead 會返回 true,poolQueue 滿的時候 pushHead 返回 false。

  2. 如果頭部節點的 poolQueue 已經滿了,就會再生成一個鏈表節點,新生成的鏈表節點的環形隊列長度是前一個節點環形隊列長度的兩倍。

Get 方法

Get 源碼

// Get 從 Pool 中獲取一個對象
func (p *Pool) Get() any {
    // ...
    // pin 將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 的本地緩存(poolLocal)和 P 的 ID。
    l, pid := p.pin()
    // 先看 private 是否爲 nil,如果不爲 nil,就直接返回 private,並將 private 置爲 nil。
    x := l.private
    l.private = nil
    if x == nil {
        // 嘗試從本地 shared 的頭部取。
        x, _ = l.shared.popHead()
        if x == nil { // 如果本地 shared 的頭部取不到,就從其他 P 的 shared 的尾部取。
            x = p.getSlow(pid)
        }
    }
    // 將當前的 goroutine 和 P 解綁,允許被搶佔。
    runtime_procUnpin()
    // ...
    // 如果 x 爲 nil 並且 p.New 不爲 nil,則返回 p.New() 的結果。
    // 沒有就 New 一個。
    if x == nil && p.New != nil {
        x = p.New()
    }
    
    return x
}

Pool Get 的流程可以總結如下:

  1. 將當前的 goroutine 和 P 綁定,禁止被搶佔,返回當前 P 的本地緩存(poolLocal)和 P 的 ID;

  2. 從本地 private 取,如果取不到,就從本地 shared 的頭部取,如果取不到,再調用 getSlow 獲取;

  3. 將當前的 goroutine 和 P 解綁,允許被搶佔;

  4. 如果 p.New 不爲 nil,則返回 p.New 的結果。

getSlow

// 從其他 P 的 shared 的尾部取。
func (p *Pool) getSlow(pid int) any {
    // 獲取 local 的大小和 local。
    size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    locals := p.local                            // load-consume
    // 嘗試從其他 P 的 shared 的尾部取。
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size)) // pid+i+1 的用途從下一個 P 開始取。
        if x, _ := l.shared.popTail(); x != nil { // 嘗試從每一個 P 的 shared 的尾部取,獲取到則返回。
            return x
        }
    }
    
    // 嘗試從 victim cache 取。
    // 我們在嘗試從所有主緩存中偷取之後執行此操作,
    // 因爲我們希望 victim cache 中的對象儘可能地老化。
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size { // 如果 pid 大於 size,會發生越界,直接返回 nil。這意味着 gomaxprocs 相比上一次 poolCleanup 的時候變大了。
        return nil
    }
    
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil { // victim 實際上也是一個 poolLocal 數組,每個 poolLocal 都有一個 private 字段,這個字段就是 victim cache。
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    
    // 將 victim cache 標記爲空,以便將來的 Get 不會再考慮它。
    atomic.StoreUintptr(&p.victimSize, 0)
    
    return nil
}

getSlow 的主要流程:

  1. 從其他 P 對應的 poolLocal 中獲取緩存對象;

  2. 如果獲取不到再從 victim 中獲取緩存對象。

Get 整體流程

poolCleanup() 函數

var (
    allPoolsMu Mutex

    allPools []*Pool

    oldPools []*Pool
)


func poolCleanup() {
    for _, p := range oldPools {
       p.victim = nil
       p.victimSize = 0
    }

    // Move primary cache to victim cache.
    for _, p := range allPools {
       p.victim = p.local
       p.victimSize = p.localSize
       p.local = nil
       p.localSize = 0
    }

    oldPools, allPools = allPools, nil
}

poolCleanup 函數會在 GC 開始之前 STW 時被調用,主要功能:

  1. 清理所有 pool 的 victim 和 victimSize;

  2. 將所有 pool 的 local 和 localSize 賦值給 victim 和 victimSize;

  3. 清理所有 pool 的 local 和 localSize;

總結

  1. sync.Pool 是 Go 語言中用於對象複用的工具,具有對象緩存、自動回收、併發安全和狀態不可靠等特點。

  2. 它適用於頻繁創建和銷燬對象以及減少內存分配壓力的場景,但在使用時需要注意對象狀態不可預期、併發安全仍需謹慎以及適用場景特定等問題。

  3. 通過合理使用 sync.Pool,可以提高程序的性能和內存利用率。在實際開發中,需要根據具體的需求和場景來判斷是否選擇使用 sync.Pool,並遵循其注意事項。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rsvhSCmLULwfHrMek8VCBg