iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >Java Zookeeper分布式分片算法源码分析
  • 325
分享到

Java Zookeeper分布式分片算法源码分析

2023-07-05 07:07:26 325人浏览 泡泡鱼
摘要

这篇文章主要介绍了Java ZooKeeper分布式分片算法源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java Zookeeper分布式分片算法源码分析文章都会有所收获,下面我们

这篇文章主要介绍了Java ZooKeeper分布式分片算法源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java Zookeeper分布式分片算法源码分析文章都会有所收获,下面我们一起来看看吧。

    背景

    公司的一个服务需要做类似于分片的逻辑,一开始服务基于传统部署方式通过本地配置文件配置的方式就可以指定该机器服务的分片内容如:0,1,2,3,随着系统的升级迭代,该服务进行了容器化部署,所以原来基于本地配置文件各自配置分片数据的方式就不适用了,原来的部署方式使得服务是有状态,是一种非云原生的方式,所以该服务要重新设计实现一套分布式服务分片逻辑。

    技术方案

    分布式协调中间件

    要实现分布式服务分片的能力,需要有一个分布式中间件,如:RedisMysqlZookeeper等等都可以,我们选用Zookeeper

    基于Zookeeper的技术方案

    使用Zookeeper主要是基于Zookeeper的临时节点和节点变化监听机制,具体的技术设计如下:

    服务注册目录设计

    Zookeeper的数据存储结构类似于目录,服务注册后的目录类似如下结构:

    解释下该目录结构,首先/xxxx/xxxx/sharding是区别于其他业务的的目录,该目录节点是持久的,service是服务目录,标识一个服务,该节点也是持久的,ip1ip2是该服务注册到Zookeeper的机器列表节点,该节点是临时节点。

    /xxxx/xxxx/sharding/service/ip1
    -----|----|--------|-------/ip2

    服务分片处理流程
    • 服务启动,创建CuratorFramework客户端,设置客户端连接状态监听;

    • Zookeeper注册该机器的信息,这里设计简单,机器信息就是ip地址;

    • 注册机器信息后,从Zookeeper获取所有注册信息;

    • 根据Zookeeper获取的所有注册机器信息根据分片算法进行分片计算。

    编码实现

    ZookeeperConfig

    Zookeeper的配置信息

    @Datapublic class ZookeeperConfig {        private String zkAddress;        private String nodePath;        private String serviceName;        private Integer shardinGCount;    public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {        this.zkAddress = zkAddress;        this.nodePath = nodePath;        this.serviceName = "/" + serviceName;        this.shardingCount = shardingCount;    }        private int baseSleepTimeMilliseconds = 1000;        private int maxSleepTimeMilliseconds = 3000;        private int maxRetries = 3;        private int sessionTimeoutMilliseconds;        private int connectionTimeoutMilliseconds;}

    InstanceInfo注册机器

    @AllArgsConstructor@EqualsAndHashCode()public class InstanceInfo {    private String ip;    public String getInstance() {        return ip;    }}

    ZookeeperShardingService分片服务

    @Slf4jpublic class ZookeeperShardingService {    public final Map<String, List<Integer>> caches = new HashMap<>(16);    private final CuratorFramework client;    private final ZookeeperConfig zkConfig;    private final ShardingStrategy shardingStrategy;    private final InstanceInfo instanceInfo;    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);    public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {        this.zkConfig = zkConfig;        log.info("开始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()                .connectString(zkConfig.getZkAddress())                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());        }        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());        }        this.shardingStrategy = shardingStrategy;        HostInfo host = new HostInfo();        this.instanceInfo = new InstanceInfo(host.getAddress());        client = builder.build();        client.getConnectionStateListenable().addListener(new ConnectionListener());        client.start();        try {            COUNT_DOWN_LATCH.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        // 注册服务节点监听        reGISterPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());        try {            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {                client.close();                throw new KeeperException.OperationTimeoutException();            }        } catch (final Exception ex) {            ex.printStackTrace();            throw new RuntimeException(ex);        }    }        private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {        try {            // 1. 创建一个PathChildrenCache            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);            // 2. 添加目录监听器            pathChildrenCache.getListenable().addListener(listener);            // 3. 启动监听器            pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);        } catch (Exception e) {            log.error("注册子目录监听器出现异常,nodePath:{}",nodePath,e);            throw new RuntimeException(e);        }    }        private void zkOp() throws Exception {        // 是否存在ruubypay-sharding主节点        if (null == client.checkExists().forPath(zkConfig.getNodePath())) {            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());        }        // 是否存服务主节点        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {            // 创建服务主节点            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());        }        // 检查是否存在临时节点        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {            System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() +  "/" + instanceInfo.getInstance());            // 创建临时节点            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +                    "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));        }        shardingFromZk();    }        private void shardingFromZk() throws Exception {        // 从 serviceName 节点下获取所有Ip列表        final GetChildrenBuilder childrenBuilder = client.getChildren();        final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());        List<InstanceInfo> res = new ArrayList<>();        instanceList.forEach(s -> {            res.add(new InstanceInfo(s));        });        Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());        // 先清一遍缓存        caches.clear();        shardingResult.forEach((k, v) -> {            caches.put(k.getInstance().split("-")[0], v);        });    }        private class ConnectionListener implements ConnectionStateListener {        @Override        public void stateChanged(CuratorFramework client, ConnectionState newState) {            if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {                try {                    zkOp();                } catch (Exception e) {                    e.printStackTrace();                    throw new RuntimeException(e);                } finally {                    COUNT_DOWN_LATCH.countDown();                }            }        }    }        private class ChildrenPathListener implements PathChildrenCacheListener {        @Override        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {            PathChildrenCacheEvent.Type type = event.getType();            if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {                try {                    shardingFromZk();                } catch (Exception e) {                    e.printStackTrace();                    throw new RuntimeException(e);                }            }        }    }}

    分片算法

    采用平均分配的算法

    public interface ShardingStrategy {    Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);}public class AverageAllocationShardingStrategy implements ShardingStrategy {    @Override    public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {        if (list.isEmpty()) {            return null;        }        Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);        addAliquant(list, shardingCount, result);        return result;    }    private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {        Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);        int itemCountPerSharding = shardingTotalCount / instanceInfos.size();        int count = 0;        for (InstanceInfo each : instanceInfos) {            List<Integer> shardingitems = new ArrayList<>(itemCountPerSharding + 1);            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {                shardingItems.add(i);            }            result.put(each, shardingItems);            count++;        }        return result;    }    private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {        int aliquant = shardingTotalCount % instanceInfos.size();        int count = 0;        for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {            if (count < aliquant) {                entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);            }            count++;        }    }}

    关于“Java Zookeeper分布式分片算法源码分析”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Java Zookeeper分布式分片算法源码分析”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网精选频道。

    --结束END--

    本文标题: Java Zookeeper分布式分片算法源码分析

    本文链接: https://www.lsjlt.com/news/350381.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    本篇文章演示代码以及资料文档资料下载

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档
    猜你喜欢
    • Java Zookeeper分布式分片算法源码分析
      这篇文章主要介绍了Java Zookeeper分布式分片算法源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java Zookeeper分布式分片算法源码分析文章都会有所收获,下面我们...
      99+
      2023-07-05
    • Java Zookeeper分布式分片算法超详细讲解流程
      目录背景技术方案分布式协调中间件基于Zookeeper的技术方案服务注册目录设计服务分片处理流程编码实现总结背景 公司的一个服务需要做类似于分片的逻辑,一开始服务基于传统部署方式通过...
      99+
      2023-03-01
      Java Zookeeper分布式分片算法 Java Zookeeper Java分布式分片算法
    • 分布式Netty源码分析
      这篇文章主要介绍了分布式Netty源码分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇分布式Netty源码分析文章都会有所收获,下面我们一起来看看吧。服务器端demo看下一个简单的Netty服务器端的例子pu...
      99+
      2023-06-29
    • ZooKeeper框架教程Curator分布式锁实现及源码分析
      目录  如何使用InterProcessMutex  实现思路   代码实现概述  InterProcessMutex源码分析&nb...
      99+
      2024-04-02
    • 分布式Netty源码EventLoopGroup分析
      这篇文章主要介绍“分布式Netty源码EventLoopGroup分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“分布式Netty源码EventLoopGroup分析”文章能帮助大家解决问题。Ev...
      99+
      2023-06-29
    • 分布式Netty源码分析概览
      目录服务器端demoEventLoopGroup介绍功能1:先来看看注册Channel功能2:执行一些Runnable任务ChannelPipeline介绍bind过程sync介绍误...
      99+
      2024-04-02
    • 分析ZooKeeper分布式锁的实现
      目录一、分布式锁方案比较二、ZooKeeper实现分布式锁2.1、方案一2.2、方案二一、分布式锁方案比较 方案 ...
      99+
      2024-04-02
    • ShardingSphere数据分片算法及测试源码分析
      这篇文章主要介绍“ShardingSphere数据分片算法及测试源码分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“ShardingSphere数据分片算法及测试源码分析”文章能帮助大家解决问题。...
      99+
      2023-07-05
    • 分布式Netty源码分析EventLoopGroup及介绍
      目录EventLoopGroup介绍功能1:先来看看注册Channel功能2:执行一些Runnable任务EventLoop介绍NioEventLoop介绍EpollEventLoo...
      99+
      2024-04-02
    • go语言分布式id生成器及分布式锁源码分析
      本文小编为大家详细介绍“go语言分布式id生成器及分布式锁源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“go语言分布式id生成器及分布式锁源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。分布式 i...
      99+
      2023-07-05
    • Netty分布式ByteBuf的分类方式源码解析
      目录ByteBuf根据不同的分类方式 会有不同的分类结果1.Pooled和Unpooled2.基于直接内存的ByteBuf和基于堆内存的ByteBuf3.safe和unsafe上一小...
      99+
      2024-04-02
    • Netty分布式源码分析监听读事件
      前文传送门:NioSocketChannel注册到selector 我们回到AbstractUnsafe的register0()方法: private void register0(...
      99+
      2024-04-02
    • Redisson分布式锁之加解锁源码分析
      这篇文章主要介绍“Redisson分布式锁之加解锁源码分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Redisson分布式锁之加解锁源码分析”文章能帮助大家解决问题。锁的可重入性我们都知道,Ja...
      99+
      2023-07-05
    • Java经典排序算法源码分析
      本篇内容主要讲解“Java经典排序算法源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java经典排序算法源码分析”吧!1.1 快速排序快速排序,一种排序很快的方法,使用分治思想,就是说快...
      99+
      2023-07-05
    • Netty分布式flush方法刷新buffer队列源码分析
      本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入...
      99+
      2023-06-29
    • 怎么分析ZooKeeper分布式任务调度中心
      怎么分析ZooKeeper分布式任务调度中心,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。一: 背景软件系统中,定时任务往往不可或缺,大家可能会采用Spring qu...
      99+
      2023-06-04
    • Netty分布式pipeline传播inbound事件源码分析
      目录传播inbound事件这里给大家看两种写法我们先以写法2为例, 将这种写法进行剖析我们跟进invokeChannelRead方法:我们跟到invokeChannelRead方法中...
      99+
      2024-04-02
    • elasticsearch分布式及数据的功能源码分析
      从功能上说,可以分为两部分,分布式功能和数据功能。分布式功能主要是节点集群及集群附属功能如restful借口、集群性能检测功能等,数据功能主要是索引和搜索。代码上这些功能并不是完全独...
      99+
      2024-04-02
    • Java如何实现ZooKeeper分布式锁
      这篇文章主要介绍了Java如何实现ZooKeeper分布式锁,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。什么是分布式锁在我们进行单机应用开发,涉及并发同步的时候,我们往往采...
      99+
      2023-06-29
    • Zookeeper事务日志预分配空间源码分析
      这篇文章主要介绍“Zookeeper事务日志预分配空间源码分析”,在日常操作中,相信很多人在Zookeeper事务日志预分配空间源码分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Zookeeper事务日志...
      99+
      2023-07-05
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作