Go 官方設計了一個信號量庫

前言

哈嘍,大家好,我是asong。在寫上一篇文章請勿濫用 goroutine 時,發現Go語言擴展包提供了一個帶權重的信號量庫 Semaphore,使用信號量我們可以實現一個 "工作池" 控制一定數量的goroutine併發工作。因爲對源碼抱有好奇的態度,所以在週末仔細看了一下這個庫並進行了解析,在這裏記錄一下。

何爲信號量

要想知道一個東西是什麼,我都愛去百度百科上搜一搜,輸入 "信號量",這答案不就來了。

百度百科解釋:

信號量 (Semaphore),有時被稱爲信號燈,是 [多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被併發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麼該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。爲了完成這個過程,需要創建一個信號量 VI,然後將 Acquire Semaphore VI 以及 Release Semaphore VI 分別放置在每個關鍵代碼段的首末端。確認這些信號量 VI 引用的是初始創建的信號量。

通過這段解釋我們可以得知什麼是信號量,其實信號量就是一種變量或者抽象數據類型,用於控制併發系統中多個進程對公共資源的訪問,訪問具有原子性。信號量主要分爲兩類:

信號量工作原理

信號量是由操作系統來維護的,信號量只能進行兩種操作等待和發送信號,操作總結來說,核心就是PV操作:

在信號量進行 PV 操作時都爲原子操作,並且在 PV 原語執行期間不允許有中斷的發生。

PV 原語對信號量的操作可以分爲三種情況:

具體在什麼場景使用本文就不在繼續分析,接下來我們重點來看一下Go語言提供的擴展包Semaphore,看看它是怎樣實現的。

官方擴展包Semaphore

我們之前在分析Go語言源碼時總會看到這幾個函數:

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

這幾個函數就是信號量的PV操作,不過他們都是給Go內部使用的,如果想使用信號量,那就可以使用官方的擴展包:Semaphore,這是一個帶權重的信號量,接下來我們就重點分析一下這個庫。

安裝方法:go get -u golang.org/x/sync

數據結構

type Weighted struct {
 size    int64 // 設置一個最大權值
 cur     int64 // 標識當前已被使用的資源數
 mu      sync.Mutex // 提供臨界區保護
 waiters list.List // 阻塞等待的調用者列表
}

semaphore庫核心結構就是Weighted,主要有4個字段:

type waiter struct {
 n     int64 // 等待調用者權重值
 ready chan<- struct{} // close channel就是喚醒
}

這裏只有兩個字段:

semaphore還提供了一個創建Weighted對象的方法,在初始化時需要給定最大權值:

// NewWeighted爲併發訪問創建一個新的加權信號量,該信號量具有給定的最大權值。
func NewWeighted(n int64) *Weighted {
 w := &Weighted{size: n}
 return w
}

阻塞獲取權值的方法 - Acquire

先直接看代碼吧:

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
 s.mu.Lock() // 加鎖保護臨界區
 // 有資源可用並且沒有等待獲取權值的goroutine
 if s.size-s.cur >= n && s.waiters.Len() == 0 {
  s.cur += n // 加權
  s.mu.Unlock() // 釋放鎖
  return nil
 }
 // 要獲取的權值n大於最大的權值了
 if n > s.size {
  // 先釋放鎖,確保其他goroutine調用Acquire的地方不被阻塞
  s.mu.Unlock()
  // 阻塞等待context的返回
  <-ctx.Done()
  return ctx.Err()
 }
 // 走到這裏就說明現在沒有資源可用了
 // 創建一個channel用來做通知喚醒
 ready := make(chan struct{})
 // 創建waiter對象
 w := waiter{n: n, ready: ready}
 // waiter按順序入隊
 elem := s.waiters.PushBack(w)
 // 釋放鎖,等待喚醒,別阻塞其他goroutine
 s.mu.Unlock()

 // 阻塞等待喚醒
 select {
 // context關閉
 case <-ctx.Done():
  err := ctx.Err() // 先獲取context的錯誤信息
  s.mu.Lock()
  select {
  case <-ready:
   // 在context被關閉後被喚醒了,那麼試圖修復隊列,假裝我們沒有取消
   err = nil
  default:
   // 判斷是否是第一個元素
   isFront := s.waiters.Front() == elem
   // 移除第一個元素
   s.waiters.Remove(elem)
   // 如果是第一個元素且有資源可用通知其他waiter
   if isFront && s.size > s.cur {
    s.notifyWaiters()
   }
  }
  s.mu.Unlock()
  return err
 // 被喚醒了
 case <-ready:
  return nil
 }
}

註釋已經加到代碼中了,總結一下這個方法主要有三個流程:

不阻塞獲取權值的方法 - TryAcquire

func (s *Weighted) TryAcquire(n int64) bool {
 s.mu.Lock() // 加鎖
 // 有資源可用並且沒有等待獲取資源的goroutine
 success := s.size-s.cur >= n && s.waiters.Len() == 0
 if success {
  s.cur += n
 }
 s.mu.Unlock()
 return success
}

這個方法就簡單很多了,不阻塞地獲取權重爲n的信號量,成功時返回true,失敗時返回false並保持信號量不變。

釋放權重

func (s *Weighted) Release(n int64) {
 s.mu.Lock()
 // 釋放資源
 s.cur -= n
 // 釋放資源大於持有的資源,則會發生panic
 if s.cur < 0 {
  s.mu.Unlock()
  panic("semaphore: released more than held")
 }
 // 通知其他等待的調用者
 s.notifyWaiters()
 s.mu.Unlock()
}

這裏就是很常規的操作,主要就是資源釋放,同時進行安全性判斷,如果釋放資源大於持有的資源,則會發生 panic。

喚醒waiter

AcquireRelease方法中都調用了notifyWaiters,我們來分析一下這個方法:

func (s *Weighted) notifyWaiters() {
 for {
  // 獲取等待調用者隊列中的隊員
  next := s.waiters.Front()
  // 沒有要通知的調用者了
  if next == nil {
   break // No more waiters blocked.
  }

  // 斷言出waiter信息
  w := next.Value.(waiter)
  if s.size-s.cur < w.n {
   // 沒有足夠資源爲下一個調用者使用時,繼續阻塞該調用者,遵循先進先出的原則,
   // 避免需要資源數比較大的waiter被餓死
   //
   // 考慮一個場景,使用信號量作爲讀寫鎖,現有N個令牌,N個reader和一個writer
   // 每個reader都可以通過Acquire(1)獲取讀鎖,writer寫入可以通過Acquire(N)獲得寫鎖定
   // 但不包括所有的reader,如果我們允許reader在隊列中前進,writer將會餓死-總是有一個令牌可供每個reader
   break
  }

  // 獲取資源
  s.cur += w.n
  // 從waiter列表中移除
  s.waiters.Remove(next)
  // 使用channel的close機制喚醒waiter
  close(w.ready)
 }
}

這裏只需要注意一個點:喚醒waiter採用先進先出的原則,避免需要資源數比較大的 waiter 被餓死。

何時使用Semaphore

到這裏我們就把Semaphore的源代碼看了一篇,代碼行數不多,封裝的也很巧妙,那麼我們該什麼時候選擇使用它呢?

目前能想到一個場景就是Semaphore配合上errgroup實現一個 "工作池",使用Semaphore限制goroutine的數量,配合上errgroup做併發控制,示例如下:

const (
 limit = 2
) 

func main()  {
 serviceName := []string{
  "cart",
  "order",
  "account",
  "item",
  "menu",
 }
 eg,ctx := errgroup.WithContext(context.Background())
 s := semaphore.NewWeighted(limit)
 for index := range serviceName{
  name := serviceName[index]
  if err := s.Acquire(ctx,1); err != nil{
   fmt.Printf("Acquire failed and err is %s\n", err.Error())
   break
  }
  eg.Go(func() error {
   defer s.Release(1)
   return callService(name)
  })
 }

 if err := eg.Wait(); err != nil{
  fmt.Printf("err is %s\n", err.Error())
  return
 }
 fmt.Printf("run success\n")
}

func callService(name string) error {
 fmt.Println("call ",name)
 time.Sleep(1 * time.Second)
 return nil
}

結果如下:

call  order
call  cart
call  account
call  item
call  menu
run success

總結

本文我們主要賞析了Go官方擴展庫Semaphore的實現,他的設計思路簡單,僅僅用幾十行就完成了完美的封裝,值得我們借鑑學習。不過在實際業務場景中,我們使用信號量的場景並不多,大多數場景我們都可以使用channel來替代,但是有些場景使用Semaphore來實現會更好,比如上篇文章【[警惕] 請勿濫用 goroutine】我們使用channel+sync來控制goroutine數量,這種實現方式並不好,因爲實際已經起來了多個goroutine,只不過控制了工作的goroutine數量,如果改用semaphore實現纔是真正的控制了goroutine數量。

文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/blob/master/code_demo/semaphore_demo/semaphore.go,歡迎star

創建了一個 Golang 學習交流羣,歡迎各位大佬們踊躍入羣,我們一起學習交流。入羣方式:關注公衆號獲取。更多學習資料請到公衆號領取。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DLw_R7-76XFoOTUVPmixLg