RocketMQ 消息重試機制解析!

文章首發到公衆號:月伴飛魚

文章內容收錄到個人網站,方便閱讀:http://hardyfish.top/

文章內容收錄到個人網站,方便閱讀:http://hardyfish.top/

由於網絡抖動、服務宕機等一些不確定的因素,RocketMQ在發送消息的時候很有可能出現消息發送或者消費失敗的問題。

所以RocketMQ消息重試分爲 2 種:

Producer端重試和Consumer端重試。

Producer 端重試

生產者端的消息失敗,也就是ProducerMQ上發消息沒有發送成功。

  • 比如網絡抖動致使生產者發送消息到MQ失敗。

這種消息失敗重試可以手動設置發送失敗重試的次數。

producer.setRetryTimesWhenSendFailed(3);

官方說明

Producersend方法本身支持內部重試。

重試邏輯:

  • 默認至多重試 2 次。

  • 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認 10s。

如果本身向Broker發送消息產生超時異常,就不會再重試。

  • 以上策略也是在一定程度上保證了消息可以發送成功。

如果業務對消息可靠性要求比較高,建議增加相應的重試邏輯:

  • 比如調用send同步方法發送失敗時,則嘗試將消息存儲到DB

  • 然後由後臺線程定時重試,確保消息一定到達Broker

重試策略

消息發送重試有三種策略:

同步發送失敗策略、異步發送失敗策略和消息刷盤失敗策略。

同步發送失敗策略:

普通消息,消息發送默認採用round-robin策略 (輪轉) 來選擇所發送到的隊列。

  • 如果發送失敗,默認重試 2 次。

但在重試時是不會選擇上次發送失敗的Broker,而是選擇其它Broker

DefaultMQProducer producer = new DefaultMQProducer("pg");
// 設置同步發送失敗時重試發送的次數,默認爲2次
producer.setRetryTimesWhenSendFailed(3);
// 設置發送超時時限爲5s,默認10s
producer.setSendMsgTimeout(5000);

異步發送失敗策略:

異步發送失敗重試時,異步重試不會選擇其他Broker,僅在當前Broker上做重試。

  • 所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定異步發送失敗後不進行重試發送
producer.setRetryTimesWhenSendAsyncFailed(0);

消息刷盤失敗策略:

消息刷盤超時,默認是不會將消息嘗試發送到其他Broker

對於重要消息可以通過在Broker的配置文件設置retryAnotherBrokerWhenNotStoreOK屬性爲true來開啓。

幾種情況

異步發送在發送過程中出現異常進行重試:

在解析請求結果時,發現響應狀態碼有其它異常(消息可能未正確被Broker處理)會繼續進行重試。

  • 重試依然選擇當前Broker

但是如果響應結果不爲空的話,即使處理響應時發生異常也不會進行重試。

同步發送時:

如果發送過程中沒有異常,但是發送結果不 OK,也會選擇另一個Broker繼續進行重試。

順序消息發送失敗不進行重試:

順序消息:指的是同步 + 指定消息隊列的方式發送。

Consumer 端重試

消息正常的到了消費者,結果消費者發生異常,處理失敗了。

例如反序列化失敗,消息數據本身無法處理等。

順序消息

順序消息的消費重試

順序消息,當Consumer消費消息失敗後,爲了保證消息的順序性,其會自動不斷地進行消息重試,直到消費成功。

  • 消費重試默認間隔時間爲1000ms

重試期間應用會出現消息消費被阻塞的情況。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認爲1000,其取值範圍爲[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由於對順序消息的重試是無休止的,不間斷的,直至消費成功。

**注意:**順序消息沒有發送失敗重試機制,但具有消費失敗重試機制。

消費狀態

順序消費目前兩個狀態:SUCCESSSUSPEND_CURRENT_QUEUE_A_MOMENT

SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下:

  • SuspendCurrentQueueTimeMillis時間間隔後再重試一下,而不是放到重試隊列裏。
public enum ConsumeOrderlyStatus {
    SUCCESS,
    
    @Deprecated
    ROLLBACK,
    
    @Deprecated
    COMMIT,
    
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

併發消息

併發消息的消費重試

在併發消費中,可能會有多個線程同時消費一個隊列的消息。

因此即使發送端通過發送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費。

  • 所有併發消費也可以稱之爲無序消費

對於無序消息 (普通消息、延時消息、事務消息):

  • Consumer消費消息失敗時,可以通過設置返回狀態達到消息重試的效果。

注意:

無序消息的重試只針對集羣消費模式生效。

廣播消費模式不提供失敗重試特性:即消費失敗後,失敗消息不再重試,繼續消費新的消息。

消費狀態

Consumer端消息消費有兩種狀態:

一個是成功 (CONSUME_SUCCESS),一個是失敗 & 稍後重試 (RECONSUME_LATER)。

Consumer爲了保證消息消費成功,只有使用方明確表示消費成功。

  • 返回CONSUME_SUCCESSRocketMQ纔會認爲消息消費成功。

若是消息消費失敗,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

  • RocketMQ就會認爲消息消費失敗了,要重新投遞。
public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;   
}

重試機制

爲了保證消息是確定被至少消費成功一次,RocketMQ會把這批消息重發回Broker

  • Topic不是原Topic而是一個RETRY Topic

在延遲的某個時間點(默認是 10 秒,業務可設置)後,再次投遞。

而若是一直這樣重複消費都持續失敗到必定次數(默認 16 次),就會投遞到死信隊列

在啓動Broker的過程當中,能夠觀察到以下輸出:

2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RECONSUME_LATER 策略:

若是消費失敗,那麼 1S 後再次消費,若是失敗,那麼 5S 後,再次消費,…… 直至 2H 後若是消費還失敗。

  • 那麼該條消息就會終止發送給消費者了。

消息重試間隔時間如下:

spKmXP

某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試。

  • 超過這個時間範圍消息將不再重試投遞,而被投遞至死信隊列

修改消費重試次數:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費重試次數
consumer.setMaxReconsumeTimes(10);

基本原理

重試的 MessageRocketMQ 的做法並不是將其投遞迴原 Topic,而是重試隊列

每個 ConsumerGroup 都有自己的重試隊列:

  • 其名稱是由特定的前綴拼接上 ConsumerGroup 所組成,默認 %RETRY%+消費者組名稱

  • 所以在 Consumer 啓動時,就會同時消費其 ConsumerGroup 對應的重試隊列普通隊列

消費失敗的 MessageConsumer 會將其投回 Broker

  • 相當於這條 Message 已經被消費掉了,之後重試的只是內容相同、但實際不是同一條的 Message

  • 然後會校驗重試的次數,如果達到 16 次則會進入死信隊列 **,**組成爲 %DLQ%+消費者組名稱

  • 未達到最大重試次數,則會根據重試間隔時間等級將其投遞到延遲隊列SCHEDULE_TOPIC_XXXX中。

  • 然後等到了延遲等級對應的時間之後,再投遞到 ConsumerGroup 所對應的重試隊列當中,供後續消費。

消息重複

如果消費端收到兩條一樣的消息,應該怎樣處理?

《RocketMQ 原理簡介》中講到:

RocketMQ 無法避免消息重複。

所以如果業務對消費重複非常敏感,務必要在業務側去重,有以下幾種去重方式:

消費端處理消息的業務邏輯保持冪等性

  • 如何保證冪等性,可以看我之前的文章!

保證每條消息都有唯一編號且保證消息處理成功與去重表的日誌同時出現。

  • 利用一張日誌表來記錄已經處理成功的消息的 ID。

  • 如果新到的消息 ID 已經在日誌表中,那麼就不再處理這條消息。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Ei-eYFfE0devt_vq0ZRfuw