一文帶你瞭解 Pulsar Broker 負載均衡

趙亞軍,StreamNative Solution Engineer。

導語

Apache Pulsar 採用了計算和存儲分離的分佈式架構,其友好的彈性伸縮能力頗受廣大用戶歡迎。Pulsar 除了存儲層的分片架構規避了歷史數據平衡的缺陷,計算層 Broker 的負載均衡模塊也在彈性伸縮過程以及資源調節中發揮着重要的作用。本文將帶大家揭開 Pulsar Broker 負載均衡的神祕面紗。

原理介紹

負載均衡對於一個分佈式系統而言是不可或缺的重要模塊,Apache Pulsar 實現了 Broker 無需手工干預的動態負載均衡,得益於 Pulsar 在設計之初從架構上分離了存儲層和計算層,可以更加靈活地分配 Broker 與 Topic 的映射關係。Pulsar Broker 節點本身並不會存儲任何狀態信息,其消息持久化保存到 BookKeeper 上,而 Broker 之間用於協調狀態信息和 Topic 相關的元數據信息存儲在 Metadata Store 中,這些特性讓 Pulsar Broker 成爲無狀態服務。當多個 Broker 之間進行負載均衡時,無需將 Topic 中的消息從一個 Broker 複製到另一個 Broker。

對於分佈式消息隊列而言,當某個 Broker 節點出現高負載並且整體負載分佈不均時能夠進行負載平衡,進行負載均衡的最小單位一般是 Topic/Partition。Pulsar 也採用了這樣的思路,不過若想支持百萬級 Topic,如果按照單個 Topic/Partition 來調度,勢必會爲負載均衡帶來非常大的計算壓力,Pulsar 採用了聚合分組(Bundle)的管理方式,負載均衡的最小單位設計爲 Bundle,多個 Topic/Partition 分配到同一個 Bundle 內,減少了用於負載計算的數據規模,從而提升了支持 Topic 的數量級。

Pulsar 提供了 “開箱即用” 的負載管理功能。Broker 中的負載均衡由org.apache.pulsar.broker.loadbalance模塊實現,當 Broker 出現負載超載,Loadbalance 觸發均衡流程,通過智能策略進行自動 unload bundle,整體的處理過程如下:

1.LoadManager 模塊依據負載指標、負載策略和觸發門限來獲取符合要求的 Broker。
2.Loadbalance 模塊遍歷該 Broker 內的所有 Bundle,從而獲得被卸載的 Bundle 列表。
3. 根據 Bundle 名稱得到對應的 Namespace 名稱。
4. 使用管理 API 卸載 Bundle。

限於篇幅本文不打算介紹 Bundle 與 Topic/Partition 的映射關係,只介紹基於負載的自動 unload bundle,主要包括在 Broker 負載均衡中用到的衡量指標、負載均衡策略、Bundle 卸載流程等,最後進行了驗證測試。

負載衡量指標

觸發 loadbalance 的前提是集羣中 Broker 節點出現負載超過閥值的情況,包括 CPU 使用率、堆內 / 堆外內存使用率、Broker 的帶寬 IN / 帶寬 OUT、Broker 上 Bundle 的負載等。

負載信息

Broker 實時負載信息

Broker 的負載信息在 ZooKeeper 中保存,路徑爲 / loadbalance/brokers/[broker host:port],保存的 Broker 實時負載數據樣例如下:

{

//以下爲broker的鏈接信息
"webServiceUrl":"<http://localhost:8080>",
"pulsarServiceUrl":"pulsar://localhost:6650",
"persistentTopicsEnabled":true,
"nonPersistentTopicsEnabled":true,

//以下爲Broker最新的系統資源使用量
"cpu":{
       "usage":0.0,
       "limit":0.0
       },
"memory":{
      "usage":0.0,
      "limit":0.0
       },
"directMemory":{
     "usage":0.0,
     "limit":0.0
       },
"bandwidthIn":{
     "usage":0.0,
     "limit":0.0
       },
"bandwidthOut":{
     "usage":0.0,
     "limit":0.0
       },


//以下爲Broker上所有Bundle用量的統計
"msgThroughputIn":0.0,
"msgThroughputOut":0.0,
"msgRateIn":0.0,
"msgRateOut":0.0,
"lastUpdate":1668497351981,

//以下爲Broker上的詳細信息
"lastStats":{},
"numTopics":0,
"numBundles":0,
"numConsumers":0,
"numProducers":0,

//以下爲其他信息
"Bundles":[],
"lastBundleGains":[],
"lastBundleLosses":[],
"Protocols":{},
"advertisedListeners":{},
"bundleStats":{},
"maxResourceUsage":0.0,
"loadReportType":"LocalBrokerData"
}

Broker 平均負載信息

Broker 的平均負載信息在 ZooKeeper 中保存,路徑爲 / loadbalance/broker-time-average/[broker host:port],保存的 Broker 平均負載數據樣例如下:

{
    "shortTermMsgThroughputIn": 4.866669527654777,
    "shortTermMsgThroughputOut": 5.533336591887468,
    "shortTermMsgRateIn": 0.06666670585828466,
    "shortTermMsgRateOut": 0.06666670592635504,
    "longTermMsgThroughputIn": 4.865849676427839,
    "longTermMsgThroughputOut": 5.532400443487092,
    "longTermMsgRateIn": 0.06665547501955964,
    "longTermMsgRateOut": 0.06665542702996446
}

Bundle 負載信息

Bundle 的歷史負載信息在 ZooKeeper 中保存,路徑爲 / loadbalance/bundle-data/[tenant]/[namespace]/[bundle],保存的 Bundle 負載數據樣例如下:

{
  "shortTermData" : {
    "maxSamples" : 10,
    "numSamples" : 6,
    "msgThroughputIn" : 0.0,
    "msgThroughputOut" : 8.911721910539358,
    "msgRateIn" : 0.0,
    "msgRateOut" : 0.15913789125963138
  },
  "longTermData" : {
    "maxSamples" : 1000,
    "numSamples" : 6,
    "msgThroughputIn" : 0.0,
    "msgThroughputOut" : 8.911721910539358,
    "msgRateIn" : 0.0,
    "msgRateOut" : 0.15913789125963138
  },
  "topics" : 1
}

負載上報

Broker 實時負載信息上報

Pulsar 有專門的定時調度線程池 loadManagerExecutor,用於每個 Broker 按照loadBalancerHostUsageCheckIntervalMinutes的頻率收集本地 Broker 的負載信息,不斷刷新本地緩存並上報當前 Broker 的負載信息。每次執行時,先刷新本地負載信息的 Cache,當有負載變化比例超過loadBalancerReportUpdateThresholdPercentage配置值,或距離上次更新大於loadBalancerReportUpdateMaxIntervalMinutes配置值時,觸發負載信息上報到 ZooKeeper。

具體代碼如下(ModularLoadManagerImpl.java 中):

// Determine if the broker data requires an update by delegating to the update condition.
private boolean needBrokerDataUpdate() {
   final long updateMaxIntervalMillis = TimeUnit.MINUTES       .toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
   ……
   if (timeSinceLastReportWrittenToStore > updateMaxIntervalMillis) {
       log.info("Writing local data to metadata store because time since last" + " update exceeded threshold of {} minutes",               conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
       // Always update after surpassing the maximum interval.
       return true;
   }
   ……
   if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
       log.info("Writing local data to metadata store because maximum change {}% exceeded threshold {}%; " + "time since last report written is {} seconds", maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage(),timeSinceLastReportWrittenToStore / 1000.0);
       return true;
   }
   return false;
}

以上參數配置如下(本篇所有配置均在broker.conf中,下同):

# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1

Broker 平均負載信息上報

Leader Broker 會通過 ZooKeeper 的 Watcher 感知到每個 Broker 的實時數據更新,然後從 ZooKeeper 中讀取並保存在 Leader 的緩存中,接着通過定時任務在內存中爲每個 Broker 計算短期和長期的歷史負載數據(TimeAverageBrokerData),最後更新到 ZooKeeper 中。其中短期負載信息應用於被動的快速決策,長期負載信息應用於穩定狀態下的決策。短期和長期的計算方法見官方介紹 [1],上報的頻率見如下配置:

# Interval to flush dynamic resource quota to ZooKeeper
loadBalancerResourceQuotaUpdateIntervalMinutes=15

上報頻率的代碼在 PulsarService 類的 startLeaderElectionService() 方法中。
上報動作的代碼在 ModularLoadManagerImpl 類的 writeBundleDataOnZooKeeper() 方法中。

Bundle 歷史負載信息上報

Bundle 的歷史負載信息(TimeAverageMessageData)獲取、計算和上報頻率與上面 Broker 的一致。

均衡策略

均衡策略是 Broker 依據負載信息判定哪個 Broker 上哪些 Bundle 需要被卸載的方法,通過對 Bundle 的卸載達到整個集羣的整體負載更加均衡。當前 Pulsar Broker 支持三種負載策略,分別是 OverloadShedder、ThresholdShedder 和 UniformLoadShedder。下面分別進行介紹。
涉及負載均衡的全局參數配置如下(在broker.conf中):

# Enable load balancer
loadBalancerEnabled=true

# Enable/disable automatic bundle unloading for load-shedding
loadBalancerSheddingEnabled=true

# Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=1

# Prevent the same topics to be shed and moved to other broker more than once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

OverloadShedder

OverloadShedder 策略是根據單個 Broker 的負載使用率來判斷是否超負荷,這種策略的目的是給高負載的 Broker 降低負載。 

原理:

當某個 Broker 的 CPU、網絡帶寬 IN/OUT、內存和 Direct 內存中任何一個指標的當前負載使用率超過了loadBalancerBrokerOverloadedThresholdPercentage設置的比例,將在該 Broker 上卸載一個 Bundle。

被卸載的最小吞吐量 =(當前負載使用率 - loadBalancerBrokerOverloadedThresholdPercentage / 100.0 + 5%)*(該 Broker 當前吞吐量 IN + 該 Broker 當前吞吐量 OUT)。

遍歷該 Broker 上所有 Bundle,獲取被卸載的 Bundle 列表,只有滿足如下條件的 Bundle 纔會被卸載:

在這個策略中,只有在某個 Broker 超負載時纔會卸載 Bundle,總體上確保 Broker 少卸載 Bundle,從而保障系統的平穩運行。如果在生產環境使用該策略,需要觀察多個節點負載是否均勻分佈,如果均勻分佈,則會出現多個節點幾乎同時到達門限而導致負載反覆被調整的情況。

配置:

在 Pulsar 2.10.0 版本之前,默認策略爲 OverloadShedder。

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder

在該策略下的門限配置:

# Usage threshold to determine a broker as over-loaded
loadBalancerBrokerOverloadedThresholdPercentage=85

ThresholdShedder

ThresholdShedder 策略是根據 Broker 的負載與所有Broker的平均負載 + 門限 比較來判斷是否超負荷。這種策略的目的是讓集羣中的每個 Broker 的負載達到最平衡的狀態。

原理:

    1. 根據每個 Broker 的 LoadData 獲取集羣 Broker 的平均負載,計算方式爲:集羣 Broker 的平均負載 = 每個集羣的負載求和 / Broker 數 其中,每個 Broker 的負載計算如下:每個 Broker 的負載 = (歷史資源使用率 * 歷史資源使用率佔比 loadBalancerHistoryResourcePercentage + 當前資源使用率 * (1 - 歷史資源使用率佔比 loadBalancerHistoryResourcePercentage) 當前資源使用率取各個資源裏面的最大值,計算如下:Broker 的當前資源使用率 = Max(CPU * CPU 權重,Heap 內存 * Heap 內存權重,Direct 內存 * Direct 內存權重,BandwithIn * BandwithIn 權重,BandwithOut * BandwithOut 權重) 被卸載的最小吞吐量 =(當前負載使用率 - loadBalancerBrokerThresholdShedderPercentage/ 100.0 + 5%)*(該 Broker 當前吞吐量 IN + 該 Broker 當前吞吐量 OUT)。
    1. 遍歷所有 Broker,如果 Broker 的資源使用率 > (集羣 Broker 的平均負載 + 負載門限 loadBalancerBrokerThresholdShedderPercentage),則該 Broker 被加入卸載備選列表中。
    1. 遍歷備選 Broker 中所有的 Bundle,獲取被卸載的 Bundle 列表,當滿足以下條件時纔會被卸載,且一次只卸載一個 Bundle。

實現代碼在 ThresholdShedder 類的 findBundlesForUnloading() 方法中,在 Bundle 的選擇過程中要對 Bundle 按照 shortTermData 中 msgThroughputIn 和 msgThroughputOut 之和從大到小排序,卸載前面 N 個最大的 Bundle 吞吐,直到滿足被卸載的最小吞吐量。

配置:

從 Pulsar 2.10.0 版本開始,默認策略爲 ThresholdShedder。配置如下:

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is ThresholdShedder since 2.10.0
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

如上所述,在該策略下,滿足 Broker 的資源使用超過所有 Broker 平均資源的使用率 + 門限的條件時會觸發 Bundle 的卸載,門限的配置如下:

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average resource usage,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10

其他幾個影響計算的參數分別爲:歷史資源使用率的佔比、帶寬 IN 的計算權重、帶寬 OUT 的計算權重、CPU 使用計算權重、Heap 內存使用計算權重和 Direct 內存使用計算權重,最後一個參數是爲了避免 Bundle 被反覆卸載增加的 Bundle 最小吞吐門限。

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10

UniformLoadShedder

UniformLoadShedder 策略是根據 msgRate 最高的 Broker 和 msgRate 最低的 Broker 之間的比率,最大吞吐量與最小吞吐量之比,與配置的門限進行對比來判斷是否超負荷。該策略傾向於在所有 Broker 之間均勻分配負載。該策略只關注 msgRate 和吞吐量指標。

原理:

該策略基本實現思路如下:

    1. 找出需要卸載 Bundle 的 Broker。檢查 msgRate 最高的 Broker 和 msgRate 最低的 Broker 之間的比率和最大吞吐量與最小吞吐量之比。如果 msgRate 的比率高於配置的閾值 loadBalancerMsgRateDifferenceShedderThreshold 或吞吐量之間的比例高於配置的 loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold,則需要對 Bundle 進行卸載。滿足上述條件的最大 msgRate 或者最大吞吐量的 Broker 爲需要卸載 Bundle 的 Broker。

msgRate 比率和吞吐量比例的計算方法:
msgRate 比率 =(最大 msgRate - 最小 msgRate)/ 最小 msgRate * 100
吞吐量比例 = 最大吞吐量 / 最小吞吐量
其中:
msgRate=MsgRateIn() + MsgRateOut();
throughput = MsgThroughputIn()+MsgThroughputOut();

實現代碼在UniformLoadShedder.findBundlesForUnloading()中,部分代碼如下:

double msgRateDifferencePercentage = ((maxMsgRate.getValue() - minMsgRate.getValue()) * 100)(minMsgRate.getValue());

double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputRate.getValue();

// if the threshold matches then find out how much load needs to be unloaded by considering number of msgRate and throughput.

boolean isMsgRateThresholdExceeded = conf.getLoadBalancerMsgRateDifferenceShedderThreshold() > 0 && msgRateDifferencePercentage > conf.getLoadBalancerMsgRateDifferenceShedderThreshold();

boolean isMsgThroughputThresholdExceeded = conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0 && msgThroughputDifferenceRate > conf.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();

if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {
   if (log.isDebugEnabled()) {
log.debug( "Found bundles for uniform load balancing. ""overloaded broker {} with (msgRate,throughput)= ({},{}) ""and underloaded broker {} with (msgRate,throughput)= ({},{})",
overloadedBroker.getValue(), maxMsgRate.getValue(), maxThroughputRate.getValue(),
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
   }
    1. 找出需要卸載的 Bundle,在 Bundle 的選擇過程中要對 Bundle 按照 shortTermData 中 msgRate 和吞吐量進行從大到小排序。如果 msgRate 超過門限,按照 msgRateIn 和 msgRateOut 之和進行排序;如果吞吐量超過門限,按照 ThroughputIn 和 msgThroughputOut 之和進行排序,卸載前面 N 個最大的 Bundle 的 msgRate 或吞吐量,直到滿足被卸載的最小 msgRate 或吞吐量。當滿足以下條件時該 Bundle 將被卸載:

配置:

要使用該 UniformLoadShedder 策略,需在調度策略中配置如下:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.UniformLoadShedder

門限配置如下:

    1. Broker 的最大消息吞吐率大於 Broker 的最小消息吞吐率的比率 ((Max-Min)/Min) 超過比率門限 loadBalancerMsgRateDifferenceShedderThreshold。
# Message-rate percentage threshold between highest and least loaded brokers for
# uniform load shedding. (eg: broker1 with 50K msgRate and broker2 with 30K msgRate
# will have 66% msgRate difference and load balancer can unload bundles from broker-1
# to broker-2)
loadBalancerMsgRateDifferenceShedderThreshold=50
    1. Broker 的最大消息吞吐率和 Broker 的最小吞吐率的倍數 (Max/Min) 超過倍數門限 loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold。
# Message-throughput threshold between highest and least loaded brokers for
# uniform load shedding. (eg: broker1 with 450MB msgRate and broker2 with 100MB msgRate
# will have 4.5 times msgThroughout difference and load balancer can unload bundles
# from broker-1 to broker-2)
loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4

Bundle 卸載

從以上步驟中得到被卸載的 Bundle 列表後,很容易根據 Bundle 的名稱得到對應的 Namespace 名稱,因爲 Bundle 的名稱中含有 Namespace 信息,如:property/public/default/0x00000000_0xFFFFFFFF。最後使用管理流 REST API 卸載 Bundle,從而完成了一次完整的負載均衡過程。

部分實現代碼(ModularLoadManagerImpl.java)如下:

bundlesToUnload.asMap().forEach((broker, bundles) -> {
   bundles.forEach(bundle -> {
       final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);  //get namespace name from bundle.
       final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
       if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
           return;
       }

       log.info("[{}] Unloading bundle: {} from broker {}",strategy.getClass().getSimpleName(), bundle, broker);
       try {
           pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);       //unload bundle using REST API.
           loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
       } catch (PulsarServerException | PulsarAdminException e) {
           log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
       }
   });

實測

爲了更容易理解前兩種負載均衡在 Broker 中的實現過程,下面分別進行測試。

OverloadShedder

測試環境:

K8s 部署,3 個 Broker 節點,在 Toolset 中通過 Pulsar-perf 發起 10000 條 / s 的生產消息任務。

配置(broker.conf):

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder
loadBalancerBrokerOverloadedThresholdPercentage=10        #配置較低是爲了更容易觸發均衡。

日誌:

  1. 1. 從日誌中可以看出 Broker0、Broker1 和 Broker2 的 Memory 負載分別爲 49.24%、69.15% 和 73.88%,都超過了負載門限配置 10%,全部滿足卸載的條件;

    1. 再對 Broker 上的 Bundle 進行一一判斷,從日誌上看,Broker0 和 Broker2 都只有一個 Bundle,最後滿足條件的 Bundle 爲 Broker1 上的 public/default/0x20000000_0x30000000,把該 Bundle 從 Broker1 上卸載到其他 Broker 上。詳細日誌如下:
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 73.88626337051392% above threshold 10.0% -- Offloading at least 8.293027558988957 MByte/s of traffic (cpu: 0.75%, memory: 73.89%, directMemory: 4.69%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - HIGH USAGE WARNING : Sole namespace bundle public/default/0xc0000000_0xd0000000 is overloading broker test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080. No Load Shedding will be done on this broker
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 69.15156841278076% above threshold 10.0% -- Offloading at least 15.313356030795111 MByte/s of traffic (cpu: 5.20%, memory: 69.15%, directMemory: 3.13%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - Attempting to shed load on test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has resource usage 49.24582540988922% above threshold 10.0% -- Offloading at least 0.0 MByte/s of traffic (cpu: 0.69%, memory: 49.25%, directMemory: 3.13%, bandwidthIn: 0.00%, bandwidthOut: 0.00%)
2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.OverloadShedder - HIGH USAGE WARNING : Sole namespace bundle my-tenant1/my-ns1/0x80000000_0xc0000000 is overloading broker test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080. No Load Shedding will be done on this broker

2022-11-18T10:44:22,366+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [OverloadShedder] Unloading bundle: public/default/0x20000000_0x30000000 from broker test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080
2022-11-18T10:44:22,368+0000 [pulsar-web-37-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Unloading namespace bundle public/default/0x20000000_0x30000000
2022-11-18T10:44:22,370+0000 [pulsar-web-37-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.49 - - [18/Nov/2022:10:44:22 +0000] "PUT /admin/v2/namespaces/public/default/0x20000000_0x30000000/unload HTTP/1.1" 307 0 "-" "Pulsar-Java-v2.9.3.7" 2

從以上日誌打印可以看出首先是三個 Broker 的負載超過了配置的門限值,然後選擇滿足條件的 Bundle,最後卸載該 Bundle。

監控:

從下圖可以看到,在箭頭指示時刻發生了 Bundle 從 Broker1 卸載。

ThresholdShedder

測試環境: K8s 部署,3 個 Broker 節點,在 toolset 中通過 pulsar-perf 發起 10000 條 / s 的生產消息任務。

配置(broker.conf):

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
loadBalancerBrokerThresholdShedderPercentage=3    #爲了更容易觸發
loadBalancerBundleUnloadMinThroughputThreshold=0.1    #爲了更容易觸發
loadBalancerHistoryResourcePercentage=0     #爲了簡化計算

日誌:

  1. 1. 從日誌中可以看出 Broker0、Broker1 和 Broker2 的當前負載分別爲 38%、51% 和 36%,平均爲 42%,觸發門限配置爲 3%,3 個 Broker 當前使用率-平均使用率 分別爲:-4%、9% 和 - 6%,所以只有 Broker1 超過了負載門限配置 3%,滿足卸載的條件;

    1. 卸載的最小數據量 =(當前資源的吞吐 IN + 當前資源的吞吐 OUT)*(Broker 的當前使用率 - 平均使用率 - 門限 + 5% 的冗餘),日誌顯示爲 0.1143MB/s。
    1. 最後滿足卸載條件的 Bundle 是 Broker1 上的 public/default/0xd0000000_0xe0000000。
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=56%, resourceUsage=36%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=50%, resourceUsage=51%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080 broker load: historyUsage=49%, resourceUsage=38%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - brokers' resource avgUsage:42%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - [test1-sn-platform-broker-2.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080] broker is not overloaded, ignoring at this point, currentUsage:36%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - Attempting to shed load on test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080, which has max resource usage above avgUsage  and threshold 0.5125840759277344% > 0.4212801106770833% + 0.03% -- Offloading at least 0.11431703679843176 MByte/s of traffic, left throughput 0.9127536209359574 MByte/s
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - [test1-sn-platform-broker-0.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080] broker is not overloaded, ignoring at this point, currentUsage:38%
2022-11-19T15:35:01,971+0000 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [ThresholdShedder] Unloading bundle: public/default/0xd0000000_0xe0000000 from broker test1-sn-platform-broker-1.test1-sn-platform-broker-headless.pulsar1.svc.cluster.local:8080
2022-11-19T15:35:01,976+0000 [pulsar-web-37-4] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Unloading namespace bundle public/default/0xd0000000_0xe0000000
2022-11-19T15:35:01,980+0000 [pulsar-web-37-4] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.35 - - [19/Nov/2022:15:35:01 +0000] "PUT /admin/v2/namespaces/public/default/0xd0000000_0xe0000000/unload HTTP/1.1" 307 0 "-" "Pulsar-Java-v2.9.3.7" 4

監控:

從下圖可以看出,在箭頭指示時刻發生了 Bundle 從 Broker1 卸載。

上面對 OverloadShedder 和 ThresholdShedder 兩種負載均衡策略進行了測試,我們人爲配置較低的門限便於觸發負載均衡,從日誌的打印可以看出,pulsar-load-manager 會根據檢測到的負載跟配置的負載門限進行對比,從而選擇出負載比較高的 Broker 和 Broker 中的 Bundle,最後通過管理流 admin 進行 Bundle 的卸載,達到多個 Broker 之間負載均衡的目的。

總結

本文只是從理論上對 Pulsar Broker 的負載均衡進行了闡述,而且僅限於基於負載均衡的 Bundle 自動卸載,在我們實際生產中,需要多觀察實際的資源負載情況,根據實際情況選擇合適的均衡策略,對門限進行合理的配置和優化,從而達到負載均衡的最佳效果。

參考:

  1. 1. https://pulsar.apache.org/docs/next/develop-load-manager

  2. 2. https://pulsar.apache.org/docs/next/administration-load-balance

    1. 《深入解析 Apache Pulsar》林琳著,電子工業出版社。

引用鏈接

[1] 官方介紹: https://pulsar.apache.org/docs/next/develop-load-manager/#bundle-data

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7Y1RmoLg47_RpQstKRZICg