blob: 43467da45849260eb9735fe357f3c21f537be706 [file] [log] [blame] [view] [edit]
<!--- INCLUDE .*/example-([a-z]+)-([0-9a-z]+)\.kt
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.$$1$$2
-->
<!--- KNIT ../kotlinx-coroutines-core/jvm/test/guide/.*-##\.kt -->
<!--- TEST_OUT ../kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test
import org.junit.Test
class FlowGuideTest {
-->
**Table of contents**
<!--- TOC -->
* [Asynchronous Flow](#asynchronous-flow)
* [Representing multiple values](#representing-multiple-values)
* [Sequences](#sequences)
* [Suspending functions](#suspending-functions)
* [Flows](#flows)
* [Flows are cold](#flows-are-cold)
* [Flow cancellation](#flow-cancellation)
* [Flow builders](#flow-builders)
* [Intermediate flow operators](#intermediate-flow-operators)
* [Transform operator](#transform-operator)
* [Size-limiting operators](#size-limiting-operators)
* [Terminal flow operators](#terminal-flow-operators)
* [Flows are sequential](#flows-are-sequential)
* [Flow context](#flow-context)
* [Wrong emission withContext](#wrong-emission-withcontext)
* [flowOn operator](#flowon-operator)
* [Buffering](#buffering)
* [Conflation](#conflation)
* [Processing the latest value](#processing-the-latest-value)
* [Composing multiple flows](#composing-multiple-flows)
* [Zip](#zip)
* [Combine](#combine)
* [Flattening flows](#flattening-flows)
* [flatMapConcat](#flatmapconcat)
* [flatMapMerge](#flatmapmerge)
* [flatMapLatest](#flatmaplatest)
* [Flow exceptions](#flow-exceptions)
* [Collector try and catch](#collector-try-and-catch)
* [Everything is caught](#everything-is-caught)
* [Exception transparency](#exception-transparency)
* [Transparent catch](#transparent-catch)
* [Catching declaratively](#catching-declaratively)
* [Flow completion](#flow-completion)
* [Imperative finally block](#imperative-finally-block)
* [Declarative handling](#declarative-handling)
* [Upstream exceptions only](#upstream-exceptions-only)
* [Imperative versus declarative](#imperative-versus-declarative)
* [Launching flow](#launching-flow)
<!--- END_TOC -->
## Asynchronous Flow
Suspending functions asynchronously return a single value, but how can you return
multiple asynchronously computed values? That is what Kotlin Flows are for.
### Representing multiple values
Multiple values can be represented in Kotlin using [collections].
For example, we can have a function `foo()` that returns a [List]
of three numbers and print them all using [forEach]:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt).
This code outputs:
```text
1
2
3
```
<!--- TEST -->
#### Sequences
If the numbers are computed with some CPU-consuming blocking code
(each computation taking 100ms) then we can represent the numbers using a [Sequence]:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
fun foo(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
foo().forEach { value -> println(value) }
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt).
This code outputs the same numbers, but it waits 100ms before printing each one.
<!--- TEST
1
2
3
-->
#### Suspending functions
However, this computation blocks the main thread that is running the code.
When those values are computed by an asynchronous code we can mark function `foo` with a `suspend` modifier,
so that it can perform its work without blocking and return the result as a list:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
//sampleStart
suspend fun foo(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt).
This code prints the numbers after waiting for a second.
<!--- TEST
1
2
3
-->
#### Flows
Using `List<Int>` result type we can only return all the values at once. To represent
the stream of values that are being asynchronously computed we can use [`Flow<Int>`][Flow] type similarly
to the `Sequence<Int>` type for synchronously computed values:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to see that the main thread is not blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
This code waits 100ms before printing each number without blocking the main thread. This is verified
by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:
```text
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
```
<!--- TEST -->
Notice the following differences of the code with the [Flow] from the earlier examples:
* A builder function for [Flow] type is called [flow].
* Code inside the `flow { ... }` builder block can suspend.
* The function `foo()` is no longer marked with `suspend` modifier.
* Values are _emitted_ from the flow using [emit][FlowCollector.emit] function.
* Values are _collected_ from the flow using [collect][collect] function.
> You can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
thread is blocked in this case.
### Flows are cold
Flows are _cold_ streams similarly to sequences &mdash; the code inside a [flow] builder does not
run until the flow is collected. This becomes clear in the following example:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
Which prints:
```text
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
```
<!--- TEST -->
That is a key reason why the `foo()` function (which returns a flow) is not marked with `suspend` modifier.
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
that is why we see that when we call `collect` again, we get "Flow started" printed again.
### Flow cancellation
Flow adheres to general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]) and cannot be cancelled otherwise.
The following example shows how the flow gets cancelled on timeout when running in [withTimeoutOrNull] block
and stops executing its code:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
foo().collect { value -> println(value) }
}
println("Done")
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
Notice how only two numbers get emitted by the flow in `foo()` function, producing the following output:
```text
Emitting 1
1
Emitting 2
2
Done
```
<!--- TEST -->
### Flow builders
The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
convenient declaration of flows:
* [flowOf] builder that defines a flow emitting a fixed set of values.
* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.
Thus, the example that prints numbers from 1 to 3 from a flow can be written as:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
<!--- TEST
1
2
3
-->
### Intermediate flow operators
Flows can be transformed with operators similarly to collections and sequences.
Intermediate operators are applied to an upstream flow and return a downstream flow.
These operators are cold, just like flows are. A call to such an operator is not
a suspending function itself. It works quickly, returning the definition of a new transformed flow.
The basic operators have familiar names like [map] and [filter].
The important difference from sequences is that blocks of
code inside those operators can call suspending functions.
For example, a flow of incoming requests can be
mapped to results with the [map] operator even when performing a request is a long-running
operation that is implemented by a suspending function:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
It produces the following three lines, each line appearing after a second:
```text
response 1
response 2
response 3
```
<!--- TEST -->
#### Transform operator
Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
simple transformations like [map] and [filter] as well as implement more complex transformations.
Using `transform` operator, you can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
For example, using `transform` we can emit a string before performing a long-running asynchronous request
and follow it with a response:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
//sampleStart
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
The output of this code is:
```text
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
```
<!--- TEST -->
#### Size-limiting operators
Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
is reached. Cancellation in coroutines is always performed by throwing an exception so that all the resource-management
functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
The output of this code clearly shows that execution of the `flow { ... }` body in `numbers()` function
had stopped after emitting the second number:
```text
1
2
Finally in numbers
```
<!--- TEST -->
### Terminal flow operators
Terminal operators on flows are _suspending functions_ that start a collection of the flow.
The [collect] operator is the most basic one, but there are other terminal operators for
convenience:
* Conversion to various collections like [toList] and [toSet].
* Operators to get the [first] value and to ensure that a flow emits a [single] value.
* Reducing a flow to a value with [reduce] and [fold].
For example:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
Prints a single number:
```text
55
```
<!--- TEST -->
### Flows are sequential
Each individual collection of a flow is performed sequentially unless special operators that operate
on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
No new coroutines are launched by default.
Each emitted value is processed by all intermediate operators from
upstream to downstream and is delivered to the terminal operator after that.
See the following example that filters even integers and maps them to strings:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
Producing:
```text
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
```
<!--- TEST -->
### Flow context
Collection of a flow always happens in the context of the calling coroutine. For example, if there is
a `foo` flow, then the following code runs in the context specified
by the author of this code, regardless of implementation details of the `foo` flow:
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
withContext(context) {
foo.collect { value ->
println(value) // run in the specified context
}
}
```
</div>
<!--- CLEAR -->
This property of a flow is called _context preservation_.
So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
it is called on and emits three numbers:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
//sampleStart
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
Running this code produces:
```text
[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
```
<!--- TEST FLEXIBLE_THREAD -->
Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
This is a perfect default for fast-running or asynchronous code that does not care about the execution context and
does not block the caller.
#### Wrong emission withContext
However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
to change the context in code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor context
preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.
Try running the following code:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
// WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
This code produces the following exception:
<!--- TEST EXCEPTION
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
-->
> Note that we had to use a fully qualified name of [kotlinx.coroutines.withContext][withContext] function in this example to
demonstrate this exception. A short name of `withContext` would have resolved to a special stub function that
produces compilation error to prevent us from running into this problem.
#### flowOn operator
The exception refers to [flowOn] function that shall be used to change the context of flow emission.
The correct way of changing the context of a flow is shown in the below example, which also prints
names of the corresponding threads to show how it all works:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:
<!--- TEST FLEXIBLE_THREAD
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
-->
Another observation here is that [flowOn] operator had changed the default sequential nature of the flow.
Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
("coroutine#2") that is running in another thread concurrently with collecting coroutine. The [flowOn] operator
creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
### Buffering
Running different parts of a flow in different coroutines can be helpful from the standpoint of overall time it takes
to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
emission by `foo()` flow is slow, taking 100 ms to produce an element; and collector is also slow,
taking 300 ms to process an element. Let us see how long does it take to collect such a flow with three numbers:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
It produces something like this, the whole collection taking around 1200 ms (three numbers times 400 ms each):
```text
1
2
3
Collected in 1220 ms
```
<!--- TEST ARBITRARY_TIME -->
We can use [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
as opposed to running them sequentially:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
It produces the same numbers faster, as we have effectively created a processing pipeline,
only having to wait 100 ms for the first number and then spending only 300 ms to process
each number. This way it takes around 1000 ms to run:
```text
1
2
3
Collected in 1071 ms
```
<!--- TEST ARBITRARY_TIME -->
> Note that [flowOn] operator uses the same buffering mechanism when it has to change [CoroutineDispatcher],
but here we explicitly request buffering without changing execution context.
#### Conflation
When flow represents partial results of some operation or operation status updates, it may not be necessary
to process each value, but only to process the most recent ones. In this case, [conflate] operator can be used to skip
intermediate values when a collector is too slow to process them. Building on the previous example:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
We see that while the first number was being processed the second and the third ones were already produced, so
the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:
```text
1
3
Collected in 758 ms
```
<!--- TEST ARBITRARY_TIME -->
#### Processing the latest value
Conflation is one way to speed up processing when both emitter and collector are slow. It does that by dropping emitted values.
The other way is to cancel slow collector and restart it every time a new value is emitted. There is
a family of `xxxLatest` operators that perform the same essential logic of `xxx` operator, but cancel the
code in their block on a new value. Let us change the previous example from [conflate] to [collectLatest]:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
//sampleStart
val time = measureTimeMillis {
foo()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
is run on every value, but completes only for the last value:
```text
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
```
<!--- TEST ARBITRARY_TIME -->
### Composing multiple flows
There are several ways to compose multiple flows.
#### Zip
Similarly to [Sequence.zip] extension function in the Kotlin standard library,
flows have [zip] operator that combines the corresponding values of two flows:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
This example prints:
```text
1 -> one
2 -> two
3 -> three
```
<!--- TEST -->
#### Combine
When flow represents the most recent value of some variable or operation (see also a related
section on [conflation](#conflation)) it might be needed to perform a computation that depends on
the most recent values of the corresponding flows and to recompute it whenever any of upstream
flows emit a value. The corresponding family of operators is called [combine].
For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms,
then zipping them using [zip] operator would still produce the same result,
albeit results are going to be printed every 400 ms:
> We use [onEach] intermediate operator in this example to delay each element and thus make the code
that emits sample flows more declarative and shorter.
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
<!--- TEST ARBITRARY_TIME
1 -> one at 437 ms from start
2 -> two at 837 ms from start
3 -> three at 1243 ms from start
-->
However, using [combine] operator here instead of [zip]:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
//sampleStart
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:
```text
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
```
<!--- TEST ARBITRARY_TIME -->
### Flattening flows
Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where
each value triggers a request for another sequence of values. For example, we can have the following
function that returns a flow of two strings 500 ms apart:
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
```
</div>
<!--- CLEAR -->
Now if we have a flow of three integers and call `requestFlow` for each of them like this:
<div class="sample" markdown="1" theme="idea" data-highlight-only>
```kotlin
(1..3).asFlow().map { requestFlow(it) }
```
</div>
<!--- CLEAR -->
Then we end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for
further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
operators for this purpose. However, the asynchronous nature of flows calls for different _modes_ of flattening
thus there is a family of flattening operators on flows.
#### flatMapConcat
Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
analogues of the corresponding sequence operators. They wait for inner flow to complete before
starting to collect the next one as the following example shows:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
The sequential nature of [flatMapConcat] is clearly seen in the output:
```text
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
```
<!--- TEST ARBITRARY_TIME -->
#### flatMapMerge
Another flattening mode is to concurrently collect all the incoming flows and merge their values into
a single flow so that values are emitted as soon as possible.
It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
(it is equal to [DEFAULT_CONCURRENCY] by default).
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
The concurrent nature of [flatMapMerge] is obvious:
```text
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
```
<!--- TEST ARBITRARY_TIME -->
> Note that [flatMapMerge] call its block of code (`{ requestFlow(it) }` in this example) sequentially, but
collects the resulting flows concurrently, so it is equivalent to performing a sequential
`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
#### flatMapLatest
In a similar way to [collectLatest] operator that was shown in
["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
flattening mode where collection of the previous flow is cancelled as soon as new flow is emitted.
It is implemented by [flatMapLatest] operator.
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // wait 500 ms
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
//sampleStart
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
The output of this example speaks for the way [flatMapLatest] works:
```text
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
```
<!--- TEST ARBITRARY_TIME -->
> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
It makes no difference in this particular example, because the call to `requestFlow` itself is fast, not-suspending,
and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.
### Flow exceptions
Flow collection can complete with an exception when emitter or any code inside any of the operators throw an exception.
There are several ways to handle these exceptions.
#### Collector try and catch
A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
This code successfully catches an exception in [collect] terminal operator and,
as you can see, no more values are emitted after that:
```text
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
```
<!--- TEST -->
#### Everything is caught
The previous example actually catches any exception happening in emitter or in any intermediate or terminal operators.
For example, let us change the code so that emitted values are [mapped][map] to strings,
but the corresponding code produces an exception:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
This exception is still caught and collection is stopped:
```text
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
```
<!--- TEST -->
### Exception transparency
But how can code of emitter encapsulate its exception handling behavior?
Flows must be _transparent to exceptions_ and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
`flow { ... }` builder from inside of `try/catch` block. This guarantees that a collector throwing an exception
can always catch it using `try/catch` as in the previous example.
The emitter can use [catch] operator that preserves this exception transparency and allows encapsulation
of its exception handling. The body of the `catch` operator can analyze an exception
and react to it in different ways depending on which exception was caught:
* Exceptions can be rethrown using `throw`.
* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
* Exceptions can be ignored, logged, or processed by some other code.
For example, let us emit a text on catching an exception:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
The output of the example is the same, even though we do not have `try/catch` around the code anymore.
<!--- TEST
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
-->
#### Transparent catch
The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
(that is an exception from all the operators above `catch`, but not below it).
If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
The "Caught ..." message is not printed despite the `catch` operator:
<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at ...
-->
#### Catching declaratively
We can combine a declarative nature of [catch] operator with a desire to handle all exceptions by moving the body
of [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
be triggered by a call to `collect()` without parameters:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
Now we can see that "Caught ..." message is printed and thus we can catch all exceptions without explicitly
using a `try/catch` block:
<!--- TEST EXCEPTION
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
-->
### Flow completion
When flow collection completes (normally or exceptionally) it may be needed to execute some action.
As you might have already noticed, it also can be done in two ways: imperative and declarative.
#### Imperative finally block
In addition to `try`/`catch`, a collector can also use `finally` block to execute an action
upon `collect` completion.
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
This code prints three numbers produced by the `foo()` flow followed by "Done" string:
```text
1
2
3
Done
```
<!--- TEST -->
#### Declarative handling
For declarative approach, flow has [onCompletion] intermediate operator that is invoked
when the flow is completely collected.
The previous example can be rewritten using [onCompletion] operator and produces the same output:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
//sampleStart
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
//sampleEnd
}
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
<!--- TEST
1
2
3
Done
-->
The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
to determine whether flow collection was completed normally or exceptionally. In the following
example `foo()` flow throws exception after emitting number 1:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
As you may expect, it prints:
```text
1
Flow completed exceptionally
Caught exception
```
<!--- TEST -->
[onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with `catch` operator.
#### Upstream exceptions only
Just like [catch] operator, [onCompletion] sees only exception coming from upstream and does not
see downstream exceptions. For example, run the following code:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
And you can see the completion cause is null, yet collection failed with exception:
```text
1
Flow completed with null
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```
<!--- TEST EXCEPTION -->
### Imperative versus declarative
Now we know how to collect flow, handle its completion and exceptions in both imperative and declarative ways.
The natural question here is which approach should be preferred and why.
As a library, we do not advocate for any particular approach and believe that both options
are valid and should be selected according to your own preferences and code style.
### Launching flow
It is convenient to use flows to represent asynchronous events that are coming from some source.
In this case, we need an analogue of `addEventListener` function that registers a piece of code with a reaction
on incoming events and continues further work. The [onEach] operator can serve this role.
However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
Otherwise, just calling `onEach` has no effect.
If we use [collect] terminal operator after `onEach`, then code after it waits until the flow is collected:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
As you can see, it prints:
```text
Event: 1
Event: 2
Event: 3
Done
```
<!--- TEST -->
Here [launchIn] terminal operator comes in handy. Replacing `collect` with `launchIn` we can
launch collection of the flow in a separate coroutine, so that execution of further code
immediately continues:
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
//sampleStart
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
//sampleEnd
```
</div>
> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
It prints:
```text
Done
Event: 1
Event: 2
Event: 3
```
<!--- TEST -->
The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
launched. In the above example this scope comes from [runBlocking]
coroutine builder, so while the flow is running this [runBlocking] scope waits for completion of its child coroutine
and keeps the main function from returning and terminating this example.
In real applications a scope is going to come from some entity with a limited
lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
like `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
as cancellation and structured concurrency serve this purpose.
Note, that [launchIn] also returns a [Job] which can be used to [cancel][Job.cancel] the corresponding flow collection
coroutine only without cancelling the whole scope or to [join][Job.join] it.
<!-- stdlib references -->
[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
<!--- END -->