精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

用Zookeeper怎么實現(xiàn)一個分布式鎖?

jf_78858299 ? 來源:JAVA旭陽 ? 作者:JAVA旭陽 ? 2023-05-11 11:02 ? 次閱讀

概述

提到鎖,想必大家可能最先想到的是Java JUC中的synchronized關(guān)鍵字或者可重入鎖ReentrantLock。它能夠保證我們的代碼在同一個時刻只有一個線程執(zhí)行,保證數(shù)據(jù)的一致性和完整性。但是它僅限于單體項目,也就是說它們只能保證單個JVM應(yīng)用內(nèi)線程的順序執(zhí)行。

如果你部署了多個節(jié)點,也就是分布式場景下如何保證不同節(jié)點在同一時刻只有一個線程執(zhí)行呢?場景的業(yè)務(wù)場景比如秒殺、搶優(yōu)惠券等,這就引入了我們的分布式鎖,本文我們主要講解利用Zookeeper的特性如何來實現(xiàn)我們的分布式鎖。

Zookeeper分布式鎖實現(xiàn)原理

利用Zookeeper的臨時順序節(jié)點和監(jiān)聽機制兩大特性,可以幫助我們實現(xiàn)分布式鎖。

圖片

  1. 首先得有一個持久節(jié)點/locks, 路徑服務(wù)于某個使用場景,如果有多個使用場景建議路徑不同。
  2. 請求進來時首先在/locks創(chuàng)建臨時有序節(jié)點,所有會看到在/locks下面有seq-000000000, seq-00000001 等等節(jié)點。
  3. 然后判斷當前創(chuàng)建得節(jié)點是不是/locks路徑下面最小的節(jié)點,如果是,獲取鎖,不是,阻塞線程,同時設(shè)置監(jiān)聽器,監(jiān)聽前一個節(jié)點。
  4. 獲取到鎖以后,開始處理業(yè)務(wù)邏輯,最后delete當前節(jié)點,表示釋放鎖。
  5. 后一個節(jié)點就會收到通知,喚起線程,重復(fù)上面的判斷。

大家有沒有想過為什么要設(shè)置對前一個節(jié)點的監(jiān)聽?

主要為了避免羊群效應(yīng)。所謂羊群效應(yīng)就是一個節(jié)點掛掉,所有節(jié)點都去監(jiān)聽,然后做出反應(yīng),這樣會給服務(wù)器帶來巨大壓力,所以有了臨時順序節(jié)點,當一個節(jié)點掛掉,只有它后面的那一個節(jié)點才做出反應(yīng)。

原生Zookeeper客戶端實現(xiàn)分布式鎖

通過原生zookeeper api方式的實現(xiàn),可以加強我們對zk實現(xiàn)分布式鎖原理的理解。

public class DistributedLock {

    private String connectString = "10.100.1.176:2281";

    private int sessionTimeout = 2000;

    private ZooKeeper zk;

    private String rootNode = "lock";

    private String subNode = "seq-";

    private String waitPath;

    // 當前client創(chuàng)建的子節(jié)點
    private String currentNode;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private CountDownLatch waitDownLatch = new CountDownLatch(1);

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 如果連接建立時,喚醒 wait 在該 latch 上的線程
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }

                //  發(fā)生了 waitPath 的刪除事件
                if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitDownLatch.countDown();
                }
            }
        });

        // 等待連接建立,因為連接建立時異步過程
        countDownLatch.await();
        // 獲取根節(jié)點
        Stat stat = zk.exists("/" + rootNode, false);
        // 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
        if(stat == null) {
            System.out.println("創(chuàng)建根節(jié)點");
            zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    public void zkLock() {
        try {
            // 在根節(jié)點創(chuàng)建臨時順序節(jié)點
            currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 獲取子節(jié)點
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
            // 如果只有一個子節(jié)點,說明是當前節(jié)點,直接獲得鎖
            if(childrenNodes.size() == 1) {
                return;
            } else {
                //對根節(jié)點下的所有臨時順序節(jié)點進行從小到大排序
                Collections.sort(childrenNodes);
                //當前節(jié)點名稱
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //獲取當前節(jié)點的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("數(shù)據(jù)異常");
                } else if (index == 0) {
                    // index == 0, 說明 thisNode 在列表中最小, 當前client 獲得鎖
                    return;
                } else {
                    // 獲得排名比 currentNode 前 1 位的節(jié)點
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath節(jié)點上注冊監(jiān)聽器, 當 waitPath 被刪除時,zookeeper 會回調(diào)監(jiān)聽器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //進入等待鎖狀態(tài)
                    waitDownLatch.await();
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

測試代碼如下:

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        new Thread(() -> {
            // 獲取鎖對象
            try {
                lock1.zkLock();
                System.out.println("線程 1 獲取鎖");
                Thread.sleep(5 * 1000);
                System.out.println("線程 1 釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock1.zkUnlock();
            }
        }).start();

        new Thread(() -> {
            // 獲取鎖對象
            try {
                lock2.zkLock();
                System.out.println("線程 2 獲取鎖");
                Thread.sleep(5 * 1000);

                System.out.println("線程 2 釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock2.zkUnlock();
            }
        }).start();
    }
}

測試結(jié)果:

線程 2 獲取鎖
線程 2 釋放鎖
線程 1 獲取鎖
線程 1 釋放鎖

獲取鎖和釋放鎖成對出現(xiàn),說明分布式鎖生效了。

Curator框架實現(xiàn)分布式鎖

在實際的開發(fā)鐘,我們會直接使用成熟的框架Curator客戶端,它里面封裝了分布式鎖的實現(xiàn),避免我們?nèi)ブ貜?fù)造輪子。

  1. pom.xml添加如下依賴
<dependency>
            <groupId>org.apache.curator<span class="hljs-name"groupId>
            <artifactId>curator-recipes<span class="hljs-name"artifactId>
            <version>5.2.1<span class="hljs-name"version>
        <span class="hljs-name"dependency>
  1. 通過InterProcessLock實現(xiàn)分布式鎖
public class CuratorLockTest {

    private String connectString = "10.100.1.14:2181";

    private String rootNode = "/locks";

    public static void main(String[] args) {

        new CuratorLockTest().testLock();

    }

    public void testLock() {
        // 分布式鎖1
        InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
        // 分布式鎖2
        InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
        // 第一個線程
        new Thread(() -> {
            // 獲取鎖對象
            try {
                lock1.acquire();
                System.out.println("線程 1 獲取鎖");
                // 測試鎖重入
                lock1.acquire();
                System.out.println("線程 1 再次獲取鎖");
                Thread.sleep(5 * 1000);
                lock1.release();
                System.out.println("線程 1 釋放鎖");
                lock1.release();
                System.out.println("線程 1 再次釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // 第二個線程
        new Thread(() -> {
            // 獲取鎖對象
            try {
                lock2.acquire();
                System.out.println("線程 2 獲取鎖");
                // 測試鎖重入
                lock2.acquire();
                System.out.println("線程 2 再次獲取鎖");
                Thread.sleep(5 * 1000);
                lock2.release();
                System.out.println("線程 2 釋放鎖");
                lock2.release();
                System.out.println("線程 2 再次釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    public CuratorFramework getCuratorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString).connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
        // 連接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}
  1. 結(jié)果展示
線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖

有興趣的看下源碼,它是通過wait、notify來實現(xiàn)阻塞。

代碼https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock

總結(jié)

ZooKeeper分布式鎖(如InterProcessMutex),能有效的解決分布式鎖問題,但是性能并不高。

因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務(wù)器來執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower機器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。

在高性能,高并發(fā)的場景下,不建議使用ZooKeeper的分布式鎖,可以使用Redis的分布式鎖。而由于ZooKeeper的高可用特性,所以在并發(fā)量不是太高的場景,推薦使用ZooKeeper的分布式鎖。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2958

    瀏覽量

    104548
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4748

    瀏覽量

    68354
  • JVM
    JVM
    +關(guān)注

    關(guān)注

    0

    文章

    157

    瀏覽量

    12208
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    504

    瀏覽量

    19651
  • zookeeper
    +關(guān)注

    關(guān)注

    0

    文章

    33

    瀏覽量

    3665
收藏 人收藏

    評論

    相關(guān)推薦

    在 Java 中利用 redis 實現(xiàn)分布式服務(wù)

    在 Java 中利用 redis 實現(xiàn)分布式服務(wù)
    發(fā)表于 07-05 13:14

    ZooKeeper分布式橋梁開發(fā)

    從傳統(tǒng)Java Web轉(zhuǎn)入分布式系統(tǒng)應(yīng)用,再到接觸分布式協(xié)調(diào)框架ZooKeeper,通過痛苦的思維邏輯和理念轉(zhuǎn)變,歷經(jīng)一個月時間,小伙伴們終于把Zo
    發(fā)表于 10-09 17:46 ?0次下載
    <b class='flag-5'>ZooKeeper</b><b class='flag-5'>分布式</b>橋梁開發(fā)

    Redis 分布式的正確實現(xiàn)方式

    分布式般有三種實現(xiàn)方式:1. 數(shù)據(jù)庫樂觀;2. 基于Redis的分布式
    的頭像 發(fā)表于 05-31 14:19 ?3572次閱讀

    開源分布式協(xié)調(diào)框架Zookeeper知識點詳解

    1 ZooKeeper簡介 ZooKeeper開源的分布式協(xié)調(diào)框架,它的定位是為分布式應(yīng)
    的頭像 發(fā)表于 05-03 09:32 ?1740次閱讀
    開源<b class='flag-5'>分布式</b>協(xié)調(diào)框架<b class='flag-5'>Zookeeper</b>五<b class='flag-5'>個</b>知識點詳解

    為什么需要分布式 基于Zookeeper安全嗎

    講清楚。導致很多讀者看了很多文章,依舊云里霧里。例如下面這些問題,你能清晰地回答上來嗎? 基于 Redis 如何實現(xiàn)分布式? Redi
    的頭像 發(fā)表于 08-10 18:06 ?5583次閱讀

    為什么需要分布式?基于Redis如何實現(xiàn)分布式?

    分布式鎖相對應(yīng)的是「單機」,我們在寫多線程程序時,避免同時操作共享變量產(chǎn)生數(shù)據(jù)問題,通常會使用
    的頭像 發(fā)表于 03-24 11:55 ?1080次閱讀

    深入理解redis分布式

    系統(tǒng)不同進程共同訪問共享資源的實現(xiàn)。如果不同的系統(tǒng)或同一個系統(tǒng)的不同主機之間共享了某個臨界資源,往往需要互斥來防止彼此干擾,以保證
    的頭像 發(fā)表于 10-08 14:13 ?910次閱讀
    深入理解redis<b class='flag-5'>分布式</b><b class='flag-5'>鎖</b>

    tldb提供分布式使用方法

    前言:分布式分布式系統(tǒng)中極為重要的工具。目前有多種分布式
    的頭像 發(fā)表于 11-02 14:44 ?863次閱讀
    tldb提供<b class='flag-5'>分布式</b><b class='flag-5'>鎖</b>使用方法

    redis分布式如何實現(xiàn)

    的情況,分布式的作用就是確保在同時間只有客戶端可以訪問共享資源,從而保證數(shù)據(jù)的致性和正
    的頭像 發(fā)表于 11-16 11:29 ?499次閱讀

    zookeeper分布式原理

    是提供高可用的、致性的機制,用于解決分布式系統(tǒng)中常見的致性問題,比如Leader選舉、分布式
    的頭像 發(fā)表于 12-03 16:33 ?620次閱讀

    Zookeeper的原理和作用

    Zookeeper分布式協(xié)調(diào)服務(wù),它提供了組豐富的API和工具,用于構(gòu)建分布式應(yīng)用。它可
    的頭像 發(fā)表于 12-03 16:45 ?1362次閱讀

    zookeeper的核心配置文件是什么

    來定制化Zookeeper的行為和性能。 、介紹 Zookeeper高性能的分布式協(xié)調(diào)服
    的頭像 發(fā)表于 12-04 10:33 ?745次閱讀

    redis分布式方法

    Redis是種高性能的分布式緩存和鍵值存儲系統(tǒng),它提供了種可靠的分布式解決方案。在分布式
    的頭像 發(fā)表于 12-04 11:22 ?1401次閱讀

    如何實現(xiàn)Redis分布式

    Redis是開源的內(nèi)存數(shù)據(jù)存儲系統(tǒng),可用于高速讀寫操作。在分布式系統(tǒng)中,為了保證數(shù)據(jù)的致性和避免競態(tài)條件,常常需要使用分布式
    的頭像 發(fā)表于 12-04 11:24 ?663次閱讀

    分布式的三種實現(xiàn)方式

    ,下面將分別介紹三種常見的實現(xiàn)方式。 、基于數(shù)據(jù)庫實現(xiàn)分布式分布式系統(tǒng)中,數(shù)據(jù)庫是最常
    的頭像 發(fā)表于 12-28 10:01 ?859次閱讀