Redisson 分佈式鎖的源碼解讀
之前秒殺項目中就用到了這個 Redisson 分佈式鎖 👇,這篇就一起來看看源碼吧!
tryLock 加鎖 流程
// RedissonLock.java
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, -1, null, threadId);
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Boolean> acquiredFuture;
// 續租時間:鎖的過期時間(沒有設置的話就用默認的 internalLockLeaseTime 看門狗時間)
if (leaseTime > 0) {
acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
// lock acquired
if (acquired) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 沒配置過期時間就執行這裏
scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper<>(f);
}
代碼很長,主要看
tryLockInnerAsync
和scheduleExpirationRenewal
方法。
前置知識
// EVAL 命令,用於在 Redis 服務器端執行 Lua 腳本。
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());
// BooleanNullReplayConvertor 判斷是不是 NULL。
public class BooleanNullReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) { return obj == null; }
}
tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// getRawName 即 鎖的名稱
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 鎖不存在,添加 hash 數據,可重入次數加一,毫秒級別過期時間,返回 null
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖存在,可重入次數加一,毫秒級別過期時間,返回 null
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖被別人佔有, 返回毫秒級別過期時間
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
ARGV[1] 過期時間
ARGV[2] 即 getLockName(threadId) ,這裏是 redisson 客戶端 id + 這個線程 ID , 如下 👇
scheduleExpirationRenewal (看門狗機制)
上面加鎖完,就來到這段代碼。
沒有設置過期時間的話,默認給你設置 30 s 過期,並每隔 10s 自動續期,確保鎖不會在使用過程中過期。
同時,防止客戶端宕機,留下死鎖。
// RedissonBaseLock.java
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 看這裏
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 延時任務,10s 續期一次。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 續期操作
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// 三分之一時間,30s /3= 10s
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
// 續期腳本
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
get
上面的加鎖操作,最終返回的是 return new CompletableFutureWrapper<>(f);
這個異步操作。
還記得上面的 BooleanNullReplayConvertor 嗎,當 eval 執行加鎖腳本時,成功會返回 null,並在這裏轉成 True 。
@Override
public <V> V get(RFuture<V> future) {
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
}
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
future.cancel(true);
Thread.currentThread().interrupt();
throw new RedisException(e);
} catch (ExecutionException e) {
throw convertException(e);
}
}
那麼,加鎖的部分到這裏就結束, 解鎖 的就簡單過一下 👇
unlock 解鎖
// RedissonLock.java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 不存在,直接返回 null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 減一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 大於0,設置毫秒級過期時間,並返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 刪除鎖,並向指定channel發佈 0 這個消息,並返回1
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
// 返回 null
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
KEYS[1] 爲鎖名,KEYS[2] channel 名 👇
ARGV[1] 爲 0 👇, ARGV[2] 過期時間,ARGV[3] 爲 redisson 客戶端 id + 這個線程 ID
解鎖後,取消續期任務。
結尾
通過源碼,我們瞭解到上文提到的 redisson 框架的幾個特點:自動續期,可重入鎖, lua 腳本。
當然,它不止這些功能,小夥伴們可以在這裏查閱 👇
https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D
本文就到這裏啦,感謝您的閱讀,有不對的地方也請您幫忙指正!謝謝~😋
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/vJrJmFdBG-9ls_2TvmSneQ