WebSocket 協議 - 源碼分析
本文是 WebSocket 系列文章的第 3 篇,從源碼角度理解 WebSocket 是如何實現的。分析的是 gorilla websocket,即 WebSocket 協議 - 實戰中服務端使用的 WebSocket 庫。
Gorilla WebSocket 是一個由 Go 語言實現的,經過很好測試並且廣泛使用的 WebSocket 庫,它提供了簡單易用、功能強大的 API 接口。目前在 github 上已有 2 萬 +⭐️。
Part1 前言
Gorilla WebSocket 既然是一個庫,對於使用者來說,就是調用該庫的 API 接口完成業務功能。庫封裝了內部功能,屏蔽底層實現,使用者無需關心內部實現邏輯。而本文是源碼分析,就是要分析提供 API 接口的內部實現。所以我們就從 API 入手,抽絲剝繭瞭解實現細節。
WebSocket 作爲一個應用層協議,與 HTTP 協議處於同一層級。Gorilla WebSocket 庫層次結構如下,位於底層 TCP 協議和業務模塊之間。
可以看到 Gorilla WebSocket 處於承上啓下的關鍵位置。我們的業務模塊數據是如何寫入到下層的 TCP;對端發送的 TCP 數據又是如何解析爲業務層數據;初始階段 HTTP 連接是如何提升爲 WebSocket。這依次對應着 Gorilla WebSocket 中的 Upgrade、Read 和 Write 關鍵接口,在下面的章節依次詳細分析。
Part2 Upgrade 實現
在 WebSocket 協議 - 概念原理中提到 WebSocket 握手階段採用的是 HTTP 協議,該過程對應到 Gorilla WebSocket 庫中就是 Upgrade 接口。
1 原理
握手採用 HTTP 協議,那 Upgrade 接口位於 HTTP 層之上,此時可以將 Upgrade 理解爲 HTTP 的 handler 函數,一個特殊的業務處理函數。
標準的 HTTP handler 處理函數接受一個 http.ResponseWriter 和 *http.Request 參數,Upgrade 既然是一個特殊的 handler,那麼它的入參也必有一個 http.ResponseWriter 和 *http.Request。事實確實如此,Upgrade 的簽名爲 func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)
。
Upgrade 會返回一個 *Conn
,這是一個 WebSocket 連接句柄。所以通過調用 Upgrade 方法,成功建立起 WebSocket 連接。
2 靜態結構
3 處理流程
4 關鍵代碼分析
參數校驗工具函數
tokenListContainsValue 是一個參數校驗工具函數,在 util.go 文件中,檢查 name 和 value 值是否在 Header 中。
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
...
}
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
...
}
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
...
}
現在來看 tokenListContainsValue 實現細節,入參 header 本質上是一個 map[string][]string
類型,所以根據傳入的 name 從 header 中取出對應的 value,value 是一個 []string, 所以依次判斷每個 string 是否有給定的 value 值。由於每個 value 支持逗號分隔,所以調用 nextToken
提取每個 token。例如,如果從 header 獲取到的 value 是 "other,websocket,more", tokenListContainsValue(r.Header, "Upgrade", "websocket")
會返回 true。
func tokenListContainsValue(header http.Header, name string, value string) bool {
headers:
for _, s := range header[name] {
for {
var t string
t, s = nextToken(skipSpace(s))
if t == "" {
continue headers
}
s = skipSpace(s)
if s != "" && s[0] != ',' {
continue headers
}
if equalASCIIFold(t, value) {
return true
}
if s == "" {
continue headers
}
s = s[1:]
}
}
return false
}
NOTE:📢注意上述代碼使用 continue label,我們可能使用的比較少,特別是在其他語言中很少有類似的語法。其實在 Go 語言中,break label 和 continue label 是兩個非常有趣的語法。在我之前的文章 Go 語言中常見 100 問題 -#34 Ignoring how the break statement works 有專門講解 break label。
continue label 實現的效果是跳過 for _, s := range header[name]
當前的循環,繼續剩餘循環處理。因爲 header[name] 值是一個 string 切片,所以代碼中 continue headers 效果是跳過當前 s, 繼續執行下一個字符串處理。
採用 continue headers 本質是跳過內存的 for 循環,繼續外層的 for 循環。但是我個人認爲作者這裏寫複雜了,採用 break 也可以實現相同的效果(即將 continue headers 替換爲 break),並且代碼更簡潔。
Hijack 獲取原始 TCP 連接
通過 Upgrade 操作將 HTTP 協議升級爲 WebSocket 協議,一個重要的目標是獲取底層 TCP 的控制權,拿到原始的 TCP 連接句柄,後續處理不在經過 HTTP 庫,直接由 WebSocket 接管。
具體獲取代碼如下,ResponseWriter 實現了 Hijacker 接口,通過它調用 h.Hijack() 方法即獲得原始的 netConn。默認的 ResponseWriter 對於 HTTP/1.x 連接支持 Hijacker 接口, 而 HTTP2 不支持,所以進行 w.(http.Hijacker) 斷言處理。
h, ok := w.(http.Hijacker)
if !ok {
return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
}
var brw *bufio.ReadWriter
netConn, brw, err := h.Hijack()
if err != nil {
return u.returnError(w, r, http.StatusInternalServerError, err.Error())
}
Part3 Read 實現
Read 即從 TCP 連接中讀取數據,然後將讀到的二進制數據解析爲 WebSocket 協議幀結構,簡單來說就是數據序列化。
1 原理
根據如下 WebSocket 幀結構,將收到的二進制數據解析出來。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
2 處理流程
3 關鍵代碼分析
讀取操作核心處理都在 NextReader 函數中,該函數返回 WebSocket 消息數據類型以及 data 讀取器。
關鍵操作在如下的 for 循環中,也就是說在讀取數據出現異常,或者讀取到的是文本數據或二進制數據纔會返回。其他像讀取到的是控制消息都不會返回,繼續卡住。因爲調用方是業務模塊,所以只關心讀取到業務數據纔會處理,如果沒有讀取阻塞在 NextReader 上完全合情合理。
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
...
for c.readErr == nil {
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
...
return noFrame, nil, c.readErr
}
真正讀取操作是在 advanceFrame 方法中。調用 c.read(2) 讀取前兩個字節。將第一個字節即 p[0] 與 0xf 取與操作,獲取到低 4 個 bit 值,對應到原理小節中的 opcode 字段。
將 p[1] 即第二字節與 0x7f 進行取與操作,得到其低 7 個 bit,即對應原理圖中的 Payload len。如果值爲 126,讀取接下的 2 個字節,即爲實際 data 大小,如果值爲 127,則讀取接下來的 8 個字節。
func (c *Conn) advanceFrame() (int, error) {
...
var errors []string
p, err := c.read(2)
if err != nil {
return noFrame, err
}
frameType := int(p[0] & 0xf)
final := p[0]&finalBit != 0
rsv1 := p[0]&rsv1Bit != 0
rsv2 := p[0]&rsv2Bit != 0
rsv3 := p[0]&rsv3Bit != 0
mask := p[1]&maskBit != 0
c.setReadRemaining(int64(p[1] & 0x7f))
...
switch c.readRemaining {
case 126:
p, err := c.read(2)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
return noFrame, err
}
case 127:
p, err := c.read(8)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
return noFrame, err
}
}
...
}
Part4 Write 實現
Write 即向 TCP 連接中寫入數據,然後將業務模塊發送的 WebSocket 幀數據翻譯爲二進制數據,簡單來說就是數據反序列化,跟 Read 是逆向操作。
1 原理
根據 WebSocket 幀結構,將業務層數據封裝成二進制數據。
2 處理流程
3 關鍵代碼分析
Write 核心操作在於封包,下面結合代碼分析具體實現細節。封包處理在 flushFrame 方法中實現。
構造 WebSocket 的第一個字節 b0 和第二個字節 b1。c.writeBuf 承載整個 WebSocket 報文數據。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
b0 := byte(w.frameType)
if final {
b0 |= finalBit
}
if w.compress {
b0 |= rsv1Bit
}
w.compress = false
b1 := byte(0)
if !c.isServer {
b1 |= maskBit
}
...
}
WriteBuf 結構如下,開頭預留了 2+8+4 個字節,這個是按 header 最大長度預留的。maxFrameHeaderSize 後面纔是實際的數據。
統一預留最大 14 個字節的 header 是方便代碼處理,對於 server 端,是不用填充 Masking-key 數據的,所以代碼中直接將 framePos 設置爲 4,真正有效數據是 c.WriteBuf[4:],而對於 client 端,需要填充 Masking-key , 所以它的 framePos 就是 0。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
// Assume that the frame starts at beginning of c.writeBuf.
framePos := 0
if c.isServer {
// Adjust up if mask not included in the header.
framePos = 4
}
...
}
填充數據長度大小到 payload length。這一部分大小是不固定的,如果 data 長度大於等於 65536,則表示長度這一部分佔 8 個字節。
case length >= 65536:
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | 127
binary.BigEndian.PutUint64(c.writeBuf[framePos+2:], uint64(length))
如果 data 長度小於 65536 但大於 125,則表示長度這一部分佔 2 個字節。
case length > 125:
framePos += 6
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | 126
binary.BigEndian.PutUint16(c.writeBuf[framePos+2:], uint16(length))
如果 data 長度小於等於 125,這不用額外空間表示長度,payload len 大小就是。
default:
framePos += 8
c.writeBuf[framePos] = b0
c.writeBuf[framePos+1] = b1 | byte(length)
Part5 併發性分析
關於讀寫的併發性,doc.go 文件中有如下說明。核心就是不支持多併發讀、多併發寫。一個併發讀和一個併發寫是支持的。
Connections support one concurrent reader and one concurrent writer.Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage,WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader,SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler)concurrently.
像下面這樣開啓一個讀 goroutine 和 一個寫 goroutine 是✅正確的。
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
go client.readMessages()
go client.writeMessages()
下面同時開啓兩個寫 goroutine 是❎錯誤的。
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := NewClient(conn, m)
m.addClient(client)
go client.writeMessages()
go client.writeMessages()
下面來看寫操作爲啥不支持併發,代碼註釋強調 "best-effort” 意味着系統會盡力去執行這項檢測,但可能不保證在所有情況下都能檢測到併發寫入。
Write the buffers to the connection with best-effort detection of concurrent writes.
在進行寫入前,先判斷 c.isWriting 是否 true, 如果已經爲 true, 表明有寫操作正在進行,直接 panic。
如果沒有人寫入,先將 c.isWriting 設置爲 true, 然後發送數據,發送完畢之後又將 c.isWriting 置爲 false。
func (w *messageWriter) flushFrame(final bool, extra []byte) error {
...
// Write the buffers to the connection with best-effort detection of
// concurrent writes. See the concurrency section in the package
// documentation for more info.
if c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = true
// 向tcp socket中寫入數據
err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)
if !c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = false
...
}
寫操作 write 通過 channel 進行排他,要從 channel 獲取到數據後才能進行。通過 defer 保證寫之後向通道發送數據,爲下一次數據寫入做好準備。
func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error {
<-c.mu
defer func() { c.mu <- struct{}{} }()
...
return nil
}
可以看到,上述 write 操作通過 channel 保證順序執行,保證寫入安全。真正不能併發的原因,是在外層函數,即下面的代碼只是在儘可能檢查是否併發,但是還是有可能兩個 goroutine 邏輯同時走到 if c.isWriting
,並且 c.isWriting 爲 false,都會繼續後面的邏輯,引發第二次 if !c.isWriting
判斷時出現 panic。
if c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = true
err := c.write(w.frameType, c.writeDeadline, c.writeBuf[framePos:w.pos], extra)
if !c.isWriting {
panic("concurrent write to websocket connection")
}
c.isWriting = false
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/HD4FMKLvHkqBlWDFqWRcqg