0%

Kotlin 协程使用详解

阅读指导

本文仍在进一步完善中…(要详解的内容实在是太多了,之后有可能分成多个文章讲解,写在一篇文章里实在是太长太长了。)

本文并不是讲解“Kotlin 协程”的底层实现,而是从应用层面详细的讲解协程的使用方法及相关知识。

引言

协程”是 Kotlin 中非常重要的一个功能,它大大简化了并发程序设计的难度。

协程说简单也很简单,因为只是做简单的异步或并发处理的话,它的用法简单到不行。但是要应对复杂的场景,就需要对协程有深入的了解。

直到我深入学习了 Go 语言后,我觉得非常必要再重新研究下 Kotlin 的协程,之前是自己浅薄了。不知者无畏啊!

下面先让我们一起来做一些基础测验,了解下您对协程的熟悉程度。如果您能轻松回答对下面的每一道题,那么恭喜您,您可以直接跳过本文,否则建议您继续阅读后文。

虽然本文篇幅非常长,不过还是建议大家仔细阅读,一定会让您有所心得。

示例1:协程执行顺序

请写出下例中的输出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun main(): Unit = runBlocking {
println("check point 1")
CoroutineScope(Dispatchers.Default).launch {
delay(10)
println("check point 2")
val job: Job = launch {
delay(100)
println("check point 3")
}
coroutineScope {
delay(200)
println("check point 4")
}
println("check point 5")
withContext(Dispatchers.IO) {
delay(300)
println("check point 6")
}
println("check point 7")
job.invokeOnCompletion { println("check point 8") }
println("check point 9")
}
println("check point 10")

delay(1000)
}

点击右上角运行图标并滑动滚动条查看运行结果。

示例2:Job 的相等关系

请指明以下相等关系是否成立。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main(): Unit = runBlocking {
val name = CoroutineName("my-coroutine")
val job1 = Job()
var job2: Job? = null
val job3 = launch(name + job1) {
val cn = coroutineContext[CoroutineName]
println("cn == name = ${cn == name}")
job2 = coroutineContext[Job]
println("job1 == job2 = ${job1 == job2}")
}
job3.join()
println("job1 == job3 = ${job1 == job3}")
println("job3 == job2 = ${job3 == job2}")
}

请运行并查看结果。希望您答对了。

示例3:协程的基本取消操作

请写出以下示例的输出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main(): Unit = runBlocking {
CoroutineScope(Dispatchers.Default).launch {
val job: Job = launch {
for(i in 0 until 5) {
Thread.sleep(100)
println("index: $i")
}
}
job.invokeOnCompletion {
println("I'm completed.")
}
job.cancel("cancel by manual")
println("job.isActive=${job.isActive} job.isCancelled=${job.isCancelled} job.isCompleted=${job.isCompleted}")
}

delay(1000)
}

请运行并查看结果。希望您回答对了。

示例4:协程的取消与子协程数

请写出以下示例的运行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main(): Unit = runBlocking {
val parentJob = coroutineContext.job
val job = launch {
println("I'm the first job.")
delay(100)
println("The first job is done.")
}
launch {
println("I'm the second job.")
delay(200)
println("The second job is done.")
}
delay(50)
job.cancel()
println("job.isCancelled=${job.isCancelled} job.isCompleted=${job.isCompleted} Number of children: ${parentJob.children.count()}")

job.join()
println("job.isCancelled=${job.isCancelled} job.isCompleted=${job.isCompleted} Number of children: ${parentJob.children.count()}")

delay(400)
println("Number of children: ${parentJob.children.count()}")
}

请运行并查看结果。

示例5:关于 Job

请写出以下示例的运行结果,并说明原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
fun main(): Unit = runBlocking {
val job = Job()
launch(job) {
delay(500)
println("I'm a job.")
}
launch(job) {
delay(1000)
println("I'm another job")
}
job.join()
println("Program exit.")
}

请选中下面的空白文本查看运行结果:

I'm a job.
I'm another job
(waiting here forever)  

希望您回答对了。

示例6:关于 SupervisorJob() 的异常处理

请写出以下示例的运行结果,并说明 childJob1 产生异常后,对 childJob2parentJobcs.launch 的影响,并解释其原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() {
val cs = CoroutineScope(SupervisorJob())
val parentJob = cs.launch {
val childJob1 = launch {
delay(100)
println("Child 1 is going to die...")
throw AssertionError("exception from child job 1")
}
val childJob2 = launch {
delay(200)
println("Child 2 is done...")
}
delay(300)
println("I'm working on parentJob...")
childJob1.invokeOnCompletion { println("childJob1=$childJob1 cause: $it") }
childJob2.invokeOnCompletion { println("childJob2=$childJob2 cause: $it") }
}
cs.launch {
delay(300)
println("I'm another parent job...")
}
parentJob.invokeOnCompletion { println("parentJob=$parentJob cause: $it") }
Thread.sleep(1000)
}

请运行并查看结果。希望您能回答正确。

示例7:父子 Job 及 SupervisorJob() 异常处理

请写出以下示例的运行结果,并说明 childJob1 产生异常后,对 childJob2parentJobanotherParentJob 的影响,并解释其原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() {
val cs = CoroutineScope(Dispatchers.IO)
val parentJob = cs.launch(SupervisorJob()) {
val childJob1 = launch {
delay(100)
throw AssertionError("exception from child job 1")
}
val childJob2 = launch {
delay(200)
println("Child 2 is done...")
}
childJob1.invokeOnCompletion { println("childJob1=$childJob1 cause: ${it?.message}") }
childJob2.invokeOnCompletion { println("childJob2=$childJob2 cause: ${it?.message}") }
}
val anotherParentJob = cs.launch {
delay(400)
println("I'm another job...")
}
parentJob.invokeOnCompletion { println("parentJob=$parentJob cause: ${it?.message}") }
anotherParentJob.invokeOnCompletion { println("anotherParentJob=$parentJob cause: ${it?.message}") }
Thread.sleep(1000)
}

请运行并查看结果。希望您回答正确。

示例8:SupervisorJob() 的异常处理及执行顺序

请写出以下示例的执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
fun main() {
val sj = SupervisorJob()
val cs = CoroutineScope(sj)
cs.launch {
delay(100)
log("===> 2")
throw AssertionError("exception from child job 1")
}
cs.launch {
delay(150)
supervisorScope {
launch {
delay(200)
log("===> 4")
throw AssertionError("exception from child 1 in childJob2")
}
launch {
delay(300)
log("===> 5")
}
log("===> 3")
}
launch {
delay(100)
log("===> 7")
}
log("===> 6")
}
log("===> 1")
Thread.sleep(1000)
log("===> 8")
}

请运行并查看结果。希望您回答正确。

示例9:关于协程的取消

请写出以下示例的执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main(): Unit = runBlocking {
val exceptionHandler = CoroutineExceptionHandler { _, e -> println("exceptionHandler") }
val cs = CoroutineScope(Job() + exceptionHandler)
val job = cs.launch {
try {
println("I'm working...")
delay(200)
println("Work done.")
} catch (err: CancellationException) {
println("Job has been cancelled.")
throw CancellationException("cancelled by user")
} finally {
println("Resources have been released safely.")
}
println("Job done.")
}
job.invokeOnCompletion { println("isCancelled=${job.isCancelled} isCompleted=${job.isCompleted}") }
delay(100)
job.cancelAndJoin()
println("Cancelled successfully")
}

请运行并查看结果。希望您回答正确。

示例10:关于 CoroutineExceptionHandler

请写出以下示例的执行结果,并说明原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import kotlinx.coroutines.*

fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, err -> println("handle error: $err") }
val cs = CoroutineScope(SupervisorJob())
val job1 = cs.launch {
delay(50)
println("check point 1")
launch(handler) {
delay(50)
println("check point 2")
throw AssertionError("job1 exception")
}
delay(100)
println("check point 3")
}
job1.invokeOnCompletion { println("1 invokeOnCompletion cause: ${it?.message}") }
delay(300)
println("job1 done")
val job2 = cs.launch(handler) {
delay(200)
println("check point 4")
throw AssertionError("job2 exception")
}
job2.invokeOnCompletion { println("2 invokeOnCompletion cause: ${it?.message}") }
delay(300)
println("job2 done")
val job3 = cs.async(handler) {
delay(300)
println("check point 5")
throw AssertionError("job3 exception")
}
job3.invokeOnCompletion { println("3 invokeOnCompletion cause: ${it?.message}") }
val job3Result = job3.await()
println("job3Result=$job3Result")

delay(2000)
println("program exit")
}

请运行并查看结果。希望您回答正确。

示例11:关于 async 异常处理

请写出以下示例的执行结果,并说明原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main() = runBlocking {
val scope = CoroutineScope(Job())
val deferred = scope.async {
println("async is working...")
throw IllegalAccessError("oops...")
}
try {
deferred.await()
} catch (e: IllegalAccessError) {
println("caught IllegalAccessError. cause: $e")
}
scope.launch { println("launch is working...") }
delay(1000)
println("program exit")
}

请运行并查看结果。希望您回答正确。

前言

协程是一种并发设计模式,您可以在 Android 平台上使用它来简化异步执行的代码。^1

A coroutine is a concurrency design pattern that you can use on Android to simplify code that executes asynchronously.

注意:协程并不是线程,也不是线程池,官网说的很清楚,协程是一种并发设计模式

但是从宏观角度来看,我们其实可以把它理解成线程。毕竟协程内部也是用线程池实现的。

PS:要想更好的了解协程,其中源码 JobSupport.kt 建议大家一定要看一下。

先来看一下学习“Kotlin 协程”的思维导图:

Kotlin 协程

协程作用域(CoroutineScope)

协程作用域(CoroutineScope)定义了协程的范围,用来追踪那些由 launchasync 启动的协程,并且在需要的时候可以调用 CoroutineScope.cancel() 方法随时取消由它启动的所有协程。

对于 Android 而言,协程作用域的功能显得格外重要,例如当用户离开界面时,某些异步操作尤其是那些比较耗时的异步操作可能就变得没有意义,因为用户一旦离开页面就不再需要显示它们的结果,因此我们可以取消那些不必要的协程,避免浪费系统资源。

协程的启动必须通过协程作用域。也就是说,要想使用协程,必须先创建一个对应的协程作用域才可以。可以把协程作用域理解成一个轻量版的 ExecutorService

CoroutineScope 是一个接口,用于定义一个新的协程作用域,是协程构建器的一部分。系统提供了一组协程相关的扩展函数,如 launch()async() 等。在使用协程时,我们会创建一个实现了 CoroutineScope 接口的类,通过它来管理协程的生命周期,例如启动、取消协程等。其接口定义如下:

1
2
3
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

这里可以看出 CoroutineScope 保存了协程上下文。每一个协程构建器即协程 Builder(包括 launchasyncrunBlocking 等)都是 CoroutineScope 的扩展函数。协程 Builder 将继承 CoroutineScopecoroutineContext,并自动将该上下文传播到它的所有子元素,这种传播行为同样适用于协程的取消操作。

任何实现了 CoroutineScope 接口的类都可以作为协程作用域,例如 GlobalScopeCoroutineScope()MainScope() 等,我们可以通过它们来创建具体的 CoroutineScope 实例。

常用协程作用域

请注意:下文内容中字母的大小并没有写错。例如:coroutineScope 这里的首字母是小写的 c,并不是写错了,和刚才提到的 CoroutineScope 它们在用法及语义上有所不同,下文会有详细的讲解。

runBlocking

主要用于测试场景。不适用于日常开发。注意,在协程内部不要使用该该函数。详见官方文档

该函数是一个顶级函数(非挂起函数),也是一个高阶函数。它有两个参数,第一个参数是协程上下文。第二个参数是带有接收者的函数类型,具体为接收者是 CoroutineScope 的函数字面值,可用于启动协程。但是它会阻塞当前线程,但其内部运行的协程是非阻塞的。只有当内部相同作用域的所有协程都运行结束后,声明在 runBlocking 之后的代码才能执行。

GlobalScope

实际业务开发中不推荐使用该作用域。

全局协程作用域,通过 GlobalScope 创建的协程不会有父协程,可以把它称为根协程。它启动的协程是顶级协程,其生命周期只受整个应用程序的生命周期的限制,且不能取消。因此如果该作用域内的协程执行耗时的操作时,由于无法取消,因此会持续消耗内存资源,这可能会导致内存泄露,所以不适用于业务开发。

GlobalScope 采用单例实现,内部使用空上下文,即 EmptyCoroutineContext,且不含 Job

1
2
3
4
public object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}

coroutineScope

用于创建一个新的协程作用域,并暂停当前协程,直到所有在该作用域中启动的协程完成或取消。

该函数是一个挂起函数,需要运行在协程内或其它挂起函数内。当这个作用域中的任何一个子协程失败时,则这个作用域及其内的所有子协程都会被取消。

coroutineScope 的外层协程被取消了,则 coroutineScope 内部的所有协程都会被自动取消。

该协程作用域内的上下文继承自外部协程的上下文,不过会覆盖其中的 Job

1
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

该函数是为并行分解工作而设计的,通常用于在协程中启动其他协程,并等待它们完成,然后再继续执行后面的代码。官方文档也给出了具体用法:

1
2
3
4
5
6
7
8
9
10
11
suspend fun showSomeData() = coroutineScope {
val data = async(Dispatchers.IO) { // <- extension on current scope
... load some UI data for the Main thread ...
}

withContext(Dispatchers.Main) {
doSomeWork()
val result = data.await()
display(result)
}
}

官方同样对上述代码进行了详细的解释:

  1. 一旦数据加载并显示在 UI 中,showSomeData 就会返回。
  2. 如果 doSomeWork 抛出异常,则会取消 async 并且 showSomeData 会重新抛出该异常。
  3. 如果 showSomeData 的外部协程被取消,若 asyncwithContext 已经启动的话,则它们都将被取消。
  4. 如果 async 失败了,则 withContext 将被取消。

supervisorScope

coroutineScope 类似,不同之处在于 supervisorScope 作用域下的子协程的异常不会影响父协程,也不会影响其他子协程。(作用域本身的失败(在 block 或取消中抛出异常)会导致作用域本身及其所有子协程失败,但不会取消父协程。)

1
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

CoroutineScope() 工厂方法

使用指定的上下文创建一个协程作用域。如果指定的上下文中不包含 Job 元素时,则会为其追加一个 Job() 元素。如果该作用域下的任一子协程运行失败或者它本身被取消了,那么其内所有子协程都将被取消,和 coroutineScope 的处理方式一样。

MainScope() 工厂方法

ActivityFragment 中推荐使用。

为 UI 组件创建主作用域,是一个顶级函数,上下文是 SupervisorJob() + Dispatchers.Main,说明它是一个在主线程运行的协程作用域,通过 cancel 对协程进行取消。

1
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

MainScope 作用域的好处就是方便地绑定到 UI 组件的声明周期上,并且在 Activity 销毁的时候通过调用 mainScope.cancel() 取消其作用域。

1
2
3
4
5
6
7
8
9
class MyActivity {
// 还可以通过 '+' 附加其它上下文,例如:MainScope() + CoroutineName("MyActivity")
private val scope = MainScope()

override fun onDestroy() {
super.onDestroy()
scope.cancel()
}
}

lifecycleScope

ActivityFragment 中推荐使用。

该扩展属性是 AndroidLifecycle Ktx 库提供的具有生命周期感知的协程作用域,与 LifecycleOwnerlifecycle 绑定。当 lifecycle 被销毁时,该作用域也将被自动取消。

这是在 ActivityFragment 中推荐使用的作用域,因为它会与当前的 UI 组件绑定生命周期,当界面销毁时,该协程作用域也将被自动取消,不会造成协程泄漏。具体相同作用的,还有接下来要提到的 viewModelScope

1
2
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope

该作用域的上下文如下:

1
SupervisorJob() + Dispatchers.Main.immediate

viewModelScope

ViewModel 中推荐使用。

该作用域是 ViewModel 的扩展属性,同样来自 AndroidLifecycle Ktx 库。与 lifecycleScope 类似,只不过该作用域是与 ViewModel 绑定生命周期的,当 ViewModel 被清除时,该作用域也将被自动取消,因此也不会造成协程泄漏。

1
2
3
4
5
6
7
8
9
10
11
public val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(
JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
)
}

该作用域的上下文与 lifecycleScope 的相同:

1
SupervisorJob() + Dispatchers.Main.immediate

coroutineScope,CoroutineScope 以及 CoroutineScope() 工厂方法的区别

其实看过上面的讲解后,您可能已经知道了它们之前的区别,不过在这里我们再简单的叙述下:

  • CoroutineScope:是一个接口,用于提供一个协程作用域,使其能够管理协程的生命周期。每一个协程构建器(包括 launchasync 等)都是 CoroutineScope 的扩展函数。任何实现了 CoroutineScope 接口的类都可以作为协程作用域。
  • coroutineScope:是一个挂起函数,用于创建一个新的协程作用域。其内启动的所有协程都会在该作用域结束时被取消。它会等待所有内部协程执行完成之后再返回,同时会将任何未捕获的异常传递给父协程来处理。其协程作用域内的上下文继承自外部协程的上下文,不过会覆盖其中的 Job。设计的目的是为了并行分解工作。
  • CoroutineScope():工厂方法,用来创建具体的 CoroutineScope 实例。其参数是协程上下文,如果该上下文中不包含 Job 元素时,则会为其追加一个 Job() 元素。可以在上下文中指定调度器,而 coroutineScope 的上下文是继承自外部协程的。

分类和行为规则

官方框架在实现复合协程的过程中也提供了作用域,主要用于明确父子关系,以及取消或者异常处理等方面的传播行为。该作用域分为以下三种:

  • 顶级作用域:没有父协程的协程所在的作用域为顶级作用域。

  • 协同作用域:协程中启动新的协程,新协程为所在协程的子协程,这种情况下,子协程所在的作用域默认为协同作用域。此时子协程抛出的未捕获异常,都将传递给父协程处理,父协程同时也会被取消。

  • 主从作用域:与协同作用域在协程的父子关系上一致,区别在于,处于该作用域下的协程出现未捕获的异常时,不会将异常向上传递给父协程。

除了三种作用域中提到的行为以外,父子协程之间还存在以下规则:

  • 父协程被取消,则所有子协程均被取消。由于协同作用域和主从作用域中都存在父子协程关系,因此此条规则都适用。
  • 父协程需要等待子协程执行完毕之后才会最终进入完成状态,不管父协程自身的协程体是否已经执行完。
  • 子协程会继承父协程的协程上下文中的元素,如果自身有相同 key 的成员,则覆盖对应的 key,覆盖的效果仅限自身范围内有效。

父协程和子协程

正确理解父子协程是非常非常非常重要的

当一个协程在另外一个协程的协程作用域中启动时,它将通过 CoroutineScope.coroutineContext 继承其上下文,新启动的协程就被称为子协程,子协程的 Job 将成为父协程 Job 的子 Job。父协程总是会等待其所有子协程都完成后才结束自身,所以父协程不必显式跟踪它启动的所有子协程,也不必使用 job.join() 在末尾等待子协程完成。

下例中,虽然 parentJob 启动的三个子协程的延时时间各不相同,但它们最终都会打印出日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main(args: Array<String>) {
println("=====> Program start. <=====")
runBlocking {
println("runBlocking - start")

val parentJob = launch {
repeat(3) { i ->
launch {
delay((i + 1) * 200L)
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}

println("runBlocking - end")
}
println("=====> Program exit. <=====")
}

输出结果如下:

1
2
3
4
5
6
7
8
=====> Program start. <=====
runBlocking - start
runBlocking - end
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
=====> Program exit. <=====

Job 父子关系的相互影响

Job 父子关系的相互影响主要有以下几点:

  • 子协程从父协程继承上下文。
  • 父协程会挂起,直到所有子协程都完成。
  • 当父协程被取消时,所有子协程都将被取消。
  • 默认情况下,子 Job 由于异常CancellationException 除外)被取消时,其父 Job 也会被取消,从而导致该父 Job 内的其它子 Job 也被自动取消。这种默认的行为可以通过 SupervisorJob 来修改。

获取父 Job 与子 Job

Job 可以引用它的所有子 Job,同样子 Job 也可以引用父 Job。这种父子关系允许我们在协程范围内实现取消和异常处理。

示例1:

1
2
3
4
5
6
7
8
9
fun main(): Unit = runBlocking {
val parentJob: Job = coroutineContext.job // Get parent job
val childJob: Job = launch {
delay(500)
}
println(childJob == parentJob) // false
val parentChildren: Sequence<Job> = parentJob.children // Get all children jobs
println(parentChildren.first() == childJob) // true
}

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
fun main() {
var cs = CoroutineScope(Dispatchers.IO)
var parentJob = cs.coroutineContext.job // Get parent job
var childJob = cs.launch {
// Get parent job of current child job
val innerParentJob = coroutineContext.job
val innerChildJob = launch {
delay(500)
}
// Get first child job of the child coroutine
val innerFirstChild = innerParentJob.children.firstOrNull()
println("innerFirstChild==innerChildJob -> ${innerFirstChild == innerChildJob}") // true
}
var firstChild = parentJob.children.firstOrNull() // Get first child job
println("firstChild==childJob -> ${firstChild == childJob}") // true

Thread.sleep(1000)

parentJob = Job()
cs = CoroutineScope(Dispatchers.IO)
childJob = cs.launch(parentJob) {
// Get parent job of current child job
val innerParentJob = coroutineContext.job
val innerChildJob = launch {
delay(500)
}
// Get first child job of the child coroutine
val innerFirstChild = innerParentJob.children.firstOrNull()
println("innerFirstChild==innerChildJob -> ${innerFirstChild == innerChildJob}") // true
}
firstChild = parentJob.children.firstOrNull() // Get first child job
println("firstChild==childJob -> ${firstChild == childJob}") // true

Thread.sleep(1000)
}

获取子 Job 的前提

这里有一点需要特别注意:

Job 能获取到子 Job 的前提是子 Job 尚未处于 Completed 状态。

这一点官网文档中并未提及,也因此给我在做测试时带了不少困扰,一度让我怀疑人生。

切记,一旦子 Job 处于完成状态,无论是正常完成还是或由异常导致完成,该子 Job 会被父 Job 移除。这也是上个示例中,childJob 协程为什么要 delay 一段时间的原因,我们需要这个子协程处于未完成状态(即处于 NewActiveCancelling 状态,理论上应该也包括 Completing 状态,但是这个状态下不太好验证。),这样才能通过父 Job 获取到其子 Job

以下示例可以验证这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() = runBlocking {
val parentJob: Job = coroutineContext.job // Get parent job
val childJob: Job = launch {
delay(500)
}
println("children=${parentJob.children.count()} childJob=$childJob isActive=${childJob.isActive} isCancelled=${childJob.isCancelled} isCompleted=${childJob.isCompleted}")
childJob.cancel()
println("children=${parentJob.children.count()} childJob=$childJob isActive=${childJob.isActive} isCancelled=${childJob.isCancelled} isCompleted=${childJob.isCompleted}")
childJob.join()
println("children=${parentJob.children.count()} childJob=$childJob isActive=${childJob.isActive} isCancelled=${childJob.isCancelled} isCompleted=${childJob.isCompleted}")

println("==========")
val anotherParentJob: Job = coroutineContext.job
val anotherChildJob: Job = launch(start = CoroutineStart.LAZY) {
delay(500)
}
println("children=${anotherParentJob.children.count()} childJob=$anotherChildJob isActive=${anotherChildJob.isActive} isCancelled=${anotherChildJob.isCancelled} isCompleted=${anotherChildJob.isCompleted}")
anotherChildJob.start()
println("children=${anotherParentJob.children.count()} childJob=$anotherChildJob isActive=${anotherChildJob.isActive} isCancelled=${anotherChildJob.isCancelled} isCompleted=${anotherChildJob.isCompleted}")
anotherChildJob.join()
println("children=${anotherParentJob.children.count()} childJob=$anotherChildJob isActive=${anotherChildJob.isActive} isCancelled=${anotherChildJob.isCancelled} isCompleted=${anotherChildJob.isCompleted}")
}

运行结果如下:

1
2
3
4
5
6
7
children=1 childJob=StandaloneCoroutine{Active}@6d1e7682 isActive=true  isCancelled=false  isCompleted=false
children=1 childJob=StandaloneCoroutine{Cancelling}@6d1e7682 isActive=false isCancelled=true isCompleted=false
children=0 childJob=StandaloneCoroutine{Cancelled}@6d1e7682 isActive=false isCancelled=true isCompleted=true
==========
children=1 childJob=LazyStandaloneCoroutine{New}@c39f790 isActive=false isCancelled=false isCompleted=false
children=1 childJob=LazyStandaloneCoroutine{Active}@c39f790 isActive=true isCancelled=false isCompleted=false
children=0 childJob=LazyStandaloneCoroutine{Completed}@c39f790 isActive=false isCancelled=false isCompleted=true

结构化并发机制的注意事项

有以下两点需要特别留意下,也是最开始接触协程时,最容易犯错的地方:

  • 如果使用了新 Job 上下文替换了原有父 Job 上下文,那么结构化并发机制将不起作用。
  • 如果使用了新的协程作用域来启动协程(例如:CoroutineScope(Dispatchers.IO).launch),同样会导致结构化并发机制失效。

让我们先来看一看第一种情况的示例:

1
2
3
4
5
6
7
8
fun main(): Unit = runBlocking {
val originalParent: Job = coroutineContext.job // 原有的父 Job
launch(Job()) { // 使用新 Job 替换了原有的父 Job
delay(1000)
println("I won't be printed.")
}
println("program exit")
}

程序运行结果如下:

1
program exit

在上面的例子中,父协程 runBlocking[^runBlocking] 并不会等待子协程 launch 结束,因为它与子协程没有建立关系。这是因为子协程使用来自参数的 Job 作为父 Job,因此它与父协程 runBlocking 没有关系。

接着再来看第二种情况的示例:

1
2
3
4
5
6
7
8
9
10
11
12
fun main(): Unit = runBlocking {
// 这里是 runBlocking 作用域,该作用域下启动的子协程,runBlocking 会等待其执行结束
launch {
delay(1000)
log("I will be printed.")
}
CoroutineScope(Dispatchers.IO).launch { // 使用新的协程作用域替换了原有的协程作用域
delay(2000)
log("I won't be printed.")
}
log("program exit")
}

执行结果如下:

1
2
17:02:01.147	program exit
17:02:02.164 I will be printed.

本例中,由于在 runBlocking 作用域内,又启动了新的作用域 CoroutineScope(Dispatchers.IO),那么新的协程作用域与 runBlocking 作用域是没有任何关系的,因此 runBlocking 并不会等待其执行结束。

综上所述,当一个协程有它自己的(独立的) Job 或拥有独立的协程作用域时,那么它将与它所在的当前协程(即上例中的 runBlocking,也就是父协程)没有任何关系。相当于它继承了其它的上下文,所以父子关系将不再适用。这会导致结构化并发机制失效,从而产生不受控的协程,所以通常应该避免这种情况发生。

重要说明:每个协程都会创建自己的 Job

非常重要的一点,需要在这里特别说明下:子协程虽然会继承来自父协程的上下文,但有个例,那就是 Job 这个上下文。它是唯一一个不是子协程直接从父协程继承过来的上下文。

每个协程都会创建自己的 Job

传递给协程 Builder(例如,launch)参数的 Job不代表这个子协程的 Job 就是传递过来的这个 Job,而是指传递过来的 Job 将会成为这个子协程创建的 Job 的父 Job

请先看下图,注意各个 CoroutineContext 上下文,颜色不同代表它们是不同的上下文。尤其要留意下其中的 Job

Context Job

以下示例将说明这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fun main(): Unit = runBlocking {
val parentJob = coroutineContext.job
println("parentJob=$parentJob")
val childJob = launch {
val childJobInLaunch = coroutineContext[Job]
println("childJobInLaunch=$childJobInLaunch")
delay(500)
}
println("childJob=$childJob")
parentJob.children.forEachIndexed { index, job ->
println("parent job child[$index]=$job")
}
childJob.join()

println("==========")
val childJobWithParentJob = launch(parentJob) {
val childJobInLaunch = coroutineContext[Job]
println("childJobInLaunch=$childJobInLaunch")
delay(500)
}
println("childJobWithParentJob=$childJobWithParentJob")
parentJob.children.forEachIndexed { index, job ->
println("parent job child[$index]=$job")
}
childJobWithParentJob.join()
}

运行结果如下:

1
2
3
4
5
6
7
8
parentJob=BlockingCoroutine{Active}@7960847b
childJob=StandaloneCoroutine{Active}@7a7b0070
parent job child[0]=StandaloneCoroutine{Active}@7a7b0070
childJobInLaunch=StandaloneCoroutine{Active}@7a7b0070
==========
childJobWithParentJob=StandaloneCoroutine{Active}@47d384ee
parent job child[0]=StandaloneCoroutine{Active}@47d384ee
childJobInLaunch=StandaloneCoroutine{Active}@47d384ee

协程上下文(CoroutineContext)

CoroutineContext 表示协程上下文,是由一组用于定义协程行为的元素组成的一个数据集,与协程密切相关。是 Kotlin 协程的一个“基本结构单元”

注意CoroutineContext 是一个数据集合,与 ListMap 非常类似。

如何运用协程上下文是至关重要的,以此来实现正确的线程行为、生命周期、异常以及调试。

协程上下文由以下几种元素组成,它们均继承自 CoroutineContext

  • Job:协程的句柄,对协程的控制和管理生命周期。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineDispatcher:调度器,确定协程在指定的线程来执行。
  • CoroutineExceptionHandler:协程异常处理器,处理未捕获的异常。
  • ContinuationInterceptor:在协程启动的时候进行拦截操作。一般不需要使用该上下文。

协程上下文是一个有索引的 Element 实例集合,每个 element 在这个集合中有一个唯一的 Key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface CoroutineContext {
// 从这个上下文中返回带有给定 [key] 的元素或 null。
public operator fun <E : Element> get(key: Key<E>): E?

// 从 [initial] 值开始累加该上下文的项,并从左到右应用 [operation] 到当前累加器值和该上下文的每个元素。
public fun <R> fold(initial: R, operation: (R, Element) -> R): R

// 返回一个上下文,包含来自这个上下文的元素和来自其他 [context] 的元素。
public operator fun plus(context: CoroutineContext): CoroutineContext

// 返回一个包含来自该上下文的元素的上下文,但不包含指定的 [key] 元素。
public fun minusKey(key: Key<*>): CoroutineContext

// [CoroutineContext] 元素的键。[E] 是带有这个键的元素类型。
public interface Key<E : Element>

// [CoroutineContext] 的一个元素。协程上下文的一个元素本身就是一个单例上下文。
public interface Element : CoroutineContext {
// 这个协程上下文元素的 key
public val key: Key<*>

public override operator fun <E : Element> get(key: Key<E>): E?

public override fun <R> fold(initial: R, operation: (R, Element) -> R): R

public override fun minusKey(key: Key<*>): CoroutineContext
}
}
  • <E> get(key) 操作符方法:可以通过 key 从这个上下文中获取这个 Element 元素或者 null。由于该方法是一个 operator(操作符),因此可以像访问 Map 元素那样使用 context[key] 这种中括号的形式来访问。

  • fold():提供遍历当前上下文中所有元素的能力。

  • plus(context) 操作符方法:顾名思义它是一个加法运算,多个上下文元素可以通过 + 的形式返回一个新的上下文,新的上下文里包含所有的 Element,如果遇到重复的(Key 一样的),那么用 + 号右边的 Element 会替代左边的。有一个很重要的事情需要小心 —— 要注意它们结合的次序,因为这个 + 运算符是不对称的。

  • minusKey(key):与 plus 相反,减法运算,删除当前上下文中指定 key 的元素。返回的是不包含指定 key 元素的上下文。

Element:协程上下文的一个元素,本身就是一个单例上下文,里面有一个 key,是这个元素的索引。Element 本身也实现了CoroutineContext 接口,这就好比 Int 实现了 List<Int>。为什么元素本身也是集合呢?主要是 Element 它不会存放除它自己以外的数据;Element 属性又有一个 key,是协程上下文这个集合中元素的索引。这个索引在元素里面,说明元素一产生就找到自己的位置。

注意:协程上下文的内部实现实际是一个单链表。

CoroutineName

1
2
3
4
5
6
7
8
// 用户指定的协程名称。此名称用于调试模式。
public data class CoroutineName(
// 定义协程的名字
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
// CoroutineName 实例在协程上下文中的 key
public companion object Key : CoroutineContext.Key<CoroutineName>
}

CoroutineName是用户指定的协程名称,方便调试和定位问题:

1
2
3
4
5
6
GlobalScope.launch(CoroutineName("GlobalScope")) {
launch(CoroutineName("CoroutineA")) { // 指定协程名称
val coroutineName = coroutineContext[CoroutineName] // 获取协程名称
print(coroutineName)
}
}

协程内部可以通过 coroutineContext 这个全局属性直接获取当前协程的上下文。打印数据如下:

1
[DefaultDispatcher-worker-2] CoroutineName(CoroutineA)

上下文组合

从上面的协程创建的函数中可以看到,协程上下文的参数只有一个,但是怎么传递多个上下文元素呢?CoroutineContext 可以使用 + 运算符进行合并。由于 CoroutineContext 是由一组元素组成的,所以加号右侧的元素会覆盖加号左侧的元素,进而组成新创建的 CoroutineContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fun main(args: Array<String>) {
runBlocking {
println("runBlocking - start")

// 通过 + 号运算添加多个上下文元素
var context = Job() + CoroutineName("cn-1") + Dispatchers.Main
println("contextName=${context[CoroutineName]}")
println("context=$context")

// 添加重复 Dispatchers 元素,Dispatchers.IO 会替换 Dispatchers.Main
context += Dispatchers.IO
println("context replace with Dispatchers.IO=$context")

// 移除 CoroutineName 元素
context = context.minusKey(context[CoroutineName]!!.key)
println("context remove CoroutineName=$context")
println("contextName=${context[CoroutineName]}")

// 移除 Dispatchers.IO 元素
context = context.minusKey(Dispatchers.IO.key)
println("context remove Dispatchers.IO=$context")

// 移除 Job 元素
context = context.minusKey(Job)
println("context remove Job=$context")
}
}

注意:如果有重复的元素(即 key 一致)则右边的会替换左边的元素。执行结果如下:

1
2
3
4
5
6
7
contextName=CoroutineName(cn-1)
context=[JobImpl{Active}@6c9f5c0d, CoroutineName(cn-1), Dispatchers.Main[missing]]
context replace with Dispatchers.IO=[JobImpl{Active}@6c9f5c0d, CoroutineName(cn-1), Dispatchers.IO]
context remove CoroutineName=[JobImpl{Active}@6c9f5c0d, Dispatchers.IO]
contextName=null
context remove Dispatchers.IO=JobImpl{Active}@6c9f5c0d
context remove Job=EmptyCoroutineContext

Job & Deferred

Job

什么是 Job

Job 是协程的句柄。Job 实例作为协程的唯一标识,用于处理协程,并且负责管理协程的生命周期。

我们可以把 Job 看成协程对象本身,它封装了协程中需要执行的代码逻辑。协程的操作方法都在 Job 身上。Job 具有生命周期并且可以取消,它本身也是上下文元素,继承自 CoroutineContext

Job 还可以有层级关系,一个 Job 可以包含多个子 Job。它们之间有如下关系:

  • 当父 Job 被取消后,所有的子 Job 也会被自动取消。
  • 当子 Job 由于异常CancellationException 除外)被取消的话,那么父 Job 也会被取消,从而导致该父 Job 内的其它子 Job 也被自动取消。这种默认的行为可以通过 SupervisorJob 来修改。
  • 具有多个子 Job 的父 Job 会等待所有子 Job 完成(或者取消)后,自己才会执行完成。

这里需要特别留意第二条,也就是子 Job 异常时,对父 Job 及其它子 Job 的影响。

Job 创建方法

创建 Job 实例的方法有两种:

  • 通过 launch 构建协程返回 Job。此种 Job 我们称之为 Coroutine job

  • 通过 Job()SupervisorJob() 这两个工厂方法来创建 Job。此种 Job 我们称之为 CompletableJob

    这两个工厂方法返回的都是 CompletableJob 类型(继承自 Job)的 Job。可以通过调用 CompletableJob.complete 方法来完成Job

这里稍微说明下,Job() 虽然长的像构造函数,但它却是一个工厂方法。因为 Job 是一个接口,不可能有构造方法。而且 Job() 返回的类型是 CompletableJob

1
2
3
4
public interface CompletableJob : Job {
public fun complete(): Boolean
public fun completeExceptionally(exception: Throwable): Boolean
}
  • fun complete(): Boolean

    用于完成 Job。 如果 Job 由于此次调用而完成,则返回 true,否则返回 false(代表已经完成了)。该函数被调用之后,若被再次调用,则不会有任何效果,并且始终返回 false

    此函数会将尚未完成未取消Job 转变成已完成状态。 不过,如果此 Job 含有子 Job,则会先转变为完成中状态,并等待其所有子 Job 完成后,再变为已完成状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    fun main() = runBlocking {
    val job = Job()
    launch(job) {
    delay(500)
    println("First child done.")
    }
    launch(job) {
    delay(1000)
    println("Second child done.")
    }
    // We must complete job or else the join will in active status forever.
    job.complete()
    job.join() // Wait for the job done.
    }

    运行结果如下:

    1
    2
    First child done.
    Second child done.
  • fun completeExceptionally(exception: Throwable): Boolean

    complete() 类似,不同之处是使用一个指定的异常来完成 Job。 这意味着所有子 Job立刻被取消,异常信息将被包装在 CancellationException 中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    private val sdf = SimpleDateFormat("HH:mm:ss.SSS")

    fun log(msg: String) {
    println("${sdf.format(Date())}: $msg")
    }

    fun main(): Unit = runBlocking {
    val parentJob = Job()
    log("parentJob=$parentJob")
    val childJob = launch(parentJob) {
    delay(3000)
    log("Job done.")
    }
    log("childJob=$childJob")
    parentJob.invokeOnCompletion {
    log("cause: $it")
    log("in invokeOnCompletion() -> parentJob=$parentJob isActive=${parentJob.isActive} isCancelled=${parentJob.isCancelled} isCompleted=${parentJob.isCompleted}")
    log("in invokeOnCompletion() -> childJob=$childJob isActive=${childJob.isActive} isCancelled=${childJob.isCancelled} isCompleted=${childJob.isCompleted}")

    }
    log("prepare to call completeExceptionally()...")
    parentJob.completeExceptionally(AssertionError("complete by custom exception"))
    log("after completed -> parentJob=$parentJob isActive=${parentJob.isActive} isCancelled=${parentJob.isCancelled} isCompleted=${parentJob.isCompleted}")
    log("after completed -> childJob=$childJob isActive=${childJob.isActive} isCancelled=${childJob.isCancelled} isCompleted=${childJob.isCompleted}")

    delay(5000)
    log("program exit")

    运行结果如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    15:30:26.604: parentJob=JobImpl{Active}@1f89ab83
    15:30:26.611: childJob=StandaloneCoroutine{Active}@3d04a311
    15:30:26.614: prepare to call completeExceptionally()...
    15:30:26.623: after completed -> parentJob=JobImpl{Cancelling}@1f89ab83 isActive=false isCancelled=true isCompleted=false
    15:30:26.623: after completed -> childJob=StandaloneCoroutine{Cancelling}@3d04a311 isActive=false isCancelled=true isCompleted=false
    15:30:26.628: cause: java.lang.AssertionError: complete by custom exception
    15:30:26.628: in invokeOnCompletion() -> parentJob=JobImpl{Cancelled}@1f89ab83 isActive=false isCancelled=true isCompleted=true
    15:30:26.628: in invokeOnCompletion() -> childJob=StandaloneCoroutine{Cancelled}@3d04a311 isActive=false isCancelled=true isCompleted=true
    15:30:31.629: program exit

在协程中获取 Job

由于 Job 是一个协程上下文,因此我们可以使用 coroutinContext[Job] 来访问它。同时还有一个 CoroutineContext 扩展属性 job,可以让我们直接使用 coroutinContext.job 来获取 Job

1
public val CoroutineContext.job: Job get() = get(Job) ?: error("Current context doesn't contain Job in it: $this")

使用示例:

1
2
3
4
fun main(): Unit = runBlocking {
val job1 = coroutineContext[Job]
val job2 = coroutineContext.job
}

Job 状态

Job 是一个接口类型,它具有以下三种状态:

状态 说明
isActive 活跃的。当 Job 处于活跃状态时,该值为 true。如果 Job 已经开始,但还没有完成,也没有取消或者失败,则均视为活跃状态。
isCompleted 已完成。无论任何原因导致 Job 完成,则该值为 true。已取消、已失败和已完成的 Job 均被视为完成状态。
isCancelled 已取消。无论任何原因导致 Job 被取消时,则该值为 true。无论是通过显式调用 cancel 或因为它已经失败亦或是它的子或父 Job 被取消,这些情况均被视为已取消状态。

其实 Job 是包含一系列如下表中的状态的,但是我们无法访问全部状态,仅能访问上述三个状态。

Job 实际状态值与可访问的状态值对应关系如下:

State isActive isCompleted isCancelled
New (optional initial state) 可选初始状态 false false false
Active (default initial state) 默认初始状态 true false false
Completing (transient state) 瞬时状态 true false false
Cancelling (transient state) 瞬时状态 false false true
Cancelled (final state) 最终状态 false true true
Completed (final state) 最终状态 false true false

Job States

Active 状态下,Job 处于运行状态。如果 Job 是通过协程构建器(即协程 Builder)创建的,那么这个状态就是协程主体运行时的状态。在这种状态下,我们可以启动子协程。大多数协程会在 Active 状态下启动,只有那些延迟启动的(即启动模式是 LAZY 的)才会以 New 状态启动。

当协程完成的时候,它的状态会变为 Completing,然后等待所有子协程完成。一旦它的所有子协程任务都完成了,其状态就会变为 Completed,这是一个最终状态。或者,如果 Job 在运行时(即处于 ActiveCompleting 状态)被取消或失败,其状态会变为 Cancelling,在这种状态下,我们会有最后的时机来做一些清理,比如关闭连接或释放资源。当该 Job 执行完成后(包括其所有子 Job 完成后),Job 才会进入到 Cancelled 最终状态。

要注意一点:一个协程被取消,不仅仅是将其停止,它是使用一个异常在内部取消的。因此我们可以在协程内部获悉此情况,进而有机会做一些必要的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
fun main(): Unit = runBlocking {
val exceptionHandler = CoroutineExceptionHandler { _, e ->
println("exceptionHandler cause: $e")
}
// 根据需要及用法可选用 CoroutineExceptionHandler
val cs = CoroutineScope(Job() + exceptionHandler)
val job = cs.launch {
try {
repeat(10) { i ->
delay(200)
println("I'm working... $i")
}
println("I won't be printed.")
} catch (err: CancellationException) {
println("Job has been cancelled. Type: ${err.javaClass.simpleName} Reason: ${err.message}")
// 此处最好是将 CancellationException 异常抛出,或二次包装后再抛出,这样便可以尽快终止工作。
// 如果不抛出异常的话,则会等待协程执行完毕。
// throw err // 不会取消父 Job
// or
// 注意,若抛出其它异常(CancellationException 除外),则会同时取消父 Job
throw AssertionError("cancelled by custom reason", err)
} finally {
println("Resources have been released safely.")
}
println("I won't be printed either.")
}
job.invokeOnCompletion { println("invokeOnCompletion cause: ${it?.message}") }
delay(1100)
job.cancelAndJoin()
println("Cancelled successfully")
delay(1000)
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
I'm working... 0
I'm working... 1
I'm working... 2
I'm working... 3
I'm working... 4
Job has been cancelled. Type: JobCancellationException Reason: StandaloneCoroutine was cancelled
Resources have been released safely.
exceptionHandler cause: java.lang.AssertionError: cancelled by custom reason
invokeOnCompletion cause: cancelled by custom reason
Cancelled successfully

Job 接口及方法介绍

Job 接口的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Job : CoroutineContext.Element {
// 活跃的,是否仍在执行
// 当 Job 处于活动状态时为 true
// 如果 Job 未被取消或没有失败,则均处于活跃状态
public val isActive: Boolean

//当 Job 正常结束或者由于异常结束,均返回 true
public val isCompleted: Boolean

// 当 Job 被主动取消或者由于异常结束,均返回 true
public val isCancelled: Boolean

// 启动协程,如果启动成功,则返回 true。如果协程已经启动或已完成或已取消,则返回 false
public fun start(): Boolean

// 取消 Job,可通过传入 Exception 说明具体原因
public fun cancel(cause: CancellationException? = null)

// 挂起协程直到此 Job 完成
public suspend fun join()

// 取消任务并等待任务完成,结合了 [cancel] 和 [join] 的调用
public suspend fun Job.cancelAndJoin()

// 给 Job 设置一个完成通知,当 Job 执行完成的时候会同步执行这个函数
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}

Thread 相比,Job 同样有 join(),调用时会挂起(线程的 join() 则会阻塞线程),直到协程完成;它的 cancel() 可以类比 Threadinterrupt(),用于取消协程;isActive 则是可以类比 ThreadisAlive(),用于查询协程是否仍在执行。

下面列举几个比较有用的 Job 函数:

  • fun start(): Boolean

    调用该函数来启动对应的协程。如果当前协程还没有启动,调用该函数会返回 true。如果当前协程已经启动或者已经处于完成状态(Job 被取消也相当于完成状态,详见前文“Job 状态”),则调用该函数会返回 false

    注意

    • 启动模式是“LAZY”时,必须调用此函数才能启动协程。或者调用 join()await() 隐式启动协程。
    • 启动模式是“DEFAULT” 时,通常不需要调用此函数,因为该模式下,协程被调度后通常会立刻启动。只所以说是“通常”,是因为该模式下协程的调度和执行并不是原子的,所以就存在协程被调度后没有立刻执行的可能。调用此方法可以确保未被执行的协程可以被执行。
    • 启动模式是“ATOMIC” 时,不需要调用此函数,因为协程一定会启动,无论之前是否被取消。
    • 启动模式是“UNDISPATCHED”时,也不需要调用此函数,因为和“ATOMIC” 模式类似,协程一定会启动,无论之前是否被取消。
  • fun cancel(cause: CancellationException? = null)

    取消 Job,可设置取消原因。 取消原因可以通过 CancellationException 进行设定或者是直接设置一个错误消息(源自 Job 的扩展函数)。

    调用 cancel 方法并不会使 Job 立刻进入“Cancelled(已取消)”状态,而是会立刻进入“Cancelling(取消中)”状态,当该 Job 执行完成后(包括其所有子 Job 完成),才会进入“Cancelled(已取消)”状态。详见官网说明。

    再次强调一遍先前说过的话:一个协程被取消,不仅仅是将其停止,它是使用一个异常在内部取消的。因此我们可以在协程内部获悉此情况,进而有机会做一些必要的工作。

  • fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

    通过这个函数可以给 Job 设置一个完成通知,当 Job 执行完成的时候会同步执行这个通知函数。 回调的通知对象类型为:typealias CompletionHandler = (cause: Throwable?) -> Unit. CompletionHandler 参数代表了 Job 是如何执行完成的。 cause 有下面三种情况:

    • 如果 Job 是正常执行完成的,则 cause 参数为 null
    • 如果 Job 是正常取消的,则 cause 参数为 CancellationException 对象。这种情况不应该当做错误处理,这是任务正常取消的情形。所以一般不需要在错误日志中记录这种情况。
    • 其他情况表示 Job 执行失败了。

    这个函数的返回值为 DisposableHandle 对象,如果不再需要监控 Job 的完成情况了, 则可以调用 DisposableHandle.dispose 函数来取消监听。如果 Job 已经执行完了, 则无需调用 dispose 函数,会自动取消监听。

    CompletionHandler 函数不要再抛出任何异常。如果这么做的话,抛出的异常会被包装成 CompletionHandlerException 后再被抛出。这可能会导致不相关代码的崩溃。

    注意:CompletionHandler 函数的实现必须要速度快,非阻塞而且要是线程安全的。该函数可以与其它代码并发调用,但并不保证会在调用该函数的线程上执行该函数。

  • suspend fun join()

    注意:这是一个 suspend 函数,所以只能在协程内调用。

    这个函数会将协程挂起,等待其执行完成处于完成状态后,再恢复执行后续代码。如果协程已经完成,join() 则不会挂起,而是立即返回。所以 join() 函数一般用来在另外一个协程中等待其他 job 执行完成后继续执行,当其他 job 执行完成后, job.join() 函数恢复,这时 job 这个任务已经处于完成状态,而调用 job.join() 的协程还继续处于 activie 状态。

    如果 Job 处于 New 状态(例如,启动模式是 LAZY),那么调用 join() 则会启动该 Job 而后再等待其完成

    请注意,只有在其所有子 Job 都完成后,被挂起的协程才会变成完成状态。

    这个挂起函数是可以被取消的,因为总是需要检查调用 job.join() 的协程是否被取消。当调用该挂起函数的时候或 job 已经处于挂起状态时,若此时调用 job.join() 的协程被取消或已完成(例如由于异常导致协程完成),则此函数将抛出 CancellationException 异常。

    需要特别留意这一点,如果子协程失败了,那么父协程在子协程上调用 join() 时将会抛出 CancellationException 异常,因为子协程失败默认会取消父协程,除非子协程是从 supervisorScope 中启动的。

    此函数可用于带有 onJoin 子句的 select 调用。 使用 isCompleted 无需等待即可检查此 job 是否已完成。

  • suspend fun Job.cancelAndJoin()

    这一个便捷函数,结合了 cancel()join() 这两个功能。由于涉及到挂起函数 join(),所以该函数也是挂起函数。

    通常情况下,我们虽然取消了 job,但是依然希望它能执行完再结束,这种情况我们通常会先调用 cancel() 再调用 join()。于是就有了 cancelAndJoin() 这个便捷函数,在该函数体内,刚好是先调用的 cancel() 再调用的 join()

值得说明的是:Job 的所有方法都是线程安全的,因此我们可以在协程中放心调用。

结构化并发(structured concurrency)

协程遵循结构化并发的原则,这意味着新协程只能在特定的协程作用域内启动,这同时也限定了协程的生命周期。

在实际应用程序中,我们可以会启动大量的协程。结构化并发可以确保它们不丢失,不泄漏。在其所有子协程完成之前,外部的协程是无法完成的。结构化并发还确保了子协程可以正常上报其产生的异常而不会丢失这些异常信息。

从使用角度上来讲,结构化并发就是指在一个协程内启动另一个协程。

结构化并发完成依赖 Job 上下文,而这又体现在 Job 的父子关系上。因此正确理解 Job 的父子关系是非常非常重要的,这将影响父协程对子协程的管理,同时也会影响父子协程间的异常处理。然而,正确理解它们的关系并不容易,因为会有很容易让人引起误会的地方,从而引起无法取消子协程或错误的协程异常处理等问题。后文会详细的详解。

Deferred

Deferred 接口继承自 Job,所以 Job 相关的内容在 Deferred 上同样适用,并具有与 Job 相同的状态机制。它是 async 构建协程返回的一个协程任务,可通过调用 await() 方法等待协程执行完成并获取结果。不同的是 Job 没有结果值,Deffer 有结果值。

Deferred 额外提供了三个函数来处理和协程执行结果相关的操作。

1
2
3
4
5
6
7
8
9
10
11
12
public interface Deferred<out T> : Job {
// 等待协程执行完成并获取结果
public suspend fun await(): T

public val onAwait: SelectClause1<T>

@ExperimentalCoroutinesApi
public fun getCompleted(): T

@ExperimentalCoroutinesApi
public fun getCompletionExceptionOrNull(): Throwable?
}
  • T:这里多了一个泛型参数 T,它表示返回值类型,通过 await() 函数可以获取这个返回值。

  • suspend fun await(): T

    等待协程执行完毕并返回结果。如果异常结束或协程被取消,则会抛出 CancellationException 异常;如果协程尚未完成,则挂起直到协程执行完成。

  • fun getCompleted(): T

    返回协程执行结果。如果协程尚未执行完,调用该函数会抛出 IllegalStateException 异常。 如果协程被取消,则会抛出相应的异常。

    该函数的设计初衷是为了在 invokeOnCompletion 中获取协程的返回值,这样就可以确保是在协程完成后,再获取协程的返回值。

  • fun getCompletionExceptionOrNull(): Throwable?

    如果协程由于异常而被取消从而进入完成状态,则返回相应的异常信息。如果协程正常完成,则返回 null。如果协程没有执行完,调用该函数同样会抛出 IllegalStateException 异常。可以通过 isCompleted 来判断当前协程是否执行完毕。

    该函数的设计初衷同样是为了在 invokeOnCompletion 中获取协程的返回值,这样就可以确保是在协程完成后,再获取协程的返回值。

SupervisorJob

SupervisorJob 是一个顶级函数。函数定义如下:

1
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

调用该函数会创建一个处于 Active 状态的 SupervisorJob。如前所述, Job 是有父子关系的。默认情况下,子 Job 失败同时也会取消父 Job。这种默认行为可能不是我们期望的,比如在 Activity 中有两个子 Job 分别获取同一篇文章的评论内容和作者信息,如果其中一个失败了,我们并不希望父 Job 被自动取消,这样会导致另外一个子 Job 也被取消。SupervisorJob 就可以改变这种默认行为,子 Job 失败不会影响父 Job,从而让父 Job 的各个子 Job 之间互不影响。

该函数有一个可选的 parent 参数,如果指定了该参数,则所返回的 SupervisorJob 就是参数 parent 的子 Job。如果 parent Job 失败了或者被取消了,则这个 SupervisorJob 本身及其所有子 Job 也会被取消。如果在这个 SupervisorJob 上调用 cancel 方法,并且指定了的除 CancellationException 以外的异常,则它的父 Job 也会被取消。

不过需要特别注意的是,刚开始学习 SupervisorJob 的时候会有很多让人误解的用法,导致一头雾水。

先来看一个符合预期的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main() {
val cs = CoroutineScope(SupervisorJob()) // 注意这里的 SupervisorJob()
val childJob1 = cs.launch {
delay(100)
log("child job 1: after working...")
throw AssertionError("exception from child job 1")
}
val childJob2 = cs.launch {
delay(1000)
log("I'm another normal child job.")
}
log("childJob1=$childJob1")
log("childJob2=$childJob2")
Thread.sleep(2000)
childJob1.invokeOnCompletion { log("childJob1=$childJob1 cause: $it") }
childJob2.invokeOnCompletion { log("childJob2=$childJob2 cause: $it") }
}

运行结果如下:

1
2
3
4
5
6
7
8
14:47:58.344	childJob1=StandaloneCoroutine{Active}@6c629d6e
14:47:58.351 childJob2=StandaloneCoroutine{Active}@66133adc
14:47:58.416 child job 1: after working...
Exception in thread "DefaultDispatcher-worker-2" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
14:47:59.314 I'm another normal child job.
14:48:00.352 childJob1=StandaloneCoroutine{Cancelled}@6c629d6e cause: java.lang.AssertionError: exception from child job 1
14:48:00.353 childJob2=StandaloneCoroutine{Completed}@66133adc cause: null

childJob1 产生异常后,只有它自己被取消了,childJob2 并未受到任何影响是正常完成的。一切都像预期的那样,没有什么不对的地方。

让我们再来看一个不太符合预期的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun main() {
val cs = CoroutineScope(SupervisorJob())
val parentJob = cs.launch {
val childJob1 = launch { // 注意这里又启动了一个协程
log("Child 1 is working...")
delay(100)
log("Child 1 is going to die...")
throw AssertionError("exception from child job 1")
}
val childJob2 = launch { // 注意这里又启动了一个协程
log("Child 2 is working...")
delay(1000)
log("Child 2 is done...")
}
log("childJob1=$childJob1")
log("childJob2=$childJob2")
childJob1.invokeOnCompletion { log("childJob1=$childJob1 cause: $it") }
childJob2.invokeOnCompletion { log("childJob2=$childJob2 cause: $it") }
}
parentJob.invokeOnCompletion { log("parentJob=$parentJob cause: $it") }
log("parentJob=$parentJob parentJob children count=${parentJob.children.count()}")
Thread.sleep(2000)
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
14:34:46.458	childJob1=StandaloneCoroutine{Active}@35c01fcf
14:34:46.458 Child 2 is working...
14:34:46.458 parentJob=StandaloneCoroutine{Active}@27082746 parentJob children count=2
14:34:46.458 Child 1 is working...
14:34:46.462 childJob2=StandaloneCoroutine{Active}@63a401d8
14:34:46.570 Child 1 is going to die...
14:34:46.571 childJob1=StandaloneCoroutine{Cancelled}@35c01fcf cause: java.lang.AssertionError: exception from child job 1
14:34:46.572 childJob2=StandaloneCoroutine{Cancelled}@63a401d8 cause: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=StandaloneCoroutine{Cancelling}@27082746
Exception in thread "DefaultDispatcher-worker-3" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
14:34:46.583 parentJob=StandaloneCoroutine{Cancelled}@27082746 cause: java.lang.AssertionError: exception from child job 1

childJob1 产生异常后,其父 Job 也就是 parentJob 也同时被取消了,进而导致 childJob2 也被取消了。似乎 SupervisorJob() 没有生效。

让我们把上一个示例稍稍改动下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() {
val cs = CoroutineScope(Job()) // 注意这里的参数是 Job
val parentJob = cs.launch(SupervisorJob()) { // 注意这里的参数是 SupervisorJob
val childJob1 = launch {
log("Child 1 is working...")
delay(100)
log("Child 1 is going to die...")
throw AssertionError("exception from child job 1")
}
val childJob2 = launch {
log("Child 2 is working...")
delay(1000)
log("Child 2 is done...")
}
log("childJob1=$childJob1")
log("childJob2=$childJob2")
childJob1.invokeOnCompletion { log("childJob1=$childJob1 cause: $it") }
childJob2.invokeOnCompletion { log("childJob2=$childJob2 cause: $it") }
}
parentJob.invokeOnCompletion { log("parentJob=$parentJob cause: $it") }
log("parentJob=$parentJob")
parentJob.children.forEachIndexed { i, child -> log("child[$i]=$child") }
Thread.sleep(2000)
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14:54:46.011	childJob1=StandaloneCoroutine{Active}@30a80103
14:54:46.011 Child 2 is working...
14:54:46.024 childJob2=StandaloneCoroutine{Active}@72acc02a
14:54:46.011 parentJob=StandaloneCoroutine{Active}@8807e25
14:54:46.011 Child 1 is working...
14:54:46.037 child[0]=StandaloneCoroutine{Active}@30a80103
14:54:46.037 child[1]=StandaloneCoroutine{Active}@72acc02a
14:54:46.129 Child 1 is going to die...
14:54:46.131 childJob1=StandaloneCoroutine{Cancelled}@30a80103 cause: java.lang.AssertionError: exception from child job 1
14:54:46.131 childJob2=StandaloneCoroutine{Cancelled}@72acc02a cause: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=StandaloneCoroutine{Cancelling}@8807e25
Exception in thread "DefaultDispatcher-worker-2" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
14:54:46.145 parentJob=StandaloneCoroutine{Cancelled}@8807e25 cause: java.lang.AssertionError: exception from child job 1

结果和上一个示例一样,当 childJob1 产生异常后,其父 Job 也就是 parentJob 也同时被取消了,进而导致 childJob2 也被取消了。似乎 SupervisorJob() 又没生效。

看了上面几个示例后,是不是发现 SupervisorJob() 有些神奇?运行结果总是这么的出其不意。其实产生这种“不太符合预期”错觉的根本原因是没有正确理解父子 Job 关系导致的。

让先来分析一下刚刚那个示例,为也便于讲解,我们先把代码最简化处理:

1
2
3
4
5
val cs = CoroutineScope(Job()) // 注意这里用的是 Job
val parentJob = cs.launch(SupervisorJob()) { // 注意这里用的是 SupervisorJob
val childJob1 = launch { throw AssertionError("exception from child job 1") }
val childJob2 = launch { ... }
}

大家先来回答一个问题:childJob1childJob2 的父 Job 是哪种类型?是普通的 Job 类型,还是 SupervisorJob 类型?

答案是 Job 类型。希望你答对了。

虽然乍一看很容易让人误以为是 SupervisorJob 类型,但是若你还记得前文重点提及过的结论:

每个协程都会创建自己的 Job

那么你可能已经理解了其中的原因了。parentJob 是通过 cs.launch 启动的,而子协程唯一不会继承父协程的上下文就是 Job,所以通过 cs.launch 启动的协程会创建自己的 Job 而不是用的协程 Builder 的参数 SupervisorJob。也就是说,cs.launch 内部的 Job 依然是普通的 Job 类型,若其子 Job 产生异常的话(本例中的 childJob1),依然会取消它自己及其父 Job(本例中的 parentJob)。由于父 Job 被取消了,进而导致其另一个子 Job (本例中的 childJob2)也被取消了。

借本例再次说明下,协程 Builder 的参数 SupervisorJob 仅代表用该 SupervisorJob 作为 cs.launch 启动的 Job(即 parentJob) 的父 Job。也就是说,在本例中 SupervisorJob() 其实并没有被使用到。

以下示例可以说明刚才的 Job 父子关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main() {
val cs = CoroutineScope(Job())
val supervisorJob = SupervisorJob() // SupervisorJobImpl{Active}@449b2d27
val parentJob = cs.launch(supervisorJob) { // StandaloneCoroutine{Completing}@5479e3f
val childJob1 = launch { delay(100) } // StandaloneCoroutine{Active}@23e2242b
val childJob2 = launch { delay(150) } // StandaloneCoroutine{Active}@1111ab24
}
parentJob.job.children.forEachIndexed { index, child ->
// child[0]=StandaloneCoroutine{Active}@23e2242b
// child[1]=StandaloneCoroutine{Active}@1111ab24
}
supervisorJob.job.children.forEachIndexed { index, child ->
// child[0]=StandaloneCoroutine{Completing}@5479e3f
}
}

用下图说明可能更加形象些:

Job Relations

了解了上面 Job 的父子关系后,再回看之前的例子:

1
2
3
4
5
val cs = CoroutineScope(Job())
val parentJob = cs.launch(SupervisorJob()) {
val childJob1 = launch { throw AssertionError("exception from child job 1") }
val childJob2 = launch { ... }
}

你现在应该明白为什么 SupevisorJob 没有起作用了,因为本例中的这个 SupevisorJob,其实并没有被使用到,自然就不会起作用。因此本例中,协程的取消关系是按照其默认情况处理的,也就是:子 Job 由于异常被取消,其父 Job 及该父 Job 下的其它的子 Job 也会被取消。

建议大家再仔细回看下前文中重点提到的注意事项。

若想让 SupervisorJob 生效的话,可以像下面这样,使用 supervisorScope 或者 CoroutineScope(SupervisorJob())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main() {
val cs = CoroutineScope(Job())
val parentJob = cs.launch {
supervisorScope { // 使用 supervisorScope
val childJob1 = launch {
delay(100)
throw AssertionError("exception from child job 1")
}
val childJob2 = launch {
delay(200)
log("Child 2 is done...")
}
}
}
Thread.sleep(1000)
}

执行结果如下:

1
2
3
Exception in thread "DefaultDispatcher-worker-2" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
17:18:53.563 Child 2 is done...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun main() {
val cs = CoroutineScope(SupervisorJob()) // 使用 CoroutineScope(SupervisorJob())
val childJob1 = cs.launch { // 直接启动协程,其内部没有再启动其它协程
// 若内部想继续开启受控的子协程,请使用以下方法之一:
// - 内部直接使用 launch 启动子协程。
// - 使用 launch 启动子协程时指定或复用受控的父 Job,以确保结构化并发机制有效:
// - launch(job) or launch(supervisorJob) // 指定的 Job 一定要是可控的,不要直接使用工厂方法 Job(如 Job() 等)作为父 Job,否则结构化并发机制将失效。
// - launch(cs.coroutineContext)
// - 内部再次使用 supervisorScope。
// - 使用新的协程作用域并指定或复用受控的父 Job,以确保结构化并发机制有效:
// - CoroutineScope(job) or CoroutineScope(supervisorJob) // 指定的 Job 一定要是可控的,不要直接使用工厂方法 Job(如 Job() 等)作为父 Job,否则结构化并发机制将失效。
// - CoroutineScope(cs.coroutineContext)
// 请读者自行尝试以上几种情况。
delay(100)
log("Child 1 is going to die...")
throw AssertionError("exception from child job 1")
}
val childJob2 = cs.launch { // 直接启动协程,其内部没有再启动其它协程
delay(150)
log("Child 2 is done.")
}
log("childJob1=$childJob1")
log("childJob2=$childJob2")
Thread.sleep(1000)
childJob1.invokeOnCompletion { log("childJob1=$childJob1 cause: $it") }
childJob2.invokeOnCompletion { log("childJob2=$childJob2 cause: $it") }
log("===> program done <===")
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10:55:57.697	childJob1=StandaloneCoroutine{Active}@6c629d6e
10:55:57.705 childJob2=StandaloneCoroutine{Active}@66133adc
10:55:57.762 Child 1 is going to die...
Exception in thread "DefaultDispatcher-worker-1" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
10:55:57.810 Child 2 is done.
10:55:58.711 childJob1=StandaloneCoroutine{Cancelled}@6c629d6e cause: java.lang.AssertionError: exception from child job 1
10:55:58.711 childJob2=StandaloneCoroutine{Completed}@66133adc cause: null
10:55:58.711 ===> program done <===

CoroutineDispatcher 调度器

CoroutineDispatcher 调度器用于指定协程所运行的线程。可以将协程限制在一个特定的线程上执行,或将它分派到一个线程池,亦或是让它不受限地运行。通俗的讲,CoroutineDispatcher 是用来帮助我们切换线程的。

所有协程构造器(如 launchasync 等)都接受一个可选参数,即 CoroutineContext ,该参数可用于显式指定所要使用的 CoroutineDispatcher 及其它协程上下文。

要在主线程之外运行代码,可以指定 Kotlin 协程在 DefaultIO 调度程序上执行工作。在 Kotlin 中,所有协程都必须在 CoroutineDispatcher 中运行,即使它们在主线程上运行也是如此。协程可以自行暂停,而 CoroutineDispatcher 负责将其恢复。

协程需要调度的位置就是挂起点的位置,只有当挂起点正在挂起的时候才会进行调度,实现调度需要使用协程的拦截器。调度的本质就是解决挂起点恢复之后的协程逻辑在哪里运行的问题。调度器也属于协程上下文一类,它继承自拦截器 CoroutineDispatcher。它是所有协程调度程序实现扩展的基类(我们很少会自己自定义调度器)。可以使用 newSingleThreadContextnewFixedThreadPoolContext 创建私有线程池。也可以使用 asCoroutineDispatcher 扩展函数将任意 java.util.concurrent.Executor 转换为调度程序。

Kotlin 提供了四个调度器,您可以使用它们来指定应在何处运行协程:

调度器模式 说明 适用场景
Dispatchers.Main UI 调度器,Android 上的主线程。 通常该调度器是单线程的,主要用于 UI 交互或一些轻量级任务。
Dispatchers.Default 默认调度器,非主线程。CPU 密集型任务调度器,适合用于后台计算。如果使用协程构造器时,没有指定调度器,则默认使用该调度器。 通常处理一些单纯的计算任务,或者执行时间较短任务比如:json 的解析,数据计算等。
Dispatchers.Unconfined 一个不局限于任何特定线程的协程调度程序,即非受限调度器。通常不会用到该调度器。 编者注:本人没有使用过该调度器,因此不知道适用于什么场景。
Dispatchers.IO IO 调度器,非主线程,执行的线程是 IO 线程。 适合执行磁盘和网络 IO 相关的操作,比如:网络处理,数据库操作,文件读写等。
  • Dispatchers.Default

    该调度器背后使用了一个共享的线程池来运行其中的任务。该调度器保证,使用该调度器时,其最大并发任务数等于 CPU 的内核数量,但最小值是 2。注意,该调度器和 Dispatchers.IO 共享线程池,只不过该调度器限制了最大并发数。

  • Dispatchers.Main

    通常情况下,该调度器是单线程的。可以直接使用该调度器,也可以使用工厂方法 MainScope()

    MainScope() 的实现就使用了 SupervisorJobDispatchers.Main

    1
    public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
  • Dispatchers.IO

    该调度器背后使用了一个共享的线程池来运行其中的任务。默认的线程数量为 64CPU 的内核数量(以其中的较大值为准)。注意,该调度器和 Dispatchers.Default 共享线程池,只不过它们的最大并发数限制不同。

    需要说明的是,如果在默认调度器 Dispatchers.Default 内部使用 withContext(Dispatchers.IO) { ... } 的话,通常并不会发生线程切换操作。 在这种情况下,系统底层会尽可能的让它们保持在同一个线程上运行。

    由于线程共享的原因,在该调度器运行期间,可能有超过 64 个线程(默认的并行数量)被创建,但不一定都会被使用。

  • Dispatchers.Unconfined

    由于该调度器未定义线程池,所以执行的时候默认在启动线程。协程被挂起后,再次恢复时所在的线程是由调用 resume 的线程决定的。

由于子协程会继承父协程的上下文,所以我们通常会在父协程上设置调度器,这样的话其所有子协程也会默认使用该调度器。

withContext

在 Android 开发中,我们常常在子线程中请求网络并获取数据,然后切换到主线程更新 UI。官方为我们提供了一个挂起函数 withContext 用于切换调度器。例如,调用 withContext(Dispatchers.IO) 可以创建一个在 IO 线程池中运行的代码块(即 block),您放在该块内的任何代码都始终通过 IO 调度器执行,

该函数会使用用户指定的调度器,将 block 的执行转移到指定的线程中。此外该函数可以有返回值。

最常见的用法就是在主线程中启动一个协程,然后再通过 withContext(Dispatchers.IO) 调度到 IO 线程上去做网络请求,获取结果返回后,主线程上的协程就会恢复继续执行,完成 UI 的更新,这样可以保证主线程是安全的。

由于 withContext 可在不引入回调的情况下控制线程池,因此可以将其应用于非常小的函数,如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext 来确保每个函数都是主线程安全的,那么可以从主线程调用每个函数。调用方也就无需再考虑应该使用哪个线程来执行函数了。您可以使用外部 withContext 来让 Kotlin 只切换一次线程,这样可以在多次调用的情况下,尽可能的避免了线程切换所带来的性能损失。

与基于回调的方式相比,withContext() 不会增加额外的开销。此外,在某些情况下,还可以优化 withContext() 调用,使其超越基于回调的实现。例如,如果某个函数需要先后调用十次网络请求,你可以在最外层调用 withContext() 让协程只切换一次线程,这样即使每个网络请求内部均会使用 withContext(),它也会留在同一调度程序上,从而避免频率切换线程。此外,协程还优化了 Dispatchers.DefaultDispatchers.IO 之间的切换,以尽可能避免线程切换。

使用线程池的调度器(例如,Dispatchers.IODispatchers.Default)不能保证代码块一直在同一线程上从上到下执行,在某些情况下,协程在 suspendresume 后可能会将任务移交给另一个线程来执行。这意味着,对于整个 withContext() 块,由于多线程并发之间的原子性可见性等原因,先后读取到的线程局部变量可能并非是同个值。

启动模式

CoroutineStart 是一个枚举类,用于在协程构建器(即 start 参数)中指定启动选项。

启动模式 含义 说明
DEFAULT 默认启动模式,协程创建后立即开始调度。 注意,是立即调度,而不是立即执行。也就是说协程在执行前可能被取消。例如,在调用 launch 后,立刻调用 cancel
DEFAULT 是饿汉式启动,launch 调用后,会立即进入待调度状态,一旦调度器 OK 就可以开始执行。如果协程在执行前被取消,则会直接进入取消响应的状态。
LAZY 懒启动模式,启动后并不会有任何调度行为,直到我们需要它执行的时候才会产生调度。 包括主动调用该协程的 startjoin 或者 await 等函数时才会开始调度。如果协程在调度前就被取消,那么协程将直接进入异常结束状态(即 Cancelled 状态)。
ATOMIC 类似 DEFAULT,区别在于协程在开始执行之前,是无法取消的。即使协程之前已经被取消,它也会开始执行。 DEFAULT 相比,虽然同为立即调度,但 ATOMIC 将调度和执行两个步骤合二为一,就像它的名字一样,保证其调度和执行是原子操作,确保协程一定会执行,无论协程之前是否被取消过。
注意,该启动模式目前仍处于“实验阶段”。(ExperimentalCoroutinesApi)
UNDISPATCHED 类似 ATOMIC,立即执行协程,直到在当前线程中遇到第一个挂起点。即使协程之前已经被取消,它也会开始执行。 是立即执行,因此协程一定会执行。即使协程已经被取消,它也会开始执行,但不同之处在于它在同一个线程中开始执行。

这些启动模式的设计主要是为了应对某些特殊的场景,不过在业务开发实践中通常使用 DEFAULTLAZY 这两个启动模式就足够了。

示例1:DEFAULT 模式,使用 launch 调度协程后,立刻取消该协程,则很有可能该协程不会被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main(): Unit = runBlocking {
val job = launch(start = CoroutineStart.ATOMIC) {
try {
println("in job")
delay(100)
println("job done")
} catch (e: CancellationException) {
println("I'm be cancelled.")
}
}
job.invokeOnCompletion { println("invokeOnCompletion cause: ${it?.message}") }

println("1 job=$job")
job.cancel()
println("2 job=$job")

delay(1000)
println("program done")
}

运行结果如下:

1
2
3
4
1 job=StandaloneCoroutine{Active}@77468bd9
2 job=StandaloneCoroutine{Cancelling}@77468bd9
invokeOnCompletion cause: StandaloneCoroutine was cancelled
program done

示例2:仅将上述代码中的启动模式改成 ATOMIC,则协程一定会被启动。运行结果如下:

1
2
3
4
5
6
1 job=StandaloneCoroutine{Active}@7a7b0070
2 job=StandaloneCoroutine{Cancelling}@7a7b0070
in job
I'm be cancelled.
invokeOnCompletion cause: StandaloneCoroutine was cancelled
program done

示例3:仅将上述代码中的启动模式改成 UNDISPATCHED,则协程一定会被启动。运行结果如下:

1
2
3
4
5
6
in job
1 job=StandaloneCoroutine{Active}@6842775d
2 job=StandaloneCoroutine{Cancelling}@6842775d
I'm be cancelled.
invokeOnCompletion cause: StandaloneCoroutine was cancelled
program done

取消协程

job.cancel() 用于取消协程。详见上文“Job 接口及方法介绍”中关于“cancel() 函数”的讲解。

因为 cancel() 函数调用后会立刻返回而不是等待协程结束后再返回,但此时协程不一定就已经停止运行了。如果需要确保协程结束后再执行后续代码,就需要再调用 join() 方法来等待协程执行完毕。也可以通过调用 Job 的扩展函数 cancelAndJoin() 来完成相同操作,它会先执行 cancel 再执行 join

这里需要注意的是,协程作用域一旦变成 Cancelled 状态(这意味着该作用域内的协程都将被取消),就无法再使用该作用域启动新的协程了,除非将启动模式设置成 ATOMICUNDISPATCHED。可以回看前面“启动模式”这部分。以下示例可以说明这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
fun main(): Unit = runBlocking {
val scope = CoroutineScope(Job())
var job = scope.launch {
println("I'll be started.")
delay(300)
}
job.cancelAndJoin()
scope.cancel()
delay(100)
println("1 scope=$scope job isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}")
// -----
job = scope.launch(start = CoroutineStart.DEFAULT) {
println("I won't be started due to cancelled scope.")
delay(300)
}
delay(100)
println("2 scope=$scope job isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}")
// -----
job = scope.launch(start = CoroutineStart.ATOMIC) {
println("I'll be started although I'm cancelled before. Started by ATOMIC mode.")
delay(300)
}
job.cancelAndJoin()
scope.cancel()
delay(100)
println("3 scope=$scope job isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}")
// -----
job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
println("I'll be started although I'm cancelled before. Started by UNDISPATCHED mode.")
delay(300)
}
job.cancelAndJoin()
scope.cancel()
delay(100)
println("4 scope=$scope job isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}")
// -----
delay(1000)
println("program exit")
}

运行结果如下:

1
2
3
4
5
6
7
8
I'll be started.
1 scope=CoroutineScope(coroutineContext=JobImpl{Cancelled}@1bc6a36e) job isCompleted=true isCancelled=true
2 scope=CoroutineScope(coroutineContext=JobImpl{Cancelled}@1bc6a36e) job isCompleted=true isCancelled=true
I'll be started although I'm cancelled before. Started by ATOMIC mode.
3 scope=CoroutineScope(coroutineContext=JobImpl{Cancelled}@1bc6a36e) job isCompleted=true isCancelled=true
I'll be started although I'm cancelled before. Started by UNDISPATCHED mode.
4 scope=CoroutineScope(coroutineContext=JobImpl{Cancelled}@1bc6a36e) job isCompleted=true isCancelled=true
program exit

协程可能无法取消

并不是所有协程都可以响应取消操作,协程的取消操作是需要协作 (cooperative) 完成的,协程必须协作才能被取消。协程库中的所有挂起函数都是可取消的,它们在运行前检查协程是否被取消了,并在取消时抛出 CancellationException 从而结束整个任务。而如果协程在执行计算任务前没有判断自身是否已被取消的话,此时就无法取消协程。

所以即使以下代码主动取消了协程,协程也只会在完成既定循环后才结束运行,因为协程没有在每次循环前先进行检查,导致任务不受取消操作的影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // while 内部没有判断协程状态,因此并不会响应 cancel() 操作。
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}

为了实现取消协程的目的,就需要为上述代码加上判断协程是否还处于可运行状态的逻辑,当不可运行时就主动退出协程。isActive 是 CoroutineScope 的扩展属性,就用于判断协程是否还处于可运行状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (isActive) { // 判断协程是否处于活跃状态
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
} else {
return@launch
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}

取消协程这个操作类似于在 Java 中调用 Thread.interrupt() 方法来向线程发起中断请求,这两个操作都不会强制停止协程和线程,外部只是相当于发起一个停止运行的请求,需要依靠协程和线程响应请求后主动停止运行。

Java 和 Kotlin 之所以均没有提供一个可以直接强制停止线程或协程的方法,是因为这个操作可能会带来各种意想不到的情况。例如,在停止线程或协程的时候,它们可能还持有着某些排他性资源(例如:锁,数据库连接),如果强制性地停止,它们持有的锁就会一直无法得到释放,导致其它线程或协程一直无法得到目标资源,最终就可能导致线程死锁。所以 Thread.stop() 方法目前也是处于废弃状态,Java 官方并没有提供一个可靠的停止线程的方法。

用 finally 释放资源

可取消的挂起函数在取消时会抛出 CancellationException,可以依靠 try {...} finally {...} 或者 Kotlin 的 use 函数在取消协程后释放持有的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun main(args: Array<String>) {
println("=====> Program start. <=====")
runBlocking {
println("runBlocking - start")
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} catch (e: Throwable) {
println(e.message)
} finally {
println("job: I'm running finally")
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("runBlocking - end")
}
println("=====> Program exit. <=====")
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
=====> Program start. <=====
runBlocking - start
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
StandaloneCoroutine was cancelled
job: I'm running finally
runBlocking - end
=====> Program exit. <=====

NonCancellable

如果在上一个例子中的 finally 块中再调用挂起函数的话,将会导致抛出 CancellationException,因为此时协程已经被取消了。通常我们并不会遇到这种情况,因为常见的资源释放操作都是非阻塞的,且不涉及任何挂起函数。但在极少数情况下我们需要在取消的协程中再调用挂起函数,此时可以使用 withContext 函数和 NonCancellable 上下文将相应的代码包装在 withContext(NonCancellable) {...} 代码块中,NonCancellable 就用于创建一个无法取消的协程作用域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun main(args: Array<String>) {
println("=====> Program start. <=====")
runBlocking {
println("runBlocking - start")

val launchA = launch {
try {
repeat(5) {
delay(50)
println("launchA-$it")
}
} finally {
delay(50)
println("launchA isCompleted")
}
}
val launchB = launch {
try {
repeat(5) {
delay(50)
println("launchB-$it")
}
} finally {
withContext(NonCancellable) {
delay(50)
println("launchB isCompleted")
}
}
}
//延时200毫秒,保证两个协程都已经被启动了
delay(200)
launchA.cancel()
launchB.cancel()

println("runBlocking - end")
}
println("=====> Program exit. <=====")
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
=====> Program start. <=====
runBlocking - start
launchA-0
launchB-0
launchA-1
launchB-1
launchA-2
launchB-2
runBlocking - end
launchB isCompleted
=====> Program exit. <=====

传播取消操作

一般情况下,协程的取消操作会通过协程的层次结构来进行传播:如果取消父协程或者父协程抛出异常,那么子协程都会被取消;而如果子协程被取消,则不会影响同级协程和父协程,但如果子协程抛出异常则也会导致同级协程和父协程被取消。

对于以下代码,子协程 job1 被取消并不影响子协程 job2 和父协程继续运行,但父协程被取消后子协程都会被递归取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
fun main(args: Array<String>) {
println("=====> Program start. <=====")
runBlocking {
println("runBlocking - start")

val request = launch {
val job1 = launch {
repeat(10) {
delay(300)
println("job1: $it")
if (it == 2) {
println("job1 canceled")
cancel()
}
}
}
val job2 = launch {
repeat(10) {
delay(300)
println("job2: $it")
}
}
}
delay(1600)
println("parent job canceled")
request.cancel()
delay(1000)

println("runBlocking - end")
}
println("=====> Program exit. <=====")
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
=====> Program start. <=====
runBlocking - start
job1: 0
job2: 0
job1: 1
job2: 1
job1: 2
job1 canceled
job2: 2
job2: 3
job2: 4
parent job canceled
runBlocking - end
=====> Program exit. <=====

withTimeout

withTimeout 函数用于指定协程的运行超时时间,如果超时则会抛出 TimeoutCancellationException,从而令协程结束运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main(args: Array<String>) {
println("=====> Program start. <=====")
runBlocking {
println("runBlocking - start")

val result = withTimeout(300) {
repeat(5) {
delay(100)
}
200
}
println(result)

println("runBlocking - end")
}
println("=====> Program exit. <=====")
}

输出结果:

1
2
3
4
=====> Program start. <=====
runBlocking - start
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
...

withTimeout 方法抛出的 TimeoutCancellationExceptionCancellationException 的子类,之前我们并未在输出日志上看到关于 CancellationException 这类异常的堆栈信息,这是因为对于一个已取消的协程来说,CancellationException 被认为是触发协程结束的正常原因。但对于 withTimeout 方法来说,抛出异常是其上报超时情况的一种手段,所以该异常不会被协程内部消化掉。

如果不希望因为异常导致协程结束,可以改用 withTimeoutOrNull 方法,如果超时就会返回 null

协程的异常处理

异常的处理流程

当一个协程由于异常而执行失败时,那么该异常会传递给它的父协程。接下来,父协程会进行如下操作:

  • 取消它自己的所有子协程
  • 取消它自己
  • 将异常继续向上传播

异常会一直传播到最外层,且当前协程作用域所启动的所有与之有关系的协程都会被取消。注意“与之有关系的协程”这一点,详见“结构化并发机制的注意事项”中的说明。

注意:要时刻牢记这里讲到的,遇到异常时协程的处理流程。这一点非常非常重要

处理 launch 和 async 异常

我们可以通过如下策略来处理子协程的异常:

  • 使用 launch 启动的子协程,可以通过在其内部 try...catch 或使用 CoroutineExceptionHandler 上下文来捕获异常。
  • 使用 async 启动的子协程,可以通过捕获 Deferred.await 的异常进行处理。

处理 launch 异常示例:

1
2
3
4
5
6
7
8
9
10
// Catch any uncaught exceptions.
val handler = CoroutineExceptionHandler { _, err -> println("handle error: $err") }
val scope = CoroutineScope(Job() + handler)
scope.launch {
try {
codeWithExceptions()
} catch (e: IllegalAccessError) {
// Handle specific exception
}
}

处理 async 异常示例(注意,调用 await() 时要捕获 async 所有可能产生的异常,否则还是会导致程序崩溃。):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
coroutineScope {
val scope = CoroutineScope(Job())
val deferred = scope.async {
// In this example, only throwing the following two exceptions won't
// cause program to crash.
throw IllegalAccessError("oops...") // or IllegalStateException("oops...")
// Throwing any other exceptions will cause the program to crash.
}
try {
deferred.await()
} catch (e: IllegalAccessError) {
// Handle any exceptions that may arise, otherwise the program will crash.
} catch (e: IllegalStateException) {
// Handle any exceptions that may arise, otherwise the program will crash.
}
}

我们再来看一个在 Android 中捕获协程异常的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CoroutineActivity : BaseDemonstrationActivity<ActivityCoroutineBinding>() {
override fun getTagName(): String = ITAG

override fun getViewBinding(savedInstanceState: Bundle?): ActivityCoroutineBinding {
return ActivityCoroutineBinding.inflate(layoutInflater)
}

private val mainScope = MainScope()

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
mainScope.launch(Dispatchers.Default) {
delay(100)
LogContext.log.i(tag, "child 1")
}
mainScope.launch(Dispatchers.Default) {
delay(150)
LogContext.log.i(tag, "child 2")
throw AssertionError("child 2 exception")
}
mainScope.launch(Dispatchers.Default) {
delay(200)
LogContext.log.i(tag, "child 3")
}
}

override fun onDestroy() {
mainScope.cancel()
super.onDestroy()
}
}

运行结果如下:

1
2
3
4
5
6
7
8
11:09:22.623  5426-9234  LEO-Coro...ctivity  I  child 1
11:09:22.673 5426-9234 LEO-Coro...ctivity I child 2
11:09:22.723 5426-9236 LEO-Coro...ctivity I child 3
11:09:22.770 5426-9234 AndroidRuntime E FATAL EXCEPTION: DefaultDispatcher-worker-1
Process: com.leovp.demo.dev, PID: 5426
java.lang.AssertionError: child 2 exception
...异常信息略...
---------------------------- PROCESS ENDED (5426) for package com.leovp.demo.dev ----------------------------

之前讲过,MainScope() 的上下文是 SupervisorJob() + Dispatchers.Main,因此 child 2 抛出异常后,并不影响 mainScope 其它协程的运行,但是应用依然崩溃了,这是因为 child 2 抛出的异常我们并没有处理。

让我们稍微修改下代码,添加上异常处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class CoroutineActivity : BaseDemonstrationActivity<ActivityCoroutineBinding>() {
override fun getTagName(): String = ITAG

override fun getViewBinding(savedInstanceState: Bundle?): ActivityCoroutineBinding {
return ActivityCoroutineBinding.inflate(layoutInflater)
}

private val mainScope = MainScope()
// 添加异常处理上下文
private val exceptionHandler = CoroutineExceptionHandler { _, err ->
LogContext.log.e(tag, "Cause: $err")
}

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
mainScope.launch(Dispatchers.Default) {
delay(100)
LogContext.log.i(tag, "child 1")
}
mainScope.launch(Dispatchers.Default + exceptionHandler) { // 追加异常处理
delay(150)
LogContext.log.i(tag, "child 2")
throw AssertionError("child 2 exception")
}
mainScope.launch(Dispatchers.Default) {
delay(200)
LogContext.log.i(tag, "child 3")
}
}

override fun onDestroy() {
mainScope.cancel()
super.onDestroy()
}
}

程序运行结果如下:

1
2
3
4
11:13:16.383  8354-10444 LEO-Coro...ctivity  I  child 1
11:13:16.433 8354-10444 LEO-Coro...ctivity I child 2
11:13:16.434 8354-10444 LEO-Coro...ctivity E Cause: java.lang.AssertionError: child 2 exception
11:13:16.483 8354-10444 LEO-Coro...ctivity I child 3

这次,应用并没有崩溃,是我们想要的结果。

launch 和 async 处理异常的差异

看过了上面的讲解,我们知道协程并非都是一发现异常就执行前面讲过的“异常的处理流程”。launchasync 在处理异常方面有着一些差异(详见官方文档):

  • launch 将异常视为未捕获异常,类似于 Java 的 Thread.uncaughtExceptionHandler。发现异常时会立刻抛出
  • async 默认情况下不会抛出异常,直到调用 async.await() 获取结果时才会抛出存在的异常。这意味着如果使用 async 启动新的协程,它会静默地将异常丢弃。

例如,以下的 fetchDocs() 方法由于并没有调用 Deferred.await(),因此异常并不会被抛给调用方。如果使用的是 launch 而非 async 的话,则异常会被立刻抛出。

1
2
3
4
5
6
7
8
9
private val ioScope = CoroutineScope(Dispatchers.IO)

private fun fetchDocs() {
ioScope.async {
delay(500)
log("taskA throw AssertionError")
throw AssertionError() // 此处异常不会被抛出,会被丢弃,直到调用 await() 时,才会将异常抛出。
}
}

async 处理异常值得注意的地方

launch 遇到异常就抛出,因此需要特别注意的地方并不多(详见下文“重要说明”部分)。不过 async 在处理异常方面,却有一些需要注意的地方。

  • async 虽然不会立刻抛出异常,但是异常的传播机制没有改变。这一点前面已经讲过了。
  • async 产生异常后,由于异常传播机制,会导致其所在的协程作用域也被取消。

让我们分别用几个示例来讲解这两点注意事项。

让我们先回顾下刚才讲过的一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
coroutineScope {
val scope = CoroutineScope(Job())
val deferred = scope.async {
// In this example, only throwing the following two exceptions won't
// cause program to crash.
throw IllegalAccessError("oops...") // or IllegalStateException("oops...")
// Throwing any other exceptions will cause the program to crash.
}
try {
deferred.await()
} catch (e: IllegalAccessError) {
// Handle any exceptions that may arise, otherwise the program will crash.
} catch (e: IllegalStateException) {
// Handle any exceptions that may arise, otherwise the program will crash.
}
}

注意:上述代码在 deferred 协程启动后,async 内部产生了 IllegalAccessError 异常,但该异常不会被立刻抛出,因此不会导致程序崩溃。但是按照刚才讲的异常处理流程可知,deferred 会先被取消,异常会继续向上传播,导致 scope 也被取消,因此它们俩的状态就都变成了 Cancelled 状态。而处于 Cancelled 状态的 scope 是无法再启动新协程的,除非将启动模式设置成 ATOMICUNDISPATCHED。这一点在“取消协程”一节中也提到过。

要牢记本小节开头讲到的“异常处理流程”。**async 虽然不会立刻抛出异常,但是异常的传播机制并没有改变。** 也就是说 async 出现了异常,会将该异常继续向上传播,因此上例中的 scope 才会被取消。

为了加深理解,我们再来看一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
supervisorScope { // 注意,这里使用的是 supervisorScope
val deferred = async { throw IllegalAccessError("custom exception") }
try {
delay(100)
println("I'm the check point.")
deferred.await()
} catch (e: IllegalAccessError) {
println("Caught custom exception. cause: $e")
}
launch {
println("Guess whether I will be printed?")
}
}
delay(1000)
println("program exit")
}

你能回答出写上述示例的执行结果吗?请选中如下空白行查看结果。

I'm the check point.
Caught custom exception. cause: java.lang.IllegalAccessError: custom exception
Guess whether I will be printed?
program exit

接下来,我们仅将上述示例中的 supervisorScope 替换成 coroutineScope,请您再次回答执行结果。

Exception in thread "main" java.lang.IllegalAccessError: custom exception
...其它异常信息略...

您都回答正确了吗?若没有,那让我们来分析下原因吧。

先来详细分析第一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun main() = runBlocking {
supervisorScope { // 注意,这里使用的是 supervisorScope
val deferred = async { throw IllegalAccessError("custom exception") }
try {
delay(100)
println("I'm the check point.")
deferred.await()
} catch (e: IllegalAccessError) {
println("Caught custom exception. cause: $e")
}
launch {
println("Guess whether I will be printed?")
}
}
delay(1000)
println("program exit")
}

deferred 协程启动后产生了 IllegalAccessError 异常,但该异常不会被立刻抛出,而是将其继续向上传播至 supervisorScope。而 supervisorScope单向取消,也就是会阻止异常继续向上传递,仅取消出错的协程,因此只有 deferred 被取消了。

而后程序继续执行到 try 块,正常打印 I'm the check point.。当执行到 deferred.await() 处,此时才能捕获到 async 抛出的异常并打印 Caught custom exception. cause: xxx

同样由于 supervisorScope单向取消特性才让接下来的 launch 得以正常启动,并打印出 Guess whether I will be printed?。之后,程序在等待一秒后,输出 program exit,程序正常结束。

接下来我们一起再详细分析下,将示例中的 supervisorScope 替换成 coroutineScope 后的执行结果。

supervisorScope 替换成 coroutineScope 后,意味着失去了单向取消特性。deferred 协程启动后产生的 IllegalAccessError 异常先向上传播至 coroutineScope 使其变成 Cancelling 状态(因为其尚未执行完)。此时程序执行到了 try 块,由于异常导致 coroutineScope 处于取消中状态,因此 delay(100) 函数会报 CancellationException 异常,而我们并没有捕获该异常,因此导致程序崩溃并结束,后面所有的代码便没有机会执行。

如果我们尝试捕获 delay(100) 函数产生的 CancellationException 异常的话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking {
coroutineScope { // 注意,这里使用的是 coroutineScope
val deferred = async { throw IllegalAccessError("custom exception") }
try {
delay(100)
println("I'm the check point.")
deferred.await()
} catch (e: IllegalAccessError) {
println("Caught custom exception. cause: $e")
} catch (e: CancellationException) { // 这里捕获 delay(100) 产生的 CancellationException。
println("coroutineScope is cancelling. cause: $e")
}
launch {
println("Guess whether I will be printed?")
}
}
delay(1000)
println("program exit")
}

其执行结果如下:

1
2
3
coroutineScope is cancelling. cause: kotlinx.coroutines.JobCancellationException: ScopeCoroutine is cancelling; job=ScopeCoroutine{Cancelling}@5b1d2887
Exception in thread "main" java.lang.IllegalAccessError: custom exception
...其它异常信息略...

可以看到 CancellationException 被捕获了。之后程序继续执行到 launch 处,刚才已经讲过了,此时的 coroutineScope 由于异常导致其处于取消中状态,因此 launch 无法被启动(注意,launch 只是无法被启动,并不会导致异常),所以没有打印出 Guess whether I will be printed?

随后 coroutineScope 块执行结束,由于 coroutineScope 的取消是由异常导致的,因此异常会继续向上传播,最终导致程序崩溃并结束。

使用 CoroutineExceptionHandler

如果想主动捕获异常信息,可以使用 CoroutineExceptionHandler 来处理那些未被捕获的异常

作为协程的上下文元素之一,我们可以在这里进行自定义日志记录或异常处理,它类似于对线程使用 Thread.uncaughtExceptionHandler

之前已经讲过了 asynclaunch 在处理异常上的差别,通过它们的差异我们可知下面示例中的 async 并不会触发异常,因为没有调用 await()

以下代码只会捕获到 launch 抛出的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() = runBlocking {
println("runBlocking - start")
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught: $exception")
}
GlobalScope.launch(handler) {
throw AssertionError() // 此处异常会被正常抛出,可以被 CoroutineExceptionHandler 正常捕获。
}
GlobalScope.async(handler) { // CoroutineExceptionHandler 对 async 是无效的。下文会讲解。
throw ArithmeticException() // 由于没有调用 await(),所以此处异常不会被抛出。
}
delay(100)
println("runBlocking - end")
}

运行结果:

1
2
3
runBlocking - start
Caught java.lang.AssertionError
runBlocking - end

需要注意的是,如果 async 内部发生了异常且没有捕获时,那么调用 async.await() 依然会导致应用崩溃。详见下面的“重要说明”。

重要说明

关于 CoroutineExceptionHandler 有两点需要特别特别注意:

  1. CoroutineExceptionHandlerasync 是无效的。async 产生的异常,只有在调用 await() 时才会被抛出。 官方文档里有专门讲解。

In addition to that, async builder always catches all exceptions and represents them in the resulting Deferred object, so its CoroutineExceptionHandler has no effect either.

例如:

1
2
3
4
5
val handler = CoroutineExceptionHandler { _, err -> println("handle error: $err") }
val cs = CoroutineScope(SupervisorJob())
cs.async(handler) { // 此处的 CoroutineExceptionHandler 是无效的。
throw AssertionError("exception by manual") // 异常只有在调用 await() 时才会被抛出。
}
  1. 在子协程的 launch 方法上直接使用 CoroutineExceptionHandler 同样是无效的。官方文档里也有专门讲解。

In particular, all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used.

例如:

1
2
3
4
5
6
7
val handler = CoroutineExceptionHandler { _, err -> println("handle error: $err") }
val cs = CoroutineScope(SupervisorJob())
cs.launch {
launch(handler) { // 此处的 CoroutineExceptionHandler 是无效的。
throw AssertionError("exception by manual")
}
}

“引言”中的“示例10”可以证明以上两点。

使用 SupervisorJob

上文在讲解 Job 时,已经详细讲解过 SupervisorJob。这里再简要说明下和异常处理相关的话题。

由于异常导致的取消在协程中是一种双向关系,会在整个协程层次结构中传播,那如果我们需要的是单向取消该怎么实现呢?

例如,假设在 Activity 中启动了多个协程,如果单个协程所代表的子任务失败了,此时并不一定需要连锁终止整个 Activity 内部的所有其它协程任务,即此时希望子协程的异常不会传播给同级协程和父协程。而当 Activity 退出后,父协程的异常(即 CancellationException)又应该连锁传播给所有子协程,终止所有子协程。

可以使用 SupervisorJob 来实现上述效果,取消操作只会向下传播,一个子协程的运行失败不会影响到同级协程和父协程。

例如,以下示例中 firstChild 抛出的异常不会导致 secondChild 被取消,但当 supervisor 被取消时 secondChild 也被同时取消了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() = runBlocking {
println("runBlocking - start")
val supervisor = SupervisorJob()
val cs = CoroutineScope(coroutineContext + supervisor)
val firstChild = cs.launch(CoroutineExceptionHandler { _, _ -> }) {
println("First child is failing")
throw AssertionError("First child is cancelled")
}
val secondChild = cs.launch {
firstChild.join()
println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
println("Second child is cancelled because supervisor is cancelled")
}
}
firstChild.join()
println("Cancelling supervisor")
// 取消所有协程
supervisor.cancel()
secondChild.join()
println("runBlocking - end")
}

运行结果如下:

1
2
3
4
5
6
runBlocking - start
First child is failing
First child is cancelled: true, but second one is still active
Cancelling supervisor
Second child is cancelled because supervisor is cancelled
runBlocking - end

不过,如果异常没有被处理且 CoroutineContext 没有包含一个 CoroutineExceptionHandler 的话,异常会到达默认线程的 ExceptionHandler。在 JVM 中,异常会被打印在控制台,而在 Android 中,无论异常在哪个 Dispatcher 中发生,都会直接导致应用崩溃。所以如果上述例子中移除了 firstChild 包含的 CoroutineExceptionHandler 的话,在 JVM 控制台就会看到异常堆栈了:

1
2
3
4
5
6
7
8
runBlocking - start
First child is failing
Exception in thread "main" java.lang.AssertionError: First child is cancelled
...异常信息略...
First child is cancelled: true, but second one is still active
Cancelling supervisor
Second child is cancelled because supervisor is cancelled
runBlocking - end

使用 supervisorScope

使用 supervisorScope 同样可以达到单向取消的效果。之前在讲解 SupervisorScope 中也提及到相关的使用方法,这里再简要回顾下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main() {
val cs = CoroutineScope(Job())
val parentJob = cs.launch {
supervisorScope { // 使用 supervisorScope
val childJob1 = launch {
delay(100)
throw AssertionError("exception from child job 1")
}
val childJob2 = launch {
delay(200)
log("Child 2 is done...")
}
}
}
Thread.sleep(1000)
}

运行结果如下:

1
2
3
Exception in thread "DefaultDispatcher-worker-2" java.lang.AssertionError: exception from child job 1
...其它异常信息略...
17:18:53.563 Child 2 is done...

可以看到 childJob2 被没有受到 childJob1 异常的影响。但如果将上述代码中的 supervisorScope 换成 coroutineScope 的话,则 childJob2 会直接被取消。

在 Android 中使用协程的最佳实践

https://developer.android.com/kotlin/coroutines/coroutines-best-practices?hl=zh-cn

注入调度程序

在创建新协程或调用 withContext 时,请勿对 Dispatchers 进行硬编码。

1
2
3
4
5
6
7
8
9
10
11
12
// DO inject Dispatchers
class NewsRepository(
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
suspend fun loadNews() = withContext(defaultDispatcher) { /* ... */ }
}

// DO NOT hardcode Dispatchers
class NewsRepository {
// DO NOT use Dispatchers.Default directly, inject it instead
suspend fun loadNews() = withContext(Dispatchers.Default) { /* ... */ }
}

这种依赖项注入模式可以降低测试难度,因为您可以使用测试调度程序替换单元测试和插桩测试中的这些调度程序,以提高测试的确定性。

注意ViewModel 类的 viewModelScope 属性已硬编码为 Dispatchers.Main。通过调用 Dispatchers.setMain 并传入测试调度程序,在测试中替换该调度程序。

挂起函数应该能够安全地从主线程调用

挂起函数应该是主线程安全的,这意味着,您可以安全地从主线程调用挂起函数。如果某个类在协程中执行长期运行的阻塞操作,那么该类负责使用 withContext 将执行操作移出主线程。这适用于应用中的所有类,无论其属于架构的哪个部分都不例外。

ViewModel 应创建协程

ViewModel 类应首选创建协程,而不是公开挂起函数来执行业务逻辑。如果只需要发出一个值,而不是使用数据流公开状态,ViewModel 中的挂起函数就会非常有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DO create coroutines in the ViewModel
class LatestNewsViewModel(
private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {

private val _uiState = MutableStateFlow<LatestNewsUiState>(LatestNewsUiState.Loading)
val uiState: StateFlow<LatestNewsUiState> = _uiState

fun loadNews() {
viewModelScope.launch {
val latestNewsWithAuthors = getLatestNewsWithAuthors()
_uiState.value = LatestNewsUiState.Success(latestNewsWithAuthors)
}
}
}

// Prefer observable state rather than suspend functions from the ViewModel
class LatestNewsViewModel(
private val getLatestNewsWithAuthors: GetLatestNewsWithAuthorsUseCase
) : ViewModel() {
// DO NOT do this. News would probably need to be refreshed as well.
// Instead of exposing a single value with a suspend function, news should
// be exposed using a stream of data as in the code snippet above.
suspend fun loadNews() = getLatestNewsWithAuthors()
}

视图不应直接触发任何协程来执行业务逻辑,而应将这项工作委托给 ViewModel。这样一来,业务逻辑就会变得更易于测试,因为可以对 ViewModel 对象进行单元测试,而不必使用测试视图所必需的插桩测试。

此外,如果工作是在 viewModelScope 中启动,您的协程将在配置更改后自动保留。如果您改用 lifecycleScope 创建协程,则必须手动进行处理该操作。如果协程的存在时间需要比 ViewModel 的作用域更长,请查看“在业务和数据层中创建协程”部分

注意:视图应对与界面相关的逻辑触发协程。例如,从互联网提取映像或设置字符串格式。

不要公开可变类型

最好向其他类公开不可变类型。这样一来,对可变类型的所有更改都会集中在一个类中,便于在出现问题时进行调试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// DO expose immutable types
class LatestNewsViewModel : ViewModel() {

private val _uiState = MutableStateFlow(LatestNewsUiState.Loading)
val uiState: StateFlow<LatestNewsUiState> = _uiState

/* ... */
}

class LatestNewsViewModel : ViewModel() {

// DO NOT expose mutable types
val uiState = MutableStateFlow(LatestNewsUiState.Loading)

/* ... */
}

数据层和业务层应公开挂起函数和数据流

数据层和业务层中的类通常会公开函数以执行一次性调用,或接收数据随时间变化的通知。这些层中的类应该针对一次性调用公开挂起函数,并公开数据流以接收关于数据更改的通知

1
2
3
4
5
6
7
// Classes in the data and business layer expose
// either suspend functions or Flows
class ExampleRepository {
suspend fun makeNetworkRequest() { /* ... */ }

fun getExamples(): Flow<Example> { /* ... */ }
}

采用该最佳实践后,调用方(通常是演示层)能够控制这些层中发生的工作的执行和生命周期,并在需要时取消相应工作。

在业务层和数据层中创建协程

对于数据层或业务层中因不同原因而需要创建协程的类,它们可以选择不同的选项。

如果仅当用户查看当前屏幕时,要在这些协程中完成的工作才具有相关性,则应遵循调用方的生命周期。在大多数情况下,调用方将是 ViewModel。在这种情况下,应使用 coroutineScopesupervisorScope

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class GetAllBooksAndAuthorsUseCase(
private val booksRepository: BooksRepository,
private val authorsRepository: AuthorsRepository,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
suspend fun getBookAndAuthors(): BookAndAuthors {
// In parallel, fetch books and authors and return when both requests
// complete and the data is ready
return coroutineScope {
val books = async(defaultDispatcher) {
booksRepository.getAllBooks()
}
val authors = async(defaultDispatcher) {
authorsRepository.getAllAuthors()
}
BookAndAuthors(books.await(), authors.await())
}
}
}

如果只要应用处于打开状态,要完成的工作就具有相关性,并且此工作不限于特定屏幕,那么此工作的存在时间应该比调用方的生命周期更长。对于这种情况,您应使用外部 CoroutineScope(如“不应取消的工作的协程和模式”这篇博文中所述)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ArticlesRepository(
private val articlesDataSource: ArticlesDataSource,
private val externalScope: CoroutineScope,
private val defaultDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
// As we want to complete bookmarking the article even if the user moves
// away from the screen, the work is done creating a new coroutine
// from an external scope
suspend fun bookmarkArticle(article: Article) {
externalScope.launch(defaultDispatcher) {
articlesDataSource.bookmarkArticle(article)
}
.join() // Wait for the coroutine to complete
}
}

externalScope 应由存在时间比当前屏幕更长的类进行创建和管理,并且可由 Application 类或作用域限定为导航图的 ViewModel 进行管理。

将协程设为可取消

协程取消属于协作操作,也就是说,在协程的 Job 被取消后,相应协程在挂起或检查是否存在取消操作之前不会被取消。如果您在协程中执行阻塞操作,请确保相应协程是可取消的。

例如,如果您要从磁盘读取多个文件,请先检查协程是否已取消,然后再开始读取每个文件。若要检查是否存在取消操作,有一种方法是调用 ensureActive 函数。

1
2
3
4
5
6
someScope.launch {
for(file in files) {
ensureActive() // Check for cancellation
readFile(file)
}
}

kotlinx.coroutines 中的所有挂起函数(例如 withContextdelay)都是可取消的。如果您的协程调用这些函数,您无需执行任何其他操作。

如需详细了解协程取消,请参阅“协程取消”这篇博文

留意异常

未处理协程中抛出的异常可能会导致应用崩溃。如果可能会发生异常,请在使用 viewModelScopelifecycleScope 创建的任何协程主体中捕获相应异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class LoginViewModel(
private val loginRepository: LoginRepository
) : ViewModel() {

fun login(username: String, token: String) {
viewModelScope.launch {
try {
loginRepository.login(username, token)
// Notify view user logged in successfully
} catch (exception: IOException) {
// Notify view login attempt failed
}
}
}
}

注意:如需启用协程取消流程,请不要使用 CancellationException 类型的异常(不要捕获它们,或在被发现时总是重新抛出)。首选捕获特定类型的异常(如 IOException),而不是 ExceptionThrowable 等一般类型。

[^runBlocking]: runBlocking 是一个协程构建器。

参考文献

坚持原创及高品质技术分享,您的支持将鼓励我继续创作!