iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >Kafka和Storm怎么在Spring boot中使用
  • 490
分享到

Kafka和Storm怎么在Spring boot中使用

2023-05-30 16:05:41 490人浏览 安东尼
摘要

这篇文章给大家介绍kafka和StORM怎么在Spring Boot中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。使用工具及环境配置 java 版本jdk-1.8 编译工具使用idea-2017 Maven作为项

这篇文章给大家介绍kafka和StORM怎么在Spring Boot中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

使用工具及环境配置

java 版本jdk-1.8

编译工具使用idea-2017

Maven作为项目管理

spring boot-1.5.8.RELEASE

需求体现

为什么需要整合到spring boot

为了使用spring boot 统一管理各种微服务,及同时避免多个分散配置

具体思路及整合原因

使用spring boot统一管理kafka、storm、redis等所需要的bean,通过其他服务日志收集至Kafka,KafKa实时发送日志至storm,在strom bolt时进行相应的处理操作

遇到的问题

使用spring boot并没有相关整合storm

以spring boot启动方式不知道如何触发提交Topolgy

提交Topology时遇到numbis not client localhost 问题

Storm bolt中无法通过注解获得实例化bean进行相应的操作

解决思路

在整合之前我们需要知道相应的spring boot 的启动方式及配置(如果你在阅读本文时,默认你已经对storm,kafka及spring boot有相关了解及使用)

spring boot 对storm进行整合的例子在网上很少,但是因为有相应的需求,因此我们还是需要整合.

首先导入所需要jar包:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <exclusions> <exclusion>  <artifactId>ZooKeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>spring-boot-actuator</artifactId>  <groupId>org.springframework.boot</groupId> </exclusion> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> <exclusions> <exclusion>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>Netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>jackson-core-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>curator-client</artifactId>  <groupId>org.apache.curator</groupId> </exclusion> <exclusion>  <artifactId>jettison</artifactId>  <groupId>org.codehaus.jettison</groupId> </exclusion> <exclusion>  <artifactId>jackson-mapper-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>jackson-jaxrs</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>snappy-java</artifactId>  <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion>  <artifactId>jackson-xc</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.Google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-mapReduce-client-core</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.HBase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.4</version> <exclusions> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>hadoop-common</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-annotations</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>hadoop-yarn-common</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> <exclusions> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>curator-client</artifactId>  <groupId>org.apache.curator</groupId> </exclusion> <exclusion>  <artifactId>jackson-mapper-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>jackson-core-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>snappy-java</artifactId>  <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-auth</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>commons-lang</artifactId>  <groupId>commons-lang</groupId> </exclusion> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>2.7.3</version> <exclusions> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <!--storm--> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <scope>${provided.scope}</scope> <exclusions> <exclusion>  <groupId>org.apache.logging.log4j</groupId>  <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.1.1</version> <exclusions> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency>

其中去除jar包是因为需要相与项目构建依赖有多重依赖问题,storm版本为1.1.0  spring boot相关依赖为

```java

<!-- spring boot -->  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter</artifactId>   <exclusions>    <exclusion>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-logging</artifactId>    </exclusion>   </exclusions>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-WEB</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-aop</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-test</artifactId>   <scope>test</scope>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-log4j2</artifactId>  </dependency>  <dependency>   <groupId>org.mybatis.spring.boot</groupId>   <artifactId>mybatis-spring-boot-starter</artifactId>   <version>${mybatis-spring.version}</version>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-configuration-processor</artifactId>   <optional>true</optional>  </dependency>

ps:maven的jar包仅因为项目使用需求,不是最精简,仅供大家参考.

项目结构:

config-存储不同环境配置文件

Kafka和Storm怎么在Spring boot中使用

存储构建spring boot 相关实现类 其他如构建名

启动spring boot的时候我们会发现

其实开始整合前,对storm了解的较少,属于刚开始没有接触过,后面参考发现整合到spring boot里面启动spring boot之后并没有相应的方式去触发提交Topolgy的函数,所以也造成了以为启动spring boot之后就完事了结果等了半个小时什么事情都没发生才发现没有实现触发提交函数.

为了解决这个问题我的想法是: 启动spring boot->创建kafka监听Topic然后启动Topolgy完成启动,可是这样的问题kafka监听这个主题会重复触发Topolgy,这明显不是我们想要的.看了一会后发现spring 有相关启动完成之后执行某个时间方法,这个对我来说简直是救星啊.所以现在触发Topolgy的思路变为:

启动spring boot ->执行触发方法->完成相应的触发条件

构建方法为:

@Configuration@Componentpublic class AutoLoad implements ApplicationListener<ContextRefreshedEvent> { private static String BROKERZKSTR; private static String TOPIC; private static String HOST; private static String PORT; public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,     @Value("${zookeeper.host}") String host,     @Value("${zookeeper.port}") String port,     @Value("${kafka.default-topic}") String topic ){  BROKERZKSTR = brokerZkstr;  HOST= host;  TOPIC= topic;  PORT= port; } @Override public void onApplicationEvent(ContextRefreshedEvent event) {  try {   //实例化topologyBuilder类。   TopologyBuilder topologyBuilder = new TopologyBuilder();   //设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。   BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);   // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字   SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");   spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());   spoutConfig.zkServers = Collections.singletonList(HOST);   spoutConfig.zkPort = Integer.parseInt(PORT);   //从Kafka最新输出日志读取   spoutConfig.startOffsetTime = OffsetRequest.LatestTime();   KafkaSpout receiver = new KafkaSpout(spoutConfig);   topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);   topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");   Config config = new Config();   config.setDebug(false);      config.setNumWorkers(1);   LocalCluster cluster = new LocalCluster();   cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());  } catch (Exception e) {   e.printStackTrace();  } }}

注:

启动项目时因为使用的是内嵌Tomcat进行启动,可能会报如下错误

[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during startjava.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]] at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144] at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144] at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23] at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144] at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

这是因为有相应导入的jar包引入了servlet-api版本低于内嵌版本,我们需要做的就是打开maven依赖把其去除

<exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId></exclusion>

然后重新启动就可以了.

启动过程中还有可能报:

复制代码 代码如下:


org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90

这个问题我思考了很久,发现网上的解释都是因为storm配置问题导致不对,可是我的storm是部署在服务器上的.并没有相关的配置,按理也应该去服务器上读取相关配置,可是结果并不是这样的。最后尝试了几个做法发现都不对,这里才发现,在构建集群的时候storm提供了相应的本地集群

LocalCluster cluster = new LocalCluster();

进行本地测试,如果在本地测试就使用其进行部署测试,如果部署到服务器上需要把:

cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());//修正为:StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());

进行任务提交;

以上解决了上面所述的问题1-3

问题4:是在bolt中使用相关bean实例,我发现我把其使用@Component加入spring中也无法获取到实例:我的猜想是在我们构建提交Topolgy的时候,它会在:

复制代码 代码如下:


topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");

执行bolt相关:

@Override public void prepare(Map stormConf, TopologyContext context,      OutputCollector collector) {  this.collector = collector;  StormLauncher stormLauncher = StormLauncher.getStormLauncher();  dataRepositorys =(AlarmDataRepositorys)   stormLauncher.getBean("alarmdataRepositorys"); }

而不会实例化bolt,导致线程不一而spring 获取不到.(这里我也不是太明白,如果有大佬知道可以分享一波)

而我们使用spring boot的意义就在于这些获取这些繁杂的对象,这个问题困扰了我很久.最终想到,我们可以通过上下文getbean获取实例不知道能不能行,然后我就开始了定义:

例如我需要在bolt中使用一个服务:

@Service("alarmdataRepositorys")public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys { private static final String ERRO = "erro";  @Override public String getErrNumFromRedis(String type,String key) {  if(type==null || key == null){   return null;  }else {   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();   return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));  } }  @Override public void setErrNumToRedis(String type, String key,String value) {  try {   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();   valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);  }catch (Exception e){   logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key为%s存入redis失败",key));  } }

这里我指定了该bean的名称,则在bolt执行prepare时:使用getbean方法获取了相关bean就能完成相应的操作.

然后kafka订阅主题发送至我bolt进行相关的处理.而这里getbean的方法是在启动bootmain函数定义:

@SpringBootApplication@EnableTransactionManagement@ComponentScan({"service","storm"})@EnableMongoRepositories(basePackages = {"storm"})@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})@ImportResource(locations = { "classpath:/configs/spring-hadoop.xml", "classpath:/configs/spring-hbase.xml"})public class StormLauncher extends SpringBootServletInitializer { //设置 安全线程launcher实例 private volatile static StormLauncher stormLauncher; //设置上下文 private ApplicationContext context; public static void main(String[] args) {  SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);  // application.web(false).run(args);该方式是spring boot不以web形式启动 application.run(args); StormLauncher s = new StormLauncher(); s.setApplicationContext(application.context()); setStormLauncher(s); } private static void setStormLauncher(StormLauncher stormLauncher) { StormLauncher.stormLauncher = stormLauncher; } public static StormLauncher getStormLauncher() { return stormLauncher; } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(StormLauncher.class); }  public ApplicationContext getApplicationContext() { return context; }  private void setApplicationContext(ApplicationContext appContext) { this.context = appContext; }  public Object getBean(String name) { return context.getBean(name); }  public <T> T getBean(Class<T> clazz) { return context.getBean(clazz); }  public <T> T getBean(String name, Class<T> clazz) { return context.getBean(name, clazz); }

关于Kafka和Storm怎么在Spring boot中使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

--结束END--

本文标题: Kafka和Storm怎么在Spring boot中使用

本文链接: https://www.lsjlt.com/news/219929.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • C++ 生态系统中流行库和框架的贡献指南
    作为 c++++ 开发人员,通过遵循以下步骤即可为流行库和框架做出贡献:选择一个项目并熟悉其代码库。在 issue 跟踪器中寻找适合初学者的问题。创建一个新分支,实现修复并添加测试。提交...
    99+
    2024-05-15
    框架 c++ 流行库 git
  • C++ 生态系统中流行库和框架的社区支持情况
    c++++生态系统中流行库和框架的社区支持情况:boost:活跃的社区提供广泛的文档、教程和讨论区,确保持续的维护和更新。qt:庞大的社区提供丰富的文档、示例和论坛,积极参与开发和维护。...
    99+
    2024-05-15
    生态系统 社区支持 c++ overflow 标准库
  • c++中if elseif使用规则
    c++ 中 if-else if 语句的使用规则为:语法:if (条件1) { // 执行代码块 1} else if (条件 2) { // 执行代码块 2}// ...else ...
    99+
    2024-05-15
    c++
  • c++中的继承怎么写
    继承是一种允许类从现有类派生并访问其成员的强大机制。在 c++ 中,继承类型包括:单继承:一个子类从一个基类继承。多继承:一个子类从多个基类继承。层次继承:多个子类从同一个基类继承。多层...
    99+
    2024-05-15
    c++
  • c++中如何使用类和对象掌握目标
    在 c++ 中创建类和对象:使用 class 关键字定义类,包含数据成员和方法。使用对象名称和类名称创建对象。访问权限包括:公有、受保护和私有。数据成员是类的变量,每个对象拥有自己的副本...
    99+
    2024-05-15
    c++
  • c++中优先级是什么意思
    c++ 中的优先级规则:优先级高的操作符先执行,相同优先级的从左到右执行,括号可改变执行顺序。操作符优先级表包含从最高到最低的优先级列表,其中赋值运算符具有最低优先级。通过了解优先级,可...
    99+
    2024-05-15
    c++
  • c++中a+是什么意思
    c++ 中的 a+ 运算符表示自增运算符,用于将变量递增 1 并将结果存储在同一变量中。语法为 a++,用法包括循环和计数器。它可与后置递增运算符 ++a 交换使用,后者在表达式求值后递...
    99+
    2024-05-15
    c++
  • c++中a.b什么意思
    c++kquote>“a.b”表示对象“a”的成员“b”,用于访问对象成员,可用“对象名.成员名”的语法。它还可以用于访问嵌套成员,如“对象名.嵌套成员名.成员名”的语法。 c++...
    99+
    2024-05-15
    c++
  • C++ 并发编程库的优缺点
    c++++ 提供了多种并发编程库,满足不同场景下的需求。线程库 (std::thread) 易于使用但开销大;异步库 (std::async) 可异步执行任务,但 api 复杂;协程库 ...
    99+
    2024-05-15
    c++ 并发编程
  • 如何在 Golang 中备份数据库?
    在 golang 中备份数据库对于保护数据至关重要。可以使用标准库中的 database/sql 包,或第三方包如 github.com/go-sql-driver/mysql。具体步骤...
    99+
    2024-05-15
    golang 数据库备份 mysql git 标准库
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作