一致性緩存理論分析與技術實戰
0 前言
工程實踐場景中,我們通常用數據庫完成數據的持久存儲,而數據存儲側的性能調優也是一個永恆經典的話題. 在一些請求量大、讀多寫少的場景中,一種性能優化方式是考慮在數據庫之上添加一層緩存組件,這樣一方面能減輕數據庫的訪問壓力,一方面也能提升查詢操作的性能.
然而由於緩存(如 redis)和數據庫(如 mysql)是兩個獨立的存儲組件,在操作過程中無法在跨組件的基礎上保證 “事務” 的語義,因此不可避免地會面臨緩存數據與數據庫數據不一致的問題.
我們把上述問題稱爲 “緩存一致性” 問題,本篇將緊密圍繞該問題,針對設計緩存與數據庫的讀、寫流程進行串聯梳理,總結出能夠兼顧數據一致性與操作性能的執行方法論.
理論先行,實踐緊隨. 本篇所探討的內容,將統一通過 go 語言在我的開源項目 consistent_cache 中予以實踐驗證——開源項目傳送門:https://github.com/xiaoxuxiansheng/consistent_cache
1 理論分析
1.1 緩存一致性問題
緩存(cache) 相比於數據庫(db) 而言更加輕便快捷,但與之相對的也存在成本高、容量小、穩定性弱的問題,因此 cache 中的數據通常是依附於 db 中的持久化數據而存在,一筆數據的更新操作和存儲終態最終還是要以 db 爲準.
從這個角度出發來看,在寫操作密集的場景下,使用 cache 的收益並不明顯;然而在讀多寫少的場景中,我們通過將一些相對穩定不變的數據從 db 冗餘到 cache 中,由 cache 同一收口數據查詢能力,能夠很大程度提升查詢性能並降低 db 的訪問壓力.
隨之而來的首要問題就是 cache 中的數據如何與 db 保持一致. 展開來說就是,cache 和 db 本身是兩個獨立的存儲組件,跨組件間的數據一致性問題屬於分佈式事務的範疇,本身有一套解決的方案,但受限於其高昂的實現成本,因此不適用於絕大多數場景,這部分內容可以參考我之前發表的文章——萬字長文漫談分佈式事務實現原理.
在本篇中,我們更多將注意力集中在,如何針對數據的讀寫流程進行秩序地組織串聯,以此來滿足 cache 和 db 間數據的最終一致性,並儘可能追求即時一致性的語義.
1.2 讀寫流程串講
當前比較主流的緩存一致性實現思路中,針對讀、寫流程的職責進行如下拆解:
-
在寫流程中:只負責寫 db,不負責更新 cache;與之相對的,需要在寫 db 前,先負責將 cache 中的髒數據刪除,由後續到來的讀操作負責將更新後的遷移到緩存中
-
在讀流程中:嘗試讀 cache;如果 cache 中數據 miss,則讀取 db,並將讀到的數據更新到 cache 中,以便後續的讀操作複用
此處我們針對上述流程的設計原則通過 qa 形式進行簡要剖析:
question A:爲什麼寫流程不在寫完 db 後直接更新 cache 數據?
-
原因 I:cache 相比於 db 空間小、成本高,因此希望儘可能將訪問頻繁的熱數據加載到 cache 中進行復用. 而 cache 的目標是面向讀密集場景,數據的熱度由讀操作的頻率來決定,因此將寫 cache 的執行權交由讀流程負責,而非寫流程;
-
原因 II:在明確已經由讀流程負責寫 cache 的前提下,寫流程就不再執行重複動作,以此實現流程簡化,也能規避部分因併發場景而導致的 cache 數據一致性問題
question B:爲什麼寫流程需要刪除 cache 數據?
-
在讀流程中,當 cache 中存在數據,會立即讀取並返回結果. 因此倘若寫操作不刪除 cache 中的髒數據,那麼後續到來的讀操作都會因此讀取到錯誤的結果.
-
當寫操作刪除 cache 刪除後,讀流程訪問 cache 時發現數據 miss,就會讀取 db,並將此時正確的數據重新寫入 cache 中
question C:爲什麼寫流程需要先刪除 cache 再寫 db?
-
逆向思考:倘若先寫 db 後刪 cache,由於兩個操作無法保證原子性,一旦前者成功後者失敗,就會導致 cache 中永久存在錯誤的數據;
-
反之,先刪 cache 後寫 db,哪怕前者成功後者失敗,無非是額外增加一次將數據由 db 冗餘到 cache 中的成本而已
question D:上述流程是嚴謹的嗎?是否還存在哪些環節可能存在 cache 數據不一致問題?
這個問題留給各位讀者,請大家短暫駐步於此, 腦中思考該問題的答案,並帶着問題再進入到後續小節的閱讀內容中.
1.3 緩存雙刪策略
question D 的答案是否定的.
我們需要明白,在實際場景中,一系列讀、寫流程是併發執行的,兩個流程下各執行步驟的相對次序可能因爲機器、網絡等不確定因素髮生變化. 此處我們就給出一個具體 badcase 加以分析說明:
- 背景:
數據庫中有一筆 kv 數據,key:a; value:b
{
"key": "a",
"value": "b"
}
此時,一筆讀操作和一筆寫操作併發啓動,
讀操作讀取 key:a 對應的數據:
get a
寫操作執行指令,期望將 key:a 對應的 value 更新爲 c
- 過程
按照 1.2 小節中介紹的流程,下面我們進行讀寫流程的步驟串聯:
(1)moment1:寫流程刪除 cache 中的 key:a 數據
(2)moment2:讀流程讀 cache,發現 key:a 數據 miss,準備進一步讀 db
(3)moment3:讀流程讀 db,此時數據還是老版本,value 爲 b
(4)moment4:寫流程寫 db,將 key:a 對應 value 更新爲 c
(5)moment5:讀流程把讀到的 value:b 作爲 key:a 的映射數據寫入 cache
至此,讀、寫流程均結束了,然而現狀是,db 中 key:a 數據已經更新爲 c,然而 cache 中 key:a 對應的還是髒數據 b.
產生上述問題的本質原因就是,在併發場景下,步驟(1)寫流程刪除 cache 數據後,並無法阻止在(1)-(4)期間內,讀流程再次讀 db 並將髒數據遷移到 cache 中.
此處,我們首先想到的應對方式是緩存雙刪策略——即在寫流程寫 db 前後,分別執行一次刪除 cache 操作.
以上述案例加以說明,就是寫流程在 moment4 之後,額外增加一個 moment6,再一次將 cache 中 key:a 對應的數據刪除.
那麼,到此爲止,是否還存在 cache 數據不一致的可能性呢?
請大家短暫駐足思考,並帶着你的答案進入下一小節.
1.4 緩存延時雙刪策略
1.3 小節中,緩存雙刪中存在的問題其實很簡單,就在於我們無法保證,寫流程中,第二次刪除 cache 的動作一定能執行在讀流程寫 cache 的操作之後,也就是 moment5 和 moment6 兩個時刻的相對次序是不穩定的:
因此,我們進一步引入緩存延時雙刪策略. 這裏的 “延時” 就體現在,寫流程寫完 db 後,會等待指定時長,保證此期間可能持有髒數據的讀流程都完成寫 cache 操作後,再執行第二次的刪 cache 操作,以此來實現緩存數據的 “最終一致性” 語義.
緩存延時雙刪策略已經是可用於工程化落地的實現方案,其核心的讀寫流程串聯如下圖:
1.5 寫緩存禁用機制
然而,緩存延時雙刪機制同樣有其短板所在,核心分爲兩點:
-
cache 數據弱一致: 寫 db 到延時執行二次刪 cache 操作期間,cache 中都可能存在髒數據,因此無法保證 cache 數據的即時一致性,只能保證最終一致性語義
-
二次刪 cache 操作不穩定: 延時二次刪 cache 操作存在執行失敗的可能性,一旦失敗,cache 數據的 “最終一致性語義” 都將無法保證
在分佈式場景,我們往往需要在流程性能和數據一致性之間進行權衡取捨. 爲了進一步保證 cache 數據的強一致性語義,我們可以嘗試引入 “鎖機制”.
最簡單粗暴的實現方式,就是通過一把 key 維度的分佈式讀寫鎖,實現讀流程和寫流程的隔離. 這樣兩個流程下的執行步驟也就不會混淆在一起,上述問題自然也就得到了根除.
然而,該做法的代價卻是對讀操作性能的大幅度犧牲,針對某筆數據,一旦產生寫操作,那麼在此期間所有讀操作都要陷入阻塞等待的狀態,這並不是我們所樂於接受的.
在此基礎上,我們進一步對實現思路進行升級. 在 1.3-1.4 小節的思路演進過程中,我們發現導致 cache 數據不一致的罪魁禍首,本質上是讀流程把髒數據寫入 cache 的操作,而非宏觀意義上的整個讀流程.
因此,我們要做的不是完全隔離讀、寫流程,也不需要使用分佈式鎖這麼重的工具,而是退而求其次,針對一筆數據的維度啓用一個 “開關” 機制,能夠用於控制讀流程是否啓用寫 cache 機制即可:
-
每當有寫流程到達時,先將該筆數據的 “開關” 關閉,然後正常執行後續流程,執行完成後再重新將 “開關” 打開
-
在 “開關” 關閉期間,所有到達的讀流程正常執行步驟,唯獨不會在讀 db 後執行寫 cache 操作
通過上述的 “寫緩存禁用” 機制,就保證了數據的強一致性. 寫流程執行期間,該筆數據對應的讀流程不會阻塞,只是相當於 cache 機制被暫時屏蔽,讀流程需要統一從 db 中獲取最精確的數據而已.
此處值得一提的是,在寫流程完成寫 db 操作後,通過需要延遲一段時間再重新開啓該筆數據下的 “寫緩存機制”,其本質思路和緩存延時雙刪策略中 “延時” 的用意是一致的,就是避免在併發場景下,讀取到 db 髒數據的讀流程寫 cache 操作恰好發生在寫流程 “寫緩存機制” 啓用之後.
1.6 其他緩存相關問題
除此之外,在 cache 與 db 交互流程中,還存在幾個經典問題,也將在本篇中展開探討,並在技術實戰環節中對具體的應對策略進行展示:
- 緩存雪崩
倘若導致大量 cache key 在同一時刻過期,那麼這一瞬間紛湧而至的大量讀請求都會因爲 cache 數據 miss 而集體湧入到 db 中,導致 db 壓力陡增.
緩存雪崩問題的常用解決思路是切斷問題的直接導火索,對 cache key 的過期時間進行打散,比如可以在預設過期時間的基礎上,加上隨機擾動值,因此來避免大面積 cache key 同時失效的情形.
- 緩存穿透
倘若讀操作頻繁請求 db 中不存在的數據,那麼該數據自然也無法寫入 cache,最終所有請求都會直擊 db,導致 db 壓力較大.
針對緩存穿透問題的解法之一,是存儲層之上額外封裝一層布隆過濾器,用於提前過濾大部分不存在的 key,具體技術原理可參見我之前發表的文章——布隆過濾器技術原理及應用實戰.
而在本篇中,我們將採用另一種解法:倘若在讀流程中發現數據在 db 中不存在,則同樣會寫 cache,但會針對該筆數據加以特殊標記,讓後續讀操作通過讀 cache 就能獲得到數據不存在的信息.
2 技術實戰
介紹理論基礎部分,下面展示一致性緩存服務項目 consistent_cach 的源碼實現,進入技術實戰環節.
2.1 架構
整個操作流程涉及到對緩存模塊 cache 和數據庫模塊 db 的操作,因此在實現上拆分出三個核心模塊:
-
一致性緩存服務 service:整體上串聯讀、寫流程,包含其中針對 cache 和 db 的每個交互節點. (保證緩存數據一致性的核心邏輯,項目中重點實現部分)
-
緩存模塊 cache:對緩存組件的抽象(定義成一個抽象 interface 由使用方自行實現,項目中提供了一個 redis 實現版本)
-
數據庫模塊 db:對數據庫組件的抽象(定義成一個抽象 interface 由使用方自行實現,項目中提供了一個 mysql 實現版本)
一致性緩存服務 service 的實現代碼代碼位於 ./service.go,核心成員屬性包括其持有的緩存模塊 cache 和數據庫模塊 db 的實例引用:
type Service struct {
opts *Options // 策略參數
cache Cache // 緩存模塊
db DB // 數據庫模塊
}
// 構造一致性緩存服務. 緩存和數據庫均由使用方提供具體的實現版本
func NewService(cache Cache, db DB, opts ...Option) *Service {
s := Service{
cache: cache,
db: db,
opts: &Options{},
}
for _, opt := range opts {
opt(s.opts)
}
repair(s.opts)
return &s
}
針對緩存模塊抽象接口——Cache 的定義代碼位於 ./interface.go,其中包含如下核心方法:
-
Get: 在 cache 中查詢數據. 如果數據在 cache 中不存在,需要返回指定錯誤:ErrorCacheMiss
-
Del: 從 cache 中刪除數據
-
Disable: 針對某條數據禁用寫緩存機制,爲了防止意外會給一個兜底過期時間(寫流程刪 cache 前執行)
-
Enable: 針對某條數據,延時啓用寫緩存機制(寫流程寫 db 後執行)
-
PutWhenEnable:只在寫緩存機制啓用的前提下,執行寫 cache 操作(讀流程在讀 db 後執行)
var (
// 數據在緩存中不存在
ErrorCacheMiss = errors.New("cache miss")
)
// 緩存模塊的抽象接口定義
type Cache interface {
// 啓用某個 key 對應讀流程寫緩存機制(默認情況下爲啓用狀態)
Enable(ctx context.Context, key string, delayMilis int64) error
// 禁用某個 key 對應讀流程寫緩存機制
Disable(ctx context.Context, key string, expireSeconds int64) error
// 讀取 key 對應緩存
Get(ctx context.Context, key string) (string, error)
// 刪除 key 對應緩存
Del(ctx context.Context, key string) error
// 校驗某個 key 對應讀流程寫緩存機制是否啓用,倘若啓用則寫入緩存(默認情況下爲啓用狀態)
PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error)
}
針對數據庫模塊定義了抽象的 interface——DB:
-
Put: 將一條數據 obj 寫入(或更新)到 db
-
Get: 從 db 中讀取一條數據,此處 obj 作爲接收數據的指針(倘若 db 中不存在數據,需要返回指定錯誤 ErrorDBMiss)
// 數據庫中不存在數據
var ErrorDBMiss = errors.New("db miss")
// 數據庫模塊的抽象接口定義
type DB interface {
// 數據寫入數據庫
Put(ctx context.Context, obj Object) error
// 從數據庫讀取數據
Get(ctx context.Context, obj Object) error
}
針對一條數據記錄,使用抽象 interface——Object 進行抽象,其需要實現幾個方法:
-
KeyColumn: db 中唯一鍵的字段名,也會作爲 cache 中的 key
-
Key: 唯一鍵的值
-
Write: 將數據記錄 object 內容序列化成字符串
-
Read: 讀取字符串,將內容反序列化到數據記錄 object 中
// 每次讀寫操作時,操作的一筆數據記錄
type Object interface {
// 獲取 key 對應的字段名
KeyColumn() string
// 獲取 key 對應的值
Key() string
// 將 object 序列化成字符串
Write() (string, error)
// 讀取字符串內容,反序列化到 object 實例中
Read(body string) error
}
2.2 緩存
在 consistent_cache 項目中,基於 redis 實現了一個緩存模塊:
-
disable 禁用寫緩存機制: 通過 redis setEx 指令,在 redis 中設置與 key 一一對應的標識鍵 disableKey 實現
-
enable 延時啓用寫緩存機制: 通過 redis expire 指令,給 disableKey 設置一個較短過期時間的方式來實現延時啓用效果
-
get、del:直接通過 redis get、del 操作實現
-
putWhenEnable: 通過 lua 腳本實現,保證只在 disableKey 不存在時,纔會寫入 cache
redis 緩存模塊實現類代碼位於 ./redis/cache.go:
// redis 實現版本的緩存模塊
type Cache struct {
// 抽象的客戶端模塊. 實現版本爲 redis 客戶端
client Client
}
// 構造器函數
func NewRedisCache(config *Config) *Cache {
return &Cache{client: NewRClient(config)}
}
其中各核心方法的源碼內容展示如下:
// 啓用某個 key 對應讀流程寫緩存機制(默認情況下爲啓用狀態)
func (c *Cache) Enable(ctx context.Context, key string, delayMilis int64) error {
// redis 中刪除 key 對應的 disable key. 只要 disable key 標識不存在,則讀流程寫緩存機制視爲啓用狀態
// 給 disable key 設置一個相對較短的過期時間
return c.client.PExpire(ctx, key, delayMilis)
}
// 禁用某個 key 的讀流程寫緩存機制
func (c *Cache) Disable(ctx context.Context, key string, expireSeconds int64) error {
// redis 中設置 key 對應的 disable key. 只要 disable key 標識存在,則讀流程寫緩存機制視爲禁用狀態
return c.client.SetEx(ctx, c.disableKey(key), "1", expireSeconds)
}
// 讀取 key 對應緩存內容
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
// 從 redis 中讀取 kv 對
reply, err := c.client.Get(ctx, key)
if err != nil && !errors.Is(err, redis.ErrNil) {
return "", err
}
// 倘若緩存中不存在數據,返回指定錯誤 ErrorCacheMiss
if errors.Is(err, redis.ErrNil) {
return "", consistent_cache.ErrorCacheMiss
}
return reply, nil
}
// 刪除 key 對應緩存
func (c *Cache) Del(ctx context.Context, key string) error {
// 從 reids 中刪除 kv 對
return c.client.Del(ctx, key)
}
其中,在 putWhenEnable 方法中,因爲需要對多個指令進行原子化執行,因此涉及到對 lua 腳本的使用:
// 校驗某個 key 對應讀流程寫緩存機制是否啓用,倘若啓用則寫入緩存(默認情況下爲啓用狀態)
func (c *Cache) PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error) {
// 運行 redis lua 腳本,保證只有在 disable key 不存在時,纔會執行 key 的寫入
reply, err := c.client.Eval(ctx, LuaCheckEnableAndWriteCache, 2, []interface{}{
c.disableKey(key),
key,
value,
expireSeconds,
})
if err != nil {
return false, err
}
return cast.ToInt(reply) == 1, nil
}
對應 lua 腳本代碼如下,位於 ./redis.lua.go:
const (
// 通過 lua 腳本確保在 disable key 不存在時,才執行 key value 對寫入
LuaCheckEnableAndWriteCache = `
local disable_key = KEYS[1];
local disable_flag = redis.call("get",disable_key);
if disable_flag then
return 0;
end
local key = KEYS[2];
local value = ARGV[1];
redis.call("set",key,value);
local cache_expire_seconds = tonumber(ARGV[2]);
redis.call("expire",key,cache_expire_seconds);
return 1;
`
)
值得一提的是,由於在 putWhenEnable 對應 lua 腳本中,涉及到對數據 key 以及與其一一一應的 disableKey 的操作,因此需要通過 hash tag 保證這兩個 key 在 redis cluster 模式下也能被分發到同一個節點,這樣 lua 腳本的執行纔是有效的.
// 基於 key 映射得到 v key 表達式
func (c *Cache) disableKey(key string) string {
// 通過 {hash_tag},保證在 redis 集羣模式下,key 和 disable key 也會被分發到相同節點
return fmt.Sprintf("Enable_Lock_Key_{%s}", key)
}
hash tag 的使用機制可以參見:https://redis.io/docs/latest/commands/cluster-keyslot/
2.3 數據庫
在 consistent_cache 中,針對數據庫模塊 db 提供了一個 mysql 實現版本,連接 mysql 的客戶端使用 gorm 版本. 這部分代碼位於 ./mysql/*:
// 判斷操作模型是否聲明瞭表名
type tabler interface {
TableName() string
}
// 數據庫模塊的抽象接口定義
type DB struct {
db *gorm.DB
}
在數據寫流程中:
-
執行 create 操作
-
倘若發生唯一鍵衝突錯誤,則改爲執行 update 操作
更多細節通過源碼註釋的方式給出:
// 數據寫入數據庫
func (d *DB) Put(ctx context.Context, obj consistent_cache.Object) error {
db := d.db
// 倘若 obj 顯式聲明瞭表名,則進行應用
tabler, ok := obj.(tabler)
if ok {
db = db.Table(tabler.TableName())
}
// 此處通過兩個非原子性動作實現 upsert 效果:
// 1 嘗試創建記錄
// 2 倘若發生唯一鍵衝突,則改爲執行更新操作
err := db.WithContext(ctx).Create(obj).Error
if err == nil {
return nil
}
// 判斷是否爲唯一鍵衝突,若是的話,則改爲更新操作
if IsDuplicateEntryErr(err) {
return db.WithContext(ctx).Debug().Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).Updates(obj).Error
}
// 其他錯誤直接返回
return err
}
針對 mysql 唯一鍵衝突的判斷方法:
import "github.com/go-sql-driver/mysql"
func IsDuplicateEntryErr(err error) bool {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == DuplicateEntryErrCode {
return true
}
return false
}
在讀流程中:
-
• 通過數據唯一鍵作爲檢索條件進行 select 操作
-
• 倘若數據不存在,需要返回指定錯誤 ErrorDBMiss
// 從數據庫讀取數據
func (d *DB) Get(ctx context.Context, obj consistent_cache.Object) error {
db := d.db
// 倘若 obj 顯式聲明瞭表名,則進行應用
tabler, ok := obj.(tabler)
if ok {
db = db.Table(tabler.TableName())
}
// select 語句讀取通過唯一鍵檢索數據記錄
err := db.WithContext(ctx).Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).First(obj).Error
// 倘若 db 中不存在數據,返回指定錯誤 ErrorDBMiss
if errors.Is(err, gorm.ErrRecordNotFound) {
return consistent_cache.ErrorDBMiss
}
return err
}
更多有關 gorm 的使用示例和底層原理可以參見我之前發表的文章:
2.4 一致性緩存服務
在一致性緩存服務 service 提供的寫數據方法 Put 中,包含如下核心步驟:
-
通過緩存模塊 cache 的 disable 操作,禁用寫緩存機制
-
在 cache 中刪除對應的數據 key
-
將數據寫入到 db 中
-
調用 cache 的 enable 方法,實現延時啓用寫緩存機制的效果
// 寫操作
func (s *Service) Put(ctx context.Context, obj Object) error {
// 1 針對 key 維度禁用讀流程寫緩存機制
if err := s.cache.Disable(ctx, obj.Key(), s.opts.disableExpireSeconds); err != nil {
return err
}
defer func() {
go func() {
tctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := s.cache.Enable(tctx, obj.Key(), s.opts.enableDelayMilis); err != nil {
s.opts.logger.Errorf("enable fail, key: %s, err: %v", obj.Key(), err)
}
}()
}()
// 2 刪除 key 維度對應緩存
if err := s.cache.Del(ctx, obj.Key()); err != nil {
return err
}
// 3 數據寫入 db
return s.db.Put(ctx, obj)
}
在一致性緩存服務 service 提供的讀數據方法 Get 中,包含如下核心步驟:
-
嘗試從 cache 中讀到空數據標識(針對緩存穿透問題啓用的 NullData),則直接返回數據不存在的錯誤 ErrorDataNotExist
-
倘若 cache 缺失數據(ErrorCacheMiss),則從 db 中讀數據
-
從 db 讀到數據後,調用 cache 模塊的 putWhenEnable 方法,保證只在寫緩存機制啓用的情況下,將數據寫入 cache
-
倘若 db 中也沒有數據(ErrorDBMiss),同樣調用 cache 模塊的 putWhenEnable 方法,保證只在寫緩存機制啓用的情況下,在 cache 中寫入 NullData 標識
// 爲響應緩存穿透問題,啓用在緩存中寫入空數據的標識
const NullData = "Err_Syntax_Null_Data"
// 數據在 cache 和 db 中均不存在
var ErrorDataNotExist = errors.New("data not exist")
// 2 讀操作
func (s *Service) Get(ctx context.Context, obj Object) (useCache bool, err error) {
// 1 讀取緩存
v, err := s.cache.Get(ctx, obj.Key())
// 2 非緩存 miss 類錯誤,直接拋出錯誤
if err != nil && !errors.Is(err, ErrorCacheMiss) {
return false, err
}
// 3 讀取到緩存結果
if err == nil {
// 3.1 讀取到的數據爲 EmptyData. 是爲了防止緩存穿透而設置的空值
if v == NullData {
return true, ErrorDataNotExist
}
// 3.2 正常讀取到數據
return true, obj.Read(v)
}
// 4 緩存 miss,讀 db
if err = s.db.Get(ctx, obj); err != nil && !errors.Is(err, ErrorDBMiss) {
return false, err
}
// 5 db 中也沒有數據,則嘗試往 cache 中寫入 NullData
if errors.Is(err, ErrorDBMiss) {
if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), NullData, s.opts.CacheExpireSeconds()); err != nil {
s.opts.logger.Errorf("put null data into cache fail, key: %s, err: %v", obj.Key(), err)
} else {
s.opts.logger.Infof("put null data into cache resp, key: %s, ok: %t", obj.Key(), ok)
}
return false, ErrorDataNotExist
}
// 6 成功獲取到數據了,則需要將其寫入緩存
v, err = obj.Write()
if err != nil {
return false, err
}
if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), v, s.opts.CacheExpireSeconds()); err != nil {
s.opts.logger.Errorf("put data into cache fail, key: %s, data: %v, err: %v", obj.Key(), v, err)
} else {
s.opts.logger.Infof("put data into cache resp, key: %s, v: %v, ok: %t", obj.Key(), v, ok)
}
// 7 返回讀取到的結果
return false, nil
}
其中爲了應對緩存雪崩問題,在讀流程寫緩存步驟中,針對過期時間可以添加一個隨機擾動值:
func (o *Options) CacheExpireSeconds() int64 {
if !o.cacheExpireRandomMode {
return o.cacheExpireSeconds
}
// 過期時間在 1~2倍之間取隨機值
return o.cacheExpireSeconds + o.rander.Int63n(o.cacheExpireSeconds+1)
}
3 使用示例
3.1 主流程
最後是關於整個一致性緩存服務的使用示例,這部分代碼位於 ./example/example_test.go 中.
其中核心步驟包括:
-
輸入 redis 地址、密碼,啓用 redis 緩存模塊
-
輸入 mysql dsn,啓用 mysql 數據庫模塊
-
創建一致性緩存服務實例
-
執行寫操作
-
執行讀操作
const (
redisAddress = "請輸入 redis 地址"
redisPassword = "請輸入 redis 密碼"
mysqlDSN = "請輸入 mysql dsn"
)
func newService() *consistent_cache.Service {
// 緩存模塊
cache := redis.NewRedisCache(&redis.Config{
Address: redisAddress,
Password: redisPassword,
})
// 數據庫模塊
db := mysql.NewDB(mysqlDSN)
return consistent_cache.NewService(cache, db,
consistent_cache.WithCacheExpireSeconds(120),
consistent_cache.WithDisableExpireSeconds(1),
)
}
func Test_consistent_Cache(t *testing.T) {
service := newService()
ctx := context.Background()
exp := Example{
Key_: "test",
Data: "test",
}
// 寫操作
if err := service.Put(ctx, &exp); err != nil {
t.Error(err)
return
}
// 讀操作
expReceiver := Example{
Key_: "test",
}
if _, err := service.Get(ctx, &expReceiver); err != nil {
t.Error(err)
return
}
// 讀取到的數據結果 以及是否使用到緩存
t.Logf("read data: %s, ", expReceiver.Data)
}
3.2 object 實現
在使用示例中,同樣給出了針對數據記錄 Object interface 的實現類 Example,代碼位於 ./example/example_po.go:
type Example struct {
ID uint `json:"id" gorm:"primarykey"`
Key_ string `json:"key" gorm:"column:key"`
Data string `json:"data" gorm:"column:data"`
}
// 獲取對應的表名
func (e *Example) TableName() string {
return "example"
}
// 獲取 key 對應的字段名
func (e *Example) KeyColumn() string {
return "key"
}
// 獲取 key 對應的值
func (e *Example) Key() string {
return e.Key_
}
func (e *Example) DataColumn() []string {
return []string{"data"}
}
// 將 object 序列化成字符串
func (e *Example) Write() (string, error) {
body, err := json.Marshal(e)
if err != nil {
return "", err
}
return string(body), nil
}
// 讀取字符串內容,反序列化到 object 實例中
func (e *Example) Read(body string) error {
return json.Unmarshal([]byte(body), e)
}
4 總結
本篇和大家一起針對緩存 cache 與數據庫 db 之間的數據一致性問題展開了理論探討,推演了從緩存延時雙刪到寫緩存禁用機制的演進過程.
基於上述理論,我通過 golang 開發了 lib 庫,集成了緩存一致性讀寫流程中的核心步驟,並在本文中針對其中的技術細節進行了介紹,並在最後給出對應的使用示例.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/h1oi92BbdFdTGtey0wQLLQ