广告
返回顶部
首页 > 资讯 > 操作系统 >kafka springBoot 配置
  • 438
分享到

kafka springBoot 配置

kafkaspringbootlinq 2023-08-30 10:08:14 438人浏览 八月长安
摘要

1、properties 配置 control.command.kafka.enabled=truecontrol.command.kafka.bootstrap-servers=172.0.0.1:9092control.command.

1、properties 配置

control.command.kafka.enabled=truecontrol.command.kafka.bootstrap-servers=172.0.0.1:9092control.command.kafka.command-topics=lastTopiccontrol.command.kafka.consumer.group-id=consumer-eslink-iwater-control-commandcontrol.command.kafka.consumer.properties.session.timeout.ms=30000control.command.kafka.consumer.properties.request.timeout.ms=90000control.command.kafka.consumer.fetch-min-size=10KBcontrol.command.kafka.consumer.fetch-max-wait=500control.command.kafka.consumer.max-poll-records=1000control.command.kafka.consumer.auto-offset-reset=earliestcontrol.command.kafka.listener.ack-mode=MANUAL_IMMEDIATEcontrol.command.kafka.listener.concurrency=1control.command.kafka.listener.type=SINGLEcontrol.command.kafka.producer.acks=allcontrol.command.kafka.producer.batchSize=4096control.command.kafka.producer.bufferMemory=40960control.command.kafka.producer.linger=10control.command.kafka.producer.retries=3

2、Config 配置

@Configuration@ConditionalOnExpression("${control.command.kafka.enabled:false}")@EnableKafkapublic class ControlCommandKafkaConfig {    @Bean("controlCommandKafkaProperties")    @ConfigurationProperties("control.command.kafka")    @Primary    public KafkaProperties kafkaProperties() {        return new KafkaProperties();    }    @Bean("controlCommandKafkaConsumerFactory")    public ConsumerFactory<Object, Object> kafkaConsumerFactory(            @Qualifier("controlCommandKafkaProperties") KafkaProperties inProps) {        DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(inProps.buildConsumerProperties());        return consumerFactory;    }    @Bean("controlCommandBatchFactory")    @DependsOn("controlCommandKafkaProperties")    public KafkaListenerContainerFactory<?> egBatchFactory(@Qualifier("controlCommandKafkaProperties") KafkaProperties inProps,   @Qualifier("controlCommandKafkaConsumerFactory") ConsumerFactory consumerFactory) {        ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory =                new ConcurrentKafkaListenerContainerFactory<>();        listenerFactory.setConsumerFactory(consumerFactory);        configureListenerFactory(listenerFactory, inProps);        configureContainer(listenerFactory.getContainerProperties(), inProps);        return listenerFactory;    }    @Bean("controlCommandKafkaTemplate")    public KafkaTemplate<String, String> kafkaTemplate(@Qualifier("controlCommandKafkaProperties") KafkaProperties inProps) {        return new KafkaTemplate(new DefaultKafkaProducerFactory(inProps.buildProducerProperties()));    }    private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory, KafkaProperties inProps) {        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();        KafkaProperties.Listener properties = inProps.getListener();        // 设置监听线程并发数 control.command.kafka.listener.concurrency        map.from(properties::getConcurrency).to(factory::setConcurrency);        // control.kafka.listener.type=batch 时 批量监听        if (properties.getType().equals(KafkaProperties.Listener.Type.BATCH)) {            factory.setBatchListener(true);        }    }    private void configureContainer(ContainerProperties container, KafkaProperties inProps) {        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();        KafkaProperties.Listener properties = inProps.getListener();        map.from(properties::getAckMode).to(container::setAckMode);        map.from(properties::getClientId).to(container::setClientId);        map.from(properties::getAckCount).to(container::setAckCount);        map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);        map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout);        map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);        map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval);        map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)                .to(container::setMonitorInterval);        map.from(properties::getLoGContainerConfig).to(container::setLogContainerConfig);        map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);    }}

2.1 @EnableKafka 注解

‘@EnableKafka’ 是用于在 Spring Boot 应用程序中启用 Apache Kafka 的注解。当你在 spring Boot 应用程序的配置类上添加 @EnableKafka 注解时,它会激活 Kafka 基础设施,使你能够在应用程序中使用 Kafka 相关的组件。

以下是对这个注解的简要解释:

基础设施激活:@EnableKafka 注解告诉 Spring 在应用程序中设置所需的 Kafka 基础设施。这包括创建必要的 Kafka bean 和配置。

Kafka Template:在启用 Kafka 后,你可以使用 Spring Kafka 提供的 KafkaTemplate 类轻松地向 Kafka 主题发送消息。KafkaTemplate 抽象了底层的 Kafka 生产者 api,简化了消息发送的过程。

消息监听器:通过 @EnableKafka,你还可以使用 Spring 的 @KafkaListener 注解设置 Kafka 消息消费者。@KafkaListener 注解应用于 Spring 组件中的方法,使该方法可以作为 Kafka 消息监听器。当 Kafka 主题中有可用的消息时,带有 @KafkaListener 注解的方法将自动被调用,并处理消息内容。

下面是在 Spring Boot 应用程序中使用 @EnableKafka 的示例代码:

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication@EnableKafkapublic class MyKafkaApplication {    public static void main(String[] args) {        SpringApplication.run(MyKafkaApplication.class, args);    }}

通过这样的配置,你的 Spring Boot 应用程序将启用 Kafka 支持,你可以使用 KafkaTemplate 进行消息发送,使用 @KafkaListener 进行消息消费。请确保在 application.properties 或 application.yml 文件中配置 Kafka 相关属性,例如 Kafka 代理地址和其他必要的配置。

请注意,为了使 @EnableKafka 正常工作,你需要在项目的构建配置中包含所需的 Kafka 依赖项。对于 Maven,你可以添加以下依赖项:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

一旦 Kafka 正在运行,并且应用程序正确配置了 @EnableKafka,你就可以在 Spring Boot 应用程序中构建基于 Kafka 的消息传递功能。

2.2、@ConditionalOnExpression 注解

@ConditionalOnExpression 是 Spring Boot 中的一个条件注解之一。它允许你根据给定的 SpEL 表达式来决定是否启用或禁用某个 Bean 或配置。
条件注解可以用于在 Spring Boot 应用程序中根据特定条件来动态创建 Bean 或配置,从而根据不同的配置或环境来灵活地管理应用程序的行为。
@ConditionalOnExpression 的工作方式是:它在配置类或 Bean 上进行标记,然后在应用程序启动过程中解析 SpEL 表达式。如果 SpEL 表达式的结果为 true,则相关的 Bean 或配置将被启用,否则将被禁用。
以下是 @ConditionalOnExpression 的示例使用方式:

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Conditional;import org.springframework.context.annotation.Configuration;@Configurationpublic class MyConfiguration {    @Bean    @ConditionalOnExpression("${myapp.feature.enabled:true}")    public MyBean myBean() {        // 返回需要创建的 Bean 实例        return new MyBean();    }}

2.3、 @RefreshScope 注解

@RefreshScope 是 spring cloud 中的一个注解,用于实现动态刷新 Spring Bean 的配置信息。它通常与 Spring Cloud Config 配合使用,能够在运行时更新应用程序配置,而无需重启应用。
微服务架构中,使用 Spring Cloud Config 可以将配置信息集中管理,然后通过 @RefreshScope 注解实现配置的动态刷新。这使得应用程序在运行时可以获取最新的配置信息,而不需要停止和启动应用。

2.4、@DependsOn 注解

@DependsOn 是 Spring Framework 中的一个注解,用于指定 Spring Bean 之间的依赖关系。通过在 Bean 上添加 @DependsOn 注解,你可以确保指定的 Bean 会在其所依赖的其他 Bean 初始化之后再进行初始化。
当一个 Bean 希望在另一个 Bean 初始化完成后再初始化时,可以使用 @DependsOn 注解来定义这种依赖关系。这对于确保 Bean 之间的正确顺序初始化非常有用,特别是当某些 Bean 需要依赖其他 Bean 才能正确地进行初始化或工作时。

2.5、 listener.ack-mode 消息的确认模式

Spring Kafka 中,AckMode 是用于配置消息消费者的消息确认模式(Acknowledgment Mode)。这个枚举类型用于决定在消费者处理完 Kafka 消息后如何向 Kafka 服务器发送确认,告知服务器消息是否已经被成功消费。

Spring Kafka 支持以下几种 AckMode:

  • AUTO: 这是默认的确认模式。在这种模式下,消费者会自动在处理完消息后向 Kafka 服务器发送确认。当消费者成功处理消息后,Kafka 服务器会将偏移量(offset)移动到下一条消息,表示该消息已成功消费。但是在这种模式下,如果处理消息时发生异常,Kafka 服务器会重新发送相同的消息,可能会导致消息的重复消费。

  • MANUAL: 在这种模式下,消费者需要手动管理消息的确认。当消费者成功处理消息后,需要调用 Acknowledgment 对象的 acknowledge() 方法,手动向 Kafka 服务器发送确认。这样可以确保消息的准确处理,避免重复消费。

  • MANUAL_IMMEDIATE: 这是另一种手动确认模式,与 MANUAL 模式类似。区别在于,当消费者调用 acknowledge() 方法时,它会立即提交确认而不是等待下一次轮询。这样可以更快地将确认提交给 Kafka 服务器。

你可以通过在 @KafkaListener 注解中设置 AckMode 来配置消息消费者的确认模式。例如:

import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;@KafkaListener(topics = "my_topic", groupId = "my_group", ackMode = "MANUAL")public void proceSSMessage(String message, Acknowledgment acknowledgment) {    // 处理消息的逻辑    // 手动确认消息    acknowledgment.acknowledge();}

在上面的例子中,我们将 ackMode 设置为 MANUAL,表明消息的确认模式为手动确认。在处理消息后,我们手动调用 acknowledgment.acknowledge() 来确认消息的消费。

选择合适的 AckMode 取决于你的应用程序需求和消费者的可靠性要求。如果应用程序对消息的重复消费有一定的容忍度,并且希望简化消费者的处理逻辑,可以选择 AUTO 模式。如果应用程序对消息的准确性要求较高,并且愿意手动确认消息的处理,可以选择 MANUAL 或 MANUAL_IMMEDIATE 模式。

2.6、kafka.consumer.auto-offset-reset 偏移量

kafka.consumer.auto-offset-reset 是 Kafka 消费者的一个重要配置属性,它决定了当一个新的消费者加入消费者组或者消费者在某个分区上没有有效的偏移量时,消费者应该从何处开始消费消息。

这个属性有以下几个可能的值:

  • rliest: 如果消费者在某个分区上没有有效的偏移量,或者消费者组第一次加入时,从最早的可用偏移量开始消费消息。换句话说,从分区的起始位置开始消费。

  • latest: 如果消费者在某个分区上没有有效的偏移量,或者消费者组第一次加入时,从最新的可用偏移量开始消费消息。换句话说,只消费自加入后产生的新消息。

  • none: 如果消费者在某个分区上没有有效的偏移量,抛出一个异常。这种情况下,消费者必须手动设置初始偏移量。

  • anything else: 抛出一个异常。

这个属性通常在 Kafka 消费者的配置中使用,用来控制消费者在特定情况下的起始消费位置。你可以根据你的业务需求来选择适合的值。如果你希望消费者能够从最早的消息开始消费,以确保不错过任何消息,可以将它设置为 earliest。如果你只关心新产生的消息,可以将它设置为 latest。

3、KafkaProducer 生产者

@Component@RefreshScope@ConditionalOnExpression("${control.command.kafka.enabled:false}")@DependsOn(value = {"controlCommandKafkaTemplate"})public class ControlCommandKafkaProducer extends BaseLogable {    @Resource(name = "controlCommandKafkaTemplate")    private KafkaTemplate<String, String> controlCommandKafkaTemplate;    public void send(String key, String topic, String msg) {        bizLogger.info("sent kafka msg, topic:{}, key: {}, msg: {}", topic, key, msg);        ListenableFuture<SendResult<String, String>> listenableFuture = controlCommandKafkaTemplate.send(topic, key, msg);        listenableFuture.addCallback(result -> bizLogger.info("sent msg success:{}", msg),                e -> {                    bizLogger.error("sent msg failure, msg: {}", msg, e);                });    }}

3.1 org.springframework.kafka.core.KafkaTemplate 使用

3.1.1 发送消息

import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Service;@Servicepublic class KafkaProducerService {        private final KafkaTemplate<String, String> kafkaTemplate;        public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {        this.kafkaTemplate = kafkaTemplate;    }        public void sendMessage(String topic, String message) {        kafkaTemplate.send(topic, message);    }}

3.1.2 发送消息并指定分区

public void sendMessageToPartition(String topic, int partition, String message) {    kafkaTemplate.send(topic, partition, null, message);}

3.1.3 发送消息并指定键

public void sendMessageWithKey(String topic, String key, String message) {    kafkaTemplate.send(topic, key, message);}

3.1.4 发送消息并等待确认

import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.kafka.support.SendResult;import org.springframework.kafka.support.future.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;public void sendMessageAndWaitForConfirmation(String topic, String message) {    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {        @Override        public void onSuccess(SendResult<String, String> result) {            // 消息发送成功的处理逻辑        }        @Override        public void onFailure(Throwable ex) {            // 消息发送失败的处理逻辑        }    });}

4、KafkaConsumer 消费者

4.1 单条消费

单条消费时配置

control.command.kafka.listener.type=SINGLE

@Component@RefreshScope@ConditionalOnExpression("${control.command.kafka.enabled:false}")@DependsOn(value = {"controlCommandBatchFactory"})public class ControlCommandKafkaConsumer extends BaseLogable {        @KafkaListener(id = "${control.command.kafka.consumer.groupId}", topics = "${kafka.command.topic.ecgs-command-up}", containerFactory = "controlCommandBatchFactory")    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {        bizLogger.info("kafka msg::: " + record.value() + ", key: " + record.key());        //接收Kafka消息        String data = record.value();        //接收消息确认        ack.acknowledge();    }}

4.2 多条消费

多条消费时配置

control.command.kafka.listener.type=BATCH

package cc.eslink.yq.iwater.kafka;import cc.eslink.common.base.BaseLogable;import cc.eslink.yq.iwater.dto.schedule.AlarmInfoDTO;import com.alibaba.fastJSON.jsON;import com.alibaba.fastjson.JSONObject;import com.alibaba.fastjson.TypeReference;import org.apache.commons.lang3.StringUtils;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.context.annotation.DependsOn;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;import java.util.List;@Component@RefreshScope@ConditionalOnExpression("${control.command.kafka.enabled:false}")@DependsOn(value = {"controlCommandBatchFactory"})public class ControlCommandKafkaConsumer extends BaseLogable {    @KafkaListener(id = "${control.command.kafka.consumer.groupId}", topics = "${kafka.command.topic.ecgs-command-up}", containerFactory = "controlCommandBatchFactory")    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {        bizLogger.info("== 接收到 kafka msg 条数:::{} ==", records.size());        //接收消息确认        ack.acknowledge();        //接收Kafka消息        for (ConsumerRecord<String, String> record : records) {            try {                final AlarmInfoDTO alarmInfoDTO = JSON.parseObject(record.value(), new TypeReference<AlarmInfoDTO>() {});            } catch (Exception e) {                expLogger.error("alarm.dispose error", record, e);            }        }    }}

来源地址:https://blog.csdn.net/weixin_41827053/article/details/132339253

--结束END--

本文标题: kafka springBoot 配置

本文链接: https://www.lsjlt.com/news/382614.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • kafka springBoot 配置
    1、properties 配置 control.command.kafka.enabled=truecontrol.command.kafka.bootstrap-servers=172.0.0.1:9092control.command....
    99+
    2023-08-30
    kafka spring boot linq
  • springboot多kafka配置
    第一步:引入maven依赖 org.springframework.kafka spring-kafka 第二步:新增配置文件 以下为大致结构,供参考 spring: kafka: ...
    99+
    2023-09-04
    kafka java spring boot
  • springboot怎么配置双kafka
    这篇文章主要介绍“springboot怎么配置双kafka”,在日常操作中,相信很多人在springboot怎么配置双kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”springboot怎么配置双ka...
    99+
    2023-07-06
  • springboot之配置双kafka全过程
    目录springboot配置双kafka引入Maven kafka jar、准备两个kafka;配置yml配置文件配置KafkaConfig类发送工具类MyKafkaProducer...
    99+
    2023-05-16
    springboot配置双kafka springboot配置kafka springboot kafka
  • springboot项目配置多个kafka的示例代码
    目录1.spring-kafka2.配置文件相关信息3.kafka配置类4.消费主题消息1.spring-kafka <dependency> <groupI...
    99+
    2023-05-17
    springboot配置多个kafka springboot配置kafka
  • SpringBoot集成Kafka配置工具类的详细代码
    目录1、单播模式,只有一个消费者组2、广播模式,多个消费者组spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTempl...
    99+
    2022-11-13
  • Kafka kafka在windows下的安装与配置
    kafka在windows下的安装与配置   By: 授客 QQ:1033553122     1.测试环境................................................................
    99+
    2021-09-21
    Kafka kafka在windows下的安装与配置
  • Kafka的安装与配置
    一    jar包方式安装Kafka jar包下载地址:https://kafka.apache.org/downloads下载   1.配置java环境 1、上传jdk-8u341-linux-x64.tar.gz到服务器并安装: # t...
    99+
    2023-09-28
    kafka java linux 后端 中间件
  • Kafka 配置文件详情
    kafka的配置分为 broker、producter、consumer三个不同的配置 一 、BROKER 的全局配置 最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。 -------...
    99+
    2019-10-06
    Kafka 配置文件详情
  • Kafka 集群配置SASL+ACL
    在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。...
    99+
    2023-01-31
    集群 Kafka ACL
  • springboot配置jpa
      配置方式  pom依赖  org.springframework.boot  spring-boot-starter-aop  application.xml配置  #jpa配置  spring.jpa...
    99+
    2022-10-18
  • SpringBoot怎么整合Kafka
    本文小编为大家详细介绍“SpringBoot怎么整合Kafka”,内容详细,步骤清晰,细节处理妥当,希望这篇“SpringBoot怎么整合Kafka”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起...
    99+
    2022-10-19
  • Kafka安装与配置详细过程
    本节详细介绍 Kafka 运行环境的搭建,为了节省篇幅,本节的内容以 Linux CentOS 作为安装演示的操作系统,其他 Linux 系列的操作系统也可以参考本节的内容。具体的操...
    99+
    2022-11-12
  • Kafka的监听地址怎么配置
    本文小编为大家详细介绍“Kafka的监听地址怎么配置”,内容详细,步骤清晰,细节处理妥当,希望这篇“Kafka的监听地址怎么配置”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。目前监听相关的参数主要有下面几个:li...
    99+
    2023-06-28
  • Apache Kafka配置的方法是什么
    Apache Kafka的配置方法如下:1. 安装Kafka:首先需要下载和安装Kafka。可以在官网上下载Kafka二进制文件,解...
    99+
    2023-06-13
    Apache Kafka Apache Kafka
  • kafka添加安全验证配置方式
    目录服务端配置 1. config 目录添加kafka_server_jaas.conf 配置文件2. kafka-run-class.sh 添加3. config/ser...
    99+
    2022-11-13
    kafka安全验证 kafka配置 kafka安全验证配置
  • Linux系统中如何安装配置Kafka
    这篇文章主要为大家展示了Linux系统中如何安装配置Kafka,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带大家一起来研究并学习一下“Linux系统中如何安装配置Kafka”这篇文章吧。Linux系统安装配置Kafka具体...
    99+
    2023-06-28
  • Ubuntu-16.04中怎么配置Apache Kafka集群
    这期内容当中小编将会给大家带来有关Ubuntu-16.04中怎么配置Apache Kafka集群,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Apache Kafka是一个免费的开源流处理软件平台,由Ap...
    99+
    2023-06-02
  • Springboot 配置SqlSessionFactory方式
    目录Springboot配置SqlSessionFactorySpringbootSqlSessionFactory错误Springboot 配置SqlSessionFactory ...
    99+
    2022-11-12
  • springboot redis缓存配置
    今天小编就为大家带来一篇springboot redis缓存配置的文章。小编觉得挺实用的,为此分享给大家做个参考。一起跟随小编过来看看吧。开启远程访问:找到redis中的redis.conf文件并编辑(在安...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作