Redisson 高性能 Redis 分佈式鎖源碼分析

Redisson 實現分佈式鎖的機制如下:

原理描述

  1. 先線程 1 獲取鎖,如果獲取鎖成功,那麼會開啓一個後臺線程,每次間隔 10 秒進行續期。

  2. 併發情況,線程 2 會進行加鎖,如果無法獲取鎖,那麼就會進行自旋等待,等待到達一定次數過後,就會進行線程阻塞,並且訂閱解鎖消息。

  3. 當線程 1 釋放鎖之後,會觸發 redis 的解鎖消息,消息的觀察者會觀察到然後去喚醒解鎖的邏輯,線程 2 繼續競爭鎖。

  4. 對於鎖的重入,Redisson 是通過 hash 爲數據類型的,會存儲當前線程的 tid (本質是生成的 uuid 唯一 id).

測試代碼

下面我們將以一個秒殺的例子來說明:

依賴版本

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

測試代碼

下面是模擬一個商品秒殺的場景,示例代碼如下:

public class RedissonTest {

    public static void main(String[] args) {
        //1. 配置部分
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        config.setLockWatchdogTimeout(5000);
        Redisson redisson = (Redisson) Redisson.create(config);


        RLock rLock = redisson.getLock("goods:1000:1");
        //2. 加鎖
        rLock.lock();
        try {
            System.out.println("todo 邏輯處理 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                //3. 解鎖
                rLock.unlock();
            }
        }

    }
}

加鎖設計

rLock.lock();是加鎖的核心代碼,我們一起來看看調用棧加鎖的核心方法是:org.redisson.RedissonLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

其實它的本質是調用一段 LUA 腳本進行加鎖, 需要注意的是這個地方使用的數據類型是 hash。這裏是用 hash 的好處就是可以通過同一個 key 來存儲重入的 tid

鎖續期設計

鎖的續期是在 org.redisson.RedissonLock#tryAcquireAsync方法中調用 scheduleExpirationRenewal實現的。

續期需要注意的是,看門狗是設置在主線程的延遲隊列的線程中。

這裏的好處就是如果我在一個進程中,同時加了 1000 把鎖,我們不需要啓動 1000 個子線程去續期,只需要創建 1000 個續期任務對象即可,在到達續期時間纔會喚醒續期線程。

tryAcquireAsync 代碼如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 鎖過期時間續期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

鎖續期 scheduleExpirationRenewal代碼如下:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

然後在調用 renewExpiration(); 執行續期邏輯, 其實這裏是一個定時任務 + 遞歸的方式實現續期的,用定時任務的好處就是不用去開 N 個字線程,只需要創建對應的任務對象即可。

備註:如果超級極端的情況下 N 把鎖,同時加鎖,同時需求。我們可以考慮在鎖的有效期上,給它加一個浮動時間比如 100 - 500ms. 這樣就能一定程度上避免 (參考的是緩存失效 / 擊穿的解決方案)

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    // 創建延遲任務
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // 真正的續期,調用 LUA 腳本續期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // 如果續期成功
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

這裏還有一個小的點,就是續期的時間是 1/3 爲什麼呢?保證在下次續期的時候鎖不過期,如果是 1/2 可能在下次定時任務執行的時候 key 已經過期,如果小於 1/3 會導致頻繁續期,任務代價 / 收益比不高。

renewExpirationAsync方法, 裏面還是一段 LUA 腳本,進行重新設置鎖的過期時間。

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

鎖的自旋重試

org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)在執行獲取鎖失敗的時候,會進入重試。其實這裏就會執行 18 行以後的 while (true)邏輯

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    // 訂閱鎖過期的消息
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 阻塞鎖的超時時間,等鎖過期後再嘗試加鎖
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);其實這裏就是一個間歇性自旋。等到上次鎖過期的時間,在喚醒進行搶鎖 entry.getLatch().acquire();

訂閱鎖失效

還有一個邏輯就是

CompletableFuture future = subscribe(threadId);

這裏其實是會訂閱一個消息,如果解鎖過後,會發布解鎖的消息。然後再喚醒當前多次競爭鎖進入休眠的線程。

解鎖設計

rLock.unlock(); 的核心就是釋放鎖,撤銷續期和喚醒在等待加鎖的線程(發佈解鎖成功消息)。

核心方法(解鎖): org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        // 發佈解鎖成功消息
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

還是 LUA 的執行方式。

撤銷鎖續期

核心方法 org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 解鎖
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    // 撤銷續期
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

解鎖成功喚排隊線程

在 org.redisson.pubsub.LockPubSub#onMessage中回去喚醒阻塞的線程,讓執行前面的鎖自旋邏輯,具體代碼如下:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }

        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }

        value.getLatch().release(value.getLatch().getQueueLength());
    }
}

公衆號:運維開發故事

github:https://github.com/orgs/sunsharing-note/dashboard

愛生活,愛運維

如果你覺得文章還不錯,就請點擊右上角選擇發送給朋友或者轉發到朋友圈。您的支持和鼓勵是我最大的動力。喜歡就請關注我吧~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/qNXN8HS2TqfDgnFfYi-ffA