广告
返回顶部
首页 > 资讯 > 精选 >如何解决SpringBoot整合RocketMQ遇到的问题
  • 892
分享到

如何解决SpringBoot整合RocketMQ遇到的问题

2023-06-20 13:06:59 892人浏览 薄情痞子
摘要

本篇内容主要讲解“如何解决SpringBoot整合RocketMQ遇到的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决springBoot整合RocketMQ遇到的问题”吧!应用场景

本篇内容主要讲解“如何解决SpringBoot整合RocketMQ遇到的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决springBoot整合RocketMQ遇到的问题”吧!

应用场景

在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。

引入依赖

 <!-- RocketMq Spring Boot Starter--> <dependency>  <groupId>org.apache.rocketmq</groupId>  <artifactId>rocketmq-spring-boot-starter</artifactId>  <version>2.0.4</version>  </dependency>

消费者代码

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")public class Consumer implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("消费到的数据为:"+s);    }}

问题排查

RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。

@Override    public void afterSingletonsInstantiated() {    // 获取所有所有使用了RocketMQMessageListener注解的bean        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);        if (Objects.nonNull(beans)) {        // 循环注册容器            beans.forEach(this::reGISterContainer);        }    }    private void registerContainer(String beanName, Object bean) {        Class<?> clazz = aopProxyUtils.ultimateTargetClass(bean);// 校验当前bean是否实现了RocketMQListener接口        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());        }// 获取bean上的annotation        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);// 解析group及topic,可支持表达式        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());        String topic = this.environment.resolvePlaceholders(annotation.topic());        boolean listenerEnabled =            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)                .getOrDefault(topic, true);        if (!listenerEnabled) {            log.debug(                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",                consumerGroup, topic);            return;        }        validate(annotation);        String containerBeanName = String.fORMat("%s_%s", DefaultRocketMQListenerContainer.class.getName(),            counter.incrementAndGet());        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;// 注册bean的,调用createRocketMQListenerContainer        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,            DefaultRocketMQListenerContainer.class);        if (!container.isRunning()) {            try {                container.start();            } catch (Exception e) {                log.error("Started container failed. {}", container, e);                throw new RuntimeException(e);            }        }        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);    }    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,        RocketMQMessageListener annotation) {        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();                container.setRocketMQMessageListener(annotation);                String nameServer = environment.resolvePlaceholders(annotation.nameServer());        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());        container.setNameServer(nameServer);        if (!StringUtils.isEmpty(accessChannel)) {            container.setAccessChannel(AccessChannel.valueOf(accessChannel));        }        container.setTopic(environment.resolvePlaceholders(annotation.topic()));        // 此处已经根据表达式将数据取出        String tags = environment.resolvePlaceholders(annotation.selectorExpression());        if (!StringUtils.isEmpty(tags)) {            container.setSelectorExpression(tags);        }        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));        // 此处将SelectorExpression的数据覆盖成了表达式        container.setRocketMQMessageListener(annotation);        container.setRocketMQListener((RocketMQListener)bean);        container.setObjectMapper(objectMapper);        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());        container.setName(name);  // REVIEW ME, use the same clientId or multiple?        return container;    }

问题解决

因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。

@Configurationpublic class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {    @Autowired    private ApplicationContext applicationContext;    @Autowired    private StandardEnvironment environment;    @Override    public void afterPropertiesSet() throws Exception {        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);        for (Object bean : beans.values()){            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {                continue;            }            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);            Field field = invocationHandler.getClass().getDeclaredField("memberValues");            field.setAccessible(true);            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {                if(Objects.nonNull(entry)){                    memberValues.put(entry.geTKEy(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));                }            }        }    }}

除此之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。

到此,相信大家对“如何解决SpringBoot整合RocketMQ遇到的问题”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

--结束END--

本文标题: 如何解决SpringBoot整合RocketMQ遇到的问题

本文链接: https://www.lsjlt.com/news/296954.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何解决SpringBoot整合RocketMQ遇到的问题
    本篇内容主要讲解“如何解决SpringBoot整合RocketMQ遇到的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决SpringBoot整合RocketMQ遇到的问题”吧!应用场景...
    99+
    2023-06-20
  • 解决SpringBoot整合RocketMQ遇到的坑
    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数...
    99+
    2022-11-12
  • SpringBoot整合RocketMQ遇到的坑怎么解决
    本篇内容主要讲解“SpringBoot整合RocketMQ遇到的坑怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot整合RocketMQ遇到的坑怎么解决”吧!应用场景在实...
    99+
    2023-06-08
  • 解决SpringBoot整合ElasticSearch遇到的连接问题
    SpringBoot整合ElasticSearch的连接问题 failed to load elasticsearch nodes : org.elasticsearch.clie...
    99+
    2022-11-12
  • 解决springboot整合druid遇到的坑
    springboot整合druid的坑 项目环境 springboot 2.1.6.RELEASE jdk 1.8 pom.xml配置 <?xm...
    99+
    2022-11-12
  • SpringBoot整合kafka遇到的版本不对应问题及解决
    目录SpringBoot整合kafka遇到版本不对应如果你的SpringBoot是2.0.3版本如果你的SpringBoot比较新,用的2.1.0版本SpringBoot整合kafk...
    99+
    2022-11-13
  • spring/springboot整合curator遇到的坑及解决
    目录整个代码可项目遇到了两个问题解决办法近期本人在搭建自己的调度平台项目使用到了zookeeper做执行器自动注册中心时,使用到了springboot2.0+curator4.0版本...
    99+
    2022-11-13
  • SpringBoot详解整合MyBatis过程中可能遇到的问题
    尽量不要用 jUnit 提供的单元测试 提一个要求尽量使用SpringBoot 提供的测试类进行测试,能够自动扫描组件以及使用容器中的bean对象 还有如果有组件 中存在注入对象的话...
    99+
    2022-11-13
  • 如何解决SpringBoot整合thymeleaf报错的问题
    这篇文章主要讲解了“如何解决SpringBoot整合thymeleaf报错的问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何解决SpringBoot整合thymeleaf报错的问题”吧...
    99+
    2023-06-20
  • 解决SpringBoot整合MybatisPlus分模块管理遇到的bug
    前言 这个Bug前前后后折腾了两天才找到答案,虽说不是完全两天的工作时间在调试这个问题,但是过程也确实曲折,所以做一下记录,也当做一次自我反省 背景 SpringBoot 与 MyB...
    99+
    2022-11-12
  • 如何解决SpringBoot配置文件application.yml遇到的问题
    这篇文章将为大家详细讲解有关如何解决SpringBoot配置文件application.yml遇到的问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。配置文件application.yml遇到的坑1.第一...
    99+
    2023-06-29
  • SpringBoot整合MyBatis过程中可能遇到的问题有哪些
    本文小编为大家详细介绍“SpringBoot整合MyBatis过程中可能遇到的问题有哪些”,内容详细,步骤清晰,细节处理妥当,希望这篇“SpringBoot整合MyBatis过程中可能遇到的问题有哪些”文章能帮助大家解决疑惑,下面跟着小编的...
    99+
    2023-07-02
  • SpringBoot @PathVariable使用时遇到的问题及解决
    目录@PathVariable使用时遇到的问题第一个问题解决办法第二个问题解决办法@PathVariable 404问题@PathVariable使用时遇到的问题 第一个问题 接口:...
    99+
    2022-11-12
  • 解决SpringBoot中使用@Transactional注解遇到的问题
    目录使用@Transactional注解遇到的问题1、不建议在接口上添加@Transactional注解2、@Transactional注解3、默认情况下4、数据库引擎需要支持事务管...
    99+
    2022-11-12
  • SpringBoot整合Lombok及常见问题解决
    目录Lombok2. Lombok注解失效原因整合过程1. 引入Lombok依赖:2. 安装Lombok插件优点与缺点(可能出现的问题即解决方法)Lombok Lombok能以简单的...
    99+
    2022-11-13
  • 解决Spring boot 整合Junit遇到的坑
    目录这是我在使用springboot整合Junit的时候遇到的坑1.在pom.xml中添加junit环境的依赖2.在src/test/java下建立测试类3.自己编写的启动类Spri...
    99+
    2022-11-12
  • SpringBoot整合Mybatis,解决TypeAliases配置失败的问题如何解决
    本篇内容主要讲解“SpringBoot整合Mybatis,解决TypeAliases配置失败的问题如何解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot整合Mybatis,解...
    99+
    2023-06-20
  • SpringBoot启动遇到的异常问题及解决方案
    目录SpringBoot启动遇到异常1、 问题2、异常3、异常4、异常5、 异常6、异常7、异常SpringBoot优雅的处理异常使用异常创建统一异常类测试SpringBoot启动遇...
    99+
    2022-11-13
  • 使用springboot时,解决@Scheduled定时器遇到的问题
    目录@Scheduled定时器遇到的问题下面说一下@Scheduled 注解的几个参数一、可以通过配置文件配置进来的二、不可通过配置文件配置的 (作用相同)定时任务@Schedule...
    99+
    2022-11-12
  • 如何解决springboot整合cxf-jaxrs中json转换的问题
    本篇内容主要讲解“如何解决springboot整合cxf-jaxrs中json转换的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决springboot整合cxf-jaxrs中json...
    99+
    2023-06-20
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作