Golang - 長連接 - 狀態推送

【導讀】本文是一篇 Go 預言實現的後段狀態推送設計與實踐,寫的非常詳細,一起來學習吧!

狀態推送

前言:掃碼登錄功能自微信提出後,越來越多的被應用於各個 web 與 app。這兩天公司要做一個掃碼登錄功能,在 leader 的技術支持幫助下(基本都靠 leader 排坑),終於將服務搭建起來,並且支持上萬併發。

長連接選擇

決定做掃碼登錄功能之後,在網上查看了很多的相關資料。對於掃碼登錄的實現方式有很多,淘寶用的是輪詢,微信用長連接,QQ 用輪詢……。方式雖多,但目前看來大體分爲兩種,1: 輪詢,2: 長連接。(兩種方式各有利弊吧,我研究不深,優缺點就不贅述了)
在和 leader 討論之後選擇了用長連接的方式。所以對長連接的實現方式調研了很多:

  1. 微信長連接:通過動態加載 script 的方式實現。

這種方式好在沒有跨域問題。

2.websocket 長連接:在 PC 端與服務端搭起一條長連接後,服務端主動不斷地向 PC 端推送狀態。這應該是最完美的做法了。

  1. 我使用的長連接:PC 端向服務端發送請求,服務端並不立即響應,而是 hold 住,等到用戶掃碼之後再響應這個請求,響應後連接斷開。

爲什麼不採用 websocket 呢?因爲當時比較急、而對於 websocket 的使用比較陌生,所以沒有使用。不過我現在這種做法在資源使用上比 websocket 低很多。

接口設計

(本來想把 leader 畫的一副架構圖放上來,但涉及到公司,不敢)
自己畫的一副流程圖

稍微解釋一下:
第一條連接:打開 PC 界面的時候向服務端發送請求並建立長連接(1)。當 APP 成功掃碼後(2),響應這次請求(3)。
第二條連接類似。

分析得出我們的服務只需要兩個接口即可

  1. 與 PC 建立長連接的接口
  2. 接收 APP 端數據並將數據發送給前端的接口

再細想可將這兩個接口抽象爲:
1.PC 獲取狀態接口:get
2.APP 設置狀態接口:set

具體實現

用 GO 寫的(不多嗶嗶)
長連接的根本原理:連接請求後,服務端利用 channel 阻塞住。等到 channel 中有 value 後,將 value 響應

Router
func Router(){
    http.HandleFunc("/status/get", Get)
    http.HandleFunc("/status/set", Set)
}
GET

每一條連接需要有一個 KEY 作標識,不然 APP 設置的狀態不知道該發給那臺 PC。每一條連接即一個 channel

var Status map[string](chan string) = make(map[string](chan string))

func Get(w http.ResponseWriter, r *http.Request){
    ...        //接收key的操作
    key = ...  //PC在請求接口時帶着的key
    Status[key] = make(chan string)    //不需要緩衝區
    value := <-Status[key]
    ResponseJson(w, 0, "success", value)    //自己封的響應JSON方法
}
SET

APP 掃碼後可以得到二維碼中的 KEY,同時將想給 PC 發送的 VALUE 一起發送給服務端

func Set(w http.ResponseWriter, r *http.Request){
    ...        
    key = ...
    value = ...    //向PC傳遞的值
    Status[key] <- value
}

這就是實現的最基本原理。
接下來我們一點點實現其他的功能。

1. 超時

從網上找了很多資料,大部分都說這種方式

srv := &http.Server{  
    ReadTimeout: 5 * time.Second,
    WriteTimeout: 10 * time.Second,
}
log.Println(srv.ListenAndServe())

這種方式確實是設置讀超時與寫超時。但(親測)這種超時方式並不友善,假如現在WriteTimeout是 10s,PC 端請求過來之後,長連接建立。PC 處於pending狀態,並且服務端被channel阻塞住。10s 之後,由於超時連接失效(並沒有斷,我也不瞭解其中原理)。PC 並不知道連接斷了,依然處於pending狀態,服務端的這個goroutine依然被阻塞在這裏。這個時候我調用 set 接口,第一次調用沒用反應,但第二次調用 PC 端就能成功接收 value。

從圖可以看出,我設置的WriteTimeout爲 10s,但這條長連接即使 15s 依然能收到成功響應。(ps:我調用了兩次 set 接口,第一次沒有反應)


研究後決定不使用這種方式設置超時,採用接口內部定時的方式實現超時返回

select {
    case <-`Timer`:
        utils.ResponseJson(w, -1, "timeout", nil)
    case value := <-statusChan:
        utils.ResponseJson(w, 0, "success", value)
    }

Timer即爲定時器。剛開始 Timer 是這樣定義的

Timer := time.After(60 * time.Second)

60s 後Timer會自動返回一個值,這時上面的通道就開了,響應 timeout
但這樣做有一個弊端,這個定時器一旦創建就必須等待 60s,並且我沒想到辦法提前將定時器關了。如果這個長連接剛建立後 5s 就被響應,那麼這個定時器就要多存在 55s。這樣對資源是一種浪費,並不合理。
這裏選用了context作爲定時器

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(Timeout)*time.Second)
defer cancel()
select {
    case <-ctx.Done():
        utils.ResponseJson(w, -1, "timeout", nil)
    case result := <-Status[key]:
        utils.ResponseJson(w, 0, "success", result)
}

ctx 在初始化的時候就設置了超時時間time.Duration(Timeout)*time.Second
超時之後ctx.Done()返回完成,起到定時作用。如果沒有 cancel() 則會有一樣的問題。原因如下

context 對比 time 包。提供了手動關閉定時器的方法cancel()
只要 get 請求結束,都會去關閉定時器,這樣可以避免資源浪費(一定程度避免內存泄漏)。

即使 golang 官方文檔中,也推薦defer cancel()這樣寫

官方文檔也寫到:即使 ctx 會在到期時關閉,但在任何場景手動調用 cancel 都是很好的做法。

2. 多機支持

服務如果只部署在一臺機器上,萬一機器跪了,那就全跪了。
所以我們的服務必須同時部署在多個機器上工作。即使其中一臺掛了,也不影響服務使用。
這個圖不會畫,只能用 leader 的圖了

在項目初期討論的時候 leader 給出了兩種方案。1. 如圖使用 redis 做多機調度。2. 使用 zookeeper 將消息發送給多機

因爲現在是用 redis 做的,只講述下 redis 的實現。(但依賴 redis 並不是很好,多機的負載均衡還要依賴其他工具。zookeeper 能夠解決這個問題,之後會將 redis 換成 zookeeper)

首先我們要明確多機的難點在哪?

我們有兩個接口,get、set。get 是給前端建立長連接用的。set 是後端設置狀態用的。

假設有兩臺機器 A、B。若前端的請求發送到 A 機器上,即 A 機器與前端連接,此時後端調用 set 接口,如果調用的是 A 機器的 set 接口,那是最好,長連接就能成功響應。但如果調用了 B 機器的 set 接口,B 機器上又沒有這條連接,那麼這條連接就無法響應。
所以難點在於如何將同一個 key 的 get、set 分配到一臺機器。

有人給我提過一個意見:在做負載均衡的時候,就將連接分配到指定機器。剛開始我覺的很有道理,但細細想,如果這樣做,在以後如果要加機器或減機器的時候會很麻煩。對橫向的增減機器不友善。

最後我還是採用了 leader 給出的方案:用 redis 綁定 key 與機器的關係

即前端請求到一臺機器上,以 key 做鍵,以機器 IP 做值放在 redis 裏面。後端請求 set 接口時先用 key 去 redis 裏面拿到機器 IP,再將 value 發送到這臺機器上。

此時就多了一個接口,用於機器內部相互調用

ChanSet
func Router(){
    http.HandleFunc("/status/get", Get)
    http.HandleFunc("/status/set", Set)
    http.HandleFunc("/channel/set", ChanSet)
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    key = ...
    value = ...
    Status[key] <- value
}
GET
func Get(w http.ResponseWriter, r *http.Request){
    ...        
    IP = getLocalIp()       //得到本機IP
    RedisSet(key, IP)       //以key做鍵,IP做值放入redis
    Status[key] <- value
    ...
}
SET
func Set(w http.ResponseWriter, r *http.Request){
    ...
    IP = RedisGet(key)    //用key去取對應機器的IP
    Post(IP, key, value) //將key與value都發送給這臺機器
}

這裏相當於用 redis sentinel 做多臺機器的通信。哨兵會幫我們將數據同步到所有機器上

這樣即可實現多機支持

3. 跨域

剛部署到線上的時候,第一次嘗試就跪了。查看錯誤...(Access-Control-Allow-Origin)...
因爲前端是通過 AJAX 請求的長連接服務,所以存在跨域問題。

在服務端設置允許跨域

func Get(w http.ResponseWriter, r *http.Request){
    ...
    w.Header().Set("Access-Control-Allow-Origin""*")
    w.Header().Add("Access-Control-Allow-Headers""Content-Type")
    ...
}

若是像微信的做法,動態的加載 script 方式,則沒有跨域問題。

服務端直接允許跨域,可能會有安全問題,但我不是很瞭解,這裏爲了使用,就允許跨域了。

4.Map 併發讀寫問題

跨域問題解決之後,線上可以正常使用了。緊接着請測試同學壓測了一下。
預期單機併發 10000 以上,測試同學直接壓了 10000,服務掛了。
可能預期有點高,5000 吧,於是壓了 5000,服務掛了。
1000 呢,服務掛了。
100,服務掛了。
……
這下豁然開朗,不可能是機器問題,絕對是有 BUG
看了下報錯

去看了下官方文檔

Map 是不能併發的寫操作,但可以併發的讀。
原來對 Map 操作是這樣寫的

func Get(w http.ResponseWriter, r *http.Request){
    ...
    `Status[key] = make(chan string)`
    `defer close(Status[key])`
    ...
    select {
    case <-ctx.Done():
        utils.ResponseJson(w, -1, "timeout", nil)
    case `result := <-Status[key]`:
        utils.ResponseJson(w, 0, "success", result)
    }
    ...
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    `Status[key] <- value`
    ...
}

Status[key] = make(chan string)在 Status(map) 裏面初始化一個通道,是 map 的寫操作
result := <-Status[key]從 Status[key] 通道中讀取一個值,由於是通道,這個值取出來後,通道內就沒有了,所以這一步也是對 map 的寫操作
Status[key] <- value向 Status[key] 內放入一個值,map 的寫操作
由於這三處操作的是一個 map,所以要加同一把鎖

var Mutex sync.Mutex
func Get(w http.ResponseWriter, r *http.Request){
    ...
    //這裏是同組大佬教我的寫法,通道之間的拷貝傳遞的是指針,即statusChan與Status[key]指向的是同一個通道
    statusChan := make(chan string)
    Mutex.Lock()
    Status[key] = statusChan
    Mutex.Unlock()
    
    //在連接結束後將這些資源都釋放
    defer func(){
        Mutex.Lock()
        delete(Status, key)
        Mutex.Unlock()
        close(statusChan)
        RedisDel(key)
    }()
    
    select {
        case <-ctx.Done():
            utils.ResponseJson(w, -1, "timeout", nil)
        case result := <-statusChan:
            utils.ResponseJson(w, 0, "success", result)
    }
    ...
}

func ChanSet(w http.ResponseWriter, r *http.Request){
    ...
    Mutex.Lock()
    Status[key] <- value
    Mutex.Unlock()
    ...
}

到現在,服務就可以正常使用了,並且支持上萬併發。

5.Redis 過期時間

服務正常使用之後,leader review 代碼,提出 redis 的數據爲什麼不設置過期時間,反而要自己手動刪除。我一想,對啊。
於是設置了過期時間並且將RedisDel(key)刪了。
設置完之後不出意外的服務跪了。
究其原因
我用一個 key=1 請求 get,會在 redis 內存儲一條數據記錄 (1 => Ip). 如果我 set 了這條連接,按之前的邏輯會將 redis 裏的這條數據刪掉,而現在是等待它過期。若是在過期時間內,再次以這個 key=1,調用 set 接口。set 接口依然會從 redis 中拿到 IP,Post 數據到 ChanSet 接口。而 ChanSet 中Status[key] <- value由於Status[key]是關閉的, 會阻塞在這裏,阻塞不要緊,但之前這裏加了鎖,導致整個程序都阻塞在這裏。
這裏和 leader 討論過,仍使用 redis 過期時間但需要修復這個 Bug

func ChanSet(w http.ResponseWriter, r *http.Request){
    Mutex.Lock()
    ch := Status[key]
    Mutex.Unlock()

    if ch != nil {
        ch <- value
    }
}

不過這樣有一個問題,就是同一個 key,在過期時間內是無法多次使用的。不過這與業務要求並不衝突。

6.Linux 文件最大句柄數

在給測試同學測試之前,自己也壓測了一下。不過剛上來就瘋狂報錯,“%¥#@¥……%……%%..too many fail open...”
搜索結果是 linux 默認最大句柄數 1024.
開了下自己的機器 ulimit \-a 果然 1024。修改(修改方法不多 BB)

7. 同時監聽兩個端口

服務有兩個 API,get 是給前端使用的,對外開放。set 是給後端使用的,內部接口。所以這兩個接口需要放在兩個端口上。
由於http.ListenAndServe()本身有阻塞,故第一個監聽需要一個 goroutine

go http.ListenAndServe(":11000", FrontendMux)    //對外開放的端口
http.ListenAndServe(":11001", BackendMux)    //內部使用的端口

轉自:hammermax

segmentfault.com/a/1190000017866100

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/PCxY5tdABVA5byIwZ1lZFg