iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >spring kafka框架中@KafkaListener注解怎么使用
  • 620
分享到

spring kafka框架中@KafkaListener注解怎么使用

2023-07-05 04:07:14 620人浏览 泡泡鱼
摘要

这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资

这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”spring kafka框架中@KafkaListener注解怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

    简介

    Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。因此,也越来越多的框架对 kafka 做了集成,比如本文将要说到的 spring-kafka。

    Kafka 既然作为一个消息发布订阅系统,就包括消息生成者和消息消费者。本文主要讲述的 spring-kafka 框架的 kafkaListener 注解的深入解读和使用案例。

    解读

    源码解读

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATioN_TYPE })@Retention(RetentionPolicy.RUNTIME)@MessageMapping@Documented@Repeatable(KafkaListeners.class)public @interface KafkaListener {      String id() default "";      String containerFactory() default "";      String[] topics() default {};      String topicPattern() default "";      TopicPartition[] topicPartitions() default {};      String containerGroup() default "";      String errorHandler() default "";      String groupId() default "";      boolean idIsGroup() default true;      String clientIdPrefix() default "";      String beanRef() default "__listener";}

    使用案例

    ConsumerRecord 类消费

    使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用 String 类型去接收消息体。

    这里我们编写一个 Listener 方法,监听 "topic1"Topic,并把 ConsumerRecord 里面所包含的内容打印到控制台中:

    @Componentpublic class Listener {    private static final Logger log = LoggerFactory.getLogger(Listener.class);    @KafkaListener(id = "consumer", topics = "topic1")    public void consumerListener(ConsumerRecord record) {        log.info("topic.quick.consumer receive : " + record.toString());    }}

    批量消费

    批量消费在现实业务场景中是很有实用性的。因为批量消费可以增大 kafka 消费吞吐量, 提高性能。

    批量消费实现步骤:

    重新创建一份新的消费者配置,配置为一次拉取 10 条消息

    创建一个监听容器工厂,命名为:batchContainerFactory,设置其为批量消费并设置并发量为 5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

    创建一个分区数为 8 的 Topic。

    创建监听方法,设置消费 id 为 “batchConsumer”,clientID 前缀为“batch”,监听“batch”,使用“batchContainerFactory” 工厂创建该监听容器。

    @Componentpublic class BatchListener {    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);    private Map consumerProps() {        Map props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");        //一次拉取消息数量        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,                NumberDeserializers.IntegerDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,                StringDeserializer.class);        return props;    }    @Bean("batchContainerFactory")    public ConcurrentKafkaListenerContainerFactory listenerContainer() {        ConcurrentKafkaListenerContainerFactory container                = new ConcurrentKafkaListenerContainerFactory();        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));        //设置并发量,小于或等于Topic的分区数        container.setConcurrency(5);        //必须 设置为批量监听        container.setBatchListener(true);        return container;    }    @Bean    public NewTopic batchTopic() {        return new NewTopic("topic.batch", 8, (short) 1);    }    @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"            ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")    public void batchListener(List data) {        log.info("topic.batch  receive : ");        for (String s : data) {            log.info(  s);        }    }}

    监听 Topic 中指定的分区

    使用 @KafkaListener 注解的 topicPartitions 属性监听不同的 partition 分区。

    @TopicPartition:topic-- 需要监听的 Topic 的名称,partitions &ndash; 需要监听 Topic 的分区 id。

    partitionOffsets &ndash; 可以设置从某个偏移量开始监听,@PartitionOffset:partition &ndash; 分区 Id,非数组,initialOffset &ndash; 初始偏移量。

    @Beanpublic NewTopic batchWithPartitionTopic() {    return new NewTopic("topic.batch.partition", 8, (short) 1);}@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",        topicPartitions = {                @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),                @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},                        partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))        })public void batchListenerWithPartition(List data) {    log.info("topic.batch.partition  receive : ");    for (String s : data) {        log.info(s);    }}

    注解方式获取消息头及消息体

    当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式。。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为 List 即可,比如 List data , List key。

    @Payload:获取的是消息的消息体,也就是发送内容

    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的 key

    @Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的

    @Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的 TopicName

    @Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳

    @KafkaListener(id = "params", topics = "topic.params")public void otherListener(@Payload String data,                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {    log.info("topic.params receive : \n"+            "data : "+data+"\n"+            "key : "+key+"\n"+            "partitionId : "+partition+"\n"+            "topic : "+topic+"\n"+            "timestamp : "+ts+"\n"    );}

    使用 Ack 机制确认消费

    Kafka 是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka 会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。Kafka 的 ack 机制可以有效的确保消费不被丢失。因为自动提交是在 kafka 拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。

    使用 Kafka 的 Ack 机制比较简单,只需简单的三步即可:

    • 设置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交

    • 设置 AckMode=MANUAL_IMMEDIATE

    • 监听方法加入 Acknowledgment ack 参数

    4.使用 Consumer.seek 方法,可以指定到某个偏移量的位置

    @Componentpublic class AckListener {    private static final Logger log = LoggerFactory.getLogger(AckListener.class);    private Map consumerProps() {        Map props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }    @Bean("ackContainerFactory")    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));        return factory;    }    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")    public void ackListener(ConsumerRecord record, Acknowledgment ack) {        log.info("topic.quick.ack receive : " + record.value());        ack.acknowledge();    }}

    解决重复消费

    上一节中使用 ack 手动提交偏移量时,假如 consumer 挂了重启,那它将从 committed offset 位置开始重新消费,而不是 consume offset 位置。这也就意味着有可能重复消费。

    在 0.9 客户端中,有 3 种 ack 策略:

    策略 1: 自动的,周期性的 ack。

    策略 2:consumer.commitSync(),调用 commitSync,手动同步 ack。每处理完 1 条消息,commitSync 1 次。

    策略 3:consumer. commitASync(),手动异步 ack。、

    那么使用策略 2,提交每处理完 1 条消息,就发送一次 commitSync。那这样是不是就可以解决 “重复消费” 了呢?如下代码:

    while (true) {        List buffer = new ArrayList<>();        ConsumerRecords records = consumer.poll(100);        for (ConsumerRecord record : records) {            buffer.add(record);        }        insertIntoDb(buffer);    //消除处理,存到db        consumer.commitSync();   //同步发送ack        buffer.clear();    }}

    答案是否定的!因为上面的 insertIntoDb 和 commitSync 做不到原子操作:如果在数据处理完成,commitSync 的时候挂了,服务器再次重启,消息仍然会重复消费。

         那么如何解决重复消费的问题呢?答案是自己保存 committed offset,而不是依赖 kafka 的集群保存 committed offset,把消息的处理和保存 offset 做成一个原子操作,并且对消息加入唯一 id, 进行判重。

    依照官方文档, 要自己保存偏移量, 需要:

    • enable.auto.commit=false, 禁用自动 ack。

    • 每次取到消息,把对应的 offset 存下来。

    • 下次重启,通过 consumer.seek 函数,定位到自己保存的 offset,从那开始消费。

    • 更进一步处理可以对消息加入唯一 id, 进行判重。

    到此,关于“spring kafka框架中@KafkaListener注解怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

    --结束END--

    本文标题: spring kafka框架中@KafkaListener注解怎么使用

    本文链接: https://www.lsjlt.com/news/349401.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    本篇文章演示代码以及资料文档资料下载

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档
    猜你喜欢
    • spring kafka框架中@KafkaListener注解怎么使用
      这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资...
      99+
      2023-07-05
    • spring kafka @KafkaListener详解与使用过程
      目录说明@KafkaListener详解id 监听器的idgroupId 消费组名errorHandler 异常处理containerFactory 监听器工厂properties ...
      99+
      2023-02-20
      spring kafka使用 spring kafka @KafkaListener @KafkaListener使用
    • springkafka框架中@KafkaListener注解解读和使用案例
      目录简介解读源码解读使用案例ConsumerRecord 类消费批量消费批量消费实现步骤:监听 Topic 中指定的分区注解方式获取消息头及消息体使用 Ack 机制确认消费解决重复消...
      99+
      2023-02-20
      @KafkaListener 注解 kafka @KafkaListener 注解 kafka @KafkaListener
    • Spring框架中@PostConstruct注解详解
      目录初始化方式一:@PostConstruct注解初始化方式二:实现InitializingBean接口补充:@PostConstruct注释规则总结初始化方式一:@PostCons...
      99+
      2024-04-02
    • 怎么在java中使用Spring框架
      怎么在java中使用Spring框架?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Java是什么Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、分布式系统...
      99+
      2023-06-14
    • 怎么在Spring框架中注入依赖
      这篇文章将为大家详细讲解有关怎么在Spring框架中注入依赖,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。三种依赖注入的方式属性注入,通过setter方法注入bean的属性值或依赖的对象 构...
      99+
      2023-05-30
      spring
    • spring框架怎么使用redis
      "spring框架使用redis的方法:在pom.xml中导入redis的相关依赖,例如:<dependency><groupId>redis.clients</groupId><arti...
      99+
      2024-04-02
    • Spring Boot中怎么使用Spring Retry重试框架
      今天小编给大家分享一下Spring Boot中怎么使用Spring Retry重试框架的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获...
      99+
      2023-06-30
    • Spring Boot中怎么使用Spring-Retry重试框架
      这篇文章主要介绍“Spring Boot中怎么使用Spring-Retry重试框架”,在日常操作中,相信很多人在Spring Boot中怎么使用Spring-Retry重试框架问题上存在疑惑,小编查阅了各式资料,整理出简...
      99+
      2023-06-30
    • 使用Spring注入Hibernate验证框架
      目录Spring注入Hibernate验证框架Spring配置文件Hibernate内置的验证约束注解如下表所示springmvc使用Hibernate的校验框架validation...
      99+
      2024-04-02
    • 【Spring框架全系列】方法注解@Bean的使用
      📬📬哈喽,大家好,我是小浪。上篇博客我们介绍了五大类注解的使用方法,以及如何解决Spring使用五大类注解生成bean-Name的问题;那么,谈到如何更简单的读取和存储对象,这里我们还需要介绍另外一个"方...
      99+
      2023-09-27
      java spring boot spring
    • 怎么使用Spring Boot Kafka
      本篇内容介绍了“怎么使用Spring Boot  Kafka”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够...
      99+
      2024-04-02
    • Spring框架学习常用注解汇总
      目录类注解方法或属性上注解参数注解类注解 @component 标注类,泛指各种组件,类不属于各种分类的时候,用它做标注。 @Service 标注类,声明该类为业务层组件,用于处理业...
      99+
      2024-04-02
    • 如何使用Spring注入Hibernate验证框架
      本篇内容介绍了“如何使用Spring注入Hibernate验证框架”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Spring注入Hibern...
      99+
      2023-06-21
    • Spring Boot Rest常用框架注解有哪些
      本篇内容主要讲解“Spring Boot Rest常用框架注解有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Spring Boot Rest常用框架...
      99+
      2023-07-02
    • Kafka和Storm怎么在Spring boot中使用
      这篇文章给大家介绍Kafka和Storm怎么在Spring boot中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。使用工具及环境配置 java 版本jdk-1.8 编译工具使用IDEA-2017 maven作为项...
      99+
      2023-05-30
    • Spring @Cacheable注解中key怎么使用
      本篇内容介绍了“Spring @Cacheable注解中key怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Spring ...
      99+
      2023-06-22
    • Spring Boot中怎么使用@KafkaListener并发批量接收消息
      这篇“Spring Boot中怎么使用@KafkaListener并发批量接收消息”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一...
      99+
      2023-07-05
    • 怎么在不使用spring框架中使用aop的功能
      本篇文章为大家展示了怎么在不使用spring框架中使用aop的功能,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Spring框架的AOP机制可以让开发者把业务流程中的通用功能抽取出来,单独编写功能代...
      99+
      2023-06-22
    • Spring Boot Rest常用框架注解详情简介
      目录开始Spring Boot Rest的先决条件在Spring Initializer创建Spring Boot项目Spring Boot注解@RestController@Req...
      99+
      2024-04-02
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作