RocketMQ 原理學習

本文深入探討了一款分佈式、隊列模型的消息中間件。RocketMQ 設計用於高度可擴展的分佈式系統,旨在解決應用解耦、異步調用、流量削峯及確保分佈式環境下的最終一致性等關鍵問題。這不僅有助於開發者理解 RocketMQ 的工作原理,也爲想要深入優化或定製消息中間件的高級用戶提供了寶貴的參考資料。

簡介

RocketMQ 是一款分佈式、隊列模型的消息中間件,消息生產分爲 Topic 與 Queue 兩種模式,Push 和 Pull 兩種方式消費,支持嚴格的消息順序,億級別的堆積能力,支持消息回溯和多個維度的消息查詢。

作用:應用解耦、異步調用、流量削峯、分佈式最終一致性

源碼核心模塊:namesrv、client、broker、store、remoting

突出優勢:能保證嚴格的消息順序;提供豐富的消息獲取模式;消費者水平擴展能力;億級消息堆積能力等。

名詞解釋

NameServer(NameService)

用於服務發現,提供名稱服務。NameServer 是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。

內部維護 Topic 和 Broker 之間的對應關係,並且和所有 Broker 保持心跳鏈接,在 Producer 和 Consumer 需要發佈或者消費消息的時候,向 NameServer 發出請求來獲取連接的 Broker 的信息。

NameServer 可以部署多個,每個之間相互獨立。其他角色同時向多個 NameServer 機器上報狀態信息,從而達到熱備份的目的。

NameServer 類似 Kafka 中 Zookeeper 的角色。不使用 Zookeeper 作爲註冊中心是因爲 Zk 有自動選舉 Master 的功能,rocketMQ 的架構設計上決定了它不需要進行 Master 選舉,而只需要使用一個輕量級的元數據服務就行了。

Broker

RocketMQ 的服務器,負責消息中轉,主要職責是存儲、轉發消息。

部署相對複雜,Broker 分爲 Master 和 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master,Master 與 Slave 的對應關係通過指定相同的 BrokerName,不同的 BrokerId 來定義。BrokerId 爲 0 表示 Master,非 0 表示 Slave。Master 可以部署多個。

每個 Broker 和 NameServer 集羣中的所有節點建立長連接,定時(30s)註冊 Topic 消息到所有 NameServer。NameServer 定時(10s)掃描所有存活 Broker 的連接,如果 NameServer 超過 2 分鐘沒有收到心跳,則 NameServer 斷開與該 Broker 的連接。

消息會發送到 Master 上,一旦 Master 上面記錄成功,不用等待 Slave 上是否記錄成功,Slave 會定時的去獲取消息記錄,所以 Slave 會和 Master 會存在時間差。Slave 可以作爲 Consumer 的服務提供者,如果寫入必須通過 Master,消費的時候則可以直接從 Slave 上獲取。

消息發送到 Broker 後需要進行持久化。

刷盤策略指的是消息發送到 Broker 內存後持久化到磁盤的方式,分爲同步刷盤和異步刷盤。

複製策略是 Broker 的 Master 與 Slave 間的數據同步方式,分爲同步複製與異步複製。

由於異步複製、異步刷盤可能會丟失少量信息,因此 Broker 默認採用的是同步雙寫的方式。消息寫入 Master 成功後,Master 會等待 Slave 同步數據成功後才向 Producer 返回成功 ACK,即 Master 與 Slave 都要寫入成功後纔會返回成功 ACK。這樣可以保證消息發送時消息不丟失。

Producer

消息生產者,負責產生消息,一般是業務系統。Producer 與 NameServer 集羣中的一個節點(隨機選擇)建立長連接,定期從 NameServer 取 Topic 路由消息,並向提供 Topic 服務的 Master 建立長連接,且定時向 Master 發送心跳。Producer 完全無狀態,可以集羣部署。Producer 會發布消息到 Master 上面,然後由 Master 同步給所有的 Slave。

Producer 每隔 30s 從 NameServer 獲取所有 Topic 隊列的最新情況,這意味着如果 Broker 不可用,Producer 最多 30s 能夠感知,在此期間內發放 Broker 的所有消息都會失敗。

消息發送時如果出現失敗,默認會重試 2 次。在重試時會盡量避開剛剛接收失敗的 Broker,而是選擇其他 Broker 上的隊列進行發送,從而提高消息發送的成功率。

Consumer

消息消費者,負責消費消息,一般是後臺系統負責異步消費。Consumer 與 NameServer 集羣中的一個其中一個節點(隨機選擇)建立長連接,定期從 NameServer 取 Topic 路由信息,並向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。

rocketMQ 使用長輪詢的方式,Consumer 和 Broker 保持長連接,Broker 獲取到消息後,會通知 Consumer 來拉取消息,Consumer 可以選擇 立即拉取 或 等待一段時間 再拉取消息。

消費重試

當出現消費失敗的消息時,Broker 會爲每個消費者組設置一個重試隊列。當一條消息初次消費失敗,消息隊列會自動進行消費重試。達到最大次數(默認 16 次),若消費仍然失敗,此時會將該消息發送到死信隊列。對於 死信消息,通常需要開發人員進行特殊處理。

Topic

發佈或者訂閱的主題。Topic 一般由多個隊列組成,隊列會平均地散列到多個 Broker 上面。Producer 的發送機制會保證消息儘量平均地散列到所有隊列上去。Tag 屬於子 Topic,主要的作用是給業務提供更大的靈活性。

Offset

消息在 Broker 上的每個分區都會組織成一個文件列表,消費者拉取數據的時候,需要知道數據在文件中的偏移量,這個偏移量就是 Offset。Offset 是一個絕對的偏移量,Broker 會講將 Offset 轉爲具體文件的相對偏移量。

核心流程

RocketMQ 的核心流程包含三大部分,分別是啓動、發送消息、消費消息。

啓動:核心是 NameService 啓動和 broker 啓動,NameService 是 mq 的註冊中心,broker 啓動後將自己的地址註冊到 NameService;生產者發送消息時,從 NameService 中讀取 broker 地址;消費者消費消息時,從 NameService 讀取 broker 地址。

發送消息:Producer 將消息寫入到 RocketMQ 集羣中 Broker 中具體的 Queue。

消費消息:Consumer 從 RocketMQ 集羣中 Broker 中拉取具體的消息。

最後 Producer、Consumer、NameService、Broker 都是通過 Netty 的網絡協議進行通信的,下面介紹各個組件。

NameServer

消息中間件的設計思路一般是基於主題訂閱發佈的機制,消息生產者(Producer)發送某一個主題到消息服務器,消息服務器負責將消息持久化存儲,消息消費者(Consumer)訂閱該興趣的主題,消息服務器根據訂閱信息(路由信息)將消息推送到消費者(Push 模式)或者消費者主動向消息服務器拉去(Pull 模式),從而實現消息生產者與消息消費者解耦。

作用

Broker 消息服務器在啓動的時向所有 NameServer 註冊,消息生產者(Producer)在發送消息時之前先從 NameServer 獲取 Broker 服務器地址列表,然後根據負載均衡算法從列表中選擇一臺服務器進行發送。NameServer 與每臺 Broker 保持長連接,並間隔 30S 檢測 Broker 是否存活,如果檢測到 Broker 宕機,則從路由註冊表中刪除,但是路由變化不會馬上通知消息生產者。這樣設計的目的是爲了降低 NameServer 實現的複雜度,在消息發送端提供容錯機制保證消息發送的可用性。

NameServer 啓動流程

NameServerStartup 是 NameServer 的啓動類,負責解析配置文件、加載運行時參數信息和初始化並啓動 NameServerController,NameServerController 是 NameServer 的核心控制器。

RouteInfoManager 管理路由信息 RemotingServer 與 rocketMQ 其他組件(Broker、Producer 和 Consumer)通信。

路由管理

NameServer 的主要作用是爲消息的生產者和消息消費者提供關於主題 Topic 的路由信息。那麼 NameServer 需要存儲路由的基礎信息,還要管理 Broker 節點,包括路由註冊、路由刪除等。

以 Broker 進行路由註冊 & 心跳爲例,闡述請求從被 Broker 發出到被 NameServer 處理的流程。

Broker 啓動時向集羣中所有的 NameServer 發送心跳信息,每隔 30s 向集羣中所有 NameServer 發送心跳包,NameServer 收到心跳包時會更新 brokerLiveTable 緩存中 BrokerLiveInfo 的 lastUpdataTimeStamp 信息,然後 NameServer 每隔 10s 掃描 brokerLiveTable,如果連續 120S 沒有收到心跳包,NameServer 將移除 Broker 的路由信息同時關閉 Socket 連接。

如果 Broker 宕機,NameServer 無法收到心跳包,此時 NameServer 如何來剔除這些失效的 Broker 呢?

Broker 啓動後每間隔 30s 向 NameServer 集羣所有節點廣播發送心跳消息。NameServer 啓動後每間隔 5s 掃描自身維護的 Broker 活躍信息。如果 BrokerLive 的上一次的心跳更新時間 + 超時時間(和 broker 綁定,不是常量)< 現在時間,則認爲 Broker 失效,NameServer 將對應的 Broker 從路由信息中移除。

NameService 對 broker 的心跳檢查

利用 java 中的定時任務類 ScheduledExecutorService 來檢測 broker 的心跳,每隔 5 秒掃描一次。

遍歷 broker 列表,若一個 broker 滿足上一次的心跳更新時間 + 超時時間(和 broker 綁定,不是常量)< 現在時間,那麼就認爲這個 broker 宕機,將這個 broker 關閉。

Broker

Broker 啓動的核心類是:BrokerStartup,BrokerController

BrokerStartup 負責加載配置信息,構建 BrokerController,BrokerController 負責啓動服務以及註冊請求的 Processor,processor 的作用是根據請求的協議得到請求 code,broker 維持一張 code 和處理方法的映射關係,獲取到請求信息後調用相應的 processor 進行處理。

Producer

消息發送

Producer 發送消息

消息發送的時候,大致流程是:通過 mqFaultStrategy 會從 topicPublishInfoTable 中選擇一個消息隊列,然後通過 mQClientFactory 類將消息發送到隊列中。而這些對象的初始化在 DefaultMQProducerImpl 內部 start 的方法中:

調用 start() 初始化後,producer 便得到了 nameServer 的地址,並和所有的 broker 維持了心跳信息,接下來就可以發送消息了。通過上面的分析,DefaultMQProducerImpl 是發送的核心類,send 最終調用的也是 DefaultMQProducerImpl 的方法。

選擇隊列,兩種機制:

Consumer消費消息

概述

消息消費以組的模式展開,一個消費組內可以包含多個消費者 (同一個 JVM 實例內只允許不允許存在消費組相同的消費者),消費組之間要保持統一的訂閱關係,這一點很重要。

消費組之間有兩種消費模式:

消息服務器與消費者之間的消息傳送也有兩種方式:

消息拉取

消息消費模式有兩種模式:廣播模式與集羣模式。廣播模式比較簡單,每一個消費者需要拉取訂閱主題下所有隊列的消息在集羣模式下,同一個消費者組內有多個消息消費者,同一個主題存在多個消費隊列,消費者通過負載均衡的方式消費消息。

消息隊列負載均衡,通常的作法是一個消息隊列在同一個時間只允許被一個消費消費者消費,一個消息消費者可以同時消費多個消息隊列。

RocketMQ 使用一個單獨的線程 PullMessageService 來負責消息的拉取。PullMessageService 循環不斷阻塞的從 pullRequestQueue 中取出 pullRequest。

ProcessQueue 是 MessageQueue 在消息端的快照。PullMessageService 從消息服務器默認每次拉取 32 條消息,按消息的隊列偏移量順序存放在 ProcessQueue 中,PullMessagService 然後將消息提交到消費者消費線程池,消息成功消費後從 ProcessQueue 中移除。

一個 consumer 客戶端會分配一個拉取消息線程(PullMessageService),不停地從存放了 messageQuene 的阻塞隊列中 take 需要拉取消息的 messagequene,最後通過調用通知網絡層發起拉取消息拉取的網絡請求(實際就是交給 netty 的 worker 線程拉消息),netty 的 worker 線程拉取到消息後調用處理 PullCallback 處理拉取的結果。

消息消費過程分析

PullMessageService 負責對消息隊列進行消息拉取,從遠端服務器拉取消息後將消息存儲 ProcessQueue 消息隊列處理隊列中,然後調用 ConsumeMessageService#submitConsumeRequest 方法進行消息消費。

ConsumeMessageService 支持順序消息和併發消息。

調用業務實現的消費消息邏輯,得到消費消息結果(即使消費超時了,也最終會根據 messageListner 執行返回的結果來決定是否重新消費消息)。

消費完消息之後會將消費之後的 messageQuene 對應的 offset 存放在緩存 map 中。在消費者啓動後,會調用定時任務 persistAllConsumerOffset 將緩存的 offsetTable 提交給 broker。

由於 offset 是先存在內存中,定時器間隔幾秒提交給 broker,消費之後的 offset 是完全存在可能丟失的風險(例如 consumer 端突然宕機),從而會導致沒有提交 offset 到 broker,再次啓動 consumer 客戶端時,會重複消費。

消息拉取長輪詢機制分析

RocketMQ 未真正實現消息推模式,而是消費者主動向消息服務器拉取消息,RocketMQ 推模式是循環向消息服務端發起消息拉取請求,如果消息消費者向 RocketMQ 拉取消息時,消息未到達消費隊列時,如果不啓用長輪詢機制,則會在服務端等待 shortPollingTimeMills 時間後(掛起)再去判斷消息是否已經到達指定消息隊列,如果消息仍未到達則提示拉取消息客戶端 PULL—NOT—FOUND(消息不存在);如果開啓長輪詢模式,RocketMQ 一方面會每隔 5s 輪詢檢查一次消息是否可達,同時一有消息達到後立馬通知掛起線程再次驗證消息是否是自己感興趣的消息,如果是則從 CommitLog 文件中提取消息返回給消息拉取客戶端。

長輪詢在 broker 是如何實現的?

構造 PullRequest,然後放入到 PullRequestHoldService 中,在 PullRequestHoldService 會定期判斷 pullRequest 是否可以喚醒;

將需要 hold 處理的 PullRequest 放入到一個 ConcurrentHashMap 中,等待被檢查; 

checkHoldRequest 會對每個在 pullRequestTable 的 pullRequest 進行檢查,檢查邏輯在 notifyMessageArriving 方法。

方法中兩個重要的判定就是:比較當前的 offset 和 maxoffset,看是否有新的消息到來,有新的消息返回客戶端;另外一個就是比較當前的時間和阻塞的時間,看是否超過了最大的阻塞時間,超過也同樣返回。

順序消息消費

消息拉取服務將消息提交到消費者消費線程池後,ConsumeMessageOrderlyService#run 進行順序消息消費,consumer 在拉取消息之前對要拉取的 MessageQueue 加鎖,成功後才拉取消息。

存儲結構

RocketMQ 的主要存儲結構分成三大部分,分別是 commitLog、ConsumeQueue、indexFile:

CommitLog:消息存儲文件,所有消息主題的消息都存儲在 CommitLog 文件中,並且是順序存儲。

ConsumeQueue:消息消費隊列,消息到達 CommitLog 文件後,將異步轉發到消息消費隊列,供消息消費者消費。

IndexFile:消息索引文件,主要存儲了消息的 Key 和索引的位置的對應關係。

RocketMQ 消息存儲是由 CommitLog 和 ConsumeQueue 配合完成的。CommitLog 真正存儲消息,ConsumeQueue 是消息的邏輯隊列,類似數據庫的索引文件,存儲消息在實際 CommitLog 物理存儲中的偏移地址。每個 Topic 下的每個 Queue 都有一個對應的 ConsumeQueue 文件。

CommitLog 以物理文件存儲,每個 Broker 上的 CommitLog 被該機器所有的 Queue 共享。

對所有數據單獨存儲到一個 CommitLog,完全順序寫,隨機讀,對最終用戶展現的隊列實際上只存儲消息在 CommitLog 中的位置信息,並且串行方式刷盤。

這樣做的好處:

隊列輕量化,單個隊列數據量非常少。對磁盤的訪問串行化,避免磁盤競爭,不會因爲隊列增加導致 IOWAIT 增高。

缺點如下:

寫雖然是順序寫,但是讀卻變成了完全的隨機讀。讀一條消息,會先讀 ConsumeQueue,再讀 CommitLog,增加了開銷。要保證 CommitLog 與 ConsumeQueue 完全的一致,增加了複雜度。

rocketMQ 如何克服:

隨機讀,儘可能讓讀命中 PAGECACHE,減少 IO 讀操作,所以內存越大越好。如果系統中堆積的消息過多,讀數據要訪問磁盤會不會由於隨機讀導致系統性能急劇下降,答案是否定的。

訪問 PAGECACHE 時,即使只訪問 1k 的消息,系統也會提前預讀出更多數據,在下次讀時,就可能命中內存。

隨機訪問 Commit Log 磁盤數據,系統 IO 調度算法設置爲 NOOP 方式,會在一定程度上將完全的隨機讀變成順序跳躍方式,而順序跳躍方式讀較完全的隨機讀性能會高 5 倍以上。

由於 Consume Queue 存儲數據量極少,而且是順序讀,在 PAGECACHE 預讀作用下,Consume Queue 的讀性能幾乎與內存一致,即使堆積情況下。所以可認爲 Consume Queue 完全不會阻礙讀性能。

CommitLog 中存儲了所有的元信息,包含消息體,類似於 Mysql、Oracle 的 redolog,所以只要有 CommitLog 在,Consume Queue 即使數據丟失,仍然可以恢復出來。

Broker 消息存儲做了什麼事

Broker 主要就是將 Producer 發送過來的消息做持久化存儲 (CommitLog),之後對保存的物理位置做文件索引(IndexFile) 和實際消費排序(ConsumerQueue)。

Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證,爲了實現這些功能,Broker 包含了以下幾個重要子模塊:

  1. RemotingModule:整個 Broker 的實體,負責處理來自 clients 端的請求。

  2. ClientManager:負責管理客戶端 (Producer/Consumer) 和維護 Consumer 的 Topic 訂閱信息

  3. StoreService:提供方便簡單的 API 接口處理消息存儲到物理硬盤和查詢功能。

  4. HAService:高可用服務,提供 Master Broker 和 Slave Broker 之間的數據同步功能。

  5. IndexService:根據特定的 Message key 對投遞到 Broker 的消息進行索引服務,以提供消息的快速查詢。

消息存儲整體架構

RocketMQ 首先將消息的寫入轉化爲順序寫,即所有 Topic 的消息均寫入同一個文件(CommitLog)。同時,由於消息仍需要以 Topic 爲維度進行消費,因此 rocketMQ 基於 CommitLog 爲每個 Topic 異步構建多個邏輯隊列(ConsumeQueue)和索引信息(Index):ConsumeQueue 記錄了消息在 CommitLog 中的位置信息;給定 Topic 和消息 Key,索引文件(Index)提供消息檢索的能力。

RocketMQ 採用了單一的日誌文件,即把同 1 臺機器上面所有 topic 的所有 queue 的消息,存放在一個文件裏面,從而避免了隨機的磁盤寫入。

不同 Topic 的消息最終均被順序持久化至共享的 CommitLog,CommitLog 由固定大小的文件隊列組成,文件隊列被定義爲 MappedFileQueue,MappedFileQueue 中每個文件被定義爲 MappedFile,每個 MappedFile 對應一個具體的文件用於將消息持久化至磁盤。

MapedFileQueue 存儲隊列,數據定時刪除,無限增長。

隊列有多個文件(MapedFile)組成,當消息到達 broker 時,需要獲取最新的 MapedFile 寫入數據,調用 MapedFileQueue 的 getLastMapedFile 獲取,此函數如果集合中一個也沒有創建一個,如果最後一個寫滿了也創建一個新的。

MapedFileQueue 在獲取 getLastMapedFile 時,如果需要創建新的 MapedFile 會計算出下一個 MapedFile 文件地址,通過預分配服務 AllocateMapedFileService 異步預創建下一個 MapedFile 文件,這樣下次創建新文件請求就不要等待,因爲創建文件特別是一個 1G 的文件還是有點耗時的,後續如果是異步刷盤還需要將 mapedFile 中的消息序列化到 commitLog 物理文件。

Index 的整體設計思想類似持久化在磁盤的 HashMap,同樣使用鏈式地址法解決哈希衝突:每個 Hash Slot 關聯一個 Message Index 鏈表,多個 Message Index 通過 preIndexOffset 連接。

消息存儲實現

Producer 端:Producer 發送消息至 Broker 端,然後 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 CommitLog 中。只要消息被刷盤持久化至磁盤文件 CommitLog 中,那麼 Producer 發送的消息就不會丟失。

Consumer 端:Consumer 也就肯定有機會去消費這條消息,至於消費的時間可以稍微滯後一些也沒有太大的關係。退一步地講,即使 Consumer 端第一次沒法拉取到待消費的消息,Broker 服務端也能夠通過長輪詢機制等待一定時間延遲後再次發起拉取消息的請求。這裏,rocketMQ 的具體做法是,使用 Broker 端的後臺服務線程—ReputMessageService 不停地分發請求並異步構建 ConsumeQueue(邏輯消費隊列)和 IndexFile(索引文件)數據。然後,Consumer 即可根據 ConsumerQueue 來查找待消費的消息了。其中,ConsumeQueue(邏輯消費隊列)作爲消費消息的索引,保存了指定 Topic 下的隊列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。而 IndexFile(索引文件)則只是爲了消息查詢提供了一種通過 key 或時間區間來查詢消息的方法。

消息存儲總結

消息生產與消息消費相互分離,Producer 端發送消息最終寫入的是 CommitLog(消息存儲的日誌數據文件),Consumer 端先從 ConsumeQueue(消息邏輯隊列)讀取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 Tag 的 HashCode 值,隨後再從 CommitLog 中進行讀取待拉取消費消息的真正實體內容部分。

RocketMQ 的 CommitLog 文件採用混合型存儲(所有的 Topic 下的消息隊列共用同一個 CommitLog 的日誌數據文件),並通過建立類似索引文件—ConsumeQueue 的方式來區分不同 Topic 下面的不同 MessageQueue 的消息,同時爲消費消息起到一定的緩衝作用(只有 ReputMessageService 異步服務線程通過 doDispatch 異步生成了 ConsumeQueue 隊列的元素後,Consumer 端才能進行消費)。這樣,只要消息寫入並刷盤至 CommitLog 文件後,消息就不會丟失,即使 ConsumeQueue 中的數據丟失,也可以通過 CommitLog 來恢復。

生產者端的消息確實是順序寫入 CommitLog;訂閱消息時,消費者端也是順序讀取 ConsumeQueue

結語

總之,本文是一篇全面介紹 RocketMQ 的指南,不僅適合想要深入瞭解技術架構的開發者閱讀,也爲那些計劃在項目中應用消息隊列技術的團隊提供了寶貴的參考資料。通過掌握 RocketMQ 的工作原理,開發者可以更好地利用其特性解決實際業務中的消息傳遞需求,提升系統的彈性和可擴展性。

參考資料

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/vvL0BDSUOTJNiOiDX4RQ0A