广告
返回顶部
首页 > 资讯 > 移动开发 >Flow如何解决背压问题的方法详解
  • 711
分享到

Flow如何解决背压问题的方法详解

Flow 解决背压问题Flow 背压 2022-11-16 00:11:59 711人浏览 八月长安
摘要

目录前言关于背压(BackPressure)背压问题是什么定义背压策略Flow的背压机制模拟背压问题背压处理方式使用buffer进行缓存收集使用conflate解决使用collect

前言

随着时间的推移,越来越多的主流应用已经开始全面拥抱Kotlin,协程的引入,Flow的诞生,给予了开发很多便捷,作为协程与响应式编程结合的流式处理框架,一方面它简单的数据转换与操作符,没有繁琐的操作符处理,广受大部分开发的青睐,另一方面它并没有响应式编程带来的背压问题(BackPressure)的困扰;接下来,本文将会就Flow如何解决背压问题进行探讨

关于背压(BackPressure)

背压问题是什么

首先我们要明确背压问题是什么,它是如何产生的?简单来说,在一般的流处理框架中,消息的接收处理速度跟不上消息的发送速度,从而导致数据不匹配,造成积压。如果不及时正确处理背压问题,会导致一些严重的问题

  • 比如说,消息拥堵了,系统运行不畅从而导致崩溃
  • 比如说,资源被消耗殆尽,甚至会发生数据丢失的情况

如下图所示,可以直观了解背压问题的产生,它在生产者的生产速率高于消费者的处理速率的情况下出现

定义背压策略

既然知道了背压问题我们已经知道是如何产生的,就要去尝试如何正确处理它,大致意思是,如果你有一个流,你需要一个缓冲区,以防数据产生的速度快于消耗的速度,所以往往就会针对这个背压策略进行些讨论

  • 定义的中间缓冲区需要多大才比较合适?
  • 如果缓冲区数据已满了,我们怎么样处理新的事件?

对于以上问题,通过学习Flow里的背压策略,相信可以很快就知道答案了

Flow的背压机制

由于Flow是基于协程中使用的,它不需要一些巧妙设计的解决方案来明确处理背压,在Flow中,不同于一些传统的响应式框架,它的背压管理是使用Kotlin挂起函数suspend实现的,看下源码你会发现,它里面所有的函数方法都是使用suspend修饰符标记,这个修饰符就是为了暂停调度者的执行不阻塞线程。因此,Flow<T>在同一个协程中发射和收集时,如果收集器跟不上数据流,它可以简单地暂停元素的发射,直到它准备好接收更多。看到这,是不是觉得有点难懂.......

简单举个例子,假设我们拥有一个烤箱,可以用来烤面包,由于烤箱容量的限制,一次只能烤4个面包,如果你试着一次烤8个面包,会大大加大烤箱的承载负荷,这已经远远超过了它的内存使用量,很有可能会因此烧掉你的面包。

模拟背压问题

回顾下之前所说的,当我们消耗的速度比生产的速度慢的时候,就会产生背压,下面用代码来模拟下这个过程

首先先创建一个方法,用来每秒发送元素

fun currentTime() = System.currentTimeMillis()
fun threadName() = Thread.currentThread().name
var start: Long = 0
​
fun createEmitter(): Flow<Int> =
    (1..5)
        .asFlow()
        .onStart { start = currentTime() }
        .onEach {
            delay(1000L)
            print("Emit $it (${currentTime() - start}ms) ")
        }

接着需要收集元素,这里我们延迟3秒再接收元素, 延迟是为了夸大缓慢的消费者并创建一个超级慢的收集器。

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            createEmitter().collect {
                print("\nCollect $it starts ${start - currentTime()}ms")
                delay(3000L)
                println("   Collect $it ends ${currentTime() - start}ms")
            }
        }
        print("\nCollected in $time ms")
    }
}

看下输出结果,如下图所示

这样整个过程下来,大概需要20多秒才能结束,这里我们模拟了接收元素比发送元素慢的情况,因此就需要一个背压机制,而这正是Flow本质中的,它并不需要另外的设计来解决背压

背压处理方式

使用buffer进行缓存收集

为了使缓冲和背压处理正常工作,我们需要在单独的协程中运行收集器。这就是.buffer()操作符进来的地方,它是将所有发出的项目发送Channel到在单独的协程中运行的收集器。

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

它还为我们提供了缓冲功能,我们可以指定capacity我们的缓冲区和处理策略onBufferOverflow,所以当Buffer溢出的时候,它为我们提供了三个选项

enum BufferOverflow { 
   SUSPEND,
   DROP_OLDEST,
   DROP_LATEST
 }
  • 默认使用SUSPEND:会将当前协程挂起,直到缓冲区中的数据被消费了
  • DROP_OLDEST:它会丢弃最老的数据
  • DROP_LATEST: 它会丢弃最新的数据

好的,我们回到上文所展示的模拟示例,这时候我们可以加入缓冲收集buffer,不指定任何参数,这样默认就是使用SUSPEND,它会将当前协程进行挂起

此时当收集器繁忙的时候,程序就开始缓冲,并在第一次收集方法调用结束的时候,两次发射后再次开始收集,此时流程的耗时时长缩短到大约16秒就可以执行完毕,如下图所示输出结果

使用conflate解决

conflate操作符于Channel中的Conflate模式是一直的,新数据会直接覆盖掉旧数据,它不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST 策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST),可以看下它的源码可以佐证我们的判断

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

在某些情况下,由于根本原因是解决生产消费速率不匹配的问题,我们需要做一些取舍的操作,conflate将丢弃掉旧数据,只有在收集器空闲之前发出的最后一个元素才被收集,将上文的模拟实例改为conflate执行,你会发现我们直接丢弃掉了2和4,或者说新的数据直接覆盖掉了它们,整个流程只需要10秒左右就执行完成了

使用collectLatest解决

通过官方介绍,我们知道collectLatest作用在于当原始流发出一个新的值的时候,前一个值的处理将被取消,也就是不会被接收, 和conflate的区别在于它不会用新的数据覆盖,而是每一个都会被处理,只不过如果前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消

suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)

还是上文的模拟实例,这里我们使用collectLatest看下输出结果:

这样也是有副作用的,如果每个更新都非常重要,例如一些视图,状态刷新,这个时候就不必要用collectLatest ; 当然如果有些更新可以无损失的覆盖,例如数据库刷新,就可以使用到collectLatest,具体详细的使用场景,还需要靠开发者自己去衡量选择使用

小结

对于Flow可以说不需要额外提供什么巧妙的方式解决背压问题,Flow的本质,亦或者说Kotlin协程本身就已经提供了相应的解决方案;开发者只需要在不同的场景中选择正确的背压策略即可。总的来说,它们都是通过使用Kotlin挂起函数suspend,当流的收集器不堪重负时,它可以简单地暂停发射器,然后在准备好接受更多元素时恢复它。

关于挂起函数suspend这里就不过多赘述了,只需要明白的一点是它与传统的基于线程的同步数据管道中背压管理非常相似,无非就是,缓慢的消费者通过阻塞生产者的线程自动向生产者施加背压,简单来说,suspend通过透明地管理跨线程的背压而不阻塞它们,将其超越单个线程并进入异步编程领域。

以上就是Flow如何解决背压问题的方法详解的详细内容,更多关于Flow 解决背压问题的资料请关注编程网其它相关文章!

--结束END--

本文标题: Flow如何解决背压问题的方法详解

本文链接: https://www.lsjlt.com/news/172949.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Flow如何解决背压问题的方法详解
    目录前言关于背压(BackPressure)背压问题是什么定义背压策略Flow的背压机制模拟背压问题背压处理方式使用buffer进行缓存收集使用conflate解决使用collect...
    99+
    2022-11-16
    Flow 解决背压问题 Flow 背压
  • Node.js中如何解决“背压”问题
    Node.js中如何解决“背压”问题,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Node.js...
    99+
    2022-10-19
  • Redis解决Session共享问题的方法详解
    企业项目中,一般都是将项目部署到多台服务器上,用nginx做负载均衡。这样可以减轻单台服务器的压力,不过这样也带来一些问题,例如之前单机部署的话,session存取都是直接了当的,因为请求就只到这一台服务器上,不需要考虑...
    99+
    2022-07-11
    Redis解决Session共享 Redis Session共享
  • 如何解决Python复杂zip文件的解压问题
    这篇文章主要介绍“如何解决Python复杂zip文件的解压问题”,在日常操作中,相信很多人在如何解决Python复杂zip文件的解压问题问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何解决Python复杂z...
    99+
    2023-06-25
  • C++回溯与分支限界算法分别解决背包问题详解
    目录算法思想回溯代码分支限界代码算法思想 分支限界法与回溯法的求解目标不同。 回溯法的求解目标是找出解空间中满足约束条件的所有解,而分支限界法的求解目标则是找出满足约束条件的一个解,...
    99+
    2022-11-13
  • Java跨域问题分析与解决方法详解
    目录一、前言二、什么是跨域问题三、 为什么会出现跨域问题四、什么情况下会出现跨域五、如何解决跨域问题5.1 使用@CrossOrigin注解5.2 使用WebMvcConfigure...
    99+
    2023-05-20
    Java跨域问题原理 Java跨域问题解决方法 Java跨域问题
  • Dedecms图集上传zip压缩包解压图片顺序不对问题的解决方法
    其实这并不是DEDEcms解压出错,而是因为DEDECMS排列图标顺序是按解压时间的前后来排列的,知道了原因,我们只要改变DEDECMS默认的排列规则就可以解决这个问题了。大家可以看到: dedecms管理后台目录:de...
    99+
    2022-06-12
    Dedecms 图集 图片顺序不对
  • 如何解决webpack4 css打包压缩问题
    这篇文章将为大家详细讲解有关如何解决webpack4 css打包压缩问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。这两天一直在练习这个webpack4, 发现有好多问...
    99+
    2022-10-19
  • C++中编译优化问题和解决方法的详解
    C++中编译优化问题和解决方法的详解摘要:C++编译器优化是提高程序性能的重要手段。然而,在实际开发中,我们经常会遇到一些与编译优化相关的问题,比如编译器错误优化、代码性能下降等。本文将针对这些问题进行详细的解析,并提供相应的解决方法,以期...
    99+
    2023-10-22
    问题解决方法 C++编译优化 详解编程关键词
  • JS前端常见的竞态问题解决方法详解
    目录什么是竞态问题取消过期请求XMLHttpRequest 取消请求fetch API 取消请求axios 取消请求可取消的 promise忽略过期请求封装指令式 promise使用...
    99+
    2022-11-13
    JS前端竞态 前端竞态解决
  • CSS背景显示范围的问题如何解决
    今天小编给大家分享一下CSS背景显示范围的问题如何解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了...
    99+
    2022-10-19
  • 图文详解XP升Win8磁盘问题解决方法
    安win8可能遇到磁盘分区问题   Win8正式版已经发布多时了,很多用户都希望能将原有的系统(Windows XP、windows7)升级到最新的win8正版,升级的具体方法包括通过Win8启动盘(制作U...
    99+
    2022-06-04
    解决方法 磁盘 详解
  • Java用递归方法解决汉诺塔问题详解
    目录前言一、问题描述二、问题分析三、解决方案四、示例前言 博主之前有写过关于递归问题的思维模式: 递归的思路 下面将用这种思维模式来求解经典汉诺塔问题。 一、问题描述 汉诺塔(又称河...
    99+
    2022-11-13
  • Android关于Button背景或样式失效问题解决方法
    目录前言问题描述:问题原因:解决方法:总结前言 最近在学习安卓开发的时候遇到了一个问题,使用Android Studio在为Button设置背景颜色的时候发现设置好后却在运行模拟机上...
    99+
    2022-11-13
  • IE6 position:fixed问题的解决方法
    IE6 position:fixed问题的解决方法,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。...
    99+
    2022-10-19
  • Vue如何解决跨域问题详解
    什么是跨域 跨域指浏览器不允许当前页面的所在的源去请求另一个源的数据。源指协议,端口,域名。只要这个3个中有一个不同就是跨域。 这里列举一个经典的列子: #协议跨域http://a....
    99+
    2022-11-13
  • 如何解决IE下background背景图片无法显示问题
    本篇内容介绍了“如何解决IE下background背景图片无法显示问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够...
    99+
    2022-10-19
  • 详解Java泛型中类型擦除问题的解决方法
    以前就了解过Java泛型的实现是不完整的,最近在做一些代码重构的时候遇到一些Java泛型类型擦除的问题,简单的来说,Java泛型中所指定的类型在编译时会将其去除,因此List&nbs...
    99+
    2022-11-13
  • 如何解决MySQL压缩版zip安装问题
    小编给大家分享一下如何解决MySQL压缩版zip安装问题,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!安装mysql的压缩版出现...
    99+
    2022-10-18
  • php压缩中文乱码问题如何解决
    这篇“php压缩中文乱码问题如何解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“php压缩中文乱码问题如何解决”文章吧。p...
    99+
    2023-07-04
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作