iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >storm-kafka-client使用的示例分析
  • 570
分享到

storm-kafka-client使用的示例分析

2023-06-02 20:06:41 570人浏览 泡泡鱼
摘要

stORM-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package hgs.core.sk;import jav

stORM-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

package hgs.core.sk;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.spout.ByTopicRecordTranslator;import org.apache.storm.kafka.spout.KafkaSpout;import org.apache.storm.kafka.spout.KafkaSpoutConfig;import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;//参考如下//https://commUnity.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html//Https://GitHub.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52public class StormKafkaMainTest {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//该类将传入的kafka记录转换为storm的tupleByTopicRecordTranslator<String,String> brt = new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7"));//设置要消费的topic即test7brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7"));//类似之前的SpoutConfigKafkaSpoutConfig<String,String> ksc = KafkaSpoutConfig//bootstrapServers 以及topic(test7).builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")//设置group.id.setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")//设置开始消费的气势位置.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)//设置提交消费边界的时长间隔.setOffsetCommitPeriodMs(10_000)//Translator.setRecordTranslator(brt).build();builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");     Config config = new Config();     config.setNumWorkers(2);     config.setNuMackers(0);     try {StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}      }}class  MyboltO extends  BaseRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector = null;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容String out = input.getString(0);System.out.println(out);//collector.ack(input);}public void declareOutputFields(OutputFieldsDeclarer declarer) {}}

pom.xml文件

<project xmlns="http://Maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>hgs</groupId>  <artifactId>core.sk</artifactId>  <version>1.0.0-SNAPSHOT</version>  <packaging>jar</packaging>  <name>core.sk</name>  <url>http://maven.apache.org</url>  <properties>    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  </properties>  <dependencies>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>3.8.1</version>      <scope>test</scope>    </dependency>    <!--    <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka</artifactId>    <version>1.1.3</version></dependency> --><dependency>   <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka-client</artifactId>    <version>1.1.3</version></dependency><dependency>  <groupId>org.apache.storm</groupId>  <artifactId>storm-core</artifactId>  <version>1.1.3</version>  <scope>provided</scope></dependency><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.11</artifactId>    <version>1.0.0</version>    <exclusions>    <exclusion>          <groupId>org.slf4j</groupId>          <artifactId>slf4j-log4j12</artifactId>        </exclusion>        <exclusion>            <groupId>org.apache.ZooKeeper</groupId>            <artifactId>zookeeper</artifactId>       </exclusion>    </exclusions></dependency><!-- <dependency>    <groupId>org.apache.storm</groupId>    <artifactId>storm-kafka-monitor</artifactId>    <version>1.2.2</version></dependency> --><!-- <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>0.8.2.1</version></dependency> --><dependency>    <groupId>org.clojure</groupId>    <artifactId>clojure</artifactId>    <version>1.7.0</version></dependency><!-- 尝试了很多次 都会有这个错误:java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)最后修改为kafka相应的kafka-clients版本后问题得到解决,应该是该出的问题--><dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>1.0.0</version></dependency> </dependencies>        <build>        <plugins>            <plugin>                <artifactId>maven-assembly-plugin</artifactId>                <version>2.2</version>                <configuration>                    <arcHive>                        <manifest>                            <!-- 我运行这个jar所运行的主类 -->                            <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>                        </manifest>                    </archive>                    <descriptorRefs>                        <descriptorRef>                            <!-- 必须是这样写 -->                            jar-with-dependencies                        </descriptorRef>                    </descriptorRefs>                </configuration>                                <executions>                    <execution>                        <id>make-assembly</id>                        <phase>package</phase>                        <Goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>            </plugin>                         <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                </configuration>            </plugin>        </plugins>    </build></project>
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂import java.util.UUID;import org.junit.jupiter.api.Test;public class TEst {@Testpublic void sysConfig() {String[] ags = {"his is my first storm program so i hope it will success","i love bascketball","the day of my birthday i was alone"};String uuid = UUID.randomUUID().toString();String nexttuple= ags[new Random().nextInt(ags.length)];System.out.println(nexttuple);}@Testpublic void lambdaTest() {int b  = 100;//该出返回10*a的值、//"(a) -> 10*a" 相当于 new  testinter<T>();printPerson((a) -> 10*a) ;}void printPerson( testinter<Integer> t) {//穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a )System.out.println(t.sysoutitems(100));};}//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法interface testinter<T>{T sysoutitems(int a );//void aAndb(int a, int b );}

看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网精选频道,感谢各位的阅读!

--结束END--

本文标题: storm-kafka-client使用的示例分析

本文链接: https://www.lsjlt.com/news/231214.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • storm-kafka-client使用的示例分析
    storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package hgs.core.sk;import jav...
    99+
    2023-06-02
  • Go中http client的示例分析
    这篇文章给大家分享的是有关Go中http client的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。什么是gogo是golang的简称,golang 是Google开发的一种静态强类型、编译型、并发型,...
    99+
    2023-06-15
  • Kafka consumer Api的示例分析
    本篇文章给大家分享的是有关Kafka consumer Api的示例分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。     Kafka 在0....
    99+
    2023-06-03
  • springboot 1.5.2 集成kafka的示例分析
    这篇文章主要介绍springboot 1.5.2 集成kafka的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!具体如下:随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为...
    99+
    2023-05-30
    springboot kafka
  • Kafka Java客户端代码的示例分析
    这篇文章将为大家详细讲解有关Kafka Java客户端代码的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。kafka是一种高吞吐量的分布式发布订阅消息系统kafka是linkedin...
    99+
    2023-06-17
  • 使用Ajax的示例分析
    这篇文章主要为大家展示了“使用Ajax的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“使用Ajax的示例分析”这篇文章吧。什么是ajaxajax(异步j...
    99+
    2024-04-02
  • 使用VanillaJS的示例分析
    这篇文章给大家分享的是有关使用VanillaJS的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。时间注意原始操作符比函数调用快,使用VanillaJS比如,一般不要这样:...
    99+
    2024-04-02
  • 使用Vue的示例分析
    这篇文章将为大家详细讲解有关使用Vue的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Vue的优点Vue具体轻量级框架、简单易学、双向数据绑定、组件化、数据和结构的分离、虚拟DOM、运行速度快等优...
    99+
    2023-06-15
  • 使用MySQL的示例分析
    这篇文章主要介绍了使用MySQL的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。属性表(product_props)结构如下数据量8...
    99+
    2024-04-02
  • rsync使用的示例分析
    这篇文章给大家介绍rsync使用的示例分析,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。要想同步某一位置文件和目录到另一个位置去可以使用Rsync,备份的位置可以在本地服务器或远程服务器。rsync特征速度:第一次的r...
    99+
    2023-06-28
  • Kafka简单客户端编程的示例分析
    这篇文章将为大家详细讲解有关Kafka简单客户端编程的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、创建配置类Config这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数T...
    99+
    2023-05-30
    kafka
  • 使用svg的示例分析
    这篇文章主要为大家展示了“使用svg的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“使用svg的示例分析”这篇文章吧。1、例子一css代码html,&n...
    99+
    2024-04-02
  • awk使用示例分析
    这篇文章主要讲解了“awk使用示例分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“awk使用示例分析”吧!awk是一个强大的文本分析工具。awk其名称得自于它的创始人 Alfred Aho...
    99+
    2023-06-04
  • Node.js使用示例分析
    本篇内容介绍了“Node.js使用示例分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Node.js 的非阻塞 I/OI/O 即 ...
    99+
    2023-06-17
  • 使用v-model的示例分析
    小编给大家分享一下使用v-model的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!Vue v-model是一个指令,它提供了input和form之间或两...
    99+
    2023-06-14
  • springboot使用nacos的示例分析
    springboot使用nacos的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。pom.xml:<xml version="1.0"...
    99+
    2023-06-22
  • Linux sed使用的示例分析
    这篇文章将为大家详细讲解有关Linux sed使用的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。sed 一种流向编辑器 stream editor,是Linux中三大文件处理工具(...
    99+
    2023-06-28
  • 如何进行Kafka 1.0.0 d代码示例分析
    这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。package kafka.demo;import ...
    99+
    2023-06-02
  • C++BoostAny示例分析使用
    目录一、提要二、Boost.Any示例一、提要 强类型语言,例如 C++,要求每个变量都有一个特定的类型来定义它可以存储什么样的信息。其他语言,例如 JavaScript,允许开发人...
    99+
    2022-11-13
    C++ Boost Any C++ Boost Any示例
  • 使用批处理的示例分析
    这篇文章将为大家详细讲解有关使用批处理的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。批处理程序删除自身.bat echo 有时候我们需要批处理程序在执行完成之后删除自身,可以用 del %0 例...
    99+
    2023-06-08
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作