iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >Springboot如何集成Kafka进行批量消费
  • 912
分享到

Springboot如何集成Kafka进行批量消费

2023-06-22 02:06:17 912人浏览 泡泡鱼
摘要

本篇内容主要讲解“SpringBoot如何集成kafka进行批量消费”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“springboot如何集成Kafka进行批量消费”吧!引入依赖<depe

本篇内容主要讲解“SpringBoot如何集成kafka进行批量消费”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习springboot如何集成Kafka进行批量消费”吧!

引入依赖

<dependency>                <groupId>org.springframework.kafka</groupId>                <artifactId>spring-kafka</artifactId>                <version>1.3.11.RELEASE</version>            </dependency>

因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

Springboot如何集成Kafka进行批量消费

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

创建配置类

    @Configuration    @EnableKafka    public class KafkaConsumerConfig {        private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);        @Value("${kafka.bootstrap.servers}")        private String kafkaBootstrapServers;        @Value("${kafka.group.id}")        private String kafkaGroupId;        @Value("${kafka.topic}")        private String kafkaTopic;        public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";        public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";        @Bean        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();            factory.setConsumerFactory(consumerFactory());            // 设置并发量,小于或者等于 Topic 的分区数            factory.setConcurrency(5);            // 设置为批量监听            factory.setBatchListener(Boolean.TRUE);            factory.getContainerProperties().setPollTimeout(30000);            return factory;        }        public ConsumerFactory<String, String> consumerFactory() {            return new DefaultKafkaConsumerFactory<>(consumerConfigs());        }        public Map<String, Object> consumerConfigs() {            Map<String, Object> props = new HashMap<>();            //设置接入点,请通过控制台获取对应Topic的接入点。            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);            //设置SSL根证书的路径,请记得将XXX修改为自己的路径。            //与SASL路径类似,该文件也不能被打包到jar中。            System.setProperty("java.security.auth.login.config", CONFIG_PATH);            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);            //根证书存储的密码,保持不变。            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWord_CONFIG, "KafkaOnsClient");            //接入协议,目前支持使用SASL_SSL协议接入。            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");            //SASL鉴权方式,保持不变。            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");            // 自动提交            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);            //两次Poll之间的最大允许间隔。            //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);            //设置单次拉取的量,走公网访问时,该参数会有较大影响。            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);            //每次Poll的最大数量。            //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);            //消息的反序列化方式。            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");            //当前消费实例所属的消费组,请在控制台申请之后填写。            //属于同一个组的消费实例,会负载消费消息。            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);            //Hostname校验改成空。            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGoRITHM_CONFIG, "");            return props;        }    }

注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。

Kafka 消费者

    @Component    public class KafkaMessageListener {        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);        @KafkaListener(topics = {"${kafka.topic}"})        public void listen(List<ConsumerRecord<String, String>> recordList) {            for (ConsumerRecord<String,String> record : recordList) {                // 打印消息的分区以及偏移量                LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());                String value = record.value();                System.out.println("value = " + value);                // 处理业务逻辑 ...            }        }    }

因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。

到此,相信大家对“Springboot如何集成Kafka进行批量消费”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

--结束END--

本文标题: Springboot如何集成Kafka进行批量消费

本文链接: https://www.lsjlt.com/news/301854.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Springboot如何集成Kafka进行批量消费
    本篇内容主要讲解“Springboot如何集成Kafka进行批量消费”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Springboot如何集成Kafka进行批量消费”吧!引入依赖<depe...
    99+
    2023-06-22
  • Springboot集成Kafka进行批量消费及踩坑点
    目录引入依赖创建配置类Kafka 消费者引入依赖 <dependency> <groupId>or...
    99+
    2022-11-12
  • 如何在spring中使用kafka对消费者进行监听
    这期内容当中小编将会给大家带来有关如何在spring中使用kafka对消费者进行监听,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。整合过程 引入spring-kafka的依赖包 <depe...
    99+
    2023-06-06
  • 使用springboot如何实现对activemq进行集成
    这篇文章给大家介绍使用springboot如何实现对activemq进行集成,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。ActiveMQActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。Acti...
    99+
    2023-05-31
    springboot activemq
  • 如何进行SpringBoot开发的集成参数校验
    这期内容当中小编将会给大家带来有关如何进行SpringBoot开发的集成参数校验,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。对于 web服务来说,为防止非法参数对业务造...
    99+
    2022-10-19
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作