iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >RocketMQ怎么实现请求异步处理
  • 466
分享到

RocketMQ怎么实现请求异步处理

2023-06-19 09:06:38 466人浏览 八月长安
摘要

这篇文章主要介绍“RocketMQ怎么实现请求异步处理”,在日常操作中,相信很多人在RocketMQ怎么实现请求异步处理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ怎么实现请求异步处理”的疑

这篇文章主要介绍“RocketMQ怎么实现请求异步处理”,在日常操作中,相信很多人在RocketMQ怎么实现请求异步处理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ怎么实现请求异步处理”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

一、RocketMQ

1、架构图片

RocketMQ怎么实现请求异步处理

2、角色分类

(1)、Broker

RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。

(2)、NameServer

消息队列中的状态服务器集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。

热备份: NamServer可以部署多个,相互之间独立,其他角色同时向多个NameServer 机器上报状态信息。

心跳机制: NameServer 中的 Broker、 Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, NameServer会认为某个机器出故障不可用。

(3)、Producer

消息的生成者,最常用的producer类就是DefaultMQProducer。

(4)、Consumer

消息的消费者,常用Consumer类 DefaultMQPushConsumer 收到消息后自动调用传入的处理方法来处理,实时性高 DefaultMQPullConsumer 用户自主控制 ,灵活性更高。

3、通信机制

(1)、Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer更新Topic路由信息。

(2)、Producer发送消息时候,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息会从NameServer重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3)、Consumer消费消息时候,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。

二、代码实现案例

1、项目结构图

RocketMQ怎么实现请求异步处理

版本描述

<spring-boot.version>2.1.3.RELEASE</spring-boot.version><rocketmq.version>4.3.0</rocketmq.version>

2、配置文件

rocketmq:  # 生产者配置  producer:    isOnOff: on    # 发送同一类消息的设置为同一个group,保证唯一    groupName: CicadaGroup    # 服务地址    namesrvAddr: 127.0.0.1:9876    # 消息最大长度 默认1024*4(4M)    maxMessageSize: 4096    # 发送消息超时时间,默认3000    sendMsgTimeout: 3000    # 发送消息失败重试次数,默认2    retryTimesWhenSendFailed: 2  # 消费者配置  consumer:    isOnOff: on    # 官方建议:确保同一组中的每个消费者订阅相同的主题。    groupName: CicadaGroup    # 服务地址    namesrvAddr: 127.0.0.1:9876    # 接收该 Topic 下所有 Tag    topics: CicadaTopic~*;    consumeThreadMin: 20    consumeThreadMax: 64    # 设置一次消费消息的条数,默认为1条    consumeMessageBatchMaxSize: 1# 配置 Group  Topic  Tagrocket:  group: rocketGroup  topic: rocketTopic  tag: rocketTag

3、生产者配置

@Configurationpublic class ProducerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;    @Value("${rocketmq.producer.groupName}")    private String groupName;    @Value("${rocketmq.producer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.producer.maxMessageSize}")    private Integer maxMessageSize ;    @Value("${rocketmq.producer.sendMsgTimeout}")    private Integer sendMsgTimeout;    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")    private Integer retryTimesWhenSendFailed;    @Bean    public DefaultMQProducer getRocketMQProducer() {        DefaultMQProducer producer;        producer = new DefaultMQProducer(this.groupName);        producer.setNamesrvAddr(this.namesrvAddr);        //如果需要同一个JVM中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName        if(this.maxMessageSize!=null){            producer.setMaxMessageSize(this.maxMessageSize);        }        if(this.sendMsgTimeout!=null){            producer.setSendMsgTimeout(this.sendMsgTimeout);        }        //如果发送消息失败,设置重试次数,默认为2次        if(this.retryTimesWhenSendFailed!=null){            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);        }        try {            producer.start();        } catch (MQClientException e) {            e.printStackTrace();        }        return producer;    }}

4、消费者配置

@Configurationpublic class ConsumerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;    @Value("${rocketmq.consumer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.consumer.groupName}")    private String groupName;    @Value("${rocketmq.consumer.consumeThreadMin}")    private int consumeThreadMin;    @Value("${rocketmq.consumer.consumeThreadMax}")    private int consumeThreadMax;    @Value("${rocketmq.consumer.topics}")    private String topics;    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")    private int consumeMessageBatchMaxSize;    @Resource    private RocketMsgListener msgListener;    @Bean    public DefaultMQPushConsumer getRocketMQConsumer(){        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);        consumer.setNamesrvAddr(namesrvAddr);        consumer.setConsumeThreadMin(consumeThreadMin);        consumer.setConsumeThreadMax(consumeThreadMax);        consumer.reGISterMessageListener(msgListener);        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);        try {            String[] topicTagsArr = topics.split(";");            for (String topicTags : topicTagsArr) {                String[] topicTag = topicTags.split("~");                consumer.subscribe(topicTag[0],topicTag[1]);            }            consumer.start();        }catch (MQClientException e){            e.printStackTrace();        }        return consumer;    }}

5、消息监听配置

@Componentpublic class RocketMsgListener implements MessageListenerConcurrently {    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;    @Resource    private ParamConfigService paramConfigService ;    @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {        if (CollectionUtils.isEmpty(list)){            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }        MessageExt messageExt = list.get(0);        LOG.info("接受到的消息为:"+new String(messageExt.getBody()));        int reConsume = messageExt.getReconsumeTimes();        // 消息已经重试了3次,如果不需要再次消费,则返回成功        if(reConsume ==3){            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }        if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){            String tags = messageExt.getTags() ;            switch (tags){                case "rocketTag":                    LOG.info("开户 tag == >>"+tags);                    break ;                default:                    LOG.info("未匹配到Tag == >>"+tags);                    break;            }        }        // 消息消费成功        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }}

6、配置参数绑定

@Servicepublic class ParamConfigService {    @Value("${rocket.group}")    public String rocketGroup ;    @Value("${rocket.topic}")    public String rocketTopic ;    @Value("${rocket.tag}")    public String rocketTag ;}

7、消息发送测试

@Servicepublic class RocketMqServiceImpl implements RocketMqService {    @Resource    private DefaultMQProducer defaultMQProducer;    @Resource    private ParamConfigService paramConfigService ;    @Override    public SendResult openAccountMsg(String msgInfo) {        // 可以不使用Config中的Group        defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);        SendResult sendResult = null;        try {            Message sendMsg = new Message(paramConfigService.rocketTopic,                                          paramConfigService.rocketTag,                                         "open_account_key", msgInfo.getBytes());            sendResult = defaultMQProducer.send(sendMsg);        } catch (Exception e) {            e.printStackTrace();        }        return sendResult ;    }}

三、项目源码

GitHub·地址https://github.com/cicadasmile/middle-ware-parentGitEE·地址Https://gitee.com/cicadasmile/middle-ware-parent

到此,关于“RocketMQ怎么实现请求异步处理”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: RocketMQ怎么实现请求异步处理

本文链接: https://www.lsjlt.com/news/295240.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • RocketMQ怎么实现请求异步处理
    这篇文章主要介绍“RocketMQ怎么实现请求异步处理”,在日常操作中,相信很多人在RocketMQ怎么实现请求异步处理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ怎么实现请求异步处理”的疑...
    99+
    2023-06-19
  • SpringBoot2 高级应用(01):整合 RocketMQ ,实现请求异步处理
    本文源码:GitHub·点这里 || GitEE·点这里一、RocketMQ1、架构图片2、角色分类(1)、BrokerRocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化...
    99+
    2023-06-02
  • react异步请求数据怎么实现
    在React中实现异步请求数据有多种方式,以下是其中几种常用的方法:1. 使用`fetch` API:`fetch`是现代浏览器提供...
    99+
    2023-09-13
    react
  • servlet异步请求的实现
    目录1、什么是servlet异步请求2、Servlet异步请求示例2.1、示例准备2.2、实现自定义的Servlet2.3、异步任务2.4、测试场景1、什么是servlet异步请求 ...
    99+
    2024-04-02
  • python中异步IO怎么同时处理请求
    小编给大家分享一下python中异步IO怎么同时处理请求,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!python主要应用领域有哪些1、云计算,典型应用OpenS...
    99+
    2023-06-14
  • 如何异步请求处理函数
    本篇文章为大家展示了如何异步请求处理函数,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。sendmail.php代码如下:<php$name = $_POST[...
    99+
    2024-04-02
  • 小程序异步请求怎么改为同步请求
    小程序的网络请求默认是异步请求,无法直接改为同步请求。但是可以使用ES6的async/await来实现类似同步的效果。1. 在异步请...
    99+
    2023-10-20
    小程序
  • django异步请求处理的方法是什么
    Django中的异步请求处理可以通过以下几种方法实现:1. 使用Django的内置异步任务处理机制:Django提供了一个名为`as...
    99+
    2023-09-26
    Django
  • Vuejs2.0中怎么实现一个异步跨域请求
    Vuejs2.0中怎么实现一个异步跨域请求,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。首先我们来安装Vue-Cli开发模板(这个模板可以...
    99+
    2024-04-02
  • JavaServlet线程中AsyncContext异步处理Http请求
    目录AsyncContextAsyncContext使用示例及测试示例测试结果AsyncContext应用场景背景AsyncContext解决生产问题AsyncContext Asy...
    99+
    2023-03-01
    Java AsyncContext异步处理 Java Servlet AsyncContext
  • Django 中的并发请求处理:使用 Celery 实现异步任务。
    Django 中的并发请求处理:使用 Celery 实现异步任务 在现代 Web 应用程序中,处理并发请求是至关重要的。Django 是一个强大的 Web 框架,但是它并不擅长处理大量的并发请求。为了解决这个问题,我们可以使用 Celery...
    99+
    2023-08-08
    并发 apache django
  • 如何实现ajax发送异步请求
    这篇文章将为大家详细讲解有关如何实现ajax发送异步请求,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。具体内容如下第一步(得到XMLHttpRequest)ajax其实只...
    99+
    2024-04-02
  • Ajax如何实现异步请求技术
    这篇文章将为大家详细讲解有关Ajax如何实现异步请求技术,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。AJAX的全称是Asynchronous JavaScript an...
    99+
    2024-04-02
  • react怎么请求数据异步
    本教程操作环境:Windows10系统、react18.0.0版、Dell G3电脑。react怎么请求数据异步?react异步请求数据方法。关于react异步请求数据有很多种方案。1、saga (用了er6生成器函数)2、promise3...
    99+
    2023-05-14
    React
  • ajax如何实现异步请求刷新
    这篇文章给大家分享的是有关ajax如何实现异步请求刷新的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在网站刷新的方法中,ajax刷新是一种用户体验良好的刷新方式,在结合ssh等流...
    99+
    2024-04-02
  • Node.js 事件循环如何处理异步请求
    ...
    99+
    2024-04-02
  • Node.js中怎么实现异步处理
    这篇文章将为大家详细讲解有关Node.js中怎么实现异步处理,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。异步的“坑”最近一段时间参与开发了一个Node.j...
    99+
    2024-04-02
  • java中怎么异步请求网络
    本篇文章给大家分享的是有关java中怎么异步请求网络,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。java怎么实现异步请求网络,这里实现思路有很多种,但是我们这里有个比较方便的...
    99+
    2023-06-02
  • PHP开发中如何处理异步请求和并发处理
    在Web开发中,经常会遇到需要处理大量并发请求和异步请求的情况。以PHP为例,我们可以利用一些技术和工具来处理这些需求,提高系统的性能和响应能力。本文将介绍如何处理异步请求和并发处理,并提供一些具体的代码示例。一、异步请求的处理使用Ajax...
    99+
    2023-10-21
    异步请求 PHP开发 并发处理
  • ASP应用程序中如何处理异步请求?
    随着互联网技术的不断发展,越来越多的网站需要处理大量的异步请求,以提高网站的性能和用户体验。ASP应用程序也不例外,如何处理异步请求是一个非常重要的问题。本文将介绍ASP应用程序中如何处理异步请求,希望能对ASP开发者有所帮助。 一、什么...
    99+
    2023-07-27
    缓存 shell 异步编程
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作