| <!--- TEST_NAME ChannelsGuideTest --> |
| |
| [//]: # (title: Channels) |
| |
| Deferred values provide a convenient way to transfer a single value between coroutines. |
| Channels provide a way to transfer a stream of values. |
| |
| ## Channel basics |
| |
| A [Channel] is conceptually very similar to `BlockingQueue`. One key difference is that |
| instead of a blocking `put` operation it has a suspending [send][SendChannel.send], and instead of |
| a blocking `take` operation it has a suspending [receive][ReceiveChannel.receive]. |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking { |
| //sampleStart |
| val channel = Channel<Int>() |
| launch { |
| // this might be a heavy CPU-consuming computation or async logic; |
| // here we just send five squares |
| for (x in 1..5) channel.send(x * x) |
| } |
| // here we print five received integers: |
| repeat(5) { println(channel.receive()) } |
| println("Done!") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt). |
| > |
| {type="note"} |
| |
| The output of this code is: |
| |
| ```text |
| 1 |
| 4 |
| 9 |
| 16 |
| 25 |
| Done! |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Closing and iteration over channels |
| |
| Unlike a queue, a channel can be closed to indicate that no more elements are coming. |
| On the receiver side it is convenient to use a regular `for` loop to receive elements |
| from the channel. |
| |
| Conceptually, a [close][SendChannel.close] is like sending a special close token to the channel. |
| The iteration stops as soon as this close token is received, so all elements sent before |
| the close are guaranteed to be received: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking { |
| //sampleStart |
| val channel = Channel<Int>() |
| launch { |
| for (x in 1..5) channel.send(x * x) |
| channel.close() // we're done sending |
| } |
| // here we print received values using a `for` loop (until the channel is closed) |
| for (y in channel) println(y) |
| println("Done!") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST |
| 1 |
| 4 |
| 9 |
| 16 |
| 25 |
| Done! |
| --> |
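
To see the close-token guarantee in isolation, here is a minimal sketch. It assumes a kotlinx.coroutines version that provides `receiveCatching` (1.5 or later); the `drain` helper is hypothetical and only serves this illustration. Elements sent before `close` are still delivered, and only then does the channel report that it is closed:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not part of the library): receives until the channel reports closure.
suspend fun drain(channel: ReceiveChannel<Int>): List<Int> {
    val received = mutableListOf<Int>()
    while (true) {
        val result = channel.receiveCatching()
        if (result.isClosed) break          // the "close token" has been reached
        received += result.getOrThrow()     // a regular element
    }
    return received
}

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2) // small buffer so that send does not suspend here
    channel.send(1)
    channel.send(4)
    channel.close()                          // closing does not discard the buffered elements
    println(drain(channel))                  // [1, 4]
    println(channel.receiveCatching().isClosed) // true
}
```

Using `receiveCatching` here is just one way to observe closure explicitly; the `for` loop shown above does the same check for you.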
| |
| ## Building channel producers |
| |
| The pattern where a coroutine produces a sequence of elements is quite common. |
| It is one half of the _producer-consumer_ pattern that is often found in concurrent code. |
| You could abstract such a producer into a function that takes a channel as its parameter, but this goes against |
| the common-sense expectation that functions return their results. |
| |
| There is a convenient coroutine builder named [produce] that makes it easy to do this right on the producer side, |
| and an extension function [consumeEach] that replaces a `for` loop on the consumer side: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| //sampleStart |
| fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { |
| for (x in 1..5) send(x * x) |
| } |
| |
| fun main() = runBlocking { |
| val squares = produceSquares() |
| squares.consumeEach { println(it) } |
| println("Done!") |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST |
| 1 |
| 4 |
| 9 |
| 16 |
| 25 |
| Done! |
| --> |
| |
| ## Pipelines |
| |
| A pipeline is a pattern where one coroutine produces a possibly infinite stream of values: |
| |
| ```kotlin |
| fun CoroutineScope.produceNumbers() = produce<Int> { |
| var x = 1 |
| while (true) send(x++) // infinite stream of integers starting from 1 |
| } |
| ``` |
| |
| Another coroutine, or several coroutines, consume that stream, do some processing, and produce other results. |
| In the example below, the numbers are just squared: |
| |
| ```kotlin |
| fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { |
| for (x in numbers) send(x * x) |
| } |
| ``` |
| |
| The main code starts and connects the whole pipeline: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking { |
| //sampleStart |
| val numbers = produceNumbers() // produces integers from 1 and on |
| val squares = square(numbers) // squares integers |
| repeat(5) { |
| println(squares.receive()) // print first five |
| } |
| println("Done!") // we are done |
| coroutineContext.cancelChildren() // cancel children coroutines |
| //sampleEnd |
| } |
| |
| fun CoroutineScope.produceNumbers() = produce<Int> { |
| var x = 1 |
| while (true) send(x++) // infinite stream of integers starting from 1 |
| } |
| |
| fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { |
| for (x in numbers) send(x * x) |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST |
| 1 |
| 4 |
| 9 |
| 16 |
| 25 |
| Done! |
| --> |
| |
| > All functions that create coroutines are defined as extensions on [CoroutineScope], |
| > so that we can rely on [structured concurrency](composing-suspending-functions.md#structured-concurrency-with-async) to make |
| > sure that we don't have lingering global coroutines in our application. |
| > |
| {type="note"} |
| |
| ## Prime numbers with pipeline |
| |
| Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline |
| of coroutines. We start with an infinite sequence of numbers. |
| |
| ```kotlin |
| fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { |
| var x = start |
| while (true) send(x++) // infinite stream of integers from start |
| } |
| ``` |
| |
| The following pipeline stage filters an incoming stream of numbers, removing all the numbers |
| that are divisible by the given prime number: |
| |
| ```kotlin |
| fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { |
| for (x in numbers) if (x % prime != 0) send(x) |
| } |
| ``` |
| |
| Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, |
| and launching a new pipeline stage for each prime number found: |
| |
| ```Plain Text |
| numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... |
| ``` |
| |
| The following example prints the first ten prime numbers, |
| running the whole pipeline in the context of the main thread. Since all the coroutines are launched in |
| the scope of the main [runBlocking] coroutine, |
| we don't have to keep an explicit list of all the coroutines we have started. |
| We use the [cancelChildren][kotlin.coroutines.CoroutineContext.cancelChildren] |
| extension function to cancel all the child coroutines after we have printed |
| the first ten prime numbers. |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking { |
| //sampleStart |
| var cur = numbersFrom(2) |
| repeat(10) { |
| val prime = cur.receive() |
| println(prime) |
| cur = filter(cur, prime) |
| } |
| coroutineContext.cancelChildren() // cancel all children to let main finish |
| //sampleEnd |
| } |
| |
| fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { |
| var x = start |
| while (true) send(x++) // infinite stream of integers from start |
| } |
| |
| fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { |
| for (x in numbers) if (x % prime != 0) send(x) |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt). |
| > |
| {type="note"} |
| |
| The output of this code is: |
| |
| ```text |
| 2 |
| 3 |
| 5 |
| 7 |
| 11 |
| 13 |
| 17 |
| 19 |
| 23 |
| 29 |
| ``` |
| |
| <!--- TEST --> |
| |
| Note that you can build the same pipeline using the |
| [`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html) |
| coroutine builder from the standard library. |
| Replace `produce` with `iterator`, `send` with `yield`, `receive` with `next`, |
| `ReceiveChannel` with `Iterator`, and get rid of the coroutine scope. You will not need `runBlocking` either. |
| However, the benefit of a pipeline that uses channels as shown above is that it can actually use |
| multiple CPU cores if you run it in the [Dispatchers.Default] context. |
| |
| Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve |
| other suspending invocations (like asynchronous calls to remote services), and such pipelines cannot be |
| built using `sequence`/`iterator`, because those builders do not allow arbitrary suspension, unlike |
| `produce`, which is fully asynchronous. |
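
For comparison, the `iterator`-based variant mentioned above might look like the following sketch. It applies the listed substitutions literally; the `firstPrimes` helper is a hypothetical wrapper added only so that the result can be returned as a list. No coroutine scope and no `runBlocking` are involved:

```kotlin
// Sketch: the prime pipeline rebuilt on the standard library `iterator` builder.
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // lazily yields an infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x)
}

// Hypothetical helper: collects the first `count` primes produced by the pipeline.
fun firstPrimes(count: Int): List<Int> {
    val primes = mutableListOf<Int>()
    var cur = numbersFrom(2)
    repeat(count) {
        val prime = cur.next()   // `next` takes the place of `receive`
        primes += prime
        cur = filter(cur, prime) // add one more filtering stage
    }
    return primes
}

fun main() {
    println(firstPrimes(10)) // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
}
```

This version runs entirely on the calling thread, and the code inside `iterator { ... }` can only suspend through `yield`, which is the limitation described above.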
| |
| ## Fan-out |
| |
| Multiple coroutines may receive from the same channel, distributing work between themselves. |
| Let us start with a producer coroutine that is periodically producing integers |
| (ten numbers per second): |
| |
| ```kotlin |
| fun CoroutineScope.produceNumbers() = produce<Int> { |
| var x = 1 // start from 1 |
| while (true) { |
| send(x++) // produce next |
| delay(100) // wait 0.1s |
| } |
| } |
| ``` |
| |
| Then we can have several processor coroutines. In this example, they just print their id and |
| the received number: |
| |
| ```kotlin |
| fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { |
| for (msg in channel) { |
| println("Processor #$id received $msg") |
| } |
| } |
| ``` |
| |
| Now let us launch five processors and let them work for almost a second. See what happens: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val producer = produceNumbers() |
| repeat(5) { launchProcessor(it, producer) } |
| delay(950) |
| producer.cancel() // cancel the producer coroutine, which also ends all the processors |
| //sampleEnd |
| } |
| |
| fun CoroutineScope.produceNumbers() = produce<Int> { |
| var x = 1 // start from 1 |
| while (true) { |
| send(x++) // produce next |
| delay(100) // wait 0.1s |
| } |
| } |
| |
| fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { |
| for (msg in channel) { |
| println("Processor #$id received $msg") |
| } |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt). |
| > |
| {type="note"} |
| |
| The output will be similar to the following, although the processor ids that receive |
| each specific integer may differ: |
| |
| ```text |
| Processor #2 received 1 |
| Processor #4 received 2 |
| Processor #0 received 3 |
| Processor #1 received 4 |
| Processor #3 received 5 |
| Processor #2 received 6 |
| Processor #4 received 7 |
| Processor #0 received 8 |
| Processor #1 received 9 |
| Processor #3 received 10 |
| ``` |
| |
| <!--- TEST lines.size == 10 && lines.withIndex().all { (i, line) -> line.startsWith("Processor #") && line.endsWith(" received ${i + 1}") } --> |
| |
| Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration |
| over the channel that the processor coroutines are doing. |
| |
| Also, pay attention to how we explicitly iterate over the channel with a `for` loop to perform fan-out in the `launchProcessor` code. |
| Unlike `consumeEach`, this `for` loop pattern is perfectly safe to use from multiple coroutines. If one of the processor |
| coroutines fails, the others keep processing the channel, whereas a processor written with `consumeEach` |
| always consumes (cancels) the underlying channel on its normal or abnormal completion. |
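
The difference can be observed with a small experiment. The following is only a sketch: `leftAfterFailure` is a hypothetical helper, the failure is simulated with `error(...)`, and `tryReceive` is assumed to be available (kotlinx.coroutines 1.5 or later). One processor fails on the first element, and the helper then checks what another processor could still take from the channel:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: a processor fails on the first element; returns the
// elements that are still available in the channel afterwards.
suspend fun leftAfterFailure(useConsumeEach: Boolean): List<Int> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (x in 1..3) channel.send(x)
    try {
        if (useConsumeEach) {
            channel.consumeEach { error("processor failed on $it") }
        } else {
            for (x in channel) error("processor failed on $x")
        }
    } catch (e: IllegalStateException) {
        // the simulated processor failure; ignored on purpose
    }
    val left = mutableListOf<Int>()
    // tryReceive does not suspend; getOrNull() is null when the channel is empty or closed
    while (true) left += channel.tryReceive().getOrNull() ?: break
    return left
}

fun main() = runBlocking {
    println(leftAfterFailure(useConsumeEach = false)) // [2, 3]
    println(leftAfterFailure(useConsumeEach = true))  // []
}
```

With the `for` loop, the remaining elements stay in the channel for other processors. With `consumeEach`, the failure cancels the channel and the buffered elements are dropped.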
| |
| ## Fan-in |
| |
| Multiple coroutines may send to the same channel. |
| For example, let us have a channel of strings, and a suspending function that |
| repeatedly sends a specified string to this channel with a specified delay: |
| |
| ```kotlin |
| suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { |
| while (true) { |
| delay(time) |
| channel.send(s) |
| } |
| } |
| ``` |
| |
| Now, let us see what happens if we launch a couple of coroutines sending strings |
| (in this example we launch them in the context of the main thread as children of the main coroutine): |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking { |
| //sampleStart |
| val channel = Channel<String>() |
| launch { sendString(channel, "foo", 200L) } |
| launch { sendString(channel, "BAR!", 500L) } |
| repeat(6) { // receive first six |
| println(channel.receive()) |
| } |
| coroutineContext.cancelChildren() // cancel all children to let main finish |
| //sampleEnd |
| } |
| |
| suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { |
| while (true) { |
| delay(time) |
| channel.send(s) |
| } |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt). |
| > |
| {type="note"} |
| |
| The output is: |
| |
| ```text |
| foo |
| foo |
| BAR! |
| foo |
| foo |
| BAR! |
| ``` |
| |
| <!--- TEST --> |
| |
| ## Buffered channels |
| |
| The channels shown so far had no buffer. Unbuffered channels transfer elements when the sender and the receiver |
| meet each other (aka rendezvous). If send is invoked first, it is suspended until receive is invoked; |
| if receive is invoked first, it is suspended until send is invoked. |
| |
| Both the [Channel()] factory function and the [produce] builder take an optional `capacity` parameter to |
| specify the _buffer size_. A buffer allows senders to send multiple elements before suspending, |
| similar to a `BlockingQueue` with a specified capacity, which blocks when its buffer is full. |
| |
| Take a look at the behavior of the following code: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| fun main() = runBlocking<Unit> { |
| //sampleStart |
| val channel = Channel<Int>(4) // create buffered channel |
| val sender = launch { // launch sender coroutine |
| repeat(10) { |
| println("Sending $it") // print before sending each element |
| channel.send(it) // will suspend when buffer is full |
| } |
| } |
| // don't receive anything... just wait.... |
| delay(1000) |
| sender.cancel() // cancel sender coroutine |
| //sampleEnd |
| } |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt). |
| > |
| {type="note"} |
| |
| It prints "Sending" _five_ times using a buffered channel with a capacity of _four_: |
| |
| ```text |
| Sending 0 |
| Sending 1 |
| Sending 2 |
| Sending 3 |
| Sending 4 |
| ``` |
| |
| <!--- TEST --> |
| |
| The first four elements are added to the buffer and the sender suspends when trying to send the fifth one. |
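
The same buffer behavior can be probed without suspending by using `trySend`. This is only a sketch: `acceptedWithoutReceiver` is a hypothetical helper, and it assumes a kotlinx.coroutines version where `trySend` is available (1.5 or later). It counts how many elements a channel accepts while nobody is receiving:

```kotlin
import kotlinx.coroutines.channels.*

// Hypothetical helper: counts how many sends a channel accepts when nobody receives.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    // trySend never suspends: it fails when the element cannot be accepted right now
    return (1..attempts).count { channel.trySend(it).isSuccess }
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS, attempts = 5)) // 0
    println(acceptedWithoutReceiver(4, attempts = 5))                  // 4
}
```

A rendezvous channel accepts nothing without a waiting receiver, while a channel with a capacity of four accepts exactly four elements. This matches the example above, where the fifth `send` is the one that suspends.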
| |
| ## Channels are fair |
| |
| Send and receive operations on channels are _fair_ with respect to the order of their invocation from |
| multiple coroutines. They are served in first-in first-out order, for example, the first coroutine to invoke `receive` |
| gets the element. In the following example, two coroutines, "ping" and "pong", |
| receive the "ball" object from the shared "table" channel. |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| //sampleStart |
| data class Ball(var hits: Int) |
| |
| fun main() = runBlocking { |
| val table = Channel<Ball>() // a shared table |
| launch { player("ping", table) } |
| launch { player("pong", table) } |
| table.send(Ball(0)) // serve the ball |
| delay(1000) // delay 1 second |
| coroutineContext.cancelChildren() // game over, cancel them |
| } |
| |
| suspend fun player(name: String, table: Channel<Ball>) { |
| for (ball in table) { // receive the ball in a loop |
| ball.hits++ |
| println("$name $ball") |
| delay(300) // wait a bit |
| table.send(ball) // send the ball back |
| } |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt). |
| > |
| {type="note"} |
| |
| The "ping" coroutine is started first, so it is the first one to receive the ball. Even though the "ping" |
| coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets |
| received by the "pong" coroutine, because it was already waiting for it: |
| |
| ```text |
| ping Ball(hits=1) |
| pong Ball(hits=2) |
| ping Ball(hits=3) |
| pong Ball(hits=4) |
| ``` |
| |
| <!--- TEST --> |
| |
| Note that sometimes channels may produce executions that look unfair due to the nature of the executor |
| that is being used. See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/111) for details. |
| |
| ## Ticker channels |
| |
| A ticker channel is a special rendezvous channel that produces `Unit` every time the given delay passes since the last consumption from this channel. |
| Though it may seem useless on its own, it is a useful building block for creating complex time-based [produce] |
| pipelines and operators that do windowing and other time-dependent processing. |
| A ticker channel can be used in [select] to perform an "on tick" action. |
| |
| To create such a channel, use the [ticker] factory method. |
| To indicate that no further elements are needed, call the [ReceiveChannel.cancel] method on it. |
| |
| Now let's see how it works in practice: |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| |
| //sampleStart |
| fun main() = runBlocking<Unit> { |
| val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel |
| var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| println("Initial element is available immediately: $nextElement") // no initial delay |
| |
| nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay |
| println("Next element is not ready in 100 ms: $nextElement") |
| |
| nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } |
| println("Next element is ready in 200 ms: $nextElement") |
| |
| // Emulate large consumption delays |
| println("Consumer pauses for 300ms") |
| delay(300) |
| // Next element is available immediately |
| nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } |
| println("Next element is available immediately after large consumer delay: $nextElement") |
| // Note that the pause between `receive` calls is taken into account and next element arrives faster |
| nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } |
| println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement") |
| |
| tickerChannel.cancel() // indicate that no more elements are needed |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt). |
| > |
| {type="note"} |
| |
| It prints the following lines: |
| |
| ```text |
| Initial element is available immediately: kotlin.Unit |
| Next element is not ready in 100 ms: null |
| Next element is ready in 200 ms: kotlin.Unit |
| Consumer pauses for 300ms |
| Next element is available immediately after large consumer delay: kotlin.Unit |
| Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit |
| ``` |
| |
| <!--- TEST --> |
| |
| Note that [ticker] is aware of possible consumer pauses and, by default, adjusts the delay of the next produced |
| element if a pause occurs, trying to maintain a fixed rate of produced elements. |
| |
| Optionally, a `mode` parameter equal to [TickerMode.FIXED_DELAY] can be specified to maintain a fixed |
| delay between elements. |
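
As mentioned earlier in this section, a ticker channel can drive an "on tick" action inside `select`. The following is a rough sketch of that idea, not a recommended design: the `eventsUntilMessage` helper, the `"hello"` message, and the 100 ms and 250 ms timings are all made up for illustration. Note also that the compiler may warn that `ticker` is marked as an obsolete API:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

// Hypothetical helper: records a "tick" for each ticker element until a message arrives.
suspend fun eventsUntilMessage(): List<String> = coroutineScope {
    val ticks = ticker(delayMillis = 100, initialDelayMillis = 100)
    val messages = produce<String> {
        delay(250)       // pretend the message takes a while to arrive
        send("hello")
    }
    val events = mutableListOf<String>()
    var done = false
    while (!done) {
        select<Unit> {
            ticks.onReceive { events += "tick" }   // the "on tick" action
            messages.onReceive { msg ->
                events += "message: $msg"
                done = true
            }
        }
    }
    ticks.cancel()       // indicate that no more ticks are needed
    events
}

fun main() = runBlocking {
    eventsUntilMessage().forEach { println(it) }
}
```

The number of ticks recorded before the message depends on timing, so the exact output may vary from run to run; the message is always the last event.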
| |
| <!--- MODULE kotlinx-coroutines-core --> |
| <!--- INDEX kotlinx.coroutines --> |
| |
| [CoroutineScope]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html |
| [runBlocking]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html |
| [kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-children.html |
| [Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html |
| |
| <!--- INDEX kotlinx.coroutines.channels --> |
| |
| [Channel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html |
| [SendChannel.send]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html |
| [ReceiveChannel.receive]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html |
| [SendChannel.close]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html |
| [produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html |
| [consumeEach]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html |
| [Channel()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html |
| [ticker]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html |
| [ReceiveChannel.cancel]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html |
| [TickerMode.FIXED_DELAY]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y/index.html |
| |
| <!--- INDEX kotlinx.coroutines.selects --> |
| |
| [select]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html |
| |
| <!--- END --> |