第一步:引入 Maven 依赖(groupId 为 org.springframework.kafka,artifactId 为 spring-kafka)。第二步:新增配置文件,以下为大致结构(以 spring.kafka 为根节点),供参考。
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    # 第一个kafka的配置
    first:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        retries: x
        acks: -1
      consumer:
        enable-auto-commit: false
        group-id: first-consumer
      listener:
        ack-mode: xx
    # 第二个kafka的配置
    second:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        batch-size: xxxx
        buffer-memory: xxxxxx
      consumer:
        auto-offset-reset: earliest
        group-id: second-consumer
      listener:
        concurrency: xx
@Configurationpublic class FirstKafkaConfig { @Primary @ConfigurationProperties(prefix = "spring.kafka.first") @Bean public KafkaProperties firstKafkaProperties() { return new KafkaProperties(); } @Primary @Bean public KafkaTemplate<String, String> firstKafkaTemplate( @Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties)); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties)); return factory; } private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) { return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties()); } private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) { return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties()); }}
@Configurationpublic class SecondKafkaConfig { @ConfigurationProperties(prefix = "spring.kafka.second") @Bean public KafkaProperties secondKafkaProperties() { return new KafkaProperties(); } @Bean public KafkaTemplate<String, String> secondKafkaTemplate( @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties)); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties)); return factory; } private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) { return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties()); } private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) { return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties()); }}
// Template for the first cluster; the bean name is given explicitly so the lookup does not
// depend on the field name.
@Resource(name = "firstKafkaTemplate")
private KafkaTemplate<String, String> firstKafkaTemplate;

// Template for the second cluster.
@Resource(name = "secondKafkaTemplate")
private KafkaTemplate<String, String> secondKafkaTemplate;
@KafkaListener( containerFactory = "secondKafkaListenerContainerFactory", topics = {"xxxx"}, groupId = "second-consumer")public void testConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) { //do something}
来源地址:https://blog.csdn.net/qq_35893873/article/details/130620679
--结束END--
本文标题: springboot多kafka配置
本文链接: https://www.lsjlt.com/news/393090.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-04-03
2024-04-03
2024-04-01
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0