iis服务器助手广告广告
返回顶部
首页 > 资讯 > 数据库 >分布式 | DBLE LOAD DATA 功能实现解析
  • 489
分享到

分布式 | DBLE LOAD DATA 功能实现解析

分布式|DBLELOADDATA功能实现解析 2019-03-17 18:03:41 489人浏览 猪猪侠
摘要

1.概述 本篇文章主要介绍 DBLE LOAD DATA 大规模数据导入功能的实现,包括方案设计、源码解读。 下面就让我们一起来探秘 DBLE 是如何实现该功能的吧! 2.方案设计 LOAD DATA 为 Mysql 提供的从文本文件导

分布式 | DBLE LOAD DATA 功能实现解析


1.概述

本篇文章主要介绍 DBLE LOAD DATA 大规模数据导入功能的实现,包括方案设计、源码解读。

下面就让我们一起来探秘 DBLE 是如何实现该功能的吧!

2.方案设计

LOAD DATA 为 Mysql 提供的从文本文件导入数据到表的语法,作为数据库中间件,当然也需要实现对应的功能,来满足用户的导入数据需求。

DBLE 对该功能的实现其实就是直接模拟了 mysql 对 LOAD DATA 命令相应的处理协议。当然作为数据库中间件,还需要处理相应数据的存储、数据路由情况以及与后端 Mysql 的交互等方面的逻辑。

下图即为 DBLE 对 LOAD DATA 处理的整体流程:

DBLE LOAD DATA整体处理流程

3.源码解读

DBLE 与 LOAD DATA 功能实现相关的类其实主要有两个,一个是 ServerLoadDatainfileHandler 类,一个是 LoadDataUtil 类,ServerLoadDataInfileHandler 类主要处理的是与客户端交互的逻辑,而 LoadDataUtil 类主要处理的是与后端 MySQL 交互的逻辑。

下面我们就从客户端发送命令到 DBLE 处理,最后到 DBLE 与后端 MySQL 交互的过程,来详细看下相应的代码。

当客户端发来 LOAD DATA 导入数据到表命令的时候,DBLE 作为服务端会接收到相应的命令并进行处理,对应的代码在 ServerQueryHandler#query 方法中,这里会判断 SQL 的类型为 LOAD DATA,然后进一步处理:

public void query(String sql) {
        ServerConnection c = this.source;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.valueOf(c) + sql);
        }
         ……
        int rs = ServerParse.parse(sql);
        boolean isWithHint = ServerParse.startWithHint(sql);
        int sqlType = rs & 0xff;
        ……
        switch (sqlType) {
        ……
            case ServerParse.LOAD_DATA_INFILE_SQL:
                    //对LOAD DATA的处理,调用FrontendConnection#loadDataInfileStart方法
                    c.loadDataInfileStart(sql);
                    break;
      ……
      }
  }

继续看一下 FrontendConnection#loadDataInfileStart 方法:

	public void loadDataInfileStart(String sql) {
        if (loadDataInfileHandler != null) {
            try {
                //进一步调用了ServerLoadDataInfileHandler#start方法
                loadDataInfileHandler.start(sql);
            } catch (Exception e) {
                LOGGER.info("load data error", e);
                writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.getMessage());
            }

        } else {
            writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "load data infile sql is not  unsupported!");
        }
    }

下面便进入到了 ServerLoadDataInfileHandler#start 方法,前面讲过该类主要处理的是 DBLE 与客户端的交互逻辑。

该方法比较长,大家可以去细看,主要功能还是解析了客户端发送过来的 SQL 语句,然后针对 LOAD DATA 语法,如果导入文件是本机文件,则直接进行解析,否则的话会向客户端发送获取文件的命令,让客户端传输文件过来:

public void start(String strSql) {
        ……
        parseLoadDataPram();
        //如果文件不在本地,则向客户端发送命令,请求数据文件,这里的local可能会让人疑惑,但MySQL语法确实是这么规定的,load data local用法反而是文件不在本地的用法
        if (statement.isLocal()) {
            isStartLoadData = true;
            //request file from client
            ByteBuffer buffer = serverConnection.allocate();
            RequestFilePacket filePacket = new RequestFilePacket();
            filePacket.setFileName(fileName.getBytes());
            filePacket.setPacketId(1);
            filePacket.write(buffer, serverConnection, true);
        } else {
            //如果文件在本地的话,先判断文件是否存在,不存在则报错,存在的话需要对文件进行读取,计算每一行的路由结果,然后对不同节点的数据分别进行存储
            if (!new File(fileName).exists()) {
                String msg = fileName + " is not found!";
                clear();
                serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, msg);
            } else {
                if (parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy())) {
                    RouteResultset rrs = buildResultSet(routeResultMap);
                    if (rrs != null) {
                        flushDataToFile();
                        isStartLoadData = false;
                        serverConnection.getSession2().execute(rrs);
                    }
                }
            }
        }
    }

DBLE 发送命令给客户端后,客户端便会源源不断地把数据文件发送过来,对发送过来文件的处理逻辑在 ServerLoadDataInfileHandler#handle 方法中,该方法其实就是对传输过来的文件进行转储,默认数据小于 200Mb 则存在内存中,否则的话存储到本地文件:

public void handle(byte[] data) {
        try {
            if (sql == null) {
                clear();
                serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
                return;
            }
            BinaryPacket packet = new BinaryPacket();
            ByteArrayInputStream inputStream = new ByteArrayInputStream(data, 0, data.length);
            packet.read(inputStream);
            //这里就是对发送过来的文件进行转储
            saveByteOrToFile(packet.getData(), false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

文件发送完成,客户端还会发送一个空包过来,告诉 DBLE 数据发送完了,然后 DBLE 会进行下一步处理(其实这里就是 MySQL 协议中的规定),下一步处理的逻辑在 ServerLoadDataInfileHandler#end 方法中。

该方法也比较长,主要处理逻辑是将接受过来的文件进一步计算路由,根据计算结果将文件根据不同节点分别存储,最后构建路由结果集,通过 DBLE 下发 LOAD DATA 命令到后端不同的 MySQL 节点:

public void end(byte packId) {
        isStartLoadData = false;
        this.packID = packId;
        //empty packet for end
        saveByteOrToFile(null, true);

        if (isHasStoreToFile) {
            //这里便是计算路由,并根据路由结果存储不同节点的数据文件
            parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy());
        }
        ……
        //构建路由结果集,下发后端MySQL,执行LOAD DATA命令
        RouteResultset rrs = buildResultSet(routeResultMap);
        if (rrs != null) {
            flushDataToFile();
            serverConnection.getSession2().execute(rrs);
        }
}

DBLE 与后端 MySQL 的交互逻辑跟客户端与 DBLE 的交互逻辑基本一样,因为都是基于 MySQL 协议嘛,DBLE 这边还需要做的就是将不同节点的数据文件发送给后端的 MySQL,具体的逻辑在 LoadDataUtil#requestFileDataResponse 方法中,该方法就是将 DBLE 处理过的数据文件,发送到后端的 MySQL 了,由 MySQL 来进行真正的数据存储:

public static void requestFileDataResponse(byte[] data, BackendConnection conn) {
        byte packId = data[3];
        MySQLConnection c = (MySQLConnection) conn;
        RouteResultsetnode rrn = (RouteResultsetNode) conn.getAttachment();
        LoadData loadData = rrn.getLoadData();
        List loadDataData = loadData.getData();

        BufferedInputStream in = null;
        try {
            //如果数据较小,都在内存中,则直接发送
            if (loadDataData != null && loadDataData.size() > 0) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                for (String loadDataDataLine : loadDataData) {
                    String s = loadDataDataLine + loadData.getLineTerminatedBy();
                    byte[] bytes = s.getBytes(CharsetUtil.getJavaCharset(loadData.getCharset()));
                    bos.write(bytes);
                }
                packId = writeToBackConnection(packId, new ByteArrayInputStream(bos.toByteArray()), c);
            } else {
                //否则的话,先读取文件,然后再发送数据
                in = new BufferedInputStream(new FileInputStream(loadData.getFileName()));
                packId = writeToBackConnection(packId, in, c);
            }
        } 
     ……
    }

到这里,整个 DBLE 对 LOAD DATA 的处理流程就讲完啦。

4.总结

本篇文章主要分析讲解了 DBLE 对 LOAD DATA 功能的实现,包括方案设计以及源码解读,希望大家看完后能对整个 LOAD DATA 功能有更进一步的了解。

您可能感兴趣的文档:

--结束END--

本文标题: 分布式 | DBLE LOAD DATA 功能实现解析

本文链接: https://www.lsjlt.com/news/4941.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Redis如何实现分布式锁功能
    Redis如何实现分布式锁功能分布式锁是在分布式系统中常用的一种同步机制,它可以帮助我们在多个进程或多台服务器之间实现对共享资源的互斥访问。Redis作为一种高性能的缓存和消息队列中间件,也提供了实现分布式锁的功能。本文将介绍Redis如何...
    99+
    2023-11-07
    分布式 redis
  • Redis如何实现分布式缓存功能
    Redis如何实现分布式缓存功能,需要具体代码示例摘要:Redis是一个高性能的数据缓存和存储系统,它具备分布式特性,可以支持分布式缓存的功能。本文将介绍Redis如何实现分布式缓存,并提供具体的代码示例来帮助读者理解。概述分布式缓存是一种...
    99+
    2023-11-07
    redis 实现 分布式缓存
  • Redis如何实现分布式搜索功能
    Redis是一款高性能的NoSQL数据库,其提供了丰富的功能和数据结构,包括字符串、哈希表、列表、集合和有序集合等。除此之外,Redis还提供了一些高级功能,例如发布订阅、Lua脚本和事务等。其中,Redis的分布式搜索功能非常实用,可以帮...
    99+
    2023-11-08
    实现方式 搜索功能 Redis分布式
  • 分析ZooKeeper分布式锁的实现
    目录一、分布式锁方案比较二、ZooKeeper实现分布式锁2.1、方案一2.2、方案二一、分布式锁方案比较 方案 ...
    99+
    2024-04-02
  • redis实现分布式锁实例分析
    本文小编为大家详细介绍“redis实现分布式锁实例分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“redis实现分布式锁实例分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。1、业务场景引入模拟一个电商系统,...
    99+
    2023-06-29
  • 如何使用Redis实现分布式限流功能
    如何使用Redis实现分布式限流功能引言:随着互联网的快速发展,业务系统的访问量也日益增加。当流量集中到某一业务系统时,会给系统的稳定性和性能带来一定的威胁。为了保护业务系统,限流成为一种必不可少的手段。在分布式系统中,使用Redis可以方...
    99+
    2023-11-07
    分布式 redis 限流
  • elasticsearch分布式及数据的功能源码分析
    从功能上说,可以分为两部分,分布式功能和数据功能。分布式功能主要是节点集群及集群附属功能如restful借口、集群性能检测功能等,数据功能主要是索引和搜索。代码上这些功能并不是完全独...
    99+
    2024-04-02
  • 如何使用Redis实现分布式计算功能
    如何使用Redis实现分布式计算功能引言:随着互联网技术的快速发展,越来越多的应用程序需要处理大规模的数据和复杂的计算。在传统的单机计算环境下,处理这些任务可能会变得非常困难和低效。为了充分利用分布式系统的优势,一种常见的解决方案是将计算任...
    99+
    2023-11-07
    计算 分布式 redis
  • redisson实现分布式锁的源码解析
    目录redisson测试代码加锁设计锁续期设计锁的自旋重试解锁设计撤销锁续期解锁成功唤排队线程 redisson redisson 实现分布式锁的机制如下: 依赖版本 implem...
    99+
    2024-04-02
  • Java-Redis-Redisson分布式锁的功能使用及实现
    目录前置基础设施功能使用和介绍其他悲观锁的实现方式前置 Java-Redis-Redisson配置基础上我们进行了改造,让锁的使用更加方便 基础设施 RedissonLock imp...
    99+
    2022-11-13
    Java Redis Redisson分布式锁 Java  Redisson分布式锁
  • 基于redis分布式锁如何实现秒杀功能
    这篇文章主要介绍了基于redis分布式锁如何实现秒杀功能,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。业务场景所谓秒杀,从业务角度看,是短时...
    99+
    2024-04-02
  • redis分布式锁RedissonLock的实现细节解析
    redis分布式锁RedissonLock 简单使用 String key = "key-lock"; RLock lock = redisson.getLock(key); l...
    99+
    2024-04-02
  • Redis分布式锁实例分析讲解
    目录1 一人一单并发安全问题2 分布式锁的原理和实现2.1 什么是分布式锁2.2 分布式锁的实现1 一人一单并发安全问题 之前一人一单的业务使用的悲观锁,在分布式系统下,是无法生效的。 理想的情况下是这样的:一个线程成功...
    99+
    2022-12-06
    Redis分布式锁总结 Redis分布式锁
  • Redisson分布式限流的实现原理解析
    目录正文RRateLimiter使用RRateLimiter的实现RRateLimiter使用时注意事项RRateLimiter是非公平限流器Rate不要设置太大限流的上限取决于Redis单实例的性能分布式限流的本质正文...
    99+
    2023-02-12
    Redisson分布式限流 Redisson 分布式
  • 如何利用Redis和Node.js实现分布式缓存功能
    如何利用Redis和Node.js实现分布式缓存功能Redis是一个开源的内存数据库,其提供了快速可扩展的键值存储,常用于缓存、消息队列和数据存储等场景。Node.js是一个基于Chrome V8引擎的JavaScript运行时,适用于高并...
    99+
    2023-10-22
    redis nodejs 分布式缓存
  • Go语言分布式打包:如何实现高效的load?
    随着互联网的快速发展,分布式系统成为了当前互联网时代的一大趋势。分布式系统能够通过将计算任务分散到多台计算机上,提高系统的可靠性和可扩展性。在分布式系统中,打包是一个常见的操作,将多个小文件或者数据打包成一个大文件,便于传输和存储。在这篇...
    99+
    2023-10-04
    分布式 打包 load
  • Spring boot 整合 Redisson实现分布式锁并验证功能
    目录简述1. 在idea中新建spring boot工程并引入所需依赖2. 编写相关代码实现3. 模拟实际环境验证3.1 下载idea的docker插件并配置相关镜像信息3.2 将s...
    99+
    2024-04-02
  • 如何利用Redis和C#实现分布式缓存功能
    如何利用Redis和C#实现分布式缓存功能简介:在分布式系统中,缓存是一个重要的组件,它可以减少数据库的负载,提高系统的性能和可伸缩性。Redis是一个流行的缓存服务器,它的简单性、高效性和可扩展性使其成为了一个理想的选择。本文将介绍如何使...
    99+
    2023-10-22
    缓存 分布式 redis
  • 如何在 IDE 中完美地实现 ASP 和 Load 分布式
    随着互联网的不断发展,分布式计算也逐渐成为了一种必要的技术手段。在分布式计算中,ASP 和 Load 是两个非常重要的概念,它们可以帮助我们更好地分配计算资源和处理任务,提高系统的性能和可靠性。本文将介绍。 一、ASP 分布式 ASP(A...
    99+
    2023-09-10
    load 分布式 ide
  • Go 分布式链路追踪实现原理解析
    目录为什么需要分布式链路追踪系统微服务架构给运维、排障带来新挑战分布式链路追踪系统如何帮助我们分布式链路追踪系统架构概览核心概念一般架构协议标准和开源实现应用侧调用链跟踪实现方案概览...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作