0%

Kotlin Flow

环境介绍

本文所使用的运行环境如下:

术语翻译

  • flow:流
  • emit:发送,发射(本文统一翻译成“发送”)
  • emitter:发射器
  • collect:收集
  • collector:收集器
  • terminal flow operators:末端流操作符,终端流操作符(本文统一翻译成“末端流操作符”)
  • intermediate flow operators:中端流操作符,中间流操作符,过渡流操作符(本文统一翻译成“中端流操作符”)
  • structured concurrency:结构化并发

思维导图

定义

Flow 是一种类似于序列的冷流,是序列的异步版本,这是一种收集类型,其中的值是逐个生成的。与序列一样,只有需要某个值时,Flow 才会根据需要生成该值,而且 Flow 可以包含无限数量的值。

Flow 函数定义如下,就只是表示这个类能够被订阅收集:

1
2
3
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}

而其中的 FlowCollector 则定义了发送数据的功能:

1
2
3
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}

Flow 全面支持协程。这意味着您可以使用协程构建、转换和耗用 Flow。您还可以控制并发,即利用 Flow 通过声明的方式协调多个协程的执行。

Flow 是值的异步序列

Flow 通过异步操作(例如网络请求、数据库调用或其他异步代码),一次生成一个值(而不是一次生成所有值)。它通过其 API 支持协程,因此您也可以使用协程来转换 flow。

在基础 Kotlin 协程中,

  • launch 启动的协程没有返回值,运行后就结束。
  • async 启动的协程,通过 await 函数获取协程的返回值,但只能返回单个值,且无法进行数据流操作。

Flow 的出现,恰好弥补了 Kotlin 协程对于多个值异步运算的不足,并且允许进行复杂的数据流操作。

Flow 如何运行

先看下面的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun makeFlow() = flow {
println("sending first value")
emit(1)
println("first value collected, sending another value")
emit(2)
println("second value collected, sending a third value")
emit(3)
println("done")
}

scope.launch {
makeFlow().collect { value ->
println("got $value")
}
println("flow is completed")
}

运行上面的代码会输出以下内容:

1
2
3
4
5
6
7
8
sending first value
got 1
first value collected, sending another value
got 2
second value collected, sending a third value
got 3
done
flow is completed

可以看到 collect lambda 与 flow 构建器交替执行。每次 flow 构建器调用 emit 时,它都会 suspends,直到元素完全处理为止。当从流中请求另一个值时,它会从上次停止的位置 resume,直到它再次调用 emitflow 构建器执行完成后,流将被取消,同时 collect 恢复,从而允许调用协程输出 “flow is completed”。

collect 的调用非常重要。流使用挂起操作符(也被称为末端操作符)(例如 collect),而不是公开 Iterator,以便始终知晓流何时在被主动耗用。更重要的是,它可以在调用方无法再请求更多值时获知消息,以便清理资源。

Flow 完全是使用协程构建的。通过使用协程的 suspendresume 机制,可以将生产方 (Flow) 的执行与使用方 (collect) 同步。

Flow 何时运行

collect 操作符运行时,Flow 才会开始运行。通过调用 flow 构建器或其它 API 来创建新的 Flow 后,并不会立刻执行工作。挂起操作符 collectFlow 中被称为末端操作符。还有很多其它末端操作符,例如 kotlinx-coroutines 附带的 toListfirstsingle 等。

默认情况下,Flow 将在以下情况下执行:

  • 每次调用末端操作符时(且每个新调用均与之前启动的任何调用无关)
  • 直到运行 Flow 的协程被取消
  • 当上一个值已完全处理,并且又请求了另一个值时

这些规则是 Flow 的默认行为,可以创建一个 Flow,该 Flow 可以与先前运行的 Flow 共享状态,在每个末端操作符之间不会重新启动,并且通过内置或自定义的 Flow 转换在收集过程中独立执行。

执行 Flow 的过程称为收集 Flow。默认情况下,Flow被收集之前(即应用任何末端操作符)不会执行任何操作。

由于存在以上这些规则,所以 Flow 可以参与结构化并发,并且可以安全地从 Flow 启动长时间运行的协程。Flow 不会泄露资源,因为在调用方被取消时,系统始终会按照协程合作取消规则清理这些资源。

Flow supports structured concurrency

Because a flow allows you to consume values only with terminal operators, it can support structured concurrency.

When the consumer of a flow is cancelled, the entire Flow is cancelled. Due to structured concurrency, it is impossible to leak a coroutine from an intermediate step

我们使用 take 操作符修改一下上面的示例,以便仅查看前两个元素,然后再收集该 Flow 两次:

1
2
3
4
5
6
7
8
9
10
import kotlinx.coroutines.flow.collect

scope.launch {
val repeatableFlow = makeFlow().take(2) // we only care about the first two elements
println("first collection")
repeatableFlow.collect()
println("collecting again")
repeatableFlow.collect()
println("second collection completed")
}

程序运行结果如下:

1
2
3
4
5
6
7
first collection
sending first value
first value collected, sending another value
collecting again
sending first value
first value collected, sending another value
second collection completed

每次调用 collect 时,flow lambda 都会从头开始执行。如果 Flow 执行高成本的工作(例如发出网络请求),则这一点非常重要。此外,由于我们应用了 take(2) 操作符,因此该 flow 将只生成两个值。第二次调用 emit 后,将不会再次恢复 flow lambda,因此将不会再输出“second value collected…”这一行。

默认情况下,每次应用末端操作符时,Flow 都会从头重新开始执行。如果 Flow 执行高成本的工作(如发出网络请求),则这一点非常重要。

Flow 构建器

创建 Flow 数据流最简单的方法是使用 flow 构建器:

1
2
3
public fun <T> flow(
@BuilderInference block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = SafeFlow(block)

其参数使用 suspend 修饰,是以 FlowCollector 作为接收者的挂起函数,能直接调用 emit 发送数据。

点击右上角运行按钮可以查看程序运行结果。

simple 函数会快速返回,并且不会等待任何内容。每次收集( collect) 流时,流都会重新开始运行。这就是为什么每次调用 collect 时都会输出 “Flow started”。

根据上述代码我们还可知如下内容:

  • Flow 类型的构建器函数名为 flow
  • flow { ... } 构建块中的代码可以挂起。
  • 函数 simple 不需要使用 suspend 修饰符。
  • 流使用 emit 函数 发射 值。
  • 流使用 collect 函数 收集 值。

其它创建 Flow 的方式还有很多,例如 asFlowflowOf 等。

此外,官方还提供了很多将类型转换到流的扩展函数。大家可以查看下官方文档。

说明:

  • flow 函数的内部定义我们可以看出,block 是由 suspend 修饰的,所以内部可以调用挂起函数。

  • flow 代码块的 emit 函数是线程不安全的,所以 flow 函数的不能修改协程上下文,无法调用如 withContext 等函数,避免下游 collect 被调度到其他线程。

  • 如果要修改数据流的协程调度,只能调用 flowOn 函数。

Flow 的取消机制

Flow 的取消机制与协程的一致。在可取消挂起函数(例如: delay)中,Flowcollect 操作是可以被取消的。

示例:

点击右上角运行按钮可以查看程序运行结果。

通过运行结果可以看出,程序运行 250ms 后,Flow 直接被取消了,Emitting 33 并没有输出。

通过 flow 构建器创建的 Flow,会在每次 emit 值时,执行额外的 ensureActive 检测,从而可以取消 Flow

再看一个示例:

点击右上角运行按钮可以查看程序运行结果。

通过运行结果可以看到,当 value3 时,虽然我们调用了 cancel() 函数,但是 Flow 并没有立刻被取消,而是在下一次发送值时,也就是执行 emit(4) 时,Flow 才被取消。

然而,出于性能考虑,大多数 Flow 操作符并不会进行额外的取消检测。例如,如果您使用 IntRange.asFlow 扩展来编写相同的循环,由于没有挂起操作,因此就不会有取消检测:

当在大量循环操作中使用 Flow 时,必须显示的进行取消检测。例如,可以通过添加 .onEach { currentCoroutineContext().ensureActive() } 来达到此目的。不过,有一个更简单的做法,那就是使用 cancellable 操作符:

运行结果表明,输出 3 之后,由于调用了 cancel() 因此产生 JobCancellationException 异常导致程序异常退出。

中端流操作符

可以使用操作符转换流,就像转换集合序列一样。 过渡操作符应用于上游流,并返回下游流。 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。

基础的中端流操作符名字看起来很熟悉,比如 mapfilter。 这些操作符与序列的主要区别在于,这些操作符中的代码可以调用挂起函数。

举例来说,一个接收请求的流可以使用 map 操作符转换结果,即使是通过挂起函数,实现一个长时间的请求操作:

转换操作符 (Transform operator)

在流的转换操作符中,最通用的是称为 transform 的操作符。它可以用于模仿像 mapfilter 这样的简单转换,以及实现更复杂的转换。使用 transform 操作符,我们可以发出任意值,并可发出任意次。

通过 transform 操作符,我们可以对流中的值进行更灵活的转换。这允许我们根据需要发出零个、一个或多个新值,甚至可以在一个单一的 emit 操作中多次发出不同的值。这种操作符在需要进行自定义、动态的流转换时非常有用,提供了更高级别的灵活性和控制权。

限长操作符 (Size-limiting operators)

限长中端操作符(例如 take)在达到相应限制时会取消流的执行。在协程中,这种取消操作始终会抛出一个异常,从而取消流的执行,在 catch 块中,我们可以捕获这种异常。这种异常机制确保了在取消流的情况下,资源管理函数(如 try {...} finally {...} 块)会正常运行,从而确保了程序的正确性和可靠性:

末端流操作符

由于 Flow冷流,因此必须要调用末端操作符(例如,collect)才会执行数据流的生产操作。

  • 数据流的创建与数据流的消费是成对出现的
  • 多个数据流订阅消费,也会同样有多个数据源生产创建

流的末端操作符是一种启动流收集的挂起函数collect 操作符是最基础的一个,另外还有一些更方便使用的末端操作符:

  • 将流转换为各种集合,例如 toListtoSet

  • 获取流的第一个值的操作符 first。确保流只发送一个值的操作符 single

  • 使用 reducefold 将流规约到单个值的操作符。

例如:

流是顺序执行的

流的每次单独收集操作都是顺序执行的,除非使用了能够操作多个流的特殊操作符(例如 buffer)。收集操作直接在调用末端操作符的协程中执行。默认情况下不会启动新协程。每个发送 (emit) 的值都会经过从上游到下游的所有中端操作符的处理,最后传递给末端操作符处理。

请看以下示例,该示例对偶数进行过滤并将它们映射为字符串:

请点击右上角运行按钮,查看运行结果,了解流的具体执行过程。

流上下文

流的收集始终在调用协程的上下文中进行。

例如,如果有一个 simple 流,那么下面的代码会在指定的上下文 context 中运行,而不受 simple 流实现细节的影响:

1
2
3
4
5
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}

流的这种属性称为上下文保留 (context preservation) 。

默认情况下,flow { ... } 构建器中的代码是运行在相应流的收集器提供的上下文中的。让我们来看一个示例:

collect { ... } 中的代码是允许切换上下文的。

切换上下文的常见陷阱

对于那些需要长时间运行且消耗 CPU 的代码,可能会在 Dispatchers.Default 上下文中执行,而更新 UI 的代码可能需要在 Dispatchers.Main 上下文中执行。通常情况下,可以使用 withContext 在 Kotlin 协程中切换上下文,但是在 flow { ... } 构建器中的代码遵循上下文保留属性,因此在 flow { ... } 构建器中的代码是不允许从不同的上下文发送数据的。

以下示例中的 simple() 流并不会完成切换上下文操作,相反还会产生异常:

flowOn 操作符

切换流运行的上下文(即 flow { ... } 构建器中的代码)的正确方法是使用 flowOn 函数。具体用法请看下面的示例:

本例中需要注意的是,flow { ... } 中的代码运行在后台线程,而收集操作则运行在主线程。

还有一点值得留意,flowOn 操作符改变了流的默认顺序性质 (default sequential nature),而默认情况下,流数据的产生 (emit) 和处理 (collect) 是发生在相同协程中的(详见之前的示例)。收集操作发生在(”coroutine#1”)中,而发送操作发生在另一个协程(”coroutine#2”)中,后者运行在与收集协程所在线程并发运行的另一个线程中。当使用 flowOn 操作符切换 CoroutineDispatcher 时,它为上游流创建了一个新的协程。

缓冲(buffer)

从整体上来讲,为了减小收集流所花费的时间,将流的不同部分在不同的协程中运行是一个行之有效的办法,特别是涉及到长时间运行的异步操作时。先来看一个示例:

在上例中,每产生一个数据耗时约 100ms,每处理一个数据耗时约 300ms。对于 3 个数据,流的总体运行时间约为 1200ms = 3 * (100ms + 300ms) 左右。

这里我们可以引入 buffer 操作符,改变流的运行顺序,让流的发送代码与收集代码并行运行。来看下面的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {

val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")

}

运行结果如下:

1
2
3
4
1
2
3
Collected in 1067 ms

让我们来分析一下执行过程:

在这个示例中,由于我们使用了 buffer 操作符,因此流的运行顺序不再像默认情况那样,需要等待收集处理完数据后,才可以再次发送数据。使用 buffer 操作符后,流的运行顺序变为发送数据无需等待收集处理结束,便可以直接开始下一次的数据发送。buffer 操作符让流的发送部分与收集部分并行运行了。

所以上例中,发送第一个数据前等待了 100ms,之后不需要等待 emit(1) 的收集处理完成(也就是无需等待 collect 处理所需要的 300ms ),便可直接开始下一次循环。也就是说在大约 300ms 时间,3 个数据就可以全部发送出去。而收集部分每处理一个数据还是需要花费 300ms 的。

由于发送与收集现在是并行的,在发送方 100ms 后发送第一个数据开始,收集方就开始处理数据(处理花费 300ms),与此同时发送方无需等待继续开始下一次的数据发送。因此流的总体运行时间大约为 100ms + 3 * 300ms = 1000ms。

我们可以用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):

1
2
3
4
5
└──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘
0    ┆   200   ┆   400   600 ┆   800    1000ms
   emit(1) ┆ emit(3) ┆ ┆ ┆
┆ emit(2)    ┆           ┆ ┆
同步执行collect print#1 print#2 print#3

注意,当我们使用 flowOn 操作符切换 CoroutineDispatcher 时,flowOn 操作符会使用相同的缓冲机制,只不过 buffer 操作符不会改变执行的上下文。

合并(conflate)

有时我们可能不需要在 collect 时处理每个被发送过来的值,而只需要处理最近较新的值(only most recent ones)。在这种情况下,可以使用 conflate 操作符,在收集器处理速度较慢时跳过中间值。让我们基于之前的示例进行演示,新代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}

这个示例中,仅把 buffer() 替换成成了 conflate(),让我们看一下运行结果:

1
2
3
1
3
Collected in 758 ms

可以看到,在第一条数据处理过程中,已经产生了第二条和第三条数据,因此当第一条数据处理完之后,已经有了两条数据待处理,因此当继续收集数据时,第二个数据就成了被合并 (conflated) 的数据,只有最新的数据(第三条数据)发送给收集器进行处理。

让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):

1
2
3
4
5
└──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘
0    ┆   200   ┆   400   600 ┆   800    1000ms
   emit(1) ┆ emit(3) ┆ conflated#2
┆ emit(2)    ┆           ┆
同步执行collect print#1 print#3

最后让我们看一下 conflate 函数的实现:

1
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

可以看到 conflate 的本质还是 buffer,只不过将 buffer 的第一个参数 capacity 设置成了 CONFLATED 而已。而 buffercapacity 参数默认值是 BUFFERED

处理最新值(xxxLatest)

当发射器和收集器的处理都较慢时,合并(Conflation)是加快处理速度的一种方法,它通过丢弃发出的值来实现。另一种方法是在每次发出新值时取消较慢的收集器并重新启动它。Flow API 中有一系列的 xxxLatest 操作符,其执行与 xxx 操作符相同的基本逻辑,但会在有新值时取消其代码块的执行

让我们尝试将上一个示例中的 conflate 替换成 collectLatest,新示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}

运行结果如下:

1
2
3
4
5
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 685 ms

由于 collectLatest 的函数体需要花费 300 毫秒,但是每 100 毫秒才发送一次值,所以我们看到每个发送过来的值虽然都可以进入 collectLatest 代码块,但是只有最后一个值可以完整的被收集器处理。

让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):

1
2
3
4
5
6
7
8
9
10
11
└──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┘
0    ┆    200    ┆    400    600   800ms
       ┆    ┆  ┆          ┆
   emit(1) emit(2) emit(3) ┆
┆    ┆       ┆          ┆
同步执行collect#1  ┆ 取消执行collect#2    ┆
┆ 取消执行collect#1  ┆          ┆
┆ 同步执行collect#2  ┆          ┆
       ┆    ┆ 同步执行collect#3          ┆
       ┆    ┆  ┆          ┆
print#1   print#2 print#3 Done#3

最后,以 collectLatest 为例,让我们看一下它的函数实现:

1
2
3
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
mapLatest(action).buffer(0).collect()
}

可以看到,collectLatest 内部依然使用了 buffer,并将 capacity 参数的值设置成了 0

组合多个流

组合多个流有很多种方式。

Zip

就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样, 流同样拥有一个 zip 操作符,用于组合两个流中的相关值。请看如下示例

1
2
3
4
5
6
7
8
9
10
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
}

执行结果如下:

1
2
3
1 -> one
2 -> two
3 -> three

Combine

Flow 代表变量或操作的最近较新值时 (most recent value)(参见 conflate 部分),可能需要执行一个计算操作,该计算依赖于相应 Flow 的最近较新值,并在任何上游 Flow 发出值时重新计算它。对应的操作符系列 (corresponding family of operators) 被称为 combine

对于先前那个例子,如果 nums 每 300 毫秒更新一次,但 strs 每 400 毫秒更新一次,然后再使用 zip 操作符合并它们,仍会产生相同的结果,只不过结果每 400 毫秒才打印一次。修改后的代码如下:

在这个示例中,我们使用了 onEach 中端操作符来延时发送每个元素,这会让代码更加简洁明了。

1
2
3
4
5
6
7
8
9
10
11
12
13
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}

运行结果如下:

1
2
3
1 -> one at 445 ms from start
2 -> two at 845 ms from start
3 -> three at 1249 ms from start

如果我们再将例子中的 zip 替换成 combine 的话,结果又会如何呢?修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}

执行结果如下:

1
2
3
4
5
1 -> one at 444 ms from start
2 -> one at 646 ms from start
2 -> two at 845 ms from start
3 -> two at 947 ms from start
3 -> three at 1246 ms from start

可以看到,我们得到了完全不同的结果。numsstrs 流中的每次数据发送,都会打印一行结果。

让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):

1
2
3
4
5
6
7
└──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘
0       200   ┆   400   600    800  ┆   1000 1200ms
             ┆    ┆ ┆ ┆ ┆ ┆
   emit(1) ┆ emit(2) ┆ emit(3) ┆
emit(one)      ┆ emit(two) ┆ emit(three)
  ┆ ┆ ┆ ┆ ┆
1->one 2->one 2->two 3->two 3->three

展平流 (Flattening flows)

流代表异步接收的值序列,因此很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。例如,以下函数会返回两个字符串的流,返回间隔 500 毫秒:

1
2
3
4
5
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

现在,如果我们有一个包含三个整数的流,并对每个整数调用 requestFlow,如下所示:

1
val flowWithMap: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) }

这样我们就得到了一个包含流的流 (Flow<Flow<String>>),若需要对其做进一步处理的话,需要先将其展平 (flatten) 为单一流。集合和序列都有 flattenflatMap 操作符来实现这一点。然而,由于流的异步性质,我们需要不同的展平模式,因此就需要一系列应用在流上的展平操作符。

flatMapConcat

将流的流串联由 flatMapConcatflattenConcat 操作符提供。它们是相应序列操作符的最直接类比。它们在等待内部流完成之后才开始收集下一个值,请看下面的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {

val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

}

运行结果如下:

1
2
3
4
5
6
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

通过输出结果,我们可以清晰地看到 flatMapConcat 的顺序特性。

flatMapMerge

另一种展平操作是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发出值。 它由 flatMapMergeflattenMerge 操作符实现。它们都接受一个可选的并发参数 concurrency(默认情况下,它等于 DEFAULT_CONCURRENCY),该参数限制了同时收集的流的数量。

来看下面的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {

val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

}

运行结果如下:

1
2
3
4
5
6
1: First at 173 ms from start
2: First at 269 ms from start
3: First at 370 ms from start
1: Second at 673 ms from start
2: Second at 770 ms from start
3: Second at 872 ms from start

可以明显的看到 flatMapMerge 的同步特性。

请注意,flatMapMerge 虽然是按顺序调用其代码块(即本例中的 { requestFlow(it) }),但会同时并发收集结果流,相当于执行顺序是首先执行 map { requestFlow(it) },然后在其返回结果上调用 flattenMerge

flatMapLatest

collectLatest 操作符类似(在“处理最新值”一节中已经描述过),同样存在相对应的“最新” (Latest) 展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest 操作符来实现。

请看下面的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

fun main() = runBlocking<Unit> {

val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

}

执行结果如下:

1
2
3
4
1: First at 164 ms from start
2: First at 301 ms from start
3: First at 403 ms from start
3: Second at 903 ms from start

上面这个例子非常好的展示了 flatMapLatest 的工作方式。

请注意,flatMapLatest 在收到一个新值时,会取消块中的所有代码 (即本例中的 { requestFlow(it) })。在上面这个特定示例中可能看不出有什么影响,因为调用 requestFlow 自身是快速的,非挂起的,且不能被取消。然而,如果我们在 requestFlow 中使用类似 delay 这样的挂起函数的话,那么输出结果的差异将会显现出来。

Android 上的 Kotlin 数据流

数据流 Flow 包含三个重要角色:

  • 数据提供方:生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
  • 中介(可选):可修改发送到数据流的值,或修正数据流本身。
  • 数据使用方:使用数据流中的值。

StateFlow

StateFlow 是一个状态容器可观察数据流,可向其收集器发出当前状态更新和新状态更新。也可以通过其 value 属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow 类的 value 属性分配一个新值。

在 Android 中,StateFlow 非常适合需要让可变状态保持可观察的类。

文章中有如下一段话,很好的总结了什么时候应该用 StateFlow

When exposing UI state to a view, use StateFlow. It’s a safe and efficient observer designed to hold UI state.

如需将任何数据流转换为 StateFlow,请使用 stateIn 中间操作符。

利用 shareIn 使冷数据流变为热数据流

StateFlow 是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn 操作符将冷数据流变为热数据流。

SharedFlow

shareIn 函数会返回一个热数据流 SharedFlow,此数据流会向从其中收集值的所有使用方发出数据。SharedFlowStateFlow 的可配置性极高的泛化数据流。

您无需使用 shareIn 即可创建 SharedFlow

Flow 常用 API

可参见官方文档。也可参见中文文档

  • stateIn
  • shareIn
  • flatMapLatest (switchMap has been deprecated)
  • transformLatest
  • collectAsStateWithLifecycle
  • combine
    • combineTransform
    • zip
  • filter
  • map
  • onEach
  • flowOn
  • transform

数据流收集被中止原因

数据流收集可能会由于以下原因而停止:

  • 收集数据的协程被取消,此操作也会让 数据提供方 停止活动。
  • 数据提供方完成发出数据项。在这种情况下,数据流将关闭,调用 collect 的协程则继续执行。

捕获异常

如需处理异常,可以使用 catch 操作符,如:

1
2
3
4
5
6
7
8
9
10
11
fun getNewsData() {
viewModelScope.launch {
remoteRepository.news
.catch {
// todo 在这里收集异常
}
.collect {
newsData.value = it
}
}
}

参考文献

坚持原创及高品质技术分享,您的支持将鼓励我继续创作!