Go 異步編程小技巧
我們通過一個簡單的例子看一下 Goroutine 的使用
func main() {
go func() {
fmt.Println("Goroutine started")
// do some work
fmt.Println("Goroutine finished")
}()
// wait for Goroutine to finish
time.Sleep(time.Second)
fmt.Println("Program finished")
}
這裏我們通過 time.Sleep
來等待協程執行結束在這個例子裏面是沒問題的,因爲我們只是模擬了操作,並沒有在 Goroutine
做任何費時的動作,但是一旦我們的操作超過 1s,這個時候就會因爲主程序結束而導致 Goroutine
沒有正常執行完,我們希望等到執行完成再告訴我們的程序可以結束了。
這種情況可以在 Goroutine 的外部創建一個 channel,在 Goroutine 內部向 channel 中發送信號,當外部程序讀取到該信號時,便可以退出 Goroutine 的執行。
退出控制
可以使用 chan struct{}
或 chan bool
這樣的 channel,然後在 Goroutine 函數中通過向 channel 發送消息來通知外部程序退出。
func worker(done chan bool) {
fmt.Println("Goroutine started")
// do some work
fmt.Println("Goroutine finished")
done <- true
}
func main() {
done := make(chan bool, 1)
go worker(done)
// wait for worker to finish
<-done
fmt.Println("Program finished")
}
也可以使用 context
來對協程進行控制。
在 Goroutine 函數內部使用 context.Context
,並且在外部程序中調用 cancel()
方法,當外部程序調用 cancel()
時,Goroutine 函數會收到一個信號,可以在函數中檢查該信號並退出執行。
func worker(ctx context.Context) {
fmt.Println("Goroutine started")
// do some work
select {
case <-ctx.Done():
fmt.Println("Goroutine cancelled")
return
default:
fmt.Println("Goroutine finished")
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go worker(ctx)
// wait for worker to finish
time.Sleep(2 * time.Second)
fmt.Println("Program finished")
}
如果沒有正確使用 context 會產生內存泄露。
那 context 結構是什麼樣的?context 是 一個樹狀的結構,父 context 取消的話,從對應父 context 衍生出來的子 context 都會被取消,所以才能夠實現我們的線程控制。
如果在 Goroutine 中執行的任務需要長時間運行,例如 I/O 操作或者阻塞操作,應該使用超時控制來防止 Goroutine 長時間阻塞。可以使用 time.After()
和 time.Tick()
來實現超時控制。
func worker() {
fmt.Println("Goroutine started")
select {
case <-time.After(3 * time.Second):
fmt.Println("Goroutine timeout")
case <-time.After(5 * time.Second):
fmt.Println("Goroutine finished")
}
}
func main() {
go worker()
// wait for worker to finish
time.Sleep(6 * time.Second)
fmt.Println("Program finished")
}
也可以直接使用 context
進行超時控制
waitgroup 的也是我們控制協程退出的機制,但是每次使用都需要去 Add
和 Done
,我們通過一個簡單的封裝來減少這個操作,避免忘記 Add
或者 Done
導致程序不符合預期
import "sync"
type Group struct {
wg sync.WaitGroup
}
func (g *Group) Wait() {
g.wg.Wait()
}
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
這裏最主要的是 Start
方法,內部將 Add
和 Done
進行了封裝,雖然只有短短的幾行代碼,卻能夠讓我們每次使用 waitgroup
的時候不會忘記去對計數器增加一和完成計數器。
數據安全
如何確保 Goroutine 的數據操作安全性?
可以用 Channel 來保證數據操作的安全
package main
import (
"fmt"
"time"
)
type Counter struct {
ch chan int
value int
}
func NewCounter() *Counter {
c := &Counter{
ch: make(chan int, 8),
value: 0,
}
go func() {
for {
// 將緩衝區的數據取出來遞增到value上,沒有數據則阻塞
select {
case <-c.ch:
c.value++
}
}
}()
return c
}
func (c *Counter) Inc() {
// 寫入到管道中
c.ch <- 1
}
func (c *Counter) Value() int {
if len(c.ch) > 0 {
<-c.ch
c.value++
}
return c.value
}
func main() {
c := NewCounter()
for i := 0; i < 10; i++ {
go func() {
c.Inc()
}()
}
time.Sleep(2 * time.Second)
fmt.Print(c.value)
}
也可以通過 mutex
來保證數據的安全
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := Counter{}
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 10000; j++ {
counter.Inc()
}
}()
}
time.Sleep(time.Second)
fmt.Println(counter.Value())
}
這個遞增的例子其實用 metux
會更好的理解,這裏主要展示在不同場景下,可以靈活的去選擇自己更好的方式進行併發數據安全的實現。
在 Kubernetes 中我們能看到很多的修改都是通過寫入 channel 後再去執行,這樣能保證單協程規避併發問題,也能夠將生產和消費進行解耦。
但是如果我們僅僅只是通過上鎖來修改 map
,那這個時候 channel 的性能就遠不如直接上鎖來的好,我們看以下的代碼進行性能測試。
writeToMapWithMutex
是通過加鎖的方式來操作 map,而 writeToMapWithChannel
則是寫入 channel 後再由另一個協程去消費。
package map_modify
import (
"sync"
)
const mapSize = 1000
const numIterations = 100000
func writeToMapWithMutex() {
m := make(map[int]int)
var mutex sync.Mutex
for i := 0; i < numIterations; i++ {
mutex.Lock()
m[i%mapSize] = i
mutex.Unlock()
}
}
func writeToMapWithChannel() {
m := make(map[int]int)
ch := make(chan struct {
key int
value int
}, 256)
var wg sync.WaitGroup
go func() {
wg.Add(1)
for {
entry, ok := <-ch
if !ok {
wg.Done()
return
}
m[entry.key] = entry.value
}
}()
for i := 0; i < numIterations; i++ {
ch <- struct {
key int
value int
}{i % mapSize, i}
}
close(ch)
wg.Wait()
}
通過 benchmark
進行測試
go test -bench .
goos: windows
goarch: amd64
pkg: golib/examples/map_modify
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkMutex-8 532 2166059 ns/op
BenchmarkChannel-8 186 6409804 ns/op
可以看到直接加鎖修改 map 的效率是更高的,所以在修改不復雜的情況下我們優先選擇直接 sync.Mutex
來規避併發修改的問題
限制數量
可以通過 帶緩衝區的 channel
來控制,如果緩衝區已經被寫滿,則需要等待其他執行完的 Goroutine 將數據讀走再繼續操作。
package main
import (
"sync"
"fmt"
)
// 控制併發度爲5
const N = 5
var APIList = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
func main() {
var(
c = make(chan struct{},N)
wg sync.WaitGroup
)
for i := 0; i < len(APIList) ;i++{
c <- struct{}{}
wg.Add(1)
go func(index int){
fmt.Printf("%s", APIList[index])
_ = <-c
wg.Done()
}(i)
}
wg.Wait()
fmt.Println()
fmt.Print("done")
}
我們可以通過信號量來控制是否要運行協程,從而達到控制併發數量的目的。
type BoundedFrequencyRunner struct {
sync.Mutex
// 主動觸發
run chan struct{}
// 定時器限制
timer *time.Timer
// 真正執行的邏輯
fn func()
}
func NewBoundedFrequencyRunner(fn func()) *BoundedFrequencyRunner {
return &BoundedFrequencyRunner{
run: make(chan struct{}, 1),
fn: fn,
timer: time.NewTimer(0),
}
}
邊界運行期的結構體,可以看到它持有了需要執行的邏輯,還有定時器和主動觸發的 chan
,都是爲了能夠讓我們在初始化之後能夠正常執行方法
// Run 觸發執行 ,這裏只能夠寫入一個信號量,多餘的直接丟棄,不會阻塞,這裏也可以根據自己的需要增加排隊的個數
func (b *BoundedFrequencyRunner) Run() {
select {
case b.run <- struct{}{}:
fmt.Println("寫入信號量成功")
default:
fmt.Println("已經觸發過一次,直接丟棄信號量")
}
}
func (b *BoundedFrequencyRunner) Loop() {
b.timer.Reset(time.Second * 1)
for {
select {
case <-b.run:
fmt.Println("run 信號觸發")
b.tryRun()
case <-b.timer.C:
fmt.Println("timer 觸發執行")
b.tryRun()
}
}
}
func (b *BoundedFrequencyRunner) tryRun() {
b.Lock()
defer b.Unlock()
// 可以增加限流器等限制邏輯
b.timer.Reset(time.Second * 1)
b.fn()
}
panic 的處理
可以使用 recover 函數來捕獲 Goroutine 中的 panic,並進行相應的處理,如果沒有對相應的 Goroutine 進行異常處理,會導致主線程 panic
所以我們可以通過代碼的封裝,不直接使用 go
關鍵字,來將每次啓動的協程都帶上 recover
。通過封裝 HandleCrash
方法來實現,這個方式也是 Kubernets
中的實現。
package runtime
var (
ReallyCrash = true
)
// 全局默認的Panic處理
var PanicHandlers = []func(interface{}){logPanic}
// 允許外部傳入額外的異常處理
func HandleCrash(additionalHandlers ...func(interface{})) {
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
if ReallyCrash {
panic(r)
}
}
}
這裏既支持了內部異常的函數處理,也支持外部傳入額外的異常處理,如果不想要 Crash
的話也可以自己進行修改。
package runtime
func Go(fn func()) {
go func() {
defer HandleCrash()
fn()
}()
}
要起協程的時候可以通過 Go
方法來執行,這樣也避免了自己忘記增加 panic
的處理。
底層實現
go
關鍵字啓動後編譯器器會通過 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 兩個方法將該關鍵字轉換成 runtime.newproc 函數調用。
啓動一個新的 Goroutine 來執行任務時,會通過 runtime.newproc
初始化一個 g
來運行協程。
在 Go 的運行時(runtime)系統中,G 表示 Goroutine,M 表示 Machine(即操作系統線程),P 表示 Processor。其中,G 是 Goroutine 執行的實體,M 是 Goroutine 的承載者,P 是調度器。
Goroutine 在 Go 運行時(runtime)系統中可以有以下 9 種狀態:
-
Gidle:Goroutine 處於空閒狀態,即沒有被創建或者被回收;
-
Grunnable:Goroutine 可以被調度器調度執行,但是還未被選中執行;
-
Grunning:Goroutine 正在執行中,被賦予了 M 和 P 的資源;
-
Gsyscall:Goroutine 發起了系統調用,進入系統調用阻塞狀態;
-
Gwaiting:Goroutine 被阻塞等待某個事件的發生,比如等待 I/O、等待鎖、等待 channel 等;
-
Gscan:GC 正在掃描棧空間
-
Gdead:沒有正在執行的用戶代碼
-
Gcopystack:棧正在被拷貝,沒有正在執行的代碼
-
Gpreempted:Goroutine 被搶佔,即在運行過程中被調度器中斷。等待重新喚醒
在 Go 中,每個 Goroutine 都是由 Go 運行時調度器(Scheduler)進行調度的。調度器負責將 Goroutine 轉換成線程上的執行上下文,並在多個線程之間分配 Goroutine 的執行。
Go 調度器的調度策略是基於協作式調度的。 也就是說,調度器會在 Goroutine 主動讓出執行權(例如在 I/O 操作、channel 操作、time.Sleep() 等操作中)時,將 CPU 的執行權轉交給其他 Goroutine。這種調度策略可以保證 Goroutine 之間的調度是非常輕量級的。
在 Go 中,Goroutine 的調度時機一般有以下幾種情況:
-
當前 Goroutine 主動讓出執行權時,調度器會將 CPU 的執行權轉交給其他 Goroutine。
-
當前 Goroutine 執行的時間超過了 Go 運行時所設置的閾值時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。
-
當前 Goroutine 進行 I/O 操作、channel 操作或者其他系統調用時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。
-
當前 Goroutine 被阻塞在同步原語(例如 sync.Mutex)時,調度器會將當前 Goroutine 暫停,將 CPU 的執行權轉交給其他 Goroutine。
需要注意的是,在 Go 中 Goroutine 的調度是非確定性的,也就是說,Goroutine 之間的調度是不可預測的。這種調度策略可以保證 Goroutine 的執行具有隨機性,可以充分利用多核 CPU 的性能。
具體性能優勢的大小,取決於應用程序的具體實現、硬件環境和應用場景等因素,因此無法給出一個具體的數字來衡量其性能優勢的大小。在不同的場景下,性能優勢的差異也會有所不同。
在實現上採用的是用戶態調度,不需要進行內核態和用戶態之間的切換,從而可以更快地切換和調度多個 Goroutine。相比之下,傳統的線程需要佔用更多的資源和時間,因此在多併發的情況下,Go 的 Goroutine 會更加高效。
在實際應用中,要根據具體的場景和需求來選擇合適的併發方式,不能盲目地追求 Goroutine 的性能優勢而忽略其他的因素。
舉個例子如果併發去訪問同一個庫,如果併發度是 10 的話,那麼 QPS 的量將會被擴大 10 倍,如果這個時候數據庫扛不住對應的併發,會造成雪崩的情況,所以這種時候並不適合用併發來優化程序的性能。
資源的消耗
1. 內存的消耗
因爲打開 Goroutine 時需要有對應的數據結構來存儲,所以會產生內存的消耗。
通過開啓協程並進行阻塞,來查看前後內存的變化情況
func getGoroutineMemConsume() {
var c chan int var wg sync.WaitGroup
const goroutineNum = 1000000 memConsumed := func() uint64 {
runtime.GC() //GC,排除對象影響 var memStat runtime.MemStats
runtime.ReadMemStats(&memStat)
return memStat.Sys
}
noop := func() {
wg.Done()
<-c //防止goroutine退出,內存被釋放
}
wg.Add(goroutineNum)
before := memConsumed() //獲取創建goroutine前內存 for i := 0; i < goroutineNum; i++ {
go noop()
}
wg.Wait()
after := memConsumed() //獲取創建goroutine後內存 fmt.Println(runtime.NumGoroutine())
fmt.Printf("%.3f KB bytes\n", float64(after-before)/goroutineNum/1024)
}
每個協程至少需要消耗 2KB 的空間,那麼假設計算機的內存是 2GB,那麼至多允許 2GB/2KB = 1M 個協程同時存在。
2.CPU 的消耗
因爲開啓 goroutine 後需要進行調度,而且每次開啓一個任務時,執行任務也會佔用 CPU。
一個 Goroutine 消耗多少 CPU 實際上跟執行函數的邏輯有着很大的關係,如果執行的函數是 CPU 密集型的計算,並且持續的時間很長,那麼這個時候 CPU 就會優先到達瓶頸。
所以具體的 CPU 消耗需要看具體的邏輯才能夠進行判斷。
寫在最後
理解 Goroutine 使用和原理,對我們在學習雲原生的知識是有極大的幫助的,在 kubernetes
中用了非常多的異步編程技巧,如果我們沒有異步編程的知識儲備,那代碼看起來會是雲裏霧裏的,瞭解了上面這些協程的用法之後,在閱讀 go
相關項目的時候也會如虎添翼,幫助我們快速理解。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/AvrfEHOj72cXttQDTv9sgw