gotraining 併發編程之 Channel 篇

我是一隻可愛的土撥鼠,專注於分享 Go 職場、招聘和求職,解 Gopher 之憂!歡迎關注我。

歡迎大家加入 Go 招聘交流羣,來這裏找志同道合的小夥伴!跟土撥鼠們一起交流學習。

目錄

前言

上週土撥鼠寫了一篇看看國外的 Gopher 培訓教程是怎樣的?, 大概介紹了一下課程目錄內容。

今天給大家帶來concurrency/channels的翻譯文章。由 “起風了” 翻譯自 https://github.com/ardanlabs/gotraining/tree/master/topics/go/concurrency/channels#channels,土撥鼠參與校對。

由於本人翻譯水平有限,不足之處煩請指出。

channels 允許 goroutine 通過使用信號語義相互通信。channels 通過使用發送 / 接收數據或通過識別單個 channel 上的狀態變化來實現此信號。不要把 channel 當作隊列來構建程序,而是把重點放在簡化所需編排的信號和語義上。

設計指南

下面講解一下 channel 的設計指南

語言機制

設計理念

解決不同的問題,可能需要不同的 channel 語義。可以根據你需要的語義,採用不同的架構選擇。

圖解

傳遞保證

傳遞保證是基於一個問題:“我是否需要去保證特定的 goroutine 已收到發送的信號?”

img

有數據和沒數據的信號

當使用數據發出信號時,可以選擇三個 channel 配置選項,具體取決於所需的保證類型。

img

無數據信號主要用來取消。它允許一個 goroutine 用信號通知另一個 goroutine 來取消他們正在做的事情並繼續執行 。可以使用無緩衝和緩衝的 channel 來實現取消。

img

狀態

channel 的行爲直接受其當前狀態的影響,其狀態有三個:nil,open,close

img

文章鏈接

Channels 的行爲

Channel 通信

通過通信來共享內存

Go 中 Channel 的本質

理解 Channel 視頻

Buffer Bloat - 2011

Bufferbloat: Dark Buffers in the Internet[7]

Buffer Bloat Videos[8]

CodeReview

基本原理

網球比賽

接力賽

扇出模式

監控運行時間

高級 CodeReview

channel 有序通信

https://play.golang.org/p/YwKFJPkB4gC

https://github.com/ardanlabs/gotraining/blob/master/topics/go/concurrency/channels/advanced/example1/example1.go

// Sample program to show the order of channel communication for unbuffered,
// buffered and closing channels based on the specification.
// https://golang.org/ref/mem#tmp_7
package main

import "fmt"

func main() {
 unBuffered()
 buffered()
 closed()
}

// With unbuffered channels, the receive happens before the corresponding send.
// The write to a happens before the receive on c, which happens before the
// corresponding send on c completes, which happens before the print.
func unBuffered() {
 c := make(chan int)
 var a string

 go func() {
  a = "hello, world"
  <-c
 }()

 c <- 0

 // We are guaranteed to print "hello, world".
 fmt.Println(a)
}

// With buffered channels, the send happens before the corresponding receive.
// The write to a happens before the send on c, which happens before the
// corresponding receive on c completes, which happens before the print.
func buffered() {
 c := make(chan int, 10)
 var a string

 go func() {
  a = "hello, world"
  c <- 0
 }()

 <-c

 // We are guaranteed to print "hello, world".
 fmt.Println(a)
}

// With both types of channels, a close happens before the corresponding receive.
// The write to a happens before the close on c, which happens before the
// corresponding receive on c completes, which happens before the print.
func closed() {
 c := make(chan int, 10)
 var a string

 go func() {
  a = "hello, world"
  close(c)
 }()

 <-c

 // We are guaranteed to print "hello, world".
 fmt.Println(a)
}

練習

練習 1

編寫一個程序,其中兩個 goroutine 來回傳遞一個整數十次。當每個 goroutine 接收到整數時打印。每次通過整數都增加。一旦整數等於 10,立刻終止程序。

https://play.golang.org/p/cEUYThI5etc

// Write a program where two goroutines pass an integer back and forth
// ten times. Display when each goroutine receives the integer. Increment
// the integer with each pass. Once the integer equals ten, terminate
// the program cleanly.
package main

import (
 "fmt"
 "sync"
)

func main() {

 // Create an unbuffered channel.
 share := make(chan int)

 // Create the WaitGroup and add a count
 // of two, one for each goroutine.
 var wg sync.WaitGroup
 wg.Add(2)

 // Launch two goroutines.
 go func() {
  goroutine("Bill", share)
  wg.Done()
 }()

 go func() {
  goroutine("Joan", share)
  wg.Done()
 }()

 // Start the sharing.
 share <- 1

 // Wait for the program to finish.
 wg.Wait()
}

// goroutine simulates sharing a value.
func goroutine(name string, share chan int) {
 for {

  // Wait to receive a value.
  value, ok := <-share
  if !ok {

   // If the channel was closed, return.
   fmt.Printf("Goroutine %s Down\n", name)
   return
  }

  // Display the value.
  fmt.Printf("Goroutine %s Inc %d\n", name, value)

  // Terminate when the value is 10.
  if value == 10 {
   close(share)
   fmt.Printf("Goroutine %s Down\n", name)
   return
  }

  // Increment the value and send it
  // over the channel.
  share <- (value + 1)
 }
}

練習 2

編寫一個程序,使用扇出模式同時生成 100 個隨機數。讓每個 goroutine 生成一個隨機數,並通過緩衝 channel 將該數字返回給主 goroutine。設置緩衝區 channel 的大小,以便永遠不會發送阻塞。不要分配比您需要的更多的緩衝區。讓主 goroutine 顯示它收到的每個隨機數,然後終止程序。

https://play.golang.org/p/cEUYThI5etc

// Write a program that uses a fan out pattern to generate 100 random numbers
// concurrently. Have each goroutine generate a single random number and return
// that number to the main goroutine over a buffered channel. Set the size of
// the buffer channel so no send every blocks. Don't allocate more buffers than
// you need. Have the main goroutine display each random number is receives and
// then terminate the program.
package main

import (
 "fmt"
 "math/rand"
 "time"
)

const (
 goroutines = 100
)

func init() {
 rand.Seed(time.Now().UnixNano())
}

func main() {

 // Create the buffer channel with a buffer for
 // each goroutine to be created.
 values := make(chan int, goroutines)

 // Iterate and launch each goroutine.
 for gr := 0; gr < goroutines; gr++ {

  // Create an anonymous function for each goroutine that
  // generates a random number and sends it on the channel.
  go func() {
   values <- rand.Intn(1000)
  }()
 }

 // Create a variable to be used to track received messages.
 // Set the value to the number of goroutines created.
 wait := goroutines

 // Iterate receiving each value until they are all received.
 // Store them in a slice of ints.
 var nums []int
 for wait > 0 {
  nums = append(nums, <-values)
  wait--
 }

 // Print the values in our slice.
 fmt.Println(nums)
}

練習 3

編寫一個程序,最多同時生成 100 個隨機數。不要發送所有的 100 個值,因爲發送 / 接收的數量是未知的。

// Write a program that uses goroutines to generate up to 100 random numbers.
// Do not send values that are divisible by 2. Have the main goroutine receive
// values and add them to a slice.
package main

import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)

const (
 goroutines = 100
)

func init() {
 rand.Seed(time.Now().UnixNano())
}

func main() {

 // Create the channel for sharing results.
 values := make(chan int)

 // Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
 var wg sync.WaitGroup
 wg.Add(goroutines)

 // Iterate and launch each goroutine.
 for gr := 0; gr < goroutines; gr++ {

  // Create an anonymous function for each goroutine.
  go func() {

   // Ensure the waitgroup is decremented when this function returns.
   defer wg.Done()

   // Generate a random number up to 1000.
   n := rand.Intn(1000)

   // Return early if the number is divisible by 2. n%2 == 0
   if n%2 == 0 {
    return
   }

   // Send the odd values through the channel.
   values <- n
  }()
 }

 // Create a goroutine that waits for the other goroutines to finish then
 // closes the channel.
 go func() {
  wg.Wait()
  close(values)
 }()

 // Receive from the channel until it is closed.
 // Store values in a slice of ints.
 var nums []int
 for n := range values {
  nums = append(nums, n)
 }

 // Print the values in our slice.
 fmt.Printf("Result count: %d\n", len(nums))
 fmt.Println(nums)
}

練習 4

編寫一個程序,使用 work pool 同時生成最多 100 個隨機數。拒絕偶數值。如果已收集到 100 個奇數,就讓協程停止運行。

https://play.golang.org/p/9nZ8YgmsAIX

// Write a program that creates a fixed set of workers to generate random
// numbers. Discard any number divisible by 2. Continue receiving until 100
// numbers are received. Tell the workers to shut down before terminating.
package main

import (
 "fmt"
 "math/rand"
 "runtime"
 "sync"
)

func main() {

 // Create the channel for sharing results.
 values := make(chan int)

 // Create a channel "shutdown" to tell goroutines when to terminate.
 shutdown := make(chan struct{})

 // Define the size of the worker pool. Use runtime.GOMAXPROCS(0) to size the pool based on number of processors.
 poolSize := runtime.GOMAXPROCS(0)

 // Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
 var wg sync.WaitGroup
 wg.Add(poolSize)

 // Create a fixed size pool of goroutines to generate random numbers.
 for i := 0; i < poolSize; i++ {
  go func(id int) {

   // Start an infinite loop.
   for {

    // Generate a random number up to 1000.
    n := rand.Intn(1000)

    // Use a select to either send the number or receive the shutdown signal.
    select {

    // In one case send the random number.
    case values <- n:
     fmt.Printf("Worker %d sent %d\n", id, n)

    // In another case receive from the shutdown channel.
    case <-shutdown:
     fmt.Printf("Worker %d shutting down\n", id)
     wg.Done()
     return
    }
   }
  }(i)
 }

 // Create a slice to hold the random numbers.
 var nums []int
 for i := range values {

  // continue the loop if the value was even.
  if i%2 == 0 {
   fmt.Println("Discarding", i)
   continue
  }

  // Store the odd number.
  fmt.Println("Keeping", i)
  nums = append(nums, i)

  // break the loop once we have 100 results.
  if len(nums) == 100 {
   break
  }
 }

 // Send the shutdown signal by closing the channel.
 fmt.Println("Receiver sending shutdown signal")
 close(shutdown)
  
 // Wait for the Goroutines to finish.
 wg.Wait()

 // Print the values in our slice.
 fmt.Printf("Result count: %d\n", len(nums))
 fmt.Println(nums)
}

參考資料

[1]

The Behavior Of Channels: https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html

[2]

Channel Communication: https://golang.org/ref/mem#tmp_7

[3]

Share Memory By Communicating: http://blog.golang.org/share-memory-by-communicating

[4]

Go 編程:在通訊中共享內存: https://learnku.com/docs/go-blog/share-memory-by-communicating/6586

[5]

The Nature Of Channels In Go: https://www.ardanlabs.com/blog/2014/02/the-nature-of-channels-in-go.html

[6]

Understanding Channels: https://www.youtube.com/watch?v=KBZlN0izeiY

[7]

Bufferbloat: Dark Buffers in the Internet: https://www.youtube.com/watch?v=qbIozKVz73g

[8]

Buffer Bloat Videos: http://www.bufferbloat.net/projects/cerowrt/wiki/Bloat-videos

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YJAJJErj0aRx6poriVQ-3A