这篇文章主要介绍“SpringBoot怎么配置双kafka”,在日常操作中,相信很多人在springboot怎么配置双kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”springboot怎么配置双ka
这篇文章主要介绍“SpringBoot怎么配置双kafka”。在日常操作中,相信很多人在“SpringBoot怎么配置双kafka”这个问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“SpringBoot怎么配置双kafka”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
使用Spring Boot 2.0.8.RELEASE 版本
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers: 180.167.180.242:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka #群组ID
  outkafka:
    bootstrap-servers: localhost:9092 #kafka的访问地址,多个用","隔开
    consumer:
      enable-auto-commit: true
      group-id: kafka_1 #群组ID
import java.util.HashMap;import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @Configuration@EnableKafkapublic class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String innerServers; @Value("${spring.kafka.consumer.group-id}") private String innerGroupid; @Value("${spring.kafka.consumer.enable-auto-commit}") private String innerEnableAutoCommit; @Bean @Primary//理解为默认优先选择当前容器下的消费者工厂 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean//第一个消费者工厂的bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { 
Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");// props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean //生产者工厂配置 public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(senderProps()); } @Bean //kafka发送消息模板 public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } private Map<String, Object> senderProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去// props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Value("${spring.outkafka.bootstrap-servers}") private String outServers; @Value("${spring.outkafka.consumer.group-id}") private String outGroupid; @Value("${spring.outkafka.consumer.enable-auto-commit}") private String outEnableAutoCommit; static { } @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new 
ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryOutSchedule()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() { return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule()); } @Bean public Map<String, Object> consumerConfigsOutSchedule() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean //生产者工厂配置 public ProducerFactory<String, String> producerOutFactory() { return new DefaultKafkaProducerFactory<>(senderOutProps()); } @Bean //kafka发送消息模板 public KafkaTemplate<String, String> kafkaOutTemplate() { return new KafkaTemplate<String, String>(producerOutFactory()); } private Map<String, Object> senderOutProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.ACKS_CONFIG, "1"); // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去// props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }}
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @Component // 这个必须加入容器不然,不会执行@EnableScheduling // 这里是为了测试加入定时调度@Slf4jpublic class MyKafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private KafkaTemplate<String, String> kafkaOutTemplate; public ListenableFuture<SendResult<String, String>> send(String topic, String key, String JSON) { ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json); log.info("inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功==========="); return result; } public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) { ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json); log.info("out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功==========="); return result; } }
/**
 * Integration test that pushes one message to each Kafka cluster through
 * {@code MyKafkaProducer}. It makes no assertions; the outcome is observed in the logs.
 */
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {OesBcServiceApplication.class})
public class MoreKafkaTest {

    @Autowired
    private MyKafkaProducer kafkaProducer;

    /** Sends a single record to the inner topic and a single record to the out topic. */
    @Test
    public void sendInner() {
        final int rounds = 1;
        for (int round = 0; round < rounds; round++) {
            // inner cluster
            kafkaProducer.send("inner_test", "douzi" + round, "liyuehua" + round);
            // out cluster
            kafkaProducer.sendOut("out_test", "douziout" + round, "fanbingbing" + round);
        }
    }
}
/**
 * Listens on one topic per Kafka cluster and logs every record received.
 *
 * <p>Each listener names the container factory bean it is bound to, which is what
 * ties it to the inner or the out cluster.
 */
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * Receives records from {@code inner_test} via the {@code kafkaListenerContainerFactory} bean.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = {"inner_test"}, containerFactory = "kafkaListenerContainerFactory")
    public void innerlistener(ConsumerRecord<String, String> record) {
        final String key = record.key();
        final String value = record.value();
        log.info("inner kafka receive #key={}#value={}", key, value);
    }

    /**
     * Receives records from {@code out_test} via the
     * {@code kafkaListenerContainerFactoryOutSchedule} bean.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = {"out_test"}, containerFactory = "kafkaListenerContainerFactoryOutSchedule")
    public void outListener(ConsumerRecord<String, String> record) {
        final String key = record.key();
        final String value = record.value();
        log.info("out kafka receive #key={}#value={}", key, value);
    }
}
07-11 12:41:27.811 INFO [com.wondertek.oes.bc.service.send.MyKafkaProducer] - inner kafka send #topic=inner_test#key=douzi0#json=liyuehua0#推送成功===========
07-11 12:41:27.995 INFO [com.wondertek.oes.bc.service.send.KafkaConsumer] - inner kafka receive #key=douzi0#value=liyuehua0
07-11 12:41:28.005 INFO [com.wondertek.oes.bc.service.send.MyKafkaProducer] - out kafka send #topic=out_test#key=douziout0#json=fanbingbing0#推送成功===========
07-11 12:41:28.013 INFO [com.wondertek.oes.bc.service.send.KafkaConsumer] - out kafka receive #key=douziout0#value=fanbingbing0
到此,关于“springboot怎么配置双kafka”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!
--结束END--
本文标题: springboot怎么配置双kafka
本文链接: https://www.lsjlt.com/news/357688.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
2024-05-15
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0