RabbitMQ、RocketMQ、Kafka 三元歸一

RabbitMQ

RabbitMQ 各組件的功能

RabbitMQ 的多種交換機類型

Exchange 分發消息給 Queue 時, Exchange 的類型對應不同的分發策略,有 3 種類型的 Exchange :DirectFanoutTopic

TTL

TTL(Time To Live):生存時間。RabbitMQ 支持消息的過期時間,一共 2 種。

生產者的消息確認機制

Confirm 機制:

如何實現 Confirm 確認消息?

  1. 在 channel 上開啓確認模式:channel.confirmSelect()

  2. 在 channel 上開啓監聽:addConfirmListener ,監聽成功和失敗的處理結果,根據具體的結果對消息進行重新發送或記錄日誌處理等後續操作。

Return 消息機制:

Return Listener 用於處理一些不可路由的消息

我們的消息生產者,通過指定一個 Exchange 和 Routing,把消息送達到某一個隊列中去,然後我們的消費者監聽隊列進行消息的消費處理操作。

但是在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定的路由 key 路由不到,這個時候我們需要監聽這種不可達消息,就需要使用到 Returrn Listener。

基礎 API 中有個關鍵的配置項 Mandatory :如果爲 true,監聽器會收到路由不可達的消息,然後進行處理。如果爲 false,broker 端會自動刪除該消息。

同樣,通過監聽的方式, chennel.addReturnListener(ReturnListener rl) 傳入已經重寫過 handleReturn 方法的 ReturnListener。

消費端 ACK 與 NACK

消費端進行消費的時候,如果由於業務異常可以進行日誌的記錄,然後進行補償。但是對於服務器宕機等嚴重問題,我們需要手動 ACK 保障消費端消費成功。

// deliveryTag:消息在mq中的唯一標識

// multiple:是否批量(和qos設置類似的參數)

// requeue:是否需要重回隊列。或者丟棄或者重回隊首再次消費。

public void basicNack(long deliveryTag, boolean multiple, boolean requeue)

如上代碼,消息在消費端重回隊列是爲了對沒有成功處理消息,把消息重新返回到 Broker。一般來說,實際應用中都會關閉重回隊列(避免進入死循環),也就是設置爲 false。

死信隊列 DLX

死信隊列(DLX Dead-Letter-Exchange):當消息在一個隊列中變成死信之後,它會被重新推送到另一個隊列,這個隊列就是死信隊列。

DLX 也是一個正常的 Exchange,和一般的 Exchange 沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。

當這個隊列中有死信時,RabbitMQ 就會自動的將這個消息重新發布到設置的 Exchange 上去,進而被路由到另一個隊列。

RocketMQ

阿里巴巴雙十一官方指定消息產品,支撐阿里巴巴集團所有的消息服務,歷經十餘年高可用與高可靠的嚴苛考驗,是阿里巴巴交易鏈路的核心產品。

Rocket:火箭的意思。

RocketMQ 的核心概念

他有以下核心概念:Broker 、 Topic 、 Tag 、 MessageQueue 、 NameServer 、 Group 、 Offset 、 Producer 以及 Consumer 。

下面來詳細介紹。

延時消息

開源版的 RocketMQ 不支持任意時間精度,僅支持特定的 level,例如定時 5s,10s,1min 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。

延時等級如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

順序消息

消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ 可以嚴格的保證消息有序,可以分爲 分區有序 或者 全局有序 。

事務消息

消息隊列 MQ 提供類似 X/Open XA 的分佈式事務功能,通過消息隊列 MQ 事務消息能達到分佈式事務的最終一致。上圖說明了事務消息的大致流程:正常事務消息的發送和提交、事務消息的補償流程。

事務消息發送及提交:

  1. 發送 half 消息

  2. 服務端響應消息寫入結果

  3. 根據發送結果執行本地事務(如果寫入失敗,此時 half 消息對業務不可見,本地邏輯不執行);

  4. 根據本地事務狀態執行 Commit 或 Rollback(Commit 操作生成消息索引,消息對消費者可見)。

事務消息的補償流程:

  1. 對沒有 Commit/Rollback 的事務消息(pending 狀態的消息),從服務端發起一次 “回查”;

  2. Producer 收到回查消息,檢查回查消息對應的本地事務的狀態。

  3. 根據本地事務狀態,重新 Commit 或 RollBack

其中,補償階段用於解決消息 Commit 或 Rollback 發生超時或者失敗的情況。

事務消息狀態:

事務消息共有三種狀態:提交狀態、回滾狀態、中間狀態:

  1. TransactionStatus.CommitTransaction:提交事務,它允許消費者消費此消息。

  2. TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。

  3. TransactionStatus.Unkonwn:中間狀態,它代表需要檢查消息隊列來確定消息狀態。

RocketMQ 的高可用機制

‍RocketMQ‍是天生支持分佈式的,可以配置主從以及水平擴展。

Master 角色的 Broker 支持讀和寫,Slave 角色的 Broker 僅支持讀,也就是 Producer 只能和 Master 角色的 Broker 連接寫入消息;Consumer 可以連接 Master 角色的 Broker,也可以連接 Slave 角色的 Broker 來讀取消息。

消息消費的高可用(主從):

在 Consumer 的配置文件中,並不需要設置是從 Master 讀還是從 Slave 讀,當 Master 不可用或者繁忙的時候,Consumer 會被自動切換到從 Slave 讀。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現故障後,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 程序。這就達到了消費端的高可用性。RocketMQ 目前還不支持把 Slave 自動轉成 Master,如果機器資源不足,需要把 Slave 轉成 Master,則要手動停止 Slave 角色的 Broker,更改配置文件,用新的配置文件啓動 Broker。

消息發送高可用(配置多個主節點):

在創建 Topic 的時候,把 Topic 的多個 Message Queue 創建在多個 Broker 組上(相同 Broker 名稱,不同 brokerId 的機器組成一個 Broker 組),這樣當一個 Broker 組的 Master 不可用後,其他組的 Master 仍然可用,Producer 仍然可以發送消息。

主從複製:

如果一個 Broker 組有 Master 和 Slave,消息需要從 Master 複製到 Slave 上,有同步和異步兩種複製方式。

通常情況下,應該把 Master 和 Save 配置成同步刷盤方式,主從之間配置成異步的複製方式,這樣即使有一臺機器出故障,仍然能保證數據不丟,是個不錯的選擇。

負載均衡

Producer 負載均衡:

Producer 端,每個實例在發消息的時候,默認會輪詢所有的 Message Queue 發送,以達到讓消息平均落在不同的 Queue 上。而由於 Queue 可以散落在不同的 Broker,所以消息就發送到不同的 Broker 下,如下圖:

Consumer 負載均衡:

如果 Consumer 實例的數量比 Message Queue 的總數量還多的話,多出來的 Consumer 實例將無法分到 Queue,也就無法消費到消息,也就無法起到分攤負載的作用了。所以需要控制讓 Queue 的總數量大於等於 Consumer 的數量。

對於廣播模式並不是負載均衡的,要求一條消息需要投遞到一個消費組下面所有的消費者實例,所以也就沒有消息被分攤消費的說法。

死信隊列

當一條消息消費失敗,RocketMQ 就會自動進行消息重試。而如果消息超過最大重試次數,RocketMQ 就會認爲這個消息有問題。但是此時,RocketMQ 不會立刻將這個有問題的消息丟棄,而會將其發送到這個消費者組對應的一種特殊隊列:死信隊列。死信隊列的名稱是 %DLQ%+ConsumGroup 。

死信隊列具有以下特性:

  1. 一個死信隊列對應一個 Group ID, 而不是對應單個消費者實例。

  2. 如果一個 Group ID 未產生死信消息,消息隊列 RocketMQ 不會爲其創建相應的死信隊列。

  3. 一個死信隊列包含了對應 Group ID 產生的所有死信消息,不論該消息屬於哪個 Topic。

Kafka

Kafka 是一個分佈式、支持分區的、多副本的,基於 ZooKeeper 協調的分佈式消息系統。

它最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於 Hadoop 的批處理系統、低延遲的實時系統、Storm/Spark 流式處理引擎,Web/Nginx 日誌、訪問日誌,消息服務等等,用 Scala 語言編寫。屬於 Apache 基金會的頂級開源項目。

先看一下 Kafka 的架構圖 :

Kafka 的核心概念

在 Kafka 中有幾個核心概念:

可以這麼來理解 Topic,Partition 和 Broker:

一個 Topic,代表邏輯上的一個業務數據集,比如訂單相關操作消息放入訂單 Topic,用戶相關操作消息放入用戶 Topic,對於大型網站來說,後端數據都是海量的,訂單消息很可能是非常巨量的,比如有幾百個 G 甚至達到 TB 級別,如果把這麼多數據都放在一臺機器上可定會有容量限制問題,那麼就可以在 Topic 內部劃分多個 Partition 來分片存儲數據,不同的 Partition 可以位於不同的機器上,相當於分佈式存儲。每臺機器上都運行一個 Kafka 的進程 Broker。

Kafka 核心總控制器 Controller

在 Kafka 集羣中會有一個或者多個 Broker,其中有一個 Broker 會被選舉爲控制器(Kafka Controller),可以理解爲 Broker-Leader ,它負責管理整個 集羣中所有分區和副本的狀態。

Partition-Leader

Controller 選舉機制

在 Kafka 集羣啓動的時候,選舉的過程是集羣中每個 Broker 都會嘗試在 ZooKeeper 上創建一個 /controller 臨時節點,ZooKeeper 會保證有且僅有一個 Broker 能創建成功,這個 Broker 就會成爲集羣的總控器 Controller。

當這個 Controller 角色的 Broker 宕機了,此時 ZooKeeper 臨時節點會消失,集羣裏其他 Broker 會一直監聽這個臨時節 點,發現臨時節點消失了,就競爭再次創建臨時節點,就是我們上面說的選舉機制,ZooKeeper 又會保證有一個 Broker 成爲新的 Controller。具備控制器身份的 Broker 需要比其他普通的 Broker 多一份職責,具體細節如下:

  1. 監聽 Broker 相關的變化。爲 ZooKeeper 中的 / brokers/ids / 節點添加 BrokerChangeListener,用來處理 Broker 增減的變化。

  2. 監聽 Topic 相關的變化。爲 ZooKeeper 中的 / brokers/topics 節點添加 TopicChangeListener,用來處理 Topic 增減的變化;爲 ZooKeeper 中的 / admin/delete_topics 節點添加 TopicDeletionListener,用來處理刪除 Topic 的動作。

  3. 從 ZooKeeper 中讀取獲取當前所有與 Topic、Partition 以及 Broker 有關的信息並進行相應的管理 。對於所有 Topic 所對應的 ZooKeeper 中的 / brokers/topics / 節點添加 PartitionModificationsListener,用來監聽 Topic 中的分區分配變化。

  4. 更新集羣的元數據信息,同步到其他普通的 Broker 節點中

Partition 副本選舉 Leader 機制

Controller 感知到分區 Leader 所在的 Broker 掛了,Controller 會從 ISR 列表(參數
unclean.leader.election.enable=false 的前提下)裏挑第一個 Broker 作爲 Leader(第一個 Broker 最先放進 ISR 列表,可能是同步數據最多的副本),如果參數 unclean.leader.election.enable 爲 true,代表在 ISR 列表裏所有副本都掛了的時候可以在 ISR 列表以外的副本中選 Leader,這種設置,可以提高可用性,但是選出的新 Leader 有可能數據少很多。副本進入 ISR 列表有兩個條件:

  1. 副本節點不能產生分區,必須能與 ZooKeeper 保持會話以及跟 Leader 副本網絡連通

  2. 副本能複製 Leader 上的所有寫操作,並且不能落後太多。(與 Leader 副本同步滯後的副本,是由 replica.lag.time.max.ms 配置決定的,超過這個時間都沒有跟 Leader 同步過的一次的副本會被移出 ISR 列表)

消費者消費消息的 Offset 記錄機制

每個 Consumer 會定期將自己消費分區的 Offset 提交給 Kafka 內部 Topic:consumer_offsets,提交過去的時候,key 是 consumerGroupId+topic + 分區號,value 就是當前 Offset 的值,Kafka 會定期清理 Topic 裏的消息,最後就保留最新的那條數據。

因爲__consumer_offsets 可能會接收高併發的請求,Kafka 默認給其分配 50 個分區(可以通過
offsets.topic.num.partitions 設置),這樣可以通過加機器的方式抗大併發。

消費者 Rebalance 機制

Rebalance 就是說 如果消費組裏的消費者數量有變化或消費的分區數有變化,Kafka 會重新分配消費者與消費分區的關係 。比如 consumer group 中某個消費者掛了,此時會自動把分配給他的分區交給其他的消費者,如果他又重啓了,那麼又會把一些分區重新交還給他。

注意:Rebalance 只針對 subscribe 這種不指定分區消費的情況,如果通過 assign 這種消費方式指定了分區,Kafka 不會進行 Rebalance。

如下情況可能會觸發消費者 Rebalance:

  1. 消費組裏的 Consumer 增加或減少了

  2. 動態給 Topic 增加了分區

  3. 消費組訂閱了更多的 Topic

Rebalance 過程中,消費者無法從 Kafka 消費消息,這對 Kafka 的 TPS 會有影響,如果 Kafka 集羣內節點較多,比如數百 個,那重平衡可能會耗時極多,所以應儘量避免在系統高峯期的重平衡發生。

Rebalance 過程如下

當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷以下幾個階段:

第一階段:選擇組協調器

組協調器 GroupCoordinator:每個 consumer group 都會選擇一個 Broker 作爲自己的組協調器 coordinator,負責監控這個消費組裏的所有消費者的心跳,以及判斷是否宕機,然後開啓消費者 Rebalance。consumer group 中的每個 consumer 啓動時會向 Kafka 集羣中的某個節點發送 FindCoordinatorRequest 請求來查找對應的組協調器 GroupCoordinator,並跟其建立網絡連接。組協調器選擇方式:通過如下公式可以選出 consumer 消費的 Offset 要提交到__consumer_offsets 的哪個分區,這個分區 Leader 對應的 Broker 就是這個 consumer group 的 coordinator 公式:

hash(consumer group id) % 對應主題的分區數

第二階段:加入消費組 JOIN GROUP

在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。然後 GroupCoordinator 從一個 consumer group 中選擇第一個加入 group 的 consumer 作爲 Leader(消費組協調器),把 consumer group 情況發送給這個 Leader,接着這個 Leader 會負責制定分區方案。

第三階段(SYNC GROUP)

consumer leader 通過給 GroupCoordinator 發送 SyncGroupRequest,接着 GroupCoordinator 就把分區方案下發給各個 consumer,他們會根據指定分區的 Leader Broker 進行網絡連接以及消息消費。

消費者 Rebalance 分區分配策略

主要有三種 Rebalance 的策略:range 、 round-robin 、 sticky 。默認情況爲 range 分配策略

假設一個主題有 10 個分區(0-9),現在有三個 consumer 消費:

range 策略:按照分區序號排序分配 ,假設 n=分區數/消費者數量 = 3, m=分區數 % 消費者數量 = 1,那麼前 m 個消 費者每個分配 n+1 個分區,後面的(消費者數量-m )個消費者每個分配 n 個分區。比如分區 0~ 3 給一個 consumer,分區 4~ 6 給一個 consumer,分區 7~9 給一個 consumer。

round-robin 策略:輪詢分配 ,比如分區 0、3、6、9 給一個 consumer,分區 1、4、7 給一個 consumer,分區 2、5、 8 給一個 consumer

sticky 策略:初始時分配策略與 round-robin 類似,但是在 rebalance 的時候,需要保證如下兩個原則:

  1. 分區的分配要儘可能均勻 。

  2. 分區的分配儘可能與上次分配的保持相同。

當兩者發生衝突時,第一個目標優先於第二個目標 。這樣可以最大程度維持原來的分區分配的策略。比如對於第一種 range 情況的分配,如果第三個 consumer 掛了,那麼重新用 sticky 策略分配的結果如下:consumer1 除了原有的 0~ 3,會再分配一個 7 consumer2 除了原有的 4~ 6,會再分配 8 和 9。

Producer 發佈消息機制剖析

1、寫入方式

producer 採用 push 模式將消息發佈到 broker,每條消息都被 append 到 patition 中,屬於順序寫磁盤(順序寫磁盤 比 隨機寫 效率要高,保障 kafka 吞吐率)。

2、消息路由

producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制爲:

hash(key)%分區數

3、寫入流程

  1. producer 先從 ZooKeeper 的 "/brokers/…/state" 節點找到該 partition 的 leader

  2. producer 將消息發送給該 leader

  3. leader 將消息寫入本地 log

  4. followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK

  5. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset)並向 producer 發送 ACK

HW 與 LEO

HW 俗稱高水位 ,HighWatermark 的縮寫,取一個 partition 對應的 ISR 中最小的 LEO(log-end-offset)作爲 HW, consumer 最多隻能消費到 HW 所在的位置。另外每個 replica 都有 HW,leader 和 follower 各自負責更新自己的 HW 的狀 態。對於 leader 新寫入的消息,consumer 不能立刻消費,leader 會等待該消息被所有 ISR 中的 replicas 同步後更新 HW, 此時消息才能被 consumer 消費。這樣就保證瞭如果 leader 所在的 broker 失效,該消息仍然可以從新選舉的 leader 中獲取。對於來自內部 broker 的讀取請求,沒有 HW 的限制。

日誌分段存儲

Kafka 一個分區的消息數據對應存儲在一個文件夾下,以 topic 名稱 + 分區號命名,消息在分區內是分段存儲的, 每個段的消息都存儲在不一樣的 log 文件裏,Kafka 規定了一個段位的 log 文件最大爲 1G,做這個限制目的是爲了方便把 log 文件加載到內存去操作:

### 部分消息的offset索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的offset到index文件, ### 如果要定位消息的offset會先在這個文件裏快速定位,再去log文件裏找具體消息 

3 00000000000000000000.index 

4 ### 消息存儲文件,主要存offset和消息體 

5 00000000000000000000.log 

6 ### 消息的發送時間索引文件,kafka每次往分區發4K(可配置)消息就會記錄一條當前消息的發送時間戳與對應的offset到timeindex文件, ### 如果需要按照時間來定位消息的offset,會先在這個文件裏查找 

8 00000000000000000000.timeindex 

9 

10 00000000000005367851.index 

11 00000000000005367851.log 

12 00000000000005367851.timeindex 

13 

14 00000000000009936472.index 

15 00000000000009936472.log 

16 00000000000009936472.timeindex

這個 9936472 之類的數字,就是代表了這個日誌段文件裏包含的起始 Offset,也就說明這個分區裏至少都寫入了接近 1000 萬條數據了。Kafka Broker 有一個參數,log.segment.bytes,限定了每個日誌段文件的大小,最大就是 1GB。一個日誌段文件滿了,就自動開一個新的日誌段文件來寫入,避免單個文件過大,影響文件的讀寫性能,這個過程叫做 log rolling,正在被寫入的那個日誌段文件,叫做 active log segment。

最後附一張 ZooKeeper 節點數據圖

MQ 帶來的一些問題、及解決方案

如何保證順序消費?

如何實現延遲消費?

如何保證消息的可靠性投遞

RabbitMQ

數據庫持久化:

1.將業務訂單數據和生成的Message進行持久化操作(一般情況下插入數據庫,這裏如果分庫的話可能涉及到分佈式事務)

2.將Message發送到Broker服務器中

3.通過RabbitMQ的Confirm機制,在producer端,監聽服務器是否ACK。

4.如果ACK了,就將Message這條數據狀態更新爲已發送。如果失敗,修改爲失敗狀態。

5.分佈式定時任務查詢數據庫3分鐘(這個具體時間應該根據的時效性來定)之前的發送失敗的消息

6.重新發送消息,記錄發送次數

7.如果發送次數過多仍然失敗,那麼就需要人工排查之類的操作。

優點:能夠保證消息百分百不丟失。

缺點:第一步會涉及到分佈式事務問題。

消息的延遲投遞:

流程圖中,顏色不同的代表不同的message

1.將業務訂單持久化

2.發送一條Message到broker(稱之爲主Message),再發送相同的一條到不同的隊列或者交換機(這條稱爲確認Message)中。

3.主Message由實際業務處理端消費後,生成一條響應Message。之前的確認Message由Message Service應用處理入庫。

4~6.實際業務處理端發送的確認Message由Message Service接收後,將原Message狀態修改。

7.如果該條Message沒有被確認,則通過rpc調用重新由producer進行全過程。

優點:相對於持久化方案來說響應速度有所提升

缺點:系統複雜性有點高,萬一兩條消息都失敗了,消息存在丟失情況,仍需 Confirm 機制做補償。

RocketMQ

生產者弄丟數據:

Producer 在把 Message 發送 Broker 的過程中,因爲網絡問題等發生丟失,或者 Message 到了 Broker,但是出了問題,沒有保存下來。針對這個問題,RocketMQ 對 Producer 發送消息設置了 3 種方式:

同步發送
異步發送
單向發送

Broker 弄丟數據:

Broker 接收到 Message 暫存到內存,Consumer 還沒來得及消費,Broker 掛掉了。

可以通過 持久化 設置去解決:

  1. 創建 Queue 的時候設置持久化,保證 Broker 持久化 Queue 的元數據,但是不會持久化 Queue 裏面的消息

  2. 將 Message 的 deliveryMode 設置爲 2,可以將消息持久化到磁盤,這樣只有 Message 支持化到磁盤之後纔會發送通知 Producer ack

這兩步過後,即使 Broker 掛了,Producer 肯定收不到 ack 的,就可以進行重發。

消費者弄丟數據:

Consumer 有消費到 Message,但是內部出現問題,Message 還沒處理,Broker 以爲 Consumer 處理完了,只會把後續的消息發送。這時候,就要 關閉 autoack,消息處理過後,進行手動 ack , 多次消費失敗的消息,會進入 死信隊列 ,這時候需要人工干預。

Kafka

生產者弄丟數據

設置了 acks=all ,一定不會丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之後,才認爲本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

Broker 弄丟數據

Kafka 某個 broker 宕機,然後重新選舉 partition 的 leader。大家想想,要是此時其他的 follower 剛好還有些數據沒有同步,結果此時 leader 掛了,然後選舉某個 follower 成 leader 之後,不就少了一些數據?這就丟了一些數據啊。

此時一般是要求起碼設置如下 4 個參數:

replication.factor
min.insync.replicas
acks=all
retries=MAX

我們生產環境就是按照上述要求配置的,這樣配置之後,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。

消費者弄丟數據

你消費到了這個消息,然後消費者那邊自動提交了 offset,讓 Kafka 以爲你已經消費好了這個消息,但其實你纔剛準備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟咯。

這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會自動提交 offset,那麼只要 關閉自動提交 offset,在處理完之後自己手動提交 offset,就可以保證數據不會丟。但是此時確實還是可能會有重複消費,比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重複消費一次,自己保證冪等性就好了。

如何保證消息的冪等?

以 RocketMQ 爲例,下面列出了消息重複的場景:

發送時消息重複

當一條消息已被成功發送到服務端並完成持久化,此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗。如果此時生產者意識到消息發送失敗並嘗試再次發送消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。

投遞時消息重複

消息消費的場景下,消息已投遞到消費者並完成業務處理,當客戶端給服務端反饋應答的時候網絡閃斷。爲了保證消息至少被消費一次,消息隊列 RocketMQ 版的服務端將在網絡恢復後再次嘗試投遞之前已被處理過的消息,消費者後續會收到兩條內容相同並且 Message ID 也相同的消息。

負載均衡時消息重複(包括但不限於網絡抖動、Broker 重啓以及消費者應用重啓)

當消息隊列 RocketMQ 版的 Broker 或客戶端重啓、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。

那麼,有什麼解決方案呢?直接上圖。

如何解決消息積壓的問題?

關於這個問題,有幾個點需要考慮:

如何快速讓積壓的消息被消費掉?

臨時寫一個消息分發的消費者,把積壓隊列裏的消息均勻分發到 N 個隊列中,同時一個隊列對應一個消費者,相當於消費速度提高了 N 倍。

修改前:

修改後:

積壓時間太久,導致部分消息過期,怎麼處理?

批量重導。在業務不繁忙的時候,比如凌晨,提前準備好程序,把丟失的那批消息查出來,重新導入到 MQ 中。

消息大量積壓,MQ 磁盤被寫滿了,導致新消息進不來了,丟掉了大量消息,怎麼處理?

這個沒辦法。誰讓【消息分發的消費者】寫的太慢了,你臨時寫程序,接入數據來消費,消費一個丟棄一個,都不要了,快速消費掉所有的消息。然後走第二個方案,到了晚上再補數據吧。

作者:讓我來搞這個 bug

來源:www.jianshu.com/p/2975d354fca5

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZQEo7iwjwUkEsR0o0b-02Q