分佈式鎖的實現原理
介紹分佈式鎖的實現原理。
一、分佈式鎖概述
分佈式鎖,顧名思義,就是在分佈式環境下使用的鎖。衆所周知,在併發編程中,我們經常需要藉助併發控制工具,如 mutex、synchronized 等,來保障線程安全。但是,這種線程安全僅作用在同一內存環境中。在實際業務中,爲了保障服務的可靠性,我們通常會採用多節點進行部署。在這種分佈式情況下,各實例間的內存不共享,線程安全並不能保證併發安全,如下例,同一實例中線程 A 與線程 B 之間的併發安全並不能保證實例 1 與實例 2 之間的併發安全:
因此,當遇到分佈式系統的併發安全問題時,我們就可能會需要引入分佈式鎖來解決。
用於實現分佈式鎖的組件通常都會具備以下的一些特性:
-
互斥性:提供分佈式環境下的互斥原語來加鎖 / 釋放鎖,當然是分佈式鎖最基本的特性。
-
自動釋放:爲了應對分佈式系統中各實例因通信故障導致鎖不能釋放的問題,自動釋放的特性通常也是很有必要的。
-
分區容錯性:應用在分佈式系統的組件,具備分區容錯性也是一項重要的特性,否則就會成爲整個系統的瓶頸。
目前開源社區中常見的分佈式鎖解決方案,大多是基於具備集羣部署能力的 key-value 存儲中間件來實現,最爲常用的方案基本上是基於 Redis、zookeeper 來實現,筆者將從上述分佈式鎖的特性出發,介紹一下這兩類的分佈式鎖解決方案的優缺點。
彩蛋提醒
我們爲大家準備了抽獎福利,請繼續閱讀下去。
二、分佈式鎖的實現原理
2.1 Redis 實現分佈式鎖
Redis 由於其高性能、使用及部署便利性,在很多場景下是實現分佈式鎖的首選。首先我們看下 Redis 是如何實現互斥性的。在單機部署的模式下,Redis 由於其單線程處理命令的線程模型,天然的具備互斥能力;而在哨兵 / 集羣模式下,寫命令也是單獨發送到某個單獨節點上進行處理,可以保證互斥性;其核心的命令是 set if not exist:
SET lockKey lockValue NX
成功設置 lockValue 的實例,就相當於搶鎖成功。但如果持有鎖的實例宕機,因爲 Redis 服務端並沒有感知客戶端狀態的能力,因此會出現鎖無法釋放的問題:
這種情況下,就需要給 key 設置一個過期時間 expireTime:
SET lockKey lockValue EX expireTime NX
左右滑動查看完整代碼
如果持有鎖的實例宕機無法釋放鎖,則鎖會自動過期,這樣可以就避免鎖無法釋放的問題。在一些簡單的場景下,通過該方式實現的分佈式鎖已經可以滿足需求。但這種方式存在一個明顯問題:如果業務的實際處理時間比鎖過期時間長,鎖就會被誤釋放,導致其他實例也可以加鎖:
這種情況下,就需要通過其他機制來保證鎖在業務處理結束後再釋放,一個常用的方式就是通過後臺線程的方式來實現鎖的自動續期。
Redssion 是開源社區中比較受歡迎的一個 Java 語言實現的 Redis 客戶端,其對 Java 中 Lock 接口定義進行擴展,實現了 Redis 分佈式鎖,並通過 watchDog 機制(本質上即是後臺線程運作)來對鎖進行自動續期。以下是一個簡單的 Reddison 分佈式鎖的使用例子:
RLock rLock = RedissonClient.getLock("test-lock");
try {
if (rLock.tryLock()) {
// do something
}
} finally {
rLock.unlock();
}
Redssion 的默認實現 RedissonLock 爲可重入互斥非公平鎖,其 tryLock 方法會基於三個可選參數執行:
-
waitTime(獲取鎖的最長等待時長):默認爲 - 1,waitTime 參數決定在獲取鎖的過程中是否需要進行等待,如果 waitTime>0,則在獲取鎖的過程中線程會等待一定時間並持續嘗試獲取鎖,否則獲取鎖失敗會直接返回。
-
leaseTime(鎖持有時長):默認爲 - 1。當 leaseTime<=0 時,會開啓 watchDog 機制進行自動續期,而 leaseTime>0 時則不會進行自動續期,到達 leaseTime 鎖即過期釋放
-
unit(時間單位):標識 waitTime 及 leaseTime 的時間單位
我們不妨通過參數最全的
RedissonLock#tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法源碼來一探其完整的加鎖過程:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
...
// tryAcquire方法返回鎖的剩餘有效時長ttl,如果未上鎖,則爲null
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
// 獲取鎖成功
return true;
}
// 計算剩餘等待時長,剩餘等待時長小於0,則不再嘗試獲取鎖,獲取鎖失敗,後續有多處同樣的判斷邏輯,將精簡省略
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 等待時長大於0,則會對鎖釋放的事件進行訂閱,持有鎖的客戶端在鎖釋放時會發布鎖釋放事件通知其他客戶端搶鎖,由此可得知該默認實現爲非公平鎖。
// Redisson對Redis發佈訂閱機制的實現,底層大量使用了CompletableFuture、CompletionStage等接口來編寫異步回調代碼,感興趣的讀者可以詳細瞭解,此處不作展開
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
...
} catch (ExecutionException e) {
...
}
try {
...
// 循環嘗試獲取鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
...
// 此處通過信號量來將線程阻塞一定時間,避免無效的申請鎖浪費資源;在阻塞期間,如果收到了鎖釋放的事件,則會通過信號量提前喚起阻塞線程,重新嘗試獲取鎖;
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 若ttl(鎖過期時長)小於time(剩餘等待時長),則將線程阻塞ttl
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 若等待時長小於ttl,則將線程阻塞time
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
...
}
} finally {
// 取消訂閱
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
上述代碼邏輯主要集中在處理 waitTime 參數,在併發競爭不激烈、可以容忍一定的等待時間的情況下,合理設置 waitTime 參數可以提高業務併發運行成功率,避免搶鎖失敗直接返回錯誤;但在併發競爭激烈、對性能有較高要求時,建議不設置 waitTime,或者直接使用沒有 waitTime 參數的 lock() 方法,通過快速失敗來提高系統吞吐量。
一個比較值得注意的點是,如果設置了 waitTime 參數,則 Redisson 通過將 RedissonLockEntry 中信號量(Semaphore)的許可證數初始化爲 0 來達到一定程度的限流,保證鎖釋放後只有一個等待中的線程會被喚醒去請求 Redis 服務端,把喚醒等待線程的工作分攤到各個客戶端實例上,可以很大程度上緩解非公平鎖給 Redis 服務端帶來的驚羣效應壓力。
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
...
private final Semaphore latch;
public RedissonLockEntry(CompletableFuture<RedissonLockEntry> promise) {
super();
// RedissonLockEntry 中的Semaphore的許可證數初始化爲0
this.latch = new Semaphore(0);
this.promise = promise;
}
...
}
獲取鎖的核心邏輯,會通過
RedissonLock#tryAcquire
方法調用到 RedissonLock#tryAcquireAsync 方法。
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
// 若leaseTime大於零,會設置鎖的租期爲leaseTime
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 若leaseTime小於或等於零,會設置鎖的租期爲internalLockLeaseTime,這是一個通過lockWatchdogTimeout配置的值,默認爲30s
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
// 此處的handleNoSync方法是爲了解決Redis發生故障轉移,集羣拓撲改變後,只有持有鎖的客戶端能再次獲得鎖的bug,爲3.20.1版本修復,詳見Redisson issue#4822
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 根據加鎖情況來進行後續處理
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// lock acquired
// 若ttl爲空,說明加鎖不成功
if (ttlRemaining == null) {
if (leaseTime > 0) {
// 若leaseTime>0,則將internalLockLeaseTime變量設置爲leaseTime,以便後續解鎖使用
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 若leaseTime<=0,則開啓看門狗機制,通過定時任務進行鎖續期
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
// 加鎖的lua腳本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((Redis.call('exists', KEYS[1]) == 0) " +
"or (Redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"Redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"Redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return Redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
可以看到,若 leaseTime 大於 0,則不會開啓看門狗機制,鎖在過期後即失效,在使用時請務必留意。上述代碼中執行的 scheduleExpirationRenewal 方法即爲看門狗機制的實現邏輯:
protected void scheduleExpirationRenewal(long threadId) {
// 每個鎖都會對應一個ExpirationEntry類,第一次加鎖時不存在oldEntry
ExpirationEntry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 非首次加鎖,重入計數,不作其他操作
oldEntry.addThreadId(threadId);
} else {
// 首次加鎖,調用renewExpiration()方法進行自動續期
entry.addThreadId(threadId);
try {
renewExpiration();
} finally {
// 若當前線程被中斷,則取消對鎖的自動續期。
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
...
// 此處使用的是netty的時間輪來執行定時續期,此處不對時間輪做展開,感興趣的讀者可詳細瞭解
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
...
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 若續期成功,則遞歸調用,等待任務的下一次執行
renewExpiration();
} else {
// 若續期結果爲false,說明鎖已經過期了,或鎖易主了,則清理當前線程關聯的信息,等待線程結束
cancelExpirationRenewal(null);
}
});
}
// 時間輪的執行週期爲internalLockLeaseTime / 3,即默認情況下,internalLockLeaseTime爲30s時,每10s觸發一次自動續期
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
// 執行重置過期時間的lua腳本
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (Redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"Redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
上面一段代碼即是看門狗調度的核心代碼,本質上即是通過定時調度線程執行 lua 腳本來進行鎖續期。值得留意的是 scheduleExpirationRenewal
方法中的 ExpirationEntry,該對象與鎖一一關聯,會存儲嘗試獲取該鎖的線程(無論是否獲取成功)以及重入鎖的次數,在鎖失效 / 鎖釋放時,會根據該對象中存儲的線程逐一進行資源釋放操作,以保證資源的正確釋放。
最後,對上述 Redisson 可重入非公平鎖源碼進行一下總結:
-
Redisson 加鎖時,根據 waitTime 參數是否大於 0 來決定加鎖失敗時採用等待並再次嘗試 / 快速失敗的策略;
-
Redisson 加鎖時根據 leaseTime 參數是否小於等於 0 來決定是否開啓看門狗機制進行定時續期;
-
Redisson 底層使用了 netty 實現的時間輪來進行定時續期任務的調度,執行週期爲 internalLockLeaseTime / 3,默認爲 10s。
2.2 zookeeper 實現分佈式鎖
zookeeper(後文均簡稱 zk )基於 zab 協議實現的分佈式協調服務,天生具備實現分佈式鎖的基礎條件。我們可以從 zk 的一些基本機制入手,瞭解其是如何實現分佈式鎖的。
- zab:爲了保證分佈式一致性,zk 實現了 zab(Zk Atomic Broadcast,zk 原子廣播)協議,在 zab 協議下,zk 集羣分爲 Leader 節點及 Follower 節點,其中,負責處理寫請求的 Leader 節點在集羣中是唯一的,多個 Follower 則負責同步 Leader 節點的數據,處理客戶端的讀請求。同時,zk 處理寫請求時底層數據存儲使用的是 ConcurrentHashMap,以保證併發安全;
public class NodeHashMapImpl implements NodeHashMap {
private final ConcurrentHashMap<String, DataNode> nodes;
private final boolean digestEnabled;
private final DigestCalculator digestCalculator;
private final AdHash hash;
...
}
左右滑動查看完整代碼
-
臨時順序節點:zk 的數據呈樹狀結構,樹上的每一個節點爲一個基本數據單元,稱爲 Znode。zk 可以創建一類臨時順序
(EPHEMERAL_SEQUENTIAL)節點,在滿足一定條件時會可以自動釋放;同時,同一層級的節點名稱會按節點的創建順序進行命名,第一個節點爲 xxx-0000000000,第二個節點則爲 xxx-0000000001,以此類推;
- session:zk 的服務端與客戶端使用 session 機制進行通信,簡單來說即是通過長連接來進行交互,zk 服務端會通過心跳來監控客戶端是否處於活動狀態。若客戶端長期無心跳或斷開連接,則 zk 服務端會定期關閉這些 session,主動斷開與客戶端的通信。
瞭解了上述 zk 特點,我們不難發現 zk 也是具備互斥性、自動釋放的特性的。同時,zk 由於 session 機制的存在,服務端可以感知到客戶端的狀態,因此不需要有由客戶端來進行節點續期,zk 服務端可以主動地清理失聯客戶端創建的節點,避免鎖無法釋放的問題。zk 實現分佈式鎖的主要步驟如下:
-
client1 申請加鎖,創建 /lock/xxx-lock-0000000000 節點(臨時順序節點),並監聽其父節點 /lock;
-
client1 查詢 /lock 節點下的節點列表,並判斷自己創建的 /xxx-lock-0000000000 是否爲 /lock 節點下的第一個節點;當前沒有其他客戶端加鎖,所以 client1 獲取鎖成功;
-
若 client2 此時來加鎖,則會創建 /lock/xxx-lock-0000000001 節點;此時 client2 查詢 /lock 節點下的節點列表,此時 /xxx-lock-0000000001 並非 /lock 下的第一個節點,因此加鎖不成功,此時 client2 則會監聽其上一個節點 /xxx-lock-0000000000;
-
client1 釋放鎖,client1 刪除 /xxx-lock-0000000000 節點,zk 服務端通過長連接 session 通知監聽了 /xxx-lock-0000000000 節點的 client2 來獲取鎖
-
收到釋放事件的 client2 查詢 /lock 節點下的節點列表,此時自己創建的 /xxx-lock-0000000001 爲最小節點,因此獲取鎖成功。
上述是 zk 公平鎖的一種常見實現方式。值得注意的是, zk 客戶端通常並不會實現非公平鎖。事實上,zk 上鎖的粒度不侷限於上述步驟中的客戶端,zk 客戶端每次獲取鎖請求(即每一個嘗試獲取鎖的線程)都會向 zk 服務端請求創建一個臨時順序節點。
以上述步驟爲例,如果需要實現非公平鎖,則會導致其餘的所有節點都需要監聽第一個節點 /xxx-lock-0000000000 的釋放事件,相當於所有等待鎖釋放的線程都會監聽同一個節點,這種機制無法像 Redisson 一樣把喚醒鎖的壓力分攤到客戶端上(或者說實現起來比較困難),會產生比較嚴重的驚羣效應,因此使用 zk 實現的分佈式鎖一般情況下都是公平鎖。
Curator 是一個比較常用的 zk 客戶端,我們可以通過 Curator 的加鎖過程,來了解 zk 分佈式鎖的設計原理。Curator 中比較常用的是可重入互斥公平鎖 InterProcessMutex:
InterProcessMutex mutex = new InterProcessMutex(zkClient, "/lock");
try {
// acquire方法的兩個參數:等待時長及時間單位
if (mutex.acquire(3, TimeUnit.SECONDS)) {
log.info("加鎖成功");
} else {
log.info("加鎖失敗");
}
} finally {
mutex.release();
}
InterProcessMutex 同樣提供了等待時長參數,用於設置沒有立即獲取到鎖時是快速失敗還是阻塞等待,下一步,方法會調用到
InterProcessMutex#internalLock 方法中:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
// 註釋的意思:一個LockData對象只會被一個持有鎖的線程進行修改,因此不需要對LockData進行併發控制。如此說明的原因是zk的互斥特性保證了下方attemptLock方法的互斥,由此保證了LockData不會被併發修改
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// LockData用於記錄當前持有鎖的線程數據
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// 線程不爲空,則進行重入,重入次數+1
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
// 向zk服務獲取分佈式鎖,getLockNodeBytes
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// 若lockPath不爲空,則獲取鎖成功,記錄當前持有鎖的線程
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
InterProcessMutex#internalLock 會調用到
LockInternals#attemptLock 方法:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
...
while ( !isDone )
{
isDone = true;
try
{
// 創建鎖節點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 判斷是否成功獲取鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// 捕獲由於網絡中斷、session過期等原因導致的無法獲得節點異常,此處根據配置的zk客戶端重試策略決定是否重試,默認重試策略爲Exponential Backoff
...retry or not...
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
// 在其他類型的鎖實現中,lockNodeBytes可能不爲空,則根據lockNodeBytes來獲取節點路徑,此處暫不作展開
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
// 在可重入互斥鎖中,客戶端向zk服務端請求創建一個 EPHEMERAL_SEQUENTIAL 臨時順序節點
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
上述代碼中,創建鎖節點並不會產生互斥,而是會直接向 zk 服務端請求創建臨時順序節點。此時,客戶端還未真正的獲得鎖,判斷加鎖成功的核心邏輯在
LockInternals#internalLockLoop 方法中:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
// curator鎖撤銷機制,通過實現Curator中的Revocable接口的makeRevocable方法,可以將鎖設置爲可撤銷鎖,其他線程可以在符合條件時將鎖撤銷,此處暫不涉及
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 客戶端實例就緒,則嘗試循環獲取鎖
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// 獲取當前父節點下的排好序的子節點
List<String> children = getSortedChildren();
// 得到當前節點名
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 根據 children 列表與當前節點名,計算當前節點是否爲第一個節點,若不是第一個節點,則在 PredicateResults中返回需要監聽的前一個節點節點,若爲最小節點,則獲取鎖成功
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
// 獲取鎖成功
haveTheLock = true;
}
else
{
// 拼接前一個節點的節點路徑
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// 將前一個節點的監聽器放到當前客戶端中,當前一個節點被釋放時,就會喚醒當前客戶端
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 計算剩餘等待時長,若等待時長小於0,則不再嘗試獲取鎖,並標記當前線程創建的節點需要刪除
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
// 若等待時長大於0,則阻塞線程,等待鎖釋放
wait(millisToWait);
}
else
{
// 在其他的一些加鎖場景中,默認會持久等待到鎖釋放位置,當前可重入互斥鎖暫不涉及
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
// 刪除當前節點
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private synchronized void notifyFromWatcher()
{
// 當zk客戶端收到鎖釋放事件時,會遍歷當前客戶端註冊過的所有的監聽器,並找到合適的監聽器進行回調,最終通過notifyAll喚醒監聽被釋放節點的線程
notifyAll();
}
上述 curator 加鎖的核心代碼雖然比較長,但整體邏輯與我們前面分析過的加鎖邏輯是一致的,主要做了三件事:
-
獲取當前父節點的有序子節點序列;
-
判斷當前節點是否爲第一個節點;
-
若爲第一個節點,則獲取鎖成功,否則爲當前 zk 客戶端增加一個前一節點的監聽器,如果此時還在等待時長內,則使用 wait 方法掛起線程,否則刪除當前節點。
三、總結——如何選擇合適的分佈式併發安全解決方案?
- 繞不過的 CAP 理論
Redis 與 zk 由於客戶端與服務端的交互機制上存在比較大的差異,相應的分佈式鎖實現原理也有所不同。兩者都是優秀的支持分佈式部署的系統,自然具備分區容錯性,但分佈式系統總繞不過去一個經典的問題——CAP 理論:在滿足了分區容錯性的前提下,分佈式系統只能滿足可用性、數據一致性兩者其一。
對比之下,Redis 在可用性上更勝一籌,屬於 AP 系統;zk 具備更強的數據一致性,屬於 CP 系統,而基於 AP、CP 的特性去實現的分佈式鎖,自然也會存在不同程度的問題。
- Redis 分佈式鎖的一致性問題
Redis 的集羣模式並沒有嚴格地實現分佈式共識算法,因此 Redis 是不具備一致性的。爲了保證高可用性,Redis 集羣的主從節點使用的是異步複製,從節點並不保證與主節點數據一致,只能儘量的追趕主節點的最新數據;因此,當主節點發生故障,進行主從切換時,實際上有可能會發生數據丟失問題:
- zk 性能及可用性問題
zk 實現了 zab 算法,在數據一致性上給出了比較可靠的方案,但是由於 zab 協議的兩階段提交要求所有節點的寫請求處理就緒後,纔算寫入成功,這無疑會導致性能的下降。此外,在 zk 集羣發生 leader 重選舉的過程中,對外會表現爲不可用狀態,此時可用性上就會存在問題:
由上可知,分佈式併發安全解決方案並不存在完美的 “銀彈”,因此更多時候我們應當根據自身業務情況,合理地選擇合適的解決方案。
顯而易見地,如果業務場景有較高的請求量,併發競爭比較激烈,對性能有較高要求,此時通過 Redis 來實現分佈式鎖會是比較合適的方案。但是如果業務場景對數據一致性要求比較高,或是系統交互鏈路比較長,一但發生數據不一致時,會導致系統出現難以恢復的問題時,採用 zk 來實現分佈式鎖則是更優的解決方案。
- 上述方案都無法滿足要求?
總體上看,Redis 由於其本身的高性能可以滿足大多數場景下的性能要求,而 zk 則保證了較高數據一致性。但倘若遇到了既要求高性能、又要求數據一致性、還要引入鎖機制來保障併發安全的場景,這時候就必須重新審視系統設計是否合理了,畢竟高併發與鎖是一對矛盾,可用性與數據一致性是一對矛盾,我們應該通過良好的方案、系統設計,來避免讓我們的系統陷入這些矛盾的困境中。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/7wL07VWiVcz0nnD6HKRK2Q