返回顶部
首页 > 资讯 > 数据库 >大数据之使用Spark全量抽取MySQL的数据到Hive数据库
  • 513
分享到

大数据之使用Spark全量抽取MySQL的数据到Hive数据库

hivespark大数据数据库scala 2023-09-17 08:09:04 513人浏览 薄情痞子
摘要

文章目录 前言 一、读题分析 二、使用步骤 1.导入配置文件到pom.xml 2.代码部分 三、重难点分析 总结 前言 本题来源于全国职业技能大赛之大数据技术赛项赛题-离线数据处理-数据抽取(其他暂不透露) 题目:编写S

文章目录

前言

一、读题分析

二、使用步骤

1.导入配置文件到pom.xml

2.代码部分

三、重难点分析

总结


前言

本题来源于全国职业技能大赛之大数据技术赛项赛题-离线数据处理-数据抽取(其他暂不透露)

题目:编写Scala代码,使用sparkMysql的shtd_industry库中表EnvironmentData,ChangeRecord,BaseMachine,MachineData,ProduceRecord全量抽取到Hive的ods库(需自建)中对应表environmentdata,changerecord,basemachine, machinedata, producerecord中。

以下面题目为例:

抽取mysql的shtd_industry库中EnvironmentData表的全量数据进入Hive的ods库中表environmentdata,字段排序、类型不变,同时添加静态分区,分区字段类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。并在hive cli执行show partitions ods.environmentdata命令,将结果截图粘贴至对应报告中;


提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)

一、读题分析

涉及组件:Spark,Mysql,Hive

涉及知识点:

  1. Spark读取数据库数据
  2. DataFrameapi的使用(重点)
  3. Spark写入数据库数据
  4. Hive数据库的基本操作

二、使用步骤

1.导入配置文件到pom.xml

                            org.apache.spark            spark-sql_2.11            ${spark.version}                                    org.apache.spark            spark-hive_2.11            ${spark.version}                                    mysql            mysql-connector-java            5.1.37        

2.代码部分

由于不是很难,直接上代码,代码如下(示例):

package A.offlineDataProcessing.shtd_industry.task1_dataExtractionimport org.apache.spark.sql.functions.litimport org.apache.spark.sql.{DataFrame, SparkSession}import java.text.SimpleDateFORMatimport java.util.{Calendar, Properties}object SparkToMysqlToHive {  def main(args: Array[String]): Unit = {    // 创建Spark对象会话    val spark = SparkSession.builder()      .appName("MySQL to Hive")      .master("spark://bigdata1:7077")      .enableHiveSupport().getOrCreate()    // 连接MySQL数据库并设置属性    val jdbcUrl = "jdbc:mysql://bigdata1:3306/shtd_industry"    val table = "EnvironmentData"    val properties = new Properties    properties.put("user", "root")    properties.put("passWord", "123456")    // Read data from MySQL    val df: DataFrame = spark.read.jdbc(jdbcUrl, table, properties)    println("-------------------自定义操作-------------------------")    // Add partition column    val dateFormat = new SimpleDateFormat("yyyyMMdd")    //    第一个getTime返回的是一个 Date 对象    //    第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。    val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    //对MySQL来的数据进行withCoulum操作,有就修改,没有就添加    val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))    println("-------------------写入数据-------------------------")    // Write data to Hive    //    mode模式为覆盖,还有append为追加    //    partitionBy 根据指定列进行分区    //    saveAsTable保存表    dfWithPartition.write.mode("overwrite")      .partitionBy("etldate")      .saveAsTable("ods.environmentdata")  }}

hive数据库相关的操作在这不做演示


三、重难点分析

没有难点,主要涉及能否自定义函数完成任务需求

val dateFormat = new SimpleDateFormat("yyyyMMdd")    //    第一个getTime返回的是一个 Date 对象    //    第二个 getTime 方法返回的是一个整数值,表示此 Date 对象表示的时间距离标准基准时间(1970年1月1日00:00:00 GMT)的毫秒数。    val yesterday = dateFormat.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)    //对MySQL来的数据进行withCoulum操作,有就修改,没有就添加    val dfWithPartition: DataFrame = df.withColumn("etldate", lit(yesterday))

总结

本文仅仅介绍了Spark读取MySQL的数据到hive数据库的操作,spark提供了许多方法,我们不必写SQL语法就可以直接对数据进行操作,还是很方便的,并且难度也不高(比flink简单)。

如转载请标明出处

来源地址:https://blog.csdn.net/qq_36920766/article/details/129774263

您可能感兴趣的文档:

--结束END--

本文标题: 大数据之使用Spark全量抽取MySQL的数据到Hive数据库

本文链接: https://www.lsjlt.com/news/410909.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作