iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >springboot怎么配置双kafka
  • 709
分享到

springboot怎么配置双kafka

2023-07-06 03:07:52 709人浏览 安东尼
摘要

这篇文章主要介绍“SpringBoot怎么配置双kafka”,在日常操作中,相信很多人在springboot怎么配置双kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”springboot怎么配置双ka

这篇文章主要介绍“SpringBoot怎么配置双kafka”,在日常操作中,相信很多人在springboot怎么配置双kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”springboot怎么配置双kafka”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

springboot配置双kafka

使用Spring Boot 2.0.8.RELEASE 版本

引入Maven kafka jar、准备两个kafka;

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

配置yml配置文件

spring:  kafka:    bootstrap-servers: 180.167.180.242:9092 #kafka的访问地址,多个用","隔开    consumer:      enable-auto-commit: true      group-id: kafka #群组ID  outkafka:    bootstrap-servers: localhost:9092 #kafka的访问地址,多个用","隔开    consumer:      enable-auto-commit: true      group-id: kafka_1 #群组ID

配置KafkaConfig类

import java.util.HashMap;import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @Configuration@EnableKafkapublic class KafkaConfig {    @Value("${spring.kafka.bootstrap-servers}")    private String innerServers;    @Value("${spring.kafka.consumer.group-id}")    private String innerGroupid;    @Value("${spring.kafka.consumer.enable-auto-commit}")    private String innerEnableAutoCommit;     @Bean    @Primary//理解为默认优先选择当前容器下的消费者工厂    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(3);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }     @Bean//第一个消费者工厂的bean    public ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }     @Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }        @Bean //生产者工厂配置    public ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(senderProps());    }        @Bean //kafka发送消息模板    public KafkaTemplate<String, String> kafkaTemplate() {        return new KafkaTemplate<String, String>(producerFactory());    }            private Map<String, Object> senderProps() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);                props.put(ProducerConfig.RETRIES_CONFIG, 0);                props.put(ProducerConfig.ACKS_CONFIG, "1");                // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }        @Value("${spring.outkafka.bootstrap-servers}")    private String outServers;    @Value("${spring.outkafka.consumer.group-id}")    private String outGroupid;    @Value("${spring.outkafka.consumer.enable-auto-commit}")    private String outEnableAutoCommit;         static {            }            @Bean    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactoryOutSchedule());        factory.setConcurrency(3);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }     @Bean    public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {        return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());    }         @Bean    public Map<String, Object> consumerConfigsOutSchedule() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }        @Bean //生产者工厂配置    public ProducerFactory<String, String> producerOutFactory() {        return new DefaultKafkaProducerFactory<>(senderOutProps());    }        @Bean //kafka发送消息模板    public KafkaTemplate<String, String> kafkaOutTemplate() {        return new KafkaTemplate<String, String>(producerOutFactory());    }            private Map<String, Object> senderOutProps() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);                props.put(ProducerConfig.RETRIES_CONFIG, 0);                props.put(ProducerConfig.ACKS_CONFIG, "1");                // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }}

发送工具类MyKafkaProducer

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @Component // 这个必须加入容器不然,不会执行@EnableScheduling // 这里是为了测试加入定时调度@Slf4jpublic class MyKafkaProducer {     @Autowired    private KafkaTemplate<String, String> kafkaTemplate;     @Autowired    private KafkaTemplate<String, String> kafkaOutTemplate;     public ListenableFuture<SendResult<String, String>> send(String topic, String key, String JSON) {        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json);        log.info("inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");        return result;    }     public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) {        ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json);        log.info("out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========");        return result;    } }

测试类

@Slf4j@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTest(classes={OesBcServiceApplication.class})public class MoreKafkaTest {        @Autowired    private MyKafkaProducer kafkaProducer;        @Test    public void sendInner() {        for (int i = 0; i < 1; i++) {            kafkaProducer.send("inner_test", "douzi" + i, "liyuehua" + i);            kafkaProducer.sendOut("out_test", "douziout" + i, "fanbingbing" + i);        }    }}

接收类

@Component@Slf4jpublic class KafkaConsumer {      @KafkaListener(topics={"inner_test"}, containerFactory="kafkaListenerContainerFactory")    public void innerlistener(ConsumerRecord<String, String> record) {        log.info("inner kafka receive #key=" + record.key() + "#value=" + record.value());    }        @KafkaListener(topics={"out_test"}, containerFactory="kafkaListenerContainerFactoryOutSchedule")    public void outListener(ConsumerRecord<String, String> record) {        log.info("out kafka receive #key=" + record.key() + "#value=" + record.value());    }}

测试结果

07-11 12:41:27.811 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - inner kafka send #topic=inner_test#key=douzi0#json=liyuehua0#推送成功===========
 
07-11 12:41:27.995 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - inner kafka receive #key=douzi0#value=liyuehua0
07-11 12:41:28.005 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - out kafka send #topic=out_test#key=douziout0#json=fanbingbing0#推送成功===========
07-11 12:41:28.013 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - out kafka receive #key=douziout0#value=fanbingbing0

到此,关于“springboot怎么配置双kafka”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: springboot怎么配置双kafka

本文链接: https://www.lsjlt.com/news/357688.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • springboot怎么配置双kafka
    这篇文章主要介绍“springboot怎么配置双kafka”,在日常操作中,相信很多人在springboot怎么配置双kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”springboot怎么配置双ka...
    99+
    2023-07-06
  • springboot之配置双kafka全过程
    目录springboot配置双kafka引入Maven kafka jar、准备两个kafka;配置yml配置文件配置KafkaConfig类发送工具类MyKafkaProducer...
    99+
    2023-05-16
    springboot配置双kafka springboot配置kafka springboot kafka
  • kafka springBoot 配置
    1、properties 配置 control.command.kafka.enabled=truecontrol.command.kafka.bootstrap-servers=172.0.0.1:9092control.command....
    99+
    2023-08-30
    kafka spring boot linq
  • springboot多kafka配置
    第一步:引入maven依赖 org.springframework.kafka spring-kafka 第二步:新增配置文件 以下为大致结构,供参考 spring: kafka: ...
    99+
    2023-09-04
    kafka java spring boot
  • springboot怎么配置双数据库
    在Spring Boot中配置双数据库可以使用多个数据源。可以按照以下步骤进行配置:1. 在application.properti...
    99+
    2023-10-21
    springboot 数据库
  • springboot怎么配置双数据源
    在Spring Boot中配置双数据源,可以使用多个数据源的配置,并为每个数据源创建对应的Bean。以下是配置双数据源的步骤:1. ...
    99+
    2023-10-27
    springboot
  • SpringBoot+Jpa项目配置双数据源怎么实现
    这篇文章主要介绍“SpringBoot+Jpa项目配置双数据源怎么实现”,在日常操作中,相信很多人在SpringBoot+Jpa项目配置双数据源怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Sprin...
    99+
    2023-06-22
  • springboot + JPA 配置双数据源实战
    目录springboot + JPA 配置双数据源1、首先配置application.yml文件设置主从数据库2、使用配置类读取application.yml配置的两个数据源3、然后...
    99+
    2024-04-02
  • SpringBoot怎么整合Kafka
    本文小编为大家详细介绍“SpringBoot怎么整合Kafka”,内容详细,步骤清晰,细节处理妥当,希望这篇“SpringBoot怎么整合Kafka”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起...
    99+
    2024-04-02
  • websphere怎么配置双向ssl
    要配置双向 SSL(Secure Sockets Layer)连接,需要进行以下步骤:1. 生成证书和密钥库:- 首先,使用 key...
    99+
    2023-08-25
    websphere ssl
  • springboot项目配置多个kafka的示例代码
    目录1.spring-kafka2.配置文件相关信息3.kafka配置类4.消费主题消息1.spring-kafka <dependency> <groupI...
    99+
    2023-05-17
    springboot配置多个kafka springboot配置kafka
  • Linux双网卡怎么配置
    本篇内容介绍了“Linux双网卡怎么配置”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!配置双网卡网络接口eth0和eth2,并在服务器上配置...
    99+
    2023-06-05
  • Kafka的监听地址怎么配置
    本文小编为大家详细介绍“Kafka的监听地址怎么配置”,内容详细,步骤清晰,细节处理妥当,希望这篇“Kafka的监听地址怎么配置”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。目前监听相关的参数主要有下面几个:li...
    99+
    2023-06-28
  • SpringBoot集成Kafka配置工具类的详细代码
    目录1、单播模式,只有一个消费者组2、广播模式,多个消费者组spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTempl...
    99+
    2024-04-02
  • Ubuntu-16.04中怎么配置Apache Kafka集群
    这期内容当中小编将会给大家带来有关Ubuntu-16.04中怎么配置Apache Kafka集群,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Apache Kafka是一个免费的开源流处理软件平台,由Ap...
    99+
    2023-06-02
  • mysql双主模式怎么配置
    在MySQL中,双主模式也被称为双主复制或双向复制,它可以实现数据库的读写分离和高可用性。在双主模式下,两个MySQL服务器都可以接...
    99+
    2023-10-26
    mysql
  • Linux中怎么配置双网卡
    今天小编给大家分享一下Linux中怎么配置双网卡的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。一、VMware虚拟机添加一个...
    99+
    2023-06-27
  • ssm怎么配置双数据源
    在 AWS Systems Manager (SSM) 中配置双数据源可以通过以下步骤完成:1. 登录 AWS 管理控制台并打开 S...
    99+
    2023-09-21
    ssm
  • SpringBoot+Jpa项目配置双数据源的实现
    目录引言配置yml文件创建数据源配置类为每个数据库创建配置类引言 今天为大家带来一些非常有用的实战技巧,比如在我们需要对两个数据库进行操作的时候而哦我们通常用的只是单数据库查询,这...
    99+
    2024-04-02
  • ubuntu双网卡路由怎么配置
    要配置Ubuntu双网卡路由,您需要执行以下步骤: 确保您有两个网卡连接到不同的网络。假设其中一个是eth0,另一个是eth1。...
    99+
    2024-02-29
    ubuntu
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作