iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > PHP编程 >使用Spring Boot和Kafka实现消息发送和订阅
  • 179
分享到

使用Spring Boot和Kafka实现消息发送和订阅

springbootkafka后端订阅消息队列 2023-09-05 20:09:29 179人浏览 独家记忆
摘要

文章目录 一,新建Spring Boot1,Maven配置2,无法识别为SpringBoot项目3,无效的源发行版4,无法访问SpringApplication5,运行直接Finish6,服务

文章目录

一,新建Spring Boot

最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个SpringBoot项目
在这里插入图片描述
注意Type选Maven,java选8,其他默认
在这里插入图片描述

1,Maven配置

点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来
在这里插入图片描述
在这里插入图片描述

2,无法识别为SpringBoot项目

在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因
在这里插入图片描述
把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮
在这里插入图片描述

3,无效的源发行版

接下来运行项目报错:java: 无效的源发行版: 14
在这里插入图片描述
修改pom.xml中java.version值为8,原来是17

<properties>        <java.version>17java.version>    properties>

4,无法访问SpringApplication

继续运行,继续报错在这里插入图片描述
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

5,运行直接Finish

继续运行,没报错,服务直接Finished
在这里插入图片描述
需要添加WEB依赖

 <dependency>            <groupId>org.springframework.bootgroupId>            <artifactId>spring-boot-starter-webartifactId>        dependency>

6,服务运行成功

终于,一个空的spring boot项目成功跑起来了,喜极而泣
在这里插入图片描述

二,安装启动kafka

1,下载

官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1
在这里插入图片描述

2,配置

解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) ZooKeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,启动

先启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

再启动kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具体信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,创建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生产消费消息

1,加入依赖

 <dependency>            <groupId>org.springframework.kafkagroupId>            <artifactId>spring-kafkaartifactId>        dependency>

2,yam配置文件

application.yaml

spring:  profiles:    active: dev

application-dev.yaml

server:  port: 8082  servlet:    context-path: /test-kafkaspring:  cache:    type: ehcache    config: classpath:ehcache.xml  jpa:    database-platfORM: com.enigmabridge.hibernate.dialect.sqliteDialect  kafka:    bootstrap-servers: 127.0.0.1:9092    consumer:      group-id: kafka-demo-kafka-group      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer     producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.apache.kafka.common.serialization.StringSerializer       retries: 10

3,报错enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

在这里插入图片描述
这个错误我本地测试下来是因为没把账号密码配置这块注释掉
在这里插入图片描述

4,生产者生产消息

@Slf4j@Componentpublic class KafkaProducer {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    public String sendMessage(String content) {        String topic = "test_topic";        kafkaTemplate.send(topic, content).addCallback(success -> {            String topic = success.getRecordMetadata().topic();            int partition = success.getRecordMetadata().partition();            long offset = success.getRecordMetadata().offset();            log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);        }, failure -> {            log.info("发送失败:{}",failure.getMessage());        });        return "发送成功";    }}

5,订阅和消费消息

一,订阅主题
1,获取消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Component;import java.util.Properties;@Configuration@Componentpublic class KafkaConfig {    @Value("${spring.kafka.bootstrap-servers}")    private String bootstrapServers;    @Value("${spring.kafka.consumer.group-id}")    private String groupId;    @Value("${spring.kafka.consumer.key-deserializer}")    private String keyDeserializer;    @Value("${spring.kafka.consumer.value-deserializer}")    private String valueDeserializer;    public KafkaConsumer<String, String> createConsumer() {        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        return consumer;    }}

2,订阅topic

 KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();        consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) {String key = record.key();String value = record.value();log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);}

4,消费位移,释放资源

// 提交消费位移consumer.commitSync();// 关闭消费者以释放资源consumer.close();

二,点对点模式

@Slf4j@Componentpublic class KafkaConsumer {    @KafkaListener(topics = {"test_topic"})    public void handlerMsg(String content) {        log.info("接收到消息:消息值:{} ",content);    }}

6,接口

@Slf4j@RestControllerpublic class KafkaController {    @Autowired    private KafkaProducer kafkaProducer;    @PostMapping("/sendMessage")    public String sendMessage(@RequestParam String content) {        kafkaProducer.sendMessage(content);        return "ok";    }}

7,测试结果

在这里插入图片描述
接收到消息
在这里插入图片描述

四,参考博文

  1. 解决IDEA无法识别SpringBoot项目
  2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
  3. Kafka的下载安装以及使用
  4. Kafka消息消费流程详解
  5. Kafka之Consumer使用与基本原理

来源地址:https://blog.csdn.net/diyangxia/article/details/132567238

--结束END--

本文标题: 使用Spring Boot和Kafka实现消息发送和订阅

本文链接: https://www.lsjlt.com/news/395980.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作