广告
返回顶部
首页 > 资讯 > 精选 >Spring Boot 集成 Kafkad的实现方法
  • 122
分享到

Spring Boot 集成 Kafkad的实现方法

2023-06-14 11:06:14 122人浏览 安东尼
摘要

本篇内容介绍了“Spring Boot 集成 kafkad的实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!spring Boot 作

本篇内容介绍了“Spring Boot 集成 kafkad的实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

spring Boot 作为主流微服务框架,拥有成熟的社区生态。市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及rpc缓存消息队列、分库分表、注册中心、分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏

消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者。

前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。

Kafka高效地处理实时流式数据,可以实现与StORMHBasespark的集成。作为聚类部署到多台服务器上,Kafka处理它所有的发布和订阅消息系统使用了四个api,即生产者API、消费者API、Stream API和Connector API。它能够传递大规模流式消息,自带容错功能,已经取代了一些传统消息系统,如JMS、AMQP等。

为什么使用kafka?

  • 削峰填谷。缓冲上下游瞬时突发流量,保护 “脆弱” 的下游系统不被压垮,避免引发全链路服务 “雪崩”。

  • 系统解耦。发送方和接收方的松耦合,一定程度简化了开发成本,减少了系统间不必要的直接依赖。

  • 异步通信:消息队列允许用户把消息放入队列但不立即处理它。

  • 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

业务场景

  • 一些同步业务流程的非核心逻辑,对时间要求不是特别高,可以解耦异步来执行

  • 系统日志收集,采集并同步到kafka,一般采用elk组合玩法

  • 一些大数据平台,用于各个系统间数据传递

基本架构

Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群节点分布

Spring Boot 集成 Kafkad的实现方法

Producer 生产消息,发送到Broker中

Leader状态的Broker接收消息,写入到相应topic中。在一个分区内,这些消息被索引并连同时间戳存储在一起

Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份

Consumer 消费者的进程可以从分区订阅,并消费消息

常用术语

  • Broker。负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上

  • 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

  • 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。

  • 消息:这里的消息就是指 Kafka 处理的主要对象。

  • 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。

  • 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。每个分区可配置多个副本实现高可用。一个分区的N个副本一定在N个不同的Broker上。

  • Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。

  • Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader。

  • 生产者:Producer。向主题发布新消息的应用程序。

  • 消费者:Consumer。从主题订阅新消息的应用程序。

  • 消费者位移:Consumer Offset。表示消费者消费进度,每个消费者都有自己的消费者位移。offset保存在broker端的内部topic中,不是在clients中保存

  • 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。

  • 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

代码演示

外部依赖:

在 pom.xml 中添加 Kafka 依赖:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

由于spring-boot-starter-parent 指定的版本号是2.1.5.RELEASE,spring boot 会对外部框架的版本号统一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相关参数,具体内容如下:

Spring:  kafka:    bootstrap-servers: localhost:9092    producer:      retries: 3  # 生产者发送失败时,重试次数      batch-size: 16384      buffer-memory: 33554432      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者消息key和消息value的序列化处理类      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: tomge-consumer-group  # 默认消费者group id      auto-offset-reset: earliest      enable-auto-commit: true      auto-commit-interval: 100      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean实例对象,并注册到spring容器中。

发送消息:

Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是KafkaTemplate

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { 。。。。 省略}

生产端提供了一个restful接口,模拟发送一条创建新用户消息。

@GetMapping("/add_user")public Object add() {    try {        Long id = Long.valueOf(new Random().nextInt(1000));        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.tojsONString(user));                // 提供回调方法,可以监控消息的成功或失败的后续处理        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {            @Override            public void onFailure(Throwable throwable) {                System.out.println("发送消息失败," + throwable.getMessage());            }            @Override            public void onSuccess(SendResult sendResult) {                // 消息发送到的topic                String topic = sendResult.getRecordMetadata().topic();                // 消息发送到的分区                int partition = sendResult.getRecordMetadata().partition();                // 消息在分区内的offset                long offset = sendResult.getRecordMetadata().offset();                System.out.println(String.format("发送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));            }        });        return "消息发送成功";    } catch (Exception e) {        e.printStackTrace();        return "消息发送失败";    }}

实际上开发使用的Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是1,可以通过server.properties文件中的num.partitions=1修改默认分区数量。在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。

消费消息:

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

定义一个消费类,在处理具体消息业务逻辑的方法上添加 @KafkaListener 注解,并配置要消费的topic,代码如下所示:

@Componentpublic class UserConsumer {    @KafkaListener(topics = "add_user")    public void receiveMesage(String content) {        System.out.println("消费消息:" + content);    }}

是不是很简单,添加kafka依赖、使用KafkaTemplate、@KafkaListener注解就完成消息的生产和消费,其实是SpringBoot在背后默默的做了很多工作,如果感兴趣可以研究下spring-boot-autoconfigure ,里面提供了常用开源框架的客户端实例封装。

“Spring Boot 集成 Kafkad的实现方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: Spring Boot 集成 Kafkad的实现方法

本文链接: https://www.lsjlt.com/news/270343.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Spring Boot 集成 Kafkad的实现方法
    本篇内容介绍了“Spring Boot 集成 Kafkad的实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Spring Boot 作...
    99+
    2023-06-14
  • Spring Boot 集成 Kafkad的实现示例
    目录前言 为什么使用kafka?业务场景基本架构 常用术语 代码演示 外部依赖:配置文件:发送消息:消费消息:演示工程代码 Spring Boot 作为主流微服务框架,拥有成熟的社区...
    99+
    2022-11-12
  • spring boot集成redisson的方法
    本文小编为大家详细介绍“spring boot集成redisson的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“spring boot集成redisson的方法”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入...
    99+
    2023-06-29
  • spring boot集成测试的方法
    这篇文章主要介绍“spring boot集成测试的方法”,在日常操作中,相信很多人在spring boot集成测试的方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”spri...
    99+
    2022-10-19
  • Spring Boot 集成PageHelper的使用方法
    目录前言:一、基本集成引入jar包Yml配置文件中添加相关配置封装相关分页方法示例代码前段传入参数执行结果二、分页中的排序字段如何防止SQL注入问题三、复杂的SQL分页语句四、分页失...
    99+
    2022-11-13
  • spring boot集成p6spy的最佳实践
    目录前言p6spy-spring-boot-starter快速集成第一步:导入依赖第二步:配置application.properties配置智能提示兼容原生所有配置项 前言 P6S...
    99+
    2022-11-13
  • Spring Boot集成springfox-swagger2构建restful API的方法教程
    前言之前跟大家分享了Spring MVC集成springfox-swagger2构建restful API,简单写了如何在springmvc中集成swagger2。这边记录下在springboot中如何集成swagger2。其实使用基本相同...
    99+
    2023-05-31
    springboot springfox-swagger2 restful
  • 开发脚手架集成Spring Boot Actuator监控的方法
    今天小编给大家分享一下开发脚手架集成Spring Boot Actuator监控的方法的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获...
    99+
    2023-06-30
  • spring boot集成WebSocket日志实时输出到web页面的方法
    本篇内容介绍了“spring boot集成WebSocket日志实时输出到web页面的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有...
    99+
    2023-06-29
  • Spring Boot 项目集成Redis的方式详解
    集成方式 使用Jedis Jedis是Redis官方推荐的面向Java的操作Redis的客户端,是对服务端直连后进行操作。如果直接使用Jedis进行连接,多线程环境下是非线程安全的...
    99+
    2022-11-12
  • Spring Boot集成JavaMailSender发送邮件功能的实现
    目录前言集成步骤添加依赖邮件配置信息邮件配置类代码实现发送简单邮件邮件实体类业务实现类测试类扩展功能发送Html内容的邮件业务实现类测试类发送带附件邮件业务实现类测试类发送模板邮件添...
    99+
    2022-11-13
  • Spring boot2.0 实现日志集成的方法(2)
    目录前言:logback.xml配置文件定义引用自定义logback.xml文件附加说明前言: 上一章Spring boot2.0 日志集成方法分享(1)讲解了s...
    99+
    2022-11-13
  • Spring boot2.0 实现日志集成的方法(3)
    目录前言具体实现定义日志注解定义日志切面基本使用输出信息总结前言 上一章Spring boot2.0 实现日志集成的方法(2)主要讲解了将日志信息根据类别输出到不...
    99+
    2022-11-13
  • spring-boot集成spring-security的oauth2如何实现github登录网站
    这篇文章主要为大家展示了“spring-boot集成spring-security的oauth2如何实现github登录网站”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“spring-boot集...
    99+
    2023-05-30
    spring boot oauth2.0
  • 新鲜速递:Spring Boot 3 项目快速集成 Spring Security 6的方法
    准备 Spring Boot 3正式版本已发版了半个月,Spring Security6也一并更新,但是网络上的相关中文文档较少,盲目进行集成容易出错,所以本文讲如何快速集成。这里不再赘述Spring Boot3和Spring Securi...
    99+
    2023-08-25
    spring spring boot java
  • spring boot集成redisson的最佳实践示例
    目录前言集成jedis实例,xml方式集成前引用的jar spring bean配置xml集成redisson实例,java bean的方式集成前引入的jarjavabea...
    99+
    2022-11-13
  • Spring Boot集成Sorl搜索客户端的实现代码
    Apache Solr是一个搜索引擎。Spring Boot为solr客户端库及Spring Data Solr提供的基于solr客户端库的抽象提供了基本的配置。Spring Boot提供了一个用于聚集依赖的spring-boot-star...
    99+
    2023-05-30
    spring boot sorl
  • Spring Boot集成Shiro并使用SHA-256加密密码的方法
    这篇文章主要介绍“Spring Boot集成Shiro并使用SHA-256加密密码的方法”,在日常操作中,相信很多人在Spring Boot集成Shiro并使用SHA-256加密密码的方法问题上存在疑惑,小...
    99+
    2022-10-19
  • 教你在Spring Boot微服务中集成gRPC通讯的方法
    一、首先声明gRPC接口 这里引入的是最新的gRpc-core 1.37版本, 采用的grcp-spring-boot-starter封装的版本进行实现,github地址: http...
    99+
    2022-11-12
  • Spring Boot使用注解集成Redis缓存的方法是什么
    这篇文章主要介绍“Spring Boot使用注解集成Redis缓存的方法是什么”,在日常操作中,相信很多人在Spring Boot使用注解集成Redis缓存的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家...
    99+
    2023-06-04
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作