iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >spring kafka @KafkaListener如何使用
  • 940
分享到

spring kafka @KafkaListener如何使用

2023-07-05 04:07:43 940人浏览 薄情痞子
摘要

今天小编给大家分享一下spring kafka @KafkaListener如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,

今天小编给大家分享一下spring kafka @KafkaListener如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;

  • 可以使用#{…}或属性占位符(${…})在SpEL上配置注释上的大多数属性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",            clientIdPrefix = "myClientId")

属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3

@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

Caused by: java.lang.IllegalStateException: Another endpoint is already reGIStered with id

③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO
正常情况它是该容器中的默认消费组
但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么当前消费者的消费组就是consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组GroupId是 “groupId-test”

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用KafkaUtils.getConsumerGroupId()可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一) topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),          @TopicPartition(topic = "topic2", partitions = "0",             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))        })public void listen(ConsumerRecord<?, ?> record) {    ...}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现KafkaListenerErrorHandler; 然后做一些异常处理;

@Componentpublic class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {    @Override    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {        return null;    }    @Override    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {    //do someting        return null;    }}

调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

        @Bean    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(kafkaConsumerFactory());        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG        factory.setBatchListener(true);        return factory;    }

使用containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的kafka.consumer.client-id属性; 最为前缀后面接 -n n是数字

concurrency并发

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看Java concurrency之集合

        @Bean    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(kafkaConsumerFactory());        factory.setConcurrency(6);        return factory;    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"            , clientIdPrefix = "myClientId5",groupId = "groupId-test",            properties = {                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired    private KafkaListenerEndpointRegistry registry;       //.... 获取所有注册的监听器        registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

@Configuration@EnableKafkapublic class Config implements KafkaListenerConfigurer {    @Autowired    private LocalValidatorFactoryBean validator;    ...    @Override    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {      registrar.setValidator(this.validator);    }}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",      containerFactory = "kafkaJSONListenerContainerFactory")public void validatedListener(@Payload @Valid ValidatedClass val) {    ...}@Beanpublic KafkaListenerErrorHandler validationErrorHandler() {    return (m, e) -> {        ...    };}

spring-kafka官方文档

扩展:Spring for Apache Kafka @KafkaListener使用及注意事项

官方文档:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more infORMation.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo类:

public class Listener {    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")    public void listen(String data) {        ...    }}</code>配置类及注解:
@Configuration@EnableKafkapublic class KafkaConfig {    @Bean    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>                        kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =                                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(3);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    @Bean    public ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    @Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());        ...        return props;    }}

以上就是“spring kafka @KafkaListener如何使用”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网精选频道。

--结束END--

本文标题: spring kafka @KafkaListener如何使用

本文链接: https://www.lsjlt.com/news/349389.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • spring kafka @KafkaListener如何使用
    今天小编给大家分享一下spring kafka @KafkaListener如何使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,...
    99+
    2023-07-05
  • spring kafka @KafkaListener详解与使用过程
    目录说明@KafkaListener详解id 监听器的idgroupId 消费组名errorHandler 异常处理containerFactory 监听器工厂properties ...
    99+
    2023-02-20
    spring kafka使用 spring kafka @KafkaListener @KafkaListener使用
  • spring kafka框架中@KafkaListener注解怎么使用
    这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资...
    99+
    2023-07-05
  • spring Kafka中的@KafkaListener源码分析
    本篇内容主要讲解“spring Kafka中的@KafkaListener源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spring Kafka中的@KafkaListener源码分析”...
    99+
    2023-07-05
  • Kafka之kafka-topics.sh如何使用
    本文小编为大家详细介绍“Kafka之kafka-topics.sh如何使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Kafka之kafka-topics.sh如何使用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知...
    99+
    2023-07-05
  • 怎么使用Spring Boot Kafka
    本篇内容介绍了“怎么使用Spring Boot  Kafka”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够...
    99+
    2024-04-02
  • @KafkaListener怎么使用
    这篇文章主要介绍“@KafkaListener怎么使用”,在日常操作中,相信很多人在@KafkaListener怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”@KafkaListener怎么使用”的疑...
    99+
    2023-07-05
  • 如何在spring中使用kafka对消费者进行监听
    这期内容当中小编将会给大家带来有关如何在spring中使用kafka对消费者进行监听,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。整合过程 引入spring-kafka的依赖包 <depe...
    99+
    2023-06-06
  • 如何在spring boot中使用spring-kafka实现一个接收消息功能
    本篇文章为大家展示了如何在spring boot中使用spring-kafka实现一个接收消息功能,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。实现方法pom.xml文件如下<&#63...
    99+
    2023-05-31
    springboot spring-kafka
  • java中Kafka如何使用
    这篇文章将为大家详细讲解有关java中Kafka如何使用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。前言官方文档:http://kafka.apache.org/中文文档:https://kafka.a...
    99+
    2023-06-25
  • Spring Boot中怎么使用@KafkaListener并发批量接收消息
    这篇“Spring Boot中怎么使用@KafkaListener并发批量接收消息”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一...
    99+
    2023-07-05
  • spring boot怎么与kafka结合使用
    spring boot怎么与kafka结合使用?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。引入相关依赖<dependency> <groupId>o...
    99+
    2023-05-31
    springboot kafka
  • 如何使用spring boot整合kafka和延迟启动消费者
    这篇文章给大家分享的是有关如何使用spring boot整合kafka和延迟启动消费者的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。spring boot 整合kafka,延迟启动消费者spring boot整合...
    99+
    2023-06-20
  • Kafka和Storm怎么在Spring boot中使用
    这篇文章给大家介绍Kafka和Storm怎么在Spring boot中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。使用工具及环境配置 java 版本jdk-1.8 编译工具使用IDEA-2017 maven作为项...
    99+
    2023-05-30
  • 如何使用Scala开发Apache Kafka
    本篇内容介绍了“如何使用Scala开发Apache Kafka”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Apache Kafka是一个广...
    99+
    2023-06-02
  • 【Kafka】Kafka Stream简单使用
    一、实时流式计算 1. 概念 一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实...
    99+
    2023-08-30
    java kafka 微服务 Kafka Stream 实时流式计算
  • 如何将Golang和Kafka结合使用
    Kafka是一个开源的分布式消息队列,在大数据应用中常常被用于构建实时数据流处理应用。而Golang则是Google开发的一种编程语言,以其高效的并发性、强大的库和生态系统而闻名。那么,如何使用Golang与Kafka进行结合呢?首先,我们...
    99+
    2023-05-14
  • 使用kafka如何选择分区数及kafka性能测试
    kafka选择分区数及kafka性能测试 1、简言 ​ 如何选择合适的分区,这是我们经常面临的问题,不过针对这个问题,在网上并没有搜到固定的答案。因此,今天在这里主要通过...
    99+
    2024-04-02
  • 如何使用docker搭建kafka环境
    这篇文章主要介绍“如何使用docker搭建kafka环境”,在日常操作中,相信很多人在如何使用docker搭建kafka环境问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用docker搭建kafka环境...
    99+
    2023-06-19
  • 如何进行kafka的安装和使用
    这篇文章将为大家详细讲解有关如何进行kafka的安装和使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1.       kafk...
    99+
    2023-06-04
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作