广告
返回顶部
首页 > 资讯 > 后端开发 > Python >java连接zookeeper实现zookeeper教程
  • 453
分享到

java连接zookeeper实现zookeeper教程

2024-04-02 19:04:59 453人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录java连接ZooKeeper实现zookeeperZookeeperJavaapi基本操作1.连接客户端2.恢复回话3.创建节点4.修改节点5.删除节点6.查询节点7.查询子节

java连接zookeeper实现zookeeper

Java服务端连接Zookeeper,进行节点信息的获取,管理…整理成一个基本工具

添加依赖:


<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.3.6</version>
</dependency>

具体代码如下:


package com; 
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;  
 
public class BaseZookeeper implements Watcher{ 
   private ZooKeeper zookeeper;
    
   private static final int SESSION_TIME_OUT = 2000;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   @Override
   public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
         System.out.println("Watch received event");
         countDownLatch.countDown();
      }
   }   
  
   
   public void connectZookeeper(String host) throws Exception{
      zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
      countDownLatch.await();
      System.out.println("zookeeper connection success");
   }
  
   
   public String createnode(String path,String data) throws Exception{
      return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
   }
  
   
   public List<String> getChildren(String path) throws KeeperException, InterruptedException{
      List<String> children = zookeeper.getChildren(path, false);
      return children;
   }
  
   
   public String getData(String path) throws KeeperException, InterruptedException{
      byte[] data = zookeeper.getData(path, false, null);
      if (data == null) {
         return "";
      }
      return new String(data);
   }
  
   
   public Stat setData(String path,String data) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.setData(path, data.getBytes(), -1);
      return stat;
   }
  
   
   public void deleteNode(String path) throws InterruptedException, KeeperException{
      zookeeper.delete(path, -1);
   }
  
   
   public String getCTime(String path) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.exists(path, false);
      return String.valueOf(stat.getCtime());
   }
  
   
   public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
      int childenNum = zookeeper.getChildren(path, false).size();
      return childenNum;
   }
   
   public void closeConnection() throws InterruptedException{
      if (zookeeper != null) {
         zookeeper.close();
      }
   }  
}  
 

测试


public class Demo { 
    public static void main(String[] args) throws Exception {
        BaseZookeeper zookeeper = new BaseZookeeper();
        zookeeper.connectZookeeper("192.168.0.1:2181"); 
        List<String> children = zookeeper.getChildren("/");
        System.out.println(children);
    } 
}

ZookeeperJavaAPI基本操作

Zookeeper官方提供了两种语言的API,Java和C,在这里只演示JavaAPI

操作API的类中的变量,一下方法都会使用到


static Logger logg = LoggerFactory.getLogger(ZKApi.class);
private static final String zkServerPath = "10.33.57.28:2181";
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeOut = 5000;
private static Stat stat = new Stat(); 

以及实现接口Watcher的实现方法process


public void process(WatchedEvent event) {
    try {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeChildrenChanged){
            System.out.println("NodeChildrenChanged");
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            List<String> srcChildList = zk.getChildren(event.getPath(), false);
            for (String child:srcChildList){
                System.out.println(child);
            }
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }else if (event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }
    }  catch (KeeperException e) {
        e.printStackTrace();
    }  catch (InterruptedException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.连接客户端

创建客户端连接使用Zookeeper类的构造函数

Zookeeper构造函数总共四个如下:



public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
	long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

连接客户端代码


public static ZooKeeper getZkConnect() throws IOException {
    ZooKeeper zk = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    logg.debug("连接状态:{}", zk.getState());
    return zk;
}

DEBUG [main] - zookeeper.disableAutoWatchReset is false
DEBUG [main] - 连接状态:CONNECTING

2.恢复回话


public static void recoveryConnect() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    logg.debug("开始连接服务器 . . .");
    logg.debug("连接状态:{}",zooKeeper.getState());
    new Thread().sleep(1000 );
    logg.debug("开始重连 . . . ");
    ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
    logg.debug("重连状态:{}",zooSession.getState());
    new Thread().sleep(200);
    logg.debug("重连状态:{}",zooSession.getState());
}

DEBUG [main] - 开始连接服务器 . . .
DEBUG [main] - 连接状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening Socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, neGotiated timeout = 5000
DEBUG [main] - 开始重连 . . . 
 INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden>
DEBUG [main] - 重连状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000
DEBUG [main] - 重连状态:CONNECTED

3.创建节点

创建节点通过zk客户端对象的create方法进行创建,主要有两个方法:一种是同步,一种是异步,接下来的修改等方法同样如此,就不多加解释了



public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode,  StrinGCallback cb, Object ctx)

public static void createZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String result = zk.create("/test1", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个/test的持续节点
    System.out.println(result);
//输出/test1
public static void createZkNode2() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    
    String ctx = "{'create': 'success'}";
    zk.create("/test2", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE,
    	CreateMode.PERSISTENT,new CreateCallBack() ,ctx);
    new Thread().sleep(2000);//需要暂停一会,否则创建失败
}

4.修改节点


public Stat setData(final String path, byte data[], int version)
public void setData(final String path, byte data[], int version,
        StatCallback cb, Object ctx)

public static void setZkNode1() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0);
    System.out.println(stat.getVersion());
}
public static void setZkNode2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'modify': 'success'}";
    zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx);
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

5.删除节点


public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
        Object ctx)

public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    zk.delete("/test1",1);//不能够删除子节点
}
public static void deleteZkNode2() throws IOException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String ctx = "{'delete': 'success'}";
    zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能够删除子节点
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

6.查询节点


public byte[] getData(String path, boolean watch, Stat stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)             
public void getData(String path, boolean watch, DataCallback cb, Object ctx)        

public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    byte[] data = zk.getData("/test1", true, stat);
    String s = new String(data);
    System.out.println("value: "+s);
    countDown.await();
}

if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        

由于更改之后,触发了监听器,再次在命令行中进行更改,出现了一下结果。

在这里插入图片描述

7.查询子节点

查询子节点的方法


public List<String> getChildren(final String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)   
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
            

代码实现


public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    List<String> srcChildList = zk.getChildren("/test", true, stat);
    for (String child:srcChildList){
        System.out.println(child);
    }
    countDown.await();
}

if(event.getType() == Event.EventType.NodeChildrenChanged){
    System.out.println("NodeChildrenChanged");
    ZooKeeper zk = null;
    zk = ZKApi.getZkConnect();
    List<String> srcChildList = zk.getChildren(event.getPath(), false);
    for (String child:srcChildList){
        System.out.println(child);
    }

运行结果完成后,触监听器,再次删除test1

在这里插入图片描述

第二种异步方式实现


public static void selectchildData2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'selectChild': 'success'}";
    zk.getChildren("/test",false,new ChildrenCallback(),ctx);
    new Thread().sleep(1000);
}

8.使用递归得到所有的节点


public static void selectchildData3() throws IOException, KeeperException, InterruptedException{
   getChild("/");
}
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
    System.out.println(path);
    ZooKeeper zk = getZkConnect();
    List<String> childrenList = zk.getChildren(path, false, stat);
    if(childrenList.isEmpty() || childrenList ==null)
        return;
    for(String s:childrenList){
        if(path.equals("/"))
           getChild(path+s);
        else {
            getChild(path+"/"+s);
        }
    }
}

运行结果:

/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000

9.判断节点是否存在


public static void existNode() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.exists("/ff", true);
    System.out.println(stat);
}
//输出null则不存在

10.自定义权限


public static void oneSelfACL() throws Exception {
    ZooKeeper zk = getZkConnect();
    ArrayList<ACL> acls = new ArrayList<ACL>();
  //  zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可访问
    Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456"));
    Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456"));
   // Id ipId = new Id("ip","127.0.0.1");ip设置
    // acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.DELETE,id2));
    //注册过的用户必须通过addAuthInfo才可以操作节点
    zk.addAuthInfo("digest","id1:123456".getBytes());
    zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT);
}

结果如下:

在这里插入图片描述

直接登录id1由于在程序已经注册完成

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: java连接zookeeper实现zookeeper教程

本文链接: https://www.lsjlt.com/news/156983.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • java连接zookeeper实现zookeeper教程
    目录java连接zookeeper实现zookeeperZookeeperJavaAPI基本操作1.连接客户端2.恢复回话3.创建节点4.修改节点5.删除节点6.查询节点7.查询子节...
    99+
    2022-11-12
  • java连接zookeeper的3种方式小结
    目录java连接zookeeper3种方式1、使用zookeeper原始api2、使用ZkClient客户端连接,这种连接比较简单3、使用curator连接Java集成zookeep...
    99+
    2022-11-12
  • 解决java连接zookeeper很慢的问题
    目录java连接zookeeper很慢记一次惨痛的zookeeper连接教训java连接zookeeper很慢 最近在学习zookeeper,但是在连接zookeeper服务端时很慢...
    99+
    2022-11-12
  • 如何解决java连接zookeeper很慢的问题
    这篇文章主要为大家展示了“如何解决java连接zookeeper很慢的问题”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何解决java连接zookeeper很慢的问题”这篇文章吧。java连接...
    99+
    2023-06-25
  • Java调用Zookeeper的实现步骤
    目录watch机制常用APIJAVA调用watch机制 Zookeeper watch是一种监听通知机制,可以随时监听一些数据的变化,从而实现数据的及时性。 Zookeeper所有的...
    99+
    2022-11-12
  • Java如何实现ZooKeeper分布式锁
    这篇文章主要介绍了Java如何实现ZooKeeper分布式锁,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。什么是分布式锁在我们进行单机应用开发,涉及并发同步的时候,我们往往采...
    99+
    2023-06-29
  • ZooKeeper框架教程Curator分布式锁实现及源码分析
    目录  如何使用InterProcessMutex  实现思路   代码实现概述  InterProcessMutex源码分析&nb...
    99+
    2022-11-13
  • zookeeper+Springboot实现服务器动态上下线监听教程详解
    目录zookeeper+Springboot实现服务器动态上下线监听教程一.什么是服务器动态上下线监听二.为什么要实现对服务器上下线的监听三.编码实现四.测试1.启动客户端,开启监听...
    99+
    2022-11-13
  • ZooKeeper入门教程三分布式锁实现及完整运行源码
    目录1.0版本2.0版本LockSample类构造方法获取锁实现createLock()attemptLock()释放锁实现 TicketSeller类sell()sell...
    99+
    2022-11-13
  • Java如何连接数据库图文教程
    目录前言1. 加载驱动:​2. 添加JDBC:3.在Mysql数据库中进行建表,和添加数据的操作。​4. 连接数据库并读取数据:总结前言 Java连接MySQL数据库的方法:首先下载...
    99+
    2023-05-15
    java实现数据库连接 java连接mysql数据库的代码 java如何与数据库建立连接
  • Java使用IntelliJ IDEA连接MySQL的详细教程
    本文将使用IntelliJ IDEA演示教程 安装MySQL驱动步骤 工具: IntelliJ IDEA MySQL8.0.18 MySQL连接驱动:mysql-connecto...
    99+
    2022-11-12
  • SQL Server连接查询的实用教程
    前沿小补充 例3.48 查询平均成绩大于等于80分的学生学号和平均成绩 SELECT Sno,AVG(Grade) FROM SC WHERE AVG(Grade...
    99+
    2022-11-12
  • java中JDBC技术连接Mysql实现增删改查超级详细的教程
    文章目录 一、JDBC基本介绍1.简单介绍2.准确介绍 二、IDEA项目中导入Mysql驱动jar包三、使用java程序连接Mysql数据库进行增删改查操作1.创建Mysql数据库连接2.实现增加记录3.实现删除记录4.实现修改...
    99+
    2023-12-23
    java intellij-idea 开发语言 mysql 数据库
  • Java数据库连接池技术的入门教程
    池(Pool)技术在一定程度上可以明显优化服务器应用程序的性能,提高程序执行效率和降低系统资源开销。数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库...
    99+
    2022-11-11
  • java连接SQL Server数据库的超详细教程
    目录前言一、sqlserver的配置1、Tcp\Ip 配置2、sql server身份验证配置二、jdbc下载及配置配置环境三、java配置及代码总结 前言 最近数据库大作...
    99+
    2022-11-13
  • nodejs连接dubbo服务的java工程实现示例
    目录前言1.启动dubbo提供服务2.新建node测试文件nodeTest.js3.下载所需依赖4.启动服务 前言 最近公司整体服务架构从MQ迁移到dubbo上,公司有部分项目使用了...
    99+
    2022-11-13
  • Linux系统下实现远程连接MySQL数据库的方法教程
    前言最近在工作中遇到了这个需求,估计搞了一个多小时才把这个远程连接搞好。一台本地电脑,一台云服务器,都是linux系统。下面来看看详细的介绍:步骤1、在服务器端开启远程访问首先进入mysql数据库,然后输入...
    99+
    2022-10-18
  • 怎么用Java实现redis连接池
    这篇文章主要介绍“怎么用Java实现redis连接池”,在日常操作中,相信很多人在怎么用Java实现redis连接池问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么用Jav...
    99+
    2022-10-18
  • JAVA实现心跳检测(长连接)
    目录1、心跳机制简介2、心跳机制实现方式3、客户端4 、服务端5、代码实现5.1 KeepAlive.java5.2 MyClient.java5.3 MyServer5.4 测试结...
    99+
    2023-03-01
    JAVA 心跳检测
  • Linux下mysql实现远程连接
    首先明白一点并不是mysql禁止远程连接,而是MYSQL的账号禁止远程连接。可能觉得我有点咬文嚼字了,不过我感觉分清这点还是很重要的。默认情况下,所有账号都是禁止远程连接的。在安装MYSQL的时候,在设置R...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作