iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >RocketMQ设计之同步刷盘的示例分析
  • 865
分享到

RocketMQ设计之同步刷盘的示例分析

2023-06-29 14:06:06 865人浏览 泡泡鱼
摘要

这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。CommitLog的handleD

这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    // Synchronization flush    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;        if (messageExt.isWaitStoreMsGoK()) {            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());            service.putRequest(request);            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());            if (!flushOK) {                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                    + " client address: " + messageExt.getBornHostString());                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);            }        } else {            service.wakeup();        }    }    // Asynchronous flush    else {        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {            flushCommitLogService.wakeup();        } else {            commitLogService.wakeup();        }    }}class GroupCommitService extends FlushCommitLogService {        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();        //提交刷盘任务到任务列表        public synchronized void putRequest(final GroupCommitRequest request) {            synchronized (this.requestsWrite) {                this.requestsWrite.add(request);            }            if (hasNotified.compareAndSet(false, true)) {                waitPoint.countDown(); // notify            }        }        private void swapRequests() {            List<GroupCommitRequest> tmp = this.requestsWrite;            this.requestsWrite = this.requestsRead;            this.requestsRead = tmp;        }        private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }        public void run() {            CommitLog.log.info(this.getServiceName() + " service started");            while (!this.isStopped()) {                try {                    this.waitForRunning(10);                    this.doCommit();                } catch (Exception e) {                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                }            }            // Under nORMal circumstances shutdown, wait for the arrival of the            // request, and then flush            try {                Thread.sleep(10);            } catch (InterruptedException e) {                CommitLog.log.warn("GroupCommitService Exception, ", e);            }            synchronized (this) {                this.swapRequests();            }            this.doCommit();            CommitLog.log.info(this.getServiceName() + " service end");        }        @Override        protected void onWaitEnd() {            this.swapRequests();        }        @Override        public String getServiceName() {            return GroupCommitService.class.getSimpleName();        }        @Override        public long getJointime() {            return 1000 * 60 * 5;        }    }

GroupCommitRequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

GroupCommitService每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

  • putRequest(request) 提交刷盘任务到任务列表

  • request.waitForFlush同步等待GroupCommitService将任务列表中的任务刷盘完成。

两个队列读写分离,requestsWrite是写队列,用户保存添加进来的刷盘任务,requestsRead是读队列,在刷盘之前会把写队列的数据放入读队列。

CommitLog的doCommit方法:

private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            //根据offset确定是否已经刷盘                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    //清空已刷盘的列表                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }
  • 刷盘的时候依次读取requestsRead中的数据写入磁盘,

  • 写入完成后清空requestsRead

读写分离设计的目的是在刷盘时不影响任务提交到列表。

CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:

public boolean flush(final int flushLeastPages) {    boolean result = true;    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);    if (mappedFile != null) {        long tmpTimeStamp = mappedFile.getStoreTimestamp();        int offset = mappedFile.flush(flushLeastPages);        long where = mappedFile.getFileFromOffset() + offset;        result = where == this.flushedWhere;        this.flushedWhere = where;        if (0 == flushLeastPages) {            this.storeTimestamp = tmpTimeStamp;        }    }    return result;}

通过MappedFile映射的CommitLog文件写入磁盘

这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。

以上是“RocketMQ设计之同步刷盘的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网精选频道!

--结束END--

本文标题: RocketMQ设计之同步刷盘的示例分析

本文链接: https://www.lsjlt.com/news/325251.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • RocketMQ设计之同步刷盘的示例分析
    这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。CommitLog的handleD...
    99+
    2023-06-29
  • RocketMQ设计之同步刷盘
    同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消...
    99+
    2024-04-02
  • RocketMQ设计之异步刷盘
    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一...
    99+
    2024-04-02
  • RocketMQ设计之故障规避机制的示例分析
    这篇文章给大家分享的是有关RocketMQ设计之故障规避机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机...
    99+
    2023-06-29
  • jQuery设计的示例分析
    这篇文章主要介绍了jQuery设计的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。选择元素jQuery的基本设计思想和主要用法,就是...
    99+
    2024-04-02
  • ajax同步和异步XMLHTTP的示例分析
    这篇文章给大家分享的是有关ajax同步和异步XMLHTTP的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。 在网页脚本编程中,绝大多数情况应...
    99+
    2024-04-02
  • Ajax中同步和异步的示例分析
    小编给大家分享一下Ajax中同步和异步的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!通过ajax向后台发送和接收数据时...
    99+
    2024-04-02
  • xmlplus组件设计之按钮的示例分析
    这篇文章主要介绍xmlplus组件设计之按钮的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!除了图标以外,按钮也许是最简单的组件了,现在来看看如何定义按钮组件。使用原生按钮组...
    99+
    2024-04-02
  • RocketMQ事务消息的示例分析
    小编给大家分享一下RocketMQ事务消息的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!一、大事务 = 小事务 + 异步我们以一个转帐的场景为例来说明这...
    99+
    2023-06-04
  • mysql数据库同步的示例分析
    这篇文章主要介绍了mysql数据库同步的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。 同步介绍: MySQL 的数据同步,在M...
    99+
    2024-04-02
  • MySQL半同步复制的示例分析
    这篇文章主要介绍MySQL半同步复制的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!代码分析int repl_semi_report_commit(Trans_pa...
    99+
    2024-04-02
  • React中setState同步和异步的示例分析
    这篇文章主要介绍React中setState同步和异步的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完! React起源于Facebook的内部项目。React的出现是革命性的创新,React的是一个...
    99+
    2023-06-15
  • JS设计模式之状态模的示例分析
    这篇文章主要介绍了JS设计模式之状态模的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. 概述当一个对象的内在状态改变时允许改变其...
    99+
    2024-04-02
  • Java设计模式UML之类图的示例分析
    小编给大家分享一下Java设计模式UML之类图的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!1.UML基本介绍UML——Unified modeling...
    99+
    2023-06-29
  • Java多线程之synchronized同步代码块的示例分析
    小编给大家分享一下Java多线程之synchronized同步代码块的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!1. 同步方法和同步块,哪种更好?同步...
    99+
    2023-06-29
  • jQuery无刷新上传之uploadify的示例分析
    这篇文章给大家分享的是有关jQuery无刷新上传之uploadify的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。效果图一:从官网下载开发包添加到项目中,我对这个开发包...
    99+
    2024-04-02
  • Ajax请求中异步与同步的示例分析
    这篇文章主要为大家展示了“Ajax请求中异步与同步的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Ajax请求中异步与同步的示例分析”这篇文章吧。 ...
    99+
    2024-04-02
  • JavaScript设计模式之工厂模式的示例分析
    这篇文章主要为大家展示了“JavaScript设计模式之工厂模式的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“JavaS...
    99+
    2024-04-02
  • java设计模式之状态模式的示例分析
    这篇文章给大家分享的是有关java设计模式之状态模式的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。 定义 很多时候,一个对象的行为会根据一个动态的属性变化而变化,这...
    99+
    2024-04-02
  • JavaScript设计模式之代理模式的示例分析
    这篇文章主要为大家展示了“JavaScript设计模式之代理模式的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“JavaScript设计模式之代理模式的...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作