Go 併發控制:semaphore 詳解
今天我們來介紹一個 Go 官方庫 x 提供的擴展併發原語 semaphore
,譯爲 “信號量”。因爲它就像一個信號一樣控制多個 goroutine 之間協作。
概念講解
我先簡單介紹下信號量的概念,爲不熟悉的讀者作爲補充知識。
一個生活中的例子:假設一個餐廳總共有 10 張餐桌,每來 1 位顧客佔用 1 張餐桌,那麼同一時間共計可以有 10 人在就餐,超過 10 人則需要排隊等位;如果有 1 位顧客就餐完成,則可以讓排隊等待的第 1 位顧客來就餐。
如果加入信號量,則可以這樣理解:
-
10 個餐桌是有限的資源,即信號量初始值爲 10。
-
當顧客進入餐廳,如果餐桌有空,顧客可以被分配 1 個餐桌,信號量的值減 1。
-
如果沒有空餐桌,顧客需要排隊等待,直到有空餐桌爲止(信號量值爲 0 時,新的顧客必須等待)。
-
有顧客就餐完成後,餐桌空出,信號量的值加 1,新的顧客(排隊等待中的顧客,或者無人排隊時新來的顧客)可以被分配餐桌。
這就是信號量的應用場景,信號量幫助管理餐桌的數量,確保餐廳在任何時刻不會接待超過可容納的顧客數量。
如果放在 Go 程序中,我們就可以使用信號量來實現任務池的功能,控制有限個 goroutine 來併發執行任務。這個我們放在後面來實現,先看下 semaphore
源碼是如何實現的。
源碼解讀
我們可以在 semaphore
文檔中看到其定義和實現的 exported 方法:
https://pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#pkg-index
type Weighted
func NewWeighted(n int64) *Weighted
func (s *Weighted) Acquire(ctx context.Context, n int64) error
func (s *Weighted) Release(n int64)
func (s *Weighted) TryAcquire(n int64) bool
semaphore.Weighted
是一個結構體表示信號量對象,NewWeighted
是它的構造函數,用於實例化一個包含 n
個資源的信號量對象,即信號量的初始值爲 n
。
它實現了 3 個方法,分別是:
-
Acquire
:用來請求n
個資源,即對信號量的值進行減n
操作。如果資源不足,則阻塞等待,直到有足夠的資源數,或者ctx
被取消。 -
Release
:用來釋放n
個資源,即對信號量的值進行加n
操作。 -
TryAcquire
:用來請求n
個資源,即對信號量的值進行減n
操作。與Acquire
不同的是,TryAcquire
不會阻塞等待,成功返回true
,失敗返回false
。
現在,你是否能將 semaphore.Weighted
對象和它實現的幾個方法與前文中餐廳就餐的例子對應上了呢?
接下來我們看一下 Weighted
結構體是如何定義的:
https://github.com/golang/sync/blob/v0.9.0/semaphore/semaphore.go
// Weighted 信號量結構體
type Weighted struct {
size int64 // 資源總數量
cur int64 // 當前已經使用的資源數
mu sync.Mutex // 互斥鎖,保證對其他屬性的操作併發安全
waiters list.List // 等待者隊列,使用列表實現
}
Weighted
結構體包含 4 個字段:
-
size
是資源總數。在餐廳的例子中就是 10 張餐桌。 -
cur
記錄了當前已經使用的資源數。在餐廳的例子中就是已經被顧客佔用的餐桌數,size - cur
就是剩餘資源數,即空餐桌數。 -
mu
是互斥鎖,用於保證對其他屬性的操作是併發安全的。 -
waiters
是等待者隊列,記錄所有排隊的等待者。在餐廳的例子中,當 10 張餐桌都被佔滿,新來的顧客就要進入等待者隊列。
構造函數 NewWeighted
實現如下:
// NewWeighted 構造一個信號量對象
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
此外,semaphore
包還爲等待者定義了一個結構體 waiter
:
// 等待者結構體
type waiter struct {
n int64 // 請求資源數
ready chan<- struct{} // 當獲取到資源時被關閉,用於喚醒當前等待者
}
waiter
結構體包含 2 個字段:
-
n
記錄了當前等待者請求的資源數。 -
ready
是一個channel
類型,當資源滿足,ready
會被close
掉,等待者就會被喚醒。
稍後你將看到它的用處。
我們回過頭來看下 Weighted
結構體的第一個方法 Acquire
的實現:
// Acquire 請求 n 個資源
// 如果資源不足,則阻塞等待,直到有足夠的資源數,或者 ctx 被取消
// 成功返回 nil,失敗返回 ctx.Err() 並且不改變資源數
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
done := ctx.Done()
s.mu.Lock() // 加鎖保證併發安全
// 如果在分配資源前 ctx 已經取消,則直接返回 ctx.Err()
select {
case <-done:
s.mu.Unlock()
return ctx.Err()
default:
}
// 如果資源數足夠,且不存在其他等待者,則請求資源成功,將 cur 加上 n,並返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
returnnil
}
// 如果請求的資源數大於資源總數,不可能滿足,則阻塞等待 ctx 取消,並返回 ctx.Err()
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-done
return ctx.Err()
}
// 資源不夠或者存在其他等待者,則繼續執行
// 加入等待隊列
ready := make(chanstruct{}) // 創建一個 channel 作爲一個屬性記錄到等待者對象 waiter 中,用於後續通知其喚醒
w := waiter{n: n, ready: ready} // 構造一個等待者對象 waiter
elem := s.waiters.PushBack(w) // 將 waiter 追加到等待者隊列
s.mu.Unlock()
// 使用 select 實現阻塞等待
select {
case <-done: // 檢查 ctx 是否被取消
s.mu.Lock()
select {
case <-ready: // 檢查當前 waiter 是否被喚醒
// 進入這裏,說明是 ctx 被取消後 waiter 被喚醒
s.cur -= n // 那麼就當作 waiter 沒有被喚醒,將請求的資源數還回去
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
default: // 將當前 waiter 從等待者隊列中移除
isFront := s.waiters.Front() == elem // 當前 waiter 是否爲第一個等待者
s.waiters.Remove(elem) // 從隊列中移除
// 如果當前 waiter 是隊列中第一個等待者,並且還有剩餘的資源數
if isFront && s.size > s.cur {
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
}
}
s.mu.Unlock()
return ctx.Err()
case <-ready: // 被喚醒
select {
case <-done: // 再次檢查 ctx 是否被取消
// 進入這裏,說明 waiter 被喚醒後 ctx 卻被取消了,當作未被喚醒來處理
s.Release(n) // 釋放資源
return ctx.Err()
default:
}
returnnil// 成功返回 nil
}
}
這是 Weighted
結構體實現最爲複雜的一個方法,不過我在代碼中寫了非常詳細的註釋,來幫助你理解。
Acquire
主要邏輯爲:
-
先檢查
ctx
是否被取消,如果在分配資源前ctx
已經取消,則直接返回ctx.Err()
。 -
檢查當前剩餘資源數是否滿足請求的資源數,如果資源數足夠,且不存在其他等待者,則請求資源成功,將
cur
加上n
,並返回。 -
校驗請求的資源數是否合法,如果請求的資源數大於資源總數,則不可能被滿足,此時會阻塞等待
ctx
被取消,並返回ctx.Err()
。 -
如果目前資源不夠,或者存在其他等待者,則代碼將繼續執行,進入阻塞等待邏輯:
-
如果是
ctx
被取消,還會再檢查一下waiter
是否被喚醒,如果被喚醒,則還是以ctx
被取消爲準,會當作waiter
沒有被喚醒,將請求的資源數還回去,並調用s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個waiter
資源數是否滿足;如果沒被喚醒,則將當前waiter
從等待者隊列中移除,如果當前waiter
是隊列中第一個等待者,並且還有剩餘的資源數,則還會調用s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個waiter
資源數是否滿足。 -
如果是
waiter
被喚醒,還會再檢查一下ctx
是否被取消,如果被取消,則以ctx
被取消爲準,會調用s.Release(n)
釋放掉當前waiter
請求的資源;如果沒被取消,則返回nil
表示請求資源成功。 -
首先構造一個等待者對象
waiter
,並將其加入等待者隊列。 -
接着,使用
select
實現阻塞等待。此時又分兩種情況,ctx
被取消,或者waiter
被喚醒。
那麼接下來,我們看看釋放資源的 Release
方法邏輯:
// Release 釋放 n 個資源
func (s *Weighted) Release(n int64) {
s.mu.Lock() // 加鎖保證併發安全
s.cur -= n // 釋放資源
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數是否滿足
s.mu.Unlock()
}
這裏的邏輯就簡單多了,首先執行 s.cur -= n
減少當前已經使用的資源數,即這一步就是釋放資源操作。注意這裏還對 s.cur
是否小於 0 進行了判斷,所以使用時一定是申請多少資源就釋放多少資源,不要用錯。接着同樣調用 s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個 waiter
資源數是否滿足。
現在,是時候看看 notifyWaiters
方法是如何實現的了:
// 檢查隊列中下一個 waiter 資源數是否滿足
func (s *Weighted) notifyWaiters() {
// 循環檢查下一個 waiter 請求的資源數是否滿足,滿足則出隊,不滿足則終止循環
for {
next := s.waiters.Front() // 獲取隊首元素
if next == nil {
break// 沒有 waiter 了,隊列爲空終止循環
}
w := next.Value.(waiter)
if s.size-s.cur < w.n { // 當前 waiter 資源數不滿足,退出循環
// 不繼續查找隊列中後續 waiter 請求資源是否滿足,避免產生飢餓
break
}
// 資源數滿足,喚醒 waiter
s.cur += w.n // 記錄使用的資源數
s.waiters.Remove(next) // 從隊列中移除 waiter
close(w.ready) // 利用關閉 channel 的操作,來喚醒 waiter
}
}
notifyWaiters
方法內部的核心邏輯是:循環檢查下一個 waiter
請求的資源數是否滿足,滿足則出隊,不滿足則終止循環。
可以發現,next
是隊首元素,所以說等待者隊列是先進先出(FIFO
)的。
這裏還有一個要注意的點是,當前 waiter
資源數不滿足時,直接退出循環,而不再繼續查找隊列中後續 waiter
請求資源是否滿足。比如當前可用資源數爲 5,等待者隊列中有兩個等待者 waiter1
和 waiter2
,waiter1
請求的資源數是 10,waiter2
請求的資源數是 1,此時就會退出循環,waiter1
和 waiter2
都繼續等待,而不會先將資源分配給 waiter2
。這樣做是爲了避免之後的 waiter3
、waiter4
... 總是比 waiter1
請求的資源數小,而導致 waiter1
長時間阻塞,從而產生飢餓。
Weighted
還有最後一個方法 TryAcquire
我們一起來看看是如何實現的:
// TryAcquire 嘗試請求 n 個資源
// 不阻塞,成功返回 true,失敗返回 false 並且不改變資源數
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock() // 加鎖保證併發安全
// 剩餘資源數足夠,且不存在其他等待者,則請求資源成功
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n // 記錄當前已經使用的資源數
}
s.mu.Unlock()
return success
}
這個方法實現也很簡單,就是檢查剩餘資源數是否足夠,且不存在其他等待者,如果滿足,則請求資源成功,增加當前已經使用的資源數 s.cur += n
,然後返回 true
,否則返回 false
。
至此,semaphore
包的源碼就講解完成了。
使用示例
熟悉了 semaphore
包的源碼,那麼如何使用就不在話下了。如下是 semaphore
包文檔中提供的 worker pool
模式示例,演示如何使用信號量來限制並行任務中運行的 goroutine 數量。代碼如下:
https://pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#example-package-WorkerPool
package main
import (
"context"
"fmt"
"log"
"runtime"
"golang.org/x/sync/semaphore"
)
func main() {
ctx := context.TODO()
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker pool 支持的最大 worker 數量,取當前機器 CPU 核心數
sem = semaphore.NewWeighted(int64(maxWorkers)) // 信號量,資源總數即爲最大 worker 數量
out = make([]int, 32) // 總任務數量
)
// 一次最多啓動 maxWorkers 數量個 goroutine 計算輸出
for i := range out {
// 當最大工作數 maxWorkers 個 goroutine 正在執行時,Acquire 會阻塞直到其中一個 goroutine 完成
if err := sem.Acquire(ctx, 1); err != nil { // 請求資源
log.Printf("Failed to acquire semaphore: %v", err)
break
}
// 開啓新的 goroutine 執行計算任務
go func(i int) {
defer sem.Release(1) // 任務執行完成後釋放資源
out[i] = collatzSteps(i + 1) // 執行 Collatz 步驟計算
}(i)
}
// 獲取所有的 tokens 以等待全部 goroutine 執行完成
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}
這段代碼利用信號量(semaphore
)實現一個工作池(worker pool
),來限制最大併發協程(goroutine
)數。
通過 runtime.GOMAXPROCS(0)
可以獲取當前機器 CPU 核心數(比如在我的 Apple M1 Pro 機器中這個值是 10),以此作爲 worker pool
支持的最大 worker
數量,這也是信號量的資源總數。out
切片長度爲 32,即總任務數爲 32。使用 for
循環來開啓新的 goroutine 執行計算任務 collatzSteps(i + 1)
,不過在開啓新的 goroutine 前,會調用 sem.Acquire(ctx, 1)
向 worker pool
申請一個資源,當最大工作數 maxWorkers
個 goroutine 正在執行時,Acquire
會阻塞直到其中一個 goroutine 完成,任務執行完成後在 defer
語句中調用 sem.Release(1)
釋放資源。
關於計算任務
collatzSteps
我們其實不必深究,只需要知道這是一個耗時任務即可。簡單來說,Collatz 是一個數學猜想,提出了一個看似簡單的整數序列規則,對於任意正整數n
執行操作,如果n
是偶數,則將n
除以 2,如果n
是奇數,則將n
乘以 3 再加 1(即3n + 1
),反覆執行這些操作後,最終所有整數都將達到 1。collatzSteps
函數實現了計算在 Collatz 猜想下一個正整數n
需要多少步才能達到 1。如果你實在感興趣可以點擊鏈接 https://en.wikipedia.org/wiki/Collatz_conjecture 瞭解一下。
值得注意的是,在 for
循環提交完所有任務後,使用 sem.Acquire(ctx, int64(maxWorkers))
獲取了信號量中全部資源,這樣就能夠確保所有任務執行完成後纔會退出,它的作用類似 sync.WaitGroup.Wait()
。
那麼接下來我們使用 sync.WaitGroup
來實現同樣的功能:
https://github.com/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/main.go
func main() {
var (
maxWorkers = runtime.GOMAXPROCS(0) // 獲取系統可用的最大 CPU 核心數
out = make([]int, 32) // 存儲 Collatz 結果
wg sync.WaitGroup // 用於等待 goroutine 完成
sem = make(chanstruct{}, maxWorkers) // 用於限制最大併發數
)
for i := range out {
// 通過 sem 管理併發,確保最多隻有 maxWorkers 個 goroutine 同時執行
sem <- struct{}{} // 如果 sem 已滿,這裏會阻塞,直到有空閒槽位
// 增加 WaitGroup 計數
wg.Add(1)
go func(i int) {
defer wg.Done() // goroutine 完成時,減少 WaitGroup 計數
defer func() { <-sem }() // goroutine 完成時,從 sem 中釋放一個槽位
// 執行 Collatz 步驟計算
out[i] = collatzSteps(i + 1)
}(i)
}
// 等待所有 goroutine 完成
wg.Wait()
// 輸出結果
fmt.Println(out)
}
現在對比一下使用 sync.WaitGroup
和 semaphore
實現的代碼,是否能加深你對信號量這一功能的理解呢?
最後,我們再來留個作業,請使用 errgroup
實現一遍這個示例程序。
如果你對 sync.WaitGroup
或 errgroup
不熟悉,可以參考我的文章「Go 併發控制:sync.WaitGroup 詳解」和「Go 併發控制:errgroup 詳解」。
總結
本文對 Go 中的擴展併發原語 semaphore
進行了講解,並帶你看了其源碼的實現,以及介紹瞭如何使用。
不知道你有沒有發現,在初始化 semaphore
時,傳遞的 n
如果爲 1,那麼這個信號量其實就相當於互斥鎖 sync.Mutex
。
semaphore
可以用來實現 worker pool
模式,並且使用套路或場景與 sync.WaitGroup
也比較相似,你可以對比學習。
本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。
希望此文能對你有所啓發。
延伸閱讀
-
Go 併發控制:sync.WaitGroup 詳解:https://jianghushinian.cn/2024/12/23/sync-waitgroup/
-
Go 併發控制:errgroup 詳解:https://jianghushinian.cn/2024/11/04/x-sync-errgroup/
-
semaphore Documentation:https://pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore
-
semaphore GitHub 源碼:https://github.com/golang/sync/blob/v0.9.0/semaphore/semaphore.go
-
本文 GitHub 示例代碼:https://github.com/jianghushinian/blog-go-example/tree/main/x/sync/semaphore
聯繫我
-
公衆號:Go 編程世界
-
微信:jianghushinian
-
郵箱:jianghushinian007@outlook.com
-
博客:https://jianghushinian.cn
-
GitHub:https://github.com/jianghushinian
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/noUEVyNAILTVFwKOkX4Ziw