抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

原文

前言

Kotlin 只在标准库中提供了最低级别的API以使各种其他库能够利用协程。
在 kotlin 中,asyncawait 不是关键字,而且甚至不是标准库的一部分。
此外,与 futures and promises 相比,Kotlin 的挂起函数概念为异步操作提供了更安全且不易出错的抽象。

kotlinx.coroutines 是由 JetBrains 开发的丰富的协程库。 它包含本指南涵盖的许多支持协程的高级原语,包括launchasync等。

  • 协程不像线程一样存在调度,即系统为线程分配处理器使用权。除了挂起和取消,协程会一直运行直到它完成。

协程基础

协程是可挂起计算的一个实例。 它在概念上类似于线程,因为它需要一个与其余代码同时工作的代码块来运行。
但是,协程并不绑定到任何特定的线程。 它可以在一个线程中暂停执行并在另一个线程中恢复。

1
2
3
4
5
6
7
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}

launch 是一个协程构建器。 它与其余代码同时启动一个新的协程,该协程继续独立工作。 这就是首先打印 Hello 的原因。

delay 是一个特殊的挂起函数。 它将协程暂停特定时间。 挂起协程不会阻塞底层线程,但允许其他协程运行并使用底层线程来执行它们的代码。

runBlocking 也是一个协程构建器,它将常的 fun main() 的非协程世界与 runBlocking { ... } 花括号内的协程代码联系起来。 这在 IDE 中通过以下方式突出显示:在 runBlocking 左大括号之后的 this: CoroutineScope 提示。

runBlocking 的名称意味着运行它的线程在调用期间被阻塞,直到 runBlocking { ... } 中的所有协程完成执行。 你会经常看到在应用程序的最顶层使用 runBlocking,而在实际代码中很少见,因为线程是昂贵的资源,阻塞它们效率低下,通常是不希望的。

结构化并发

协程遵循结构化并发原则,这意味着新的协程只能在限定协程生命周期的特定 CoroutineScope 中启动。 上面的例子表明 runBlocking 建立了相应的范围,这就是为什么前面的例子等到 World! 在延迟一秒后打印,然后才退出。

在实际应用程序中,您将启动大量协程。 结构化并发确保它们不会丢失并且不会泄漏。 在其所有子协程完成之前,外部作用域无法完成。 结构化并发还确保正确报告代码中的任何错误并且永远不会丢失。

挂起函数可以像常规函数一样在协程中使用,但它们的附加特性是它们可以反过来使用其他挂起函数(如本例中的delay)来挂起协程的执行。

Scope 构建器

除了不同构建器提供的协程作用域外,还可以使用 coroutineScope 构建器声明自己的作用域。 它创建了一个协程范围,并且在所有启动的子进程完成之前不会完成。

runBlocking 和 coroutineScope 构建器可能看起来很相似,因为它们都在等待自己的 body 及其所有子节点完成。 主要区别在于 runBlocking 方法阻塞当前线程等待,而 coroutineScope 只是挂起,释放底层线程用于其他用途。 由于这个区别,runBlocking 是一个常规函数,而 coroutineScope 是一个挂起函数。

Scope 构建器 和 并发

可以在任何挂起函数中使用 coroutineScope 构建器来执行多个并发操作

显式 job

launch协程构建器返回一个 Job 对象,该对象是已启动协程的句柄,可用于显式等待其完成。

取消和超时

取消协程执行

在长时间运行的应用程序中,可能需要对后台协程进行细粒度控制。 例如,用户可能已经关闭了启动协程的页面,现在不再需要其结果并且可以取消其操作。 launch 函数返回一个 Job 可以用来取消正在运行的协程:

取消是合作的

协程取消是合作的。 协程代码必须合作才能取消。 kotlinx.coroutines 中的所有挂起函数都是可以取消的。 他们检查协程的取消并在取消时抛出 CancellationException。 但是,如果协程在计算中工作并且不检查取消,则无法取消。

使计算代码可取消

有两种方法可以使计算代码可取消。 第一个是定期调用检查取消的挂起函数。 有一个 yield 函数,它是一个很好的选择。
另一种是显式检查取消状态。isActive 是通过 CoroutineScope 对象在协程内部可用的扩展属性。

使用 finally 关闭资源

可取消的挂起函数在取消时会抛出 CancellationException,这可以按通常的方式处理。 例如,try {...} finally {...} 表达式和 Kotlin use 函数在协程被取消时正常执行它们的终结操作

运行不可取消的代码块

任何在上一个示例的 finally 块中使用挂起函数的尝试都会导致 CancellationException,因为运行此代码的协程被取消。
通常,这不是问题,因为所有行为良好的关闭操作(关闭文件、取消job或关闭任何类型的通信通道)通常都是非阻塞的,并且不涉及任何挂起功能。
但是,在极少数情况下,当需要在取消的协程中挂起时,可以使用 withContext 函数和 NonCancellable 上下文将相应的代码包装在 withContext(NonCancellable) {...} 中。

超时

取消协程执行的最明显的实际原因是因为它的执行时间已经超过了一些超时。 虽然可以手动跟踪对相应 Job 的引用并启动一个单独的协程以在延迟后取消跟踪的协程,但有一个准备使用的 withTimeout 函数可以做到这一点。

withTimeout 抛出的 TimeoutCancellationExceptionCancellationException 的子类。 我们之前没有在控制台上看到它的堆栈跟踪。 那是因为在取消的协程内部 CancellationException 被认为是协程完成的正常原因。 然而,在这个例子中,我们在 main 函数中使用了 withTimeout。

由于取消只是一个异常,所有资源都以通常的方式关闭。 如果需要专门针对任何类型的超时执行一些附加操作,可以可以将超时代码包装在 try {...} catch (e: TimeoutCancellationException) {...} 块中,或使用 withTimeoutOrNull 函数,它类似于 withTimeout ,但在超时时返回 null 而不是抛出异常

异步超时和资源

withTimeout 中的超时事件相对于在其块中运行的代码是异步的,并且可能随时发生,甚至在从超时块内部返回之前。 如果您在块内打开或获取一些需要在块外关闭或释放的资源,请记住这一点。

组合挂起函数

默认是顺序的

协程中的代码,就像在常规代码中一样,默认是顺序的

使用 async 来并发

从概念上讲,async 就像 launch。 它启动一个单独的协程,它是一个轻量级线程,可与所有其他协程同时工作。 不同之处在于,launch 返回一个 Job 并且不携带任何结果值,而 async 返回一个 Deferred - 一个轻量级的非阻塞future,表示稍后提供结果的承诺。 可以在deferred值上使用 .await() 来获得其最终结果,但 Deferred 也是一个 job,因此可以在需要时取消它。

懒启动的 async

可选地,可以通过将其 start 参数设置为 CoroutineStart.LAZY 来使 async 变得惰性。 在这种模式下,它仅在 await 需要其结果或调用其 Jobstart 函数时才启动协程。

异步风格的函数

Coroutine context and dispatchers

newSingleThreadContext 创建一个线程供协程运行。 专用线程是非常昂贵的资源。 在实际应用程序中,当不再需要时,必须使用 close 函数释放它,或者将其存储在顶级变量中并在整个应用程序中重用。

Flow

Flow 是类似于 sequence 的 cold stream —— flow builder 内的代码在 collect flow 之前不会运行。

Flow 遵循协程的一般合作取消。 与往常一样,当 flow 在可取消的 suspend 函数(如 delay)中 suspend 时,可以取消 flow collection。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun simple(): Flow<Int> = flow { 
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}

flow { ... } builder 是最基本的 builder。 还有其他允许声明 flow 的 builder:

  • flowOf 构建器定义了发出一组固定值的 flow
  • 可以使用 .asFlow() 扩展函数将各种集合和 sequence 转换为 flow。

Intermediate flow operator

可以使用 operator 来转换 flow,就像转换集合和 sequence 一样。 Intermediate operator 应用于上游 flow 并返回下游 flow。 这些 operator 是冷酷的,就像 flow 一样。
对此类 operator 的调用本身并不是 suspend 函数。 它工作速度很快,返回新转换了的 flow 的定义。

基本 operator 具有熟悉的名称,例如 map 和 filter。 这些 operator 与 sequence 的一个重要区别是这些运算符内的代码块可以调用 suspend 函数。

Transform operator

在 flow transformation operator 中,最通用的一种称为 transform。 它可用于模仿简单的转换(如 map 和 filter),以及实现更复杂的转换。
使用 transform operator,可以发出任意次数的任意值。

1
2
3
4
5
6
7
8
9
10
11
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }

suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
1
2
3
4
5
6
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operator

当达到相应的限制时,诸如 take 之类的 Size-limiting intermediate operator 会取消 flow 的执行。 协程中的取消始终通过抛出异常来执行,以便所有资源管理函数(如 try { ... } finally { ... } 块)在取消时正常运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}

fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
1
2
3
1
2
Finally in numbers

Terminal flow operators

flow 上的 terminal operator 是开始 flow 的收集的 suspend 函数。 collect operator 是最基本的 operator,但还有其他 terminal operator,可以使操作变得更容易:

  • 转换为各种集合,例如 toListtoSet
  • 用于获取第一个值并确保 flow 发出单个值的运算符。
  • 使用 reducefold 将 flow 减少到某个值。

flow 是顺序的

除非使用对多个 flow 进行操作的特殊运算符,否则 flow 的每个单独收集都会按顺序执行。 该收集直接在调用 terminal operator 的协程中工作。
默认情况下不会启动新的协程。 每个发出的值都经过从上游到下游的所有 intermediate operator 的处理,然后传递给 terminal operator。

Flow context

flow 的收集始终发生在调用协程的上下文中。 例如,如果有一个 simple flow,那么以下代码将在该代码作者指定的上下文中运行,而不管 simple flow 的实现细节如何

flow 的这种属性称为 context preservation。

因此,默认情况下,flow { ... } 构建器中的代码在相应 flow 的收集器提供的上下文中运行。 例如,考虑一个简单函数的实现,该函数打印调用它的线程并发出三个数字:

使用 withContext 时的常见陷阱

但是,长时间运行的 CPU 消耗代码可能需要在 Dispatchers.Default 上下文中执行,UI 更新代码可能需要在 Dispatchers.Main 上下文中执行。
通常,withContext 用于更改使用 Kotlin 协程的代码中的上下文,但 flow { ... } 构建器中的代码必须遵守上下文保留属性,并且不允许从不同的上下文 emit。

flowOn operator

异常是指用于更改 flow emit 上下文的 flowOn 函数。 下面的示例显示了更改 flow 上下文的正确方法,该示例还打印相应线程的名称以显示其工作原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
1
2
3
4
5
6
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

请注意 flow { ... } 如何在后台线程中工作,而收集发生在主线程中:

这里要观察的另一件事是 flowOn 运算符更改了 flow 的默认顺序性质。 现在,收集发生在一个协程(“coroutine#1”)中,发射发生在另一个协程(“coroutine#2”)中,该协程与收集协程并发的另一个线程中运行。
当必须更改上下文中的 CoroutineDispatcher 时, flowOn 运算符会为上游 flow 创建另一个协程。

Buffering

从收集 flow 所需的总时间的角度来看,在不同的协程中运行 flow 的不同部分可能会有所帮助,特别是在涉及长时间运行的异步操作时。
例如,考虑一个 simple flow 的发射速度很慢的情况,需要 100 毫秒才能产生一个元素; 而且收集器也很慢,处理一个元素需要 300 毫秒。 让我们看看收集这样一个包含三个数字的 flow 需要多长时间:

我们可以在 flow 上使用 buffer operator 来与收集代码并发运行 simple flow 的发出代码,而不是顺序运行它们:

1
2
3
4
5
6
7
8
9
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")

它生成相同数字的速度更快,因为我们有效地创建了一个处理管道,只需等待 100 毫秒即可获取第一个数字,然后仅花费 300 毫秒来处理每个数字。 这样运行大约需要 1000 毫秒:

请注意,flowOn 运算符在必须更改 CoroutineDispatcher 时使用相同的缓冲机制,但这里我们显式请求缓冲而不更改执行上下文。

合并

当 flow 表示操作或操作状态更新的部分结果时,可能不需要处理每个值,而只需处理最近的值。 在这种情况下,当收集器处理速度太慢时,可以使用 conflate 运算符来跳过中间值。

1
2
3
4
5
6
7
8
9
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")

处理最新值
当发射器和收集器都很慢时,合并是加速处理的一种方法。 它通过删除发出的值来实现这一点。 另一种方法是取消慢速收集器并在每次发出新值时重新启动它。
有一系列 xxxLatest 运算符,它们执行与 xxx 运算符相同的基本逻辑,但在新值上取消其块中的代码。 让我们尝试在前面的示例中将conflate更改为collectLatest:

1
2
3
4
5
6
7
8
9
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")

由于 collectLatest 的主体需要300毫秒,但每100毫秒发出一次新值,我们看到该块在每个值上运行,但仅针对最后一个值完成

组合多个 flow

zip

就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样,flow 有一个 zip 运算符,用于组合两个 flow 的相应值:

1
2
3
4
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print

Combine

当 flow 表示变量或操作的最新值时,可能需要执行取决于相应 flow 的最新值的计算,并在任何上游 flow 发出一个值时重新计算它 。 相应的运算符系列称为 combine

例如,如果上一个示例中的数字每 300 毫秒更新一次,但字符串每 400 毫秒更新一次,则使用 zip 运算符对它们进行压缩仍将产生相同的结果,尽管结果每 400 毫秒打印一次:

1
2
3
4
5
6
7
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1
2
3
1 -> one at 441 ms from start
2 -> two at 841 ms from start
3 -> three at 1244 ms from start

当在这里使用 combine 运算符而不是 zip 时:

1
2
3
4
5
6
7
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1
2
3
4
5
1 -> one at 433 ms from start
2 -> one at 636 ms from start
2 -> two at 835 ms from start
3 -> two at 937 ms from start
3 -> three at 1235 ms from start

扁平化 flow

flow 表示异步接收的值序列,因此很容易陷入每个值触发对另一个值序列的请求的情况。 例如,我们可以使用以下函数返回两个相隔 500 毫秒的字符串 flow:

1
2
3
4
5
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}

现在,如果我们有一个包含三个整数的 flow,并对每个整数调用 requestFlow,如下所示:

1
(1..3).asFlow().map { requestFlow(it) }

然后我们最终会得到一个 flow 的 flow (Flow<Flow<String>>),需要将其展平为单个 flow 以进行进一步处理。 为此,集合和 sequence 具有 flattenflatMap 运算符。
然而,由于 flow 的异步性质,它们需要不同的扁平化模式,因此存在一系列 flow 上的扁平化运算符。

flatMapConcat
flow 的串联由 flatMapConcatflattenConcat 运算符提供。 它们是相应 sequence 运算符最直接的类似物。 它们等待内部 flow 完成,然后再开始收集下一个 flow,如下例所示:

1
2
3
4
5
6
val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

flatMapMerge
另一个扁平化操作是并发收集所有传入 flow 并将它们的值合并到单个 flow 中,以便尽快发出值。 它由 flatMapMergeflattenMerge 运算符实现。
它们都接受一个可选的 concurrency 参数,该参数限制同时收集的并发 flow 的数量(默认情况下等于 DEFAULT_CONCURRENCY)。

1
2
3
4
5
6
val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}

请注意,flatMapMerge 按顺序调用其代码块(本例中为 { requestFlow(it) }),但并发收集结果 flow,这相当于首先执行顺序 map { requestFlow(it) },然后在结果上调用 flattenMerge

Flow exception

当发射器或运算符内的代码抛出异常时,flow 收集可能会在异常情况下完成。 有多种方法可以处理这些异常。

Collector try and catch

收集器可以使用 Kotlin 的 try/catch 块来处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}

fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}

Everything is caught

前面的示例实际上捕获了发射器或任何中间或终端运算符中发生的任何异常。 例如,让我们更改代码,以便将发出的值映射到字符串,但相应的代码会产生异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun simple(): Flow<String> = 
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}

fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}

异常透明度

Flow completion

当 flow 收集完成时(正常或异常),它可能需要执行一个操作。 可以通过两种方式完成:命令式或声明式。

命令式 finally 块

除了 try/catch 之外,收集器还可以使用 finally 块在 collect 完成后执行操作。

声明式处理

对于声明式方法,flow 具有 onCompletion 中间运算符,当 flow 完全收集时将调用该运算符。

可以使用 onCompletion 运算符重写前面的示例并产生相同的输出:

1
2
3
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }

onCompletion 的主要优点是 lambda 的可空 Throwable 参数,可用于确定 flow 收集是正常完成还是异常完成。 在以下示例中,简单流程在发出数字 1 后引发异常:

1
2
3
4
5
6
7
8
9
10
11
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}

fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}

catch 不同,onCompletion 运算符不处理异常。 从上面的示例代码中我们可以看到,异常仍然流向下游。 它将被传递给进一步的 onCompletion 运算符,并且可以使用 catch 运算符进行处理。

Successful completion

catch 运算符的另一个区别是 onCompletion 会看到所有异常,并且仅在上游 flow 成功完成(没有取消或失败)时接收空异常。

Launching flow

使用 flow 来表示来自某个源的异步事件很容易。 在这种情况下,我们需要一个类似于 addEventListener 函数的函数,该函数注册一段代码并对传入事件做出反应并继续进一步的工作。
onEach 运算符可以充当此角色。 然而,onEach 是一个中间运算符。 我们还需要一个终端操作符来收集 flow。 否则,仅仅调用 onEach 是没有效果的。

如果我们在 onEach 之后使用 collect 终端操作符,那么它后面的代码将等待 flow 被收集:

launchIn 终端操作符在这里派上用场。 通过将 collect 替换为 launchIn,我们可以在单独的协程中启动 flow 的收集,以便立即继续执行进一步的代码:

1
2
3
4
5
6
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}

launchIn 所需的参数必须指定一个 CoroutineScope,在其中启动用于收集 flow 的协程。 在上面的示例中,此作用域来自 runBlocking 协程构建器,因此当 flow 运行时,此 runBlocking 作用域会等待其子协程完成,并阻止 main 函数返回和终止此示例。

在实际应用中,scope 将来自具有有限生命周期的实体。 一旦该实体的生命周期终止,相应的作用域就会被取消,从而取消相应 flow 的收集。
这样,一对 onEach { ... }.launchIn(scope) 的工作方式就像 addEventListener`` 一样。 但是,不需要相应的 removeEventListener 函数,因为取消和结构化并发可以实现此目的。

请注意,launchIn 还返回一个 Job,该 Job 只能用于取消相应的 flow 收集协程,而无需取消整个作用域或 join 它。

Flow cancellation checks

为了方便起见,flow 构建器对每个发出的值执行额外的 ensureActive 检查以取消取消。 这意味着从 flow { ... } 发出的繁忙循环是可以取消的:

1
2
3
4
5
6
7
8
9
10
11
12
13
fun foo(): Flow<Int> = flow { 
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}

fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}

然而,出于性能原因,大多数其他 flow operator 不会自行执行额外的取消检查。 例如,如果使用 IntRange.asFlow 扩展编写相同的繁忙循环并且不在任何地方 suspend,则不会检查取消:

Making busy flow cancellable

如果协程有繁忙循环,则必须显式检查取消。 可以添加 .onEach { currentCoroutineContext().ensureActive() },但提供了一个现成的 cancellable operator 来执行此操作:

1
2
3
4
5
6
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}

StateFlow

StateFlow 是一个 state-holder observable flow,它向其收集器发出当前和新的状态更新。 当前状态值也可以通过其 value 属性读取。
要更新状态并将其发送到 flow,请为 MutableStateFlow 类的 value 属性分配一个新值。

在 Android 中,StateFlow 非常适合需要维护可观察的可变状态的类。

按照 Kotlin flow 中的示例,可以从 LatestNewsViewModel 公开 StateFlow,以便 view 可以侦听 UI 状态更新,并本质上使屏幕状态在配置更改中幸存。

负责更新 MutableStateFlow 的类是生产者,所有从 StateFlow 收集的类都是消费者。 与使用 flow 构建器构建的 cold flow 不同,StateFlow 是 hot flow:
从 flow 中收集不会触发任何生产者代码。 StateFlow 始终处于活动状态并位于内存中,并且仅当垃圾收集 root 没有其他对它的引用时,它才有资格进行垃圾收集。

当新的使用者开始从 flow 中收集数据时,它会接收 flow 中的最后一个状态以及任何后续状态。 可以在其他可观察类(例如 LiveData)中找到此行为。

View 与任何其他流一样侦听 StateFlow:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()

override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// 每当生命周期处于 STARTED 状态(或更高状态)时,repeatOnLifecycle 就会在新的协程中启动该块,并在 STOPPED 时取消它。
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
1
2
警告:如果 UI 需要更新,切勿直接从 launch 或 launchIn 扩展函数收集 UI 流。 即使 view 不可见,这些函数也会处理事件。 
此行为可能会导致应用程序崩溃。 为了避免这种情况,请使用上面所示的 RepeatOnLifecycle API。

要将任何流转换为 StateFlow,请使用 stateIn intermediate operator。

StateFlow, Flow, and LiveData
StateFlowLiveData 有相似之处。 两者都是可观察的数据持有者类,并且在应用程序架构中使用时都遵循类似的模式。

但请注意,StateFlowLiveData 的行为确实不同:

  • StateFlow 需要将初始状态传递给构造函数,而 LiveData 则不需要。
  • 当 view 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用者,而从 StateFlow 或任何其他 flow 收集不会自动停止收集。 要实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集 flow。

Making cold flows hot using shareIn

StateFlow 是一个 hot flow —— 只要该 flow 被收集或者垃圾收集根中存在对其的任何其他引用,它就会保留在内存中。 可以使用 shareIn 运算符将冷 flow 转为热 flow。

以 Kotlin 流中创建的 callbackFlow 为例,可以使用 shareIn 在收集器之间共享从 Firestore 检索的数据,而不是让每个收集器创建一个新 flow。 需要传递以下内容:

  • 用于共享 flow 的 CoroutineScope。 该作用域的生存时间应该比任何消费者都长,以便根据需要保持共享 flow 的存活时间。
  • 每个新收集器重播的项目数。
  • 启动行为策略。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
    ) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
    ...
    }.shareIn(
    externalScope,
    replay = 1,
    started = SharingStarted.WhileSubscribed()
    )
    }
    在此示例中,latestNews flow 将最后发出的项目重播到新的收集器,并且只要 externalScope 处于活动状态且存在活动收集器,就保持活动状态。
    当存在活跃订阅者时,SharingStarted.WhileSubscribed() 启动策略使上游生产者保持活动状态。 还可以使用其他启动策略,例如 SharingStarted.Eagerly 立即启动生产者或 SharingStarted.Lazily 在第一个订阅者出现后开始共享并保持 flow 永远处于活动状态。

SharedFlow

shareIn 函数返回一个 SharedFlow,这是一个 hot flow,它向所有从中收集数据的消费者发出值。 SharedFlowStateFlow 的高度可配置的泛化。

可以在不使用 shareIn 的情况下创建 SharedFlow。 例如,可以使用 SharedFlow 将 tick 发送到应用程序的其余部分,以便所有内容同时定期刷新。
除了获取最新新闻之外,可能还想刷新用户信息部分及其最喜欢的主题集合。 在以下代码片段中,TickHandler 公开 SharedFlow,以便其他类知道何时刷新其内容。
StateFlow 一样,在类中使用 MutableSharedFlow 类型的支持属性将项目发送到 flow:

可以通过以下方式自定义 SharedFlow 行为:

  • replay 允许为新订阅者重新发送一些先前发出的值。
  • onBufferOverflow 允许指定缓冲区何时充满要发送的项目的策略。 默认值为 BufferOverflow.SUSPEND,这会使调用者挂起。 其他选项是 DROP_LATEST 或 DROP_OLDEST。

MutableSharedFlow 还有一个 subscriptionCount 属性,其中包含活动收集器的数量,以便可以相应地优化业务逻辑。 如果不想重播发送到流的最新信息,MutableSharedFlow 还包含一个 ResetReplayCache 函数。

评论