Go 併發控制:semaphore 詳解

今天我們來介紹一個 Go 官方庫 x 提供的擴展併發原語 semaphore,譯爲 “信號量”。因爲它就像一個信號一樣控制多個 goroutine 之間協作。

概念講解

我先簡單介紹下信號量的概念,爲不熟悉的讀者作爲補充知識。

一個生活中的例子:假設一個餐廳總共有 10 張餐桌,每來 1 位顧客佔用 1 張餐桌,那麼同一時間共計可以有 10 人在就餐,超過 10 人則需要排隊等位;如果有 1 位顧客就餐完成,則可以讓排隊等待的第 1 位顧客來就餐。

如果加入信號量,則可以這樣理解:

這就是信號量的應用場景,信號量幫助管理餐桌的數量,確保餐廳在任何時刻不會接待超過可容納的顧客數量。

如果放在 Go 程序中,我們就可以使用信號量來實現任務池的功能,控制有限個 goroutine 來併發執行任務。這個我們放在後面來實現,先看下 semaphore 源碼是如何實現的。

源碼解讀

我們可以在 semaphore 文檔中看到其定義和實現的 exported 方法:

https://pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#pkg-index

type Weighted
    func NewWeighted(n int64) *Weighted
    func (s *Weighted) Acquire(ctx context.Context, n int64) error
    func (s *Weighted) Release(n int64)
    func (s *Weighted) TryAcquire(n int64) bool

semaphore.Weighted 是一個結構體表示信號量對象,NewWeighted 是它的構造函數,用於實例化一個包含 n 個資源的信號量對象,即信號量的初始值爲 n

它實現了 3 個方法,分別是:

現在,你是否能將 semaphore.Weighted 對象和它實現的幾個方法與前文中餐廳就餐的例子對應上了呢?

接下來我們看一下 Weighted 結構體是如何定義的:

https://github.com/golang/sync/blob/v0.9.0/semaphore/semaphore.go

// Weighted 信號量結構體
type Weighted struct {
 size    int64      // 資源總數量
 cur     int64      // 當前已經使用的資源數
 mu      sync.Mutex // 互斥鎖,保證對其他屬性的操作併發安全
 waiters list.List  // 等待者隊列,使用列表實現
}

Weighted 結構體包含 4 個字段:

構造函數 NewWeighted 實現如下:

// NewWeighted 構造一個信號量對象
func NewWeighted(n int64) *Weighted {
 w := &Weighted{size: n}
 return w
}

此外,semaphore 包還爲等待者定義了一個結構體 waiter

// 等待者結構體
type waiter struct {
 n     int64           // 請求資源數
 ready chan<- struct{} // 當獲取到資源時被關閉,用於喚醒當前等待者
}

waiter 結構體包含 2 個字段:

稍後你將看到它的用處。

我們回過頭來看下 Weighted 結構體的第一個方法 Acquire 的實現:

// Acquire 請求 n 個資源
// 如果資源不足,則阻塞等待,直到有足夠的資源數,或者 ctx 被取消
// 成功返回 nil,失敗返回 ctx.Err() 並且不改變資源數
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
 done := ctx.Done()

 s.mu.Lock() // 加鎖保證併發安全

// 如果在分配資源前 ctx 已經取消,則直接返回 ctx.Err()
select {
case <-done:
  s.mu.Unlock()
return ctx.Err()
default:
 }

// 如果資源數足夠,且不存在其他等待者,則請求資源成功,將 cur 加上 n,並返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
  s.cur += n
  s.mu.Unlock()
returnnil
 }

// 如果請求的資源數大於資源總數,不可能滿足,則阻塞等待 ctx 取消,並返回 ctx.Err()
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
  s.mu.Unlock()
  <-done
return ctx.Err()
 }

// 資源不夠或者存在其他等待者,則繼續執行

// 加入等待隊列
 ready := make(chanstruct{})    // 創建一個 channel 作爲一個屬性記錄到等待者對象 waiter 中,用於後續通知其喚醒
 w := waiter{n: n, ready: ready} // 構造一個等待者對象 waiter
 elem := s.waiters.PushBack(w)   // 將 waiter 追加到等待者隊列
 s.mu.Unlock()

// 使用 select 實現阻塞等待
select {
case <-done: // 檢查 ctx 是否被取消
  s.mu.Lock()
select {
case <-ready: // 檢查當前 waiter 是否被喚醒
   // 進入這裏,說明是 ctx 被取消後 waiter 被喚醒
   s.cur -= n        // 那麼就當作 waiter 沒有被喚醒,將請求的資源數還回去
   s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
default: // 將當前 waiter 從等待者隊列中移除
   isFront := s.waiters.Front() == elem // 當前 waiter 是否爲第一個等待者
   s.waiters.Remove(elem)               // 從隊列中移除
   // 如果當前 waiter 是隊列中第一個等待者,並且還有剩餘的資源數
   if isFront && s.size > s.cur {
    s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
   }
  }
  s.mu.Unlock()
return ctx.Err()

case <-ready: // 被喚醒
select {
case <-done: // 再次檢查 ctx 是否被取消
   // 進入這裏,說明 waiter 被喚醒後 ctx 卻被取消了,當作未被喚醒來處理
   s.Release(n) // 釋放資源
   return ctx.Err()
default:
  }
returnnil// 成功返回 nil
 }
}

這是 Weighted 結構體實現最爲複雜的一個方法,不過我在代碼中寫了非常詳細的註釋,來幫助你理解。

Acquire 主要邏輯爲:

那麼接下來,我們看看釋放資源的 Release 方法邏輯:

// Release 釋放 n 個資源
func (s *Weighted) Release(n int64) {
 s.mu.Lock() // 加鎖保證併發安全
 s.cur -= n  // 釋放資源
 if s.cur < 0 {
  s.mu.Unlock()
  panic("semaphore: released more than held")
 }
 s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
 s.mu.Unlock()
}

這裏的邏輯就簡單多了,首先執行 s.cur -= n 減少當前已經使用的資源數,即這一步就是釋放資源操作。注意這裏還對 s.cur 是否小於 0 進行了判斷,所以使用時一定是申請多少資源就釋放多少資源,不要用錯。接着同樣調用 s.notifyWaiters() 通知等待者隊列,檢查隊列中下一個 waiter 資源數是否滿足。

現在,是時候看看 notifyWaiters 方法是如何實現的了:

// 檢查隊列中下一個 waiter 資源數是否滿足
func (s *Weighted) notifyWaiters() {
// 循環檢查下一個 waiter 請求的資源數是否滿足,滿足則出隊,不滿足則終止循環
for {
  next := s.waiters.Front() // 獲取隊首元素
if next == nil {
   break// 沒有 waiter 了,隊列爲空終止循環
  }

  w := next.Value.(waiter)
if s.size-s.cur < w.n { // 當前 waiter 資源數不滿足,退出循環
   // 不繼續查找隊列中後續 waiter 請求資源是否滿足,避免產生飢餓
   break
  }

// 資源數滿足,喚醒 waiter
  s.cur += w.n           // 記錄使用的資源數
  s.waiters.Remove(next) // 從隊列中移除 waiter
close(w.ready)         // 利用關閉 channel 的操作,來喚醒 waiter
 }
}

notifyWaiters 方法內部的核心邏輯是:循環檢查下一個 waiter 請求的資源數是否滿足,滿足則出隊,不滿足則終止循環。

可以發現,next 是隊首元素,所以說等待者隊列是先進先出(FIFO)的。

這裏還有一個要注意的點是,當前 waiter 資源數不滿足時,直接退出循環,而不再繼續查找隊列中後續 waiter 請求資源是否滿足。比如當前可用資源數爲 5,等待者隊列中有兩個等待者 waiter1 和 waiter2waiter1 請求的資源數是 10,waiter2 請求的資源數是 1,此時就會退出循環,waiter1 和 waiter2 都繼續等待,而不會先將資源分配給 waiter2。這樣做是爲了避免之後的 waiter3waiter4... 總是比 waiter1 請求的資源數小,而導致 waiter1 長時間阻塞,從而產生飢餓。

Weighted 還有最後一個方法 TryAcquire 我們一起來看看是如何實現的:

// TryAcquire 嘗試請求 n 個資源
// 不阻塞,成功返回 true,失敗返回 false 並且不改變資源數
func (s *Weighted) TryAcquire(n int64) bool {
 s.mu.Lock() // 加鎖保證併發安全
 // 剩餘資源數足夠,且不存在其他等待者,則請求資源成功
 success := s.size-s.cur >= n && s.waiters.Len() == 0
 if success {
  s.cur += n // 記錄當前已經使用的資源數
 }
 s.mu.Unlock()
 return success
}

這個方法實現也很簡單,就是檢查剩餘資源數是否足夠,且不存在其他等待者,如果滿足,則請求資源成功,增加當前已經使用的資源數 s.cur += n,然後返回 true,否則返回 false

至此,semaphore 包的源碼就講解完成了。

使用示例

熟悉了 semaphore 包的源碼,那麼如何使用就不在話下了。如下是 semaphore 包文檔中提供的 worker pool 模式示例,演示如何使用信號量來限制並行任務中運行的 goroutine 數量。代碼如下:

https://pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#example-package-WorkerPool

package main

import (
"context"
"fmt"
"log"
"runtime"

"golang.org/x/sync/semaphore"
)

func main() {
 ctx := context.TODO()

var (
  maxWorkers = runtime.GOMAXPROCS(0)                    // worker pool 支持的最大 worker 數量,取當前機器 CPU 核心數
  sem        = semaphore.NewWeighted(int64(maxWorkers)) // 信號量,資源總數即爲最大 worker 數量
  out        = make([]int, 32)                          // 總任務數量
 )

// 一次最多啓動 maxWorkers 數量個 goroutine 計算輸出
for i := range out {
// 當最大工作數 maxWorkers 個 goroutine 正在執行時,Acquire 會阻塞直到其中一個 goroutine 完成
if err := sem.Acquire(ctx, 1); err != nil { // 請求資源
   log.Printf("Failed to acquire semaphore: %v", err)
   break
  }

// 開啓新的 goroutine 執行計算任務
go func(i int) {
   defer sem.Release(1)         // 任務執行完成後釋放資源
   out[i] = collatzSteps(i + 1) // 執行 Collatz 步驟計算
  }(i)
 }

// 獲取所有的 tokens 以等待全部 goroutine 執行完成
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
  log.Printf("Failed to acquire semaphore: %v", err)
 }

 fmt.Println(out)

}

// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
 }

for ; n > 1; steps++ {
if steps < 0 {
   panic("too many steps")
  }

if n%2 == 0 {
   n /= 2
   continue
  }

const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
   panic("overflow")
  }
  n = 3*n + 1
 }

return steps
}

這段代碼利用信號量(semaphore)實現一個工作池(worker pool),來限制最大併發協程(goroutine)數。

通過 runtime.GOMAXPROCS(0) 可以獲取當前機器 CPU 核心數(比如在我的 Apple M1 Pro 機器中這個值是 10),以此作爲 worker pool 支持的最大 worker 數量,這也是信號量的資源總數。out 切片長度爲 32,即總任務數爲 32。使用 for 循環來開啓新的 goroutine 執行計算任務 collatzSteps(i + 1),不過在開啓新的 goroutine 前,會調用 sem.Acquire(ctx, 1) 向 worker pool 申請一個資源,當最大工作數 maxWorkers 個 goroutine 正在執行時,Acquire 會阻塞直到其中一個 goroutine 完成,任務執行完成後在 defer 語句中調用 sem.Release(1) 釋放資源。

關於計算任務 collatzSteps 我們其實不必深究,只需要知道這是一個耗時任務即可。簡單來說,Collatz 是一個數學猜想,提出了一個看似簡單的整數序列規則,對於任意正整數 n 執行操作,如果 n 是偶數,則將 n 除以 2,如果 n 是奇數,則將 n 乘以 3 再加 1(即 3n + 1),反覆執行這些操作後,最終所有整數都將達到 1。collatzSteps 函數實現了計算在 Collatz 猜想下一個正整數 n 需要多少步才能達到 1。如果你實在感興趣可以點擊鏈接 https://en.wikipedia.org/wiki/Collatz_conjecture 瞭解一下。

值得注意的是,在 for 循環提交完所有任務後,使用 sem.Acquire(ctx, int64(maxWorkers)) 獲取了信號量中全部資源,這樣就能夠確保所有任務執行完成後纔會退出,它的作用類似 sync.WaitGroup.Wait()

那麼接下來我們使用 sync.WaitGroup 來實現同樣的功能:

https://github.com/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/main.go

func main() {
var (
  maxWorkers = runtime.GOMAXPROCS(0)           // 獲取系統可用的最大 CPU 核心數
  out        = make([]int, 32)                 // 存儲 Collatz 結果
  wg         sync.WaitGroup                    // 用於等待 goroutine 完成
  sem        = make(chanstruct{}, maxWorkers) // 用於限制最大併發數
 )

for i := range out {
// 通過 sem 管理併發,確保最多隻有 maxWorkers 個 goroutine 同時執行
  sem <- struct{}{} // 如果 sem 已滿,這裏會阻塞,直到有空閒槽位

// 增加 WaitGroup 計數
  wg.Add(1)

go func(i int) {
   defer wg.Done()          // goroutine 完成時,減少 WaitGroup 計數
   defer func() { <-sem }() // goroutine 完成時,從 sem 中釋放一個槽位

   // 執行 Collatz 步驟計算
   out[i] = collatzSteps(i + 1)
  }(i)
 }

// 等待所有 goroutine 完成
 wg.Wait()

// 輸出結果
 fmt.Println(out)
}

現在對比一下使用 sync.WaitGroup 和 semaphore 實現的代碼,是否能加深你對信號量這一功能的理解呢?

最後,我們再來留個作業,請使用 errgroup 實現一遍這個示例程序。

如果你對 sync.WaitGroup 或 errgroup 不熟悉,可以參考我的文章「Go 併發控制:sync.WaitGroup 詳解」和「Go 併發控制:errgroup 詳解」。

總結

本文對 Go 中的擴展併發原語 semaphore 進行了講解,並帶你看了其源碼的實現,以及介紹瞭如何使用。

不知道你有沒有發現,在初始化 semaphore 時,傳遞的 n 如果爲 1,那麼這個信號量其實就相當於互斥鎖 sync.Mutex

semaphore 可以用來實現 worker pool 模式,並且使用套路或場景與 sync.WaitGroup 也比較相似,你可以對比學習。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

延伸閱讀

聯繫我

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/noUEVyNAILTVFwKOkX4Ziw