广告
返回顶部
首页 > 资讯 > 服务器 >ApacheHudi性能提升三倍的查询优化
  • 440
分享到

ApacheHudi性能提升三倍的查询优化

2024-04-02 19:04:59 440人浏览 八月长安
摘要

目录1. 背景2. 设置3. 测试4. 结果5. 总结从 Hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 Z-Order 和 Hilbert 空间填充曲线的高级数据

从 Hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 Z-Order 和 Hilbert 空间填充曲线的高级数据布局优化技术的支持。

1. 背景

Amazon EMR 团队最近发表了一篇很不错的文章展示了对数据进行聚簇是如何提高查询性能的,为了更好地了解发生了什么以及它与空间填充曲线的关系,让我们仔细研究该文章的设置。

文章中比较了 2 个 Apache Hudi 表(均来自 Amazon Reviews 数据集):

未聚簇的 amazon_reviews 表(即数据尚未按任何特定键重新排序

amazon_reviews_clustered 聚簇表。当数据被聚簇后,数据按字典顺序排列(这里我们将这种排序称为线性排序),排序列为star_ratingtotal_votes两列(见下图)

为了展示查询性能的改进,对这两个表执行以下查询:

这里要指出的重要考虑因素是查询指定了排序的两个列(star_rating 和 total_votes)。但不幸的是这是线性/词典排序的一个关键限制,如果添加更多列,排序的价值会会随之减少。

从上图可以看到,对于按字典顺序排列的 3 元组整数,只有第一列能够对所有具有相同值的记录具有关键的局部性属性:例如所有记录都具有以“开头的值” 1"、"2"、"3"(在第一列中)很好地聚簇在一起。但是如果尝试在第三列中查找所有值为"5"的值,会发现这些值现在分散在所有地方,根本没有局部性,过滤效果很差。

提高查询性能的关键因素是局部性:它使查询能够显着减少搜索空间和需要扫描、解析等的文件数量。

但是这是否意味着如果我们按表排序的列的第一个(或更准确地说是前缀)以外的任何内容进行过滤,我们的查询就注定要进行全面扫描?不完全是,局部性也是空间填充曲线在枚举多维空间时启用的属性(我们表中的记录可以表示为 N 维空间中的点,其中 N 是我们表中的列数)

那么它是如何工作的?我们以 Z 曲线为例:拟合二维平面的 Z 阶曲线如下所示:

可以看到按照路径,不是简单地先按一个坐标 ("x") 排序,然后再按另一个坐标排序,它实际上是在对它们进行排序,就好像这些坐标的位已交织成单个值一样:

在线性排序的情况下局部性仅使用第一列相比,该方法的局部性使用到所有列。

以类似的方式,希尔伯特曲线允许将 N 维空间中的点(我们表中的行)映射到一维曲线上,基本上对它们进行排序,同时仍然保留局部性的关键属性,在此处阅读有关希尔伯特曲线的更多详细信息,到目前为止我们的实验表明,使用希尔伯特曲线对数据进行排序会有更好的聚簇和性能结果。

现在让我们来看看它的实际效果!

2. 设置

我们将再次使用 Amazon Reviews 数据集,但这次我们将使用 Hudi 按 product_idcustomer_id 列元组进行 Z-Order排序,而不是聚簇或线性排序。

数据集不需要特别的准备,可以直接从 S3 中以 Parquet 格式下载并将其直接用作 spark 将其摄取到 Hudi 表。

启动spark-shell

./bin/spark-shell --master 'local[4]' --driver-memory 8G --executor-memory 8G \
  --jars ../../packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.10.0.jar \
  --packages org.apache.spark:spark-avro_2.12:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

导入Hudi表

import org.apache.hadoop.fs.{FileStatus, Path}
import Scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.config.HoodieClusterinGConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.DataFrame
import java.util.stream.Collectors
val layoutOptStrategy = "z-order"; // OR "hilbert"
val inputPath = s"file:///${System.getProperty("user.home")}/datasets/amazon_reviews_parquet"
val tableName = s"amazon_reviews_${layoutOptStrategy}"
val outputPath = s"file:///tmp/hudi/$tableName"
def safeTableName(s: String) = s.replace('-', '_')
val commonOpts =
  Map(
    "hoodie.compact.inline" -> "false",
    "hoodie.bulk_insert.shuffle.parallelism" -> "10"
  )
////////////////////////////////////////////////////////////////
// Writing to Hudi
////////////////////////////////////////////////////////////////
val df = spark.read.parquet(inputPath)
df.write.fORMat("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.table.name", tableName)
  .option(PRECOMBINE_FIELD.key(), "review_id")
  .option(RECORDKEY_FIELD.key(), "review_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_cateGory")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  // NOTE: Small file limit is intentionally kept _ABOVE_ target file-size max threshold for Clustering,
  // to force re-clustering
  .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(1024 * 1024 * 1024)) // 1Gb
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(128 * 1024 * 1024)) // 128Mb
  // NOTE: We're increasing cap on number of file-groups produced as part of the Clustering run to be able to accommodate for the 
  // whole dataset (~33Gb)
  .option("hoodie.clustering.plan.strategy.max.num.groups", String.valueOf(4096))
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(BULK_INSERT_SORT_MODE.key(), "NONE")
  .options(commonOpts)
  .mode(ErrorIfExists)

3. 测试

每个单独的测试请在单独的 spark-shell 中运行,以避免缓存影响测试结果。

////////////////////////////////////////////////////////////////
// Reading
///////////////////////////////////////////////////////////////

// Temp Table w/ Data Skipping DISABLED
val readDf: DataFrame =
  spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").format("hudi").load(outputPath)

val rawSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot")

readDf.createOrReplaceTempView(rawSnapshotTableName)


// Temp Table w/ Data Skipping ENABLED
val readDfSkip: DataFrame =
  spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(outputPath)

val dataSkippingSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot_skipping")

readDfSkip.createOrReplaceTempView(dataSkippingSnapshotTableName)

// Query 1: Total votes by product_category, for 6 months
def runQuery1(tableName: String) = {
  // Query 1: Total votes by product_category, for 6 months
  spark.sql(s"SELECT sum(total_votes), product_category FROM $tableName WHERE review_date > '2013-12-15' AND review_date < '2014-06-01' GROUP BY product_category").show()
}

// Query 2: Average star rating by product_id, for some product
def runQuery2(tableName: String) = {
  spark.sql(s"SELECT avg(star_rating), product_id FROM $tableName WHERE product_id in ('B0184XC75U') GROUP BY product_id").show()
}

// Query 3: Count number of reviews by customer_id for some 5 customers
def runQuery3(tableName: String) = {
  spark.sql(s"SELECT count(*) as num_reviews, customer_id FROM $tableName WHERE customer_id in ('53096570','10046284','53096576','10000196','21700145') GROUP BY customer_id").show()
}

//
// Query 1: Is a "wide" query and hence it's expected to touch a lot of files
//
scala> runQuery1(rawSnapshotTableName)
+----------------+--------------------+
|sum(total_votes)|    product_category|
+----------------+--------------------+
|         1050944|                  PC|
|          867794|             Kitchen|
|         1167489|                Home|
|          927531|            Wireless|
|            6861|               Video|
|           39602| Digital_Video_Games|
|          954924|Digital_Video_Dow...|
|           81876|             Luggage|
|          320536|         Video_Games|
|          817679|              Sports|
|           11451|  Mobile_Electronics|
|          228739|  Home_Entertainment|
|         3769269|Digital_Ebook_Pur...|
|          252273|                Baby|
|          735042|             Apparel|
|           49101|    Major_Appliances|
|          484732|             Grocery|
|          285682|               Tools|
|          459980|         Electronics|
|          454258|            Outdoors|
+----------------+--------------------+
only showing top 20 rows

scala> runQuery1(dataSkippingSnapshotTableName)
+----------------+--------------------+
|sum(total_votes)|    product_category|
+----------------+--------------------+
|         1050944|                  PC|
|          867794|             Kitchen|
|         1167489|                Home|
|          927531|            Wireless|
|            6861|               Video|
|           39602| Digital_Video_Games|
|          954924|Digital_Video_Dow...|
|           81876|             Luggage|
|          320536|         Video_Games|
|          817679|              Sports|
|           11451|  Mobile_Electronics|
|          228739|  Home_Entertainment|
|         3769269|Digital_Ebook_Pur...|
|          252273|                Baby|
|          735042|             Apparel|
|           49101|    Major_Appliances|
|          484732|             Grocery|
|          285682|               Tools|
|          459980|         Electronics|
|          454258|            Outdoors|
+----------------+--------------------+
only showing top 20 rows

//
// Query 2: Is a "pointwise" query and hence it's expected that data-skipping should substantially reduce number 
// of files scanned (as compared to Baseline)
//
// NOTE: That Linear Ordering (as compared to Space-curve based on) will have similar effect on performance reducing
// total # of Parquet files scanned, since we're querying on the prefix of the ordering key
//
scala> runQuery2(rawSnapshotTableName)
+----------------+----------+
|avg(star_rating)|product_id|
+----------------+----------+
|             1.0|B0184XC75U|
+----------------+----------+


scala> runQuery2(dataSkippingSnapshotTableName)
+----------------+----------+
|avg(star_rating)|product_id|
+----------------+----------+
|             1.0|B0184XC75U|
+----------------+----------+

//
// Query 3: Similar to Q2, is a "pointwise" query, but querying other part of the ordering-key (product_id, customer_id)
// and hence it's expected that data-skipping should substantially reduce number of files scanned (as compared to Baseline, Linear Ordering).
//
// NOTE: That Linear Ordering (as compared to Space-curve based on) will _NOT_ have similar effect on performance reducing
// total # of Parquet files scanned, since we're NOT querying on the prefix of the ordering key
//
scala> runQuery3(rawSnapshotTableName)
+-----------+-----------+
|num_reviews|customer_id|
+-----------+-----------+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+-----------+-----------+

scala> runQuery3(dataSkippingSnapshotTableName)
+-----------+-----------+
|num_reviews|customer_id|
+-----------+-----------+
|         50|   53096570|
|          3|   53096576|
|         25|   10046284|
|          1|   10000196|
|         14|   21700145|
+-----------+-----------+

4. 结果

我们总结了以下的测试结果

可以看到多列线性排序对于按列(Q2、Q3)以外的列进行过滤的查询不是很有效,这与空间填充曲线(Z-order 和 Hilbert)形成了非常明显的对比,后者将查询时间加快多达 3倍 。值得注意的是性能提升在很大程度上取决于基础数据和查询,在我们内部数据的基准测试中,能够实现超过 11倍 的查询性能改进!

5. 总结

Apache Hudi v0.10 为开源带来了新的布局优化功能 Z-order 和 Hilbert。 使用这些行业领先的布局优化技术可以为用户查询带来显着的性能提升和成本节约!

以上就是Apache Hudi性能提升三倍的查询优化的详细内容,更多关于Apache Hudi查询优化的资料请关注编程网其它相关文章!

--结束END--

本文标题: ApacheHudi性能提升三倍的查询优化

本文链接: https://www.lsjlt.com/news/144494.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • ApacheHudi性能提升三倍的查询优化
    目录1. 背景2. 设置3. 测试4. 结果5. 总结从 Hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 Z-Order 和 Hilbert 空间填充曲线的高级数据...
    99+
    2022-11-13
  • DB2性能优化 – 如何通过db2优化工具提升SQL查询效率
           我们都知道,应用系统在运行一段时间后,用户报告系统运行会变慢,使他们不能完成所有的工作,完成事务和处理查询花费过长时间,或者应用程序...
    99+
    2022-10-18
  • MySQL 查询 - 排除某些字段的SQL查询,提升查询性能
    序言 某些时候,需要查询拥有很多很多字段的表,但是查询表时,其中有一些不需要查询的字段,会增加查询的负担,所以这时候,就需要排除这些冗余的字段,指定需要的字段查询,提升查询性能。 笔者尝试过使用 - ...
    99+
    2023-10-20
    mysql 数据库
  • 新建一个索引能够同时提升三条SQL的查询性能
    如题CREATE TABLE `score` (   `id` int(11) NOT NULL,   `...
    99+
    2022-10-18
  • 如何优化MySQL的查询性能?
    如何优化MySQL的查询性能?MySQL是一款广泛应用于Web开发的关系型数据库管理系统。然而,在处理大量数据和复杂查询时,MySQL的查询性能可能会受到影响,从而导致应用程序的响应时间变慢。为了提高MySQL的查询性能,我们可以采取以下几...
    99+
    2023-10-22
    索引优化 数据库缓存 查询重写
  • 提升MySQL查询效率及查询速度优化的方法是什么
    今天小编给大家分享一下提升MySQL查询效率及查询速度优化的方法是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,...
    99+
    2023-05-12
    mysql
  • 百万级高并发mongodb集群性能数十倍提升优化的实践过程
    百万级高并发mongodb集群性能数十倍提升优化的实践过程,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。背景线上某集群峰值TPS超过100万/秒左右(主要为写流...
    99+
    2023-06-05
  • 提升MySQL查询效率及查询速度优化的四个方法详析
    目录一、利用EXPLAIN关键字来评估查询语句中的缺陷二、数据比较时采用相同类型的列以提高查询效率三、在Like关键字的起始处通配符要谨慎使用四、尽量使用其它形式来代替Like关键字总结在任何一个数据库中,查询优化都是不...
    99+
    2023-04-21
    mysql如何提高查询效率 提升mysql查询性能 MySQL查询优化
  • MySQL性能调优之查询优化的方法
    本篇内容介绍了“MySQL性能调优之查询优化的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、查询慢...
    99+
    2022-10-19
  • 百万级高并发mongodb集群性能数十倍提升优化实践是怎样的
    百万级高并发mongodb集群性能数十倍提升优化实践是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1、背景 线上某集群...
    99+
    2022-10-18
  • MySQL提升大量数据查询效率的优化神器
    目录前言查看SQL执行频率 定位低效率执行SQLexplain分析执行计划trace分析优化器执行计划使用索引优化SQL优化大量插入数据优化insert语句优化order by语句2、两种排序方式3、Files...
    99+
    2022-07-06
    MySQL提升大量数据查询效率 MySQL优化神器
  • .NET使用Collections.Pooled提升性能优化的方法
    目录简介Collections.Pooled如何使用性能对比PooledList<T>PooledDictionary<TKey, TValue>Pooled...
    99+
    2022-11-13
  • Oracle分布式查询语句的性能优化
    分析: 由于优化器无法判断或获得远端表的统计信息,故原执行计划默认会采取把远程表(无论大小)拉到本地再连接的方式执行, 这样如果远程表较大的情况将会比较缓慢,像上述查询耗时在1分钟以上。 ...
    99+
    2022-10-18
  • MySQL大数据查询性能优化的示例
    这篇文章将为大家详细讲解有关MySQL大数据查询性能优化的示例,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。MySQL性能优化包括表的优化与列类型选择,表的优化可以细分为...
    99+
    2022-10-18
  • 如何通过索引优化提升PHP与MySQL的查询速度?
    摘要:在PHP与MySQL开发中,经常遇到查询速度慢的问题。本文将介绍如何通过使用索引优化提升查询速度,并提供具体的代码示例,帮助读者更好地理解和应用。引言:在网站开发过程中,查询数据库是非常常见的操作。然而,当数据量较大的时候,查询速度可...
    99+
    2023-10-21
    查询 优化 索引
  • 如何优化PHP中的数据库查询性能
    随着业务的不断扩大和数据的增加,数据库的查询性能已经成为了许多 PHP 应用程序开发人员的关注重点。优化数据库查询性能能够提升应用程序的整体性能和稳定性,使其更加适应高并发、大规模数据量等复杂的应用场景。本文将介绍一些优化 PHP 中的数据...
    99+
    2023-05-23
    性能调优 PHP性能优化 数据库查询优化
  • Oracle提升查询性能之-简单范围分区表的创建
    分区表的优点: 1.提高查询性能:只需要搜索特定分区,而非整张表,提高了查询速度。 2.节约维护时间:单个分区的数据装载,索引重建,备份,维护远远小于整张表的维护时间。下面就让我们来创建一张分区表 第一步:...
    99+
    2022-10-18
  • PHP底层的代码优化与性能提升实战
    PHP是一种广泛应用于Web开发领域的开源服务器脚本语言,其简洁易学、功能强大、能够和众多开源数据库完美搭配等优点,让它在Web应用开发中大放异彩。然而在开发过程中,往往需要考虑PHP性能的问题。本文将介绍PHP的性能优化与提升实战,同时附...
    99+
    2023-11-08
    性能 优化 PHP
  • 如何实现MySQL底层优化:日志系统的优化与性能提升
    MySQL是目前最流行的关系型数据库管理系统之一,作为网站及应用程序后端的核心组件,其性能优化显得尤为关键。其中,日志系统是MySQL的重要组成部分,其性能对数据库的整体性能影响极大。因此,本文将深入讨论MySQL日志系统的优化和性能提升。...
    99+
    2023-11-08
    MySQL 优化 日志
  • 通过MySQL慢查询优化MySQL性能的方法讲解
    随着访问量的上升,MySQL数据库的压力就越大,几乎大部分使用MySQL架构的web应用在数据库上都会出现性能问题,通过mysql慢查询日志跟踪有问题的查询非常有用,可以分析出当前程序里有很耗费资源的sql...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作