一文帶你理解 RocketMQ 廣播模式實現機制

大家好,我是君哥。今天聊聊 RocketMQ 的廣播消息實現機制。

RocketMQ 有兩種消費模式,集羣模式和廣播模式。

集羣模式是指 RocketMQ 中的一條消息只能被同一個消費者組中的一個消費者消費。如下圖,Producer 向 TopicTest 這個 Topic 併發寫入 3 條新消息,分別被分配到了 MessageQueue1~MessageQueue3 這 3 個隊列,然後 Group 中的三個 Consumer 分別消費了一條消息:

廣播模式是  RocketMQ 中的消息會被消費組中的每個消費者都消費一次,如下圖:

使用 RocketMQ 的廣播模式時,需要在消費端進行定義,下面是一段官方示例:

public static void main(String[] args) throws InterruptedException, MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.setMessageModel(MessageModel.BROADCASTING);

 consumer.subscribe("TopicTest""TagA || TagC || TagD");

 consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
   ConsumeConcurrentlyContext context) {
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
 });

 consumer.start();
 System.out.printf("Broadcast Consumer Started.%n");
}

從代碼中可以看到,在定義 Consumer 時,通過 messageModel 這個屬性指定消費模式,這裏指定爲 BROADCASTING,也就啓動了廣播模式的消費者。

1 消費者啓動

以 RocketMQ 推模式爲例,看一下消費者調用關係類圖:

DefaultMQPushConsumer 作爲啓動入口類,它的 start 方法調用了 DefaultMQPushConsumerImpl 類的 start 方法,下面重點看一下這個方法。

1.1 拷貝訂閱關係

start 方法中調用了 copySubscription 方法,代碼如下:

private void copySubscription() throws MQClientException {
 try {
  //拷貝訂閱關係
  switch (this.defaultMQPushConsumer.getMessageModel()) {
   case BROADCASTING:
    break;
   case CLUSTERING:
    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
    break;
   default:
    break;
  }
 } catch (Exception e) {
  throw new MQClientException("subscription exception", e);
 }
}

這裏的代碼有一點需要注意:集羣模式會創建一個重試 Topic 的訂閱關係,而廣播模式是不會創建這個訂閱關係的。也就是說廣播模式不考慮重試。

1.2 初始化偏移量

下面是初始化 offset 的代碼:

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
 switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
   this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
   break;
  case CLUSTERING:
   this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
   break;
  default:
   break;
 }
 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}

從上面的代碼可以看到,廣播模式使用了 LocalFileOffsetStore,也就是說偏移量保存在客戶端本地,除了在內存中會保存,在本地文件中也會保存。

2 消息拉取

ConsumeMessageService 是真正拉取消息的地方,消費者初始化時會初始化 ConsumeMessageService,並且這裏會區分併發消息還是順序消息。

2.1 順序消息

在集羣模式下,需要獲取到 processQueue 的鎖纔會拉取消息,而在廣播模式下,不用獲取鎖,直接就可以拉取消息。判斷邏輯如下:

//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
      || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
  }
}

這裏有個疑問,對於順序消息,獲取鎖是必須的,這樣才能保證一個 processQueue 只能由一個線程進行處理,從而保證消費的順序性。那對於廣播模式,爲什麼不用獲取 processQueue 的鎖呢?難道廣播模式不支持順序消息?

2.2 併發消息

對於併發消息,廣播模式不同的是,對消費結果的處理。集羣模式消費失敗後需要把消息發送回 Broker 等待再次被拉取,而廣播模式則不需要重試。代碼如下:

//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
 case BROADCASTING:
  for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   MessageExt msg = consumeRequest.getMsgs().get(i);
   log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
  }
  break;
 case CLUSTERING:
  List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
  for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
   MessageExt msg = consumeRequest.getMsgs().get(i);
   boolean result = this.sendMessageBack(msg, context);
   if (!result) {
    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
    msgBackFailed.add(msg);
   }
  }

  if (!msgBackFailed.isEmpty()) {
   consumeRequest.getMsgs().removeAll(msgBackFailed);

   this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
  }
  break;
 default:
  break;
}

這再次說明,廣播模式是不支持消息重試的。

3 重平衡

在消費者啓動過程中,會調用 RebalanceService 的 start 方法,進行重平衡。從重平衡的代碼中可以看到,廣播模式消費者會消費所有 MessageQueue,而集羣模式下會根據負載均衡策略選擇其中幾個 MessageQueue。代碼如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
 switch (messageModel) {
  case BROADCASTING: {
   Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
   if (mqSet != null) {
    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
    //省略部分邏輯
   } else {
   }
   break;
  }
  case CLUSTERING: {
   Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
   List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
   //省略部分邏輯
   if (mqSet != null && cidAll != null) {
    //省略部分邏輯
    try {
     allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
    } catch (Throwable e) {
     return;
    }

    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    if (allocateResult != null) {
     allocateResultSet.addAll(allocateResult);
    }

    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    //省略部分邏輯
   }
   break;
  }
  default:
   break;
 }
}

上面 updateProcessQueueTableInRebalance 這個方法調用前,要獲取到需要消費的 MessageQueue 集合。廣播模式下,直接取了訂閱的 Topic 下的所有集合元素,而集羣模式下,則需要通過負責均衡獲取當前消費者自己要消費的 MessageQueue 集合。

4 總結

本文主要講解了 RocketMQ 廣播消息的實現機制,理解廣播消息,要把握下面幾點:

  1. 偏移量保存在消費者本地內存和文件中;

  2. 廣播消息不支持重試;

  3. 從源碼上看,廣播模式並不能支持順序消息;

  4. 廣播模式消費者訂閱了 Topic 下的所有 MessageQueue,不會重平衡。

君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/g-jUZDH8f6ZX6tIykqByMg