概述
提到鎖,想必大家可能最先想到的是Java JUC中的synchronized
關(guān)鍵字或者可重入鎖ReentrantLock
。它能夠保證我們的代碼在同一個時刻只有一個線程執(zhí)行,保證數(shù)據(jù)的一致性和完整性。但是它僅限于單體項目,也就是說它們只能保證單個JVM應(yīng)用內(nèi)線程的順序執(zhí)行。
如果你部署了多個節(jié)點,也就是分布式場景下如何保證不同節(jié)點在同一時刻只有一個線程執(zhí)行呢?場景的業(yè)務(wù)場景比如秒殺、搶優(yōu)惠券等,這就引入了我們的分布式鎖,本文我們主要講解利用Zookeeper的特性如何來實現(xiàn)我們的分布式鎖。
Zookeeper分布式鎖實現(xiàn)原理
利用Zookeeper的臨時順序節(jié)點和監(jiān)聽機制兩大特性,可以幫助我們實現(xiàn)分布式鎖。
- 首先得有一個持久節(jié)點
/locks
, 路徑服務(wù)于某個使用場景,如果有多個使用場景建議路徑不同。 - 請求進來時首先在
/locks
創(chuàng)建臨時有序節(jié)點,所有會看到在/locks
下面有seq-000000000, seq-00000001 等等節(jié)點。 - 然后判斷當前創(chuàng)建得節(jié)點是不是
/locks
路徑下面最小的節(jié)點,如果是,獲取鎖,不是,阻塞線程,同時設(shè)置監(jiān)聽器,監(jiān)聽前一個節(jié)點。 - 獲取到鎖以后,開始處理業(yè)務(wù)邏輯,最后delete當前節(jié)點,表示釋放鎖。
- 后一個節(jié)點就會收到通知,喚起線程,重復(fù)上面的判斷。
大家有沒有想過為什么要設(shè)置對前一個節(jié)點的監(jiān)聽?
主要為了避免羊群效應(yīng)。所謂羊群效應(yīng)就是一個節(jié)點掛掉,所有節(jié)點都去監(jiān)聽,然后做出反應(yīng),這樣會給服務(wù)器帶來巨大壓力,所以有了臨時順序節(jié)點,當一個節(jié)點掛掉,只有它后面的那一個節(jié)點才做出反應(yīng)。
原生Zookeeper客戶端實現(xiàn)分布式鎖
通過原生zookeeper api方式的實現(xiàn),可以加強我們對zk實現(xiàn)分布式鎖原理的理解。
public class DistributedLock {
private String connectString = "10.100.1.176:2281";
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "lock";
private String subNode = "seq-";
private String waitPath;
// 當前client創(chuàng)建的子節(jié)點
private String currentNode;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private CountDownLatch waitDownLatch = new CountDownLatch(1);
public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 如果連接建立時,喚醒 wait 在該 latch 上的線程
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
// 發(fā)生了 waitPath 的刪除事件
if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitDownLatch.countDown();
}
}
});
// 等待連接建立,因為連接建立時異步過程
countDownLatch.await();
// 獲取根節(jié)點
Stat stat = zk.exists("/" + rootNode, false);
// 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
if(stat == null) {
System.out.println("創(chuàng)建根節(jié)點");
zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock() {
try {
// 在根節(jié)點創(chuàng)建臨時順序節(jié)點
currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 獲取子節(jié)點
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
// 如果只有一個子節(jié)點,說明是當前節(jié)點,直接獲得鎖
if(childrenNodes.size() == 1) {
return;
} else {
//對根節(jié)點下的所有臨時順序節(jié)點進行從小到大排序
Collections.sort(childrenNodes);
//當前節(jié)點名稱
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//獲取當前節(jié)點的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("數(shù)據(jù)異常");
} else if (index == 0) {
// index == 0, 說明 thisNode 在列表中最小, 當前client 獲得鎖
return;
} else {
// 獲得排名比 currentNode 前 1 位的節(jié)點
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath節(jié)點上注冊監(jiān)聽器, 當 waitPath 被刪除時,zookeeper 會回調(diào)監(jiān)聽器的 process 方法
zk.getData(waitPath, true, new Stat());
//進入等待鎖狀態(tài)
waitDownLatch.await();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
測試代碼如下:
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributedLock lock1 = new DistributedLock();
DistributedLock lock2 = new DistributedLock();
new Thread(() -> {
// 獲取鎖對象
try {
lock1.zkLock();
System.out.println("線程 1 獲取鎖");
Thread.sleep(5 * 1000);
System.out.println("線程 1 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.zkUnlock();
}
}).start();
new Thread(() -> {
// 獲取鎖對象
try {
lock2.zkLock();
System.out.println("線程 2 獲取鎖");
Thread.sleep(5 * 1000);
System.out.println("線程 2 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock2.zkUnlock();
}
}).start();
}
}
測試結(jié)果:
線程 2 獲取鎖
線程 2 釋放鎖
線程 1 獲取鎖
線程 1 釋放鎖
獲取鎖和釋放鎖成對出現(xiàn),說明分布式鎖生效了。
Curator框架實現(xiàn)分布式鎖
在實際的開發(fā)鐘,我們會直接使用成熟的框架Curator客戶端,它里面封裝了分布式鎖的實現(xiàn),避免我們?nèi)ブ貜?fù)造輪子。
- pom.xml添加如下依賴
<dependency>
<groupId>org.apache.curator<span class="hljs-name"groupId>
<artifactId>curator-recipes<span class="hljs-name"artifactId>
<version>5.2.1<span class="hljs-name"version>
<span class="hljs-name"dependency>
- 通過
InterProcessLock
實現(xiàn)分布式鎖
public class CuratorLockTest {
private String connectString = "10.100.1.14:2181";
private String rootNode = "/locks";
public static void main(String[] args) {
new CuratorLockTest().testLock();
}
public void testLock() {
// 分布式鎖1
InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 分布式鎖2
InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 第一個線程
new Thread(() -> {
// 獲取鎖對象
try {
lock1.acquire();
System.out.println("線程 1 獲取鎖");
// 測試鎖重入
lock1.acquire();
System.out.println("線程 1 再次獲取鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("線程 1 釋放鎖");
lock1.release();
System.out.println("線程 1 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 第二個線程
new Thread(() -> {
// 獲取鎖對象
try {
lock2.acquire();
System.out.println("線程 2 獲取鎖");
// 測試鎖重入
lock2.acquire();
System.out.println("線程 2 再次獲取鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("線程 2 釋放鎖");
lock2.release();
System.out.println("線程 2 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
public CuratorFramework getCuratorFramework() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString).connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
// 連接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
- 結(jié)果展示
線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖
有興趣的看下源碼,它是通過wait、notify來實現(xiàn)阻塞。
代碼 : https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock
總結(jié)
ZooKeeper
分布式鎖(如InterProcessMutex
),能有效的解決分布式鎖問題,但是性能并不高。
因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務(wù)器來執(zhí)行,然后Leader
服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower
機器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。
在高性能,高并發(fā)的場景下,不建議使用ZooKeeper
的分布式鎖,可以使用Redis
的分布式鎖。而由于ZooKeeper
的高可用特性,所以在并發(fā)量不是太高的場景,推薦使用ZooKeeper
的分布式鎖。
-
JAVA
+關(guān)注
關(guān)注
19文章
2958瀏覽量
104548 -
代碼
+關(guān)注
關(guān)注
30文章
4748瀏覽量
68354 -
JVM
+關(guān)注
關(guān)注
0文章
157瀏覽量
12208 -
線程
+關(guān)注
關(guān)注
0文章
504瀏覽量
19651 -
zookeeper
+關(guān)注
關(guān)注
0文章
33瀏覽量
3665
發(fā)布評論請先 登錄
相關(guān)推薦
評論