广告
返回顶部
首页 > 资讯 > 后端开发 > Python >RocketMQ设计之异步刷盘
  • 323
分享到

RocketMQ设计之异步刷盘

2024-04-02 19:04:59 323人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一

上一篇RocketMQ设计之同步刷盘

异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsGoK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘。

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            //每次Flush间隔500毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            //每次Flush最少4页内存数据(16KB)
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

               //距离上次刷盘时间阈值为10秒
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // NORMal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
  • 判断是否超过10秒没刷盘了,如果超过强制刷盘
  • 等待Flush间隔500ms
  • 通过MappedFile刷盘
  • 设置StoreCheckpoint刷盘时间点
  • 超过500ms的刷盘记录日志
  • Broker正常停止前,把内存page中的数据刷盘

开启缓冲池:

class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            //每次提交间隔200毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

            //每次提交最少4页内存数据(16KB)
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            //距离上次提交时间阈值为200毫秒
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}

RocketMQ申请一块和CommitLog文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

  • 判断是否超过200毫秒没提交,需要强制提交
  • 提交到MappedFile,此时还未刷盘
  • 然后唤醒刷盘线程
  • 在Broker正常停止前,提交内存page中的数据

到此这篇关于RocketMQ设计之异步刷盘的文章就介绍到这了,更多相关RocketMQ异步刷盘内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: RocketMQ设计之异步刷盘

本文链接: https://www.lsjlt.com/news/143228.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • RocketMQ设计之异步刷盘
    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一...
    99+
    2022-11-13
  • RocketMQ设计之同步刷盘
    同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消...
    99+
    2022-11-13
  • RocketMQ设计之同步刷盘的示例分析
    这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。CommitLog的handleD...
    99+
    2023-06-29
  • RocketMQ设计之故障规避机制
    NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的...
    99+
    2022-11-13
  • RocketMQ设计之主从复制和读写分离
    目录一、主从复制二、读写分离一、主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费...
    99+
    2022-11-13
  • RocketMQ设计之故障规避机制的示例分析
    这篇文章给大家分享的是有关RocketMQ设计之故障规避机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机...
    99+
    2023-06-29
  • RocketMQ之NameServer架构设计及启动关闭流程源码分析
    目录NameServer1.架构设计2.核心类与配置NamesrvControllerNamesrvConfigNettyServerConfigRouteInfoManager3....
    99+
    2022-11-12
  • 如何从RocketMQ消息持久化设计看磁盘性能瓶颈的突破
    今天就跟大家聊聊有关如何从RocketMQ消息持久化设计看磁盘性能瓶颈的突破,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。从RocketMQ消息持久化设计看磁盘性能瓶颈的突破分布式消...
    99+
    2023-06-19
  • Java 设计模式之责任链模式及异步责任链详解
    目录一、定义二、普通责任链模式三、异步责任链模式一、定义 责任链模式(Chain of Responsibility Pattern):避免将一个请求的发送者与接受者耦合在一起,让多...
    99+
    2022-11-12
  • C++算法设计之马踏棋盘的实现
    本文实例为大家分享了C++算法设计之马踏棋盘的具体代码,供大家参考,具体内容如下 (一)马踏棋盘经典算法描述:   (1)马踏棋盘是经典的程序设计问题之一,主要的解决方案有...
    99+
    2022-11-13
  • android异步任务设计思详解(AsyncTask)
    这里说有设计思想是我根据查看Android源代码提炼出来的代码逻辑,所以不会跟Google工程师的原始设计思想100%符合(也有可能是0%),但是本文一定可以帮助你理解Asyn...
    99+
    2022-06-06
    asynctask Android
  • Java架构设计之六步拆解DDD
    目录引言项目需求信息DDD落地实践战略设计1、业务分析(1)事前准备(2)邀请参会的人(3)业务讨论2、领域建模(1)领域对象分析(2)构建业务聚合3、划分边界上下文战术设计1、微服...
    99+
    2022-11-13
  • 浅谈Java并发之同步器设计
    前言: 在 Java并发内存模型详情了解到多进程(线程)读取共享资源的时候存在竞争条件。 计算机中通过设计同步器来协调进程(线程)之间执行顺序。同步器作用就像登机安检人员一样可以协...
    99+
    2022-11-12
  • Java设计模式之代理模式与@Async异步注解失效的解决
    目录JDK动态代理实现自定义异步注解(@Async)SpringAOP实现自定义异步注解Spring的异步注解@Async失效分析自定义注解实现方式 JDK动态代理实现自定义异步注解...
    99+
    2022-11-13
  • Java毕业设计实战之在线网盘系统的实现
    一、项目简述 功能:用户的邮箱注册、验证码验证以及用户登录。 不需要注册账号,也可以上传满足条件的临时文件,但是只4小时内有效。 文件的管理,上传、下载、重命名、删除、查看统计数据、...
    99+
    2022-11-13
  • Java并发之同步器设计的方法是什么
    本篇内容介绍了“Java并发之同步器设计的方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!在 Java并发之内存模型了解到多进程(线...
    99+
    2023-06-16
  • 类和原型的设计模式之复制与委托差异
    目录小引“类”设计模式举个例子:“原型”设计模式小结小引 JavaScript 技能持有者一定有问过这个问题: JavaScript ...
    99+
    2022-11-13
  • Go语言异步API设计的扇入扇出模式详解
    目录前言扇入/扇出服务Go 语言实现扇入/扇出模式前言 扇出/扇入模式是更高级 API 集成的主要内容。这些应用程序并不总是表现出相同的可用性或性能特征。 扇出是从电子工程中借用的一...
    99+
    2022-11-11
  • Kotlin 协程异步热数据流的设计与使用讲解
    目录一.异步冷数据流二.异步热数据流1.异步热数据流的设计1)SharedFlow接口2)MutableSharedFlow接口2.异步热数据流的使用1)MutableSharedF...
    99+
    2022-11-13
  • 探究 Python 异步编程框架中,接口设计的优缺点。
    Python 异步编程框架在当今的开发领域中已经变得越来越重要。异步编程框架可以帮助开发者更好地利用 CPU,使得程序能够更加高效地运行。在 Python 异步编程框架中,接口设计是非常重要的一部分。接下来,我们将 一、异步编程框架的基本原...
    99+
    2023-06-27
    异步编程 框架 接口
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作