iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >ZooKeeper三分布式锁实现及完整运行的代码
  • 459
分享到

ZooKeeper三分布式锁实现及完整运行的代码

2023-06-29 01:06:15 459人浏览 泡泡鱼
摘要

本文小编为大家详细介绍“ZooKeeper三分布式锁实现及完整运行的代码”,内容详细,步骤清晰,细节处理妥当,希望这篇“ZooKeeper三分布式锁实现及完整运行的代码”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧

本文小编为大家详细介绍“ZooKeeper分布式实现及完整运行的代码”,内容详细,步骤清晰,细节处理妥当,希望这篇“ZooKeeper三分布式锁实现及完整运行的代码”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

1.0版本

首先我们先介绍一个简单的zookeeper实现分布式锁的思路:

用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。

  • 所有客户端争相创建此节点,但只有一个客户端创建成功。

  • 创建成功代表获取锁成功,此客户端执行业务逻辑

  • 未创建成功的客户端,监听/exlusive_lock变更

  • 获取锁的客户端执行完成后,删除/exlusive_lock/lock,表示锁被释放

  • 锁被释放后,其他监听/exlusive_lock变更的客户端得到通知,再次争相创建临时子节点/exlusive_lock/lock。此时相当于回到了第2步。

我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。

上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:

锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。

羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。

为了解决上面的问题,我们重新设计。

2.0版本

我们在2.0版本中,让每个客户端在/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:

  • 每个客户端往/exlusive_lock下创建有序临时节点/exlusive_lock/lock_。创建成功后/exlusive_lock下面会有每个客户端对应的节点,如/exlusive_lock/lock_000000001

  • 客户端取得/exlusive_lock下子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,代表获取锁成功,此客户端执行业务逻辑

  • 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点lock_000000002,那么则监听lock_000000001.

  • 当前一位锁节点(lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(lock_000000002)的逻辑。

  • 监听客户端重新执行第2步逻辑,判断自己是否获得了锁。

如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。

接下来我们看看代码如何实现。

LockSample类

此类是分布式锁类,实现了2个分布式锁的相关方法:

获取锁

释放锁

主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:

private ZooKeeper zkClient;private static final String LOCK_ROOT_PATH = "/Locks";private static final String LOCK_node_NAME = "Lock_";private String lockPath;

定义了zkClient,用来操作zookeeper。

锁的根路径,及自增节点的前缀。此处生产环境应该由客户端传入。

当前锁的路径。

构造方法

public LockSample() throws ioException {    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {        @Override        public void process(WatchedEvent event) {            if(event.getState()== Event.KeeperState.Disconnected){                System.out.println("失去连接");             }        }    });}

创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。

获取锁实现

暴露出来的获取锁的方法为acquireLock(),逻辑很简单:

public  void acquireLock() throws InterruptedException, KeeperException {    //创建锁节点    createLock();    //尝试获取锁    attemptLock();}

首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。

createLock()

先判断锁的根节点/Locks是否存在,不存在的话创建。然后在/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。

代码如下:

private void createLock() throws KeeperException, InterruptedException {    //如果根节点不存在,则创建根节点    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);    if (stat == null) {        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);    }     // 创建EPHEMERAL_SEQUENTIAL类型节点    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,            CreateMode.EPHEMERAL_SEQUENTIAL);    System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);    this.lockPath=lockPath;}
attemptLock()

这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:

private void attemptLock() throws KeeperException, InterruptedException {    // 获取Lock所有子节点,按照节点序号排序    List<String> lockPaths = null;    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);    Collections.sort(lockPaths);    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));    // 如果lockPath是序号最小的节点,则获取锁    if (index == 0) {        System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);        return ;    } else {        // lockPath不是序号最小的节点,监听前一个节点        String preLockPath = lockPaths.get(index - 1);         Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);         // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁        if (stat == null) {            attemptLock();        } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock            System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);            synchronized (watcher) {                watcher.wait();            }            attemptLock();        }    }}

注意这一行代码

Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程

watcher定义代码如下:

private Watcher watcher = new Watcher() {    @Override    public void process(WatchedEvent event) {        System.out.println(event.getPath() + " 前锁释放");        synchronized (this) {            notifyAll();        }    }};

watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。

释放锁实现

释放锁原语实现很简单,参照releaseLock()方法。代码如下:

public void releaseLock() throws KeeperException, InterruptedException {    zkClient.delete(lockPath, -1);    zkClient.close();    System.out.println(" 锁释放:" + lockPath);}

关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。

我们创建一个TicketSeller类,作为客户端来使用分布式锁。

 TicketSeller类

sell()

不带锁的业务逻辑方法,代码如下:

private void sell(){    System.out.println("售票开始");    // 线程随机休眠数毫秒,模拟现实中的费时操作    int sleepMillis = (int) (Math.random() * 2000);    try {        //代表复杂逻辑执行了一段时间        Thread.sleep(sleepMillis);    } catch (InterruptedException e) {        e.printStackTrace();    }    System.out.println("售票结束");}

仅是为了演示,sleep了一段时间。

sellTicketWithLock()

此方法中,加锁后执行业务逻辑,代码如下:

public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {    LockSample lock = new LockSample();    lock.acquireLock();    sell();    lock.releaseLock();}

测试入口

接下来我们写一个main函数做测试:

public static void main(String[] args) throws KeeperException, InterruptedException, IOException {    TicketSeller ticketSeller = new TicketSeller();    for(int i=0;i<1000;i++){        ticketSeller.sellTicketWithLock();    }}

main函数中我们循环调用ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。

测试方法

先启动一个java程序运行,可以看到日志输出如下:

main 锁创建: /Locks/Lock_0000000391main 锁获得, lockPath: /Locks/Lock_0000000391售票开始售票结束 锁释放:/Locks/Lock_0000000391main 锁创建: /Locks/Lock_0000000392main 锁获得, lockPath: /Locks/Lock_0000000392售票开始售票结束 锁释放:/Locks/Lock_0000000392main 锁创建: /Locks/Lock_0000000393main 锁获得, lockPath: /Locks/Lock_0000000393售票开始售票结束 锁释放:/Locks/Lock_0000000393

可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。

我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:

程序1:

main 锁获得, lockPath: /Locks/Lock_0000000471售票开始售票结束 锁释放:/Locks/Lock_0000000471main 锁创建: /Locks/Lock_0000000473 等待前锁释放,prelocakPath:Lock_0000000472/Locks/Lock_0000000472 前锁释放main 锁获得, lockPath: /Locks/Lock_0000000473售票开始售票结束 锁释放:/Locks/Lock_0000000473

可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。

我们再看程序2的日志:

main 锁获得, lockPath: /Locks/Lock_0000000472售票开始售票结束 锁释放:/Locks/Lock_0000000472main 锁创建: /Locks/Lock_0000000474 等待前锁释放,prelocakPath:Lock_0000000473/Locks/Lock_0000000473 前锁释放main 锁获得, lockPath: /Locks/Lock_0000000474售票开始售票结束 锁释放:/Locks/Lock_0000000474

可以看到,确实是进程2获取了Lock_0000000472。

zookeeper实现分布式锁就先讲到这。注意代码只做演示用,并不适合生产环境使用。

代码清单如下:

1、LockSample

import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat; import java.io.IOException;import java.util.Collections;import java.util.List; public class LockSample {     //ZooKeeper配置信息    private ZooKeeper zkClient;    private static final String LOCK_ROOT_PATH = "/Locks";    private static final String LOCK_NODE_NAME = "Lock_";    private String lockPath;     // 监控lockPath的前一个节点的watcher    private Watcher watcher = new Watcher() {        @Override        public void process(WatchedEvent event) {            System.out.println(event.getPath() + " 前锁释放");            synchronized (this) {                notifyAll();            }         }    };     public LockSample() throws IOException {        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {            @Override            public void process(WatchedEvent event) {                if(event.getState()== Event.KeeperState.Disconnected){                    System.out.println("失去连接");                 }            }        });    }     //获取锁的原语实现.    public  void acquireLock() throws InterruptedException, KeeperException {        //创建锁节点        createLock();        //尝试获取锁        attemptLock();    }     //创建锁的原语实现。在lock节点下创建该线程的锁节点    private void createLock() throws KeeperException, InterruptedException {        //如果根节点不存在,则创建根节点        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);        if (stat == null) {            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        }         // 创建EPHEMERAL_SEQUENTIAL类型节点        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,                CreateMode.EPHEMERAL_SEQUENTIAL);        System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);        this.lockPath=lockPath;    }     private void attemptLock() throws KeeperException, InterruptedException {        // 获取Lock所有子节点,按照节点序号排序        List<String> lockPaths = null;         lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);         Collections.sort(lockPaths);         int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));         // 如果lockPath是序号最小的节点,则获取锁        if (index == 0) {            System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);            return ;        } else {            // lockPath不是序号最小的节点,监控前一个节点            String preLockPath = lockPaths.get(index - 1);             Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);             // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁            if (stat == null) {                attemptLock();            } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock                System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);                synchronized (watcher) {                    watcher.wait();                }                attemptLock();            }        }    }     //释放锁的原语实现    public void releaseLock() throws KeeperException, InterruptedException {        zkClient.delete(lockPath, -1);        zkClient.close();        System.out.println(" 锁释放:" + lockPath);    }  }

2、TicketSeller

import org.apache.zookeeper.KeeperException;import java.io.IOException;public class TicketSeller {    private void sell(){        System.out.println("售票开始");        // 线程随机休眠数毫秒,模拟现实中的费时操作        int sleepMillis = (int) (Math.random() * 2000);        try {            //代表复杂逻辑执行了一段时间            Thread.sleep(sleepMillis);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("售票结束");    }     public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {        LockSample lock = new LockSample();        lock.acquireLock();        sell();        lock.releaseLock();    }     public static void main(String[] args) throws KeeperException, InterruptedException, IOException {        TicketSeller ticketSeller = new TicketSeller();        for(int i=0;i<1000;i++){            ticketSeller.sellTicketWithLock();         }    }}

读到这里,这篇“ZooKeeper三分布式锁实现及完整运行的代码”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网精选频道。

--结束END--

本文标题: ZooKeeper三分布式锁实现及完整运行的代码

本文链接: https://www.lsjlt.com/news/321710.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • ZooKeeper三分布式锁实现及完整运行的代码
    本文小编为大家详细介绍“ZooKeeper三分布式锁实现及完整运行的代码”,内容详细,步骤清晰,细节处理妥当,希望这篇“ZooKeeper三分布式锁实现及完整运行的代码”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧...
    99+
    2023-06-29
  • ZooKeeper入门教程三分布式锁实现及完整运行源码
    目录1.0版本2.0版本LockSample类构造方法获取锁实现createLock()attemptLock()释放锁实现 TicketSeller类sell()sell...
    99+
    2024-04-02
  • springboot+zookeeper实现分布式锁的示例代码
    目录依赖本地封装配置测试代码JMeter测试InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理...
    99+
    2024-04-02
  • ZooKeeper分布式锁的实现方式
    本篇内容介绍了“ZooKeeper分布式锁的实现方式”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!目录一、分布式锁方案比较二、ZooKeep...
    99+
    2023-06-20
  • 分析ZooKeeper分布式锁的实现
    目录一、分布式锁方案比较二、ZooKeeper实现分布式锁2.1、方案一2.2、方案二一、分布式锁方案比较 方案 ...
    99+
    2024-04-02
  • Zookeeper的分布式锁的实现方式
    这篇文章主要讲解了“Zookeeper的分布式锁的实现方式”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Zookeeper的分布式锁的实现方式”吧!1. 背景最近在学习 Zookeeper,...
    99+
    2023-06-05
  • ZooKeeper框架教程Curator分布式锁实现及源码分析
    目录  如何使用InterProcessMutex  实现思路   代码实现概述  InterProcessMutex源码分析&nb...
    99+
    2024-04-02
  • Spring Boot整合Zookeeper实现分布式锁的场景分析
    目录一、Java当中关于锁的概念1.1.什么是锁1.2.锁的使用场景1.3.什么是分布式锁1.4.分布式锁的使用场景二、zk实现分布式锁2.1.zk中锁的种类:2.2.zk如何上读锁...
    99+
    2024-04-02
  • ZooKeeper的Curator分布式锁怎么实现
    本篇内容介绍了“ZooKeeper的Curator分布式锁怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Curator中有着更为标准...
    99+
    2023-06-29
  • ZooKeeper能否用于分布式锁的实现
    是的,ZooKeeper可以用于分布式锁的实现。ZooKeeper是一个分布式协调服务,可以用来实现分布式系统中的一些共享资源管理问...
    99+
    2024-03-07
    ZooKeeper
  • zookeeper实战之实现分布式锁的方法
    目录一、分布式锁的通用实现思路二、ZK实现分布式锁的思路三、ZK实现分布式锁的编码实现1、核心工具类实现2、测试代码编写线程安全问题复现使用上面封装的ZkLockHelper实现的分...
    99+
    2022-11-13
    zookeeper分布式锁 zookeeper实现分布式锁 zookeeper分布式锁原理
  • C# 实现Zookeeper分布式锁的参考示例
    目录  分布式锁   Zookeeper分布式锁原理  C#实现Zookeeper分布式锁  分布式锁    互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,...
    99+
    2024-04-02
  • 基于Redis分布式锁的实现代码
    概述 目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(...
    99+
    2024-04-02
  • SpringBoot整合分布式锁redisson的示例代码
    目录1、导入maven坐标2、redisson配置类(如果redis没有密码就不需要private String password)3、创建redisson的bean4、测试,入队5...
    99+
    2023-02-23
    SpringBoot整合分布式锁redisson SpringBoot整合 redisson
  • 分布式锁的原理及Redis怎么实现分布式锁
    这篇文章主要介绍“分布式锁的原理及Redis怎么实现分布式锁”,在日常操作中,相信很多人在分布式锁的原理及Redis怎么实现分布式锁问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解...
    99+
    2023-02-02
    redis
  • redis分布式锁之可重入锁的实现代码
    上篇redis实现的分布式锁,有一个问题,它不可重入。 所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。 同一个人拿一个锁 ...
    99+
    2024-04-02
  • java实现分布式锁的代码怎么写
    在Java中实现分布式锁可以使用以下几种方式:1. 基于数据库的实现:   - 创建一个数据库表,表中有一个唯一索引字段用...
    99+
    2023-10-23
    java
  • Java实现redis分布式锁的三种方式
    目录一、引入原因二、分布式锁实现过程中的问题问题一:异常导致锁没有释放问题二:获取锁与设置过期时间操作不是原子性的问题三:锁过期之后被别的线程重新获取与释放问题四:锁的释放不是原子性...
    99+
    2022-11-13
    Java redis分布式锁 Java 分布式锁
  • python 三种方式实现截屏(详解+完整代码)
    一、方法一 # PIL中的ImageGrab模块# 使用PIL中的ImageGrab模块简单,但是效率有点低# PIL是Python Imaging Library,它为python解释器提供图像编辑函数能力。 ImageGrab模块可用于...
    99+
    2023-09-08
    python
  • 用Go+Redis实现分布式锁的示例代码
    目录为什么需要分布式锁 分布式锁需要具备特性 实现 Redis 锁应先掌握哪些知识点 set 命令 Redis.lua 脚本 go-zero 分布式锁 RedisLock 源码分析 ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作