RabbitMQ、RocketMQ、Kafka 三元歸一
RabbitMQ
RabbitMQ 各組件的功能
-
Broker :一個 RabbitMQ 實例就是一個 Broker
-
Virtual Host :虛擬主機。相當於 MySQL 的 DataBase,一個 Broker 上可以存在多個 vhost,vhost 之間相互隔離。每個 vhost 都擁有自己的隊列、交換機、綁定和權限機制。vhost 必須在連接時指定,默認的 vhost 是 /。
-
Exchange :交換機,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
-
Queue :消息隊列,用來保存消息直到發送給消費者。它是消息的容器。一個消息可投入一個或多個隊列。
-
Banding :綁定關係,用於消息隊列和交換機之間的關聯。通過路由鍵(Routing Key)將交換機和消息隊列關聯起來。
-
Channel :管道,一條雙向數據流通道。不管是發佈消息、訂閱隊列還是接收消息,這些動作都是通過管道完成。因爲對於操作系統來說,建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了管道的概念,以複用一條 TCP 連接。
-
Connection :生產者 / 消費者 與 broker 之間的 TCP 連接。
-
Publisher :消息的生產者。
-
Consumer :消息的消費者。
-
Message :消息,它是由消息頭和消息體組成。消息頭則包括 Routing-Key、Priority(優先級)等。
RabbitMQ 的多種交換機類型
Exchange 分發消息給 Queue 時, Exchange 的類型對應不同的分發策略,有 3 種類型的 Exchange :Direct、Fanout、Topic。
-
Direct:消息中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致, Exchange 就會將消息分發到對應的隊列中。
-
Fanout:每個發到 Fanout 類型交換機的消息都會分發到所有綁定的隊列上去。Fanout 交換機沒有 Routing Key 。它在三種類型的交換機中轉發消息是最快的。
-
Topic:Topic 交換機通過模式匹配分配消息,將 Routing Key 和某個模式進行匹配。它只能識別兩個通配符:"#" 和 "*"。### 匹配 0 個或多個單詞, * 匹配 1 個單詞。
TTL
TTL(Time To Live):生存時間。RabbitMQ 支持消息的過期時間,一共 2 種。
-
在消息發送時進行指定。通過配置消息體的 Properties ,可以指定當前消息的過期時間。
-
在創建 Exchange 時指定。從進入消息隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動清除。
生產者的消息確認機制
Confirm 機制:
-
消息的確認,是指生產者投遞消息後,如果 Broker 收到消息,則會給我們生產者一個應答。
-
生產者進行接受應答,用來確認這條消息是否正常的發送到了 Broker,這種方式也是消息的可靠性投遞的核心保障!
如何實現 Confirm 確認消息?
-
在 channel 上開啓確認模式:channel.confirmSelect()
-
在 channel 上開啓監聽:addConfirmListener ,監聽成功和失敗的處理結果,根據具體的結果對消息進行重新發送或記錄日誌處理等後續操作。
Return 消息機制:
Return Listener 用於處理一些不可路由的消息。
我們的消息生產者,通過指定一個 Exchange 和 Routing,把消息送達到某一個隊列中去,然後我們的消費者監聽隊列進行消息的消費處理操作。
但是在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定的路由 key 路由不到,這個時候我們需要監聽這種不可達消息,就需要使用到 Returrn Listener。
基礎 API 中有個關鍵的配置項 Mandatory :如果爲 true,監聽器會收到路由不可達的消息,然後進行處理。如果爲 false,broker 端會自動刪除該消息。
同樣,通過監聽的方式, chennel.addReturnListener(ReturnListener rl) 傳入已經重寫過 handleReturn 方法的 ReturnListener。
消費端 ACK 與 NACK
消費端進行消費的時候,如果由於業務異常可以進行日誌的記錄,然後進行補償。但是對於服務器宕機等嚴重問題,我們需要手動 ACK 保障消費端消費成功。
// deliveryTag:消息在mq中的唯一標識
// multiple:是否批量(和qos設置類似的參數)
// requeue:是否需要重回隊列。或者丟棄或者重回隊首再次消費。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
如上代碼,消息在消費端重回隊列是爲了對沒有成功處理消息,把消息重新返回到 Broker。一般來說,實際應用中都會關閉重回隊列(避免進入死循環),也就是設置爲 false。
死信隊列 DLX
死信隊列(DLX Dead-Letter-Exchange):當消息在一個隊列中變成死信之後,它會被重新推送到另一個隊列,這個隊列就是死信隊列。
DLX 也是一個正常的 Exchange,和一般的 Exchange 沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。
當這個隊列中有死信時,RabbitMQ 就會自動的將這個消息重新發布到設置的 Exchange 上去,進而被路由到另一個隊列。
RocketMQ
阿里巴巴雙十一官方指定消息產品,支撐阿里巴巴集團所有的消息服務,歷經十餘年高可用與高可靠的嚴苛考驗,是阿里巴巴交易鏈路的核心產品。
Rocket:火箭的意思。
他有以下核心概念:Broker 、 Topic 、 Tag 、 MessageQueue 、 NameServer 、 Group 、 Offset 、 Producer 以及 Consumer 。
下面來詳細介紹。
-
Broker:消息中轉角色,負責存儲消息,轉發消息。Broker 是具體提供業務的服務器,單個 Broker 節點與所有的 NameServer 節點保持長連接及心跳,並會定時將 Topic 信息註冊到 NameServer,順帶一提底層的通信和連接都是基於 Netty 實現的。Broker 負責消息存儲,以 Topic 爲緯度支持輕量級的隊列,單機可以支撐上萬隊列規模,支持消息推拉模型。官網上有數據顯示:具有上億級消息堆積能力,同時可嚴格保證消息的有序性。
-
Topic:主題!它是消息的第一級類型。比如一個電商系統可以分爲:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產者和消費者的關係非常鬆散,一個 Topic 可以有 0 個、1 個、多個生產者向其發送消息,一個生產者也可以同時向不同的 Topic 發送消息。一個 Topic 也可以被 0 個、1 個、多個消費者訂閱。
-
Tag:標籤!可以看作子主題,它是消息的第二級類型,用於爲用戶提供額外的靈活性。使用標籤,同一業務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分爲:交易創建消息、交易完成消息等,一條消息可以沒有 Tag。標籤有助於保持您的代碼乾淨和連貫,並且還可以爲 RabbitMQ 提供的查詢系統提供幫助。
-
MessageQueue:一個 Topic 下可以設置多個消息隊列,發送消息時執行該消息的 Topic,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發出去。消息的物理管理單位。一個 Topic 下可以有多個 Queue,Queue 的引入使得消息的存儲可以分佈式集羣化,具有了水平擴展能力。
-
NameServer:類似 Kafka 中的 ZooKeeper,但 NameServer 集羣之間是沒有通信的,相對 ZK 來說更加輕量。它主要負責對於源數據的管理,包括了對於 Topic 和路由信息的管理。每個 Broker 在啓動的時候會到 NameServer 註冊,Producer 在發送消息前會根據 Topic 去 NameServer 獲取對應 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。
-
Producer:生產者,支持三種方式發送消息:同步、異步和單向單向發送 :消息發出去後,可以繼續發送下一條消息或執行業務代碼,不等待服務器迴應,且沒有回調函數。異步發送 :消息發出去後,可以繼續發送下一條消息或執行業務代碼,不等待服務器迴應,有回調函數。同步發送 :消息發出去後,等待服務器響應成功或失敗,才能繼續後面的操作。
-
Consumer:消費者,支持 PUSH 和 PULL 兩種消費模式,支持集羣消費和廣播消費集羣消費 :該模式下一個消費者集羣共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。廣播消費 :會發給消費者組中的每一個消費者進行消費。相當於 RabbitMQ 的發佈訂閱模式。
-
Group:分組,一個組可以訂閱多個 Topic。分爲 ProducerGroup,ConsumerGroup,代表某一類的生產者和消費者,一般來說同一個服務可以作爲 Group,同一個 Group 一般來說發送和消費的消息都是一樣的
-
Offset:在 RocketMQ 中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,Offset 爲 Java Long 類型,64 位,理論上在 100 年內不會溢出,所以認爲是長度無限。也可以認爲 Message Queue 是一個長度無限的數組,Offset 就是下標。
延時消息
開源版的 RocketMQ 不支持任意時間精度,僅支持特定的 level,例如定時 5s,10s,1min 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。
延時等級如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
順序消息
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ 可以嚴格的保證消息有序,可以分爲 分區有序 或者 全局有序 。
事務消息
消息隊列 MQ 提供類似 X/Open XA 的分佈式事務功能,通過消息隊列 MQ 事務消息能達到分佈式事務的最終一致。上圖說明了事務消息的大致流程:正常事務消息的發送和提交、事務消息的補償流程。
事務消息發送及提交:
-
發送 half 消息
-
服務端響應消息寫入結果
-
根據發送結果執行本地事務(如果寫入失敗,此時 half 消息對業務不可見,本地邏輯不執行);
-
根據本地事務狀態執行 Commit 或 Rollback(Commit 操作生成消息索引,消息對消費者可見)。
事務消息的補償流程:
-
對沒有 Commit/Rollback 的事務消息(pending 狀態的消息),從服務端發起一次 “回查”;
-
Producer 收到回查消息,檢查回查消息對應的本地事務的狀態。
-
根據本地事務狀態,重新 Commit 或 RollBack
其中,補償階段用於解決消息 Commit 或 Rollback 發生超時或者失敗的情況。
事務消息狀態:
事務消息共有三種狀態:提交狀態、回滾狀態、中間狀態:
-
TransactionStatus.CommitTransaction:提交事務,它允許消費者消費此消息。
-
TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
-
TransactionStatus.Unkonwn:中間狀態,它代表需要檢查消息隊列來確定消息狀態。
RocketMQ 的高可用機制
RocketMQ是天生支持分佈式的,可以配置主從以及水平擴展。
Master 角色的 Broker 支持讀和寫,Slave 角色的 Broker 僅支持讀,也就是 Producer 只能和 Master 角色的 Broker 連接寫入消息;Consumer 可以連接 Master 角色的 Broker,也可以連接 Slave 角色的 Broker 來讀取消息。
消息消費的高可用(主從):
在 Consumer 的配置文件中,並不需要設置是從 Master 讀還是從 Slave 讀,當 Master 不可用或者繁忙的時候,Consumer 會被自動切換到從 Slave 讀。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現故障後,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 程序。這就達到了消費端的高可用性。RocketMQ 目前還不支持把 Slave 自動轉成 Master,如果機器資源不足,需要把 Slave 轉成 Master,則要手動停止 Slave 角色的 Broker,更改配置文件,用新的配置文件啓動 Broker。
消息發送高可用(配置多個主節點):
在創建 Topic 的時候,把 Topic 的多個 Message Queue 創建在多個 Broker 組上(相同 Broker 名稱,不同 brokerId 的機器組成一個 Broker 組),這樣當一個 Broker 組的 Master 不可用後,其他組的 Master 仍然可用,Producer 仍然可以發送消息。
主從複製:
如果一個 Broker 組有 Master 和 Slave,消息需要從 Master 複製到 Slave 上,有同步和異步兩種複製方式。
-
同步複製:同步複製方式是等 Master 和 Slave 均寫成功後才反饋給客戶端寫成功狀態。如果 Master 出故障, Slave 上有全部的備份數據,容易恢復同步複製會增大數據寫入延遲,降低系統吞吐量。
-
異步複製:異步複製方式是隻要 Master 寫成功 即可反饋給客戶端寫成功狀態。在異步複製方式下,系統擁有較低的延遲和較高的吞吐量,但是如果 Master 出了故障,有些數據因爲沒有被寫 入 Slave,有可能會丟失
通常情況下,應該把 Master 和 Save 配置成同步刷盤方式,主從之間配置成異步的複製方式,這樣即使有一臺機器出故障,仍然能保證數據不丟,是個不錯的選擇。
負載均衡
Producer 負載均衡:
Producer 端,每個實例在發消息的時候,默認會輪詢所有的 Message Queue 發送,以達到讓消息平均落在不同的 Queue 上。而由於 Queue 可以散落在不同的 Broker,所以消息就發送到不同的 Broker 下,如下圖:
如果 Consumer 實例的數量比 Message Queue 的總數量還多的話,多出來的 Consumer 實例將無法分到 Queue,也就無法消費到消息,也就無法起到分攤負載的作用了。所以需要控制讓 Queue 的總數量大於等於 Consumer 的數量。
-
消費者的集羣模式:啓動多個消費者就可以保證消費者的負載均衡(均攤隊列)
-
默認使用的是均攤隊列:會按照 Queue 的數量和實例的數量平均分配 Queue 給每個實例,這樣每個消費者可以均攤消費的隊列,如下圖所示 6 個隊列和三個生產者。
-
另外一種平均的算法環狀輪流分 Queue 的形式,每個消費者,均攤不同主節點的一個消息隊列,如下圖所示:
對於廣播模式並不是負載均衡的,要求一條消息需要投遞到一個消費組下面所有的消費者實例,所以也就沒有消息被分攤消費的說法。
死信隊列
當一條消息消費失敗,RocketMQ 就會自動進行消息重試。而如果消息超過最大重試次數,RocketMQ 就會認爲這個消息有問題。但是此時,RocketMQ 不會立刻將這個有問題的消息丟棄,而會將其發送到這個消費者組對應的一種特殊隊列:死信隊列。死信隊列的名稱是 %DLQ%+ConsumGroup 。
死信隊列具有以下特性:
-
一個死信隊列對應一個 Group ID, 而不是對應單個消費者實例。
-
如果一個 Group ID 未產生死信消息,消息隊列 RocketMQ 不會爲其創建相應的死信隊列。
-
一個死信隊列包含了對應 Group ID 產生的所有死信消息,不論該消息屬於哪個 Topic。
Kafka
Kafka 是一個分佈式、支持分區的、多副本的,基於 ZooKeeper 協調的分佈式消息系統。
它最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於 Hadoop 的批處理系統、低延遲的實時系統、Storm/Spark 流式處理引擎,Web/Nginx 日誌、訪問日誌,消息服務等等,用 Scala 語言編寫。屬於 Apache 基金會的頂級開源項目。
先看一下 Kafka 的架構圖 :
Kafka 的核心概念
在 Kafka 中有幾個核心概念:
-
Broker:消息中間件處理節點,一個 Kafka 節點就是一個 Broker,一個或者多個 Broker 可以組成一個 Kafka 集羣
-
Topic:Kafka 根據 topic 對消息進行歸類,發佈到 Kafka 集羣的每條消息都需要指定一個 topic
-
Producer:消息生產者,向 Broker 發送消息的客戶端
-
Consumer:消息消費者,從 Broker 讀取消息的客戶端
-
ConsumerGroup:每個 Consumer 屬於一個特定的 ConsumerGroup,一條消息可以被多個不同的 ConsumerGroup 消費,但是一個 ConsumerGroup 中只能有一個 Consumer 能夠消費該消息
-
Partition:物理上的概念,一個 topic 可以分爲多個 partition,每個 partition 內部消息是有序的
-
Leader:每個 Partition 有多個副本,其中有且僅有一個作爲 Leader,Leader 是負責數據讀寫的 Partition。
-
Follower:Follower 跟隨 Leader,所有寫請求都通過 Leader 路由,數據變更會廣播給所有 Follower,Follower 與 Leader 保持數據同步。如果 Leader 失效,則從 Follower 中選舉出一個新的 Leader。當 Follower 與 Leader 掛掉、卡住或者同步太慢,Leader 會把這個 Follower 從 ISR 列表 中刪除,重新創建一個 Follower。
-
Offset:偏移量。Kafka 的存儲文件都是按照 offset.kafka 來命名,用 Offset 做名字的好處是方便查找。例如你想找位於 2049 的位置,只要找到 2048.kafka 的文件即可。
可以這麼來理解 Topic,Partition 和 Broker:
一個 Topic,代表邏輯上的一個業務數據集,比如訂單相關操作消息放入訂單 Topic,用戶相關操作消息放入用戶 Topic,對於大型網站來說,後端數據都是海量的,訂單消息很可能是非常巨量的,比如有幾百個 G 甚至達到 TB 級別,如果把這麼多數據都放在一臺機器上可定會有容量限制問題,那麼就可以在 Topic 內部劃分多個 Partition 來分片存儲數據,不同的 Partition 可以位於不同的機器上,相當於分佈式存儲。每臺機器上都運行一個 Kafka 的進程 Broker。
Kafka 核心總控制器 Controller
在 Kafka 集羣中會有一個或者多個 Broker,其中有一個 Broker 會被選舉爲控制器(Kafka Controller),可以理解爲 Broker-Leader ,它負責管理整個 集羣中所有分區和副本的狀態。
Partition-Leader
Controller 選舉機制
在 Kafka 集羣啓動的時候,選舉的過程是集羣中每個 Broker 都會嘗試在 ZooKeeper 上創建一個 /controller 臨時節點,ZooKeeper 會保證有且僅有一個 Broker 能創建成功,這個 Broker 就會成爲集羣的總控器 Controller。
當這個 Controller 角色的 Broker 宕機了,此時 ZooKeeper 臨時節點會消失,集羣裏其他 Broker 會一直監聽這個臨時節 點,發現臨時節點消失了,就競爭再次創建臨時節點,就是我們上面說的選舉機制,ZooKeeper 又會保證有一個 Broker 成爲新的 Controller。具備控制器身份的 Broker 需要比其他普通的 Broker 多一份職責,具體細節如下:
-
監聽 Broker 相關的變化。爲 ZooKeeper 中的 / brokers/ids / 節點添加 BrokerChangeListener,用來處理 Broker 增減的變化。
-
監聽 Topic 相關的變化。爲 ZooKeeper 中的 / brokers/topics 節點添加 TopicChangeListener,用來處理 Topic 增減的變化;爲 ZooKeeper 中的 / admin/delete_topics 節點添加 TopicDeletionListener,用來處理刪除 Topic 的動作。
-
從 ZooKeeper 中讀取獲取當前所有與 Topic、Partition 以及 Broker 有關的信息並進行相應的管理 。對於所有 Topic 所對應的 ZooKeeper 中的 / brokers/topics / 節點添加 PartitionModificationsListener,用來監聽 Topic 中的分區分配變化。
-
更新集羣的元數據信息,同步到其他普通的 Broker 節點中
Partition 副本選舉 Leader 機制
Controller 感知到分區 Leader 所在的 Broker 掛了,Controller 會從 ISR 列表(參數
unclean.leader.election.enable=false 的前提下)裏挑第一個 Broker 作爲 Leader(第一個 Broker 最先放進 ISR 列表,可能是同步數據最多的副本),如果參數 unclean.leader.election.enable 爲 true,代表在 ISR 列表裏所有副本都掛了的時候可以在 ISR 列表以外的副本中選 Leader,這種設置,可以提高可用性,但是選出的新 Leader 有可能數據少很多。副本進入 ISR 列表有兩個條件:
-
副本節點不能產生分區,必須能與 ZooKeeper 保持會話以及跟 Leader 副本網絡連通
-
副本能複製 Leader 上的所有寫操作,並且不能落後太多。(與 Leader 副本同步滯後的副本,是由 replica.lag.time.max.ms 配置決定的,超過這個時間都沒有跟 Leader 同步過的一次的副本會被移出 ISR 列表)
消費者消費消息的 Offset 記錄機制
每個 Consumer 會定期將自己消費分區的 Offset 提交給 Kafka 內部 Topic:consumer_offsets,提交過去的時候,key 是 consumerGroupId+topic + 分區號,value 就是當前 Offset 的值,Kafka 會定期清理 Topic 裏的消息,最後就保留最新的那條數據。
因爲__consumer_offsets 可能會接收高併發的請求,Kafka 默認給其分配 50 個分區(可以通過
offsets.topic.num.partitions 設置),這樣可以通過加機器的方式抗大併發。
消費者 Rebalance 機制
Rebalance 就是說 如果消費組裏的消費者數量有變化或消費的分區數有變化,Kafka 會重新分配消費者與消費分區的關係 。比如 consumer group 中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啓了,那麼又會把一些分區重新交還給他。
注意:Rebalance 只針對 subscribe 這種不指定分區消費的情況,如果通過 assign 這種消費方式指定了分區,Kafka 不會進行 Rebalance。
如下情況可能會觸發消費者 Rebalance:
-
消費組裏的 Consumer 增加或減少了
-
動態給 Topic 增加了分區
-
消費組訂閱了更多的 Topic
Rebalance 過程中,消費者無法從 Kafka 消費消息,這對 Kafka 的 TPS 會有影響,如果 Kafka 集羣內節點較多,比如數百 個,那重平衡可能會耗時極多,所以應儘量避免在系統高峯期的重平衡發生。
Rebalance 過程如下
當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷以下幾個階段:
組協調器 GroupCoordinator:每個 consumer group 都會選擇一個 Broker 作爲自己的組協調器 coordinator,負責監控這個消費組裏的所有消費者的心跳,以及判斷是否宕機,然後開啓消費者 Rebalance。consumer group 中的每個 consumer 啓動時會向 Kafka 集羣中的某個節點發送 FindCoordinatorRequest 請求來查找對應的組協調器 GroupCoordinator,並跟其建立網絡連接。組協調器選擇方式:通過如下公式可以選出 consumer 消費的 Offset 要提交到__consumer_offsets 的哪個分區,這個分區 Leader 對應的 Broker 就是這個 consumer group 的 coordinator 公式:
hash(consumer group id) % 對應主題的分區數
第二階段:加入消費組 JOIN GROUP
在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。然後 GroupCoordinator 從一個 consumer group 中選擇第一個加入 group 的 consumer 作爲 Leader(消費組協調器),把 consumer group 情況發送給這個 Leader,接着這個 Leader 會負責制定分區方案。
第三階段(SYNC GROUP)
consumer leader 通過給 GroupCoordinator 發送 SyncGroupRequest,接着 GroupCoordinator 就把分區方案下發給各個 consumer,他們會根據指定分區的 Leader Broker 進行網絡連接以及消息消費。
消費者 Rebalance 分區分配策略
主要有三種 Rebalance 的策略:range 、 round-robin 、 sticky 。默認情況爲 range 分配策略。
假設一個主題有 10 個分區(0-9),現在有三個 consumer 消費:
range 策略:按照分區序號排序分配 ,假設 n=分區數/消費者數量 = 3, m=分區數 % 消費者數量 = 1,那麼前 m 個消 費者每個分配 n+1 個分區,後面的(消費者數量-m )個消費者每個分配 n 個分區。比如分區 0~ 3 給一個 consumer,分區 4~ 6 給一個 consumer,分區 7~9 給一個 consumer。
round-robin 策略:輪詢分配 ,比如分區 0、3、6、9 給一個 consumer,分區 1、4、7 給一個 consumer,分區 2、5、 8 給一個 consumer
sticky 策略:初始時分配策略與 round-robin 類似,但是在 rebalance 的時候,需要保證如下兩個原則:
-
分區的分配要儘可能均勻 。
-
分區的分配儘可能與上次分配的保持相同。
當兩者發生衝突時,第一個目標優先於第二個目標 。這樣可以最大程度維持原來的分區分配的策略。比如對於第一種 range 情況的分配,如果第三個 consumer 掛了,那麼重新用 sticky 策略分配的結果如下:consumer1 除了原有的 0~ 3,會再分配一個 7 consumer2 除了原有的 4~ 6,會再分配 8 和 9。
Producer 發佈消息機制剖析
1、寫入方式
producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁盤(順序寫磁盤 比 隨機寫 效率要高,保障 kafka 吞吐率)。
2、消息路由
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制爲:
hash(key)%分區數
3、寫入流程
-
producer 先從 ZooKeeper 的 "/brokers/…/state" 節點找到該 partition 的 leader
-
producer 將消息發送給該 leader
-
leader 將消息寫入本地 log
-
followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK
-
leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset)並向 producer 發送 ACK
HW 與 LEO
HW 俗稱高水位 ,HighWatermark 的縮寫,取一個 partition 對應的 ISR 中最小的 LEO(log-end-offset)作爲 HW, consumer 最多隻能消費到 HW 所在的位置。另外每個 replica 都有 HW,leader 和 follower 各自負責更新自己的 HW 的狀 態。對於 leader 新寫入的消息,consumer 不能立刻消費,leader 會等待該消息被所有 ISR 中的 replicas 同步後更新 HW, 此時消息才能被 consumer 消費。這樣就保證瞭如果 leader 所在的 broker 失效,該消息仍然可以從新選舉的 leader 中獲取。對於來自內部 broker 的讀取請求,沒有 HW 的限制。
日誌分段存儲
Kafka 一個分區的消息數據對應存儲在一個文件夾下,以 topic 名稱 + 分區號命名,消息在分區內是分段存儲的, 每個段的消息都存儲在不一樣的 log 文件裏,Kafka 規定了一個段位的 log 文件最大爲 1G,做這個限制目的是爲了方便把 log 文件加載到內存去操作:
1 ### 部分消息的offset索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的offset到index文件,
2 ### 如果要定位消息的offset會先在這個文件裏快速定位,再去log文件裏找具體消息
3 00000000000000000000.index
4 ### 消息存儲文件,主要存offset和消息體
5 00000000000000000000.log
6 ### 消息的發送時間索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的發送時間戳與對應的offset到timeindex文件,
7 ### 如果需要按照時間來定位消息的offset,會先在這個文件裏查找
8 00000000000000000000.timeindex
9
10 00000000000005367851.index
11 00000000000005367851.log
12 00000000000005367851.timeindex
13
14 00000000000009936472.index
15 00000000000009936472.log
16 00000000000009936472.timeindex
這個 9936472 之類的數字,就是代表了這個日誌段文件裏包含的起始 Offset,也就說明這個分區裏至少都寫入了接近 1000 萬條數據了。Kafka Broker 有一個參數,log.segment.bytes,限定了每個日誌段文件的大小,最大就是 1GB。一個日誌段文件滿了,就自動開一個新的日誌段文件來寫入,避免單個文件過大,影響文件的讀寫性能,這個過程叫做 log rolling,正在被寫入的那個日誌段文件,叫做 active log segment。
最後附一張 ZooKeeper 節點數據圖
MQ 帶來的一些問題、及解決方案
如何保證順序消費?
-
RabbitMQ:一個 Queue 對應一個 Consumer 即可解決。
-
RocketMQ****hash(key)% 隊列數
-
Kafka:hash(key)% 分區數
如何實現延遲消費?
-
RabbitMQ:兩種方案
死信隊列 + TTL 引入 RabbitMQ 的延遲插件 -
RocketMQ:天生支持延時消息。
-
Kafka:步驟如下
專門爲要延遲的消息創建一個 Topic 新建一個消費者去消費這個 Topic 消息持久化再開一個線程定時去拉取持久化的消息,放入實際要消費的 Topic 實際消費的消費者從實際要消費的 Topic 拉取消息。
如何保證消息的可靠性投遞
-
Broker--> 消費者:手動 ACK
-
生產者 -->Broker:兩種方案
數據庫持久化:
1.將業務訂單數據和生成的Message進行持久化操作(一般情況下插入數據庫,這裏如果分庫的話可能涉及到分佈式事務)
2.將Message發送到Broker服務器中
3.通過RabbitMQ的Confirm機制,在producer端,監聽服務器是否ACK。
4.如果ACK了,就將Message這條數據狀態更新爲已發送。如果失敗,修改爲失敗狀態。
5.分佈式定時任務查詢數據庫3分鐘(這個具體時間應該根據的時效性來定)之前的發送失敗的消息
6.重新發送消息,記錄發送次數
7.如果發送次數過多仍然失敗,那麼就需要人工排查之類的操作。
優點:能夠保證消息百分百不丟失。
缺點:第一步會涉及到分佈式事務問題。
消息的延遲投遞:
流程圖中,顏色不同的代表不同的message
1.將業務訂單持久化
2.發送一條Message到broker(稱之爲主Message),再發送相同的一條到不同的隊列或者交換機(這條稱爲確認Message)中。
3.主Message由實際業務處理端消費後,生成一條響應Message。之前的確認Message由Message Service應用處理入庫。
4~6.實際業務處理端發送的確認Message由Message Service接收後,將原Message狀態修改。
7.如果該條Message沒有被確認,則通過rpc調用重新由producer進行全過程。
優點:相對於持久化方案來說響應速度有所提升
缺點:系統複雜性有點高,萬一兩條消息都失敗了,消息存在丟失情況,仍需 Confirm 機制做補償。
RocketMQ
生產者弄丟數據:
Producer 在把 Message 發送 Broker 的過程中,因爲網絡問題等發生丟失,或者 Message 到了 Broker,但是出了問題,沒有保存下來。針對這個問題,RocketMQ 對 Producer 發送消息設置了 3 種方式:
同步發送
異步發送
單向發送
Broker 弄丟數據:
Broker 接收到 Message 暫存到內存,Consumer 還沒來得及消費,Broker 掛掉了。
可以通過 持久化 設置去解決:
-
創建 Queue 的時候設置持久化,保證 Broker 持久化 Queue 的元數據,但是不會持久化 Queue 裏面的消息
-
將 Message 的 deliveryMode 設置爲 2,可以將消息持久化到磁盤,這樣只有 Message 支持化到磁盤之後纔會發送通知 Producer ack
這兩步過後,即使 Broker 掛了,Producer 肯定收不到 ack 的,就可以進行重發。
消費者弄丟數據:
Consumer 有消費到 Message,但是內部出現問題,Message 還沒處理,Broker 以爲 Consumer 處理完了,只會把後續的消息發送。這時候,就要 關閉 autoack,消息處理過後,進行手動 ack , 多次消費失敗的消息,會進入 死信隊列 ,這時候需要人工干預。
Kafka
生產者弄丟數據
設置了 acks=all ,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之後,才認爲本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
Broker 弄丟數據
Kafka 某個 broker 宕機,然後重新選舉 partition 的 leader。大家想想,要是此時其他的 follower 剛好還有些數據沒有同步,結果此時 leader 掛了,然後選舉某個 follower 成 leader 之後,不就少了一些數據?這就丟了一些數據啊。
此時一般是要求起碼設置如下 4 個參數:
replication.factor
min.insync.replicas
acks=all
retries=MAX
我們生產環境就是按照上述要求配置的,這樣配置之後,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。
消費者弄丟數據
你消費到了這個消息,然後消費者那邊自動提交了 offset,讓 Kafka 以爲你已經消費好了這個消息,但其實你纔剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那麼只要 關閉自動提交 offset,在處理完之後自己手動提交 offset,就可以保證數據不會丟。但是此時確實還是可能會有重複消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重複消費一次,自己保證冪等性就好了。
如何保證消息的冪等?
以 RocketMQ 爲例,下面列出了消息重複的場景:
發送時消息重複
當一條消息已被成功發送到服務端並完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。如果此時生產者意識到消息發送失敗並嘗試再次發送消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。
投遞時消息重複
消息消費的場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。爲了保證消息至少被消費一次,消息隊列 RocketMQ 版的服務端將在網絡恢復後再次嘗試投遞之前已被處理過的消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。
負載均衡時消息重複(包括但不限於網絡抖動、Broker 重啓以及消費者應用重啓)
當消息隊列 RocketMQ 版的 Broker 或客戶端重啓、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。
那麼,有什麼解決方案呢?直接上圖。
如何解決消息積壓的問題?
關於這個問題,有幾個點需要考慮:
如何快速讓積壓的消息被消費掉?
臨時寫一個消息分發的消費者,把積壓隊列裏的消息均勻分發到 N 個隊列中,同時一個隊列對應一個消費者,相當於消費速度提高了 N 倍。
修改前:
修改後:
積壓時間太久,導致部分消息過期,怎麼處理?
批量重導。在業務不繁忙的時候,比如凌晨,提前準備好程序,把丟失的那批消息查出來,重新導入到 MQ 中。
消息大量積壓,MQ 磁盤被寫滿了,導致新消息進不來了,丟掉了大量消息,怎麼處理?
這個沒辦法。誰讓【消息分發的消費者】寫的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補數據吧。
作者:讓我來搞這個 bug
來源:www.jianshu.com/p/2975d354fca5
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ZQEo7iwjwUkEsR0o0b-02Q