Go 併發控制 Wait - Cancel
Wait 和 Cancel 兩種併發控制方式,在使用 Go 開發服務的時候到處都有體現,只要使用了併發就會用到這兩種模式。
在 Go 語言中,分別有 sync.WaitGroup 和 context.Context 來實現這兩種模式。
sync.WaitGroup 等待多個線程完成
對於要等待 n 個線程完成後再進行下一步的同步操作的做法,使用 sync.WaitGroup 來等待一組事件:
func main() {
var wg sync.WaitGroup
// 開N個後臺打印線程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("你好, 世界")
}()
}
// 等待 N 個後臺線程完成
wg.Wait()
}
每個 sync.WaitGroup 值內部維護着一個計數。此計數的初始值爲 0。如果一個 sync.WaitGroup 值的 Wait 方法在此計數爲 0 的時候被調用,則此調用不會阻塞,否則此調用將一直阻塞到此計數變爲 0 爲止。
爲了讓一個 WaitGroup 值的使用有意義,在此值的計數爲 0 的情況下,對它的下一次 Add 方法的調用必須出現在對它的下一次 Wait 方法的調用之前,即 Add 方法的調用在協程之外。
e.g.
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * (time.Duration(interval)))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
name := fmt.Sprintf("worker-%d:", i)
f(args...)
fmt.Println(name, "done")
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
done := spawnGroup(5, worker, 3)
fmt.Println("spawn a group of workers")
<-done
fmt.Println("group workers done")
}
context.Context 超時控制和資源回收
-
context.Context 類型的值可以協調多個 groutine 中的代碼執行 “取消” 操作,並且可以存儲鍵值對,最重要的是它是併發安全的。
-
與它協作的 API 都可以由外部控制執行 “取消” 操作,例如:取消一個 HTTP 請求的執行。
Go 語言是帶內存自動回收的特性,因此內存一般不會泄漏。但是 goroutine 的確存在泄漏的情況,同時泄漏的 goroutine 引用的內存同樣無法被回收。
package main
import (
"fmt"
)
func main() {
ch := func() <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}()
for v := range ch {
fmt.Println(v)
if v == 5 {
break
}
}
}
上面的程序中後臺 goroutine 向管道輸入自然數序列,main 函數中輸出序列。但是當 break 跳出 for 循環的時候,後臺 goroutine 就處於無法被回收的狀態了。
我們可以通過 context 包來避免這個問題:
package main
import (
"context"
"fmt"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := func(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}(ctx)
for v := range ch {
fmt.Println(v)
if v == 5 {
cancel()
break
}
}
}
當 main 函數在 break 跳出循環時,通過調用 cancel 來通知後臺 goroutine 退出,這樣就避免了 goroutine 的泄漏。
-
context.Background 是上下文的默認值,所有其他的上下文都應該從它衍生出來。
-
context.WithCancel 函數能夠從 context.Context 中衍生出一個新的子上下文並返回用於取消該上下文的函數。一旦我們執行返回的取消函數,當前上下文以及它的子上下文都會被取消,所有的 goroutine 都會同步收到這一取消信號。
e.g.
使用 httptest 包的 NewServer 函數創建了三個模擬的 “氣象數據服務中心”,然後將這三個“氣象數據服務中心” 的實例傳入 first 函數。後者創建了三個 goroutine,每個 goroutine 對應向一個 “氣象數據服務中心” 發起查詢請求。
-
三個發起查詢的 goroutine 都會將應答結果寫入同一個 channel 中,first 獲取第一個結果數據後就返回了。
-
通過增加一個定時器,並通過 select 原語監視該定時器事件和響應 channel 上的事件。如果響應 channel 上長時間沒有數據返回,則當定時器事件觸發後,first 函數返回。
-
加上了 “超時模式” 的版本依然有一個明顯的問題,那就是即便 first 函數因超時返回,三個已經創建的 goroutine 可能依然處在向 “氣象數據服務中心” 請求或等待應答中,沒有返回,也沒有被回收,資源仍然在佔用,即使它們的存在已經沒有了任何意義。一種合理的解決思路是讓這三個 goroutine 支持 “取消” 操作。這種情況下,我們一般使用 Go 的 context 包來實現 “取消” 模式。
package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"time"
)
type result struct {
value string
}
func first(servers ...*httptest.Server) (result, error) {
c := make(chan result)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queryFunc := func(i int, server *httptest.Server) {
url := server.URL
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
return
}
req = req.WithContext(ctx)
log.Printf("query goroutine-%d: send request...\n", i)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("query goroutine-%d: get return error: %s\n", i, err)
return
}
log.Printf("query goroutine-%d: get response\n", i)
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
c <- result{
value: string(body),
}
return
}
for i, serv := range servers {
go queryFunc(i, serv)
}
select {
case r := <-c:
return r, nil
case <-time.After(500 * time.Millisecond):
return result{}, errors.New("timeout")
}
}
func fakeWeatherServer(name string, interval int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s receive a http request\n", name)
time.Sleep(time.Duration(interval) * time.Millisecond)
w.Write([]byte(name + ":ok"))
}))
}
func main() {
result, err := first(
fakeWeatherServer("open-weather-1", 200),
fakeWeatherServer("open-weather-2", 1000),
fakeWeatherServer("open-weather-3", 600),
)
if err != nil {
log.Println("invoke first error:", err)
return
}
fmt.Println(result)
time.Sleep(10 * time.Second)
}
利用 context.WithCancel 創建了一個可以取消的 context.Context 變量,在每個發起查詢請求的 goroutine 中,我們用該變量更新了 request 中的 ctx 變量,使其支持 “被取消”。
這樣在 first 函數中,無論是成功得到某個查詢 goroutine 的返回結果,還是超時失敗返回,通過 defer cancel() 設定 cancel 函數在 first 函數返回前被執行,那些尚未返回的在途中查詢的 goroutine 都將收到 cancel 事件並退出 ( http 包支持利用 context.Context 的超時和 cancel 機制)。
e.g.
http 包支持利用 context.Context 的超時機制。
// Create a new context
// With a deadline of 100 milliseconds
ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
// Make a request, that will call the google homepage
req, _ := http.NewRequest(http.MethodGet, "http://google.com", nil)
// Associate the cancellable context we just created to the request
req = req.WithContext(ctx)
// Create a new HTTP client and execute the request
client := http.DefaultClient
res, err := client.Do(req)
// If the request failed, log to STDOUT
if err != nil {
fmt.Println("Request failed:", err)
return
}
// Print the statuscode if the request succeeds
fmt.Println("Response received, status code:", res.StatusCode)
}
e.g.
在下面這段代碼中,我們創建了一個過期時間爲 1s 的上下文,並向上下文傳入 handle 函數,該方法會使用 500ms 的時間處理傳入的請求:
func handle(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle", ctx.Err())
case <-time.After(duration):
fmt.Println("process request with", duration)
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go handle(ctx, 500*time.Millisecond)
select {
case <-ctx.Done():
fmt.Println("main", ctx.Err())
}
time.Sleep(time.Second)
}
因爲過期時間大於處理時間,所以我們有足夠的時間處理該請求,運行上述代碼會打印出下面的內容:
process request with 500ms
main context deadline exceeded
handle 函數沒有進入超時的 select 分支,但是 main 函數的 select 卻會等待 context.Context 超時並打印出 main context deadline exceeded。
如果我們將處理請求時間增加至 1500ms,整個程序都會因爲上下文的過期而被中止:
handle context deadline exceeded
main context deadline exceeded
相信上面的例子能夠幫助理解 context.Context 的使用方法和設計原理 —— 多個 goroutine 同時訂閱 ctx.Done 管道中的消息,一旦接收到取消信號就立刻停止當前正在執行的工作。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/sDlaFCX75m6urBkbTEtJ1Q