Go 併發 IO 優化

1. 背景

有的時候我們會遇到併發 IO 的情況,例如,併發爬蟲下載網絡上的圖片。如果併發度過高或者下載的內容過大,會導致網絡 IO 耗時急劇上升。這時候就需要優化一下每次網絡 IO 的耗時。

2. 網絡下載圖片用例

以下載網絡數據爲例,下面是典型的代碼。

func TestHttpGet(t *testing.T) {
    rsp, err := http.Get("http://xxx.com")
    if err != nil {
        t.Errorf("get err:%v", err)
        return
    }
    defer rsp.Body.Close()
    body, err := ioutil.ReadAll(rsp.Body)
    t.Logf("body len:%v, read err:%v", len(body), err)
}

在代碼中,首先通過 http.Get 獲取網絡上的資源,這段耗時不容易在業務層面優化。因此想要優化整體耗時,只有從讀取響應 rsp.Body 入手。

3. ioutil.ReadAll

3.1. 源碼分析

ioutil.ReadAll 中其實是調用了 bytes.Buffer.ReadFrom 函數,buf 的初始容量是 bytes.MinRead = 512

// readAll reads from r until an error or EOF and returns the data it read
// from the internal buffer allocated with a specified capacity.
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
    var buf bytes.Buffer
    if int64(int(capacity)) == capacity {
        buf.Grow(int(capacity))
    }
    _, err = buf.ReadFrom(r)
    return buf.Bytes(), err
}

// ReadAll reads from r until an error or EOF and returns the data it read.
// A successful call returns err == nil, not err == EOF. Because ReadAll is
// defined to read from src until EOF, it does not treat an EOF from Read
// as an error to be reported.
func ReadAll(r io.Reader) ([]byte, error) {
    return readAll(r, bytes.MinRead)
}

buf.ReadFrom(r) 中,首先將 b.buf 擴容 MinRead = 512 字節,然後從 r 中一輪一輪讀取數據,直到 b.buf 填完。

// MinRead is the minimum slice size passed to a Read call by
// Buffer.ReadFrom. As long as the Buffer has at least MinRead bytes beyond
// what is required to hold the contents of r, ReadFrom will not grow the
// underlying buffer.
const MinRead = 512

// ReadFrom reads data from r until EOF and appends it to the buffer, growing
// the buffer as needed. The return value n is the number of bytes read. Any
// error except io.EOF encountered during the read is also returned. If the
// buffer becomes too large, ReadFrom will panic with ErrTooLarge.
func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) {
    b.lastRead = opInvalid
    for {
        i := b.grow(MinRead)
        b.buf = b.buf[:i]
        m, e := r.Read(b.buf[i:cap(b.buf)])
        if m < 0 {
            panic(errNegativeRead)
        }

        b.buf = b.buf[:i+m]
        n += int64(m)
        if e == io.EOF {
            return n, nil // e is EOF, so return nil explicitly
        }
        if e != nil {
            return n, e
        }
    }
}

b.grow(n) 函數用來將 bytes.Buffer 擴容,以便容納下 n 個 byte,如果 b 已經無法擴容了,則會產生一個 panic,攜帶 ErrTooLarge error。

bytes.Buffer 的定義如下:

// A Buffer is a variable-sized buffer of bytes with Read and Write methods.
// The zero value for Buffer is an empty buffer ready to use.
type Buffer struct {
    buf      []byte // contents are the bytes buf[off : len(buf)]
    off      int    // read at &buf[off], write at &buf[len(buf)]
    lastRead readOp // last read operation, so that Unread* can work correctly.
}

b.grow(n) 函數的邏輯爲:

  1. 如果 b.buf 的長度 len + n 小於等於 b.buf 的容量:cap(b.buf)。則直接返回長度 len。

  2. 如果 b.buf = nil 並且 n < 64時,則會新建一個長度爲 n,容量爲 64 的 []byte 數組並返回。

  3. 如果b.buf的長度 len + n小於等於 b.buf 一半容量:cap(b.buf)/2,就把b.buf[b.offset:]部分的數據移到b.buf開頭,b.offset是 bytes.Buffer 開始讀的位置,這樣就是把 b.buf中可用的數據向前移到開頭。

  4. 如果b.buf的長度 len + n大於 b.buf 一半容量:cap(b.buf)/2,則會調用 makeSlice分配一個新的 []byte,長度爲當前容量的二倍加 n:cap(b.buf)*2+n,然後把原來的數據複製到新 buf 中:copy(buf, b.buf[b.off:])

所以說,如果bytes.Buffer初始的 buf 容量不夠大,而需要讀取的數據太大的話,會頻繁的進行內存分配,這是耗時增加的原因。

而在readall函數中,bytes.Buffer 的初始容量是 512 字節,之後會成倍增加直到滿足數據大小。

3.2. 資源分配分析

爲了避免網絡 IO 測試對外產生的影響,使用磁盤 IO 來替代網絡 IO,分析 ioutil.ReadAll 的內存分配。磁盤 IO 使用一個 72MB 的測試文件:test.data.rar

寫一個下面的單測代碼:

func TestReadAll(t *testing.T) {
    file, err := os.Open(testName)
    if err != nil {
        t.Errorf("open err:%v", err)
        return
    }
    _, err = ioutil.ReadAll(file)
    if err != nil {
        t.Errorf("readall err:%v", err)
        return
    }
}

執行單元測試,並儲存內存和 cpu 概要信息。

go test --run TestReadAll$ -v -memprofile readall.mem -memprofilerate 1 -cpuprofile readall.cpu

接下來使用 pprof 分析內存和 cpu 的概要文件。

3.2.1. cpu 分析

首先分析 cpu 概要文件。在 bash 中輸入:

go tool pprof -http=:8080 readall.cpu

則會在打開一個頁面,裏面就是各個函數的耗時,例如,TestReadAll 就花了 90ms。

3.2.2. 內存分析

接下來是內存概要文件分析。在 bash 中輸入:

go tool pprof -http=:8080 readall.mem

可以看到在 ioutil.ReadAll 進行了多次內存分配。這是因爲在 ioutil.ReadAll 內部會多次調用 bytes.BufferGrow(n) 函數進行擴容,最後一次擴容產生了一個 128 MB 的切片。

128MB 正好是測試文件大小向上取整的 512 字節的整數倍。

4. io.Copy

前面說到,使用 ioutil.ReadAll 讀取大文件時會出現頻繁的內存分配,增加大量不必要的耗時。

那我們會想,可以直接避免內存頻繁分配嗎?反正內存也不會省,那我們在之前直接一次分配夠了,之後就不會有額外的內存分配耗時了。

io.Copy 就可以實現這個功能。

4.1. 預分配文件大小內存

func TestIOCopy(t *testing.T) {
    file, err := os.Open(testName)
    if err != nil {
        t.Errorf("open err:%v", err)
        return
    }
    data := make([]byte, 0, 74077894)
    buf := bytes.NewBuffer(data)
    _, err = io.Copy(buf, file)
    if err != nil {
        t.Errorf("readall err:%v", err)
        return
    }
}

在上面代碼中,預分配文件大小的內存,然後調用 io.Copy複製數據。

io.Copy 函數中會直接調用 buf.ReadFrom 讀取 file 中的數據。

// ReadFrom reads data from r until EOF and appends it to the buffer, growing
// the buffer as needed. The return value n is the number of bytes read. Any
// error except io.EOF encountered during the read is also returned. If the
// buffer becomes too large, ReadFrom will panic with ErrTooLarge.
func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) {
    b.lastRead = opInvalid
    for {
        i := b.grow(MinRead)
        b.buf = b.buf[:i]
        m, e := r.Read(b.buf[i:cap(b.buf)])
        if m < 0 {
            panic(errNegativeRead)
        }

        b.buf = b.buf[:i+m]
        n += int64(m)
        if e == io.EOF {
            return n, nil // e is EOF, so return nil explicitly
        }
        if e != nil {
            return n, e
        }
    }
}

執行單測生成 cpu 和內存概要文件:

go test --run TestIOCopy -v -memprofile iocopy.mem -memprofilerate 1 -cpuprofile iocopy.cpu

分析 cpu 時間如下,可以看到只花了 40ms,比之前的 ioutil.ReadAll 低 50ms。但是還是調用了 buffer.grow 函數,說明在這個單測中還是存在額外的內存分配。

分析內存概要文件如下,可以發現的確有額外的內存分配,並且分配的內存是文件大小的兩倍。這說明耗時還有進一步下降的空間。

4.2. 預分配雙倍文件大小內存

在代碼中預先分配雙倍文件大小的內存:

func TestIOCopy(t *testing.T) {
    file, err := os.Open(testName)
    if err != nil {
        t.Errorf("open err:%v", err)
        return
    }
    data := make([]byte, 0, 74077894*2)
    buf := bytes.NewBuffer(data)
    _, err = io.Copy(buf, file)
    if err != nil {
        t.Errorf("readall err:%v", err)
        return
    }
}

執行單測,分析 cpu 和內存概要文件。

分析 cpu 耗時,可以看到只花了 10ms,比最開始使用 ioutil.ReadAll 減少 80ms。

內存概要分析如下,可以看到除了最開始的內存分配,代碼內部沒有額外的內存分配了,這也是耗時進一步下降的原因。

5. 併發壓測

前面的測試只是運行一次,比較 cpu 耗時和內存分配次數。但是在實際業務中,可能存在併發 IO 的情形,這種情況下,io.copyioutil.ReadAll 能提高多少性能呢?

下面的單測中,分別運行 100 次併發示例進行比較,在 readAllDataiocpoyData 函數中併發度控制在 10,計算量爲 100。執行單元測試,統計總的 cpu 耗時和內存分佈。

注意,下面的 iocpoyData 函數中,預分配的內存大小是雙倍的文件大小。

func TestReadAllIOCopy(t *testing.T) {
    for i := 0; i < 100; i++ {
        readmax, readtotal := readAllData(t, testName)
        copymax, copytotal := iocopyData(t, testName)
        t.Logf("Max copy/read:%v, total copy/read:%v",
            float64(copymax)/float64(readmax), float64(copytotal)/float64(readtotal))
    }
}
func readAllData(t *testing.T, fileName string) (int64, int64) {
    mu := &sync.Mutex{}
    var max int64
    var total int64
    ctrl := make(chan struct{}, 10)
    wg := &sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        ctrl <- struct{}{}
        wg.Add(1)
        go func() {
            defer func() {
                <-ctrl
                wg.Done()
            }()
            start := time.Now()
            file, err := os.Open(fileName)
            if err != nil {
                t.Errorf("open err:%v", err)
                return
            }
            _, err = ioutil.ReadAll(file)
            if err != nil {
                t.Errorf("readall err:%v", err)
                return
            }
            cost := time.Since(start).Milliseconds()
            atomic.AddInt64(&total, cost)
            mu.Lock()
            if cost > max {
                max = cost
            }
            mu.Unlock()
        }()
    }
    wg.Wait()
    return max, total
}

func iocopyData(t *testing.T, fileName string) (int64, int64) {
    mu := &sync.Mutex{}
    var max int64
    var total int64
    wg := &sync.WaitGroup{}
    ctrl := make(chan struct{}, 10)
    for i := 0; i < 100; i++ {
        ctrl <- struct{}{}
        wg.Add(1)
        go func() {
            defer func() {
                <-ctrl
                wg.Done()
            }()
            start := time.Now()
            file, err := os.Open(fileName)
            if err != nil {
                t.Errorf("open err:%v", err)
                return
            }
            fileInfo, er := os.Stat(fileName)
            if er != nil {
                t.Errorf("state err:%v", err)
                return
            }
            data := make([]byte, 0, fileInfo.Size()*2)
            buf := bytes.NewBuffer(data)
            _, err = io.Copy(buf, file)
            if err != nil {
                t.Errorf("copy err:%v", err)
                return
            }
            cost := time.Since(start).Milliseconds()
            atomic.AddInt64(&total, cost)
            mu.Lock()
            if cost > max {
                max = cost
            }
            mu.Unlock()
        }()
    }
    wg.Wait()
    return max, total
}

5.1. cpu 分析

下圖是 cpu 時間的分析,可以看到 readAllData 花了 704.03s,iocopyData 只花了 161s,後者是前者比例的 22.8%。

兩個函數都會調用 runtime.makeSlice 進行內存分配,不同的是 readAllData 花費了 248.8s 在調用這個函數上面,而 readAllData 只花了 131.6s,後者是前者的 52.8%,這個結果也是和代碼實現相吻合的。

5.2. 內存分析

接下來看一下兩者的內存分析。

readAllData 在內部多次分配內存,所以內存消耗也要比 iocopyData 大很多。readAllData 執行的時候花了 2.44TB 的內存,幾乎全部用在了 bytes.makeSlice 上面;而 iocopyData 則只在最開始手動進行了內存分配,共花了 1.35TB 內存了;後者是前者內存消耗的 55.3%。這個比例與前面內存分配消耗的時間比例也是吻合的。

總結

綜上所述,在涉及頻繁 IO 的情況下,儘可能使用 io.Copy 並且分配指定內存可以降低代碼運行時間,並且提高內存效率。當指定的內存大小是需要讀取的數據大小的兩倍時,效率達到最高。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NzCn8rw09Ogy7tUTs_O0Mg