分佈式鎖的實現原理

介紹分佈式鎖的實現原理。

一、分佈式鎖概述

分佈式鎖,顧名思義,就是在分佈式環境下使用的鎖。衆所周知,在併發編程中,我們經常需要藉助併發控制工具,如 mutex、synchronized 等,來保障線程安全。但是,這種線程安全僅作用在同一內存環境中。在實際業務中,爲了保障服務的可靠性,我們通常會採用多節點進行部署。在這種分佈式情況下,各實例間的內存不共享,線程安全並不能保證併發安全,如下例,同一實例中線程 A 與線程 B 之間的併發安全並不能保證實例 1 與實例 2 之間的併發安全:

因此,當遇到分佈式系統的併發安全問題時,我們就可能會需要引入分佈式鎖來解決。  

用於實現分佈式鎖的組件通常都會具備以下的一些特性:

目前開源社區中常見的分佈式鎖解決方案,大多是基於具備集羣部署能力的 key-value 存儲中間件來實現,最爲常用的方案基本上是基於 Redis、zookeeper 來實現,筆者將從上述分佈式鎖的特性出發,介紹一下這兩類的分佈式鎖解決方案的優缺點。

彩蛋提醒

我們爲大家準備了抽獎福利,請繼續閱讀下去。

二、分佈式鎖的實現原理

2.1  Redis 實現分佈式鎖  

Redis 由於其高性能、使用及部署便利性,在很多場景下是實現分佈式鎖的首選。首先我們看下 Redis 是如何實現互斥性的。在單機部署的模式下,Redis 由於其單線程處理命令的線程模型,天然的具備互斥能力;而在哨兵 / 集羣模式下,寫命令也是單獨發送到某個單獨節點上進行處理,可以保證互斥性;其核心的命令是 set if not exist:

SET lockKey lockValue NX

成功設置 lockValue 的實例,就相當於搶鎖成功。但如果持有鎖的實例宕機,因爲 Redis 服務端並沒有感知客戶端狀態的能力,因此會出現鎖無法釋放的問題:

這種情況下,就需要給 key 設置一個過期時間 expireTime:

SET lockKey lockValue EX expireTime NX

左右滑動查看完整代碼

如果持有鎖的實例宕機無法釋放鎖,則鎖會自動過期,這樣可以就避免鎖無法釋放的問題。在一些簡單的場景下,通過該方式實現的分佈式鎖已經可以滿足需求。但這種方式存在一個明顯問題:如果業務的實際處理時間比鎖過期時間長,鎖就會被誤釋放,導致其他實例也可以加鎖:

這種情況下,就需要通過其他機制來保證鎖在業務處理結束後再釋放,一個常用的方式就是通過後臺線程的方式來實現鎖的自動續期。

Redssion 是開源社區中比較受歡迎的一個 Java 語言實現的 Redis 客戶端,其對 Java 中 Lock 接口定義進行擴展,實現了 Redis 分佈式鎖,並通過 watchDog 機制(本質上即是後臺線程運作)來對鎖進行自動續期。以下是一個簡單的 Reddison 分佈式鎖的使用例子:

RLock rLock = RedissonClient.getLock("test-lock");
try {
    if (rLock.tryLock()) {
        // do something
    }
} finally {
    rLock.unlock();
}

Redssion 的默認實現 RedissonLock 爲可重入互斥非公平鎖,其 tryLock 方法會基於三個可選參數執行:

我們不妨通過參數最全的 

RedissonLock#tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法源碼來一探其完整的加鎖過程:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    ...
    // tryAcquire方法返回鎖的剩餘有效時長ttl,如果未上鎖,則爲null
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        // 獲取鎖成功
        return true;
    }
    // 計算剩餘等待時長,剩餘等待時長小於0,則不再嘗試獲取鎖,獲取鎖失敗,後續有多處同樣的判斷邏輯,將精簡省略
   time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    // 等待時長大於0,則會對鎖釋放的事件進行訂閱,持有鎖的客戶端在鎖釋放時會發布鎖釋放事件通知其他客戶端搶鎖,由此可得知該默認實現爲非公平鎖。
    // Redisson對Redis發佈訂閱機制的實現,底層大量使用了CompletableFuture、CompletionStage等接口來編寫異步回調代碼,感興趣的讀者可以詳細瞭解,此處不作展開
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        ...
    } catch (ExecutionException e) {
        ...
    }
    try {
        ...
        // 循環嘗試獲取鎖
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            ...
            // 此處通過信號量來將線程阻塞一定時間,避免無效的申請鎖浪費資源;在阻塞期間,如果收到了鎖釋放的事件,則會通過信號量提前喚起阻塞線程,重新嘗試獲取鎖;
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 若ttl(鎖過期時長)小於time(剩餘等待時長),則將線程阻塞ttl
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // 若等待時長小於ttl,則將線程阻塞time
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            ...
        }
    } finally {
        // 取消訂閱
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

上述代碼邏輯主要集中在處理 waitTime 參數,在併發競爭不激烈、可以容忍一定的等待時間的情況下,合理設置 waitTime 參數可以提高業務併發運行成功率,避免搶鎖失敗直接返回錯誤;但在併發競爭激烈、對性能有較高要求時,建議不設置 waitTime,或者直接使用沒有 waitTime 參數的 lock() 方法,通過快速失敗來提高系統吞吐量。

一個比較值得注意的點是,如果設置了 waitTime 參數,則 Redisson 通過將 RedissonLockEntry 中信號量(Semaphore)的許可證數初始化爲 0 來達到一定程度的限流,保證鎖釋放後只有一個等待中的線程會被喚醒去請求 Redis 服務端,把喚醒等待線程的工作分攤到各個客戶端實例上,可以很大程度上緩解非公平鎖給 Redis 服務端帶來的驚羣效應壓力。

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
    ...
    private final Semaphore latch;
    public RedissonLockEntry(CompletableFuture<RedissonLockEntry> promise) {
        super();
        //  RedissonLockEntry 中的Semaphore的許可證數初始化爲0
        this.latch = new Semaphore(0);
        this.promise = promise;
    }
    ...
}

獲取鎖的核心邏輯,會通過 

RedissonLock#tryAcquire 

方法調用到 RedissonLock#tryAcquireAsync 方法。

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        // 若leaseTime大於零,會設置鎖的租期爲leaseTime
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 若leaseTime小於或等於零,會設置鎖的租期爲internalLockLeaseTime,這是一個通過lockWatchdogTimeout配置的值,默認爲30s
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 此處的handleNoSync方法是爲了解決Redis發生故障轉移,集羣拓撲改變後,只有持有鎖的客戶端能再次獲得鎖的bug,爲3.20.1版本修復,詳見Redisson issue#4822
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
    // 根據加鎖情況來進行後續處理
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        // 若ttl爲空,說明加鎖不成功
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                // 若leaseTime>0,則將internalLockLeaseTime變量設置爲leaseTime,以便後續解鎖使用
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 若leaseTime<=0,則開啓看門狗機制,通過定時任務進行鎖續期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
// 加鎖的lua腳本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((Redis.call('exists', KEYS[1]) == 0) " +
                        "or (Redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "Redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "Redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return Redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

可以看到,若 leaseTime 大於 0,則不會開啓看門狗機制,鎖在過期後即失效,在使用時請務必留意。上述代碼中執行的 scheduleExpirationRenewal 方法即爲看門狗機制的實現邏輯:

protected void scheduleExpirationRenewal(long threadId) {
    // 每個鎖都會對應一個ExpirationEntry類,第一次加鎖時不存在oldEntry
    ExpirationEntry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 非首次加鎖,重入計數,不作其他操作
        oldEntry.addThreadId(threadId);
    } else {
        // 首次加鎖,調用renewExpiration()方法進行自動續期
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            // 若當前線程被中斷,則取消對鎖的自動續期。
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
private void renewExpiration() {
    ...
    // 此處使用的是netty的時間輪來執行定時續期,此處不對時間輪做展開,感興趣的讀者可詳細瞭解
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ...
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                if (res) {
                    // 若續期成功,則遞歸調用,等待任務的下一次執行
                    renewExpiration();
                } else {
                    // 若續期結果爲false,說明鎖已經過期了,或鎖易主了,則清理當前線程關聯的信息,等待線程結束
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 時間輪的執行週期爲internalLockLeaseTime / 3,即默認情況下,internalLockLeaseTime爲30s時,每10s觸發一次自動續期
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    // 執行重置過期時間的lua腳本
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (Redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "Redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

上面一段代碼即是看門狗調度的核心代碼,本質上即是通過定時調度線程執行 lua 腳本來進行鎖續期。值得留意的是 scheduleExpirationRenewal 

方法中的 ExpirationEntry,該對象與鎖一一關聯,會存儲嘗試獲取該鎖的線程(無論是否獲取成功)以及重入鎖的次數,在鎖失效 / 鎖釋放時,會根據該對象中存儲的線程逐一進行資源釋放操作,以保證資源的正確釋放。

最後,對上述 Redisson 可重入非公平鎖源碼進行一下總結:

2.2 zookeeper 實現分佈式鎖

zookeeper(後文均簡稱 zk )基於 zab 協議實現的分佈式協調服務,天生具備實現分佈式鎖的基礎條件。我們可以從 zk 的一些基本機制入手,瞭解其是如何實現分佈式鎖的。

public class NodeHashMapImpl implements NodeHashMap {
    private final ConcurrentHashMap<String, DataNode> nodes;
    private final boolean digestEnabled;
    private final DigestCalculator digestCalculator;
    private final AdHash hash;
    ...
}

左右滑動查看完整代碼

瞭解了上述 zk 特點,我們不難發現 zk 也是具備互斥性、自動釋放的特性的。同時,zk 由於 session 機制的存在,服務端可以感知到客戶端的狀態,因此不需要有由客戶端來進行節點續期,zk 服務端可以主動地清理失聯客戶端創建的節點,避免鎖無法釋放的問題。zk 實現分佈式鎖的主要步驟如下:

  1. client1 申請加鎖,創建 /lock/xxx-lock-0000000000 節點(臨時順序節點),並監聽其父節點 /lock;

  2. client1 查詢 /lock 節點下的節點列表,並判斷自己創建的 /xxx-lock-0000000000 是否爲 /lock 節點下的第一個節點;當前沒有其他客戶端加鎖,所以 client1 獲取鎖成功;

  3. 若 client2 此時來加鎖,則會創建 /lock/xxx-lock-0000000001 節點;此時 client2 查詢 /lock 節點下的節點列表,此時 /xxx-lock-0000000001 並非 /lock 下的第一個節點,因此加鎖不成功,此時 client2 則會監聽其上一個節點 /xxx-lock-0000000000;

  4. client1 釋放鎖,client1 刪除 /xxx-lock-0000000000 節點,zk 服務端通過長連接 session 通知監聽了 /xxx-lock-0000000000 節點的 client2 來獲取鎖

  5. 收到釋放事件的 client2 查詢 /lock 節點下的節點列表,此時自己創建的 /xxx-lock-0000000001 爲最小節點,因此獲取鎖成功。

上述是 zk 公平鎖的一種常見實現方式。值得注意的是, zk 客戶端通常並不會實現非公平鎖。事實上,zk 上鎖的粒度不侷限於上述步驟中的客戶端,zk 客戶端每次獲取鎖請求(即每一個嘗試獲取鎖的線程)都會向 zk 服務端請求創建一個臨時順序節點。

以上述步驟爲例,如果需要實現非公平鎖,則會導致其餘的所有節點都需要監聽第一個節點 /xxx-lock-0000000000 的釋放事件,相當於所有等待鎖釋放的線程都會監聽同一個節點,這種機制無法像 Redisson 一樣把喚醒鎖的壓力分攤到客戶端上(或者說實現起來比較困難),會產生比較嚴重的驚羣效應,因此使用 zk 實現的分佈式鎖一般情況下都是公平鎖。

Curator 是一個比較常用的 zk 客戶端,我們可以通過 Curator 的加鎖過程,來了解 zk 分佈式鎖的設計原理。Curator 中比較常用的是可重入互斥公平鎖 InterProcessMutex:

InterProcessMutex mutex = new InterProcessMutex(zkClient, "/lock");
try {
    // acquire方法的兩個參數:等待時長及時間單位
    if (mutex.acquire(3, TimeUnit.SECONDS)) {
        log.info("加鎖成功");
    } else {
        log.info("加鎖失敗");
    }
} finally {
    mutex.release();
}

InterProcessMutex 同樣提供了等待時長參數,用於設置沒有立即獲取到鎖時是快速失敗還是阻塞等待,下一步,方法會調用到 

InterProcessMutex#internalLock 方法中:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    // 註釋的意思:一個LockData對象只會被一個持有鎖的線程進行修改,因此不需要對LockData進行併發控制。如此說明的原因是zk的互斥特性保證了下方attemptLock方法的互斥,由此保證了LockData不會被併發修改
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();
    // LockData用於記錄當前持有鎖的線程數據
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // 線程不爲空,則進行重入,重入次數+1
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // 向zk服務獲取分佈式鎖,getLockNodeBytes
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        // 若lockPath不爲空,則獲取鎖成功,記錄當前持有鎖的線程
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

InterProcessMutex#internalLock 會調用到

LockInternals#attemptLock 方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    ...
    while ( !isDone )
    {
        isDone = true;
        try
        {
            // 創建鎖節點
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判斷是否成功獲取鎖
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // 捕獲由於網絡中斷、session過期等原因導致的無法獲得節點異常,此處根據配置的zk客戶端重試策略決定是否重試,默認重試策略爲Exponential Backoff
            ...retry or not...
        }
    }
    if ( hasTheLock )
    {
        return ourPath;
    }
    return null;
}
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {  
        // 在其他類型的鎖實現中,lockNodeBytes可能不爲空,則根據lockNodeBytes來獲取節點路徑,此處暫不作展開
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        // 在可重入互斥鎖中,客戶端向zk服務端請求創建一個 EPHEMERAL_SEQUENTIAL 臨時順序節點
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

上述代碼中,創建鎖節點並不會產生互斥,而是會直接向 zk 服務端請求創建臨時順序節點。此時,客戶端還未真正的獲得鎖,判斷加鎖成功的核心邏輯在

 LockInternals#internalLockLoop 方法中:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {  
            // curator鎖撤銷機制,通過實現Curator中的Revocable接口的makeRevocable方法,可以將鎖設置爲可撤銷鎖,其他線程可以在符合條件時將鎖撤銷,此處暫不涉及
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        // 客戶端實例就緒,則嘗試循環獲取鎖
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) 
        {
            // 獲取當前父節點下的排好序的子節點
            List<String>        children = getSortedChildren();
            // 得到當前節點名
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            // 根據 children 列表與當前節點名,計算當前節點是否爲第一個節點,若不是第一個節點,則在 PredicateResults中返回需要監聽的前一個節點節點,若爲最小節點,則獲取鎖成功
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                // 獲取鎖成功
                haveTheLock = true;
            }
            else
            {
                // 拼接前一個節點的節點路徑
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this)
                {
                    try
                    {
                        // 將前一個節點的監聽器放到當前客戶端中,當前一個節點被釋放時,就會喚醒當前客戶端
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            // 計算剩餘等待時長,若等待時長小於0,則不再嘗試獲取鎖,並標記當前線程創建的節點需要刪除
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }
                            // 若等待時長大於0,則阻塞線程,等待鎖釋放
                            wait(millisToWait);
                        }
                        else
                        {
                            // 在其他的一些加鎖場景中,默認會持久等待到鎖釋放位置,當前可重入互斥鎖暫不涉及
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            // 刪除當前節點
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
private synchronized void notifyFromWatcher()
{
    // 當zk客戶端收到鎖釋放事件時,會遍歷當前客戶端註冊過的所有的監聽器,並找到合適的監聽器進行回調,最終通過notifyAll喚醒監聽被釋放節點的線程
    notifyAll();
}

上述 curator 加鎖的核心代碼雖然比較長,但整體邏輯與我們前面分析過的加鎖邏輯是一致的,主要做了三件事:

三、總結——如何選擇合適的分佈式併發安全解決方案?

Redis 與 zk 由於客戶端與服務端的交互機制上存在比較大的差異,相應的分佈式鎖實現原理也有所不同。兩者都是優秀的支持分佈式部署的系統,自然具備分區容錯性,但分佈式系統總繞不過去一個經典的問題——CAP 理論:在滿足了分區容錯性的前提下,分佈式系統只能滿足可用性、數據一致性兩者其一。

對比之下,Redis 在可用性上更勝一籌,屬於 AP 系統;zk 具備更強的數據一致性,屬於 CP 系統,而基於 AP、CP 的特性去實現的分佈式鎖,自然也會存在不同程度的問題。

Redis 的集羣模式並沒有嚴格地實現分佈式共識算法,因此 Redis 是不具備一致性的。爲了保證高可用性,Redis 集羣的主從節點使用的是異步複製,從節點並不保證與主節點數據一致,只能儘量的追趕主節點的最新數據;因此,當主節點發生故障,進行主從切換時,實際上有可能會發生數據丟失問題:

zk 實現了 zab 算法,在數據一致性上給出了比較可靠的方案,但是由於 zab 協議的兩階段提交要求所有節點的寫請求處理就緒後,纔算寫入成功,這無疑會導致性能的下降。此外,在 zk 集羣發生 leader 重選舉的過程中,對外會表現爲不可用狀態,此時可用性上就會存在問題:

由上可知,分佈式併發安全解決方案並不存在完美的 “銀彈”,因此更多時候我們應當根據自身業務情況,合理地選擇合適的解決方案。

顯而易見地,如果業務場景有較高的請求量,併發競爭比較激烈,對性能有較高要求,此時通過 Redis 來實現分佈式鎖會是比較合適的方案。但是如果業務場景對數據一致性要求比較高,或是系統交互鏈路比較長,一但發生數據不一致時,會導致系統出現難以恢復的問題時,採用 zk 來實現分佈式鎖則是更優的解決方案。

總體上看,Redis 由於其本身的高性能可以滿足大多數場景下的性能要求,而 zk 則保證了較高數據一致性。但倘若遇到了既要求高性能、又要求數據一致性、還要引入鎖機制來保障併發安全的場景,這時候就必須重新審視系統設計是否合理了,畢竟高併發與鎖是一對矛盾,可用性與數據一致性是一對矛盾,我們應該通過良好的方案、系統設計,來避免讓我們的系統陷入這些矛盾的困境中。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7wL07VWiVcz0nnD6HKRK2Q