圖解 RocketMQ 架構
寫在前面
Kafka、RocketMQ 都是很出名的中間件,上次我們講解了 Kafka,這次我們來講講 RocketMQ 的原理。
基本架構圖
解析
RocketMQ 總共可以分成四個模塊
-
NameServer:提供
服務發現和路由功能
,管理各種元數據信息。 -
Broker:消息存儲和路由分發節點,負責
存儲消息和將消息路由
給消費者。 -
Producer:消息生產者,負責
產生併發送消息到指定的 Topic
。 -
Consumer:消息消費者,訂閱 Topic 並從 Broker 拉取消息進行處理。
大體讀寫步驟
-
註冊 Broker Cluster 到 NameServer。
-
註冊 Producer、Consumer 到 NameServer。
-
Producer 獲取 MQ 集羣的 Broker、Queue 等信息。
-
Producer 發消息,一般消息都會選擇 master 進行寫入,而 slave 進行讀取。
-
順序寫入消息到 commit log 中。
-
queue log 會記錄每條 commit log 的存儲信息,當然不會記錄所有,只記錄一些重要的,commitLogOffset 等等。
-
master 將消息異步給 slave。
-
Consumer 讀取 slave 的消息。
-
Consumer 返回 ACK 作爲確認消息消費成功。
-
NameServer
當 Broker 服務啓動後,會向 NameServer 註冊信息,比如broker中的Topic、消費偏移量、隊列、ip、端口
等,由 Broker 的心跳發送到 NameServer,BrokerCluster 中的每一個節點都會註冊到NameServer上。
所有的broker信息
。不過 NameService 是無狀態
的, NameService 之間不會相互通信,那麼一個 NameService 掛了,不會影響另外一個 NameService。
註冊完 Broker 之後,NameServer 會每隔 10s 發送心跳檢查 Broker,如果 Broker 超過 120s 還沒有響應,則這個 Broker 被視爲宕機
- Broker
2.1 CommitLog & Message Queue
Broker 啓動,跟所有的 NameServer 保持長連接,每 30s 發送一次發送心跳包(像心跳一樣持續穩定的發送請求)。心跳包中包含當前 Broker 信息 ( IP+ 端口等)以及存儲所有 Topic 信息。註冊成功後,NameServer 集羣中就有 Topic 跟 Broker 的映射關係。
-
Commit log 存儲消息實體。順序寫,隨機讀。雖然是隨機讀,但是利用 package 機制,可以批量地從磁盤讀取,作爲 cache 存到內存中,加速後續的讀取速度。
-
Message queue 存儲消息的
偏移量
。讀消息先讀 message queue,根據偏移量到 commit log 讀消息本身。
所以其實我們的消息不是存放在 queue 中,而是存放在 commit log 中,這就是爲什麼 queue 會被稱爲邏輯隊列
2.2 Index File
2.2.1 介紹
因爲所有的消息都存在 CommitLog 中,如果要實現根據 key 查詢 消息的方法,就會變得非常困難,所以爲了解決這種業務需求,有了 IndexFile 的存在。用於爲生成的索引文件提供訪問服務,通過消息 Key 值查詢消息真正的實體內容。
IndexFile 如何創建?以創建的時間戳命名。參數:phyOffset 物理偏移量(也就是 commitLogOffset)、keys。
2.2.2 按照 MessageId 查詢
RocketMQ 中的 MessageId 的長度總共有 16 字節,其中包含了消息存儲主機地址(IP 地址和端口),消息 Commit Log offset。
-
Client 端從 MessageId 中解析出 Broker 的地址(IP 地址和端口)和 Commit Log 的偏移地址發送一個 RPC 請求。
-
Broker 端讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄並解析成一個完整的消息返回。
2.2.3 按照 Message Key 查詢
-
找槽位:slotKey =
40 byte + hash(topic + "#" + key) % 500W * 4byte
。 -
計算槽位:slotValue = 最新插入 index 的位置。
-
遍歷單向鏈表:從 slotValue 找到最新 index 在整個索引文件中位置 =
40byte +500w*4byte + slotValue*20byte
,然後根據單個索引文件的 pre index 值找到前一個索引,一直遍歷下去,直到index數據中key hash和時間區間都滿足即可
。添加到 commitLogOffset 的 list) 中。 -
最終根據其中的
commitLogOffset
從 CommitLog 文件中讀取消息的實體內容。 -
Producer
略。Producer 好像除了負載均衡
,就沒什麼好講的地方了。
- Consumer
在 RocketMQ 中,Consumer 端的兩種消費模式(Push/Pull)都是基於拉模式來獲取消息的,pull需要手動實現拉取消息,push只需要實現消費監聽器。但實際底層都是pull。
在 Consumer 啓動後,會通過定時任務不斷地向所有 Broker 實例發送心跳包,包含:消息消費分組名稱、訂閱關係集合、消息通信模式和客戶端 id 的值等信息
Broker 端在收到 Consumer 的心跳消息後,會將它維護在 ConsumerManager 的本地緩存變量。會根據消費者組獲取對應維護的消費者組信息。
如果是新加入的consumer獲取訂閱信息變了
,會通知這個消費者組裏面的其他消費者說消費者有變化,被通知到的消費者就會重新負載均衡
。
參考
[1] https://www.modb.pro/db/141171
[2] https://www.cnblogs.com/duanxz/p/5020398.html
[3] https://www.cnblogs.com/dennyzhangdd/p/15035116.html
[4] https://cloud.tencent.com/developer/article/2277381
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/muZd3xsKnFkfnEjBfOW2mA