Go 併發控制 Wait - Cancel

Wait 和 Cancel 兩種併發控制方式,在使用 Go 開發服務的時候到處都有體現,只要使用了併發就會用到這兩種模式。

在 Go 語言中,分別有 sync.WaitGroup 和 context.Context 來實現這兩種模式。

sync.WaitGroup 等待多個線程完成

對於要等待 n 個線程完成後再進行下一步的同步操作的做法,使用 sync.WaitGroup 來等待一組事件:

func main() {
 var wg sync.WaitGroup
 // 開N個後臺打印線程
 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   fmt.Println("你好, 世界")
  }()
 }
 // 等待 N 個後臺線程完成
 wg.Wait()
}

每個 sync.WaitGroup 值內部維護着一個計數。此計數的初始值爲 0。如果一個 sync.WaitGroup 值的 Wait 方法在此計數爲 0 的時候被調用,則此調用不會阻塞,否則此調用將一直阻塞到此計數變爲 0 爲止。

爲了讓一個 WaitGroup 值的使用有意義,在此值的計數爲 0 的情況下,對它的下一次 Add 方法的調用必須出現在對它的下一次 Wait 方法的調用之前,即 Add 方法的調用在協程之外

e.g.

func worker(args ...interface{}) {
 if len(args) == 0 {
  return
 }

 interval, ok := args[0].(int)
  if !ok {
   return
 }

 time.Sleep(time.Second * (time.Duration(interval)))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
 c := make(chan struct{})
 var wg sync.WaitGroup

 for i := 0; i < n; i++ {
  wg.Add(1)
  go func(i int) {
   defer wg.Done()
   name := fmt.Sprintf("worker-%d:", i)
   f(args...)
   fmt.Println(name, "done")
  }(i)
 }

 go func() {
  wg.Wait()
  c <- struct{}{}
 }()

 return c
}

func main() {
 done := spawnGroup(5, worker, 3)
 fmt.Println("spawn a group of workers")
 <-done
 fmt.Println("group workers done")
}
context.Context 超時控制和資源回收
  1. context.Context 類型的值可以協調多個 groutine 中的代碼執行 “取消” 操作,並且可以存儲鍵值對,最重要的是它是併發安全的。

  2. 與它協作的 API 都可以由外部控制執行 “取消” 操作,例如:取消一個 HTTP 請求的執行。

Go 語言是帶內存自動回收的特性,因此內存一般不會泄漏。但是 goroutine 的確存在泄漏的情況,同時泄漏的 goroutine 引用的內存同樣無法被回收。

package main

import (
 "fmt"
)

func main() {
 ch := func() <-chan int {
  ch := make(chan int)
  go func() {
   for i := 0; ; i++ {
    ch <- i
   }
  }()
  return ch
 }()

 for v := range ch {
  fmt.Println(v)
  if v == 5 {
   break
  }
 }
}

上面的程序中後臺 goroutine 向管道輸入自然數序列,main 函數中輸出序列。但是當 break 跳出 for 循環的時候,後臺 goroutine 就處於無法被回收的狀態了。

我們可以通過 context 包來避免這個問題:

package main

import (
 "context"
 "fmt"
)

func main() {
 ctx, cancel := context.WithCancel(context.Background())

 ch := func(ctx context.Context) <-chan int {
  ch := make(chan int)
  go func() {
   for i := 0; ; i++ {
    select {
    case <-ctx.Done():
     return
    case ch <- i:
    }
   }
  }()
  return ch
 }(ctx)

 for v := range ch {
  fmt.Println(v)
  if v == 5 {
   cancel()
   break
  }
 }
}

當 main 函數在 break 跳出循環時,通過調用 cancel 來通知後臺 goroutine 退出,這樣就避免了 goroutine 的泄漏。

  1. context.Background 是上下文的默認值,所有其他的上下文都應該從它衍生出來。

  2. context.WithCancel 函數能夠從 context.Context 中衍生出一個新的子上下文並返回用於取消該上下文的函數。一旦我們執行返回的取消函數,當前上下文以及它的子上下文都會被取消,所有的 goroutine 都會同步收到這一取消信號。

e.g.

使用 httptest 包的 NewServer 函數創建了三個模擬的 “氣象數據服務中心”,然後將這三個“氣象數據服務中心” 的實例傳入 first 函數。後者創建了三個 goroutine,每個 goroutine 對應向一個 “氣象數據服務中心” 發起查詢請求。

  1. 三個發起查詢的 goroutine 都會將應答結果寫入同一個 channel 中,first 獲取第一個結果數據後就返回了。

  2. 通過增加一個定時器,並通過 select 原語監視該定時器事件和響應 channel 上的事件。如果響應 channel 上長時間沒有數據返回,則當定時器事件觸發後,first 函數返回。

  3. 加上了 “超時模式” 的版本依然有一個明顯的問題,那就是即便 first 函數因超時返回,三個已經創建的 goroutine 可能依然處在向 “氣象數據服務中心” 請求或等待應答中,沒有返回,也沒有被回收,資源仍然在佔用,即使它們的存在已經沒有了任何意義。一種合理的解決思路是讓這三個 goroutine 支持 “取消” 操作。這種情況下,我們一般使用 Go 的 context 包來實現 “取消” 模式。

package main

import (
 "context"
 "errors"
 "fmt"
 "io"
 "log"
 "net/http"
 "net/http/httptest"
 "time"
)

type result struct {
 value string
}

func first(servers ...*httptest.Server) (result, error) {
 c := make(chan result)
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()

 queryFunc := func(i int, server *httptest.Server) {
  url := server.URL
  req, err := http.NewRequest("GET", url, nil)
  if err != nil {
   log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
   return
  }
  req = req.WithContext(ctx)

  log.Printf("query goroutine-%d: send request...\n", i)
  resp, err := http.DefaultClient.Do(req)
  if err != nil {
   log.Printf("query goroutine-%d: get return error: %s\n", i, err)
   return
  }
  log.Printf("query goroutine-%d: get response\n", i)
  defer resp.Body.Close()
  body, _ := io.ReadAll(resp.Body)

  c <- result{
   value: string(body),
  }
 return
 }

 for i, serv := range servers {
  go queryFunc(i, serv)
 }

 select {
 case r := <-c:
  return r, nil
 case <-time.After(500 * time.Millisecond):
  return result{}, errors.New("timeout")
 }
}

func fakeWeatherServer(name string, interval int) *httptest.Server {
 return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  log.Printf("%s receive a http request\n", name)
  time.Sleep(time.Duration(interval) * time.Millisecond)
  w.Write([]byte(name + ":ok"))
 }))
}

func main() {
 result, err := first(
  fakeWeatherServer("open-weather-1", 200),
  fakeWeatherServer("open-weather-2", 1000),
  fakeWeatherServer("open-weather-3", 600),
 )
 if err != nil {
  log.Println("invoke first error:", err)
  return
 }

 fmt.Println(result)
 time.Sleep(10 * time.Second)
}

利用 context.WithCancel 創建了一個可以取消的 context.Context 變量,在每個發起查詢請求的 goroutine 中,我們用該變量更新了 request 中的 ctx 變量,使其支持 “被取消”。

這樣在 first 函數中,無論是成功得到某個查詢 goroutine 的返回結果,還是超時失敗返回,通過 defer cancel() 設定 cancel 函數在 first 函數返回前被執行,那些尚未返回的在途中查詢的 goroutine 都將收到 cancel 事件並退出 ( http 包支持利用 context.Context 的超時和 cancel 機制)。

e.g.

http 包支持利用 context.Context 的超時機制。

 // Create a new context
 // With a deadline of 100 milliseconds
 ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)

 // Make a request, that will call the google homepage
 req, _ := http.NewRequest(http.MethodGet, "http://google.com", nil)
 // Associate the cancellable context we just created to the request
 req = req.WithContext(ctx)

 // Create a new HTTP client and execute the request
 client := http.DefaultClient
 res, err := client.Do(req)
 // If the request failed, log to STDOUT
 if err != nil {
  fmt.Println("Request failed:", err)
  return
 }
 // Print the statuscode if the request succeeds
 fmt.Println("Response received, status code:", res.StatusCode)
}

e.g.

在下面這段代碼中,我們創建了一個過期時間爲 1s 的上下文,並向上下文傳入 handle 函數,該方法會使用 500ms 的時間處理傳入的請求:

func handle(ctx context.Context, duration time.Duration) {
 select {
 case <-ctx.Done():
  fmt.Println("handle", ctx.Err())
 case <-time.After(duration):
  fmt.Println("process request with", duration)
 }
}

func main() {
 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 defer cancel()
 go handle(ctx, 500*time.Millisecond)
 select {
 case <-ctx.Done():
  fmt.Println("main", ctx.Err())
 }
 time.Sleep(time.Second)
}

因爲過期時間大於處理時間,所以我們有足夠的時間處理該請求,運行上述代碼會打印出下面的內容:

process request with 500ms
main context deadline exceeded

handle 函數沒有進入超時的 select 分支,但是 main 函數的 select 卻會等待 context.Context 超時並打印出 main context deadline exceeded。

如果我們將處理請求時間增加至 1500ms,整個程序都會因爲上下文的過期而被中止:

handle context deadline exceeded
main context deadline exceeded

相信上面的例子能夠幫助理解 context.Context 的使用方法和設計原理 —— 多個 goroutine 同時訂閱 ctx.Done 管道中的消息,一旦接收到取消信號就立刻停止當前正在執行的工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/sDlaFCX75m6urBkbTEtJ1Q