| <!--- TEST_NAME SharedStateGuideTest --> |
| |
| [//]: # (title: Shared mutable state and concurrency) |
| |
| Coroutines can be executed parallelly using a multi-threaded dispatcher like the [Dispatchers.Default]. It presents |
| all the usual parallelism problems. The main problem being synchronization of access to **shared mutable state**. |
| Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world, |
| but others are unique. |
| |
| ## The problem |
| |
| Let us launch a hundred coroutines all doing the same action a thousand times. |
| We'll also measure their completion time for further comparisons: |
| |
| ```kotlin |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| ``` |
| |
| We start with a very simple action that increments a shared mutable variable using |
| multi-threaded [Dispatchers.Default]. |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| var counter = 0 |
| |
| fun main() = runBlocking { |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| counter++ |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST LINES_START |
| Completed 100000 actions in |
| Counter = |
| --> |
| |
| What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines |
| increment the `counter` concurrently from multiple threads without any synchronization. |
| |
| ## Volatiles are of no help |
| |
| There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| @Volatile // in Kotlin `volatile` is an annotation |
| var counter = 0 |
| |
| fun main() = runBlocking { |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| counter++ |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST LINES_START |
| Completed 100000 actions in |
| Counter = |
| --> |
| |
| This code works slower, but we still don't get "Counter = 100000" at the end, because volatile variables guarantee |
| linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but |
| do not provide atomicity of larger actions (increment in our case). |
| |
| ## Thread-safe data structures |
| |
| The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, |
| linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding |
| operations that needs to be performed on a shared state. |
| In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import java.util.concurrent.atomic.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| val counter = AtomicInteger() |
| |
| fun main() = runBlocking { |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| counter.incrementAndGet() |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| Completed 100000 actions in xxx ms |
| Counter = 100000 |
| --> |
| |
| This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other |
| standard data structures and basic operations on them. However, it does not easily scale to complex |
| state or to complex operations that do not have ready-to-use thread-safe implementations. |
| |
| ## Thread confinement fine-grained |
| |
| _Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared |
| state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to |
| the single event-dispatch/application thread. It is easy to apply with coroutines by using a |
| single-threaded context. |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| val counterContext = newSingleThreadContext("CounterContext") |
| var counter = 0 |
| |
| fun main() = runBlocking { |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| // confine each increment to a single-threaded context |
| withContext(counterContext) { |
| counter++ |
| } |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| Completed 100000 actions in xxx ms |
| Counter = 100000 |
| --> |
| |
| This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches |
| from multi-threaded [Dispatchers.Default] context to the single-threaded context using |
| [withContext(counterContext)][withContext] block. |
| |
| ## Thread confinement coarse-grained |
| |
| In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic |
| are confined to the single thread. The following example does it like that, running each coroutine in |
| the single-threaded context to start with. |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| val counterContext = newSingleThreadContext("CounterContext") |
| var counter = 0 |
| |
| fun main() = runBlocking { |
| // confine everything to a single-threaded context |
| withContext(counterContext) { |
| massiveRun { |
| counter++ |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| Completed 100000 actions in xxx ms |
| Counter = 100000 |
| --> |
| |
| This now works much faster and produces correct result. |
| |
| ## Mutual exclusion |
| |
| Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_ |
| that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that. |
| Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to |
| delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread. |
| |
| There is also [withLock] extension function that conveniently represents |
| `mutex.lock(); try { ... } finally { mutex.unlock() }` pattern: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.sync.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| //sampleStart |
| val mutex = Mutex() |
| var counter = 0 |
| |
| fun main() = runBlocking { |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| // protect each increment with lock |
| mutex.withLock { |
| counter++ |
| } |
| } |
| } |
| println("Counter = $counter") |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| Completed 100000 actions in xxx ms |
| Counter = 100000 |
| --> |
| |
| The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations |
| where you absolutely must modify some shared state periodically, but there is no natural thread that this state |
| is confined to. |
| |
| ## Actors |
| |
| An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine, |
| the state that is confined and encapsulated into this coroutine, |
| and a channel to communicate with other coroutines. A simple actor can be written as a function, |
| but an actor with a complex state is better suited for a class. |
| |
| There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its |
| scope to receive messages from and combines the send channel into the resulting job object, so that a |
| single reference to the actor can be carried around as its handle. |
| |
| The first step of using an actor is to define a class of messages that an actor is going to process. |
| Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose. |
| We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message |
| to get its value. The latter needs to send a response. A [CompletableDeferred] communication |
| primitive, that represents a single value that will be known (communicated) in the future, |
| is used here for that purpose. |
| |
| ```kotlin |
| // Message types for counterActor |
| sealed class CounterMsg |
| object IncCounter : CounterMsg() // one-way message to increment counter |
| class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply |
| ``` |
| |
| Then we define a function that launches an actor using an [actor] coroutine builder: |
| |
| ```kotlin |
| // This function launches a new counter actor |
| fun CoroutineScope.counterActor() = actor<CounterMsg> { |
| var counter = 0 // actor state |
| for (msg in channel) { // iterate over incoming messages |
| when (msg) { |
| is IncCounter -> counter++ |
| is GetCounter -> msg.response.complete(counter) |
| } |
| } |
| } |
| ``` |
| |
| The main code is straightforward: |
| |
| <!--- CLEAR --> |
| |
| ```kotlin |
| import kotlinx.coroutines.* |
| import kotlinx.coroutines.channels.* |
| import kotlin.system.* |
| |
| suspend fun massiveRun(action: suspend () -> Unit) { |
| val n = 100 // number of coroutines to launch |
| val k = 1000 // times an action is repeated by each coroutine |
| val time = measureTimeMillis { |
| coroutineScope { // scope for coroutines |
| repeat(n) { |
| launch { |
| repeat(k) { action() } |
| } |
| } |
| } |
| } |
| println("Completed ${n * k} actions in $time ms") |
| } |
| |
| // Message types for counterActor |
| sealed class CounterMsg |
| object IncCounter : CounterMsg() // one-way message to increment counter |
| class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply |
| |
| // This function launches a new counter actor |
| fun CoroutineScope.counterActor() = actor<CounterMsg> { |
| var counter = 0 // actor state |
| for (msg in channel) { // iterate over incoming messages |
| when (msg) { |
| is IncCounter -> counter++ |
| is GetCounter -> msg.response.complete(counter) |
| } |
| } |
| } |
| |
| //sampleStart |
| fun main() = runBlocking<Unit> { |
| val counter = counterActor() // create the actor |
| withContext(Dispatchers.Default) { |
| massiveRun { |
| counter.send(IncCounter) |
| } |
| } |
| // send a message to get a counter value from an actor |
| val response = CompletableDeferred<Int>() |
| counter.send(GetCounter(response)) |
| println("Counter = ${response.await()}") |
| counter.close() // shutdown the actor |
| } |
| //sampleEnd |
| ``` |
| {kotlin-runnable="true" kotlin-min-compiler-version="1.3"} |
| |
| > You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt). |
| > |
| {type="note"} |
| |
| <!--- TEST ARBITRARY_TIME |
| Completed 100000 actions in xxx ms |
| Counter = 100000 |
| --> |
| |
| It does not matter (for correctness) what context the actor itself is executed in. An actor is |
| a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine |
| works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state, |
| but can only affect each other through messages (avoiding the need for any locks). |
| |
| Actor is more efficient than locking under load, because in this case it always has work to do and it does not |
| have to switch to a different context at all. |
| |
| > Note that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated |
| > with the channel that it receives messages from, while a producer is associated with the channel that it |
| > sends elements to. |
| > |
| {type="note"} |
| |
| <!--- MODULE kotlinx-coroutines-core --> |
| <!--- INDEX kotlinx.coroutines --> |
| |
| [Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html |
| [withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html |
| [CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html |
| |
| <!--- INDEX kotlinx.coroutines.sync --> |
| |
| [Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html |
| [Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html |
| [Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html |
| [withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html |
| |
| <!--- INDEX kotlinx.coroutines.channels --> |
| |
| [actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html |
| [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html |
| |
| <!--- END --> |