iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >基于Zookeeper怎么实现分布式锁
  • 875
分享到

基于Zookeeper怎么实现分布式锁

2023-06-22 02:06:08 875人浏览 泡泡鱼
摘要

这篇文章主要介绍“基于ZooKeeper怎么实现分布式锁”,在日常操作中,相信很多人在基于Zookeeper怎么实现分布式锁问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于Zookeeper怎么实现分布式锁

这篇文章主要介绍“基于ZooKeeper怎么实现分布式”,在日常操作中,相信很多人在基于Zookeeper怎么实现分布式锁问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于Zookeeper怎么实现分布式锁”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1、什么是Zookeeper?

Zookeeper是一个分布式的,开源的分布式应用程序协调服务,是hadoopHBase的重要组件。

引用官网的图例:

基于Zookeeper怎么实现分布式锁

特征:

  1. zookeeper的数据机构是一种节点树的数据结构,znode是基本的单位,znode是一种和unix文件系统相似的节点,可以往这个节点存储或向这个节点获取数据

  2. 通过客户端可以对znode进行数据操作,还可以注册watcher监控znode的改变

2、Zookeeper节点类型

  • 持久节点(Persistent)

  • 持久顺序节点(Persistent_Sequential)

  • 临时节点(Ephemeral)

  • 临时顺序节点(Ephemeral_Sequential)

3、Zookeeper环境搭建

下载zookeeper,官网链接,https://zookeeper.apache.org/releases.html#download,去官网找到对应的软件下载到本地

修改配置文件,${ZOOKEEPER_HOME}\conf,找到zoo_sample.cfg文件,先备份一份,另外一份修改为zoo.cfg

基于Zookeeper怎么实现分布式锁

解压后点击zkServer.cmd运行服务端:

基于Zookeeper怎么实现分布式锁

4、Zookeeper基本使用

在cmd窗口或者直接在idea编辑器里的terminal输入命令:

zkCli.cmd -server 127.0.0.1:2181

基于Zookeeper怎么实现分布式锁

输入命令help查看帮助信息:

ZooKeeper -server host:port -client-configuration properties-file cmd args        addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE        addauth scheme auth        close        config [-c] [-w] [-s]        connect host:port        create [-s] [-e] [-c] [-t ttl] path [data] [acl]        delete [-v version] path        deleteall path [-b batch size]        delquota [-n|-b|-N|-B] path        get [-s] [-w] path        getAcl [-s] path        getAllChildrenNumber path        getEphemerals path        history        listquota path        ls [-s] [-w] [-R] path        printwatches on|off        quit        reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]        redo cmdno        removewatches path [-c|-d|-a] [-l]        set [-s] [-v version] path data        setAcl [-s] [-v version] [-R] path acl        setquota -n|-b|-N|-B val path        stat [-w] path        sync path        version        whoami

create [-s] [-e] [-c] [-t ttl] path [data] [acl]-s表示顺序节点,-e表示临时节点,若不指定表示持久节点,acl是来进行权限控制的

[zk: 127.0.0.1:2181(CONNECTED) 1] create -s /zk-test 0Created /zk-test0000000000

查看

[zk: 127.0.0.1:2181(CONNECTED) 4] ls /[zk-test0000000000, zookeeper]

设置修改节点数据

set /zk-test 123

获取节点数据

get /zk-test

ps,zookeeper命令详情查看help帮助文档,也可以去官网看看文档

ok,然后java写个例子,进行watcher监听

package com.example.concurrent.zkSample;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;public class ZookeeperSample {    public static void main(String[] args) {        ZkClient client = new ZkClient("localhost:2181");        client.setZkSerializer(new MyZkSerializer());        client.subscribeDataChanges("/zk-test", new IZkDataListener() {            @Override            public void handleDataChange(String dataPath, Object data) throws Exception {                System.out.println("监听到节点数据改变!");            }            @Override            public void handleDataDeleted(String dataPath) throws Exception {                System.out.println("监听到节点数据被删除了");            }        });        try {            Thread.sleep(1000 * 60 * 2);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

5、Zookeeper应用场景

Zookeeper有什么典型的应用场景:

  1. 注册中心(dubbo

  2. 命名服务

  3. Master选举

  4. 集群管理

  5. 分布式队列

  6. 分布式锁

6、Zookeeper分布式锁

Zookeeper适合用来做分布式锁,然后具体实现是利用什么原理?我们知道zookeeper是类似于unix的文件系统,文件系统我们也知道在一个文件夹下面,会有文件名称不能一致的特性的,也就是互斥的特性。同样zookeeper也有这个特性,在同个znode节点下面,子节点命名不能重复。所以利用这个特性可以来实现分布式锁

业务场景:在高并发的情况下面进行订单场景,这是一个典型的电商场景

基于Zookeeper怎么实现分布式锁

自定义的Zookeeper序列化类:

package com.example.concurrent.zkSample;import org.I0Itec.zkclient.exception.ZkMarshallingError;import org.I0Itec.zkclient.serialize.ZkSerializer;import java.io.UnsupportedEncodingException;public class MyZkSerializer implements ZkSerializer {    private String charset = "UTF-8";    @Override    public byte[] serialize(Object o) throws ZkMarshallingError {        return String.valueOf(o).getBytes();    }    @Override    public Object deserialize(byte[] bytes) throws ZkMarshallingError {        try {            return new String(bytes , charset);        } catch (UnsupportedEncodingException e) {            throw new ZkMarshallingError();        }    }}

订单编号生成器类,因为SimpleDateFORMat是线程安全的,所以还是要加上ThreadLocal

package com.example.concurrent.zkSample;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.atomic.AtomicInteger;public class OrderCodeGenerator {    private static final String DATE_FORMAT = "yyyyMMddHHmmss";    private static AtomicInteger ai  = new AtomicInteger(0);    private static int i = 0;    private static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>() {        @Override        protected SimpleDateFormat initialValue() {            return new SimpleDateFormat(DATE_FORMAT);        }    };    public static DateFormat getDateFormat() {        return (DateFormat) threadLocal.get();    }    public static String generatorOrderCode() {        try {            return getDateFormat().format(new Date(System.currentTimeMillis()))                    + i++;        } finally {            threadLocal.remove();        }    }}

pom.xml加上zookeeper客户端的配置:

<dependency>    <groupId>com.101tec</groupId>    <artifactId>zkclient</artifactId>    <version>0.10</version></dependency>

实现一个zookeeper分布式锁,思路是获取节点,这个是多线程竞争的,能获取到锁,也就是创建节点成功,就执行业务,其它抢不到锁的线程,阻塞等待,注册watcher监听锁是否释放了,释放了,取消注册watcher,继续抢锁

基于Zookeeper怎么实现分布式锁

package com.example.concurrent.zkSample;import lombok.extern.slf4j.Slf4j;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.exception.ZkNodeExistsException;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;@Slf4jpublic class ZKDistributeLock implements Lock {    private String localPath;    private ZkClient zkClient;    ZKDistributeLock(String localPath) {        super();        this.localPath = localPath;        zkClient = new ZkClient("localhost:2181");        zkClient.setZkSerializer(new MyZkSerializer());    }    @Override    public void lock() {        while (!tryLock()) {            waitForLock();        }    }    private void waitForLock() {        // 创建countdownLatch协同        CountDownLatch countDownLatch = new CountDownLatch(1);        // 注册watcher监听        IZkDataListener listener = new IZkDataListener() {            @Override            public void handleDataChange(String path, Object o) throws Exception {                //System.out.println("zookeeper data has change!!!");            }            @Override            public void handleDataDeleted(String s) throws Exception {                // System.out.println("zookeeper data has delete!!!");                // 监听到锁释放了,释放线程                countDownLatch.countDown();            }        };        zkClient.subscribeDataChanges(localPath , listener);        // 线程等待        if (zkClient.exists(localPath)) {            try {                countDownLatch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        // 取消注册        zkClient.unsubscribeDataChanges(localPath , listener);    }    @Override    public void unlock() {        zkClient.delete(localPath);    }    @Override    public boolean tryLock() {        try {            zkClient.createEphemeral(localPath);        } catch (ZkNodeExistsException e) {            return false;        }        return true;    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }    @Override    public void lockInterruptibly() throws InterruptedException {    }    @Override    public Condition newCondition() {        return null;    }}

订单服务api

package com.example.concurrent.zkSample;public interface OrderService {    void createOrder();}

订单服务实现类,加上zookeeper分布式锁

package com.example.concurrent.zkSample;import java.util.concurrent.locks.Lock;public class OrderServiceInvoker implements OrderService{    @Override    public void createOrder() {        Lock zkLock = new ZKDistributeLock("/zk-test");        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");        String orderCode = null;        try {            zkLock.lock();            orderCode = OrderCodeGenerator.generatorOrderCode();        } finally {            zkLock.unlock();        }        System.out.println(String.format("thread name : %s , orderCode : %s" ,                Thread.currentThread().getName(),                orderCode));    }}

因为搭建分布式环境比较繁琐,所以这里使用juc里的并发协同工具类,CyclicBarrier模拟多线程并发的场景,模拟分布式环境的高并发场景

package com.example.concurrent.zkSample;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class ConcurrentDistributeTest {    public static void main(String[] args) {        // 多线程数        int threadSize = 30;        // 创建多线程循环屏障        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadSize , ()->{            System.out.println("准备完成!");        }) ;        // 模拟分布式集群的场景        for (int i = 0 ; i < threadSize ; i ++) {            new Thread(()->{                OrderService orderService = new OrderServiceInvoker();                // 所有线程都等待                try {                    cyclicBarrier.await();                } catch (InterruptedException e) {                    e.printStackTrace();                } catch (BrokenBarrierException e) {                    e.printStackTrace();                }                // 模拟并发请求                orderService.createOrder();            }).start();        }    }}

跑多几次,没有发现订单号重复的情况,分布式锁还是有点效果的

thread name : Thread-6 , orderCode : 202112100945110

thread name : Thread-1 , orderCode : 202112100945111

thread name : Thread-13 , orderCode : 202112100945112

thread name : Thread-11 , orderCode : 202112100945113

thread name : Thread-14 , orderCode : 202112100945114

thread name : Thread-0 , orderCode : 202112100945115

thread name : Thread-8 , orderCode : 202112100945116

thread name : Thread-17 , orderCode : 202112100945117

thread name : Thread-10 , orderCode : 202112100945118

thread name : Thread-5 , orderCode : 202112100945119

thread name : Thread-2 , orderCode : 2021121009451110

thread name : Thread-16 , orderCode : 2021121009451111

thread name : Thread-19 , orderCode : 2021121009451112

thread name : Thread-4 , orderCode : 2021121009451113

thread name : Thread-18 , orderCode : 2021121009451114

thread name : Thread-3 , orderCode : 2021121009451115

thread name : Thread-9 , orderCode : 2021121009451116

thread name : Thread-12 , orderCode : 2021121009451117

thread name : Thread-15 , orderCode : 2021121009451118

thread name : Thread-7 , orderCode : 2021121009451219

注释加锁的代码,再加大并发数,模拟一下

package com.example.concurrent.zkSample;import java.util.concurrent.locks.Lock;public class OrderServiceInvoker implements OrderService{    @Override    public void createOrder() {        //Lock zkLock = new ZKDistributeLock("/zk-test");        //Lock zkLock = new ZKDistributeImproveLock("/zk-test");        String orderCode = null;        try {            //zkLock.lock();            orderCode = OrderCodeGenerator.generatorOrderCode();        } finally {            //zkLock.unlock();        }        System.out.println(String.format("thread name : %s , orderCode : %s" ,                Thread.currentThread().getName(),                orderCode));    }}

跑多几次,发现出现订单号重复的情况,所以分布式锁是可以保证分布式环境的线程安全的

基于Zookeeper怎么实现分布式锁

7、公平式Zookeeper分布式锁

上面例子是一种非公平锁的方式,一旦监听到锁释放了,所有线程都会去抢锁,所以容易出现“惊群效应”:

所以,需要改进分布式锁,改成一种公平锁的模式

公平锁:多个线程按照申请锁的顺序去获取锁,线程会在队列里排队,按照顺序去获取锁。只有队列第1个线程才能获取到锁,获取到锁之后,其它线程都会阻塞等待,等到持有锁的线程释放锁,其它线程才会被唤醒。

非公平锁:多个线程都会去竞争获取锁,获取不到就进入队列等待,竞争得到就直接获取锁;然后持有锁的线程释放锁之后,所有等待的线程就都会去竞争锁。

基于Zookeeper怎么实现分布式锁

流程图:

基于Zookeeper怎么实现分布式锁

代码改进:

package com.example.concurrent.zkSample;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.I0Itec.zkclient.exception.ZkNodeExistsException;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;public class ZKDistributeImproveLock implements Lock {    private String localPath;    private ZkClient zkClient;    private String currentPath;    private String beforePath;    ZKDistributeImproveLock(String localPath) {        super();        this.localPath = localPath;        zkClient = new ZkClient("localhost:2181");        zkClient.setZkSerializer(new MyZkSerializer());        if (!zkClient.exists(localPath)) {            try {                this.zkClient.createPersistent(localPath);            } catch (ZkNodeExistsException e) {            }        }    }    @Override    public void lock() {        while (!tryLock()) {            waitForLock();        }    }    private void waitForLock() {        CountDownLatch countDownLatch = new CountDownLatch(1);        // 注册watcher        IZkDataListener listener = new IZkDataListener() {            @Override            public void handleDataChange(String dataPath, Object data) throws Exception {            }            @Override            public void handleDataDeleted(String dataPath) throws Exception {                // 监听到锁释放,唤醒线程                countDownLatch.countDown();            }        };        zkClient.subscribeDataChanges(beforePath, listener);        // 线程等待        if (zkClient.exists(beforePath)) {            try {                countDownLatch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        // 取消注册        zkClient.unsubscribeDataChanges(beforePath , listener);    }    @Override    public void unlock() {        zkClient.delete(this.currentPath);    }    @Override    public boolean tryLock() {        if (this.currentPath == null) {            currentPath = zkClient.createEphemeralSequential(localPath +"/" , "123");        }        // 获取Znode节点下面的所有子节点        List<String> children = zkClient.getChildren(localPath);        // 列表排序        Collections.sort(children);        if (currentPath.equals(localPath + "/" + children.get(0))) { // 当前节点是第1个节点            return true;        } else {            //得到当前的索引号            int index = children.indexOf(currentPath.substring(localPath.length() + 1));            //取到前一个            beforePath = localPath + "/" + children.get(index - 1);        }        return false;    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }    @Override    public void lockInterruptibly() throws InterruptedException {    }    @Override    public Condition newCondition() {        return null;    }}

thread name : Thread-13 , orderCode : 202112100936140

thread name : Thread-3 , orderCode : 202112100936141

thread name : Thread-14 , orderCode : 202112100936142

thread name : Thread-16 , orderCode : 202112100936143

thread name : Thread-1 , orderCode : 202112100936144

thread name : Thread-9 , orderCode : 202112100936145

thread name : Thread-4 , orderCode : 202112100936146

thread name : Thread-5 , orderCode : 202112100936147

thread name : Thread-7 , orderCode : 202112100936148

thread name : Thread-2 , orderCode : 202112100936149

thread name : Thread-17 , orderCode : 2021121009361410

thread name : Thread-15 , orderCode : 2021121009361411

thread name : Thread-0 , orderCode : 2021121009361412

thread name : Thread-10 , orderCode : 2021121009361413

thread name : Thread-18 , orderCode : 2021121009361414

thread name : Thread-19 , orderCode : 2021121009361415

thread name : Thread-8 , orderCode : 2021121009361416

thread name : Thread-12 , orderCode : 2021121009361417

thread name : Thread-11 , orderCode : 2021121009361418

thread name : Thread-6 , orderCode : 2021121009361419

8、zookeeper和Redis锁对比?

Redis和Zookeeper都可以用来实现分布式锁,两者可以进行对比:

基于Redis实现分布式锁

  • 实现比较复杂

  • 存在死锁的可能

  • 性能比较好,基于内存 ,而且保证的是高可用,redis优先保证的是AP(分布式CAP理论)

基于Zookeeper实现分布式锁

  • 实现相对简单

  • 可靠性高,因为zookeeper保证的是CP(分布式CAP理论)

  • 性能相对较好 并发1~2万左右,并发太高,还是redis性能好

到此,关于“基于Zookeeper怎么实现分布式锁”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: 基于Zookeeper怎么实现分布式锁

本文链接: https://www.lsjlt.com/news/301882.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • C++ 生态系统中流行库和框架的贡献指南
    作为 c++++ 开发人员,通过遵循以下步骤即可为流行库和框架做出贡献:选择一个项目并熟悉其代码库。在 issue 跟踪器中寻找适合初学者的问题。创建一个新分支,实现修复并添加测试。提交...
    99+
    2024-05-15
    框架 c++ 流行库 git
  • C++ 生态系统中流行库和框架的社区支持情况
    c++++生态系统中流行库和框架的社区支持情况:boost:活跃的社区提供广泛的文档、教程和讨论区,确保持续的维护和更新。qt:庞大的社区提供丰富的文档、示例和论坛,积极参与开发和维护。...
    99+
    2024-05-15
    生态系统 社区支持 c++ overflow 标准库
  • c++中if elseif使用规则
    c++ 中 if-else if 语句的使用规则为:语法:if (条件1) { // 执行代码块 1} else if (条件 2) { // 执行代码块 2}// ...else ...
    99+
    2024-05-15
    c++
  • c++中的继承怎么写
    继承是一种允许类从现有类派生并访问其成员的强大机制。在 c++ 中,继承类型包括:单继承:一个子类从一个基类继承。多继承:一个子类从多个基类继承。层次继承:多个子类从同一个基类继承。多层...
    99+
    2024-05-15
    c++
  • c++中如何使用类和对象掌握目标
    在 c++ 中创建类和对象:使用 class 关键字定义类,包含数据成员和方法。使用对象名称和类名称创建对象。访问权限包括:公有、受保护和私有。数据成员是类的变量,每个对象拥有自己的副本...
    99+
    2024-05-15
    c++
  • c++中优先级是什么意思
    c++ 中的优先级规则:优先级高的操作符先执行,相同优先级的从左到右执行,括号可改变执行顺序。操作符优先级表包含从最高到最低的优先级列表,其中赋值运算符具有最低优先级。通过了解优先级,可...
    99+
    2024-05-15
    c++
  • c++中a+是什么意思
    c++ 中的 a+ 运算符表示自增运算符,用于将变量递增 1 并将结果存储在同一变量中。语法为 a++,用法包括循环和计数器。它可与后置递增运算符 ++a 交换使用,后者在表达式求值后递...
    99+
    2024-05-15
    c++
  • c++中a.b什么意思
    c++kquote>“a.b”表示对象“a”的成员“b”,用于访问对象成员,可用“对象名.成员名”的语法。它还可以用于访问嵌套成员,如“对象名.嵌套成员名.成员名”的语法。 c++...
    99+
    2024-05-15
    c++
  • C++ 并发编程库的优缺点
    c++++ 提供了多种并发编程库,满足不同场景下的需求。线程库 (std::thread) 易于使用但开销大;异步库 (std::async) 可异步执行任务,但 api 复杂;协程库 ...
    99+
    2024-05-15
    c++ 并发编程
  • 如何在 Golang 中备份数据库?
    在 golang 中备份数据库对于保护数据至关重要。可以使用标准库中的 database/sql 包,或第三方包如 github.com/go-sql-driver/mysql。具体步骤...
    99+
    2024-05-15
    golang 数据库备份 mysql git 标准库
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作