常用的限流框架都在這裏了!
作者:fredalxin
地址:https://fredal.xin/netflix-concuurency-limits
作爲應對高併發的手段之一,限流並不是一個新鮮的話題了。從 Guava 的 Ratelimiter 到 Hystrix,以及 Sentinel 都可作爲限流的工具。
自適應限流
一般的限流常常需要指定一個固定值 (qps) 作爲限流開關的閾值,這個值一是靠經驗判斷,二是靠通過大量的測試數據得出。但這個閾值,在流量激增、系統自動伸縮或者某某 commit 了一段有毒代碼後就有可能變得不那麼合適了。並且一般業務方也不太能夠正確評估自己的容量,去設置一個合適的限流閾值。
而此時自適應限流就是解決這樣的問題的,限流閾值不需要手動指定,也不需要去預估系統的容量,並且閾值能夠隨着系統相關指標變化而變化。
自適應限流算法借鑑了 TCP 擁塞算法,根據各種指標預估限流的閾值,並且不斷調整。大致獲得的效果如下:
TCP Vegas
vegas 是一種主動調整 cwnd 的擁塞控制算法,主要是設置兩個閾值 alpha 和 beta,然後通過計算目標速率和實際速率的差 diff,再比較差 diff 與 alpha 和 beta 的關係,對 cwnd 進行調節。僞代碼如下:
diff = cwnd*(1-baseRTT/RTT)
if (diff < alpha)
set: cwnd = cwnd + 1
else if (diff >= beta)
set: cwnd = cwnd - 1
else
set: cwnd = cwnd
其中 baseRTT 指的是測量的最小往返時間,RTT 指的是當前測量的往返時間,cwnd 指的是當前的 TCP 窗口大小。通常在 tcp 中 alpha 會被設置成 2-3,beta 會被設置成 4-6。這樣子,cwnd 就保持在了一個平衡的狀態。
netflix-concuurency-limits
concuurency-limits 是 netflix 推出的自適應限流組件,借鑑了 TCP 相關擁塞控制算法,主要是根據請求延時,及其直接影響到的排隊長度來進行限流窗口的動態調整。
alpha , beta & threshold
vegas 算法實現在了 VegasLimit 類中。先看一下初始化相關代碼:
private int initialLimit = 20;
private int maxConcurrency = 1000;
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private double smoothing = 1.0;
private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
這裏首先定義了一個初始化值 initialLimit 爲 20,以及極大值 maxConcurrency1000。其次是三個 閾值函數 alphaFunc,betaFunc 以及 thresholdFunc。最後是兩個增減函數 increaseFunc 和 decreaseFunc。函數都是基於當前的併發值 limit 做運算的。
-
alphaFunc 可類比 vegas 算法中的 alpha,此處的實現是 3*log limit。limit 值從初始 20 增加到極大 1000 時候,相應的 alpha 從 3.9 增加到了 9。
-
betaFunc 則可類比爲 vegas 算法中的 beta,此處的實現是 6*log limit。limit 值從初始 20 增加到極大 1000 時候,相應的 alpha 從 7.8 增加到了 18。
-
thresholdFunc 算是新增的一個函數,表示一個較爲初始的閾值,小於這個值的時候 limit 會採取激進一些的增量算法。這裏的實現是 1 倍的 log limit。mit 值從初始 20 增加到極大 1000 時候,相應的 alpha 從 1.3 增加到了 3。
這三個函數值可以認爲確定了動態調整函數的四個區間範圍。當變量queueSize = limit × (1 − RTTnoLoad/RTTactual)
落到這四個區間的時候應用不同的調整函數。
變量 queueSize
其中變量爲 queueSize,計算方法即爲limit × (1 − RTTnoLoad/RTTactual)
,爲什麼這麼計算其實稍加領悟一下即可。
我們把系統處理請求的過程想象爲一個水管,到來的請求是往這個水管灌水,當系統處理順暢的時候,請求不需要排隊,直接從水管中穿過,這個請求的 RT 是最短的,即 RTTnoLoad;
反之,當請求堆積的時候,那麼處理請求的時間則會變爲:排隊時間 + 最短處理時間,即 RTTactual = inQueueTime + RTTnoLoad。而顯然排隊的隊列長度爲 總排隊時間 / 每個請求的處理時間及queueSize = (limit * inQueueTime) / (inQueueTime + RTTnoLoad) = limit × (1 − RTTnoLoad/RTTactual)
。
再舉個栗子,因爲假設當前延時即爲最佳延時,那麼自然是不用排隊的,即 queueSize=0。而假設當前延時爲最佳延時的一倍的時候,可以認爲處理能力折半,100 個流量進來會有一半即 50 個請求在排隊,及 queueSize= 100 * (1 − 1/2)=50
。
動態調整函數
調整函數中最重要的即增函數與減函數。從初始化的代碼中得知,增函數 increaseFunc 實現爲limit+log limit
,減函數 decreaseFunc 實現爲limit-log limit
,相對來說增減都是比較保守的。
看一下應用動態調整函數的相關代碼:
private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
double newLimit;
// Treat any drop (i.e timeout) as needing to reduce the limit
// 發現錯誤直接應用減函數decreaseFunc
if (didDrop) {
newLimit = decreaseFunc.apply(estimatedLimit);
// Prevent upward drift if not close to the limit
} else if (inflight * 2 < estimatedLimit) {
return (int)estimatedLimit;
} else {
int alpha = alphaFunc.apply((int)estimatedLimit);
int beta = betaFunc.apply((int)estimatedLimit);
int threshold = this.thresholdFunc.apply((int)estimatedLimit);
// Aggressive increase when no queuing
if (queueSize <= threshold) {
newLimit = estimatedLimit + beta;
// Increase the limit if queue is still manageable
} else if (queueSize < alpha) {
newLimit = increaseFunc.apply(estimatedLimit);
// Detecting latency so decrease
} else if (queueSize > beta) {
newLimit = decreaseFunc.apply(estimatedLimit);
// We're within he sweet spot so nothing to do
} else {
return (int)estimatedLimit;
}
}
newLimit = Math.max(1, Math.min(maxLimit, newLimit));
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
(int)newLimit,
TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
queueSize);
}
estimatedLimit = newLimit;
return (int)estimatedLimit;
}
動態調整函數規則如下:
-
當變量 queueSize < threshold 時,選取較激進的增量函數,newLimit = limit+beta
-
當變量 queueSize < alpha 時,需要增大限流窗口,選擇增函數 increaseFunc,即 newLimit = limit + log limit
-
當變量 queueSize 處於 alpha,beta 之間時候,limit 不變
-
當變量 queueSize 大於 beta 時候,需要收攏限流窗口,選擇減函數 decreaseFunc,即 newLimit = limit - log limit
平滑遞減 smoothingDecrease
注意到可以設置變量 smoothing,這裏初始值爲 1,表示平滑遞減不起作用。
如果有需要的話可以按需設置,比如設置 smoothing 爲 0.5 時候,那麼效果就是採用減函數 decreaseFunc 時候效果減半,實現方式爲newLimitAfterSmoothing = 0.5 newLimit + 0.5 limit
。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rP6iSvmQlUYbZpoKJbV-qQ