iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >RocketMQ producer发送者浅析
  • 954
分享到

RocketMQ producer发送者浅析

RocketMQ producerRocketMQ发送者 2023-05-17 11:05:27 954人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。 首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务

发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。

首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。

启动的流程主要在这个方法中:

MQClientInstance#start

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FaiLED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientapiImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

其中启动了一系列定时任务,包括org.apache.RocketMQ.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer这个方法

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 从nameServer获取topciRouteData
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteData.topicRouteDataChanged(old);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        if (changed) {
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Update endpoint map
                            {
                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                                if (!mqEndPoints.isEmpty()) {
                                    topicEndPointsTable.put(topic, mqEndPoints);
                                }
                            }
                            // Update Pub info
                            {
                                // 生成topicPublishInfo
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        // 更新 topicPublishInfo
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Update sub info
                            if (!consumerTable.isEmpty()) {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }

通过方法名也知道是从nameServer获取这个topic相关的broke数据,拿到TopicRouteData数据。先更新brokerAddrTable,存储borker具体的地址。然后在org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo里面再进一步生成TopicPublishInfo数据。TopicPublishInfo是对TopicRouteData的一个封装,除了TopicRouteData,还有messageQueue数据,messageQueue是Queue和Borker的交集,会根据配置的queue数量,生成具体的messageQueue,queueId就是0,1,2,3,4他们自己的顺序。

所以有了TopicPublishInfo数据,就知道往哪里发了。

发送消息的过程。

  • 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。
  • 从MessageQueueList里面拿到一个messageQueue。 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。
  • 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址就是前面提到的brokerAddrTable变量,在updateTopicRouteInfoFromNameServer方法里面维护的。

到此这篇关于RocketMQ producer发送者浅析的文章就介绍到这了,更多相关RocketMQ producer内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: RocketMQ producer发送者浅析

本文链接: https://www.lsjlt.com/news/211127.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • RocketMQ producer发送者浅析
    发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。 首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务...
    99+
    2023-05-17
    RocketMQ producer RocketMQ发送者
  • RocketMQ producer同步发送和单向发送源码分析
    这篇文章主要介绍“RocketMQ producer同步发送和单向发送源码分析”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ producer同步发送和单向发送源...
    99+
    2023-07-05
  • RocketMQ消息发送流程源码剖析
    目录正文读源码1 调用defaultMQProducerImpl.send()2 设置过期时间3 执行defaultMQProducerImpl.sendDefaultImpl()方...
    99+
    2022-11-13
    RocketMQ消息发送流程 RocketMQ 消息
  • PythonHttp发送请求浅析
    目录前言浅析requests浅析aiohttp浅析httpx结语前言 相信很多人使用Python做接口请求(Http客户端请求)的时候,很多人肯定用过这几个:urllib,urlli...
    99+
    2022-11-11
  • SpringBoot集成RocketMQ发送事务消息的原理解析
    目录简介原理具体实现消费者消费者生产者消息监听器消息事务测试正常测试异常测试代码调整执行结果总结简介 RocketMQ 事务消息(Transactional Message)是指应用...
    99+
    2022-11-13
  • 浅析web前端开发者的招聘要求
    随着互联网的不断发展,Web前端开发成为了非常重要的职业之一。Web前端开发人员主要负责开发网站的前端部分,包括页面设计、交互效果实现、数据展示等方面。前端开发人员的职责非常广泛,需要掌握多种编程语言和工具,同时具备创新、学习能力和团队协作...
    99+
    2023-05-14
  • 浅析nodejs实现Websocket的数据接收与发送
    WebSocket是HTML5开始提供的一种浏览器与服务器间进行全双工通讯的网络技术。在WebSocket API中,浏览器和服务器只需要要做一个握手(handshaking)的动作,然后,浏览器和服务器之...
    99+
    2022-06-04
    数据 nodejs Websocket
  • 浅析前端开发者要如何学习Node语言?
    本文整理自我初学 Node.js 时的笔记,用以向对 Node.js 这门语言有兴趣的读者简明扼要的介绍 Node.js 是什么,以及该如何学习这门语言。1. 什么是 Node.js?Node.js 是 Ryan Dahl 在 2009 年...
    99+
    2023-05-14
    前端 Node.js JavaScript
  • 浅析前端开发者可以学习Web3D技术吗
    随着互联网的发展,网页设计也越来越重要。前端开发者需要不断地学习新技术和新领域,以适应市场变化和用户需求。其中,Web3D技术逐渐成为前端开发的新关键词,它可以让网站更加生动和多样化。那么,前端开发者可以学习Web3D技术吗?首先,Web3...
    99+
    2023-05-14
  • 浅析linux下如何用脚本自动发送文本mail邮件
    1. 安装msmtp luther@gliethttp:~$ sudo apt-get install msmtp 2. 编辑配置脚本(~/.msmtprc是默认配置文件,也可以使用-C选项指定配置文件路径...
    99+
    2022-06-04
    如何用 脚本 自动发送
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作