分佈式鎖工具 Redisson,太香了!!
一、Redisson 概述
什麼是 Redisson?—— Redisson Wiki
Redisson 是一個在 Redis 的基礎上實現的 Java 駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分佈式的 Java 常用對象,還提供了許多分佈式服務。其中包括 (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用 Redis 的最簡單和最便捷的方法。Redisson 的宗旨是促進使用者對 Redis 的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
一個基於 Redis 實現的分佈式工具,有基本分佈式對象和高級又抽象的分佈式服務,爲每個試圖再造分佈式輪子的程序員帶來了大部分分佈式問題的解決辦法。
Redisson 和 Jedis、Lettuce 有什麼區別?倒也不是雷鋒和雷鋒塔
Redisson 和它倆的區別就像一個用鼠標操作圖形化界面,一個用命令行操作文件。Redisson 是更高層的抽象,Jedis 和 Lettuce 是 Redis 命令的封裝。
-
Jedis 是 Redis 官方推出的用於通過 Java 連接 Redis 客戶端的一個工具包,提供了 Redis 的各種命令支持
-
Lettuce 是一種可擴展的線程安全的 Redis 客戶端,通訊框架基於 Netty,支持高級的 Redis 特性,比如哨兵,集羣,管道,自動重新連接和 Redis 數據模型。Spring Boot 2.x 開始 Lettuce 已取代 Jedis 成爲首選 Redis 的客戶端。
-
Redisson 是架設在 Redis 基礎上,通訊基於 Netty 的綜合的、新型的中間件,企業級開發中使用 Redis 的最佳範本
Jedis 把 Redis 命令封裝好,Lettuce 則進一步有了更豐富的 Api,也支持集羣等模式。但是兩者也都點到爲止,只給了你操作 Redis 數據庫的腳手架,而 Redisson 則是基於 Redis、Lua 和 Netty 建立起了成熟的分佈式解決方案,甚至 redis 官方都推薦的一種工具集。
二、分佈式鎖
分佈式鎖怎麼實現?
分佈式鎖是併發業務下的剛需,雖然實現五花八門:ZooKeeper 有 Znode 順序節點,數據庫有表級鎖和樂 / 悲觀鎖,Redis 有 setNx,但是殊途同歸,最終還是要回到互斥上來,本篇介紹 Redisson,那就以 redis 爲例。
怎麼寫一個簡單的 Redis 分佈式鎖?
以 Spring Data Redis 爲例,用 RedisTemplate 來操作 Redis(setIfAbsent 已經是 setNx + expire 的合併命令),如下
// 加鎖
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// 解鎖,防止刪錯別人的鎖,以uuid爲value校驗是否自己的鎖
public void unlock(String lockName, String uuid) {
if(uuid.equals(redisTemplate.opsForValue().get(lockName)){ redisTemplate.opsForValue().del(lockName); }
}
// 結構
if(tryLock){
// todo
}finally{
unlock;
}
簡單 1.0 版本完成,聰明的小張一眼看出,這是鎖沒錯,但 get 和 del 操作非原子性,併發一旦大了,無法保證進程安全。於是小張提議,用 Lua 腳本
Lua 腳本是什麼?
Lua 腳本是 redis 已經內置的一種輕量小巧語言,其執行是通過 redis 的 eval/evalsha 命令來運行,把操作封裝成一個 Lua 腳本,如論如何都是一次執行的原子操作。
於是 2.0 版本通過 Lua 腳本刪除
lockDel.lua 如下
if redis.call('get', KEYS[1]) == ARGV[1]
then
-- 執行刪除操作
return redis.call('del', KEYS[1])
else
-- 不成功,返回0
return 0
end
delete 操作時執行 Lua 命令
// 解鎖腳本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));
// 執行lua腳本解鎖
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);
2.0 似乎更像一把鎖,但好像又缺少了什麼,小張一拍腦袋,synchronized 和 ReentrantLock 都很絲滑,因爲他們都是可重入鎖,一個線程多次拿鎖也不會死鎖,我們需要可重入。
怎麼保證可重入?
重入就是,同一個線程多次獲取同一把鎖是允許的,不會造成死鎖,這一點 synchronized 偏向鎖提供了很好的思路,synchronized 的實現重入是在 JVM 層面,JAVA 對象頭 MARK WORD 中便藏有線程 ID 和計數器來對當前線程做重入判斷,避免每次 CAS。
當一個線程訪問同步塊並獲取鎖時,會在對象頭和棧幀中的鎖記錄裏存儲偏向的線程 ID,以後該線程在進入和退出同步塊時不需要進行 CAS 操作來加鎖和解鎖,只需簡單測試一下對象頭的 Mark Word 裏是否存儲着指向當前線程的偏向鎖。如果測試成功,表示線程已經獲得了鎖。如果測試失敗,則需要再測試一下 Mark Word 中偏向鎖標誌是否設置成 1:沒有則 CAS 競爭;設置了,則 CAS 將對象頭偏向鎖指向當前線程。
再維護一個計數器,同個線程進入則自增 1,離開再減 1,直到爲 0 才能釋放
可重入鎖
仿造該方案,我們需改造 Lua 腳本:
需要存儲 鎖名稱 lockName、獲得該鎖的線程 id 和對應線程的進入次數 count
加鎖
每次線程獲取鎖時,判斷是否已存在該鎖
不存在
設置 hash 的 key 爲線程 id,value 初始化爲 1
設置過期時間
返回獲取鎖成功 true
存在
繼續判斷是否存在當前線程 id 的 hash key
存在,線程 key 的 value + 1,重入次數增加 1,設置過期時間
不存在,返回加鎖失敗
- 解鎖
每次線程來解鎖時,判斷是否已存在該鎖
存在
是否有該線程的 id 的 hash key,有則減 1,無則返回解鎖失敗
減 1 後,判斷剩餘 count 是否爲 0,爲 0 則說明不再需要這把鎖,執行 del 命令刪除
- 存儲結構
爲了方便維護這個對象,我們用 Hash 結構來存儲這些字段。Redis 的 Hash 類似 Java 的 HashMap,適合存儲對象。
hget lockname1 threadId
設置一個名字爲 lockname1 的 hash 結構,該 hash 結構 key 爲 threadId,值 value 爲 1
hget lockname1 threadId
獲取 lockname1 的 threadId 的值
存儲結構爲
lockname 鎖名稱
key1:threadId 唯一鍵,線程id
value1:count 計數器,記錄該線程獲取鎖的次數
redis 中的結構
- 計數器的加減
當同一個線程獲取同一把鎖時,我們需要對對應線程的計數器 count 做加減
判斷一個 redis key 是否存在,可以用exists
,而判斷一個 hash 的 key 是否存在,可以用hexists
而 redis 也有 hash 自增的命令hincrby
每次自增 1 時 hincrby lockname1 threadId 1
,自減 1 時 hincrby lockname1 threadId -1
- 解鎖的判斷
當一把鎖不再被需要了,每次解鎖一次,count 減 1,直到爲 0 時,執行刪除
綜合上述的存儲結構和判斷流程,加鎖和解鎖 Lua 如下
加鎖 lock.lua
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- lockname不存在
if(redis.call('exists', key) == 0) then
redis.call('hset', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
-- 當前線程已id存在
if(redis.call('hexists', key, threadId) == 1) then
redis.call('hincrby', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
return 0;
解鎖 unlock.lua
local key = KEYS[1];
local threadId = ARGV[1];
-- lockname、threadId不存在
if (redis.call('hexists', key, threadId) == 0) then
return nil;
end;
-- 計數器-1
local count = redis.call('hincrby', key, threadId, -1);
-- 刪除lock
if (count == 0) then
redis.call('del', key);
return nil;
end;
代碼
/**
* @description 原生redis實現分佈式鎖
* @date 2021/2/6 10:51 下午
**/
@Getter
@Setter
public class RedisLock {
private RedisTemplate redisTemplate;
private DefaultRedisScript<Long> lockScript;
private DefaultRedisScript<Object> unlockScript;
public RedisLock(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 加載加鎖的腳本
lockScript = new DefaultRedisScript<>();
this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
this.lockScript.setResultType(Long.class);
// 加載釋放鎖的腳本
unlockScript = new DefaultRedisScript<>();
this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
}
/**
* 獲取鎖
*/
public String tryLock(String lockName, long releaseTime) {
// 存入的線程信息的前綴
String key = UUID.randomUUID().toString();
// 執行腳本
Long result = (Long) redisTemplate.execute(
lockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId(),
releaseTime);
if (result != null && result.intValue() == 1) {
return key;
} else {
return null;
}
}
/**
* 解鎖
* @param lockName
* @param key
*/
public void unlock(String lockName, String key) {
redisTemplate.execute(unlockScript,
Collections.singletonList(lockName),
key + Thread.currentThread().getId()
);
}
}
至此已經完成了一把分佈式鎖,符合互斥、可重入、防死鎖的基本特點。
嚴謹的小張覺得雖然當個普通互斥鎖,已經穩穩夠用,可是業務裏總是又很多特殊情況的,比如 A 進程在獲取到鎖的時候,因業務操作時間太長,鎖釋放了但是業務還在執行,而此刻 B 進程又可以正常拿到鎖做業務操作,兩個進程操作就會存在依舊有共享資源的問題。
而且如果負責儲存這個分佈式鎖的 Redis 節點宕機以後,而且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。
小張不是槓精,因爲庫存操作總有這樣那樣的特殊。
所以我們希望在這種情況時,可以延長鎖的 releaseTime 延遲釋放鎖來直到完成業務期望結果,這種不斷延長鎖過期時間來保證業務執行完成的操作就是鎖續約。
讀寫分離也是常見,一個讀多寫少的業務爲了性能,常常是有讀鎖和寫鎖的。
而此刻的擴展已經超出了一把簡單輪子的複雜程度,光是處理續約,就夠小張喝一壺,何況在性能(鎖的最大等待時間)、優雅(無效鎖申請)、重試(失敗重試機制)等方面還要下功夫研究。在小張苦思冥想時,旁邊的小白湊過來看了看小張,很好奇,都 2021 年了,爲什麼不直接用 redisson 呢?
Redisson 就有這把你要的鎖。
三、Redisson 分佈式鎖
號稱簡單的 Redisson 分佈式鎖的使用姿勢是什麼?
- 依賴
<!-- 原生,本章使用-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!-- 另一種Spring集成starter,本章未使用 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
- 配置
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().
setAddress("redis://" + redisHost + ":" + port).
setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
- 啓用分佈式鎖
@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try {
boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
if (isLocked) {
// TODO
}
} catch (Exception e) {
rLock.unlock();
}
簡潔明瞭,只需要一個 RLock,既然推薦 Redisson,就往裏面看看他是怎麼實現的。
四、RLock
RLock 是 Redisson 分佈式鎖的最核心接口,繼承了 concurrent 包的 Lock 接口和自己的 RLockAsync 接口,RLockAsync 的返回值都是 RFuture,是 Redisson 執行異步實現的核心邏輯,也是 Netty 發揮的主要陣地。
RLock 如何加鎖?
從 RLock 進入,找到 RedissonLock 類,找到 tryLock 方法再遞進到幹事的 tryAcquireOnceAsync 方法,這是加鎖的主要代碼(版本不一此處實現有差別,和最新 3.15.x 有一定出入,但是核心邏輯依然未變。此處以 3.13.6 爲例)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
此處出現 leaseTime 時間判斷的 2 個分支,實際上就是加鎖時是否設置過期時間,未設置過期時間(-1)時則會有 watchDog 的鎖續約(下文),一個註冊了加鎖事件的續約任務。我們先來看有過期時間 tryLockInnerAsync 部分,
evalWriteAsync 是 eval 命令執行 lua 的入口
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
這裏揭開真面目,eval 命令執行 Lua 腳本的地方,此處的 Lua 腳本展開
-- 不存在該key時
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增該鎖並且hash中該線程id對應的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 設置過期時間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在該key 並且 hash中線程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 線程重入次數++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
和前面我們寫自定義的分佈式鎖的腳本幾乎一致,看來 redisson 也是一樣的實現,具體參數分析:
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId組合的唯一值
ARGV[2] = this.getLockName(threadId)
總共 3 個參數完成了一段邏輯:
判斷該鎖是否已經有對應 hash 表存在,
• 沒有對應的 hash 表:則 set 該 hash 表中一個 entry 的 key 爲鎖名稱,value 爲 1,之後設置該 hash 表失效時間爲 leaseTime
• 存在對應的 hash 表:則將該 lockName 的 value 執行 + 1 操作,也就是計算進入次數,再設置失效時間 leaseTime
• 最後返回這把鎖的 ttl 剩餘時間
也和上述自定義鎖沒有區別
既然如此,那解鎖的步驟也肯定有對應的 - 1 操作,再看 unlock 方法,同樣查找方法名,一路到
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}
掏出 Lua 部分
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 計數器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 過期時間重設
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 刪除併發布解鎖消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
Arrays.asList(getName(), getChannelName())
name 鎖名稱
channelName,用於pubSub發佈消息的channel名稱
ARGV 變量有三個LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE,channel發送消息的類別,此處解鎖爲0
internalLockLeaseTime,watchDog配置的超時時間,默認爲30s
lockName 這裏的lockName指的是uuid和threadId組合的唯一值
步驟如下:
如果該鎖不存在則返回 nil;
如果該鎖存在則將其線程的 hash key 計數器 - 1,
計數器 counter>0,重置下失效時間,返回 0;否則,刪除該鎖,發佈解鎖消息 unlockMessage,返回 1;
其中 unLock 的時候使用到了 Redis 發佈訂閱 PubSub 完成消息通知。
而訂閱的步驟就在 RedissonLock 的加鎖入口的 lock 方法裏
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 訂閱
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
// 省略
當鎖被其他線程佔用時,通過監聽鎖的釋放通知(在其他線程通過 RedissonLock 釋放鎖時,會通過發佈訂閱 pub/sub 功能發起通知),等待鎖被其他線程釋放,也是爲了避免自旋的一種常用效率手段。
- 解鎖消息
爲了一探究竟通知了什麼,通知後又做了什麼,進入 LockPubSub。
這裏只有一個明顯的監聽方法 onMessage,其訂閱和信號量的釋放都在父類 PublishSubscribe,我們只關注監聽事件的實際操作
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(unlockMessage)) {
// 從監聽器隊列取監聽線程執行監聽回調
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// getLatch()返回的是Semaphore,信號量,此處是釋放信號量
// 釋放信號量後會喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}
發現一個是默認解鎖消息,一個是 ** 讀鎖解鎖消息 **, 因爲 redisson 是有提供讀寫鎖的,而讀寫鎖讀讀情況和讀寫、寫寫情況互斥情況不同,我們只看上面的默認解鎖消息 unlockMessage 分支
LockPubSub 監聽最終執行了 2 件事
-
runnableToExecute.run() 執行監聽回調
-
value.getLatch().release(); 釋放信號量
Redisson 通過 LockPubSub 監聽解鎖消息,執行監聽回調和釋放信號量通知等待線程可以重新搶鎖。
這時再回來看 tryAcquireOnceAsync 另一分支
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
可以看到,無超時時間時,在執行加鎖操作後,還執行了一段費解的邏輯
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
})
此處涉及到 Netty 的 Future/Promise-Listener 模型(參考 Netty 中的異步編程),Redisson 中幾乎全部以這種方式通信(所以說 Redisson 是基於 Netty 通信機制實現的),理解這段邏輯可以試着先理解
在 Java 的 Future 中,業務邏輯爲一個 Callable 或 Runnable 實現類,該類的 call() 或 run() 執行完畢意味着業務邏輯的完結,在 Promise 機制中,可以在業務邏輯中人工設置業務邏輯的成功與失敗,這樣更加方便的監控自己的業務邏輯。
這塊代碼的表面意義就是,在執行異步加鎖的操作後,加鎖成功則根據加鎖完成返回的 ttl 是否過期來確認是否執行一段定時任務。
這段定時任務的就是 watchDog 的核心。
- 鎖續約
查看 RedissonLock.this.scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
拆分來看,這段連續嵌套且冗長的代碼實際上做了幾步
• 添加一個 netty 的 Timeout 回調任務,每(internalLockLeaseTime / 3)毫秒執行一次,執行的方法是 renewExpirationAsync
• renewExpirationAsync 重置了鎖超時時間,又註冊一個監聽器,監聽回調又執行了 renewExpiration
renewExpirationAsync 的 Lua 如下
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
重新設置了超時時間。
Redisson 加這段邏輯的目的是什麼?
目的是爲了某種場景下保證業務不影響,如任務執行超時但未結束,鎖已經釋放的問題。
當一個線程持有了一把鎖,由於並未設置超時時間 leaseTime,Redisson 默認配置了 30S,開啓 watchDog,每 10S 對該鎖進行一次續約,維持 30S 的超時時間,直到任務完成再刪除鎖。
這就是 Redisson 的鎖續約,也就是 WatchDog 實現的基本思路。
- 流程概括
通過整體的介紹,流程簡單概括:
A、B 線程爭搶一把鎖,A 獲取到後,B 阻塞
B 線程阻塞時並非主動 CAS,而是 PubSub 方式訂閱該鎖的廣播消息
A 操作完成釋放了鎖,B 線程收到訂閱消息通知
B 被喚醒開始繼續搶鎖,拿到鎖
詳細加鎖解鎖流程總結如下圖:
五、公平鎖
以上介紹的可重入鎖是非公平鎖,Redisson 還基於 Redis 的隊列(List)和 ZSet 實現了公平鎖
公平的定義是什麼?
公平就是按照客戶端的請求先來後到排隊來獲取鎖,先到先得,也就是 FIFO,所以隊列和容器順序編排必不可少
FairSync
回顧 JUC 的 ReentrantLock 公平鎖的實現
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AQS已經提供了整個實現,是否公平取決於實現類取出節點邏輯是否順序取
AbstractQueuedSynchronizer 是用來構建鎖或者其他同步組件的基礎框架,通過內置 FIFO 隊列來完成資源獲取線程的排隊工作,他自身沒有實現同步接口,僅僅定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用(上圖),支持獨佔和共享獲取,這是基於模版方法模式的一種設計,給公平 / 非公平提供了土壤。
我們用 2 張圖來簡單解釋 AQS 的等待流程(出自《JAVA 併發編程的藝術》)
一張是同步隊列(FIFO 雙向隊列)管理 獲取同步狀態失敗(搶鎖失敗)的線程引用、等待狀態和前驅後繼節點的流程圖
一張是獨佔式獲取同步狀態的總流程,核心 acquire(int arg) 方法調用流程
可以看出鎖的獲取流程
AQS 維護一個同步隊列,獲取狀態失敗的線程都會加入到隊列中進行自旋,移出隊列或停止自旋的條件是前驅節點爲頭節點切成功獲取了同步狀態。
而比較另一段非公平鎖類NonfairSync
可以發現,控制公平和非公平的關鍵代碼,在於 hasQueuedPredecessors 方法。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
NonfairSync 減少了了 hasQueuedPredecessors 判斷條件,該方法的作用就是
查看同步隊列中當前節點是否有前驅節點,如果有比當前線程更早請求獲取鎖則返回 true。
保證每次都取隊列的第一個節點(線程)來獲取鎖,這就是公平規則
爲什麼 JUC 以默認非公平鎖呢?
因爲當一個線程請求鎖時,只要獲取來同步狀態即成功獲取。在此前提下,剛釋放的線程再次獲取同步狀態的幾率會非常大,使得其他線程只能在同步隊列中等待。但這樣帶來的好處是,非公平鎖大大減少了系統線程上下文的切換開銷。
可見公平的代價是性能與吞吐量。
Redis 裏沒有 AQS,但是有 List 和 zSet,看看 Redisson 是怎麼實現公平的。
RedissonFairLock
RedissonFairLock 用法依然很簡單
RLock fairLock = redissonClient.getFairLock(lockName);
fairLock.lock();
RedissonFairLock 繼承自 RedissonLock,同樣一路向下找到加鎖實現方法 tryLockInnerAsync。
這裏有 2 段冗長的 Lua,但是 Debug 發現,公平鎖的入口在 command == RedisCommands.EVAL_LONG 之後,此段 Lua 較長,參數也多,我們着重分析 Lua 的實現規則
參數
-- lua中的幾個參數
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 鎖名稱
KEYS[2]: "redisson_lock_queue:{xxx}" 線程隊列
KEYS[3]: "redisson_lock_timeout:{xxx}" 線程id對應的超時集合
ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 過期時間
ARGV[2]: "{Redisson.UUID}:{threadId}"
ARGV[3] = 當前時間 + 線程等待時間:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 當前時間(10:00:00) 部署服務器時間,非redis-server服務器時間
公平鎖實現的 Lua 腳本
-- 1.死循環清除過期key
while true do
-- 獲取頭節點
local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
-- 首次獲取必空跳出循環
if firstThreadId2 == false then
break;
end;
-- 清除過期key
local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
if timeout <= tonumber(ARGV[4]) then
redis.call('zrem', KEYS[3], firstThreadId2);
redis.call('lpop', KEYS[2]);
else
break;
end;
end;
-- 2.不存在該鎖 && (不存在線程等待隊列 || 存在線程等待隊列而且第一個節點就是此線程ID),加鎖部分主要邏輯
if (redis.call('exists', KEYS[1]) == 0) and
((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
-- 彈出隊列中線程id元素,刪除Zset中該線程id對應的元素
redis.call('lpop', KEYS[2]);
redis.call('zrem', KEYS[3], ARGV[2]);
local keys = redis.call('zrange', KEYS[3], 0, -1);
-- 遍歷zSet所有key,將key的超時時間(score) - 當前時間ms
for i = 1, #keys, 1 do
redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
end;
-- 加鎖設置鎖過期時間
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 3.線程存在,重入判斷
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[2],1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 4.返回當前線程剩餘存活時間
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
-- 過期時間timeout的值在下方設置,此處的減法算出的依舊是當前線程的ttl
return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;
-- 5.尾節點剩餘存活時間
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾節點不空 && 尾節點非當前線程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
-- 計算隊尾節點剩餘存活時間
ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
-- 獲取lock_name剩餘存活時間
ttl = redis.call('pttl', KEYS[1]);
end;
-- 6.末尾排隊
-- zSet 超時時間(score),尾節點ttl + 當前時間 + 5000ms + 當前時間,無則新增,有則更新
-- 線程id放入隊列尾部排隊,無則插入,有則不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;
- 公平鎖加鎖步驟
通過以上 Lua,可以發現,lua 操作的關鍵結構是列表(list)和有序集合(zSet)。
其中 list 維護了一個等待的線程隊列 redisson_lock_queue:{xxx},zSet 維護了一個線程超時情況的有序集合 redisson_lock_timeout:{xxx},儘管 lua 較長,但是可以拆分爲 6 個步驟
- 隊列清理
- 保證隊列中只有未過期的等待線程
- 首次加鎖
- hset 加鎖,pexpire 過期時間
- 重入判斷
- 此處同可重入鎖 lua
-
返回 ttl
-
計算尾節點 ttl
-
- 初始值爲鎖的剩餘過期時間
- 末尾排隊
- ttl + 2 * currentTime + waitTime 是 score 的默認值計算公式
- 模擬
如果模擬以下順序,就會明瞭 redisson 公平鎖整個加鎖流程
假設 t1 10:00:00 < t2 10:00:10 < t3 10:00:20
t1:當線程 1 初次獲取鎖
等待隊列無頭節點,跳出死循環 ->2
不存在該鎖 && 不存在線程等待隊列 成立
2.1 lpop 和 zerm、zincrby 都是無效操作,只有加鎖生效,說明是首次加鎖,加鎖後返回 nil
加鎖成功,線程 1 獲取到鎖,結束
t2:線程 2 嘗試獲取鎖(線程 1 未釋放鎖)
等待隊列無頭節點,跳出死循環 ->2
不存在該鎖 不成立 ->3
非重入線程 ->4
4.score 無值 ->5
尾節點爲空,設置 ttl 初始值爲 lock_name 的 ttl -> 6
按照 ttl + waitTime + currentTime + currentTime 來設置 zSet 超時時間 score,並且加入等待隊列,線程 2 爲頭節點
score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10
t3:線程 3 嘗試獲取鎖(線程 1 未釋放鎖)
- 等待隊列有頭節點
1.1 未過期 ->2
不存在該鎖不成立 ->3
非重入線程 ->4
4.score 無值 ->5
- 尾節點不爲空 && 尾節點線程爲 2,非當前線程
5.1 取出之前設置的 score,減去當前時間:ttl = score - currentTime ->6
- 按照 ttl + waitTime + currentTime + currentTime 來設置 zSet 超時時間 score,並且加入等待隊列
score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20
如此一來,三個需要搶奪一把鎖的線程,完成了一次排隊,在 list 中排列他們等待線程 id,在 zSet 中存放過期時間(便於排列優先級)。其中返回 ttl 的線程 2 客戶端、線程 3 客戶端將會一直按一定間隔自旋重複執行該段 Lua,嘗試加鎖,如此一來便和 AQS 有了異曲同工之處。
而當線程 1 釋放鎖之後(這裏依舊有通過 Pub/Sub 發佈解鎖消息,通知其他線程獲取)
10:00:30 線程 2 嘗試獲取鎖(線程 1 已釋放鎖)
等待隊列有頭節點,未過期 ->2
不存在該鎖 & 等待隊列頭節點是當前線程 成立
2.1 刪除當前線程的隊列信息和 zSet 信息,超時時間爲:
線程 2 10:00:35 + 10:00:10 - 10:00:30 = 10:00:15
線程 3 10:00:35 + 10:00:20 - 10:00:30 = 10:00:25
2.2 線程 2 獲取到鎖,重新設置過期時間
加鎖成功,線程 2 獲取到鎖,結束
排隊結構如圖
公平鎖的釋放腳本和重入鎖類似,多了一步加鎖開頭的清理過期 key 的 while true 邏輯,在此不再展開篇幅描述。
由上可以看出,Redisson 公平鎖的玩法類似於延遲隊列的玩法,核心都在 Redis 的 List 和 zSet 結構的搭配,但又借鑑了 AQS 實現,在定時判斷頭節點上如出一轍(watchDog),保證了鎖的競爭公平和互斥。併發場景下,lua 腳本里,zSet 的 score 很好地解決了順序插入的問題,排列好優先級。並且爲了防止因異常而退出的線程無法清理,每次請求都會判斷頭節點的過期情況給予清理,最後釋放時通過 CHANNEL 通知訂閱線程可以來獲取鎖,重複一開始的步驟,順利交接到下一個順序線程。
六、總結
Redisson 整體實現分佈式加解鎖流程的實現稍顯複雜,作者 Rui Gu 對 Netty 和 JUC、Redis 研究深入,利用了很多高級特性和語義,值得深入學習,本次介紹也只是單機 Redis 下鎖實現,Redisson 也提供了多機情況下的聯鎖(MultiLock) 和官方推薦的紅鎖(RedLock),下一章再詳細介紹。
所以,當你真的需要分佈式鎖時,不妨先來 Redisson 裏找找。
作者:長江水面寫日記
來源:juejin.cn/post/6961380552519712798
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/qeowtQ2WW6TR2DYdN5vj_A