iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >SpringBoot整合Pulsar的实现示例
  • 353
分享到

SpringBoot整合Pulsar的实现示例

2024-04-02 19:04:59 353人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录一、添加pom.xml依赖二、Pulsar 参数类三、Pulsar 配置类四、不同消费数据类型的监听器五、Pulsar的核心服务类六、Pulsar整合spring cloud一、

一、添加pom.xml依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-WEB</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.Maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>    

二、Pulsar 参数类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;



@Component
@ConfigurationProperties(prefix = "tdMQ.pulsar")
@Data
public class PulsarProperties {

    
    private String serviceurl;

    
    private String tdcNamespace;

    
    private String tdcToken;

    
    private String cluster;

    
    private Map<String, String> topicMap;

    
    private Map<String, String> subMap;

    
    private String onOff;
}

三、Pulsar 配置类

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

    @Autowired
    PulsarProperties pulsarProperties;

    @Bean
    public PulsarClient getPulsarClient() {

        try {
            return PulsarClient.builder()
                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
                    .serviceUrl(pulsarProperties.getServiceurl())
                    .build();
        } catch (PulsarClientException e) {
            System.out.println(e);
            throw new RuntimeException("初始化Pulsar Client失败");
        }
    }

}

四、不同消费数据类型的监听器

import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;



@Component
public class UserMessageListener implements MessageListener<User> {

    @Override
    public void received(Consumer<User> consumer, Message<User> msg) {
        try {
            User user = msg.getValue();
            System.out.println(user);
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;



@Component
public class StringMessageListener implements MessageListener<String> {

    @Override
    public void received(Consumer<String> consumer, Message<String> msg) {
        try {
            System.out.println(msg.getValue());
            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
        }
    }
}

五、Pulsar的核心服务类

import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;



@Component
public class PulsarCommon {

    @Autowired
    private PulsarProperties pulsarProperties;

    @Autowired
    private PulsarClient client;

    @Autowired
    private UserMessageListener userMessageListener;

    @Autowired
    private StringMessageListener stringMessageListener;


    
    public <T> Producer<T> createProducer(String topic, Schema<T> schema) {

        try {
            return client.newProducer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Producer失败");
        }
    }


    
    public <T> Consumer<T> createConsumer(String topic, String subscription,
                                   MessageListener<T> messageListener, Schema<T> schema) {
        try {
            return client.newConsumer(schema)
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener(messageListener)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new RuntimeException("初始化Pulsar Consumer失败");
        }
    }

    
    
    public <T> void sendAsyncMessage(T message, Producer<T> producer) {
        producer.sendAsync(message).thenAccept(msgId -> {
        });
    }
    
    
    
    public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
        MessageId send = producer.send(message);
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println(send);
    }

    
    //-----------consumer-----------
    @Bean(name = "comment-publish-topic-consumer")
    public Consumer<String> getCommentPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
                pulsarProperties.getSubMap().get("comment-publish-topic-test"),
                stringMessageListener, Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-consumer")
    public Consumer<User> getReplyPublishTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
                pulsarProperties.getSubMap().get("reply-publish-topic-test"),
                userMessageListener, AvroSchema.of(User.class));
    }


    //-----------producer-----------
    @Bean(name = "comment-publish-topic-producer")
    public Producer<String> getCommentPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);
    }


    @Bean(name = "reply-publish-topic-producer")
    public Producer<User> getReplyPublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
    }
}

六、Pulsar整合Spring Cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;



@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event.getSource().equals("__refreshAll__")) {
            log.info("Nacos配置中心配置修改 重启Pulsar====================================");
            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));
        }
    }

}

参考:

https://wenku.baidu.com/view/4d3337ab6b0203D8ce2f0066f5335a8102d266a7.html

Https://gitee.com/zhaoyuxuan66/pulsar-SpringCloud_boot-demo/tree/master/

https://blog.csdn.net/weixin_56227932/article/details/122897075

http://www.zzvips.com/article/219361.html

https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

到此这篇关于SpringBoot整合Pulsar的实现示例的文章就介绍到这了,更多相关SpringBoot整合Pulsar内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: SpringBoot整合Pulsar的实现示例

本文链接: https://www.lsjlt.com/news/153223.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • SpringBoot整合Pulsar的实现示例
    目录一、添加pom.xml依赖二、Pulsar 参数类三、Pulsar 配置类四、不同消费数据类型的监听器五、Pulsar的核心服务类六、Pulsar整合Spring Cloud一、...
    99+
    2024-04-02
  • SpringBoot整合Apache Pulsar教程示例
    目录正文准备工作创建 SpringBoot 项目添加 Maven 依赖编写消息生产者编写消息消费者测试总结正文 推荐一个基于SpringBoot开发的全平台数据(数据库管理工具)功...
    99+
    2023-03-10
    SpringBoot整合Apache Pulsar SpringBoot 整合
  • Springboot整合FreeMarker的实现示例
    目录一、项目搭建1、新建模块2、导入依赖 :将不相关的依赖删掉3、新建软件包,放入student实体类4、新建StudentMapper接口5、Springboot04Applica...
    99+
    2024-04-02
  • springboot 整合 clickhouse的实现示例
    目录前言前置准备使用jdbc方式操作clickhouse与springboot的整合代码完整整合步骤前言 了解了clickhouse的基础概念和相关的理论之后,本篇将通过实例代码演示...
    99+
    2024-04-02
  • SpringBoot怎么整合Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。一、添加pom.xml依赖...
    99+
    2023-07-02
  • SpringBoot整合JWT的实现示例
    目录一. JWT简介二. Java实现JWT(SpringBoot方式整合)JWT总结一. JWT简介 1. 什么是JWT? JWT(JSON Web Token)是为了在...
    99+
    2024-04-02
  • SpringBoot整合Redis的实现示例
    目录1.需求说明2.整合实现2.1.创建Springboot工程2.2.redis配置3.编写测试类4.注意事项和细节1.需求说明 在 springboot 中 , 整合 redis...
    99+
    2023-01-28
    SpringBoot整合Redis SpringBoot Redis整合
  • SpringBoot整合WebService的实现示例
    目录SpringBoot搭建WebService程序一、定义规范接口二、搭建WebService服务端三、搭建WebService客户端WebService是一种传统的SOA技术架构...
    99+
    2024-04-02
  • springboot整合spring-retry的实现示例
    目录1、背景2、解决方案2.1 pom文件2.2 applicat启动类2.3 controller类2.4 service测试类(重点)2.5 项目启动2.6 使用swagger进...
    99+
    2024-04-02
  • SpringBoot怎么整合Apache Pulsar
    这篇文章主要介绍了SpringBoot怎么整合Apache Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Apache Pulsar文章都会有所收获,...
    99+
    2023-07-05
  • springboot 整合canal实现示例解析
    目录前言环境准备一、快速搭建canal服务搭建步骤1、服务器使用docker快速安装一个mysql并开启binlog日志2、上传canal安装包并解压3、进入到第二步解压后的文件目录...
    99+
    2024-04-02
  • SpringBoot整合Netty实现WebSocket的示例代码
    目录一、pom.xml依赖配置二、代码2.1、NettyServer 类2.2、SocketHandler 类2.3、ChannelHandlerPool 类2.4、Applicat...
    99+
    2024-04-02
  • SpringBoot整合MybatisPlus实现基本CRUD的示例代码
    目录一、引入相应的依赖二、进行配置三、新建数据库表四、配置 Mybatis Plus 自动填充五、实现User实体类、UserMapper、UserService六、使用Restfu...
    99+
    2023-05-19
    SpringBoot MybatisPlus实现CRUD SpringBoot MybatisPlus CRUD
  • SpringBoot整合Mybatis Plus多数据源的实现示例
    目录导读添加依赖application.properties 2种方式创建DataSource Master配置,使用druid连接池 Slave配置 启动类演示导读   有一个这样...
    99+
    2024-04-02
  • SpringBoot整合aws的示例代码
    业务需求 将本地的一些文件保存到aws上 引入依赖 创建client 工具类 引入依赖 <dependency> ...
    99+
    2024-04-02
  • SpringBoot整合Liquibase的示例代码
    目录整合1整合2SpringBoot整合Liquibase虽然不难但坑还是有一点的,主要集中在配置路径相关的地方,在此记录一下整合的步骤,方便以后自己再做整合时少走弯路,当然也希望能...
    99+
    2024-04-02
  • SpringBoot整合ElasticSearch的示例代码
    ElasticSearch作为基于Lucene的搜索服务器,既可以作为一个独立的服务部署,也可以签入Web应用中。SpringBoot作为Spring家族的全新框架,使得使用SpringBoot开发Spring应用变得非常简单。本文要介绍如...
    99+
    2023-05-31
    spring boot elasticsearch
  • SpringBoot整合MyBatis的示例分析
    这篇文章主要介绍了SpringBoot整合MyBatis的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1.整合MyBatis操作前面一篇提到了SpringBoot整...
    99+
    2023-06-15
  • SpringBoot整合MybatisPlus的示例分析
    这篇文章给大家分享的是有关SpringBoot整合MybatisPlus的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。创建个SpringBoot项目勾选生所需的依赖:我把application的后缀改为...
    99+
    2023-06-20
  • SpringBoot整合SpringDataRedis的示例代码
      本文介绍下SpringBoot如何整合SpringDataRedis框架的,SpringDataRedis具体的内容在前面已经介绍过了,可自行参考。 1....
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作