Go 異步編程小技巧

我們通過一個簡單的例子看一下 Goroutine 的使用

func main() {
    go func() {
        fmt.Println("Goroutine started")
        // do some work
        fmt.Println("Goroutine finished")
    }()
    // wait for Goroutine to finish
    time.Sleep(time.Second)
    fmt.Println("Program finished")
}

這裏我們通過 time.Sleep 來等待協程執行結束在這個例子裏面是沒問題的,因爲我們只是模擬了操作,並沒有在 Goroutine 做任何費時的動作,但是一旦我們的操作超過 1s,這個時候就會因爲主程序結束而導致 Goroutine 沒有正常執行完,我們希望等到執行完成再告訴我們的程序可以結束了。

這種情況可以在 Goroutine 的外部創建一個 channel,在 Goroutine 內部向 channel 中發送信號,當外部程序讀取到該信號時,便可以退出 Goroutine 的執行。

退出控制

可以使用 chan struct{}chan bool 這樣的 channel,然後在 Goroutine 函數中通過向 channel 發送消息來通知外部程序退出。

func worker(done chan bool) {
    fmt.Println("Goroutine started")
    // do some work
    fmt.Println("Goroutine finished")
    done <- true
}

func main() {
    done := make(chan bool, 1)
    go worker(done)
    // wait for worker to finish
    <-done
    fmt.Println("Program finished")
}

也可以使用 context 來對協程進行控制。

在 Goroutine 函數內部使用 context.Context,並且在外部程序中調用 cancel() 方法,當外部程序調用 cancel() 時,Goroutine 函數會收到一個信號,可以在函數中檢查該信號並退出執行。

func worker(ctx context.Context) {
    fmt.Println("Goroutine started")
    // do some work
    select {
    case <-ctx.Done():
        fmt.Println("Goroutine cancelled")
        return
    default:
        fmt.Println("Goroutine finished")
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    go worker(ctx)
    // wait for worker to finish
    time.Sleep(2 * time.Second)
    fmt.Println("Program finished")
}

如果沒有正確使用 context 會產生內存泄露。

那 context 結構是什麼樣的?context 是 一個樹狀的結構,父 context 取消的話,從對應父 context 衍生出來的子 context 都會被取消,所以才能夠實現我們的線程控制。

如果在 Goroutine 中執行的任務需要長時間運行,例如 I/O 操作或者阻塞操作,應該使用超時控制來防止 Goroutine 長時間阻塞。可以使用 time.After()time.Tick() 來實現超時控制。

func worker() {
    fmt.Println("Goroutine started")
    select {
    case <-time.After(3 * time.Second):
        fmt.Println("Goroutine timeout")
    case <-time.After(5 * time.Second):
        fmt.Println("Goroutine finished")
    }
}

func main() {
    go worker()
    // wait for worker to finish
    time.Sleep(6 * time.Second)
    fmt.Println("Program finished")
}

也可以直接使用 context 進行超時控制

waitgroup 的也是我們控制協程退出的機制,但是每次使用都需要去 AddDone ,我們通過一個簡單的封裝來減少這個操作,避免忘記 Add 或者 Done 導致程序不符合預期

import "sync"

type Group struct {
 wg sync.WaitGroup
}

func (g *Group) Wait() {
 g.wg.Wait()
}

func (g *Group) Start(f func()) {
 g.wg.Add(1)
 go func() {
  defer g.wg.Done()
  f()
 }()
}

這裏最主要的是 Start 方法,內部將 Add 和 Done 進行了封裝,雖然只有短短的幾行代碼,卻能夠讓我們每次使用 waitgroup 的時候不會忘記去對計數器增加一和完成計數器。

數據安全

如何確保 Goroutine 的數據操作安全性?

可以用 Channel 來保證數據操作的安全

package main

import (
 "fmt"
 "time"
)

type Counter struct {
 ch    chan int
 value int
}

func NewCounter() *Counter {
 c := &Counter{
  ch:    make(chan int, 8),
  value: 0,
 }
 go func() {
  for {
   // 將緩衝區的數據取出來遞增到value上,沒有數據則阻塞
   select {
   case <-c.ch:
    c.value++
   }
  }
 }()
 return c
}

func (c *Counter) Inc() {
 // 寫入到管道中
 c.ch <- 1
}

func (c *Counter) Value() int {
 if len(c.ch) > 0 {
  <-c.ch
  c.value++
 }
 return c.value
}

func main() {
 c := NewCounter()
 for i := 0; i < 10; i++ {
  go func() {
   c.Inc()
  }()
 }
 time.Sleep(2 * time.Second)

 fmt.Print(c.value)
}

也可以通過 mutex 來保證數據的安全

package main

import (
 "fmt"
 "sync"
 "time"
)

type Counter struct {
 mu    sync.Mutex
 value int
}

func (c *Counter) Inc() {
 c.mu.Lock()
 defer c.mu.Unlock()
 c.value++
}

func (c *Counter) Value() int {
 c.mu.Lock()
 defer c.mu.Unlock()
 return c.value
}

func main() {
 counter := Counter{}

 for i := 0; i < 10; i++ {
  go func() {
   for j := 0; j < 10000; j++ {
    counter.Inc()
   }
  }()
 }

 time.Sleep(time.Second)
 fmt.Println(counter.Value())
}

這個遞增的例子其實用 metux 會更好的理解,這裏主要展示在不同場景下,可以靈活的去選擇自己更好的方式進行併發數據安全的實現。

在 Kubernetes 中我們能看到很多的修改都是通過寫入 channel 後再去執行,這樣能保證單協程規避併發問題,也能夠將生產和消費進行解耦。

但是如果我們僅僅只是通過上鎖來修改 map ,那這個時候 channel 的性能就遠不如直接上鎖來的好,我們看以下的代碼進行性能測試。

writeToMapWithMutex 是通過加鎖的方式來操作 map,而 writeToMapWithChannel 則是寫入 channel 後再由另一個協程去消費。

package map_modify

import (
 "sync"
)

const mapSize = 1000
const numIterations = 100000

func writeToMapWithMutex() {
 m := make(map[int]int)
 var mutex sync.Mutex

 for i := 0; i < numIterations; i++ {
  mutex.Lock()
  m[i%mapSize] = i
  mutex.Unlock()
 }
}

func writeToMapWithChannel() {
 m := make(map[int]int)
 ch := make(chan struct {
  key   int
  value int
 }, 256)

 var wg sync.WaitGroup
 go func() {
  wg.Add(1)
  for {
   entry, ok := <-ch
   if !ok {
    wg.Done()
    return
   }
   m[entry.key] = entry.value
  }
 }()

 for i := 0; i < numIterations; i++ {
  ch <- struct {
   key   int
   value int
  }{i % mapSize, i}
 }
 close(ch)
 wg.Wait()
}

通過 benchmark 進行測試

go test -bench .

goos: windows
goarch: amd64
pkg: golib/examples/map_modify
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkMutex-8             532           2166059 ns/op
BenchmarkChannel-8           186           6409804 ns/op

可以看到直接加鎖修改 map 的效率是更高的,所以在修改不復雜的情況下我們優先選擇直接 sync.Mutex 來規避併發修改的問題

限制數量

可以通過 帶緩衝區的 channel 來控制,如果緩衝區已經被寫滿,則需要等待其他執行完的 Goroutine 將數據讀走再繼續操作。

package main
import (
    "sync"
    "fmt"
)
// 控制併發度爲5
const N = 5

var APIList = []string{"a""b""c""d""e""f""g""h""i""j"}

func main() {
    var(
        c = make(chan struct{},N)
        wg sync.WaitGroup
    )

    for i := 0; i < len(APIList) ;i++{
        c <- struct{}{}
        wg.Add(1)
        go func(index int){
            fmt.Printf("%s", APIList[index])
            _ = <-c
            wg.Done()
        }(i)
    }
    wg.Wait()
    fmt.Println()
    fmt.Print("done")
}

我們可以通過信號量來控制是否要運行協程,從而達到控制併發數量的目的。

type BoundedFrequencyRunner struct {
 sync.Mutex

 // 主動觸發
 run chan struct{}

 // 定時器限制
 timer *time.Timer

 // 真正執行的邏輯
 fn func()
}

func NewBoundedFrequencyRunner(fn func()) *BoundedFrequencyRunner {
 return &BoundedFrequencyRunner{
  run:   make(chan struct{}, 1),
  fn:    fn,
  timer: time.NewTimer(0),
 }
}

邊界運行期的結構體,可以看到它持有了需要執行的邏輯,還有定時器和主動觸發的 chan,都是爲了能夠讓我們在初始化之後能夠正常執行方法

// Run 觸發執行 ,這裏只能夠寫入一個信號量,多餘的直接丟棄,不會阻塞,這裏也可以根據自己的需要增加排隊的個數
func (b *BoundedFrequencyRunner) Run() {
 select {
 case b.run <- struct{}{}:
  fmt.Println("寫入信號量成功")
 default:
  fmt.Println("已經觸發過一次,直接丟棄信號量")
 }
}

func (b *BoundedFrequencyRunner) Loop() {
 b.timer.Reset(time.Second * 1)
 for {
  select {
  case <-b.run:
   fmt.Println("run 信號觸發")
   b.tryRun()
  case <-b.timer.C:
   fmt.Println("timer 觸發執行")
   b.tryRun()
  }
 }
}

func (b *BoundedFrequencyRunner) tryRun() {
 b.Lock()
 defer b.Unlock()
 // 可以增加限流器等限制邏輯
 b.timer.Reset(time.Second * 1)
 b.fn()
}

panic 的處理

可以使用 recover 函數來捕獲 Goroutine 中的 panic,並進行相應的處理,如果沒有對相應的 Goroutine 進行異常處理,會導致主線程 panic

所以我們可以通過代碼的封裝,不直接使用 go 關鍵字,來將每次啓動的協程都帶上 recover 。通過封裝 HandleCrash 方法來實現,這個方式也是 Kubernets 中的實現。

package runtime

var (
 ReallyCrash = true
)

// 全局默認的Panic處理
var PanicHandlers = []func(interface{}){logPanic}

// 允許外部傳入額外的異常處理
func HandleCrash(additionalHandlers ...func(interface{})) {
 if r := recover(); r != nil {
  for _, fn := range PanicHandlers {
   fn(r)
  }
  for _, fn := range additionalHandlers {
   fn(r)
  }
  if ReallyCrash {
   panic(r)
  }
 }
}

這裏既支持了內部異常的函數處理,也支持外部傳入額外的異常處理,如果不想要 Crash 的話也可以自己進行修改。

package runtime

func Go(fn func()) {
 go func() {
  defer HandleCrash()
  fn()
 }()
}

要起協程的時候可以通過 Go 方法來執行,這樣也避免了自己忘記增加 panic 的處理。

底層實現

go 關鍵字啓動後編譯器器會通過 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 兩個方法將該關鍵字轉換成 runtime.newproc 函數調用。

啓動一個新的 Goroutine 來執行任務時,會通過 runtime.newproc 初始化一個 g 來運行協程。

在 Go 的運行時(runtime)系統中,G 表示 Goroutine,M 表示 Machine(即操作系統線程),P 表示 Processor。其中,G 是 Goroutine 執行的實體,M 是 Goroutine 的承載者,P 是調度器。

Goroutine 在 Go 運行時(runtime)系統中可以有以下 9 種狀態:

  1. Gidle:Goroutine 處於空閒狀態,即沒有被創建或者被回收;

  2. Grunnable:Goroutine 可以被調度器調度執行,但是還未被選中執行;

  3. Grunning:Goroutine 正在執行中,被賦予了 M 和 P 的資源;

  4. Gsyscall:Goroutine 發起了系統調用,進入系統調用阻塞狀態;

  5. Gwaiting:Goroutine 被阻塞等待某個事件的發生,比如等待 I/O、等待鎖、等待 channel 等;

  6. Gscan:GC 正在掃描棧空間

  7. Gdead:沒有正在執行的用戶代碼

  8. Gcopystack:棧正在被拷貝,沒有正在執行的代碼

  9. Gpreempted:Goroutine 被搶佔,即在運行過程中被調度器中斷。等待重新喚醒

在 Go 中,每個 Goroutine 都是由 Go 運行時調度器(Scheduler)進行調度的。調度器負責將 Goroutine 轉換成線程上的執行上下文,並在多個線程之間分配 Goroutine 的執行。

Go 調度器的調度策略是基於協作式調度的。 也就是說,調度器會在 Goroutine 主動讓出執行權(例如在 I/O 操作、channel 操作、time.Sleep() 等操作中)時,將 CPU 的執行權轉交給其他 Goroutine。這種調度策略可以保證 Goroutine 之間的調度是非常輕量級的。

在 Go 中,Goroutine 的調度時機一般有以下幾種情況:

  1. 當前 Goroutine 主動讓出執行權時,調度器會將 CPU 的執行權轉交給其他 Goroutine。

  2. 當前 Goroutine 執行的時間超過了 Go 運行時所設置的閾值時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。

  3. 當前 Goroutine 進行 I/O 操作、channel 操作或者其他系統調用時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。

  4. 當前 Goroutine 被阻塞在同步原語(例如 sync.Mutex)時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。

需要注意的是,在 Go 中 Goroutine 的調度是非確定性的,也就是說,Goroutine 之間的調度是不可預測的。這種調度策略可以保證 Goroutine 的執行具有隨機性,可以充分利用多核 CPU 的性能

具體性能優勢的大小,取決於應用程序的具體實現、硬件環境和應用場景等因素,因此無法給出一個具體的數字來衡量其性能優勢的大小。在不同的場景下,性能優勢的差異也會有所不同。

在實現上採用的是用戶態調度,不需要進行內核態和用戶態之間的切換,從而可以更快地切換和調度多個 Goroutine。相比之下,傳統的線程需要佔用更多的資源和時間,因此在多併發的情況下,Go 的 Goroutine 會更加高效。

在實際應用中,要根據具體的場景和需求來選擇合適的併發方式,不能盲目地追求 Goroutine 的性能優勢而忽略其他的因素。

舉個例子如果併發去訪問同一個庫,如果併發度是 10 的話,那麼 QPS 的量將會被擴大 10 倍,如果這個時候數據庫扛不住對應的併發,會造成雪崩的情況,所以這種時候並不適合用併發來優化程序的性能。

資源的消耗

1. 內存的消耗

因爲打開 Goroutine 時需要有對應的數據結構來存儲,所以會產生內存的消耗。

通過開啓協程並進行阻塞,來查看前後內存的變化情況

func getGoroutineMemConsume() {
 var c chan int var wg sync.WaitGroup
 const goroutineNum = 1000000 memConsumed := func() uint64 {
  runtime.GC() //GC,排除對象影響  var memStat runtime.MemStats
  runtime.ReadMemStats(&memStat)
  return memStat.Sys
 }

 noop := func() {
  wg.Done()
  <-c //防止goroutine退出,內存被釋放 
 }

 wg.Add(goroutineNum)
 before := memConsumed() //獲取創建goroutine前內存 for i := 0; i < goroutineNum; i++ {
  go noop()
 }
 wg.Wait()
 after := memConsumed() //獲取創建goroutine後內存 fmt.Println(runtime.NumGoroutine())
 fmt.Printf("%.3f KB bytes\n", float64(after-before)/goroutineNum/1024)
}

每個協程至少需要消耗 2KB 的空間,那麼假設計算機的內存是 2GB,那麼至多允許 2GB/2KB = 1M 個協程同時存在。

2.CPU 的消耗

因爲開啓 goroutine 後需要進行調度,而且每次開啓一個任務時,執行任務也會佔用 CPU。

一個 Goroutine 消耗多少 CPU 實際上跟執行函數的邏輯有着很大的關係,如果執行的函數是 CPU 密集型的計算,並且持續的時間很長,那麼這個時候 CPU 就會優先到達瓶頸。

所以具體的 CPU 消耗需要看具體的邏輯才能夠進行判斷。

寫在最後

理解 Goroutine 使用和原理,對我們在學習雲原生的知識是有極大的幫助的,在 kubernetes 中用了非常多的異步編程技巧,如果我們沒有異步編程的知識儲備,那代碼看起來會是雲裏霧裏的,瞭解了上面這些協程的用法之後,在閱讀 go 相關項目的時候也會如虎添翼,幫助我們快速理解。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/AvrfEHOj72cXttQDTv9sgw