| <!--- TEST_NAME FlowGuideTest --> |
| |
| [//]: # (title: Asynchronous Flow) |
| |
| A suspending function asynchronously returns a single value, but how can we return |
| multiple asynchronously computed values? This is where Kotlin Flows come in. |
| |
| ## Representing multiple values |
| |
| Multiple values can be represented in Kotlin using [collections]. |
| For example, we can have a `simple` function that returns a [List] |
| of three numbers and then print them all using [forEach]: |
| |
| ```kotlin |
| fun simple(): List<Int> = listOf(1, 2, 3) |
| |
| fun main() { |
| simple().forEach { value -> println(value) } |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt). |
| > |
| {type="note"} |
| |
| This code outputs: |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Sequences |
| |
| If we are computing the numbers with some CPU-consuming blocking code |
| (each computation taking 100ms), then we can represent the numbers using a [Sequence]: |
| |
| ```kotlin |
| fun simple(): Sequence<Int> = sequence { // sequence builder |
| for (i in 1..3) { |
| Thread.sleep(100) // pretend we are computing it |
| yield(i) // yield next value |
| } |
| } |
| |
| fun main() { |
| simple().forEach { value -> println(value) } |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt). |
| > |
| {type="note"} |
| |
| This code outputs the same numbers, but it waits 100ms before printing each one. |
| |
| <!--- TEST |
| 1 |
| 2 |
| 3 |
| --> |
| |
| ### Suspending functions |
| |
| However, this computation blocks the main thread that is running the code. |
| When these values are computed by asynchronous code we can mark the `simple` function with a `suspend` modifier, |
| so that it can perform its work without blocking and return the result as a list: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| |
| //sampleStart |
| suspend fun simple(): List<Int> { |
| delay(1000) // pretend we are doing something asynchronous here |
| return listOf(1, 2, 3) |
| } |
| |
| fun main() = runBlocking<Unit> { |
| simple().forEach { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt). |
| > |
| {type="note"} |
| |
| This code prints the numbers after waiting for a second. |
| |
| <!--- TEST |
| 1 |
| 2 |
| 3 |
| --> |
| |
| ### Flows |
| |
| Using the `List<Int>` result type, means we can only return all the values at once. To represent |
| the stream of values that are being computed asynchronously, we can use a [`Flow<Int>`][Flow] type just like we would use a `Sequence<Int>` type for synchronously computed values: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { // flow builder |
| for (i in 1..3) { |
| delay(100) // pretend we are doing something useful here |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| // Launch a concurrent coroutine to check if the main thread is blocked |
| launch { |
| for (k in 1..3) { |
| println("I'm not blocked $k") |
| delay(100) |
| } |
| } |
| // Collect the flow |
| simple().collect { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt). |
| > |
| {type="note"} |
| |
| This code waits 100ms before printing each number without blocking the main thread. This is verified |
| by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread: |
| |
| ```text |
| I'm not blocked 1 |
| 1 |
| I'm not blocked 2 |
| 2 |
| I'm not blocked 3 |
| 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| Notice the following differences in the code with the [Flow] from the earlier examples: |
| |
| * A builder function of [Flow] type is called [flow][_flow]. |
| * Code inside a `flow { ... }` builder block can suspend. |
| * The `simple` function is no longer marked with a `suspend` modifier. |
| * Values are _emitted_ from the flow using an [emit][FlowCollector.emit] function. |
| * Values are _collected_ from the flow using a [collect][collect] function. |
| |
| > We can replace [delay] with `Thread.sleep` in the body of `simple`'s `flow { ... }` and see that the main |
| > thread is blocked in this case. |
| > |
| {type="note"} |
| |
| ## Flows are cold |
| |
| Flows are _cold_ streams similar to sequences — the code inside a [flow][_flow] builder does not |
| run until the flow is collected. This becomes clear in the following example: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| println("Flow started") |
| for (i in 1..3) { |
| delay(100) |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| println("Calling simple function...") |
| val flow = simple() |
| println("Calling collect...") |
| flow.collect { value -> println(value) } |
| println("Calling collect again...") |
| flow.collect { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt). |
| > |
| {type="note"} |
| |
| Which prints: |
| |
| ```text |
| Calling simple function... |
| Calling collect... |
| Flow started |
| 1 |
| 2 |
| 3 |
| Calling collect again... |
| Flow started |
| 1 |
| 2 |
| 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| This is a key reason the `simple` function (which returns a flow) is not marked with `suspend` modifier. |
| The `simple()` call itself returns quickly and does not wait for anything. The flow starts afresh every time it is |
| collected and that is why we see "Flow started" every time we call `collect` again. |
| |
| ## Flow cancellation basics |
| |
| Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be |
| cancelled when the flow is suspended in a cancellable suspending function (like [delay]). |
| The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block |
| and stops executing its code: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| delay(100) |
| println("Emitting $i") |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| withTimeoutOrNull(250) { // Timeout after 250ms |
| simple().collect { value -> println(value) } |
| } |
| println("Done") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt). |
| > |
| {type="note"} |
| |
| Notice how only two numbers get emitted by the flow in the `simple` function, producing the following output: |
| |
| ```text |
| Emitting 1 |
| 1 |
| Emitting 2 |
| 2 |
| Done |
| ``` |
| |
| <!--- TEST --> |
| |
| See [Flow cancellation checks](#flow-cancellation-checks) section for more details. |
| |
| ## Flow builders |
| |
| The `flow { ... }` builder from the previous examples is the most basic one. There are other builders |
| that allow flows to be declared: |
| |
| * The [flowOf] builder defines a flow that emits a fixed set of values. |
| * Various collections and sequences can be converted to flows using the `.asFlow()` extension function. |
| |
| For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| // Convert an integer range to a flow |
| (1..3).asFlow().collect { value -> println(value) } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST |
| 1 |
| 2 |
| 3 |
| --> |
| |
| ## Intermediate flow operators |
| |
| Flows can be transformed using operators, in the same way as you would transform collections and |
| sequences. |
| Intermediate operators are applied to an upstream flow and return a downstream flow. |
| These operators are cold, just like flows are. A call to such an operator is not |
| a suspending function itself. It works quickly, returning the definition of a new transformed flow. |
| |
| The basic operators have familiar names like [map] and [filter]. |
| An important difference of these operators from sequences is that blocks of |
| code inside these operators can call suspending functions. |
| |
| For example, a flow of incoming requests can be |
| mapped to its results with a [map] operator, even when performing a request is a long-running |
| operation that is implemented by a suspending function: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| suspend fun performRequest(request: Int): String { |
| delay(1000) // imitate long-running asynchronous work |
| return "response $request" |
| } |
| |
| fun main() = runBlocking<Unit> { |
| (1..3).asFlow() // a flow of requests |
| .map { request -> performRequest(request) } |
| .collect { response -> println(response) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt). |
| > |
| {type="note"} |
| |
| It produces the following three lines, each appearing one second after the previous: |
| |
| ```text |
| response 1 |
| response 2 |
| response 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Transform operator |
| |
| Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate |
| simple transformations like [map] and [filter], as well as implement more complex transformations. |
| Using the `transform` operator, we can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times. |
| |
| For example, using `transform` we can emit a string before performing a long-running asynchronous request |
| and follow it with a response: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| suspend fun performRequest(request: Int): String { |
| delay(1000) // imitate long-running asynchronous work |
| return "response $request" |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| (1..3).asFlow() // a flow of requests |
| .transform { request -> |
| emit("Making request $request") |
| emit(performRequest(request)) |
| } |
| .collect { response -> println(response) } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt). |
| > |
| {type="note"} |
| |
| The output of this code is: |
| |
| ```text |
| Making request 1 |
| response 1 |
| Making request 2 |
| response 2 |
| Making request 3 |
| response 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Size-limiting operators |
| |
| Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit |
| is reached. Cancellation in coroutines is always performed by throwing an exception, so that all the resource-management |
| functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun numbers(): Flow<Int> = flow { |
| try { |
| emit(1) |
| emit(2) |
| println("This line will not execute") |
| emit(3) |
| } finally { |
| println("Finally in numbers") |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| numbers() |
| .take(2) // take only the first two |
| .collect { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt). |
| > |
| {type="note"} |
| |
| The output of this code clearly shows that the execution of the `flow { ... }` body in the `numbers()` function |
| stopped after emitting the second number: |
| |
| ```text |
| 1 |
| 2 |
| Finally in numbers |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Terminal flow operators |
| |
| Terminal operators on flows are _suspending functions_ that start a collection of the flow. |
| The [collect] operator is the most basic one, but there are other terminal operators, which can make it easier: |
| |
| * Conversion to various collections like [toList] and [toSet]. |
| * Operators to get the [first] value and to ensure that a flow emits a [single] value. |
| * Reducing a flow to a value with [reduce] and [fold]. |
| |
| For example: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val sum = (1..5).asFlow() |
| .map { it * it } // squares of numbers from 1 to 5 |
| .reduce { a, b -> a + b } // sum them (terminal operator) |
| println(sum) |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt). |
| > |
| {type="note"} |
| |
| Prints a single number: |
| |
| ```text |
| 55 |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Flows are sequential |
| |
| Each individual collection of a flow is performed sequentially unless special operators that operate |
| on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. |
| No new coroutines are launched by default. |
| Each emitted value is processed by all the intermediate operators from |
| upstream to downstream and is then delivered to the terminal operator after. |
| |
| See the following example that filters the even integers and maps them to strings: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| (1..5).asFlow() |
| .filter { |
| println("Filter $it") |
| it % 2 == 0 |
| } |
| .map { |
| println("Map $it") |
| "string $it" |
| }.collect { |
| println("Collect $it") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt). |
| > |
| {type="note"} |
| |
| Producing: |
| |
| ```text |
| Filter 1 |
| Filter 2 |
| Map 2 |
| Collect string 2 |
| Filter 3 |
| Filter 4 |
| Map 4 |
| Collect string 4 |
| Filter 5 |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Flow context |
| |
| Collection of a flow always happens in the context of the calling coroutine. For example, if there is |
| a `simple` flow, then the following code runs in the context specified |
| by the author of this code, regardless of the implementation details of the `simple` flow: |
| |
| ```kotlin |
| withContext(context) { |
| simple().collect { value -> |
| println(value) // run in the specified context |
| } |
| } |
| ``` |
| |
| <!--- CLEAR --> |
| |
| This property of a flow is called _context preservation_. |
| |
| So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector |
| of the corresponding flow. For example, consider the implementation of a `simple` function that prints the thread |
| it is called on and emits three numbers: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| log("Started simple flow") |
| for (i in 1..3) { |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| simple().collect { value -> log("Collected $value") } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt). |
| > |
| {type="note"} |
| |
| Running this code produces: |
| |
| ```text |
| [main @coroutine#1] Started simple flow |
| [main @coroutine#1] Collected 1 |
| [main @coroutine#1] Collected 2 |
| [main @coroutine#1] Collected 3 |
| ``` |
| |
| <!--- TEST FLEXIBLE_THREAD --> |
| |
| Since `simple().collect` is called from the main thread, the body of `simple`'s flow is also called in the main thread. |
| This is the perfect default for fast-running or asynchronous code that does not care about the execution context and |
| does not block the caller. |
| |
| ### A common pitfall when using withContext |
| |
| However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating |
| code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used |
| to change the context in the code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor the context |
| preservation property and is not allowed to [emit][FlowCollector.emit] from a different context. |
| |
| Try running the following code: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| // The WRONG way to change context for CPU-consuming code in flow builder |
| kotlinx.coroutines.withContext(Dispatchers.Default) { |
| for (i in 1..3) { |
| Thread.sleep(100) // pretend we are computing it in CPU-consuming way |
| emit(i) // emit next value |
| } |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| simple().collect { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt). |
| > |
| {type="note"} |
| |
| This code produces the following exception: |
| |
| ```text |
| Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: |
| Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323], |
| but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default]. |
| Please refer to 'flow' documentation or use 'flowOn' instead |
| at ... |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| ### flowOn operator |
| |
| The exception refers to the [flowOn] function that shall be used to change the context of the flow emission. |
| The correct way to change the context of a flow is shown in the example below, which also prints the |
| names of the corresponding threads to show how it all works: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| Thread.sleep(100) // pretend we are computing it in CPU-consuming way |
| log("Emitting $i") |
| emit(i) // emit next value |
| } |
| }.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder |
| |
| fun main() = runBlocking<Unit> { |
| simple().collect { value -> |
| log("Collected $value") |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt). |
| > |
| {type="note"} |
| |
| Notice how `flow { ... }` works in the background thread, while collection happens in the main thread: |
| |
| <!--- TEST FLEXIBLE_THREAD |
| [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 |
| [main @coroutine#1] Collected 1 |
| [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2 |
| [main @coroutine#1] Collected 2 |
| [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3 |
| [main @coroutine#1] Collected 3 |
| --> |
| |
| Another thing to observe here is that the [flowOn] operator has changed the default sequential nature of the flow. |
| Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine |
| ("coroutine#2") that is running in another thread concurrently with the collecting coroutine. The [flowOn] operator |
| creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context. |
| |
| ## Buffering |
| |
| Running different parts of a flow in different coroutines can be helpful from the standpoint of the overall time it takes |
| to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when |
| the emission by a `simple` flow is slow, taking 100 ms to produce an element; and collector is also slow, |
| taking 300 ms to process an element. Let's see how long it takes to collect such a flow with three numbers: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| import kotlin.system.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| delay(100) // pretend we are asynchronously waiting 100 ms |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| val time = measureTimeMillis { |
| simple().collect { value -> |
| delay(300) // pretend we are processing it for 300 ms |
| println(value) |
| } |
| } |
| println("Collected in $time ms") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt). |
| > |
| {type="note"} |
| |
| It produces something like this, with the whole collection taking around 1200 ms (three numbers, 400 ms for each): |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| Collected in 1220 ms |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| We can use a [buffer] operator on a flow to run emitting code of the `simple` flow concurrently with collecting code, |
| as opposed to running them sequentially: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| import kotlin.system.* |
| |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| delay(100) // pretend we are asynchronously waiting 100 ms |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val time = measureTimeMillis { |
| simple() |
| .buffer() // buffer emissions, don't wait |
| .collect { value -> |
| delay(300) // pretend we are processing it for 300 ms |
| println(value) |
| } |
| } |
| println("Collected in $time ms") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt). |
| > |
| {type="note"} |
| |
| It produces the same numbers just faster, as we have effectively created a processing pipeline, |
| having to only wait 100 ms for the first number and then spending only 300 ms to process |
| each number. This way it takes around 1000 ms to run: |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| Collected in 1071 ms |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| > Note that the [flowOn] operator uses the same buffering mechanism when it has to change a [CoroutineDispatcher], |
| > but here we explicitly request buffering without changing the execution context. |
| > |
| {type="note"} |
| |
| ### Conflation |
| |
| When a flow represents partial results of the operation or operation status updates, it may not be necessary |
| to process each value, but instead, only most recent ones. In this case, the [conflate] operator can be used to skip |
| intermediate values when a collector is too slow to process them. Building on the previous example: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| import kotlin.system.* |
| |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| delay(100) // pretend we are asynchronously waiting 100 ms |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val time = measureTimeMillis { |
| simple() |
| .conflate() // conflate emissions, don't process each one |
| .collect { value -> |
| delay(300) // pretend we are processing it for 300 ms |
| println(value) |
| } |
| } |
| println("Collected in $time ms") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt). |
| > |
| {type="note"} |
| |
| We see that while the first number was still being processed the second, and third were already produced, so |
| the second one was _conflated_ and only the most recent (the third one) was delivered to the collector: |
| |
| ```text |
| 1 |
| 3 |
| Collected in 758 ms |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| ### Processing the latest value |
| |
| Conflation is one way to speed up processing when both the emitter and collector are slow. It does it by dropping emitted values. |
| The other way is to cancel a slow collector and restart it every time a new value is emitted. There is |
| a family of `xxxLatest` operators that perform the same essential logic of a `xxx` operator, but cancel the |
| code in their block on a new value. Let's try changing [conflate] to [collectLatest] in the previous example: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| import kotlin.system.* |
| |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| delay(100) // pretend we are asynchronously waiting 100 ms |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val time = measureTimeMillis { |
| simple() |
| .collectLatest { value -> // cancel & restart on the latest value |
| println("Collecting $value") |
| delay(300) // pretend we are processing it for 300 ms |
| println("Done $value") |
| } |
| } |
| println("Collected in $time ms") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt). |
| > |
| {type="note"} |
| |
| Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block |
| is run on every value, but completes only for the last value: |
| |
| ```text |
| Collecting 1 |
| Collecting 2 |
| Collecting 3 |
| Done 3 |
| Collected in 741 ms |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| ## Composing multiple flows |
| |
| There are lots of ways to compose multiple flows. |
| |
| ### Zip |
| |
| Just like the [Sequence.zip] extension function in the Kotlin standard library, |
| flows have a [zip] operator that combines the corresponding values of two flows: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val nums = (1..3).asFlow() // numbers 1..3 |
| val strs = flowOf("one", "two", "three") // strings |
| nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string |
| .collect { println(it) } // collect and print |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt). |
| > |
| {type="note"} |
| |
| This example prints: |
| |
| ```text |
| 1 -> one |
| 2 -> two |
| 3 -> three |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Combine |
| |
| When flow represents the most recent value of a variable or operation (see also the related |
| section on [conflation](#conflation)), it might be needed to perform a computation that depends on |
| the most recent values of the corresponding flows and to recompute it whenever any of the upstream |
| flows emit a value. The corresponding family of operators is called [combine]. |
| |
| For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms, |
| then zipping them using the [zip] operator will still produce the same result, |
| albeit results that are printed every 400 ms: |
| |
| > We use a [onEach] intermediate operator in this example to delay each element and make the code |
| > that emits sample flows more declarative and shorter. |
| > |
| {type="note"} |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms |
| val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms |
| val startTime = System.currentTimeMillis() // remember the start time |
| nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip" |
| .collect { value -> // collect and print |
| println("$value at ${System.currentTimeMillis() - startTime} ms from start") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| 1 -> one at 437 ms from start |
| 2 -> two at 837 ms from start |
| 3 -> three at 1243 ms from start |
| --> |
| |
| However, when using a [combine] operator here instead of a [zip]: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms |
| val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms |
| val startTime = System.currentTimeMillis() // remember the start time |
| nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine" |
| .collect { value -> // collect and print |
| println("$value at ${System.currentTimeMillis() - startTime} ms from start") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt). |
| > |
| {type="note"} |
| |
| We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows: |
| |
| ```text |
| 1 -> one at 452 ms from start |
| 2 -> one at 651 ms from start |
| 2 -> two at 854 ms from start |
| 3 -> two at 952 ms from start |
| 3 -> three at 1256 ms from start |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| ## Flattening flows |
| |
| Flows represent asynchronously received sequences of values, and so it is quite easy to get into a situation |
| where each value triggers a request for another sequence of values. For example, we can have the following |
| function that returns a flow of two strings 500 ms apart: |
| |
| ```kotlin |
| fun requestFlow(i: Int): Flow<String> = flow { |
| emit("$i: First") |
| delay(500) // wait 500 ms |
| emit("$i: Second") |
| } |
| ``` |
| |
| <!--- CLEAR --> |
| |
| Now if we have a flow of three integers and call `requestFlow` on each of them like this: |
| |
| ```kotlin |
| (1..3).asFlow().map { requestFlow(it) } |
| ``` |
| |
| <!--- CLEAR --> |
| |
| Then we will end up with a flow of flows (`Flow<Flow<String>>`) that needs to be _flattened_ into a single flow for |
| further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap] |
| operators for this. However, due to the asynchronous nature of flows they call for different _modes_ of flattening, |
| and hence, a family of flattening operators on flows exists. |
| |
| ### flatMapConcat |
| |
| Concatenation of flows of flows is provided by the [flatMapConcat] and [flattenConcat] operators. They are the |
| most direct analogues of the corresponding sequence operators. They wait for the inner flow to complete before |
| starting to collect the next one as the following example shows: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun requestFlow(i: Int): Flow<String> = flow { |
| emit("$i: First") |
| delay(500) // wait 500 ms |
| emit("$i: Second") |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val startTime = System.currentTimeMillis() // remember the start time |
| (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms |
| .flatMapConcat { requestFlow(it) } |
| .collect { value -> // collect and print |
| println("$value at ${System.currentTimeMillis() - startTime} ms from start") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt). |
| > |
| {type="note"} |
| |
| The sequential nature of [flatMapConcat] is clearly seen in the output: |
| |
| ```text |
| 1: First at 121 ms from start |
| 1: Second at 622 ms from start |
| 2: First at 727 ms from start |
| 2: Second at 1227 ms from start |
| 3: First at 1328 ms from start |
| 3: Second at 1829 ms from start |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| ### flatMapMerge |
| |
| Another flattening operation is to concurrently collect all the incoming flows and merge their values into |
| a single flow so that values are emitted as soon as possible. |
| It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional |
| `concurrency` parameter that limits the number of concurrent flows that are collected at the same time |
| (it is equal to [DEFAULT_CONCURRENCY] by default). |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun requestFlow(i: Int): Flow<String> = flow { |
| emit("$i: First") |
| delay(500) // wait 500 ms |
| emit("$i: Second") |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val startTime = System.currentTimeMillis() // remember the start time |
| (1..3).asFlow().onEach { delay(100) } // a number every 100 ms |
| .flatMapMerge { requestFlow(it) } |
| .collect { value -> // collect and print |
| println("$value at ${System.currentTimeMillis() - startTime} ms from start") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt). |
| > |
| {type="note"} |
| |
| The concurrent nature of [flatMapMerge] is obvious: |
| |
| ```text |
| 1: First at 136 ms from start |
| 2: First at 231 ms from start |
| 3: First at 333 ms from start |
| 1: Second at 639 ms from start |
| 2: Second at 732 ms from start |
| 3: Second at 833 ms from start |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| > Note that the [flatMapMerge] calls its block of code (`{ requestFlow(it) }` in this example) sequentially, but |
| > collects the resulting flows concurrently, it is the equivalent of performing a sequential |
| > `map { requestFlow(it) }` first and then calling [flattenMerge] on the result. |
| > |
| {type="note"} |
| |
| ### flatMapLatest |
| |
| In a similar way to the [collectLatest] operator, that was described in the section |
| ["Processing the latest value"](#processing-the-latest-value), there is the corresponding "Latest" |
| flattening mode where the collection of the previous flow is cancelled as soon as new flow is emitted. |
| It is implemented by the [flatMapLatest] operator. |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun requestFlow(i: Int): Flow<String> = flow { |
| emit("$i: First") |
| delay(500) // wait 500 ms |
| emit("$i: Second") |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val startTime = System.currentTimeMillis() // remember the start time |
| (1..3).asFlow().onEach { delay(100) } // a number every 100 ms |
| .flatMapLatest { requestFlow(it) } |
| .collect { value -> // collect and print |
| println("$value at ${System.currentTimeMillis() - startTime} ms from start") |
| } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt). |
| > |
| {type="note"} |
| |
| The output here in this example is a good demonstration of how [flatMapLatest] works: |
| |
| ```text |
| 1: First at 142 ms from start |
| 2: First at 322 ms from start |
| 3: First at 425 ms from start |
| 3: Second at 931 ms from start |
| ``` |
| |
| <!--- TEST ARBITRARY_TIME --> |
| |
| > Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) when a new value |
| > is received. |
| > It makes no difference in this particular example, because the call to `requestFlow` itself is fast, not-suspending, |
| > and cannot be cancelled. However, a differnce in output would be visible if we were to use suspending functions |
| > like `delay` in `requestFlow`. |
| > |
| {type="note"} |
| |
| ## Flow exceptions |
| |
| Flow collection can complete with an exception when an emitter or code inside the operators throw an exception. |
| There are several ways to handle these exceptions. |
| |
| ### Collector try and catch |
| |
| A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| println("Emitting $i") |
| emit(i) // emit next value |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| try { |
| simple().collect { value -> |
| println(value) |
| check(value <= 1) { "Collected $value" } |
| } |
| } catch (e: Throwable) { |
| println("Caught $e") |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt). |
| > |
| {type="note"} |
| |
| This code successfully catches an exception in [collect] terminal operator and, |
| as we see, no more values are emitted after that: |
| |
| ```text |
| Emitting 1 |
| 1 |
| Emitting 2 |
| 2 |
| Caught java.lang.IllegalStateException: Collected 2 |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Everything is caught |
| |
| The previous example actually catches any exception happening in the emitter or in any intermediate or terminal operators. |
| For example, let's change the code so that emitted values are [mapped][map] to strings, |
| but the corresponding code produces an exception: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<String> = |
| flow { |
| for (i in 1..3) { |
| println("Emitting $i") |
| emit(i) // emit next value |
| } |
| } |
| .map { value -> |
| check(value <= 1) { "Crashed on $value" } |
| "string $value" |
| } |
| |
| fun main() = runBlocking<Unit> { |
| try { |
| simple().collect { value -> println(value) } |
| } catch (e: Throwable) { |
| println("Caught $e") |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt). |
| > |
| {type="note"} |
| |
| This exception is still caught and collection is stopped: |
| |
| ```text |
| Emitting 1 |
| string 1 |
| Emitting 2 |
| Caught java.lang.IllegalStateException: Crashed on 2 |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Exception transparency |
| |
| But how can code of the emitter encapsulate its exception handling behavior? |
| |
| Flows must be _transparent to exceptions_ and it is a violation of the exception transparency to [emit][FlowCollector.emit] values in the |
| `flow { ... }` builder from inside of a `try/catch` block. This guarantees that a collector throwing an exception |
| can always catch it using `try/catch` as in the previous example. |
| |
| The emitter can use a [catch] operator that preserves this exception transparency and allows encapsulation |
| of its exception handling. The body of the `catch` operator can analyze an exception |
| and react to it in different ways depending on which exception was caught: |
| |
| * Exceptions can be rethrown using `throw`. |
| * Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch]. |
| * Exceptions can be ignored, logged, or processed by some other code. |
| |
| For example, let us emit the text on catching an exception: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun simple(): Flow<String> = |
| flow { |
| for (i in 1..3) { |
| println("Emitting $i") |
| emit(i) // emit next value |
| } |
| } |
| .map { value -> |
| check(value <= 1) { "Crashed on $value" } |
| "string $value" |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| simple() |
| .catch { e -> emit("Caught $e") } // emit on exception |
| .collect { value -> println(value) } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt). |
| > |
| {type="note"} |
| |
| The output of the example is the same, even though we do not have `try/catch` around the code anymore. |
| |
| <!--- TEST |
| Emitting 1 |
| string 1 |
| Emitting 2 |
| Caught java.lang.IllegalStateException: Crashed on 2 |
| --> |
| |
| ### Transparent catch |
| |
| The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions |
| (that is an exception from all the operators above `catch`, but not below it). |
| If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| println("Emitting $i") |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| simple() |
| .catch { e -> println("Caught $e") } // does not catch downstream exceptions |
| .collect { value -> |
| check(value <= 1) { "Collected $value" } |
| println(value) |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt). |
| > |
| {type="note"} |
| |
| A "Caught ..." message is not printed despite there being a `catch` operator: |
| |
| ```text |
| Emitting 1 |
| 1 |
| Emitting 2 |
| Exception in thread "main" java.lang.IllegalStateException: Collected 2 |
| at ... |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| ### Catching declaratively |
| |
| We can combine the declarative nature of the [catch] operator with a desire to handle all the exceptions, by moving the body |
| of the [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must |
| be triggered by a call to `collect()` without parameters: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun simple(): Flow<Int> = flow { |
| for (i in 1..3) { |
| println("Emitting $i") |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| simple() |
| .onEach { value -> |
| check(value <= 1) { "Collected $value" } |
| println(value) |
| } |
| .catch { e -> println("Caught $e") } |
| .collect() |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt). |
| > |
| {type="note"} |
| |
| Now we can see that a "Caught ..." message is printed and so we can catch all the exceptions without explicitly |
| using a `try/catch` block: |
| |
| ```text |
| Emitting 1 |
| 1 |
| Emitting 2 |
| Caught java.lang.IllegalStateException: Collected 2 |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| ## Flow completion |
| |
| When flow collection completes (normally or exceptionally) it may need to execute an action. |
| As you may have already noticed, it can be done in two ways: imperative or declarative. |
| |
| ### Imperative finally block |
| |
| In addition to `try`/`catch`, a collector can also use a `finally` block to execute an action |
| upon `collect` completion. |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = (1..3).asFlow() |
| |
| fun main() = runBlocking<Unit> { |
| try { |
| simple().collect { value -> println(value) } |
| } finally { |
| println("Done") |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt). |
| > |
| {type="note"} |
| |
| This code prints three numbers produced by the `simple` flow followed by a "Done" string: |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| Done |
| ``` |
| |
| <!--- TEST --> |
| |
| ### Declarative handling |
| |
| For the declarative approach, flow has [onCompletion] intermediate operator that is invoked |
| when the flow has completely collected. |
| |
| The previous example can be rewritten using an [onCompletion] operator and produces the same output: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| fun simple(): Flow<Int> = (1..3).asFlow() |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| simple() |
| .onCompletion { println("Done") } |
| .collect { value -> println(value) } |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST |
| 1 |
| 2 |
| 3 |
| Done |
| --> |
| |
| The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used |
| to determine whether the flow collection was completed normally or exceptionally. In the following |
| example the `simple` flow throws an exception after emitting the number 1: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = flow { |
| emit(1) |
| throw RuntimeException() |
| } |
| |
| fun main() = runBlocking<Unit> { |
| simple() |
| .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") } |
| .catch { cause -> println("Caught exception") } |
| .collect { value -> println(value) } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt). |
| > |
| {type="note"} |
| |
| As you may expect, it prints: |
| |
| ```text |
| 1 |
| Flow completed exceptionally |
| Caught exception |
| ``` |
| |
| <!--- TEST --> |
| |
| The [onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above |
| example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators |
| and can be handled with a `catch` operator. |
| |
| ### Successful completion |
| |
| Another difference with [catch] operator is that [onCompletion] sees all exceptions and receives |
| a `null` exception only on successful completion of the upstream flow (without cancellation or failure). |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun simple(): Flow<Int> = (1..3).asFlow() |
| |
| fun main() = runBlocking<Unit> { |
| simple() |
| .onCompletion { cause -> println("Flow completed with $cause") } |
| .collect { value -> |
| check(value <= 1) { "Collected $value" } |
| println(value) |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt). |
| > |
| {type="note"} |
| |
| We can see the completion cause is not null, because the flow was aborted due to downstream exception: |
| |
| ```text |
| 1 |
| Flow completed with java.lang.IllegalStateException: Collected 2 |
| Exception in thread "main" java.lang.IllegalStateException: Collected 2 |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| ## Imperative versus declarative |
| |
| Now we know how to collect flow, and handle its completion and exceptions in both imperative and declarative ways. |
| The natural question here is, which approach is preferred and why? |
| As a library, we do not advocate for any particular approach and believe that both options |
| are valid and should be selected according to your own preferences and code style. |
| |
| ## Launching flow |
| |
| It is easy to use flows to represent asynchronous events that are coming from some source. |
| In this case, we need an analogue of the `addEventListener` function that registers a piece of code with a reaction |
| for incoming events and continues further work. The [onEach] operator can serve this role. |
| However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow. |
| Otherwise, just calling `onEach` has no effect. |
| |
| If we use the [collect] terminal operator after `onEach`, then the code after it will wait until the flow is collected: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| // Imitate a flow of events |
| fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } |
| |
| fun main() = runBlocking<Unit> { |
| events() |
| .onEach { event -> println("Event: $event") } |
| .collect() // <--- Collecting the flow waits |
| println("Done") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt). |
| > |
| {type="note"} |
| |
| As you can see, it prints: |
| |
| ```text |
| Event: 1 |
| Event: 2 |
| Event: 3 |
| Done |
| ``` |
| |
| <!--- TEST --> |
| |
| The [launchIn] terminal operator comes in handy here. By replacing `collect` with `launchIn` we can |
| launch a collection of the flow in a separate coroutine, so that execution of further code |
| immediately continues: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| // Imitate a flow of events |
| fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } |
| |
| //sampleStart |
| fun main() = runBlocking<Unit> { |
| events() |
| .onEach { event -> println("Event: $event") } |
| .launchIn(this) // <--- Launching the flow in a separate coroutine |
| println("Done") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt). |
| > |
| {type="note"} |
| |
| It prints: |
| |
| ```text |
| Done |
| Event: 1 |
| Event: 2 |
| Event: 3 |
| ``` |
| |
| <!--- TEST --> |
| |
| The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is |
| launched. In the above example this scope comes from the [runBlocking] |
| coroutine builder, so while the flow is running, this [runBlocking] scope waits for completion of its child coroutine |
| and keeps the main function from returning and terminating this example. |
| |
| In actual applications a scope will come from an entity with a limited |
| lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling |
| the collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works |
| like the `addEventListener`. However, there is no need for the corresponding `removeEventListener` function, |
| as cancellation and structured concurrency serve this purpose. |
| |
| Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection |
| coroutine only without cancelling the whole scope or to [join][Job.join] it. |
| |
| ### Flow cancellation checks |
| |
| For convenience, the [flow][_flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. |
| It means that a busy loop emitting from a `flow { ... }` is cancellable: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun foo(): Flow<Int> = flow { |
| for (i in 1..5) { |
| println("Emitting $i") |
| emit(i) |
| } |
| } |
| |
| fun main() = runBlocking<Unit> { |
| foo().collect { value -> |
| if (value == 3) cancel() |
| println(value) |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt). |
| > |
| {type="note"} |
| |
| We get only numbers up to 3 and a [CancellationException] after trying to emit number 4: |
| |
| ```text |
| Emitting 1 |
| 1 |
| Emitting 2 |
| 2 |
| Emitting 3 |
| 3 |
| Emitting 4 |
| Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| However, most other flow operators do not do additional cancellation checks on their own for performance reasons. |
| For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere, |
| then there are no checks for cancellation: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun main() = runBlocking<Unit> { |
| (1..5).asFlow().collect { value -> |
| if (value == 3) cancel() |
| println(value) |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt). |
| > |
| {type="note"} |
| |
| All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`: |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23 |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| #### Making busy flow cancellable |
| |
| In the case where you have a busy loop with coroutines you must explicitly check for cancellation. |
| You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use |
| [cancellable] operator provided to do that: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.flow.* |
| |
| //sampleStart |
| fun main() = runBlocking<Unit> { |
| (1..5).asFlow().cancellable().collect { value -> |
| if (value == 3) cancel() |
| println(value) |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code from [here](../../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt). |
| > |
| {type="note"} |
| |
| With the `cancellable` operator only the numbers from 1 to 3 are collected: |
| |
| ```text |
| 1 |
| 2 |
| 3 |
| Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365 |
| ``` |
| |
| <!--- TEST EXCEPTION --> |
| |
| ## Flow and Reactive Streams |
| |
| For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor, |
| design of the Flow may look very familiar. |
| |
| Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, |
| be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article. |
| |
| While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. |
| Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2`/`kotlinx-coroutines-rx3` for RxJava2/RxJava3). |
| Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities. |
| |
| <!-- stdlib references --> |
| |
| [collections]: https://kotlinlang.org/docs/reference/collections-overview.html |
| [List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/ |
| [forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html |
| [Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/ |
| [Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html |
| [Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html |
| [Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html |
| [exceptions]: https://kotlinlang.org/docs/reference/exceptions.html |
| |
| <!--- MODULE kotlinx-coroutines-core --> |
| <!--- INDEX kotlinx.coroutines --> |
| |
| [delay]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html |
| [withTimeoutOrNull]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html |
| [Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html |
| [Dispatchers.Main]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html |
| [withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html |
| [CoroutineDispatcher]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html |
| [CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html |
| [runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html |
| [Job]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html |
| [Job.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel.html |
| [Job.join]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html |
| [ensureActive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html |
| [CancellationException]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html |
| |
| <!--- INDEX kotlinx.coroutines.flow --> |
| |
| [Flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html |
| [_flow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html |
| [FlowCollector.emit]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html |
| [collect]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html |
| [flowOf]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html |
| [map]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html |
| [filter]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html |
| [transform]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html |
| [take]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html |
| [toList]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html |
| [toSet]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html |
| [first]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html |
| [single]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html |
| [reduce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html |
| [fold]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html |
| [flowOn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html |
| [buffer]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html |
| [conflate]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html |
| [collectLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html |
| [zip]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html |
| [combine]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html |
| [onEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html |
| [flatMapConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html |
| [flattenConcat]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html |
| [flatMapMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html |
| [flattenMerge]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html |
| [DEFAULT_CONCURRENCY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html |
| [flatMapLatest]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html |
| [catch]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html |
| [onCompletion]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html |
| [launchIn]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html |
| [IntRange.asFlow]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/as-flow.html |
| [cancellable]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html |
| |
| <!--- END --> |