NSQ 原理解析
不知道大家有沒有這種感覺,在做 ppt 彙報,或者一些分享時候,準備的如果用 100 來打比方,最終演講出來的效果可能不到 80,導致的因素一方面是緊張導致沒說清楚,一方面是講着講着忘記了。
當然,作爲聽衆,可能聽到的效果不足 30,因爲如果內容比較枯燥,到後半程真正會去聽的也很少;或者中途走神了,再去聽後面的已經銜接不上了,只能繼續擺爛。所以,提前拋出一些有意思的話題,可以儘可能讓聽者多聽一會。
後面的資料補充也很重要,作爲分享知識的補充,便於聽者在沒有聽清楚時候後面翻看,雖然基本也很少有人會去翻看。
不過做一場技術分享,是一件使技術人的技術情懷得到滿足的事情。
作爲一個技術人,多少還是有一些技術情懷的,主要體現在這些方面:
-
完成一個模塊,對自己的代碼沾沾自喜一陣,雖然過幾天就看不懂了
-
看一些開源項目,提個 issue,pr,利用開源項目裏面的一些做法解決了一些業務上的難題,沒白看
-
幫助別人解決技術問題
-
寫一些技術文章總結,做技術分享
說是技術情懷,也是技術人的成就感。
本文只粘貼了部分我覺得最重要的內容的 ppt 頁,偏向工作原理性質的東西,通過該文章,可以瞭解到一款最簡單的消息隊列是怎麼實現的,該文章不像以往文章大段源碼,讀起來比較輕鬆,希望爲讀者工作中的某些問題提供一些思路。
message in producer
-
客戶端的 sdk 提供了 publish 等方法,當客戶端調用 Publish 等發送消息時候。
-
客戶端與 nsqd 會建立 tcp 連接,這裏是懶加載的模式,建立連接的同時,客戶端會啓動 routerloop,readloop 等任務
-
nsqd 裏面的 tcpserver 會啓動一個 IOLoop 來服務這條 tcp 連接,
-
一切就緒了,消息會被封裝成一個 transaction,這個 transaction 包含消息體和一個 chan,並等待這個 chan,chan 返回則消息發送成功,這個 transaction 被放到 transactionChan 管道里,
-
管道會被 routerloop 消費,
-
進而將這個 transaction append 到 transactions 數組裏面,
-
之後會將消息體以及指令發送到 nsqd,nsqd 做處理。
-
nsqd 處理後返回 ok,
-
被 readloop 讀取後發往 router,中間有一個 chan,圖裏沒體現,
-
router 會將消息對應的 transaction 從切片 transactions 裏面 pop 出來,並把 transaction 中的 chan 釋放。客戶端那邊的方法就會返回。
這裏對 transactions 切片的操作是沒有加鎖的,因爲消息的發送與結束是在一個協程中處理的,是順序的發送。
message in nsqd
先看下 Topic 的數據結構
-
channelMap: topic 下 channel 集合;
-
backend: 磁盤隊列的 interface,go-diskqueue 實現,Put 方法寫消息,ReadChan 返回一個 chan,用於讀消息;
-
memoryMsgChan: 內存消息隊列,chan 類型,容量默認 10000
在每一個 topic 被創建的時候,都會啓動一個對應的 messagePump 協程,負責消費 topic 下的 memoryMsgChan 和 backend 中的 message,並把 message 分發給該 topic 每個 Channel,調用 channel 的 PutMessage 方法。
看一下 channel 的數據結構,主要比 topic 多了四個結構
channel 的 PutMessage 方法實現與 Topic 的一致,也是先寫 chan,滿了就寫 backend;區別是後面消費管道的消費者不同,channel 間接的對接了消費者。
message into consumer
-
consumer 與 nsqd 建立 tcp 連接
-
Nsqd 會爲 consumer 開啓一個 IOLoop,啓動一個 messagePump 協程服務;consumer 這邊,會啓動 readloop,writeloop 也來服務這條連接
-
consumer 發送一個 SUB 指令,指定自己要消費的 topic,channel
-
該 topic 與 channel 會被 nsqd 找到
-
Consumer 向 nsqd 發送 rdy 指令,代表自己已經就緒,nsqd 可以推數據了
-
此時該 messagePump 會消費 channel 對應的 memoryMsgChan 和 backend 中的數據
-
將消息發送到 consumer,被 consumer 的 readloop 獲取到
-
Readloop 把消息放到 incomingMessages 信道里面
-
會被業務協程獲取到,並對消息進行處理
-
業務協程返回後,會對該消息封裝成 Finish 等方法
-
發送回到 nsqd,代表該消息已經消費
可以同時啓動多個消費者對同一個 channel 做消費,增強消費能力,nsqd 端對消費者的負載均衡處理方式也很簡單。多個 messagePump 同時去監聽 channel 下的兩種消息管道。
at least once
NSQ 在消息的傳遞過程中遵循的是 at least once,所以在某些時候會出現消息重複的情況;下面是消息在傳遞給消費者過程中的流程。
消息正常處理情況:
-
Consumer 對應的 messagePump 獲取到一條消息
-
計算好消息的超時時間,將消息標記到 channel 的 inflightMessages 中,這是一個 map 結構,用於通過 ID 做標記;將消息放到 InflightPQ 中,這是一個最小堆,最靠近當前時間的消息會放在堆頂
-
將消息發送到 consumer,進行處理
-
在規定時間內處理完成,發送 Fin 信號,被 consumer 對應的 IOLoop 接收到
-
執行 FIN 指令,分別從 map 和最小堆中去掉該消息。
消息超時重傳情況,步驟 1-3 同上:
-
消息消費超時,可能是系統卡住等導致的一些情況
-
在 nsqd 啓動時候,會啓動一個 queueScanLoop 協程池,會定期去每個 channel 下面的 inflightPQ 最小堆檢查有沒有消息超時,方法就是比較堆頂的元素
-
如果超時,就會把消息 pop 出來,並從 map 中刪掉
-
將消息重新投遞到 channel 裏面,重新消費
有時候想要對一條消息進行重新消費,就可以通過 REQ 機制完成:
-
當客戶端想要將一些消息做重新消費的時候,可以通過 sdk 發送一條 REQ 的指令
-
會將消息從 inflight 的最小堆和 map 中雙雙去掉
-
將消息放到 deferedmessage map 中,並計算好下次發送的時間放到 defererPQ 最小堆中
-
啓動的 queueScanLoop 協程池中的 ququeScanWorker 會對延遲的消息進行檢查
-
一旦時間到了,就會取出消息,放到 channel 中去重新消費
磁盤隊列的數據格式
使用注意事項
MaxInFlight
大致講講 MaxInFlight 的原理,其中有很多細節沒有涉及:
併發處理消息的能力指的就是消費者一個時間點,能同時接收到多少條消息,通過這個參數可以控制 nsq 推送消息的速率。
-
假設設置 MaxInFlight 設置爲 M,consumer 能連接 nsqd 節點數量爲 N,分配給每一個 nsqd 節點的數值就是 M/N,如果值小於 1,則置爲 1
-
會給每個 nsqd 節點發送 RDY 指令時候,攜帶上這個 M/N,此時,nsqd 節點就知道該 consumer 最多一次接收多少了,設置成 rdyCount
-
當發送消息之前,會對比一下自己的 inflight 參數,如果小於 rdyCount(M/N),就發送,並自增這個 inflight,大於就不發送,等待通知(select default 嘗試通知)
-
當消息結束時候,會自減 inflifht,並通過 channel 嘗試通知到 3 步驟。實現一個類似滑動窗口的作用
NSQ 犧牲了一些副本機制,實現了高效與簡單。代碼還是很通俗易懂,感興趣的同學可以抽個時間研究一下。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/pve_gzKI_f0Mnl8ZYCitZQ