本篇文章为大家展示了如何解析spark-streaming中的SocketTextStream,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。package hgs.spark.stream
本篇文章为大家展示了如何解析spark-streaming中的SocketTextStream,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.StreaminGContextimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitionerobject SocketStreamingTest { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setMaster("local[2]").setAppName("SocketStreaming") val context = new SparkContext(conf); //要添加spark-streaming的依赖包,spark的Seconds val streamContext = new StreamingContext(context,Seconds(5)); val ds = streamContext.socketTextStream("192.168.6.129", 8888, StorageLevel.MEMORY_ONLY); streamContext.checkpoint("d:\\chekpoint") //val ds2 = ds.flatMap(_.split(" ")).map((_,1)).reduceByKey((x,y)=>(x+y))//这种方式只是对该批次数据进行处理,并没有累计上一个批次 //updateFunc: (Iterator[(K, Seq[V], Option[S])]) K:单词, Seq[V]该批次单词出现次数列表,Option:上一次计算的结果 val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val partitionner = new HashPartitioner(2) //通过updateStatByKey来进行累加 val ds2 = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, partitionner, true) //打印 ds2.print() streamContext.start() streamContext.awaitTermination() }}
上述内容就是如何解析spark-streaming中的socketTextStream,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注编程网精选频道。
--结束END--
本文标题: 如何解析spark-streaming中的socketTextStream
本文链接: https://www.lsjlt.com/news/231145.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0