环境介绍
本文所使用的运行环境如下:
- Kotlin 版本:1.9.10
- Coroutine 版本:1.7.3
- OpenJDK 版本:17.0.7+10-b829.16 x86_64
- IDE:IntelliJ IDEA 2023.2 (Ultimate Edition)
术语翻译
- flow:流
- emit:发送,发射(本文统一翻译成“发送”)
- emitter:发射器
- collect:收集
- collector:收集器
- terminal flow operators:末端流操作符,终端流操作符(本文统一翻译成“末端流操作符”)
- intermediate flow operators:中端流操作符,中间流操作符,过渡流操作符(本文统一翻译成“中端流操作符”)
- structured concurrency:结构化并发
思维导图
定义
Flow
是一种类似于序列的冷流,是序列的异步版本,这是一种收集类型,其中的值是逐个生成的。与序列一样,只有需要某个值时,Flow
才会根据需要生成该值,而且 Flow
可以包含无限数量的值。
Flow 函数定义如下,就只是表示这个类能够被订阅收集:
1 | public interface Flow<out T> { |
而其中的 FlowCollector
则定义了发送数据的功能:
1 | public fun interface FlowCollector<in T> { |
Flow
全面支持协程。这意味着您可以使用协程构建、转换和耗用 Flow
。您还可以控制并发,即利用 Flow
通过声明的方式协调多个协程的执行。
Flow 是值的异步序列
Flow
通过异步操作(例如网络请求、数据库调用或其他异步代码),一次生成一个值(而不是一次生成所有值)。它通过其 API 支持协程,因此您也可以使用协程来转换 flow。
在基础 Kotlin 协程中,
launch
启动的协程没有返回值,运行后就结束。async
启动的协程,通过await
函数获取协程的返回值,但只能返回单个值,且无法进行数据流操作。
而 Flow
的出现,恰好弥补了 Kotlin 协程对于多个值异步运算的不足,并且允许进行复杂的数据流操作。
Flow 如何运行
先看下面的示例:
1 | fun makeFlow() = flow { |
运行上面的代码会输出以下内容:
1 | sending first value |
可以看到 collect
lambda 与 flow
构建器交替执行。每次 flow
构建器调用 emit
时,它都会 suspends
,直到元素完全处理为止。当从流中请求另一个值时,它会从上次停止的位置 resume
,直到它再次调用 emit
。flow
构建器执行完成后,流将被取消,同时 collect
恢复,从而允许调用协程输出 “flow is completed”。
对 collect
的调用非常重要。流使用挂起操作符(也被称为末端操作符)(例如 collect
),而不是公开 Iterator
,以便始终知晓流何时在被主动耗用。更重要的是,它可以在调用方无法再请求更多值时获知消息,以便清理资源。
Flow
完全是使用协程构建的。通过使用协程的suspend
和resume
机制,可以将生产方 (Flow
) 的执行与使用方 (collect
) 同步。
Flow 何时运行
当 collect
操作符运行时,Flow
才会开始运行。通过调用 flow
构建器或其它 API 来创建新的 Flow
后,并不会立刻执行工作。挂起操作符 collect
在 Flow
中被称为末端操作符。还有很多其它末端操作符,例如 kotlinx-coroutines
附带的 toList
、first
和 single
等。
默认情况下,Flow
将在以下情况下执行:
- 每次调用末端操作符时(且每个新调用均与之前启动的任何调用无关)
- 直到运行
Flow
的协程被取消 - 当上一个值已完全处理,并且又请求了另一个值时
这些规则是 Flow
的默认行为,可以创建一个 Flow
,该 Flow
可以与先前运行的 Flow
共享状态,在每个末端操作符之间不会重新启动,并且通过内置或自定义的 Flow
转换在收集过程中独立执行。
执行 Flow
的过程称为收集 Flow。默认情况下,Flow
在被收集之前(即应用任何末端操作符)不会执行任何操作。
由于存在以上这些规则,所以 Flow
可以参与结构化并发,并且可以安全地从 Flow
启动长时间运行的协程。Flow
不会泄露资源,因为在调用方被取消时,系统始终会按照协程合作取消规则清理这些资源。
Flow supports structured concurrency
Because a flow allows you to consume values only with terminal operators, it can support structured concurrency.
When the consumer of a flow is cancelled, the entire
Flow
is cancelled. Due to structured concurrency, it is impossible to leak a coroutine from an intermediate step
我们使用 take
操作符修改一下上面的示例,以便仅查看前两个元素,然后再收集该 Flow
两次:
1 | import kotlinx.coroutines.flow.collect |
程序运行结果如下:
1 | first collection |
每次调用 collect
时,flow
lambda 都会从头开始执行。如果 Flow
执行高成本的工作(例如发出网络请求),则这一点非常重要。此外,由于我们应用了 take(2)
操作符,因此该 flow 将只生成两个值。第二次调用 emit
后,将不会再次恢复 flow lambda,因此将不会再输出“second value collected…”这一行。
默认情况下,每次应用末端操作符时,
Flow
都会从头重新开始执行。如果Flow
执行高成本的工作(如发出网络请求),则这一点非常重要。
Flow 构建器
创建 Flow
数据流最简单的方法是使用 flow
构建器:
1 | public fun <T> flow( |
其参数使用 suspend
修饰,是以 FlowCollector
作为接收者的挂起函数,能直接调用 emit
发送数据。
点击右上角运行按钮可以查看程序运行结果。
simple
函数会快速返回,并且不会等待任何内容。每次收集( collect
) 流时,流都会重新开始运行。这就是为什么每次调用 collect
时都会输出 “Flow started”。
根据上述代码我们还可知如下内容:
- Flow 类型的构建器函数名为
flow
。 flow { ... }
构建块中的代码可以挂起。- 函数
simple
不需要使用suspend
修饰符。 - 流使用
emit
函数 发射 值。 - 流使用
collect
函数 收集 值。
其它创建 Flow
的方式还有很多,例如 asFlow
、flowOf
等。
此外,官方还提供了很多将类型转换到流的扩展函数。大家可以查看下官方文档。
说明:
由
flow
函数的内部定义我们可以看出,block
是由suspend
修饰的,所以内部可以调用挂起函数。flow
代码块的emit
函数是线程不安全的,所以flow
函数的不能修改协程上下文,无法调用如withContext
等函数,避免下游collect
被调度到其他线程。如果要修改数据流的协程调度,只能调用
flowOn
函数。
Flow 的取消机制
Flow
的取消机制与协程的一致。在可取消挂起函数(例如: delay
)中,Flow
的 collect
操作是可以被取消的。
示例:
点击右上角运行按钮可以查看程序运行结果。
通过运行结果可以看出,程序运行 250ms 后,Flow
直接被取消了,Emitting 3
及 3
并没有输出。
通过 flow
构建器创建的 Flow
,会在每次 emit
值时,执行额外的 ensureActive
检测,从而可以取消 Flow
。
再看一个示例:
点击右上角运行按钮可以查看程序运行结果。
通过运行结果可以看到,当 value
等 3
时,虽然我们调用了 cancel()
函数,但是 Flow
并没有立刻被取消,而是在下一次发送值时,也就是执行 emit(4)
时,Flow
才被取消。
然而,出于性能考虑,大多数 Flow
操作符并不会进行额外的取消检测。例如,如果您使用 IntRange.asFlow
扩展来编写相同的循环,由于没有挂起操作,因此就不会有取消检测:
当在大量循环操作中使用 Flow
时,必须显示的进行取消检测。例如,可以通过添加 .onEach { currentCoroutineContext().ensureActive() }
来达到此目的。不过,有一个更简单的做法,那就是使用 cancellable
操作符:
运行结果表明,输出 3
之后,由于调用了 cancel()
因此产生 JobCancellationException
异常导致程序异常退出。
中端流操作符
可以使用操作符转换流,就像转换集合与序列一样。 过渡操作符应用于上游流,并返回下游流。 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
基础的中端流操作符名字看起来很熟悉,比如 map
与 filter
。 这些操作符与序列的主要区别在于,这些操作符中的代码可以调用挂起函数。
举例来说,一个接收请求的流可以使用 map
操作符转换结果,即使是通过挂起函数,实现一个长时间的请求操作:
转换操作符 (Transform operator)
在流的转换操作符中,最通用的是称为 transform
的操作符。它可以用于模仿像 map
和 filter
这样的简单转换,以及实现更复杂的转换。使用 transform
操作符,我们可以发出任意值,并可发出任意次。
通过 transform
操作符,我们可以对流中的值进行更灵活的转换。这允许我们根据需要发出零个、一个或多个新值,甚至可以在一个单一的 emit
操作中多次发出不同的值。这种操作符在需要进行自定义、动态的流转换时非常有用,提供了更高级别的灵活性和控制权。
限长操作符 (Size-limiting operators)
限长中端操作符(例如 take
)在达到相应限制时会取消流的执行。在协程中,这种取消操作始终会抛出一个异常,从而取消流的执行,在 catch
块中,我们可以捕获这种异常。这种异常机制确保了在取消流的情况下,资源管理函数(如 try {...} finally {...}
块)会正常运行,从而确保了程序的正确性和可靠性:
末端流操作符
由于 Flow
是冷流,因此必须要调用末端操作符(例如,collect
)才会执行数据流的生产操作。
- 数据流的创建与数据流的消费是成对出现的
- 多个数据流订阅消费,也会同样有多个数据源生产创建
流的末端操作符是一种启动流收集的挂起函数。collect
操作符是最基础的一个,另外还有一些更方便使用的末端操作符:
例如:
流是顺序执行的
流的每次单独收集操作都是顺序执行的,除非使用了能够操作多个流的特殊操作符(例如 buffer
)。收集操作直接在调用末端操作符的协程中执行。默认情况下不会启动新协程。每个发送 (emit) 的值都会经过从上游到下游的所有中端操作符的处理,最后传递给末端操作符处理。
请看以下示例,该示例对偶数进行过滤并将它们映射为字符串:
请点击右上角运行按钮,查看运行结果,了解流的具体执行过程。
流上下文
流的收集始终在调用协程的上下文中进行。
例如,如果有一个 simple
流,那么下面的代码会在指定的上下文 context
中运行,而不受 simple
流实现细节的影响:
1 | withContext(context) { |
流的这种属性称为上下文保留 (context preservation) 。
默认情况下,flow { ... }
构建器中的代码是运行在相应流的收集器提供的上下文中的。让我们来看一个示例:
但 collect { ... }
中的代码是允许切换上下文的。
切换上下文的常见陷阱
对于那些需要长时间运行且消耗 CPU 的代码,可能会在 Dispatchers.Default
上下文中执行,而更新 UI 的代码可能需要在 Dispatchers.Main
上下文中执行。通常情况下,可以使用 withContext
在 Kotlin 协程中切换上下文,但是在 flow { ... }
构建器中的代码遵循上下文保留属性,因此在 flow { ... }
构建器中的代码是不允许从不同的上下文发送数据的。
以下示例中的 simple()
流并不会完成切换上下文操作,相反还会产生异常:
flowOn 操作符
切换流运行的上下文(即 flow { ... }
构建器中的代码)的正确方法是使用 flowOn
函数。具体用法请看下面的示例:
本例中需要注意的是,flow { ... }
中的代码运行在后台线程,而收集操作则运行在主线程。
还有一点值得留意,flowOn
操作符改变了流的默认顺序性质 (default sequential nature),而默认情况下,流数据的产生 (emit) 和处理 (collect) 是发生在相同协程中的(详见之前的示例)。收集操作发生在(”coroutine#1”)中,而发送操作发生在另一个协程(”coroutine#2”)中,后者运行在与收集协程所在线程并发运行的另一个线程中。当使用 flowOn
操作符切换 CoroutineDispatcher
时,它为上游流创建了一个新的协程。
缓冲(buffer)
从整体上来讲,为了减小收集流所花费的时间,将流的不同部分在不同的协程中运行是一个行之有效的办法,特别是涉及到长时间运行的异步操作时。先来看一个示例:
在上例中,每产生一个数据耗时约 100ms,每处理一个数据耗时约 300ms。对于 3 个数据,流的总体运行时间约为 1200ms = 3 * (100ms + 300ms) 左右。
这里我们可以引入 buffer
操作符,改变流的运行顺序,让流的发送代码与收集代码并行运行。来看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1 |
让我们来分析一下执行过程:
在这个示例中,由于我们使用了 buffer
操作符,因此流的运行顺序不再像默认情况那样,需要等待收集处理完数据后,才可以再次发送数据。使用 buffer
操作符后,流的运行顺序变为发送数据无需等待收集处理结束,便可以直接开始下一次的数据发送。buffer
操作符让流的发送部分与收集部分并行运行了。
所以上例中,发送第一个数据前等待了 100ms,之后不需要等待 emit(1)
的收集处理完成(也就是无需等待 collect
处理所需要的 300ms ),便可直接开始下一次循环。也就是说在大约 300ms 时间,3 个数据就可以全部发送出去。而收集部分每处理一个数据还是需要花费 300ms 的。
由于发送与收集现在是并行的,在发送方 100ms 后发送第一个数据开始,收集方就开始处理数据(处理花费 300ms),与此同时发送方无需等待继续开始下一次的数据发送。因此流的总体运行时间大约为 100ms + 3 * 300ms = 1000ms。
我们可以用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
注意,当我们使用 flowOn
操作符切换 CoroutineDispatcher
时,flowOn
操作符会使用相同的缓冲机制,只不过 buffer
操作符不会改变执行的上下文。
合并(conflate)
有时我们可能不需要在 collect
时处理每个被发送过来的值,而只需要处理最近较新的值(only most recent ones)。在这种情况下,可以使用 conflate
操作符,在收集器处理速度较慢时跳过中间值。让我们基于之前的示例进行演示,新代码如下:
1 | import kotlinx.coroutines.* |
这个示例中,仅把 buffer()
替换成成了 conflate()
,让我们看一下运行结果:
1 | 1 |
可以看到,在第一条数据处理过程中,已经产生了第二条和第三条数据,因此当第一条数据处理完之后,已经有了两条数据待处理,因此当继续收集数据时,第二个数据就成了被合并 (conflated) 的数据,只有最新的数据(第三条数据)发送给收集器进行处理。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
最后让我们看一下 conflate
函数的实现:
1 | public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) |
可以看到 conflate
的本质还是 buffer
,只不过将 buffer
的第一个参数 capacity
设置成了 CONFLATED
而已。而 buffer
的 capacity
参数默认值是 BUFFERED
。
处理最新值(xxxLatest)
当发射器和收集器的处理都较慢时,合并(Conflation)是加快处理速度的一种方法,它通过丢弃发出的值来实现。另一种方法是在每次发出新值时取消较慢的收集器并重新启动它。Flow API 中有一系列的 xxxLatest
操作符,其执行与 xxx
操作符相同的基本逻辑,但会在有新值时取消其代码块的执行。
让我们尝试将上一个示例中的 conflate
替换成 collectLatest
,新示例如下:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | Collecting 1 |
由于 collectLatest
的函数体需要花费 300 毫秒,但是每 100 毫秒才发送一次值,所以我们看到每个发送过来的值虽然都可以进入 collectLatest
代码块,但是只有最后一个值可以完整的被收集器处理。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┘ |
最后,以 collectLatest
为例,让我们看一下它的函数实现:
1 | public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) { |
可以看到,collectLatest
内部依然使用了 buffer
,并将 capacity
参数的值设置成了 0
。
组合多个流
组合多个流有很多种方式。
Zip
就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样, 流同样拥有一个 zip
操作符,用于组合两个流中的相关值。请看如下示例:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1 -> one |
Combine
当 Flow
代表变量或操作的最近较新值时 (most recent value)(参见 conflate
部分),可能需要执行一个计算操作,该计算依赖于相应 Flow
的最近较新值,并在任何上游 Flow
发出值时重新计算它。对应的操作符系列 (corresponding family of operators) 被称为 combine
。
对于先前那个例子,如果 nums
每 300 毫秒更新一次,但 strs
每 400 毫秒更新一次,然后再使用 zip
操作符合并它们,仍会产生相同的结果,只不过结果每 400 毫秒才打印一次。修改后的代码如下:
在这个示例中,我们使用了
onEach
中端操作符来延时发送每个元素,这会让代码更加简洁明了。
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1 -> one at 445 ms from start |
如果我们再将例子中的 zip
替换成 combine
的话,结果又会如何呢?修改后的代码如下:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1 -> one at 444 ms from start |
可以看到,我们得到了完全不同的结果。nums
或 strs
流中的每次数据发送,都会打印一行结果。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
展平流 (Flattening flows)
流代表异步接收的值序列,因此很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。例如,以下函数会返回两个字符串的流,返回间隔 500 毫秒:
1 | fun requestFlow(i: Int): Flow<String> = flow { |
现在,如果我们有一个包含三个整数的流,并对每个整数调用 requestFlow
,如下所示:
1 | val flowWithMap: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) } |
这样我们就得到了一个包含流的流 (Flow<Flow<String>>
),若需要对其做进一步处理的话,需要先将其展平 (flatten) 为单一流。集合和序列都有 flatten
与 flatMap
操作符来实现这一点。然而,由于流的异步性质,我们需要不同的展平模式,因此就需要一系列应用在流上的展平操作符。
flatMapConcat
将流的流串联由 flatMapConcat
与 flattenConcat
操作符提供。它们是相应序列操作符的最直接类比。它们在等待内部流完成之后才开始收集下一个值,请看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1: First at 121 ms from start |
通过输出结果,我们可以清晰地看到 flatMapConcat
的顺序特性。
flatMapMerge
另一种展平操作是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发出值。 它由 flatMapMerge
与 flattenMerge
操作符实现。它们都接受一个可选的并发参数 concurrency
(默认情况下,它等于 DEFAULT_CONCURRENCY),该参数限制了同时收集的流的数量。
来看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1: First at 173 ms from start |
可以明显的看到 flatMapMerge
的同步特性。
请注意,
flatMapMerge
虽然是按顺序调用其代码块(即本例中的{ requestFlow(it) }
),但会同时并发收集结果流,相当于执行顺序是首先执行map { requestFlow(it) }
,然后在其返回结果上调用flattenMerge
。
flatMapLatest
与 collectLatest
操作符类似(在“处理最新值”一节中已经描述过),同样存在相对应的“最新” (Latest) 展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest
操作符来实现。
请看下面的示例:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1: First at 164 ms from start |
上面这个例子非常好的展示了 flatMapLatest
的工作方式。
请注意,
flatMapLatest
在收到一个新值时,会取消块中的所有代码 (即本例中的{ requestFlow(it) }
)。在上面这个特定示例中可能看不出有什么影响,因为调用requestFlow
自身是快速的,非挂起的,且不能被取消。然而,如果我们在requestFlow
中使用类似delay
这样的挂起函数的话,那么输出结果的差异将会显现出来。
Android 上的 Kotlin 数据流
数据流 Flow 包含三个重要角色:
- 数据提供方:生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
- 中介(可选):可修改发送到数据流的值,或修正数据流本身。
- 数据使用方:使用数据流中的值。
StateFlow
StateFlow
是一个状态容器可观察数据流,可向其收集器发出当前状态更新和新状态更新。也可以通过其 value
属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow
类的 value
属性分配一个新值。
在 Android 中,StateFlow
非常适合需要让可变状态保持可观察的类。
该文章中有如下一段话,很好的总结了什么时候应该用 StateFlow
:
When exposing UI state to a view, use StateFlow. It’s a safe and efficient observer designed to hold UI state.
如需将任何数据流转换为 StateFlow
,请使用 stateIn
中间操作符。
利用 shareIn 使冷数据流变为热数据流
StateFlow
是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn
操作符将冷数据流变为热数据流。
SharedFlow
shareIn
函数会返回一个热数据流 SharedFlow
,此数据流会向从其中收集值的所有使用方发出数据。SharedFlow
是 StateFlow
的可配置性极高的泛化数据流。
您无需使用 shareIn
即可创建 SharedFlow
。
Flow 常用 API
stateIn
shareIn
flatMapLatest
(switchMap
has been deprecated)transformLatest
collectAsStateWithLifecycle
combine
- combineTransform
- zip
filter
map
onEach
flowOn
transform
数据流收集被中止原因
数据流收集可能会由于以下原因而停止:
- 收集数据的协程被取消,此操作也会让 数据提供方 停止活动。
- 数据提供方完成发出数据项。在这种情况下,数据流将关闭,调用
collect
的协程则继续执行。
捕获异常
如需处理异常,可以使用 catch 操作符,如:
1 | fun getNewsData() { |