前言
Kotlin 只在标准库中提供了最低级别的API以使各种其他库能够利用协程。
在 kotlin 中,async
和 await
不是关键字,而且甚至不是标准库的一部分。
此外,与 futures and promises 相比,Kotlin 的挂起函数概念为异步操作提供了更安全且不易出错的抽象。
kotlinx.coroutines
是由 JetBrains 开发的丰富的协程库。 它包含本指南涵盖的许多支持协程的高级原语,包括launch
、async
等。
- 协程不像线程一样存在调度,即系统为线程分配处理器使用权。除了挂起和取消,协程会一直运行直到它完成。
协程基础
协程是可挂起计算的一个实例。 它在概念上类似于线程,因为它需要一个与其余代码同时工作的代码块来运行。
但是,协程并不绑定到任何特定的线程。 它可以在一个线程中暂停执行并在另一个线程中恢复。
1 | fun main() = runBlocking { // this: CoroutineScope |
launch
是一个协程构建器。 它与其余代码同时启动一个新的协程,该协程继续独立工作。 这就是首先打印 Hello 的原因。
delay
是一个特殊的挂起函数。 它将协程暂停特定时间。 挂起协程不会阻塞底层线程,但允许其他协程运行并使用底层线程来执行它们的代码。
runBlocking
也是一个协程构建器,它将常的 fun main()
的非协程世界与 runBlocking { ... }
花括号内的协程代码联系起来。 这在 IDE 中通过以下方式突出显示:在 runBlocking
左大括号之后的 this: CoroutineScope
提示。
runBlocking
的名称意味着运行它的线程在调用期间被阻塞,直到 runBlocking { ... }
中的所有协程完成执行。 你会经常看到在应用程序的最顶层使用 runBlocking
,而在实际代码中很少见,因为线程是昂贵的资源,阻塞它们效率低下,通常是不希望的。
结构化并发
协程遵循结构化并发原则,这意味着新的协程只能在限定协程生命周期的特定 CoroutineScope 中启动。 上面的例子表明 runBlocking 建立了相应的范围,这就是为什么前面的例子等到 World! 在延迟一秒后打印,然后才退出。
在实际应用程序中,您将启动大量协程。 结构化并发确保它们不会丢失并且不会泄漏。 在其所有子协程完成之前,外部作用域无法完成。 结构化并发还确保正确报告代码中的任何错误并且永远不会丢失。
挂起函数可以像常规函数一样在协程中使用,但它们的附加特性是它们可以反过来使用其他挂起函数(如本例中的delay
)来挂起协程的执行。
Scope 构建器
除了不同构建器提供的协程作用域外,还可以使用 coroutineScope
构建器声明自己的作用域。 它创建了一个协程范围,并且在所有启动的子进程完成之前不会完成。
runBlocking 和 coroutineScope 构建器可能看起来很相似,因为它们都在等待自己的 body 及其所有子节点完成。 主要区别在于 runBlocking 方法阻塞当前线程等待,而 coroutineScope 只是挂起,释放底层线程用于其他用途。 由于这个区别,runBlocking 是一个常规函数,而 coroutineScope 是一个挂起函数。
Scope 构建器 和 并发
可以在任何挂起函数中使用 coroutineScope 构建器来执行多个并发操作
显式 job
launch
协程构建器返回一个 Job 对象,该对象是已启动协程的句柄,可用于显式等待其完成。
取消和超时
取消协程执行
在长时间运行的应用程序中,可能需要对后台协程进行细粒度控制。 例如,用户可能已经关闭了启动协程的页面,现在不再需要其结果并且可以取消其操作。 launch
函数返回一个 Job 可以用来取消正在运行的协程:
取消是合作的
协程取消是合作的。 协程代码必须合作才能取消。 kotlinx.coroutines
中的所有挂起函数都是可以取消的。 他们检查协程的取消并在取消时抛出 CancellationException
。 但是,如果协程在计算中工作并且不检查取消,则无法取消。
使计算代码可取消
有两种方法可以使计算代码可取消。 第一个是定期调用检查取消的挂起函数。 有一个 yield
函数,它是一个很好的选择。
另一种是显式检查取消状态。isActive
是通过 CoroutineScope
对象在协程内部可用的扩展属性。
使用 finally 关闭资源
可取消的挂起函数在取消时会抛出 CancellationException,这可以按通常的方式处理。 例如,try {...} finally {...}
表达式和 Kotlin use
函数在协程被取消时正常执行它们的终结操作
运行不可取消的代码块
任何在上一个示例的 finally
块中使用挂起函数的尝试都会导致 CancellationException,因为运行此代码的协程被取消。
通常,这不是问题,因为所有行为良好的关闭操作(关闭文件、取消job或关闭任何类型的通信通道)通常都是非阻塞的,并且不涉及任何挂起功能。
但是,在极少数情况下,当需要在取消的协程中挂起时,可以使用 withContext
函数和 NonCancellable
上下文将相应的代码包装在 withContext(NonCancellable) {...}
中。
超时
取消协程执行的最明显的实际原因是因为它的执行时间已经超过了一些超时。 虽然可以手动跟踪对相应 Job 的引用并启动一个单独的协程以在延迟后取消跟踪的协程,但有一个准备使用的 withTimeout
函数可以做到这一点。
withTimeout
抛出的 TimeoutCancellationException
是 CancellationException
的子类。 我们之前没有在控制台上看到它的堆栈跟踪。 那是因为在取消的协程内部 CancellationException 被认为是协程完成的正常原因。 然而,在这个例子中,我们在 main
函数中使用了 withTimeout。
由于取消只是一个异常,所有资源都以通常的方式关闭。 如果需要专门针对任何类型的超时执行一些附加操作,可以可以将超时代码包装在 try {...} catch (e: TimeoutCancellationException) {...}
块中,或使用 withTimeoutOrNull
函数,它类似于 withTimeout
,但在超时时返回 null
而不是抛出异常
异步超时和资源
withTimeout
中的超时事件相对于在其块中运行的代码是异步的,并且可能随时发生,甚至在从超时块内部返回之前。 如果您在块内打开或获取一些需要在块外关闭或释放的资源,请记住这一点。
组合挂起函数
默认是顺序的
协程中的代码,就像在常规代码中一样,默认是顺序的
使用 async 来并发
从概念上讲,async
就像 launch
。 它启动一个单独的协程,它是一个轻量级线程,可与所有其他协程同时工作。 不同之处在于,launch
返回一个 Job
并且不携带任何结果值,而 async
返回一个 Deferred
- 一个轻量级的非阻塞future,表示稍后提供结果的承诺。 可以在deferred值上使用 .await()
来获得其最终结果,但 Deferred
也是一个 job
,因此可以在需要时取消它。
懒启动的 async
可选地,可以通过将其 start
参数设置为 CoroutineStart.LAZY
来使 async
变得惰性。 在这种模式下,它仅在 await
需要其结果或调用其 Job
的 start
函数时才启动协程。
异步风格的函数
Coroutine context and dispatchers
newSingleThreadContext
创建一个线程供协程运行。 专用线程是非常昂贵的资源。 在实际应用程序中,当不再需要时,必须使用 close
函数释放它,或者将其存储在顶级变量中并在整个应用程序中重用。
Flow
Flow 是类似于 sequence 的 cold stream —— flow builder 内的代码在 collect flow 之前不会运行。
Flow 遵循协程的一般合作取消。 与往常一样,当 flow 在可取消的 suspend 函数(如 delay)中 suspend 时,可以取消 flow collection。
1 | fun simple(): Flow<Int> = flow { |
flow { ... }
builder 是最基本的 builder。 还有其他允许声明 flow 的 builder:
flowOf
构建器定义了发出一组固定值的 flow- 可以使用
.asFlow()
扩展函数将各种集合和 sequence 转换为 flow。
Intermediate flow operator
可以使用 operator 来转换 flow,就像转换集合和 sequence 一样。 Intermediate operator 应用于上游 flow 并返回下游 flow。 这些 operator 是冷酷的,就像 flow 一样。
对此类 operator 的调用本身并不是 suspend 函数。 它工作速度很快,返回新转换了的 flow 的定义。
基本 operator 具有熟悉的名称,例如 map 和 filter。 这些 operator 与 sequence 的一个重要区别是这些运算符内的代码块可以调用 suspend 函数。
Transform operator
在 flow transformation operator 中,最通用的一种称为 transform。 它可用于模仿简单的转换(如 map 和 filter),以及实现更复杂的转换。
使用 transform
operator,可以发出任意次数的任意值。
1 | (1..3).asFlow() // a flow of requests |
1 | Making request 1 |
Size-limiting operator
当达到相应的限制时,诸如 take 之类的 Size-limiting intermediate operator 会取消 flow 的执行。 协程中的取消始终通过抛出异常来执行,以便所有资源管理函数(如 try { ... } finally { ... }
块)在取消时正常运行:
1 | fun numbers(): Flow<Int> = flow { |
1 | 1 |
Terminal flow operators
flow 上的 terminal operator 是开始 flow 的收集的 suspend 函数。 collect
operator 是最基本的 operator,但还有其他 terminal operator,可以使操作变得更容易:
- 转换为各种集合,例如
toList
和toSet
。 - 用于获取第一个值并确保 flow 发出单个值的运算符。
- 使用
reduce
和fold
将 flow 减少到某个值。
flow 是顺序的
除非使用对多个 flow 进行操作的特殊运算符,否则 flow 的每个单独收集都会按顺序执行。 该收集直接在调用 terminal operator 的协程中工作。
默认情况下不会启动新的协程。 每个发出的值都经过从上游到下游的所有 intermediate operator 的处理,然后传递给 terminal operator。
Flow context
flow 的收集始终发生在调用协程的上下文中。 例如,如果有一个 simple
flow,那么以下代码将在该代码作者指定的上下文中运行,而不管 simple
flow 的实现细节如何
flow 的这种属性称为 context preservation。
因此,默认情况下,flow { ... }
构建器中的代码在相应 flow 的收集器提供的上下文中运行。 例如,考虑一个简单函数的实现,该函数打印调用它的线程并发出三个数字:
使用 withContext 时的常见陷阱
但是,长时间运行的 CPU 消耗代码可能需要在 Dispatchers.Default
上下文中执行,UI 更新代码可能需要在 Dispatchers.Main
上下文中执行。
通常,withContext
用于更改使用 Kotlin 协程的代码中的上下文,但 flow { ... }
构建器中的代码必须遵守上下文保留属性,并且不允许从不同的上下文 emit。
flowOn operator
异常是指用于更改 flow emit 上下文的 flowOn
函数。 下面的示例显示了更改 flow 上下文的正确方法,该示例还打印相应线程的名称以显示其工作原理:
1 | fun simple(): Flow<Int> = flow { |
1 | [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 |
请注意 flow { ... }
如何在后台线程中工作,而收集发生在主线程中:
这里要观察的另一件事是 flowOn
运算符更改了 flow 的默认顺序性质。 现在,收集发生在一个协程(“coroutine#1”)中,发射发生在另一个协程(“coroutine#2”)中,该协程与收集协程并发的另一个线程中运行。
当必须更改上下文中的 CoroutineDispatcher
时, flowOn
运算符会为上游 flow 创建另一个协程。
Buffering
从收集 flow 所需的总时间的角度来看,在不同的协程中运行 flow 的不同部分可能会有所帮助,特别是在涉及长时间运行的异步操作时。
例如,考虑一个 simple
flow 的发射速度很慢的情况,需要 100 毫秒才能产生一个元素; 而且收集器也很慢,处理一个元素需要 300 毫秒。 让我们看看收集这样一个包含三个数字的 flow 需要多长时间:
我们可以在 flow 上使用 buffer operator 来与收集代码并发运行 simple
flow 的发出代码,而不是顺序运行它们:
1 | val time = measureTimeMillis { |
它生成相同数字的速度更快,因为我们有效地创建了一个处理管道,只需等待 100 毫秒即可获取第一个数字,然后仅花费 300 毫秒来处理每个数字。 这样运行大约需要 1000 毫秒:
请注意,flowOn
运算符在必须更改 CoroutineDispatcher
时使用相同的缓冲机制,但这里我们显式请求缓冲而不更改执行上下文。
合并
当 flow 表示操作或操作状态更新的部分结果时,可能不需要处理每个值,而只需处理最近的值。 在这种情况下,当收集器处理速度太慢时,可以使用 conflate
运算符来跳过中间值。
1 | val time = measureTimeMillis { |
处理最新值
当发射器和收集器都很慢时,合并是加速处理的一种方法。 它通过删除发出的值来实现这一点。 另一种方法是取消慢速收集器并在每次发出新值时重新启动它。
有一系列 xxxLatest
运算符,它们执行与 xxx 运算符相同的基本逻辑,但在新值上取消其块中的代码。 让我们尝试在前面的示例中将conflate更改为collectLatest:
1 | val time = measureTimeMillis { |
由于 collectLatest 的主体需要300毫秒,但每100毫秒发出一次新值,我们看到该块在每个值上运行,但仅针对最后一个值完成
组合多个 flow
zip
就像 Kotlin 标准库中的 Sequence.zip
扩展函数一样,flow 有一个 zip
运算符,用于组合两个 flow 的相应值:
1 | val nums = (1..3).asFlow() // numbers 1..3 |
Combine
当 flow 表示变量或操作的最新值时,可能需要执行取决于相应 flow 的最新值的计算,并在任何上游 flow 发出一个值时重新计算它 。 相应的运算符系列称为 combine
。
例如,如果上一个示例中的数字每 300 毫秒更新一次,但字符串每 400 毫秒更新一次,则使用 zip
运算符对它们进行压缩仍将产生相同的结果,尽管结果每 400 毫秒打印一次:
1 | val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms |
1 | 1 -> one at 441 ms from start |
当在这里使用 combine
运算符而不是 zip
时:
1 | val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms |
1 | 1 -> one at 433 ms from start |
扁平化 flow
flow 表示异步接收的值序列,因此很容易陷入每个值触发对另一个值序列的请求的情况。 例如,我们可以使用以下函数返回两个相隔 500 毫秒的字符串 flow:
1 | fun requestFlow(i: Int): Flow<String> = flow { |
现在,如果我们有一个包含三个整数的 flow,并对每个整数调用 requestFlow
,如下所示:
1 | (1..3).asFlow().map { requestFlow(it) } |
然后我们最终会得到一个 flow 的 flow (Flow<Flow<String>>
),需要将其展平为单个 flow 以进行进一步处理。 为此,集合和 sequence 具有 flatten
和 flatMap
运算符。
然而,由于 flow 的异步性质,它们需要不同的扁平化模式,因此存在一系列 flow 上的扁平化运算符。
flatMapConcat
flow 的串联由 flatMapConcat
和 flattenConcat
运算符提供。 它们是相应 sequence 运算符最直接的类似物。 它们等待内部 flow 完成,然后再开始收集下一个 flow,如下例所示:
1 | val startTime = System.currentTimeMillis() // remember the start time |
flatMapMerge
另一个扁平化操作是并发收集所有传入 flow 并将它们的值合并到单个 flow 中,以便尽快发出值。 它由 flatMapMerge
和 flattenMerge
运算符实现。
它们都接受一个可选的 concurrency
参数,该参数限制同时收集的并发 flow 的数量(默认情况下等于 DEFAULT_CONCURRENCY
)。
1 | val startTime = System.currentTimeMillis() // remember the start time |
请注意,flatMapMerge
按顺序调用其代码块(本例中为 { requestFlow(it) }
),但并发收集结果 flow,这相当于首先执行顺序 map { requestFlow(it) }
,然后在结果上调用 flattenMerge
。
Flow exception
当发射器或运算符内的代码抛出异常时,flow 收集可能会在异常情况下完成。 有多种方法可以处理这些异常。
Collector try and catch
收集器可以使用 Kotlin 的 try/catch
块来处理异常:
1 | fun simple(): Flow<Int> = flow { |
Everything is caught
前面的示例实际上捕获了发射器或任何中间或终端运算符中发生的任何异常。 例如,让我们更改代码,以便将发出的值映射到字符串,但相应的代码会产生异常:
1 | fun simple(): Flow<String> = |
异常透明度
Flow completion
当 flow 收集完成时(正常或异常),它可能需要执行一个操作。 可以通过两种方式完成:命令式或声明式。
命令式 finally 块
除了 try/catch
之外,收集器还可以使用 finally
块在 collect
完成后执行操作。
声明式处理
对于声明式方法,flow 具有 onCompletion
中间运算符,当 flow 完全收集时将调用该运算符。
可以使用 onCompletion
运算符重写前面的示例并产生相同的输出:
1 | simple() |
onCompletion
的主要优点是 lambda 的可空 Throwable
参数,可用于确定 flow 收集是正常完成还是异常完成。 在以下示例中,简单流程在发出数字 1 后引发异常:
1 | fun simple(): Flow<Int> = flow { |
与 catch
不同,onCompletion
运算符不处理异常。 从上面的示例代码中我们可以看到,异常仍然流向下游。 它将被传递给进一步的 onCompletion
运算符,并且可以使用 catch
运算符进行处理。
Successful completion
与 catch
运算符的另一个区别是 onCompletion
会看到所有异常,并且仅在上游 flow 成功完成(没有取消或失败)时接收空异常。
Launching flow
使用 flow 来表示来自某个源的异步事件很容易。 在这种情况下,我们需要一个类似于 addEventListener
函数的函数,该函数注册一段代码并对传入事件做出反应并继续进一步的工作。onEach
运算符可以充当此角色。 然而,onEach
是一个中间运算符。 我们还需要一个终端操作符来收集 flow。 否则,仅仅调用 onEach
是没有效果的。
如果我们在 onEach
之后使用 collect
终端操作符,那么它后面的代码将等待 flow 被收集:
launchIn
终端操作符在这里派上用场。 通过将 collect
替换为 launchIn
,我们可以在单独的协程中启动 flow 的收集,以便立即继续执行进一步的代码:
1 | fun main() = runBlocking<Unit> { |
launchIn
所需的参数必须指定一个 CoroutineScope
,在其中启动用于收集 flow 的协程。 在上面的示例中,此作用域来自 runBlocking 协程构建器,因此当 flow 运行时,此 runBlocking 作用域会等待其子协程完成,并阻止 main 函数返回和终止此示例。
在实际应用中,scope 将来自具有有限生命周期的实体。 一旦该实体的生命周期终止,相应的作用域就会被取消,从而取消相应 flow 的收集。
这样,一对 onEach { ... }.launchIn(scope)
的工作方式就像 addEventListener`` 一样。 但是,不需要相应的 removeEventListener
函数,因为取消和结构化并发可以实现此目的。
请注意,launchIn
还返回一个 Job
,该 Job
只能用于取消相应的 flow 收集协程,而无需取消整个作用域或 join
它。
Flow cancellation checks
为了方便起见,flow
构建器对每个发出的值执行额外的 ensureActive
检查以取消取消。 这意味着从 flow { ... }
发出的繁忙循环是可以取消的:
1 | fun foo(): Flow<Int> = flow { |
然而,出于性能原因,大多数其他 flow operator 不会自行执行额外的取消检查。 例如,如果使用 IntRange.asFlow 扩展编写相同的繁忙循环并且不在任何地方 suspend,则不会检查取消:
Making busy flow cancellable
如果协程有繁忙循环,则必须显式检查取消。 可以添加 .onEach { currentCoroutineContext().ensureActive() }
,但提供了一个现成的 cancellable
operator 来执行此操作:
1 | fun main() = runBlocking<Unit> { |
StateFlow
StateFlow
是一个 state-holder observable flow,它向其收集器发出当前和新的状态更新。 当前状态值也可以通过其 value
属性读取。
要更新状态并将其发送到 flow,请为 MutableStateFlow
类的 value
属性分配一个新值。
在 Android 中,StateFlow
非常适合需要维护可观察的可变状态的类。
按照 Kotlin flow 中的示例,可以从 LatestNewsViewModel
公开 StateFlow
,以便 view 可以侦听 UI 状态更新,并本质上使屏幕状态在配置更改中幸存。
负责更新 MutableStateFlow
的类是生产者,所有从 StateFlow
收集的类都是消费者。 与使用 flow 构建器构建的 cold flow 不同,StateFlow
是 hot flow:
从 flow 中收集不会触发任何生产者代码。 StateFlow
始终处于活动状态并位于内存中,并且仅当垃圾收集 root 没有其他对它的引用时,它才有资格进行垃圾收集。
当新的使用者开始从 flow 中收集数据时,它会接收 flow 中的最后一个状态以及任何后续状态。 可以在其他可观察类(例如 LiveData
)中找到此行为。
View 与任何其他流一样侦听 StateFlow:
1 | class LatestNewsActivity : AppCompatActivity() { |
1 | 警告:如果 UI 需要更新,切勿直接从 launch 或 launchIn 扩展函数收集 UI 流。 即使 view 不可见,这些函数也会处理事件。 |
要将任何流转换为 StateFlow
,请使用 stateIn
intermediate operator。
StateFlow, Flow, and LiveDataStateFlow
和 LiveData
有相似之处。 两者都是可观察的数据持有者类,并且在应用程序架构中使用时都遵循类似的模式。
但请注意,StateFlow
和 LiveData
的行为确实不同:
StateFlow
需要将初始状态传递给构造函数,而LiveData
则不需要。- 当 view 进入 STOPPED 状态时,
LiveData.observe()
会自动取消注册使用者,而从StateFlow
或任何其他 flow 收集不会自动停止收集。 要实现相同的行为,需要从Lifecycle.repeatOnLifecycle
块收集 flow。
Making cold flows hot using shareIn
StateFlow
是一个 hot flow —— 只要该 flow 被收集或者垃圾收集根中存在对其的任何其他引用,它就会保留在内存中。 可以使用 shareIn
运算符将冷 flow 转为热 flow。
以 Kotlin 流中创建的 callbackFlow 为例,可以使用 shareIn
在收集器之间共享从 Firestore 检索的数据,而不是让每个收集器创建一个新 flow。 需要传递以下内容:
- 用于共享 flow 的
CoroutineScope
。 该作用域的生存时间应该比任何消费者都长,以便根据需要保持共享 flow 的存活时间。 - 每个新收集器重播的项目数。
- 启动行为策略。在此示例中,
1
2
3
4
5
6
7
8
9
10
11class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}latestNews
flow 将最后发出的项目重播到新的收集器,并且只要externalScope
处于活动状态且存在活动收集器,就保持活动状态。
当存在活跃订阅者时,SharingStarted.WhileSubscribed()
启动策略使上游生产者保持活动状态。 还可以使用其他启动策略,例如SharingStarted.Eagerly
立即启动生产者或SharingStarted.Lazily
在第一个订阅者出现后开始共享并保持 flow 永远处于活动状态。
SharedFlow
shareIn
函数返回一个 SharedFlow
,这是一个 hot flow,它向所有从中收集数据的消费者发出值。 SharedFlow
是 StateFlow
的高度可配置的泛化。
可以在不使用 shareIn
的情况下创建 SharedFlow
。 例如,可以使用 SharedFlow 将 tick 发送到应用程序的其余部分,以便所有内容同时定期刷新。
除了获取最新新闻之外,可能还想刷新用户信息部分及其最喜欢的主题集合。 在以下代码片段中,TickHandler
公开 SharedFlow
,以便其他类知道何时刷新其内容。
与 StateFlow
一样,在类中使用 MutableSharedFlow
类型的支持属性将项目发送到 flow:
可以通过以下方式自定义 SharedFlow
行为:
- replay 允许为新订阅者重新发送一些先前发出的值。
onBufferOverflow
允许指定缓冲区何时充满要发送的项目的策略。 默认值为 BufferOverflow.SUSPEND,这会使调用者挂起。 其他选项是 DROP_LATEST 或 DROP_OLDEST。
MutableSharedFlow
还有一个 subscriptionCount
属性,其中包含活动收集器的数量,以便可以相应地优化业务逻辑。 如果不想重播发送到流的最新信息,MutableSharedFlow
还包含一个 ResetReplayCache
函数。