广告
返回顶部
首页 > 资讯 > 精选 >SpringBoot怎么整合Pulsar
  • 576
分享到

SpringBoot怎么整合Pulsar

2023-07-02 14:07:24 576人浏览 独家记忆
摘要

这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇springBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。一、添加pom.xml依赖

这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇springBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。

一、添加pom.xml依赖

<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.7.0</version></parent><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-WEB</artifactId>    </dependency>    <dependency>        <groupId>org.apache.pulsar</groupId>        <artifactId>pulsar-client</artifactId>        <version>2.10.0</version>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.24</version>        <scope>provided</scope>    </dependency></dependencies><build>    <plugins>        <plugin>            <groupId>org.apache.Maven.plugins</groupId>            <artifactId>maven-compiler-plugin</artifactId>            <configuration>                <source>8</source>                <target>8</target>            </configuration>        </plugin>    </plugins></build>

二、Pulsar 参数类

import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import java.util.Map;@Component@ConfigurationProperties(prefix = "tdMQ.pulsar")@Datapublic class PulsarProperties {        private String serviceurl;        private String tdcNamespace;        private String tdcToken;        private String cluster;        private Map<String, String> topicMap;        private Map<String, String> subMap;        private String onOff;}

三、Pulsar 配置类

import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@EnableConfigurationProperties(PulsarProperties.class)public class PulsarConfig {    @Autowired    PulsarProperties pulsarProperties;    @Bean    public PulsarClient getPulsarClient() {        try {            return PulsarClient.builder()                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))                    .serviceUrl(pulsarProperties.getServiceurl())                    .build();        } catch (PulsarClientException e) {            System.out.println(e);            throw new RuntimeException("初始化Pulsar Client失败");        }    }}

四、不同消费数据类型的监听器

import com.yibo.pulsar.pojo.User;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.springframework.stereotype.Component;@Componentpublic class UserMessageListener implements MessageListener<User> {    @Override    public void received(Consumer<User> consumer, Message<User> msg) {        try {            User user = msg.getValue();            System.out.println(user);            consumer.acknowledge(msg);        } catch (Exception e) {            consumer.negativeAcknowledge(msg);        }    }}import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.springframework.stereotype.Component;@Componentpublic class StringMessageListener implements MessageListener<String> {    @Override    public void received(Consumer<String> consumer, Message<String> msg) {        try {            System.out.println(msg.getValue());            consumer.acknowledge(msg);        } catch (Exception e) {            consumer.negativeAcknowledge(msg);        }    }}

五、Pulsar的核心服务类

import com.yibo.pulsar.common.listener.StringMessageListener;import com.yibo.pulsar.common.listener.UserMessageListener;import com.yibo.pulsar.pojo.User;import org.apache.pulsar.client.api.*;import org.apache.pulsar.client.impl.schema.AvroSchema;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Componentpublic class PulsarCommon {    @Autowired    private PulsarProperties pulsarProperties;    @Autowired    private PulsarClient client;    @Autowired    private UserMessageListener userMessageListener;    @Autowired    private StringMessageListener stringMessageListener;        public <T> Producer<T> createProducer(String topic, Schema<T> schema) {        try {            return client.newProducer(schema)                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)                    .sendTimeout(10, TimeUnit.SECONDS)                    .blockIfQueueFull(true)                    .create();        } catch (PulsarClientException e) {            throw new RuntimeException("初始化Pulsar Producer失败");        }    }        public <T> Consumer<T> createConsumer(String topic, String subscription,                                   MessageListener<T> messageListener, Schema<T> schema) {        try {            return client.newConsumer(schema)                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .subscriptionName(subscription)                    .ackTimeout(10, TimeUnit.SECONDS)                    .subscriptionType(SubscriptionType.Shared)                    .messageListener(messageListener)                    .subscribe();        } catch (PulsarClientException e) {            throw new RuntimeException("初始化Pulsar Consumer失败");        }    }            public <T> void sendAsyncMessage(T message, Producer<T> producer) {        producer.sendAsync(message).thenAccept(msgId -> {        });    }                public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {        MessageId send = producer.send(message);        System.out.println();        System.out.println();        System.out.println();        System.out.println();        System.out.println(send);    }        //-----------consumer-----------    @Bean(name = "comment-publish-topic-consumer")    public Consumer<String> getCommentPublishTopicConsumer() {        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),                pulsarProperties.getSubMap().get("comment-publish-topic-test"),                stringMessageListener, Schema.STRING);    }    @Bean(name = "reply-publish-topic-consumer")    public Consumer<User> getReplyPublishTopicConsumer() {        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),                pulsarProperties.getSubMap().get("reply-publish-topic-test"),                userMessageListener, AvroSchema.of(User.class));    }    //-----------producer-----------    @Bean(name = "comment-publish-topic-producer")    public Producer<String> getCommentPublishTopicProducer() {        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);    }    @Bean(name = "reply-publish-topic-producer")    public Producer<User> getReplyPublishTopicProducer() {        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));    }}

六、Pulsar整合spring cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationEvent;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;@Component@Slf4jpublic class RefreshPulsarListener implements ApplicationListener {    @Autowired    ApplicationContext applicationContext;    @Override    public void onApplicationEvent(ApplicationEvent event) {        if (event.getSource().equals("__refreshAll__")) {            log.info("Nacos配置中心配置修改 重启Pulsar====================================");            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));            log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));            log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));        }    }}

关于“SpringBoot怎么整合Pulsar”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“SpringBoot怎么整合Pulsar”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网精选频道。

--结束END--

本文标题: SpringBoot怎么整合Pulsar

本文链接: https://www.lsjlt.com/news/342517.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SpringBoot怎么整合Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。一、添加pom.xml依赖...
    99+
    2023-07-02
  • SpringBoot怎么整合Apache Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Apache Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Apache Pulsar文章都会有所收获,...
    99+
    2023-07-05
  • SpringBoot整合Pulsar的实现示例
    目录一、添加pom.xml依赖二、Pulsar 参数类三、Pulsar 配置类四、不同消费数据类型的监听器五、Pulsar的核心服务类六、Pulsar整合Spring Cloud一、...
    99+
    2022-11-13
  • SpringBoot整合Apache Pulsar教程示例
    目录正文准备工作创建 SpringBoot 项目添加 Maven 依赖编写消息生产者编写消息消费者测试总结正文 推荐一个基于SpringBoot开发的全平台数据(数据库管理工具)功...
    99+
    2023-03-10
    SpringBoot整合Apache Pulsar SpringBoot 整合
  • SpringBoot怎么整合EasyExcel
    这篇文章主要介绍“SpringBoot怎么整合EasyExcel”,在日常操作中,相信很多人在SpringBoot怎么整合EasyExcel问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot怎...
    99+
    2023-06-21
  • SpringBoot怎么整合SpringDataJPA
    本篇内容主要讲解“SpringBoot怎么整合SpringDataJPA”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot怎么整合SpringDataJPA”吧!目录Spring...
    99+
    2023-06-20
  • SpringBoot怎么整合chatGPT
    这篇文章主要介绍了SpringBoot怎么整合chatGPT的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合chatGPT文章都会有所收获,下面我们一起来看看吧。1 添加依赖 ...
    99+
    2023-07-05
  • SpringBoot怎么整合Kafka
    本文小编为大家详细介绍“SpringBoot怎么整合Kafka”,内容详细,步骤清晰,细节处理妥当,希望这篇“SpringBoot怎么整合Kafka”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起...
    99+
    2022-10-19
  • springboot怎么整合docker
    今天就跟大家聊聊有关springboot怎么整合docker,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。docker 是一个开源的应用容器引擎,基于 Go 语言 并遵从Apache...
    99+
    2023-06-19
  • Springboot怎么整合RabbitMQ
    本篇文章给大家分享的是有关Springboot怎么整合RabbitMQ,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。准备工作15minIDEAmaven 3.0在开始构建项目之...
    99+
    2023-06-19
  • Springboot怎么整合https
    本篇内容介绍了“Springboot怎么整合https”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1 简介HTTP是不安全的,我们需要给它...
    99+
    2023-06-08
  • SpringBoot怎么整合SpringDataRedis
    今天小编给大家分享一下SpringBoot怎么整合SpringDataRedis的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧...
    99+
    2023-06-19
  • SpringBoot怎么整合tkMapper
    本篇内容主要讲解“SpringBoot怎么整合tkMapper”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot怎么整合tkMapper”吧!SpringBoot整合tkMapp...
    99+
    2023-07-04
  • SpringBoot怎么整合Mybatis
    这篇文章主要介绍了SpringBoot怎么整合Mybatis的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Mybatis文章都会有所收获,下面我们一起来看看吧。Mybatis的简单介...
    99+
    2023-07-05
  • SpringBoot中怎么整合SpringSecurity
    SpringBoot中怎么整合SpringSecurity,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1.导包<dependency> ...
    99+
    2023-06-05
  • springboot怎么整合xxl-job
    本文小编为大家详细介绍“springboot怎么整合xxl-job”,内容详细,步骤清晰,细节处理妥当,希望这篇“springboot怎么整合xxl-job”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。关于xxl...
    99+
    2023-07-02
  • SpringBoot中怎么整合Redis
    SpringBoot中怎么整合Redis,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、安装首先要在本地安装一个redis程序,安装过程十分简单(略过),安装完成后进入到...
    99+
    2023-06-16
  • SpringBoot怎么快速整合SpringSecurity
    这篇文章主要介绍了SpringBoot怎么快速整合SpringSecurity的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么快速整合SpringSecurity文章都会有所收获,下面我们...
    99+
    2023-07-05
  • SpringBoot怎么整合mybatis+mybatis-plus
    本文小编为大家详细介绍“SpringBoot怎么整合mybatis+mybatis-plus”,内容详细,步骤清晰,细节处理妥当,希望这篇“SpringBoot怎么整合mybatis+mybatis-plus”文章能帮助大家解决疑惑,下面跟...
    99+
    2023-07-02
  • SpringBoot怎么整合Redis缓存
    SpringBoot怎么整合Redis缓存?针对这个问题,今天小编总结了这篇文章,希望能帮助更多想解决这个问题的朋友找到更加简单易行的办法。1、引入缓存依赖<dependency> &...
    99+
    2022-10-18
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作