億級高性能通知系統實踐

在一個公司中,消息通知系統是不可或缺的一部分,每個團隊都可能開發了一套獨自的消息通知組件,隨着公司業務團隊的日益增長,維護繁瑣、排查問題複雜、開發成本等問題就會凸顯出來。(例如我們的企微羣通知,由於消息內容不同模板不同,一個項目內使用的組件就有 3 種,還不包含其他通知部分。)

基於這樣的背景,我們就迫切需要開發一套通用的消息通知系統。那麼如何高效地處理大量的消息請求以及服務穩定性的保障,成爲了開發者需要面對的重要挑戰。本文將探討如何構建高性能的消息通知系統。

1 服務劃分

2 系統設計

2.1 首次消息發送

在接受消息發送請求的時候,一般會通過 RPC 服務請求和 MQ 消息消費進行處理,這兩種方式各有優缺點,RPC 這種方式,我們無需考慮消息的丟失問題,MQ 可以實現異步解耦、削峯填谷。

2.1.1 冪等性的處理

爲了防止接收到同樣的消息內容進行發送處理,我們通常會做一些冪等性的設計。冪等性的判斷有很多手段,比如先加鎖再查詢或利用數據庫的唯一主鍵等來實現,但其實在我們消息量很大的時候,查數據庫就有點慢了。因爲發送消息的這種場景,重複消息一般在短時間內發生的,一般不會有跨很多天來一筆已經發送過的消息,所以可以設計利用 Redis 來實現,先判斷是否有相同的 Redis Key,再判斷消息內容是否相同,有可能相同的 Redis Key,發送不同的消息內容,這種是允許的,具體看對應的業務需求。

    private boolean isDuplicate(MessageDto messageDto) {
        String redisKey = getRedisKey(messageDto);
        boolean isDuplicate = false;
        try {
            if (!RedisUtils.setNx(redisKey, messageDto, 30*60L)) {
                isDuplicate = true;
            }
            if (isDuplicate) {
                MessageDto oldDTO = RedisUtils.getObject(redisKey);
                if (Objects.equals(messageDto,oldDTO)) {
                    log.info("消息重複了");
                } else {
                    isDuplicate = false;
                }
            }
        } catch (Exception e) {
            isDuplicate = false;
        }
        return isDuplicate;
    }

2.1.2 問題服務動態發現器

上文提到路由器中的路由策略包括配置的路由策略動態服務異常自發現路由策略,其中動態服務異常自發現路由策略核心在於服務異常自發現,核心是依據問題服務動態發現器實現的,當我們發現某一個消息通道服務異常時可以自動路由採用異常通知執行器執行。

我們主要是藉助sentinel的 API 在各自節點 JVM 內實現的,針對設置的時間窗口請求的總次數失敗的總次數進行統計,達到設定值,就認爲請求的服務有問題了,認定其爲異常服務。核心主要是以下兩個方法,其中loadExecuteHandlerRules方法主要是對流控規則的設定,我們可以通過 Apollo 或 Nacos 進行動態的修改,judge方法是對請求和失敗的攔截,判斷允許正常訪問,一旦攔截後就認爲是異常服務,在內存中進行標記記錄,後續請求通過異常執行器執行處理。

當我們看到這兒會不會有疑問,問題服務在啥時候會恢復正常呢,難道服務出現一次問題,就一直被認定爲問題服務了?當時不是的,我們也設計了類似熔斷器那樣的自動恢復功能,在判斷爲問題服務後會經過一段時間的靜默期靜默期內所有對該服務的請求都走異常通知器的執行流程,當靜默期過後,此時到達了半熔斷期,就是如果訪問正常的次數達到一定值後,就會恢復爲正常。

    //加載執行器的規則 durationInSec 時間窗口長度  requestCount  請求總量 failCount失敗總量
    public void loadExecuteHandlerRules(Long durationInSec,Long requestCount,Long failCount) {
        List<ParamFlowRule> rules = new ArrayList<>();
        //REQUEST_RESOURCE  請求資源 可自定義
        rules.add(ofParamFlowRule(REQUEST_RESOURCE, requestCount, durationInSec));
        //REQUEST_RESOURCE  失敗資源 可自定義
        rules.add(ofParamFlowRule(FAIl_RESOURCE, failCount, durationInSec));
        ParamFlowRuleManager.loadRules(rules);
    }

    public ParamFlowRule ofParamFlowRule(String resource, Long failCount, Long durationInSec) {
        ParamFlowRule rule = new ParamFlowRule();
        rule.setResource(FAIl_RESOURCE);
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule.setCount(failCount);
        rule.setDurationInSec(durationInSec);
        rule.setParamIdx(0);
        return rule;
    }
   //key 請求的標識key,可以是對應某一服務的標識,reqSuc 請求是否成功,false是失敗,true是成功
    public static boolean judge(String key, boolean reqSuc) {
        return isBlock(REQUEST_RESOURCE, reqSuc, key) && isBlock(FAIl_RESOURCE, reqSuc, key);
    }

    public Boolean isBlock(String resource, boolean reqSuc, String key) {
        boolean block = false;
        Entry failEntry = null;
        try {
            failEntry = entry(resource, EntryType.IN, reqSuc ? 0 : 1, key);
        } catch (BlockException e) {
            block = true;
        } finally {
            if (failEntry != null) {
                failEntry.exit();
            }
        }
        return block;
    }

2.1.3 sentinel 滑動窗口的實現原理 (環形數組)

根據傳入的時間窗口大小和數量,計算數組的數量,數組的下標就是windowsIdwindowsStart是每個數組的起始時間值。

例如:統計 1s 的請求量,設置兩個窗口,那麼每個窗口對應的 id 就是 0、1,相應的時間範圍就是 0m-500ms,500ms-1000ms。如果當前時間是 700ms,那麼對應的窗口 id=(700/500)%2=0, 對應的 windowStart=700-(700%500)=200,對應的起始就是 id 爲 0 的窗口;如果當前時間是 1200ms,對應的窗口 id=(1200/500)%2=0;對應的 windowStart=1200-(1200%500)=1000 大於 id=0 的起始時間,重置 id 爲 0 的窗口起始值,id=0 的位置不變。

2.1.4 線程池的動態調整

消息處理完成後,利用線程池進行異步發送,線程池分爲正常服務的線程池異常服務的線程池,至於爲啥設計不同的線程池,我們在下面穩定性設計方面闡述。線程池核心參數的設定一般會根據任務類型和 CPU 核數進行一個初始化的設定,後續我們一般會壓測來動態的調整來滿足我們的目標。那麼我們怎樣可以設計一個可以動態調整的線程池呢?

一般我們可以通過 Apollo 或 Nacos 等統一配置來動態修改線程池的參數,但是線程池的阻塞隊列長度是不允許修改的,當然我們可以自己自定義一個隊列來實現這樣的功能。接下來我們講述的這種設計,是不用通過自定義阻塞隊列的方式去實現的。

 ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

我們直接定義了一個無界的線程池,核心線程數最大線程數相等,而且用的是默認的丟棄策略,那麼就有疑問了,這樣的線程池我們在使用的時候,會有內存溢出和消息的丟失風險,彆着急,我們繼續往下看。

   Notifier notifier = getNotifier();
   if (!notifier.isBusy()) {
        notifier.execute(msgContent);
    } 

  public boolean isBusy() {
      return notifyPool.getQueue().size() >= config.getMaxHandlerSize() * 2;
    }

在每次添加任務的時候會判斷線程池隊列中的任務是否達到設定的最大值,如果達到就不會繼續添加了,當前線程池處於繁忙狀態了,後續可以利用 MQ 落庫,之後通過重試任務進行發送了,也保證了永遠不會觸發線程池的拒絕策略。

2.2 重試消息發送

部分消息因爲系統達到瓶頸處理不過來或某些消息發送失敗需要重試,這些消息都可以通過任務重試來進行處理,當然利用這種方式也可以實現延遲消息的發送。

實現這種重試的消息機制可以利用分佈式定時任務調度框架,一般爲了提高重試效率,會採用分片廣播這種方式,自己做好消息重複發送的控制,我們也可以利用調度線程池來實現。

  public void init() {
        ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(taskRepository.size());
        for (Map.Entry<String, TaskHandler> entry : taskRepository.entrySet()) {
            final String taskName = entry.getKey();
            final TaskHandler handler = entry.getValue();
            scheduledService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 是否繁忙判斷
                        if (handler.isBusy()) {
                            return;
                        }
                        handleTask(taskName, handler);
                    } catch (Throwable e) {
                        logger.error(taskName + " task hanlder fail!", e);
                    }
                }
            }, 30, 5, TimeUnit.SECONDS);
        }
    }

每次進行任務撈取進行調度時,會首先判斷下當前 handler 是否繁忙,其實就是重試不同類型任務的線程池資源是否充足,如果不充足的話,即使撈取出來,也一直是排隊等待。

  public void handTask(String taskName, TaskHandler handler) {
        Lock lock = LockFactory.getLock(taskName);
        List<ScheduleTaskEntity> taskList = null;
        try {
            if (lock.tryLock()) {
                taskList = getTaskList(taskName, handler);
            }
        } finally {
            lock.unlock();
        }
        if (taskList == null) return;
        handler.handleData(taskList);
    }

爲了防止不同的節點處理相同的任務進行了加鎖控制,每次撈取的任務量是根據不同任務 handler 設置的量來確定的,撈取完成後發送至 MQ,然後採用線程池進行發送處理。

2.2.1 ES 與 MySQL 數據同步

由於發送消息的數據量,後臺在進行數據查詢時主要是通過 ES 進行查詢處理的,這就涉及到數據庫數據與 ES 數據一致性的問題。當然也可以採用分庫分表寬表等技術進行處理,分庫分表對一些非分片鍵的查詢可能不太友好。

ES 更新完成後修改數據庫狀態爲更新完成狀態,若此時通知記錄表還有更新,就會將同步狀態初始化,若修改數據庫爲 init 先於同步完成後的更新就會出現數據不一致的問題,所以每次同步時攜帶上數據庫中的update_time,大於等於 db 中的update_time纔會更新完成(其實update_time就是一個版本號)。

ES按月滾動建立索引,每月新建立的索引,標籤都是hot,新增的數據都會放入hot節點上進行存儲,到了第二月,通過定時任務將上月索引的 tag 修改爲cold,ES 集羣就會自動將數據遷移到標籤爲cold節點上(cold節點的性能一般配置都比較低,對性能要求並不高)。

3 穩定性的保障

上述一系列的設計是圍繞高性能進行考慮的,當然在穩定性方面我們也不能忽略,下述幾方面也是我們在穩定性方面的考慮。

3.1 流量突增

面對流量突增時做了兩層降級。當流量緩慢增大時,線程池繁忙後,利用MQ做了一次流量削峯異步落庫,後續定時任務處理發送,發送的延時時間是 0s;當流量陡增,用sentinel進行判斷,不經任何判斷直接MQ削峯落庫,後續消費是延遲消費的,待資源空閒才進行撈取處理。

3.2 問題服務的資源隔離

首先我們想想爲啥要做問題服務的隔離呢,不做會有什麼後果呢?設想一下如果不隔離,問題服務正常服務採用同一線程池資源進行處理,當問題服務請求請求耗時時間較長,線程釋放慢,會導致大量正常服務的消息不能及時進行處理,這樣就會導致問題服務影響到正常服務的消息處理,所以才需要做問題服務與正常服務的資源艙壁隔離

3.3 第三方服務的保護

正常的第三方服務一般都會做限流降級設置,防止服務被擊垮。如果一些開發水平欠缺的服務沒有做,就需要我們進行考慮了,一方面不能因爲我們的請求量較大,影響到別人服務,另一方面,我們的服務不能因爲第三方服務而引發問題,所以通常我們需要考慮進行熔斷處置。

3.4 中間件的容錯

在我們使用各種中間件時,也應該考慮的中間件的問題。比如公司 MQ 需要進行擴容升級,會使 MQ 宕機數秒,針對這種問題的容錯,在進行開發時也應儘可能的考慮設計到。

3.5 完善的監控體系

我們也應該建立完善的監控系統,來保障服務的穩定運行,能在問題擴散之前及時發現處理,能在問題發生後進行快速的處理,能在後期優化處理時提供輔助依據。

3.6 服務的雙活部署、彈性擴縮容

在運維層面,也應該考慮服務不同機房的部署,以保證服務的可用性,爲了應對流量的變化同時也基於成本的考慮,也可以基於服務的綜合指標進行彈性擴縮容

4 總結

任何一個系統的設計,我們都應該從服務架構、系統功能、穩定性保障等方面去進行考慮。如何具備良好的擴展性與容錯性,輕鬆應對各種複雜多變的業務場景也是我們面臨的設計挑戰。當然技術方案的設計從無萬全之策,亦不存在一勞永逸的‘銀彈’,所以需要結合具體的業務場景進行自己的思考與設計。

關於作者
趙培龍 採貨俠 JAVA 開發工程師

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/94zSJjCCeP5QnHKEL2oCTw