基於 Zookeeper 實現分佈式鎖實踐
基於 Zookeeper 實現分佈式鎖實踐
1、什麼是 Zookeeper?
Zookeeper 是一個分佈式的,開源的分佈式應用程序協調服務,是 Hadoop 和 hbase 的重要組件。
引用官網的圖例:
特徵:
-
zookeeper 的數據機構是一種節點樹的數據結構,zNode 是基本的單位,znode 是一種和 unix 文件系統相似的節點,可以往這個節點存儲或向這個節點獲取數據
-
通過客戶端可以對 znode 進行數據操作,還可以註冊 watcher 監控 znode 的改變
2、Zookeeper 節點類型
-
持久節點(Persistent)
-
持久順序節點(Persistent_Sequential)
-
臨時節點(Ephemeral)
-
臨時順序節點(Ephemeral_Sequential)
3、Zookeeper 環境搭建
下載 zookeeper,官網鏈接,https://zookeeper.apache.org/releases.html#download,去官網找到對應的軟件下載到本地
修改配置文件,${ZOOKEEPER_HOME}\conf
,找到 zoo_sample.cfg 文件,先備份一份,另外一份修改爲 zoo.cfg
解壓後點擊 zkServer.cmd 運行服務端:
4、Zookeeper 基本使用
在 cmd 窗口或者直接在 idea 編輯器裏的 terminal 輸入命令:
zkCli.cmd -server 127.0.0.1:2181
輸入命令help
查看幫助信息:
ZooKeeper -server host:port -client-configuration properties-file cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
whoami
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
,-s
表示順序節點,-e
表示臨時節點,若不指定表示持久節點,acl
是來進行權限控制的
[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0
Created /zk-test0000000000
查看
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /
[zk-test0000000000, zookeeper]
設置修改節點數據
set /zk-test 123
獲取節點數據
get /zk-test
ps,zookeeper 命令詳情查看 help 幫助文檔,也可以去官網看看文檔
ok,然後 java 寫個例子,進行 watcher 監聽
package com.example.concurrent.zkSample;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
*
<pre>
* Zookeeper 例子
* </pre>
*
*
<pre>
* @author mazq
* 修改記錄
* 修改後版本: 修改人:修改日期: 2021/12/09 16:57 修改內容:
* </pre>
*/
public class ZookeeperSample {
public static void main(String[] args) {
ZkClient client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
client.subscribeDataChanges("/zk-test", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("監聽到節點數據改變!");
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("監聽到節點數據被刪除了");
}
});
try {
Thread.sleep(1000 * 60 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5、Zookeeper 應用場景
Zookeeper 有什麼典型的應用場景:
-
註冊中心(Dubbo)
-
命名服務
-
Master 選舉
-
集羣管理
-
分佈式隊列
-
分佈式鎖
6、Zookeeper 分佈式鎖
Zookeeper 適合用來做分佈式鎖,然後具體實現是利用什麼原理?我們知道 zookeeper 是類似於 unix 的文件系統,文件系統我們也知道在一個文件夾下面,會有文件名稱不能一致的特性的,也就是互斥的特性。同樣 zookeeper 也有這個特性,在同個 znode 節點下面,子節點命名不能重複。所以利用這個特性可以來實現分佈式鎖
業務場景:在高併發的情況下面進行訂單場景,這是一個典型的電商場景
自定義的 Zookeeper 序列化類:
package com.example.concurrent.zkSample;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;
public class MyZkSerializer implements ZkSerializer {
private String charset = "UTF-8";
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return String.valueOf(o).getBytes();
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes , charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError();
}
}
}
訂單編號生成器類,因爲 SimpleDateFormat 是線程不安全的,所以還是要加上 ThreadLocal
package com.example.concurrent.zkSample;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
public class OrderCodeGenerator {
private static final String DATE_FORMAT = "yyyyMMddHHmmss";
private static AtomicInteger ai = new AtomicInteger(0);
private static int i = 0;
private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat(DATE_FORMAT);
}
};
public static DateFormat getDateFormat() {
return (DateFormat) threadLocal.get();
}
public static String generatorOrderCode() {
try {
return getDateFormat().format(new Date(System.currentTimeMillis()))
+ i++;
} finally {
threadLocal.remove();
}
}
}
pom.xml 加上 zookeeper 客戶端的配置:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
實現一個 zookeeper 分佈式鎖,思路是獲取節點,這個是多線程競爭的,能獲取到鎖,也就是創建節點成功,就執行業務,其它搶不到鎖的線程,阻塞等待,註冊 watcher 監聽鎖是否釋放了,釋放了,取消註冊 watcher,繼續搶鎖
package com.example.concurrent.zkSample;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
public class ZKDistributeLock implements Lock {
private String localPath;
private ZkClient zkClient;
ZKDistributeLock(String localPath) {
super();
this.localPath = localPath;
zkClient = new ZkClient("localhost:2181");
zkClient.setZkSerializer(new MyZkSerializer());
}
@Override
public void lock() {
while (!tryLock()) {
waitForLock();
}
}
private void waitForLock() {
// 創建countdownLatch協同
CountDownLatch countDownLatch = new CountDownLatch(1);
// 註冊watcher監聽
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String path, Object o) throws Exception {
//System.out.println("zookeeper data has change!!!");
}
@Override
public void handleDataDeleted(String s) throws Exception {
// System.out.println("zookeeper data has delete!!!");
// 監聽到鎖釋放了,釋放線程
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(localPath , listener);
// 線程等待
if (zkClient.exists(localPath)) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消註冊
zkClient.unsubscribeDataChanges(localPath , listener);
}
@Override
public void unlock() {
zkClient.delete(localPath);
}
@Override
public boolean tryLock() {
try {
zkClient.createEphemeral(localPath);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
訂單服務 api
package com.example.concurrent.zkSample;
public interface OrderService {
void createOrder();
}
訂單服務實現類,加上 zookeeper 分佈式鎖
package com.example.concurrent.zkSample;
import java.util.concurrent.locks.Lock;
public class OrderServiceInvoker implements OrderService{
@Override
public void createOrder() {
Lock zkLock = new ZKDistributeLock("/zk-test");
//Lock zkLock = new ZKDistributeImproveLock("/zk-test");
String orderCode = null;
try {
zkLock.lock();
orderCode = OrderCodeGenerator.generatorOrderCode();
} finally {
zkLock.unlock();
}
System.out.println(String.format("thread name : %s , orderCode : %s" ,
Thread.currentThread().getName(),
orderCode));
}
}
因爲搭建分佈式環境比較繁瑣,所以這裏使用 juc 裏的併發協同工具類,CyclicBarrier 模擬多線程併發的場景,模擬分佈式環境的高併發場景
package com.example.concurrent.zkSample;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class ConcurrentDistributeTest {
public static void main(String[] args) {
// 多線程數
int threadSize = 30;
// 創建多線程循環屏障
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->{
System.out.println("準備完成!");
}) ;
// 模擬分佈式集羣的場景
for (int i = 0 ; i < threadSize ; i ++) {
new Thread(()->{
OrderService orderService = new OrderServiceInvoker();
// 所有線程都等待
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
// 模擬併發請求
orderService.createOrder();
}).start();
}
}
}
跑多幾次,沒有發現訂單號重複的情況,分佈式鎖還是有點效果的
thread name : Thread-6 , orderCode : 202112100945110
thread name : Thread-1 , orderCode : 202112100945111
thread name : Thread-13 , orderCode : 202112100945112
thread name : Thread-11 , orderCode : 202112100945113
thread name : Thread-14 , orderCode : 202112100945114
thread name : Thread-0 , orderCode : 202112100945115
thread name : Thread-8 , orderCode : 202112100945116
thread name : Thread-17 , orderCode : 202112100945117
thread name : Thread-10 , orderCode : 202112100945118
thread name : Thread-5 , orderCode : 202112100945119
thread name : Thread-2 , orderCode : 2021121009451110
thread name : Thread-16 , orderCode : 2021121009451111
thread name : Thread-19 , orderCode : 2021121009451112
thread name : Thread-4 , orderCode : 2021121009451113
thread name : Thread-18 , orderCode : 2021121009451114
thread name : Thread-3 , orderCode : 2021121009451115
thread name : Thread-9 , orderCode : 2021121009451116
thread name : Thread-12 , orderCode : 2021121009451117
thread name : Thread-15 , orderCode : 2021121009451118
thread name : Thread-7 , orderCode : 2021121009451219
註釋加鎖的代碼,再加大併發數,模擬一下
package com.example.concurrent.zkSample;
import java.util.concurrent.locks.Lock;
public class OrderServiceInvoker implements OrderService{
@Override
public void createOrder() {
//Lock zkLock = new ZKDistributeLock("/zk-test");
//Lock zkLock = new ZKDistributeImproveLock("/zk-test");
String orderCode = null;
try {
//zkLock.lock();
orderCode = OrderCodeGenerator.generatorOrderCode();
} finally {
//zkLock.unlock();
}
System.out.println(String.format("thread name : %s , orderCode : %s" ,
Thread.currentThread().getName(),
orderCode));
}
}
跑多幾次,發現出現訂單號重複的情況,所以分佈式鎖是可以保證分佈式環境的線程安全的
7、公平式 Zookeeper 分佈式鎖
上面例子是一種非公平鎖的方式,一旦監聽到鎖釋放了,所有線程都會去搶鎖,所以容易出現“驚羣效應”
:
-
巨大的服務器性能損耗
-
網絡衝擊
-
可能造成宕機
所以,需要改進分佈式鎖,改成一種公平鎖的模式
-
公平鎖:多個線程按照申請鎖的順序去獲取鎖,線程會在隊列裏排隊,按照順序去獲取鎖。只有隊列第 1 個線程才能獲取到鎖,獲取到鎖之後,其它線程都會阻塞等待,等到持有鎖的線程釋放鎖,其它線程纔會被喚醒。
-
非公平鎖:多個線程都會去競爭獲取鎖,獲取不到就進入隊列等待,競爭得到就直接獲取鎖;然後持有鎖的線程釋放鎖之後,所有等待的線程就都會去競爭鎖。
流程圖:
代碼改進:
package com.example.concurrent.zkSample;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ZKDistributeImproveLock implements Lock {
private String localPath;
private ZkClient zkClient;
private String currentPath;
private String beforePath;
ZKDistributeImproveLock(String localPath) {
super();
this.localPath = localPath;
zkClient = new ZkClient("localhost:2181");
zkClient.setZkSerializer(new MyZkSerializer());
if (!zkClient.exists(localPath)) {
try {
this.zkClient.createPersistent(localPath);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public void lock() {
while (!tryLock()) {
waitForLock();
}
}
private void waitForLock() {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 註冊watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 監聽到鎖釋放,喚醒線程
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(beforePath, listener);
// 線程等待
if (zkClient.exists(beforePath)) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消註冊
zkClient.unsubscribeDataChanges(beforePath , listener);
}
@Override
public void unlock() {
zkClient.delete(this.currentPath);
}
@Override
public boolean tryLock() {
if (this.currentPath == null) {
currentPath = zkClient.createEphemeralSequential(localPath +"/" , "123");
}
// 獲取Znode節點下面的所有子節點
List<String> children = zkClient.getChildren(localPath);
// 列表排序
Collections.sort(children);
if (currentPath.equals(localPath + "/" + children.get(0))) { // 當前節點是第1個節點
return true;
} else {
//得到當前的索引號
int index = children.indexOf(currentPath.substring(localPath.length() + 1));
//取到前一個
beforePath = localPath + "/" + children.get(index - 1);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
thread name : Thread-13 , orderCode : 202112100936140
thread name : Thread-3 , orderCode : 202112100936141
thread name : Thread-14 , orderCode : 202112100936142
thread name : Thread-16 , orderCode : 202112100936143
thread name : Thread-1 , orderCode : 202112100936144
thread name : Thread-9 , orderCode : 202112100936145
thread name : Thread-4 , orderCode : 202112100936146
thread name : Thread-5 , orderCode : 202112100936147
thread name : Thread-7 , orderCode : 202112100936148
thread name : Thread-2 , orderCode : 202112100936149
thread name : Thread-17 , orderCode : 2021121009361410
thread name : Thread-15 , orderCode : 2021121009361411
thread name : Thread-0 , orderCode : 2021121009361412
thread name : Thread-10 , orderCode : 2021121009361413
thread name : Thread-18 , orderCode : 2021121009361414
thread name : Thread-19 , orderCode : 2021121009361415
thread name : Thread-8 , orderCode : 2021121009361416
thread name : Thread-12 , orderCode : 2021121009361417
thread name : Thread-11 , orderCode : 2021121009361418
thread name : Thread-6 , orderCode : 2021121009361419
8、zookeeper 和 Redis 鎖對比?
Redis 和 Zookeeper 都可以用來實現分佈式鎖,兩者可以進行對比:
-
基於 Redis 實現分佈式鎖
-
實現比較複雜
-
存在死鎖的可能
-
性能比較好,基於內存 ,而且保證的是高可用,redis 優先保證的是 AP(分佈式 CAP 理論)
-
基於 Zookeeper 實現分佈式鎖
-
實現相對簡單
-
可靠性高,因爲 zookeeper 保證的是 CP(分佈式 CAP 理論)
-
性能相對較好 併發 1~2 萬左右,併發太高,還是 redis 性能好
本博客代碼可以在 GitHub 找到下載鏈接
附錄參考資料
-
https://juejin.cn/post/6844903677367418893
-
https://blog.csdn.net/yangsnow_rain_wind/article/details/80749402
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GRZ115bme-XR9bWlCtJCfg