一致性緩存理論分析與技術實戰

0 前言

工程實踐場景中,我們通常用數據庫完成數據的持久存儲,而數據存儲側的性能調優也是一個永恆經典的話題. 在一些請求量大、讀多寫少的場景中,一種性能優化方式是考慮在數據庫之上添加一層緩存組件,這樣一方面能減輕數據庫的訪問壓力,一方面也能提升查詢操作的性能.

然而由於緩存(如 redis)和數據庫(如 mysql)是兩個獨立的存儲組件,在操作過程中無法在跨組件的基礎上保證 “事務” 的語義,因此不可避免地會面臨緩存數據與數據庫數據不一致的問題.

我們把上述問題稱爲 “緩存一致性” 問題,本篇將緊密圍繞該問題,針對設計緩存與數據庫的讀、寫流程進行串聯梳理,總結出能夠兼顧數據一致性與操作性能的執行方法論.

理論先行,實踐緊隨. 本篇所探討的內容,將統一通過 go 語言在我的開源項目 consistent_cache 中予以實踐驗證——開源項目傳送門:https://github.com/xiaoxuxiansheng/consistent_cache

1 理論分析

1.1 緩存一致性問題

緩存(cache) 相比於數據庫(db) 而言更加輕便快捷,但與之相對的也存在成本高、容量小、穩定性弱的問題,因此 cache 中的數據通常是依附於 db 中的持久化數據而存在,一筆數據的更新操作和存儲終態最終還是要以 db 爲準.

從這個角度出發來看,在寫操作密集的場景下,使用 cache 的收益並不明顯;然而在讀多寫少的場景中,我們通過將一些相對穩定不變的數據從 db 冗餘到 cache 中,由 cache 同一收口數據查詢能力,能夠很大程度提升查詢性能並降低 db 的訪問壓力.

隨之而來的首要問題就是 cache 中的數據如何與 db 保持一致. 展開來說就是,cache 和 db 本身是兩個獨立的存儲組件,跨組件間的數據一致性問題屬於分佈式事務的範疇,本身有一套解決的方案,但受限於其高昂的實現成本,因此不適用於絕大多數場景,這部分內容可以參考我之前發表的文章——萬字長文漫談分佈式事務實現原理.

在本篇中,我們更多將注意力集中在,如何針對數據的讀寫流程進行秩序地組織串聯,以此來滿足 cache 和 db 間數據的最終一致性,並儘可能追求即時一致性的語義.

1.2 讀寫流程串講

當前比較主流的緩存一致性實現思路中,針對讀、寫流程的職責進行如下拆解:

sy33cS

此處我們針對上述流程的設計原則通過 qa 形式進行簡要剖析:

question A:爲什麼寫流程不在寫完 db 後直接更新 cache 數據?

question B:爲什麼寫流程需要刪除 cache 數據?

question C:爲什麼寫流程需要先刪除 cache 再寫 db?

question D:上述流程是嚴謹的嗎?是否還存在哪些環節可能存在 cache 數據不一致問題?

這個問題留給各位讀者,請大家短暫駐步於此, 腦中思考該問題的答案,並帶着問題再進入到後續小節的閱讀內容中.

1.3 緩存雙刪策略

question D 的答案是否定的.

我們需要明白,在實際場景中,一系列讀、寫流程是併發執行的,兩個流程下各執行步驟的相對次序可能因爲機器、網絡等不確定因素髮生變化. 此處我們就給出一個具體 badcase 加以分析說明:

數據庫中有一筆 kv 數據,key:a; value:b

{
  "key""a",
  "value""b"
}

此時,一筆讀操作和一筆寫操作併發啓動,

讀操作讀取 key:a 對應的數據:

get a

寫操作執行指令,期望將 key:a 對應的 value 更新爲 c

按照 1.2 小節中介紹的流程,下面我們進行讀寫流程的步驟串聯:

(1)moment1:寫流程刪除 cache 中的 key:a 數據

(2)moment2:讀流程讀 cache,發現 key:a 數據 miss,準備進一步讀 db

(3)moment3:讀流程讀 db,此時數據還是老版本,value 爲 b

(4)moment4:寫流程寫 db,將 key:a 對應 value 更新爲 c

(5)moment5:讀流程把讀到的 value:b 作爲 key:a 的映射數據寫入 cache

至此,讀、寫流程均結束了,然而現狀是,db 中 key:a 數據已經更新爲 c,然而 cache 中 key:a 對應的還是髒數據 b.

產生上述問題的本質原因就是,在併發場景下,步驟(1)寫流程刪除 cache 數據後,並無法阻止在(1)-(4)期間內,讀流程再次讀 db 並將髒數據遷移到 cache 中.

此處,我們首先想到的應對方式是緩存雙刪策略——即在寫流程寫 db 前後,分別執行一次刪除 cache 操作.

以上述案例加以說明,就是寫流程在 moment4 之後,額外增加一個 moment6,再一次將 cache 中 key:a 對應的數據刪除.

那麼,到此爲止,是否還存在 cache 數據不一致的可能性呢?

請大家短暫駐足思考,並帶着你的答案進入下一小節.

1.4 緩存延時雙刪策略

1.3 小節中,緩存雙刪中存在的問題其實很簡單,就在於我們無法保證,寫流程中,第二次刪除 cache 的動作一定能執行在讀流程寫 cache 的操作之後,也就是 moment5 和 moment6 兩個時刻的相對次序是不穩定的:

因此,我們進一步引入緩存延時雙刪策略. 這裏的 “延時” 就體現在,寫流程寫完 db 後,會等待指定時長,保證此期間可能持有髒數據的讀流程都完成寫 cache 操作後,再執行第二次的刪 cache 操作,以此來實現緩存數據的 “最終一致性” 語義.

緩存延時雙刪策略已經是可用於工程化落地的實現方案,其核心的讀寫流程串聯如下圖:

1.5 寫緩存禁用機制

然而,緩存延時雙刪機制同樣有其短板所在,核心分爲兩點:

在分佈式場景,我們往往需要在流程性能和數據一致性之間進行權衡取捨. 爲了進一步保證 cache 數據的強一致性語義,我們可以嘗試引入 “鎖機制”.

最簡單粗暴的實現方式,就是通過一把 key 維度的分佈式讀寫鎖,實現讀流程和寫流程的隔離. 這樣兩個流程下的執行步驟也就不會混淆在一起,上述問題自然也就得到了根除.

然而,該做法的代價卻是對讀操作性能的大幅度犧牲,針對某筆數據,一旦產生寫操作,那麼在此期間所有讀操作都要陷入阻塞等待的狀態,這並不是我們所樂於接受的.

在此基礎上,我們進一步對實現思路進行升級. 在 1.3-1.4 小節的思路演進過程中,我們發現導致 cache 數據不一致的罪魁禍首,本質上是讀流程把髒數據寫入 cache 的操作,而非宏觀意義上的整個讀流程.

因此,我們要做的不是完全隔離讀、寫流程,也不需要使用分佈式鎖這麼重的工具,而是退而求其次,針對一筆數據的維度啓用一個 “開關” 機制,能夠用於控制讀流程是否啓用寫 cache 機制即可:

通過上述的 “寫緩存禁用” 機制,就保證了數據的強一致性. 寫流程執行期間,該筆數據對應的讀流程不會阻塞,只是相當於 cache 機制被暫時屏蔽,讀流程需要統一從 db 中獲取最精確的數據而已.

此處值得一提的是,在寫流程完成寫 db 操作後,通過需要延遲一段時間再重新開啓該筆數據下的 “寫緩存機制”,其本質思路和緩存延時雙刪策略中 “延時” 的用意是一致的,就是避免在併發場景下,讀取到 db 髒數據的讀流程寫 cache 操作恰好發生在寫流程 “寫緩存機制” 啓用之後.

1.6 其他緩存相關問題

除此之外,在 cache 與 db 交互流程中,還存在幾個經典問題,也將在本篇中展開探討,並在技術實戰環節中對具體的應對策略進行展示:

倘若導致大量 cache key 在同一時刻過期,那麼這一瞬間紛湧而至的大量讀請求都會因爲 cache 數據 miss 而集體湧入到 db 中,導致 db 壓力陡增.

緩存雪崩問題的常用解決思路是切斷問題的直接導火索,對 cache key 的過期時間進行打散,比如可以在預設過期時間的基礎上,加上隨機擾動值,因此來避免大面積 cache key 同時失效的情形.

倘若讀操作頻繁請求 db 中不存在的數據,那麼該數據自然也無法寫入 cache,最終所有請求都會直擊 db,導致 db 壓力較大.

針對緩存穿透問題的解法之一,是存儲層之上額外封裝一層布隆過濾器,用於提前過濾大部分不存在的 key,具體技術原理可參見我之前發表的文章——布隆過濾器技術原理及應用實戰.

而在本篇中,我們將採用另一種解法:倘若在讀流程中發現數據在 db 中不存在,則同樣會寫 cache,但會針對該筆數據加以特殊標記,讓後續讀操作通過讀 cache 就能獲得到數據不存在的信息.

2 技術實戰

介紹理論基礎部分,下面展示一致性緩存服務項目 consistent_cach 的源碼實現,進入技術實戰環節.

2.1 架構

整個操作流程涉及到對緩存模塊 cache 和數據庫模塊 db 的操作,因此在實現上拆分出三個核心模塊:

一致性緩存服務 service 的實現代碼代碼位於 ./service.go,核心成員屬性包括其持有的緩存模塊 cache 和數據庫模塊 db 的實例引用:

type Service struct {
    opts  *Options // 策略參數
    cache Cache // 緩存模塊
    db    DB    // 數據庫模塊
}


// 構造一致性緩存服務. 緩存和數據庫均由使用方提供具體的實現版本
func NewService(cache Cache, db DB, opts ...Option) *Service {
    s := Service{
        cache: cache,
        db:    db,
        opts:  &Options{},
    }


    for _, opt := range opts {
        opt(s.opts)
    }


    repair(s.opts)
    return &s
}

針對緩存模塊抽象接口——Cache 的定義代碼位於 ./interface.go,其中包含如下核心方法:

var (
    // 數據在緩存中不存在
    ErrorCacheMiss    = errors.New("cache miss")
)


// 緩存模塊的抽象接口定義
type Cache interface {
    // 啓用某個 key 對應讀流程寫緩存機制(默認情況下爲啓用狀態)
    Enable(ctx context.Context, key string, delayMilis int64) error
    // 禁用某個 key 對應讀流程寫緩存機制
    Disable(ctx context.Context, key string, expireSeconds int64) error
    // 讀取 key 對應緩存
    Get(ctx context.Context, key string) (string, error)
    // 刪除 key 對應緩存
    Del(ctx context.Context, key string) error
    // 校驗某個 key 對應讀流程寫緩存機制是否啓用,倘若啓用則寫入緩存(默認情況下爲啓用狀態)
    PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error)
}

針對數據庫模塊定義了抽象的 interface——DB:

// 數據庫中不存在數據
var ErrorDBMiss       = errors.New("db miss")


// 數據庫模塊的抽象接口定義
type DB interface {
    // 數據寫入數據庫
    Put(ctx context.Context, obj Object) error
    // 從數據庫讀取數據
    Get(ctx context.Context, obj Object) error
}

針對一條數據記錄,使用抽象 interface——Object 進行抽象,其需要實現幾個方法:

// 每次讀寫操作時,操作的一筆數據記錄
type Object interface {
    // 獲取 key 對應的字段名
    KeyColumn() string
    // 獲取 key 對應的值
    Key() string


    // 將 object 序列化成字符串
    Write() (string, error)
    // 讀取字符串內容,反序列化到 object 實例中
    Read(body string) error
}

2.2 緩存

在 consistent_cache 項目中,基於 redis 實現了一個緩存模塊:

redis 緩存模塊實現類代碼位於 ./redis/cache.go:

// redis 實現版本的緩存模塊
type Cache struct {
    // 抽象的客戶端模塊. 實現版本爲 redis 客戶端
    client Client
}


// 構造器函數
func NewRedisCache(config *Config) *Cache {
    return &Cache{client: NewRClient(config)}
}

其中各核心方法的源碼內容展示如下:

// 啓用某個 key 對應讀流程寫緩存機制(默認情況下爲啓用狀態)
func (c *Cache) Enable(ctx context.Context, key string, delayMilis int64) error {
    // redis 中刪除 key 對應的 disable key. 只要 disable key 標識不存在,則讀流程寫緩存機制視爲啓用狀態
    // 給 disable key 設置一個相對較短的過期時間
    return c.client.PExpire(ctx, key, delayMilis)
}


// 禁用某個 key 的讀流程寫緩存機制
func (c *Cache) Disable(ctx context.Context, key string, expireSeconds int64) error {
    // redis 中設置 key 對應的 disable key. 只要 disable key 標識存在,則讀流程寫緩存機制視爲禁用狀態
    return c.client.SetEx(ctx, c.disableKey(key), "1", expireSeconds)
}


// 讀取 key 對應緩存內容
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
    // 從 redis 中讀取 kv 對
    reply, err := c.client.Get(ctx, key)
    if err != nil && !errors.Is(err, redis.ErrNil) {
        return "", err
    }
    // 倘若緩存中不存在數據,返回指定錯誤 ErrorCacheMiss
    if errors.Is(err, redis.ErrNil) {
        return "", consistent_cache.ErrorCacheMiss
    }
    return reply, nil
}


// 刪除 key 對應緩存
func (c *Cache) Del(ctx context.Context, key string) error {
    // 從 reids 中刪除 kv 對
    return c.client.Del(ctx, key)
}

其中,在 putWhenEnable 方法中,因爲需要對多個指令進行原子化執行,因此涉及到對 lua 腳本的使用:

// 校驗某個 key 對應讀流程寫緩存機制是否啓用,倘若啓用則寫入緩存(默認情況下爲啓用狀態)
func (c *Cache) PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error) {
    // 運行 redis lua 腳本,保證只有在 disable key 不存在時,纔會執行 key 的寫入
    reply, err := c.client.Eval(ctx, LuaCheckEnableAndWriteCache, 2, []interface{}{
        c.disableKey(key),
        key,
        value,
        expireSeconds,
    })
    if err != nil {
        return false, err
    }
    return cast.ToInt(reply) == 1, nil
}

對應 lua 腳本代碼如下,位於 ./redis.lua.go:

const (
    // 通過 lua 腳本確保在 disable key 不存在時,才執行 key value 對寫入
    LuaCheckEnableAndWriteCache = `
    local disable_key = KEYS[1];
    local disable_flag = redis.call("get",disable_key);
    if disable_flag then
        return 0;
    end
    local key = KEYS[2];
    local value = ARGV[1];
    redis.call("set",key,value);
    local cache_expire_seconds = tonumber(ARGV[2]);
    redis.call("expire",key,cache_expire_seconds);
    return 1;
`
)

值得一提的是,由於在 putWhenEnable 對應 lua 腳本中,涉及到對數據 key 以及與其一一一應的 disableKey 的操作,因此需要通過 hash tag 保證這兩個 key 在 redis cluster 模式下也能被分發到同一個節點,這樣 lua 腳本的執行纔是有效的.

// 基於 key 映射得到 v key 表達式
func (c *Cache) disableKey(key string) string {
    // 通過 {hash_tag},保證在 redis 集羣模式下,key 和 disable key 也會被分發到相同節點
    return fmt.Sprintf("Enable_Lock_Key_{%s}", key)
}

hash tag 的使用機制可以參見:https://redis.io/docs/latest/commands/cluster-keyslot/

2.3 數據庫

在 consistent_cache 中,針對數據庫模塊 db 提供了一個 mysql 實現版本,連接 mysql 的客戶端使用 gorm 版本. 這部分代碼位於 ./mysql/*:

// 判斷操作模型是否聲明瞭表名
type tabler interface {
    TableName() string
}


// 數據庫模塊的抽象接口定義
type DB struct {
    db *gorm.DB
}

在數據寫流程中:

更多細節通過源碼註釋的方式給出:

// 數據寫入數據庫
func (d *DB) Put(ctx context.Context, obj consistent_cache.Object) error {
    db := d.db
    // 倘若 obj 顯式聲明瞭表名,則進行應用
    tabler, ok := obj.(tabler)
    if ok {
        db = db.Table(tabler.TableName())
    }


    // 此處通過兩個非原子性動作實現 upsert 效果:
    // 1 嘗試創建記錄
    // 2 倘若發生唯一鍵衝突,則改爲執行更新操作
    err := db.WithContext(ctx).Create(obj).Error
    if err == nil {
        return nil
    }


    // 判斷是否爲唯一鍵衝突,若是的話,則改爲更新操作
    if IsDuplicateEntryErr(err) {
        return db.WithContext(ctx).Debug().Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).Updates(obj).Error
    }
    // 其他錯誤直接返回
    return err
}

針對 mysql 唯一鍵衝突的判斷方法:

import "github.com/go-sql-driver/mysql"


func IsDuplicateEntryErr(err error) bool {
    var mysqlErr *mysql.MySQLError
    if errors.As(err, &mysqlErr) && mysqlErr.Number == DuplicateEntryErrCode {
        return true
    }
    return false
}

在讀流程中:

// 從數據庫讀取數據
func (d *DB) Get(ctx context.Context, obj consistent_cache.Object) error {
    db := d.db
    // 倘若 obj 顯式聲明瞭表名,則進行應用
    tabler, ok := obj.(tabler)
    if ok {
        db = db.Table(tabler.TableName())
    }


    // select 語句讀取通過唯一鍵檢索數據記錄
    err := db.WithContext(ctx).Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).First(obj).Error
    // 倘若 db 中不存在數據,返回指定錯誤 ErrorDBMiss
    if errors.Is(err, gorm.ErrRecordNotFound) {
        return consistent_cache.ErrorDBMiss
    }
    return err
}

更多有關 gorm 的使用示例和底層原理可以參見我之前發表的文章:

2.4 一致性緩存服務

在一致性緩存服務 service 提供的寫數據方法 Put 中,包含如下核心步驟:

// 寫操作
func (s *Service) Put(ctx context.Context, obj Object) error {
    // 1 針對 key 維度禁用讀流程寫緩存機制
    if err := s.cache.Disable(ctx, obj.Key(), s.opts.disableExpireSeconds); err != nil {
        return err
    }


    defer func() {
        go func() {
            tctx, cancel := context.WithTimeout(context.Background(), time.Second)
            defer cancel()
            if err := s.cache.Enable(tctx, obj.Key(), s.opts.enableDelayMilis); err != nil {
                s.opts.logger.Errorf("enable fail, key: %s, err: %v", obj.Key(), err)
            }
        }()
    }()


    // 2 刪除 key 維度對應緩存
    if err := s.cache.Del(ctx, obj.Key()); err != nil {
        return err
    }


    // 3 數據寫入 db
    return s.db.Put(ctx, obj)
}

在一致性緩存服務 service 提供的讀數據方法 Get 中,包含如下核心步驟:

// 爲響應緩存穿透問題,啓用在緩存中寫入空數據的標識
const NullData = "Err_Syntax_Null_Data"


// 數據在 cache 和 db 中均不存在
var ErrorDataNotExist = errors.New("data not exist")


// 2 讀操作
func (s *Service) Get(ctx context.Context, obj Object) (useCache bool, err error) {
    // 1 讀取緩存
    v, err := s.cache.Get(ctx, obj.Key())
    // 2 非緩存 miss 類錯誤,直接拋出錯誤
    if err != nil && !errors.Is(err, ErrorCacheMiss) {
        return false, err
    }


    // 3 讀取到緩存結果
    if err == nil {
        // 3.1 讀取到的數據爲 EmptyData. 是爲了防止緩存穿透而設置的空值
        if v == NullData {
            return true, ErrorDataNotExist
        }
        // 3.2 正常讀取到數據
        return true, obj.Read(v)
    }


    // 4 緩存 miss,讀 db
    if err = s.db.Get(ctx, obj); err != nil && !errors.Is(err, ErrorDBMiss) {
        return false, err
    }


    // 5 db 中也沒有數據,則嘗試往 cache 中寫入 NullData
    if errors.Is(err, ErrorDBMiss) {
        if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), NullData, s.opts.CacheExpireSeconds()); err != nil {
            s.opts.logger.Errorf("put null data into cache fail, key: %s, err: %v", obj.Key(), err)
        } else {
            s.opts.logger.Infof("put null data into cache resp, key: %s, ok: %t", obj.Key(), ok)
        }


        return false, ErrorDataNotExist
    }


    // 6 成功獲取到數據了,則需要將其寫入緩存
    v, err = obj.Write()
    if err != nil {
        return false, err
    }
    if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), v, s.opts.CacheExpireSeconds()); err != nil {
        s.opts.logger.Errorf("put data into cache fail, key: %s, data: %v, err: %v", obj.Key(), v, err)
    } else {
        s.opts.logger.Infof("put data into cache resp, key: %s, v: %v, ok: %t", obj.Key(), v, ok)
    }


    // 7 返回讀取到的結果
    return false, nil
}

其中爲了應對緩存雪崩問題,在讀流程寫緩存步驟中,針對過期時間可以添加一個隨機擾動值:

func (o *Options) CacheExpireSeconds() int64 {
    if !o.cacheExpireRandomMode {
        return o.cacheExpireSeconds
    }


    // 過期時間在 1~2倍之間取隨機值
    return o.cacheExpireSeconds + o.rander.Int63n(o.cacheExpireSeconds+1)
}

3 使用示例

3.1 主流程

最後是關於整個一致性緩存服務的使用示例,這部分代碼位於 ./example/example_test.go 中.

其中核心步驟包括:

const (
    redisAddress  = "請輸入 redis 地址"
    redisPassword = "請輸入 redis 密碼"


    mysqlDSN = "請輸入 mysql dsn"
)


func newService() *consistent_cache.Service {
    // 緩存模塊
    cache := redis.NewRedisCache(&redis.Config{
        Address:  redisAddress,
        Password: redisPassword,
    })
    // 數據庫模塊
    db := mysql.NewDB(mysqlDSN)
    return consistent_cache.NewService(cache, db,
        consistent_cache.WithCacheExpireSeconds(120),
        consistent_cache.WithDisableExpireSeconds(1),
    )
}


func Test_consistent_Cache(t *testing.T) {
    service := newService()
    ctx := context.Background()
    exp := Example{
        Key_: "test",
        Data: "test",
    }
    // 寫操作
    if err := service.Put(ctx, &exp); err != nil {
        t.Error(err)
        return
    }


    // 讀操作
    expReceiver := Example{
        Key_: "test",
    }
    if _, err := service.Get(ctx, &expReceiver); err != nil {
        t.Error(err)
        return
    }


    // 讀取到的數據結果 以及是否使用到緩存
    t.Logf("read data: %s, ", expReceiver.Data)
}

3.2 object 實現

在使用示例中,同樣給出了針對數據記錄 Object interface 的實現類 Example,代碼位於 ./example/example_po.go:

type Example struct {
    ID   uint   `json:"id" gorm:"primarykey"`
    Key_ string `json:"key" gorm:"column:key"`
    Data string `json:"data" gorm:"column:data"`
}


// 獲取對應的表名
func (e *Example) TableName() string {
    return "example"
}


// 獲取 key 對應的字段名
func (e *Example) KeyColumn() string {
    return "key"
}


// 獲取 key 對應的值
func (e *Example) Key() string {
    return e.Key_
}


func (e *Example) DataColumn() []string {
    return []string{"data"}
}


// 將 object 序列化成字符串
func (e *Example) Write() (string, error) {
    body, err := json.Marshal(e)
    if err != nil {
        return "", err
    }
    return string(body), nil
}


// 讀取字符串內容,反序列化到 object 實例中
func (e *Example) Read(body string) error {
    return json.Unmarshal([]byte(body), e)
}

4 總結

本篇和大家一起針對緩存 cache 與數據庫 db 之間的數據一致性問題展開了理論探討,推演了從緩存延時雙刪到寫緩存禁用機制的演進過程.

基於上述理論,我通過 golang 開發了 lib 庫,集成了緩存一致性讀寫流程中的核心步驟,並在本文中針對其中的技術細節進行了介紹,並在最後給出對應的使用示例.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/h1oi92BbdFdTGtey0wQLLQ