iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >怎么理解spark的自定义分区和排序及spark与jdbc
  • 235
分享到

怎么理解spark的自定义分区和排序及spark与jdbc

2023-06-02 20:06:42 235人浏览 薄情痞子
摘要

这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。//自定义分区import org.apache

这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

//自定义分区import org.apache.spark.SparkConfimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.Partitionerobject PrimitivePartitionTest {  def main(args: Array[String]): Unit = {    val conf = new SparkConf    conf.setMaster("local[2]").setAppName("Partitioner")    val context = new SparkContext(conf)    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)    //实例化类,并设置分区类    val partitioner = new CustomPartitioner(2)    val rdd1 = rdd.partitionBy(partitioner)    rdd1.saveAsTextFile("c:\\partitioner")    context.stop()      }}//自定义分区类继承spark的Partitionerclass CustomPartitioner(val partitions:Int ) extends Partitioner{         def numPartitions: Int= this.partitions       def getPartition(key: Any): Int={      if(key.toString().length()<=2)        0      else        1          }}
//自定义排序package hgs.spark.othertestimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport Scala.math.Ordered//自定义排序第一种实现方式,通过继承orderedclass Student(val name:String,var age:Int) extends Ordered[Student] with Serializable{  def compare(that: Student): Int={    return this.age-that.age  }}class Boy(val name:String,var age:Int) extends  Serializable{  }//第二种方式通过实现隐式转换实现object MyPredef{  implicit def toOrderBoy = new Ordering[Boy]{   def compare(x: Boy, y: Boy): Int={     x.age - y.age   }  }}//引入隐式转换import MyPredef._object CutstomOrder {   def main(args: Array[String]): Unit = {     val conf = new SparkConf()     conf.setMaster("local[2]").setAppName("CutstomOrder")     val context = new SparkContext(conf)     val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)     //下面的第二个参数false为降序排列     //val rdd_sorted = rdd.sortBy(f=>new Student(f._1,f._2), false, 1)     val rdd_sorted = rdd.sortBy(f=>new Boy(f._1,f._2), false, 1)     rdd_sorted.saveAsTextFile("d:\\ordered")     context.stop()   } }
//JDBCimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.JdbcRDDimport java.sql.Connectionimport java.sql.DriverManagerimport java.sql.ResultSetimport scala.collection.mutable.ListBufferobject DataFromJdbcToSpark {  def main(args: Array[String]): Unit = {     val conf = new SparkConf()    conf.setMaster("local[2]").setAppName("BroadCastTest")    val context = new SparkContext(conf)    val sql = "select name,age from test where id>=? and id <=?"    var list = new ListBuffer[(String,Int)]()    //第七个参数是一个自定义的函数,spark会调用该函数,完成自定义的逻辑,y的数据类型是ResultSet,该函数不可以想自己定义的数组添加数据,    //应为应用的函数会将结果保存在JdbcRDD中    val jdbcRDD = new JdbcRDD(context,getConnection,sql,1,8,2,y=>{    (y.getString(1),y.getInt(2))           })          println(jdbcRDD.collect().toBuffer)     context.stop()      }      def getConnection():Connection={    Class.forName("com.mysql.jdbc.Driver")    val  conn = DriverManager.getConnection("jdbc:Mysql://192.168.6.133:3306/hgs","root","123456");    conn  }}//----------------------------------------------------------------------package hgs.spark.othertestimport java.sql.Connectionimport java.sql.DriverManagerimport org.apache.commons.dbutils.QueryRunnerimport org.apache.spark.SparkConfimport org.apache.spark.SparkContext//将spark计算后的结果录入数据库object DataFromSparktoJdbc {    def main(args: Array[String]): Unit = {        val conf = new SparkConf    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")    val context = new SparkContext(conf)    val addressrdd= context.textFile("d:\\Words")    val words = addressrdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)    //println(words.partitions.size)    var p:Int =0    words.foreachPartition(iter=>{      //每个分区一个链接      val qr = new QueryRunner()      val conn = getConnection      println(conn)      val sql = s"insert into words values(?,?)"      //可以修改为批量插入效率更高      while(iter.hasNext){        val tpm = iter.next()          val obj1 :Object = tpm._1        val obj2 :Object = new Integer(tpm._2)        //obj1+conn.toString()可以看到数据库的插入数据作用有三个不同的链接        qr.update(conn, sql,obj1+conn.toString(),obj2)      }      //println(conn)      //println(p)      conn.close()          })    words.saveAsTextFile("d:\\wordresult")  }  def getConnection():Connection={    Class.forName("com.mysql.jdbc.Driver")    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");    conn  }  }
//广播变量package hgs.spark.othertestimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject BroadCastTest{  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setMaster("local[2]").setAppName("BroadCastTest")    val context = new SparkContext(conf)    val addressrdd= context.textFile("d:\\address")    val splitaddrdd =     addressrdd.map(x=>{            val cs = x.split(",")      (cs(0),cs(1))    }).collect().toMap    //广播变量,数据被缓存在每个节点,减少了节点之间的数据传送,可以有效的增加效率,广播出去的可以是任意的数据类型    val maprdd = context.broadcast(splitaddrdd)    val namerdd = context.textFile("d:\\name")        val result = namerdd.map(x=>{      //该出使用了广播的出去的数组      maprdd.value.getOrElse(x, "UnKnown")          })    println(result.collect().toBuffer)    context.stop()  }}
其他一些知识点1.spark 广播变量 rdd.brodcastz(rdd),广播变量的用处是将数据汇聚传输到各个excutor上面,这样在做数据处理的时候减少了数据的传输2.wordcount程序context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) wordcount程序代码,一个wordcount会产生5个RDDsc.textFile() 会产生两个RDD 1.hadoopRDD-> MapPartitionsRDD   flatMap() 会产生MapPartitionsRDD   map 会产生MapPartitionsRDD   reduceByKey 产生ShuuledRDD   saveAsTextFile   3.缓存数据到内存 rdd.cache   清理缓存 rdd.unpersist(true),rdd.persist存储及级别 cache方法调用的是persist方法4.spark 远程debug,需要设置sparkcontext.setMaster("spark://xx.xx.xx.xx:7077").setjar("d:/jars/xx.jar")

关于怎么理解spark的自定义分区和排序及spark与jdbc就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

--结束END--

本文标题: 怎么理解spark的自定义分区和排序及spark与jdbc

本文链接: https://www.lsjlt.com/news/231154.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 怎么理解spark的自定义分区和排序及spark与jdbc
    这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。//自定义分区import org.apache...
    99+
    2023-06-02
  • 关于Spark Streaming感知kafka动态分区的问题该怎么理解
    关于Spark Streaming感知kafka动态分区的问题该怎么理解,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。小编主要是讲解Spark Streaming与kafk...
    99+
    2023-06-19
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作