5 張圖帶你理解 RocketMQ 延時消息機制
大家好,我是君哥。今天來聊一聊 RocketMQ 的延時消息是怎麼實現的。
延時消息是指發送到 RocketMQ 後不會馬上被消費者拉取到,而是等待固定的時間,才能被消費者拉取到。
延時消息的使用場景很多,比如電商場景下關閉超時未支付的訂單,某些場景下需要在固定時間後發送提示消息。
1 生產者
首先看一個生產者發送延時消息的官方示例代碼:
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
從上面的代碼可以看到,跟普通消息不一樣的是,消息設置 setDelayTimeLevel 屬性值,這裏設置爲 3,這裏最終將 3 這個延時級別複製給了 DELAY 屬性。
關於延時級別,可以看下面這個定義:
//MessageStoreConfig類
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
這裏延時級別有 18 個,上面的示例代碼中延遲級別是 3,消息會延遲 10s 後消費者才能拉取。
2 Broker 處理
2.1 寫入消息
Broker 收到消息後,會將消息寫入 CommitLog。在寫入時,會判斷消息 DELAY 屬性是否大於 0。代碼如下:
//CommitLog 類
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
從上面的代碼可以看到,CommitLog 寫入時並沒有直接寫入,而是把 Topic 改爲 SCHEDULE_TOPIC_XXXX,把 queueId 改爲延時級別減 1。因爲延時級別有 18 個,所以這裏有 18 個隊列。如下圖:
2.2 調度消息
延時消息寫入後,會有一個調度任務不停地拉取這些延時消息,這個邏輯在類 ScheduleMessageService。這個類的初始化代碼如下:
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//省略部分邏輯
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//省略部分邏輯
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//省略持久化的邏輯
}
}
上面的 load() 方法會加載一個 delayLevelTable(ConcurrentHashMap 類型),key 保存延時級別(從 1 開始),value 保存延時時間 (單位是 ms)。
load() 方法結束後,創建了一個有 18 個核心線程的定時線程池,然後遍歷 delayLevelTable,創建 18 個任務(DeliverDelayedMessageTimerTask)進行每個延時級別的任務調度。任務調度的代碼邏輯如下:
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
//省略部分邏輯
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//省略部分邏輯
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
//事務消息判斷省略
boolean deliverSuc;
//只保留同步
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
這段代碼可以參考下面的流程圖來進行理解:
上面有一個修正投遞時間的函數,這個函數的意義是如果已經過了投遞時間,那麼立即投遞。代碼如下:
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
long result = deliverTimestamp;
long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
if (deliverTimestamp > maxTimestamp) {
result = now;
}
return result;
}
注意:消息從 CommitLog 轉發到 ConsumeQueue 時,會判斷是否是延時消息(Topic = SCHEDULE_TOPIC_XXXX 並且延時級別大於 0),如果是延時消息,就會修改 tagsCode 值爲消息投遞的時間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:
//CommitLog類checkMessageAndReturnSize方法
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
如下圖:
而 ScheduleMessageService 調度線程將消息從 ConsumeQueue 重新投遞到原始隊列中時,會把 tagsCode 再次修改爲 tag 的 HashCode,代碼如下:
//類MessageExtBrokerInner,這個方法被 messageTimeup 方法調用。
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }
return tags.hashCode();
}
如下圖:
2.3 一個問題
如果有一個業務場景,要求延時消息 3 小時才能消費,而 RocketMQ 的延時消息最大延時級別只支持延時 2 小時,怎麼處理?
這裏提供兩個思路供大家參考:
-
在 Broker 上修改 messageDelayLevel 的默認配置;
-
在客戶端緩存 msgId,先設置延時級別是 18(2h),當客戶端拉取到消息後首先判斷有沒有緩存,如果有緩存則再次發送延時消息,這次延時級別是 17(1h),如果沒有緩存則進行消費。
3 總結
經過上面的講解,延時消息的處理流程如下:
最後,延時消息的延時時間並不精確,這個時間是 Broker 調度線程把消息重新投遞到原始的 MessageQueue 的時間,如果發生消息積壓或者 RocketMQ 客戶端發生流量管控,客戶端拉取到消息後進行處理的時間可能會超出預設的延時時間。
君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QNkDu0f1R-MFCY-TcBDA9Q