Go 官方設計了一個信號量庫
前言
哈嘍,大家好,我是
asong
。在寫上一篇文章請勿濫用 goroutine 時,發現Go
語言擴展包提供了一個帶權重的信號量庫 Semaphore,使用信號量我們可以實現一個 "工作池" 控制一定數量的goroutine
併發工作。因爲對源碼抱有好奇的態度,所以在週末仔細看了一下這個庫並進行了解析,在這裏記錄一下。
何爲信號量
要想知道一個東西是什麼,我都愛去百度百科上搜一搜,輸入 "信號量",這答案不就來了。
百度百科解釋:
信號量 (Semaphore),有時被稱爲信號燈,是 [多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被併發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。爲了完成這個過程,需要創建一個信號量 VI,然後將 Acquire Semaphore VI 以及 Release Semaphore VI 分別放置在每個關鍵代碼段的首末端。確認這些信號量 VI 引用的是初始創建的信號量。
通過這段解釋我們可以得知什麼是信號量,其實信號量就是一種變量或者抽象數據類型,用於控制併發系統中多個進程對公共資源的訪問,訪問具有原子性。信號量主要分爲兩類:
-
二值信號量:顧名思義,其值只有兩種
0
或者1
,相當於互斥量,當值爲1
時資源可用,當值爲0
時,資源被鎖住,進程阻塞無法繼續執行。 -
計數信號量:信號量是一個任意的整數,起始時,如果計數器的計數值爲
0
,那麼創建出來的信號量就是不可獲得的狀態,如果計數器的計數值大於0
,那麼創建出來的信號量就是可獲得的狀態,並且總共獲取的次數等於計數器的值。
信號量工作原理
信號量是由操作系統來維護的,信號量只能進行兩種操作等待和發送信號,操作總結來說,核心就是PV
操作:
-
P 原語:P 是荷蘭語 Proberen(測試) 的首字母。爲阻塞原語,負責把當前進程由運行狀態轉換爲阻塞狀態,直到另外一個進程喚醒它。操作爲:申請一個空閒資源 (把信號量減 1),若成功,則退出;若失敗,則該進程被阻塞;
-
V 原語:V 是荷蘭語 Verhogen(增加) 的首字母。爲喚醒原語,負責把一個被阻塞的進程喚醒,它有一個參數表,存放着等待被喚醒的進程信息。操作爲:釋放一個被佔用的資源 (把信號量加 1),如果發現有被阻塞的進程,則選擇一個喚醒之。
在信號量進行 PV 操作時都爲原子操作,並且在 PV 原語執行期間不允許有中斷的發生。
PV 原語對信號量的操作可以分爲三種情況:
-
把信號量視爲某種類型的共享資源的剩餘個數,實現對一類共享資源的訪問
-
把信號量用作進程間的同步
-
視信號量爲一個加鎖標誌,實現對一個共享變量的訪問
具體在什麼場景使用本文就不在繼續分析,接下來我們重點來看一下Go
語言提供的擴展包Semaphore
,看看它是怎樣實現的。
官方擴展包Semaphore
我們之前在分析Go
語言源碼時總會看到這幾個函數:
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
這幾個函數就是信號量的PV
操作,不過他們都是給Go
內部使用的,如果想使用信號量,那就可以使用官方的擴展包:Semaphore,這是一個帶權重的信號量,接下來我們就重點分析一下這個庫。
安裝方法:go get -u golang.org/x/sync
數據結構
type Weighted struct {
size int64 // 設置一個最大權值
cur int64 // 標識當前已被使用的資源數
mu sync.Mutex // 提供臨界區保護
waiters list.List // 阻塞等待的調用者列表
}
semaphore
庫核心結構就是Weighted
,主要有4
個字段:
-
size
:這個代表的是最大權值,在創建Weighted
對象指定 -
cur
:相當於一個遊標,來記錄當前已使用的權值 -
mu
:互斥鎖,併發情況下做臨界區保護 -
waiters
:阻塞等待的調用者列表,使用鏈表數據結構保證先進先出的順序,存儲的數據是waiter
對象,waiter
數據結構如下:
type waiter struct {
n int64 // 等待調用者權重值
ready chan<- struct{} // close channel就是喚醒
}
這裏只有兩個字段:
-
n
:這個就是等待調用者的權重值 -
ready
:這就是一個channel
,利用channel
的close
機制實現喚醒
semaphore
還提供了一個創建Weighted
對象的方法,在初始化時需要給定最大權值:
// NewWeighted爲併發訪問創建一個新的加權信號量,該信號量具有給定的最大權值。
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
阻塞獲取權值的方法 - Acquire
先直接看代碼吧:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock() // 加鎖保護臨界區
// 有資源可用並且沒有等待獲取權值的goroutine
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n // 加權
s.mu.Unlock() // 釋放鎖
return nil
}
// 要獲取的權值n大於最大的權值了
if n > s.size {
// 先釋放鎖,確保其他goroutine調用Acquire的地方不被阻塞
s.mu.Unlock()
// 阻塞等待context的返回
<-ctx.Done()
return ctx.Err()
}
// 走到這裏就說明現在沒有資源可用了
// 創建一個channel用來做通知喚醒
ready := make(chan struct{})
// 創建waiter對象
w := waiter{n: n, ready: ready}
// waiter按順序入隊
elem := s.waiters.PushBack(w)
// 釋放鎖,等待喚醒,別阻塞其他goroutine
s.mu.Unlock()
// 阻塞等待喚醒
select {
// context關閉
case <-ctx.Done():
err := ctx.Err() // 先獲取context的錯誤信息
s.mu.Lock()
select {
case <-ready:
// 在context被關閉後被喚醒了,那麼試圖修復隊列,假裝我們沒有取消
err = nil
default:
// 判斷是否是第一個元素
isFront := s.waiters.Front() == elem
// 移除第一個元素
s.waiters.Remove(elem)
// 如果是第一個元素且有資源可用通知其他waiter
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// 被喚醒了
case <-ready:
return nil
}
}
註釋已經加到代碼中了,總結一下這個方法主要有三個流程:
-
流程一:有資源可用時並且沒有等待權值的
goroutine
,走正常加權流程; -
流程二:想要獲取的權值
n
大於初始化時設置最大的權值了,這個goroutine
永遠不會獲取到信號量,所以阻塞等待context
的關閉; -
流程三:前兩步都沒問題的話,就說明現在系統沒有資源可用了,這時就需要阻塞等待喚醒,在阻塞等待喚醒這裏有特殊邏輯;
-
特殊邏輯二:
context
關閉後,則根據是否有可用資源決定通知後面等待喚醒的調用者,這樣做的目的其實是爲了避免當不同的context
控制不同的goroutine
時,未關閉的goroutine
不會被阻塞住,依然執行,來看這樣一個例子(因爲goroutine
的搶佔式調度,所以這個例子也會具有偶然性): -
特殊邏輯一:如果在
context
被關閉後被喚醒了,那麼就先忽略掉這個cancel
,試圖修復隊列。func main() { s := semaphore.NewWeighted(3) ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2) defer cancel() for i :=0; i < 3; i++{ if i != 0{ go func(num int) { if err := s.Acquire(ctx,3); err != nil{ fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) return } time.Sleep(2 * time.Second) fmt.Printf("goroutine: %d run over\n",num) s.Release(3) }(i) }else { go func(num int) { ct,cancel := context.WithTimeout(context.Background(), time.Second * 3) defer cancel() if err := s.Acquire(ct,3); err != nil{ fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) return } time.Sleep(3 * time.Second) fmt.Printf("goroutine: %d run over\n",num) s.Release(3) }(i) } } time.Sleep(10 * time.Second) }
上面的例子中
goroutine:0
使用ct
對象來做控制,超時時間爲3s
,goroutine:1
和goroutine:2
對象使用ctx
對象來做控制,超時時間爲2s
,這三個goroutine
佔用的資源都等於最大資源數,也就是說只能有一個goruotine
運行成功,另外兩個goroutine
都會被阻塞,因爲goroutine
是搶佔式調度,所以我們不能確定哪個gouroutine
會第一個被執行,這裏我們假設第一個獲取到信號量的是gouroutine:2
,阻塞等待的調用者列表順序是:goroutine:1
->goroutine:0
,因爲在goroutine:2
中有一個2s
的延時,所以會觸發ctx
的超時,ctx
會下發Done
信號,因爲goroutine:2
和goroutine:1
都是被ctx
控制的,所以就會把goroutine:1
從等待者隊列中取消,但是因爲goroutine:1
屬於隊列的第一個隊員,並且因爲goroutine:2
已經釋放資源,那麼就會喚醒goroutine:0
繼續執行,畫個圖表示一下:使用這種方式可以避免
goroutine
永久失眠。
不阻塞獲取權值的方法 - TryAcquire
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock() // 加鎖
// 有資源可用並且沒有等待獲取資源的goroutine
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
這個方法就簡單很多了,不阻塞地獲取權重爲n
的信號量,成功時返回true
,失敗時返回false
並保持信號量不變。
釋放權重
func (s *Weighted) Release(n int64) {
s.mu.Lock()
// 釋放資源
s.cur -= n
// 釋放資源大於持有的資源,則會發生panic
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
// 通知其他等待的調用者
s.notifyWaiters()
s.mu.Unlock()
}
這裏就是很常規的操作,主要就是資源釋放,同時進行安全性判斷,如果釋放資源大於持有的資源,則會發生 panic。
喚醒waiter
在Acquire
和Release
方法中都調用了notifyWaiters
,我們來分析一下這個方法:
func (s *Weighted) notifyWaiters() {
for {
// 獲取等待調用者隊列中的隊員
next := s.waiters.Front()
// 沒有要通知的調用者了
if next == nil {
break // No more waiters blocked.
}
// 斷言出waiter信息
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 沒有足夠資源爲下一個調用者使用時,繼續阻塞該調用者,遵循先進先出的原則,
// 避免需要資源數比較大的waiter被餓死
//
// 考慮一個場景,使用信號量作爲讀寫鎖,現有N個令牌,N個reader和一個writer
// 每個reader都可以通過Acquire(1)獲取讀鎖,writer寫入可以通過Acquire(N)獲得寫鎖定
// 但不包括所有的reader,如果我們允許reader在隊列中前進,writer將會餓死-總是有一個令牌可供每個reader
break
}
// 獲取資源
s.cur += w.n
// 從waiter列表中移除
s.waiters.Remove(next)
// 使用channel的close機制喚醒waiter
close(w.ready)
}
}
這裏只需要注意一個點:喚醒waiter
採用先進先出的原則,避免需要資源數比較大的 waiter 被餓死。
何時使用Semaphore
到這裏我們就把Semaphore
的源代碼看了一篇,代碼行數不多,封裝的也很巧妙,那麼我們該什麼時候選擇使用它呢?
目前能想到一個場景就是Semaphore
配合上errgroup
實現一個 "工作池",使用Semaphore
限制goroutine
的數量,配合上errgroup
做併發控制,示例如下:
const (
limit = 2
)
func main() {
serviceName := []string{
"cart",
"order",
"account",
"item",
"menu",
}
eg,ctx := errgroup.WithContext(context.Background())
s := semaphore.NewWeighted(limit)
for index := range serviceName{
name := serviceName[index]
if err := s.Acquire(ctx,1); err != nil{
fmt.Printf("Acquire failed and err is %s\n", err.Error())
break
}
eg.Go(func() error {
defer s.Release(1)
return callService(name)
})
}
if err := eg.Wait(); err != nil{
fmt.Printf("err is %s\n", err.Error())
return
}
fmt.Printf("run success\n")
}
func callService(name string) error {
fmt.Println("call ",name)
time.Sleep(1 * time.Second)
return nil
}
結果如下:
call order
call cart
call account
call item
call menu
run success
總結
本文我們主要賞析了Go
官方擴展庫Semaphore
的實現,他的設計思路簡單,僅僅用幾十行就完成了完美的封裝,值得我們借鑑學習。不過在實際業務場景中,我們使用信號量的場景並不多,大多數場景我們都可以使用channel
來替代,但是有些場景使用Semaphore
來實現會更好,比如上篇文章【[警惕] 請勿濫用 goroutine】我們使用channel+sync
來控制goroutine
數量,這種實現方式並不好,因爲實際已經起來了多個goroutine
,只不過控制了工作的goroutine
數量,如果改用semaphore
實現纔是真正的控制了goroutine
數量。
文中代碼已上傳github
:https://github.com/asong2020/Golang_Dream/blob/master/code_demo/semaphore_demo/semaphore.go,歡迎star
。
創建了一個 Golang 學習交流羣,歡迎各位大佬們踊躍入羣,我們一起學習交流。入羣方式:關注公衆號獲取。更多學習資料請到公衆號領取。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/DLw_R7-76XFoOTUVPmixLg