iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
  • 774
分享到

spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据

2023-06-02 19:06:10 774人浏览 独家记忆
摘要

spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。pack

spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreaminGContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.kafka.KafkaClusterimport Scala.collection.immutable.Mapimport java.util.NoSuchElementExceptionimport org.apache.spark.SparkExceptionimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport org.codehaus.jackson.map.deser.std.PrimitiveArrayDeserializers.StringDeserimport kafka.serializer.StringDecoderimport org.apache.spark.streaming.kafka.DirectKafkaInputDStreamimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.kafka.HasOffsetRangesimport org.apache.spark.HashPartitionerobject SparkStreamingKafkaDirectWordCount {  def main(args: Array[String]): Unit = {     val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[5]")     conf.set("spark.streaming.kafka.maxRatePerPartition", "1")     val sc = new SparkContext(conf)     val ssc = new StreamingContext(sc,Seconds(1))      ssc.checkpoint("d:\\checkpoint")     val kafkaParams = Map[String,String](         "metadata.broker.list"->"bigdata01:9092,bigdata02:9092,bigdata03:9092",         "group.id"->"group_hgs",         "ZooKeeper.connect"->"bigdata01:2181,bigdata02:2181,bigdata03:2181")     val kc = new KafkaCluster(kafkaParams)     val topics = Set[String]("test")     //每个rdd返回的数据是(K,V)类型的,该函数规定了函数返回数据的类型     val mmdFunct = (mmd: MessageAndMetadata[String, String])=>(mmd.topic+" "+mmd.partition,mmd.message())          val rds = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams, getOffsets(topics,kc,kafkaParams),mmdFunct)     val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{        //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一        //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二        iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三      }     val words = rds.flatMap(x=>x._2.split(" ")).map((_,1))     //val wordscount = words.map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)     //println(getOffsets(topics,kc,kafkaParams))     rds.foreachRDD(rdd=>{       if(!rdd.isEmpty()){         //对每个dataStreamoffset进行更新         upateOffsets(topics,kc,rdd,kafkaParams)       }     }       )          words.print()     ssc.start()     ssc.awaitTermination()           }        def getOffsets(topics : Set[String],kc:KafkaCluster,kafkaParams:Map[String,String]):Map[TopicAndPartition, Long]={       val topicAndPartitionsOrNull =  kc.getPartitions(topics)    if(topicAndPartitionsOrNull.isLeft){      throw new SparkException(s"$topics in the set may not found")    }    else{      val topicAndPartitions = topicAndPartitionsOrNull.right.get      val groups = kafkaParams.get("group.id").get      val offsetOrNull = kc.getConsumerOffsets(groups, topicAndPartitions)      if(offsetOrNull.isLeft){        println(s"$groups you assignment may not exists!now redirect to zero!")        //如果没有消费过,则从最开始的位置消费        val erliestOffset = kc.getEarliestLeaderOffsets(topicAndPartitions)        if(erliestOffset.isLeft)          throw new SparkException(s"Topics and Partions not definded not found!")        else          erliestOffset.right.get.map(x=>(x._1,x._2.offset))      }      else{        //如果消费组已经存在则从记录的地方开始消费        offsetOrNull.right.get      }    }      }    //每次拉取数据后存储offset到ZK  def upateOffsets(topics : Set[String],kc:KafkaCluster,directRDD:RDD[(String,String)],kafkaParams:Map[String,String]){    val offsetRanges =  directRDD.asInstanceOf[HasOffsetRanges].offsetRanges    for(offr <-offsetRanges){      val topicAndPartitions = TopicAndPartition(offr.topic,offr.partition)      val yesOrNo = kc.setConsumerOffsets(kafkaParams.get("group.id").get, Map(topicAndPartitions->offr.untilOffset))      if(yesOrNo.isLeft){        println(s"Error when update offset of $topicAndPartitions")      }    }  }     }

看完上述内容,你们掌握spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网精选频道,感谢各位的阅读!

--结束END--

本文标题: spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据

本文链接: https://www.lsjlt.com/news/231139.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
    spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。pack...
    99+
    2023-06-02
  • Spark Streaming及其在实时数据处理中的应用
    Spark Streaming是Apache Spark提供的一种实时流处理框架,可以对实时数据进行高效的处理和分析。它可以将数据流...
    99+
    2024-03-05
    Spark
  • 大数据流处理中Flume、Kafka和NiFi的对比是怎样的
    今天就跟大家聊聊有关大数据流处理中Flume、Kafka和NiFi的对比是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。我们将简要介绍三种Apache处理工具:Flume、Ka...
    99+
    2023-06-02
  • spark实时数据处理的方法是什么
    Spark实时数据处理的方法包括使用Spark Streaming、Structured Streaming和Spark SQL。S...
    99+
    2024-04-02
  • Python使用Kafka处理数据的方法详解
    目录一、安装Kafka-Python包二、生产者三、消费者四、批量发送和批量消费五、总结Kafka是一个分布式的流数据平台,它可以快速地处理大量的实时数据。Python是一种广泛使用...
    99+
    2023-05-16
    Python Kafka处理数据 Python Kafka数据 Python 处理数据 Python Kafka
  • 如何通过AmazonAurora实现数据库的分布式事务处理
    Amazon Aurora是一个关系型数据库服务,其支持分布式事务处理。要通过Amazon Aurora实现数据库的分布式事务处理,...
    99+
    2024-04-09
    AmazonAurora
  • 怎么通过Ajax方式绑定select选项数据
    本篇文章给大家分享的是有关怎么通过Ajax方式绑定select选项数据,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。解决办法在HTML代码段中只需写入<select&nb...
    99+
    2023-06-08
  • java中怎样使用Files.readLines()处理文本中行数据方式
    java中怎样使用Files.readLines()处理文本中行数据方式,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。使用Files.readLines()处理...
    99+
    2023-06-22
  • SpringMVC通过Ajax处理Json数据的步骤详解
    目录一、前言:二、使用步骤 1.引入jar2.Person类3.前端页面4.Controller## 5.测试SpringMVC通过Ajax处理Json数据的实现 一、前言...
    99+
    2024-04-02
  • Java阻塞的处理方式是怎样的
    这篇文章主要介绍了Java阻塞的处理方式是怎样的的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java阻塞的处理方式是怎样的文章都会有所收获,下面我们一起来看看吧。在Java1.4以前,Java的网络编程是只有...
    99+
    2023-06-17
  • Kafka怎么处理数据的顺序性和并发性
    Kafka通过分区和副本的机制来处理数据的顺序性和并发性。 数据的顺序性:Kafka中的数据被分为多个分区,每个分区内的数据是有...
    99+
    2024-03-14
    Kafka
  • Mybatis通过Mapper代理连接数据库的方法
    1.在数据库中创建表和相应字段,如下图我创建了三个字段分别为fromname,message,toname,类型为varchar 2.创建对应的pojo实体类,注意类型要和数据库创...
    99+
    2024-04-02
  • Java8处理数据的函数式方式是什么
    本篇内容介绍了“Java8处理数据的函数式方式是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!处理数据的函数式方式Java 8 不仅仅添...
    99+
    2023-06-17
  • SQL Server中怎么通过重建方式还原master数据库
    这期内容当中小编将会给大家带来有关SQL Server中怎么通过重建方式还原master数据库,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。具体内容如下1、备份maste...
    99+
    2024-04-02
  • Redis 通过 RDB 方式进行数据备份与还原的方法
    目录IntroRedis 持久化RDB的优点RDB的缺点AOF 优点AOF 缺点备份还原Intro 有的时候我们需要对 Redis 的数据进行迁移,今天介绍一下通过 RDB(快照)文...
    99+
    2024-04-02
  • Java Apache Kafka 的时空之舞:串联数据处理的过去、现在和未来
    ...
    99+
    2024-04-02
  • J2ME通过Servlet访问数据库的步骤分别是怎样的
    本篇文章为大家展示了J2ME通过Servlet访问数据库的步骤分别是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。你知道J2ME访问数据库的方式吗,这里向大家...
    99+
    2024-04-02
  • Wormhole大数据流式处理平台的设计思想是怎样的
    本篇文章为大家展示了Wormhole大数据流式处理平台的设计思想是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。导读:互联网的迅猛发展使得数据不再昂贵,而如何从数据中更快速获取价值变得日益重要...
    99+
    2023-06-19
  • Linux虚拟机怎么通过拷贝方式复制Oracle数据库
    本篇内容主要讲解“Linux虚拟机怎么通过拷贝方式复制Oracle数据库”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Linux虚拟机怎么通过拷贝方式复制Ora...
    99+
    2024-04-02
  • PyTorch中的数据并行处理是怎样的
    PyTorch中的数据并行处理是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。通过 PyTorch 使用多个 GPU 非常简单。你可以将模型放在一个 GPU:&nbs...
    99+
    2023-06-04
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作