gotraining 併發編程之 Channel 篇
我是一隻可愛的土撥鼠,專注於分享 Go 職場、招聘和求職,解 Gopher 之憂!歡迎關注我。
歡迎大家加入 Go 招聘交流羣,來這裏找志同道合的小夥伴!跟土撥鼠們一起交流學習。
目錄
-
前言
-
設計指南
-
語言機制
-
設計理念
-
圖解
-
傳遞保證
-
有數據和沒數據的信號
-
狀態
-
文章鏈接
-
Buffer Bloat - 2011
-
CodeReview
-
高級 CodeReview
-
練習
-
練習 1
-
練習 2
-
練習 3
-
練習 4
前言
上週土撥鼠寫了一篇看看國外的 Gopher 培訓教程是怎樣的?, 大概介紹了一下課程目錄內容。
今天給大家帶來concurrency/channels
的翻譯文章。由 “起風了” 翻譯自 https://github.com/ardanlabs/gotraining/tree/master/topics/go/concurrency/channels#channels,土撥鼠參與校對。
由於本人翻譯水平有限,不足之處煩請指出。
channels 允許 goroutine 通過使用信號語義相互通信。channels 通過使用發送 / 接收數據或通過識別單個 channel 上的狀態變化來實現此信號。不要把 channel 當作隊列來構建程序,而是把重點放在簡化所需編排的信號和語義上。
設計指南
下面講解一下 channel 的設計指南
語言機制
-
使用 channels 編排和協調 goroutine
-
關注信號語義而不是數據共享。
-
有數據或無數據的信號
-
質疑它們用於同步對共享狀態的訪問
-
在有些情況下,channel 可以更簡單,但是最初的問題
-
無緩衝的 channels:
-
接收在發送之前
-
優點: 能 100% 保證發送的信號已被接收
-
缺點: 接收信號時的延遲未知
-
帶緩衝的 channels:
-
緩衝區越大,保證性越差
-
緩衝區爲 1 可以給你一個延遲發送的保證
-
發送在接收之前
-
優點: 減少信號之間的阻塞延遲
-
缺點: 不能保證發送的信號何時被接收到
-
關閉 channels:
-
關閉在接收之前 (如有緩衝)
-
沒有數據的信號
-
非常適合信號取消和用於設置 deadline
-
nil channel :
-
發送和接收塊
-
關閉信號
-
非常適合速度限制或短暫停頓
設計理念
解決不同的問題,可能需要不同的 channel 語義。可以根據你需要的語義,採用不同的架構選擇。
-
如果給一個指定大小的 channel 發送信號會阻塞正在發送的 goroutine:
-
謹慎使用大於 1 的緩衝 channel
-
大於 1 的緩衝區必須有原因 / 測量值
-
必須要明白髮送 goroutine 阻塞時會發生什麼
-
如果給一個指定大小的 channel 發送信號不會阻塞正在發送的 goroutine:
-
下降 (Drop) 模式
-
扇出 (Fan Out) 模式
-
每次發送都保證有確切的緩衝區數量
-
確認你測試過緩衝的最大容量
-
緩衝區,越小越好
-
將阻塞延遲降低到零並不一定意味着有更好的吞吐量
-
如果一個緩衝區能夠提供足夠的吞吐量,那麼就保留它
-
對大於 1 的緩衝區提出質疑,並對其大小進行度量
-
尋找儘可能小的緩衝區,以提供足夠好的吞吐量
-
在考慮緩衝區時,不要考慮性能
-
緩衝區可以幫助減少信令之間的阻塞延遲
圖解
傳遞保證
傳遞保證
是基於一個問題:“我是否需要去保證特定的 goroutine 已收到發送的信號?”
img
有數據和沒數據的信號
當使用數據發出信號時,可以選擇三個 channel 配置選項,具體取決於所需的保證類型。
img
無數據信號主要用來取消。它允許一個 goroutine 用信號通知另一個 goroutine 來取消他們正在做的事情並繼續執行 。可以使用無緩衝和緩衝的 channel 來實現取消。
img
狀態
channel 的行爲直接受其當前狀態的影響,其狀態有三個:nil,open,close
img
文章鏈接
Channels 的行爲
- The Behavior Of Channels[1] -- William Kennedy
Channel 通信
- Channel Communication[2]
通過通信來共享內存
-
Share Memory By Communicating[3] -- Andrew Gerrand
-
Go 編程:在通訊中共享內存 [4]
Go 中 Channel 的本質
- The Nature Of Channels In Go[5]-- William Kennedy
理解 Channel 視頻
- Understanding Channels[6] - Kavya Joshi
Buffer Bloat - 2011
-
大的緩衝能及時阻止後端的大量警告。
-
它們可以在短時間內增強處理洪峯的能力。
-
它會增加延遲而不是減少延遲
-
使用帶緩衝的 channels 可以提供一種保持連續性的方法
-
不要僅僅是爲了性能纔去使用它。
-
使用它們可以良好地處理結構化的突發流量數據。
-
使用它們來快速交換數據。
Bufferbloat: Dark Buffers in the Internet[7]
Buffer Bloat Videos[8]
CodeReview
基本原理
-
github https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/example1/example1.go
-
Go Playground https://play.golang.org/p/vG1rI7VteNH
網球比賽
-
github https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/example2/example2.go
-
Go Playground https://play.golang.org/p/PvFKD_tNwir
接力賽
-
github https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/example3/example3.go
-
Go Playground https://play.golang.org/p/OLdBCGUvzbx
扇出模式
-
github https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/example4/example4.go
-
Go Playground https://play.golang.org/p/zxzHAHIr3Xj
監控運行時間
-
github https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/example5/example5.go
-
Go Playground https://play.golang.org/p/vZ95XZuYVPF
高級 CodeReview
channel 有序通信
https://play.golang.org/p/YwKFJPkB4gC
https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/advanced/example1/example1.go
// Sample program to show the order of channel communication for unbuffered,
// buffered and closing channels based on the specification.
// https://golang.org/ref/mem#tmp_7
package main
import "fmt"
func main() {
unBuffered()
buffered()
closed()
}
// With unbuffered channels, the receive happens before the corresponding send.
// The write to a happens before the receive on c, which happens before the
// corresponding send on c completes, which happens before the print.
func unBuffered() {
c := make(chan int)
var a string
go func() {
a = "hello, world"
<-c
}()
c <- 0
// We are guaranteed to print "hello, world".
fmt.Println(a)
}
// With buffered channels, the send happens before the corresponding receive.
// The write to a happens before the send on c, which happens before the
// corresponding receive on c completes, which happens before the print.
func buffered() {
c := make(chan int, 10)
var a string
go func() {
a = "hello, world"
c <- 0
}()
<-c
// We are guaranteed to print "hello, world".
fmt.Println(a)
}
// With both types of channels, a close happens before the corresponding receive.
// The write to a happens before the close on c, which happens before the
// corresponding receive on c completes, which happens before the print.
func closed() {
c := make(chan int, 10)
var a string
go func() {
a = "hello, world"
close(c)
}()
<-c
// We are guaranteed to print "hello, world".
fmt.Println(a)
}
練習
練習 1
編寫一個程序,其中兩個 goroutine 來回傳遞一個整數十次。當每個 goroutine 接收到整數時打印。每次通過整數都增加。一旦整數等於 10,立刻終止程序。
https://play.golang.org/p/cEUYThI5etc
// Write a program where two goroutines pass an integer back and forth
// ten times. Display when each goroutine receives the integer. Increment
// the integer with each pass. Once the integer equals ten, terminate
// the program cleanly.
package main
import (
"fmt"
"sync"
)
func main() {
// Create an unbuffered channel.
share := make(chan int)
// Create the WaitGroup and add a count
// of two, one for each goroutine.
var wg sync.WaitGroup
wg.Add(2)
// Launch two goroutines.
go func() {
goroutine("Bill", share)
wg.Done()
}()
go func() {
goroutine("Joan", share)
wg.Done()
}()
// Start the sharing.
share <- 1
// Wait for the program to finish.
wg.Wait()
}
// goroutine simulates sharing a value.
func goroutine(name string, share chan int) {
for {
// Wait to receive a value.
value, ok := <-share
if !ok {
// If the channel was closed, return.
fmt.Printf("Goroutine %s Down\n", name)
return
}
// Display the value.
fmt.Printf("Goroutine %s Inc %d\n", name, value)
// Terminate when the value is 10.
if value == 10 {
close(share)
fmt.Printf("Goroutine %s Down\n", name)
return
}
// Increment the value and send it
// over the channel.
share <- (value + 1)
}
}
練習 2
編寫一個程序,使用扇出模式同時生成 100 個隨機數。讓每個 goroutine 生成一個隨機數,並通過緩衝 channel 將該數字返回給主 goroutine。設置緩衝區 channel 的大小,以便永遠不會發送阻塞。不要分配比您需要的更多的緩衝區。讓主 goroutine 顯示它收到的每個隨機數,然後終止程序。
https://play.golang.org/p/cEUYThI5etc
// Write a program that uses a fan out pattern to generate 100 random numbers
// concurrently. Have each goroutine generate a single random number and return
// that number to the main goroutine over a buffered channel. Set the size of
// the buffer channel so no send every blocks. Don't allocate more buffers than
// you need. Have the main goroutine display each random number is receives and
// then terminate the program.
package main
import (
"fmt"
"math/rand"
"time"
)
const (
goroutines = 100
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Create the buffer channel with a buffer for
// each goroutine to be created.
values := make(chan int, goroutines)
// Iterate and launch each goroutine.
for gr := 0; gr < goroutines; gr++ {
// Create an anonymous function for each goroutine that
// generates a random number and sends it on the channel.
go func() {
values <- rand.Intn(1000)
}()
}
// Create a variable to be used to track received messages.
// Set the value to the number of goroutines created.
wait := goroutines
// Iterate receiving each value until they are all received.
// Store them in a slice of ints.
var nums []int
for wait > 0 {
nums = append(nums, <-values)
wait--
}
// Print the values in our slice.
fmt.Println(nums)
}
練習 3
編寫一個程序,最多同時生成 100 個隨機數。不要發送所有的 100 個值,因爲發送 / 接收的數量是未知的。
// Write a program that uses goroutines to generate up to 100 random numbers.
// Do not send values that are divisible by 2. Have the main goroutine receive
// values and add them to a slice.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
goroutines = 100
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Create the channel for sharing results.
values := make(chan int)
// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
var wg sync.WaitGroup
wg.Add(goroutines)
// Iterate and launch each goroutine.
for gr := 0; gr < goroutines; gr++ {
// Create an anonymous function for each goroutine.
go func() {
// Ensure the waitgroup is decremented when this function returns.
defer wg.Done()
// Generate a random number up to 1000.
n := rand.Intn(1000)
// Return early if the number is divisible by 2. n%2 == 0
if n%2 == 0 {
return
}
// Send the odd values through the channel.
values <- n
}()
}
// Create a goroutine that waits for the other goroutines to finish then
// closes the channel.
go func() {
wg.Wait()
close(values)
}()
// Receive from the channel until it is closed.
// Store values in a slice of ints.
var nums []int
for n := range values {
nums = append(nums, n)
}
// Print the values in our slice.
fmt.Printf("Result count: %d\n", len(nums))
fmt.Println(nums)
}
練習 4
編寫一個程序,使用 work pool 同時生成最多 100 個隨機數。拒絕偶數值。如果已收集到 100 個奇數,就讓協程停止運行。
https://play.golang.org/p/9nZ8YgmsAIX
// Write a program that creates a fixed set of workers to generate random
// numbers. Discard any number divisible by 2. Continue receiving until 100
// numbers are received. Tell the workers to shut down before terminating.
package main
import (
"fmt"
"math/rand"
"runtime"
"sync"
)
func main() {
// Create the channel for sharing results.
values := make(chan int)
// Create a channel "shutdown" to tell goroutines when to terminate.
shutdown := make(chan struct{})
// Define the size of the worker pool. Use runtime.GOMAXPROCS(0) to size the pool based on number of processors.
poolSize := runtime.GOMAXPROCS(0)
// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
var wg sync.WaitGroup
wg.Add(poolSize)
// Create a fixed size pool of goroutines to generate random numbers.
for i := 0; i < poolSize; i++ {
go func(id int) {
// Start an infinite loop.
for {
// Generate a random number up to 1000.
n := rand.Intn(1000)
// Use a select to either send the number or receive the shutdown signal.
select {
// In one case send the random number.
case values <- n:
fmt.Printf("Worker %d sent %d\n", id, n)
// In another case receive from the shutdown channel.
case <-shutdown:
fmt.Printf("Worker %d shutting down\n", id)
wg.Done()
return
}
}
}(i)
}
// Create a slice to hold the random numbers.
var nums []int
for i := range values {
// continue the loop if the value was even.
if i%2 == 0 {
fmt.Println("Discarding", i)
continue
}
// Store the odd number.
fmt.Println("Keeping", i)
nums = append(nums, i)
// break the loop once we have 100 results.
if len(nums) == 100 {
break
}
}
// Send the shutdown signal by closing the channel.
fmt.Println("Receiver sending shutdown signal")
close(shutdown)
// Wait for the Goroutines to finish.
wg.Wait()
// Print the values in our slice.
fmt.Printf("Result count: %d\n", len(nums))
fmt.Println(nums)
}
參考資料
[1]
The Behavior Of Channels: https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html
[2]
Channel Communication: https://golang.org/ref/mem#tmp_7
[3]
Share Memory By Communicating: http://blog.golang.org/share-memory-by-communicating
[4]
Go 編程:在通訊中共享內存: https://learnku.com/docs/go-blog/share-memory-by-communicating/6586
[5]
The Nature Of Channels In Go: https://www.ardanlabs.com/blog/2014/02/the-nature-of-channels-in-go.html
[6]
Understanding Channels: https://www.youtube.com/watch?v=KBZlN0izeiY
[7]
Bufferbloat: Dark Buffers in the Internet: https://www.youtube.com/watch?v=qbIozKVz73g
[8]
Buffer Bloat Videos: http://www.bufferbloat.net/projects/cerowrt/wiki/Bloat-videos
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YJAJJErj0aRx6poriVQ-3A