广告
返回顶部
首页 > 资讯 > 移动开发 >Kotlin协程之Flow基础原理示例解析
  • 595
分享到

Kotlin协程之Flow基础原理示例解析

2024-04-02 19:04:59 595人浏览 安东尼
摘要

目录引言一.Flow的创建二.Flow的消费1.SafeFlow类2.AbstractFlow类3. SafeCollector类4.消费过程中的挂起引言 本文分析示例代码如下: l

引言

本文分析示例代码如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.io) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector<in T> {
    // 可挂起,非线程安全
    public suspend fun emit(value: T)
}

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow<out T> {
   
    @InternalCoroutinesapi
    public suspend fun collect(collector: FlowCollector<T>)
}

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 创建SafeCollector对象,对collector进行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 调用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 释放拦截的续体
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector<T> actual constructor(
    @JVMField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    
    ...
    // 保存上下文中元素数量,用于检查上下文是否变化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 执行结束后的续体
    private var completion: Continuation<Unit>? = null

    // 协程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 挂起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    // 释放拦截的续体
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 发射数据
    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }

    // 重载的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        // 从续体中获取上下文
        val currentContext = uCont.context
        // 保证当前协程的Job是active的
        currentContext.ensuReactive()
        // 获取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文发生变化
        if (previousContext !== currentContext) {
            // 检查上下文是否发生异常
            checkContext(currentContext, previousContext, value)
        }
        // 保存续体
        completion = uCont
        // 调用emitFun方法,传入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // 检查上下文变化,防止并发
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次执行过程中发生了异常
        if (previousContext is DownstreamExceptionElement) {
            // 抛出异常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 检查上下文是否发生变化,如果变化,则抛出异常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    
    // 用于抛出异常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心执行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) {
    // 获取当前suspend方法续体
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 调用重载的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 抛出异常
            throw e
        }
    }
}

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 尝试获取异常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果没有异常,则恢复flow方法续体的执行
    completion?.resumeWith(result as Result<Unit>)
    // 返回挂起标识,这里挂起的是消费过程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

以上就是Kotlin协程之Flow基础原理示例解析的详细内容,更多关于Kotlin协程Flow原理的资料请关注编程网其它相关文章!

--结束END--

本文标题: Kotlin协程之Flow基础原理示例解析

本文链接: https://www.lsjlt.com/news/166533.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Kotlin协程之Flow基础原理示例解析
    目录引言一.Flow的创建二.Flow的消费1.SafeFlow类2.AbstractFlow类3. SafeCollector类4.消费过程中的挂起引言 本文分析示例代码如下: l...
    99+
    2022-11-13
  • Kotlin协程之Flow异常示例处理
    目录示例一.catch方法catchImpl方法二. onCompletion方法1.unsafeFlow方法2.ThrowingCollector类三. retryWhen方法示例...
    99+
    2022-11-13
  • Kotlin协程之Flow触发与消费示例解析
    目录示例一.Flow的触发与消费1.onEach方法2.transform方法3.collect方法二.多消费过程的执行三.总结示例 代码如下: launch(Dispatchers...
    99+
    2022-11-13
  • Kotlin协程的基础与使用示例详解
    目录一.协程概述1.概念2.特点3.原理1)续体传递2)状态机二.协程基础1.协程的上下文2.协程的作用域3.协程调度器4.协程的启动模式5.协程的生命周期1)协程状态的转换2)状态...
    99+
    2022-11-13
  • Kotlin协程Dispatchers原理示例详解
    目录前置知识demostartCoroutineCancellableintercepted()函数DefaultScheduler中找dispatch函数Runnable传入Wor...
    99+
    2022-11-13
    Kotlin协程Dispatchers Kotlin Dispatchers
  • Python基础之进程的示例分析
    这篇文章将为大家详细讲解有关Python基础之进程的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、前言进程,一个新鲜的字眼,可能有些人并不了解,它是系统某个运行程序的载体,这个程序可以有单个或...
    99+
    2023-06-15
  • Java基础之线程锁的示例分析
    这篇文章将为大家详细讲解有关Java基础之线程锁的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、 synchronized关键字对象锁a.当使用对象锁的时候,注意要是相同的对象,并且当有线程正...
    99+
    2023-06-20
  • nodejs基础之多进程的示例分析
    小编给大家分享一下nodejs基础之多进程的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!具体如下:Node.js 多进...
    99+
    2022-10-19
  • Java基础元注解基本原理示例详解
    目录元注解@DocumentedIDEA Documented 文档生成@Target@Retention@Inherited示例@Repeatable示例@Native元注解 是...
    99+
    2023-01-17
    Java 元注解 Java 基础原理
  • Go语言基础闭包的原理分析示例详解
    目录一. 闭包概述二. 代码演示运行结果代码说明一. 闭包概述 闭包就是解决局部变量不能被外部访问的一种解决方案 闭包是把函数当作返回值的一种应用 二. 代码演示...
    99+
    2022-11-12
  • RxJava实战之订阅流基本原理示例解析
    目录正文订阅流subscribe的解读收下小结正文 本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原...
    99+
    2022-12-30
    RxJava订阅流基本原理 RxJava订阅流
  • mongodb基础之用户权限管理的示例分析
    这篇文章主要介绍了mongodb基础之用户权限管理的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。启动mongodb并连接./bin...
    99+
    2022-10-18
  • RxJava构建流基本原理示例解析
    目录正文1.构建流1.1 just的解读1.2 map的解读1.3 subscribeOn、observeOn1.4 小结正文 本节,我们从Rxjava使用代码入手,去结合自己已有...
    99+
    2022-12-30
    RxJava构建流基本原理 RxJava构建流
  • java理论基础StreamAPI终端操作示例解析
    目录一、JavaStream管道数据处理操作二、ForEach和ForEachOrdered三、元素的收集collect3.1.收集为Set3.2.收集到List3.3.通用的收集方...
    99+
    2022-11-13
  • 计算机网络编程MQTT协议基础原理详解
    目录什么是 MQTT 协议MQTT 基础发布 - 订阅模式可拓展性消息过滤基于主题的过滤基于内容的过滤基于类型的过滤MQTT 与消息队列的区别MQTT 重要概念MQTT client...
    99+
    2022-11-12
  • Java基础之三大控制流程结构的示例分析
    这篇文章给大家分享的是有关Java基础之三大控制流程结构的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。常用的java框架有哪些1.SpringMVC,Spring Web MVC是一种基于Java的实现...
    99+
    2023-06-14
  • Spring Aop基本流程原理示例详解
    一、代理对象的创建过程: AbstractAutowireCapableBeanFactory#initializeBean protectedObjectinitializeB...
    99+
    2022-11-12
  • java理论基础函数式接口特点示例解析
    目录一、函数式接口是什么?二、函数式接口的特点三、default关键字四、JDK中的函数式接口举例五、自定义Comparator排序一、函数式接口是什么? 所谓的函数式接口,实际上就...
    99+
    2022-11-13
  • MySQL数据库基础篇SQL窗口函数示例解析教程
    目录本文简介正文介绍聚合函数 + over()排序函数 + over()ntile()函数 + over()偏移函数 + over()本文简介 前段时间,黄同学写了一篇《MySQL窗...
    99+
    2022-11-12
  • java编程之AC自动机工作原理的示例分析
    这篇文章将为大家详细讲解有关java编程之AC自动机工作原理的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1.应用场景—多模字符串匹配我们现在考虑这样一个问题,在一个文本串text中,我们想找出...
    99+
    2023-05-30
    java
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作