Stabilize `CancellableContinuation.resume` with `onCancellation` (#4090)
Additionally, give `onCancellation` extra parameters
that help to avoid allocating closures in some cases.
Fixes #4088
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index a5a6e1c..3d34689 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -39,10 +39,11 @@
public abstract fun isCancelled ()Z
public abstract fun isCompleted ()Z
public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
+ public abstract fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)V
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
- public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
+ public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}
@@ -54,7 +55,7 @@
public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation, kotlinx/coroutines/Waiter {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
- public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
+ public final fun callOnCancellation (Lkotlin/jvm/functions/Function3;Ljava/lang/Throwable;Ljava/lang/Object;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
@@ -70,12 +71,13 @@
public fun isCompleted ()Z
protected fun nameString ()Ljava/lang/String;
public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)V
+ public fun resume (Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)V
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
- public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
+ public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
index 8b98b40..4ed893a 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
@@ -104,13 +104,14 @@
abstract interface <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuation : kotlin.coroutines/Continuation<#A> { // kotlinx.coroutines/CancellableContinuation|null[0]
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuation.resumeUndispatched|[email protected](1:0){}[0]
abstract fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuation.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
+ abstract fun <#A1: #A> resume(#A1, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(0:0;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
+ abstract fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
abstract fun cancel(kotlin/Throwable? = ...): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.cancel|cancel(kotlin.Throwable?){}[0]
abstract fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuation.completeResume|completeResume(kotlin.Any){}[0]
abstract fun initCancellability() // kotlinx.coroutines/CancellableContinuation.initCancellability|initCancellability(){}[0]
abstract fun invokeOnCancellation(kotlin/Function1<kotlin/Throwable?, kotlin/Unit>) // kotlinx.coroutines/CancellableContinuation.invokeOnCancellation|invokeOnCancellation(kotlin.Function1<kotlin.Throwable?,kotlin.Unit>){}[0]
abstract fun resume(#A, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuation.resume|resume(1:0;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
abstract fun tryResume(#A, kotlin/Any? = ...): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?){}[0]
- abstract fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
abstract fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuation.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
abstract val isActive // kotlinx.coroutines/CancellableContinuation.isActive|{}isActive[0]
abstract fun <get-isActive>(): kotlin/Boolean // kotlinx.coroutines/CancellableContinuation.isActive.<get-isActive>|<get-isActive>(){}[0]
@@ -774,11 +775,13 @@
}
open class <#A: in kotlin/Any?> kotlinx.coroutines/CancellableContinuationImpl : kotlinx.coroutines.internal/CoroutineStackFrame, kotlinx.coroutines/CancellableContinuation<#A>, kotlinx.coroutines/DispatchedTask<#A>, kotlinx.coroutines/Waiter { // kotlinx.coroutines/CancellableContinuationImpl|null[0]
constructor <init>(kotlin.coroutines/Continuation<#A>, kotlin/Int) // kotlinx.coroutines/CancellableContinuationImpl.<init>|<init>(kotlin.coroutines.Continuation<1:0>;kotlin.Int){}[0]
+ final fun <#A1: kotlin/Any?> callOnCancellation(kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>, kotlin/Throwable, #A1) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>;kotlin.Throwable;0:0){0§<kotlin.Any?>}[0]
final fun callCancelHandler(kotlinx.coroutines/CancelHandler, kotlin/Throwable?) // kotlinx.coroutines/CancellableContinuationImpl.callCancelHandler|callCancelHandler(kotlinx.coroutines.CancelHandler;kotlin.Throwable?){}[0]
- final fun callOnCancellation(kotlin/Function1<kotlin/Throwable, kotlin/Unit>, kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.callOnCancellation|callOnCancellation(kotlin.Function1<kotlin.Throwable,kotlin.Unit>;kotlin.Throwable){}[0]
final fun getResult(): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.getResult|getResult(){}[0]
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatched(#A) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatched|[email protected](1:0){}[0]
open fun (kotlinx.coroutines/CoroutineDispatcher).resumeUndispatchedWithException(kotlin/Throwable) // kotlinx.coroutines/CancellableContinuationImpl.resumeUndispatchedWithException|resumeUndispatchedWithException@kotlinx.coroutines.CoroutineDispatcher(kotlin.Throwable){}[0]
+ open fun <#A1: #A> resume(#A1, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?) // kotlinx.coroutines/CancellableContinuationImpl.resume|resume(0:0;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
+ open fun <#A1: #A> tryResume(#A1, kotlin/Any?, kotlin/Function3<kotlin/Throwable, #A1, kotlin.coroutines/CoroutineContext, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(0:0;kotlin.Any?;kotlin.Function3<kotlin.Throwable,0:0,kotlin.coroutines.CoroutineContext,kotlin.Unit>?){0§<1:0>}[0]
open fun cancel(kotlin/Throwable?): kotlin/Boolean // kotlinx.coroutines/CancellableContinuationImpl.cancel|cancel(kotlin.Throwable?){}[0]
open fun completeResume(kotlin/Any) // kotlinx.coroutines/CancellableContinuationImpl.completeResume|completeResume(kotlin.Any){}[0]
open fun getContinuationCancellationCause(kotlinx.coroutines/Job): kotlin/Throwable // kotlinx.coroutines/CancellableContinuationImpl.getContinuationCancellationCause|getContinuationCancellationCause(kotlinx.coroutines.Job){}[0]
@@ -791,7 +794,6 @@
open fun resumeWith(kotlin/Result<#A>) // kotlinx.coroutines/CancellableContinuationImpl.resumeWith|resumeWith(kotlin.Result<1:0>){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/CancellableContinuationImpl.toString|toString(){}[0]
open fun tryResume(#A, kotlin/Any?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?){}[0]
- open fun tryResume(#A, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>?): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResume|tryResume(1:0;kotlin.Any?;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){}[0]
open fun tryResumeWithException(kotlin/Throwable): kotlin/Any? // kotlinx.coroutines/CancellableContinuationImpl.tryResumeWithException|tryResumeWithException(kotlin.Throwable){}[0]
open val callerFrame // kotlinx.coroutines/CancellableContinuationImpl.callerFrame|{}callerFrame[0]
open fun <get-callerFrame>(): kotlinx.coroutines.internal/CoroutineStackFrame? // kotlinx.coroutines/CancellableContinuationImpl.callerFrame.<get-callerFrame>|<get-callerFrame>(){}[0]
@@ -903,7 +905,7 @@
abstract val clauseObject // kotlinx.coroutines.selects/SelectClause.clauseObject|{}clauseObject[0]
abstract fun <get-clauseObject>(): kotlin/Any // kotlinx.coroutines.selects/SelectClause.clauseObject.<get-clauseObject>|<get-clauseObject>(){}[0]
abstract val onCancellationConstructor // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor|{}onCancellationConstructor[0]
- abstract fun <get-onCancellationConstructor>(): kotlin/Function3<kotlinx.coroutines.selects/SelectInstance<*>, kotlin/Any?, kotlin/Any?, kotlin/Function1<kotlin/Throwable, kotlin/Unit>>? // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor.<get-onCancellationConstructor>|<get-onCancellationConstructor>(){}[0]
+ abstract fun <get-onCancellationConstructor>(): kotlin/Function3<kotlinx.coroutines.selects/SelectInstance<*>, kotlin/Any?, kotlin/Any?, kotlin/Function3<kotlin/Throwable, kotlin/Any?, kotlin.coroutines/CoroutineContext, kotlin/Unit>>? // kotlinx.coroutines.selects/SelectClause.onCancellationConstructor.<get-onCancellationConstructor>|<get-onCancellationConstructor>(){}[0]
abstract val processResFunc // kotlinx.coroutines.selects/SelectClause.processResFunc|{}processResFunc[0]
abstract fun <get-processResFunc>(): kotlin/Function3<kotlin/Any, kotlin/Any?, kotlin/Any?, kotlin/Any?> // kotlinx.coroutines.selects/SelectClause.processResFunc.<get-processResFunc>|<get-processResFunc>(){}[0]
abstract val regFunc // kotlinx.coroutines.selects/SelectClause.regFunc|{}regFunc[0]
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
index 6c725d4..4fdc607 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
@@ -74,16 +74,21 @@
public fun tryResume(value: T, idempotent: Any? = null): Any?
/**
- * Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
- * delivered to the caller because of the dispatch in the process, so that atomicity delivery
- * guaranteed can be provided by having a cancellation fallback.
+ * Same as [tryResume] but with an [onCancellation] handler that is called if and only if the value is not
+ * delivered to the caller because of the dispatch in the process.
+ *
+ * The purpose of this function is to enable atomic delivery guarantees: either resumption succeeded, passing
+ * the responsibility for [value] to the continuation, or the [onCancellation] block will be invoked,
+ * allowing one to free the resources in [value].
*
* Implementation note: current implementation always returns RESUME_TOKEN or `null`
*
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
- public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
+ public fun <R: T> tryResume(
+ value: R, idempotent: Any?, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ ): Any?
/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
@@ -126,7 +131,7 @@
* Otherwise, the handler will be invoked as soon as this continuation is cancelled.
*
* The installed [handler] should not throw any exceptions.
- * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
+ * If it does, they will get caught, wrapped into a `CompletionHandlerException` and
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
@@ -168,27 +173,50 @@
@ExperimentalCoroutinesApi
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
+ /** @suppress */
+ @Deprecated(
+ "Use the overload that also accepts the `value` and the coroutine context in lambda",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("resume(value) { cause, _, _ -> onCancellation(cause) }")
+ ) // warning since 1.9.0, was experimental
+ public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
+
/**
- * Resumes this continuation with the specified `value` and calls the specified `onCancellation`
- * handler when either resumed too late (when continuation was already cancelled) or, although resumed
- * successfully (before cancellation), the coroutine's job was cancelled before it had a
- * chance to run in its dispatcher, so that the suspended function threw an exception
- * instead of returning this value.
+ * Resumes this continuation with the specified [value], calling the specified [onCancellation] if and only if
+ * the [value] was not successfully used to resume the continuation.
+ *
+ * The [value] can be rejected in two cases (in both of which [onCancellation] will be called):
+ * - Cancellation happened before the handler was resumed;
+ * - The continuation was resumed successfully (before cancellation), but the coroutine's job was cancelled before
+ * it had a chance to run in its dispatcher, and so the suspended function threw an exception instead of returning
+ * this value.
*
* The installed [onCancellation] handler should not throw any exceptions.
- * If it does, they will get caught, wrapped into a [CompletionHandlerException] and
+ * If it does, they will get caught, wrapped into a `CompletionHandlerException`, and
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
- * This function shall be used when resuming with a resource that must be closed by
- * code that called the corresponding suspending function, for example:
+ * With this version of [resume], it's possible to pass resources that can not simply be left for the garbage
+ * collector (like file handles, sockets, etc.) and need to be closed explicitly:
*
* ```
- * continuation.resume(resource) {
- * resource.close()
+ * continuation.resume(resourceToResumeWith) { _, resourceToClose, _ ->
+ * resourceToClose.close()
* }
* ```
*
+ * [onCancellation] accepts three arguments:
+ *
+ * - `cause: Throwable` is the exception with which the continuation was cancelled.
+ * - `value` is exactly the same as the [value] passed to [resume] itself.
+ * In the example above, `resourceToResumeWith` is exactly the same as `resourceToClose`; in particular,
+ * one could call `resourceToResumeWith.close()` in the lambda for the same effect.
+ * The reason to reference `resourceToClose` anyway is to avoid a memory allocation due to the lambda
+ * capturing the `resourceToResumeWith` reference.
+ * - `context` is the [context] of this continuation.
+ * Like with `value`, the reason this is available as a lambda parameter, even though it is always possible to
+ * call [context] from the lambda instead, is to allow lambdas to capture less of their environment.
+ *
* A more complete example and further details are given in
* the documentation for the [suspendCancellableCoroutine] function.
*
@@ -196,8 +224,9 @@
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
- @ExperimentalCoroutinesApi // since 1.2.0
- public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
+ public fun <R: T> resume(
+ value: R, onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ )
}
/**
@@ -293,8 +322,10 @@
* override fun onCompleted(resource: T) {
* // Resume coroutine with a value provided by the callback and ensure the resource is closed in case
* // when the coroutine is cancelled before the caller gets a reference to the resource.
- * continuation.resume(resource) {
- * resource.close() // Close the resource on cancellation
+ * continuation.resume(resource) { cause, resourceToClose, context ->
+ * resourceToClose.close() // Close the resource on cancellation
+ * // If we used `resource` instead of `resourceToClose`, this lambda would need to allocate a closure,
+ * // but with `resourceToClose`, the lambda does not capture any of its environment.
* }
* }
* // ...
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index df73141..00bd47e 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -146,7 +146,7 @@
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
- if (state is CompletedContinuation && state.idempotentResume != null) {
+ if (state is CompletedContinuation<*> && state.idempotentResume != null) {
// Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
@@ -169,7 +169,7 @@
when (state) {
is NotCompleted -> error("Not completed")
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
- is CompletedContinuation -> {
+ is CompletedContinuation<*> -> {
check(!state.cancelled) { "Must be called at most once" }
val update = state.copy(cancelCause = cause)
if (_state.compareAndSet(state, update)) {
@@ -243,9 +243,13 @@
callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
}
- fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
+ fun <R> callOnCancellation(
+ onCancellation: (cause: Throwable, value: R, context: CoroutineContext) -> Unit,
+ cause: Throwable,
+ value: R
+ ) {
try {
- onCancellation.invoke(cause)
+ onCancellation.invoke(cause, value, context)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
@@ -353,7 +357,14 @@
override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)
+ @Suppress("OVERRIDE_DEPRECATION")
override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
+ resumeImpl(value, resumeMode, onCancellation?.let { { cause, _, _ -> onCancellation(cause) } })
+
+ override fun <R : T> resume(
+ value: R,
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ ) =
resumeImpl(value, resumeMode, onCancellation)
/**
@@ -380,7 +391,8 @@
invokeOnCancellationImpl(segment)
}
- override fun invokeOnCancellation(handler: CompletionHandler) = invokeOnCancellation(CancelHandler.UserSupplied(handler))
+ override fun invokeOnCancellation(handler: CompletionHandler) =
+ invokeOnCancellation(CancelHandler.UserSupplied(handler))
internal fun invokeOnCancellationInternal(handler: CancelHandler) = invokeOnCancellationImpl(handler)
@@ -415,7 +427,8 @@
}
return
}
- is CompletedContinuation -> {
+
+ is CompletedContinuation<*> -> {
/*
* Continuation was already completed, and might already have cancel handler.
*/
@@ -456,11 +469,11 @@
dispatch(mode)
}
- private fun resumedState(
+ private fun <R> resumedState(
state: NotCompleted,
- proposedUpdate: Any?,
+ proposedUpdate: R,
resumeMode: Int,
- onCancellation: ((cause: Throwable) -> Unit)?,
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?,
idempotent: Any?
): Any? = when {
proposedUpdate is CompletedExceptionally -> {
@@ -476,10 +489,10 @@
else -> proposedUpdate // simple case -- use the value directly
}
- private fun resumeImpl(
- proposedUpdate: Any?,
+ internal fun <R> resumeImpl(
+ proposedUpdate: R,
resumeMode: Int,
- onCancellation: ((cause: Throwable) -> Unit)? = null
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null
) {
_state.loop { state ->
when (state) {
@@ -490,6 +503,7 @@
dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
return // done
}
+
is CancelledContinuation -> {
/*
* If continuation was cancelled, then resume attempt must be ignored,
@@ -498,7 +512,7 @@
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
- onCancellation?.let { callOnCancellation(it, state.cause) }
+ onCancellation?.let { callOnCancellation(it, state.cause, proposedUpdate) }
return // done
}
}
@@ -511,10 +525,10 @@
* Similar to [tryResume], but does not actually completes resume (needs [completeResume] call).
* Returns [RESUME_TOKEN] when resumed, `null` when it was already resumed or cancelled.
*/
- private fun tryResumeImpl(
- proposedUpdate: Any?,
+ private fun <R> tryResumeImpl(
+ proposedUpdate: R,
idempotent: Any?,
- onCancellation: ((cause: Throwable) -> Unit)?
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
): Symbol? {
_state.loop { state ->
when (state) {
@@ -524,7 +538,7 @@
detachChildIfNonResuable()
return RESUME_TOKEN
}
- is CompletedContinuation -> {
+ is CompletedContinuation<*> -> {
return if (idempotent != null && state.idempotentResume === idempotent) {
assert { state.result == proposedUpdate } // "Non-idempotent resume"
RESUME_TOKEN // resumed with the same token -- ok
@@ -560,7 +574,11 @@
override fun tryResume(value: T, idempotent: Any?): Any? =
tryResumeImpl(value, idempotent, onCancellation = null)
- override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
+ override fun <R : T> tryResume(
+ value: R,
+ idempotent: Any?,
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ ): Any? =
tryResumeImpl(value, idempotent, onCancellation)
override fun tryResumeWithException(exception: Throwable): Any? =
@@ -585,7 +603,7 @@
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
when (state) {
- is CompletedContinuation -> state.result as T
+ is CompletedContinuation<*> -> state.result as T
else -> state as T
}
@@ -652,10 +670,12 @@
}
// Completed with additional metadata
-private data class CompletedContinuation(
- @JvmField val result: Any?,
- @JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
- @JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
+private data class CompletedContinuation<R>(
+ @JvmField val result: R,
+ // installed via `invokeOnCancellation`
+ @JvmField val cancelHandler: CancelHandler? = null,
+ // installed via the `resume` block
+ @JvmField val onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null,
@JvmField val idempotentResume: Any? = null,
@JvmField val cancelCause: Throwable? = null
) {
@@ -663,7 +683,7 @@
fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
cancelHandler?.let { cont.callCancelHandler(it, cause) }
- onCancellation?.let { cont.callOnCancellation(it, cause) }
+ onCancellation?.let { cont.callOnCancellation(it, cause, result) }
}
}
diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
index 0d63050..f94a9e9 100644
--- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
@@ -7,16 +7,13 @@
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
import kotlinx.coroutines.channels.ChannelResult.Companion.failure
import kotlinx.coroutines.channels.ChannelResult.Companion.success
-import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.selects.TrySelectDetailedResult.*
-import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.js.*
import kotlin.jvm.*
import kotlin.math.*
-import kotlin.random.*
import kotlin.reflect.*
/**
@@ -658,7 +655,7 @@
}
is ReceiveCatching<*> -> {
this as ReceiveCatching<E>
- cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
+ cont.tryResume0(success(element), onUndeliveredElement?.bindCancellationFunResult())
}
is BufferedChannel<*>.BufferedChannelIterator -> {
this as BufferedChannel<E>.BufferedChannelIterator
@@ -666,7 +663,7 @@
}
is CancellableContinuation<*> -> { // `receive()`
this as CancellableContinuation<E>
- tryResume0(element, onUndeliveredElement?.bindCancellationFun(element, context))
+ tryResume0(element, onUndeliveredElement?.bindCancellationFun())
}
else -> error("Unexpected receiver type: $this")
}
@@ -728,7 +725,7 @@
// not dispatched yet. In case `onUndeliveredElement` is
// specified, we need to invoke it in the latter case.
onElementRetrieved = { element ->
- val onCancellation = onUndeliveredElement?.bindCancellationFun(element, cont.context)
+ val onCancellation = onUndeliveredElement?.bindCancellationFun()
cont.resume(element, onCancellation)
},
onClosed = { onClosedReceiveOnNoWaiterSuspend(cont) },
@@ -772,7 +769,7 @@
segment, index, r,
waiter = waiter,
onElementRetrieved = { element ->
- cont.resume(success(element), onUndeliveredElement?.bindCancellationFun(element, cont.context))
+ cont.resume(success(element), onUndeliveredElement?.bindCancellationFunResult())
},
onClosed = { onClosedReceiveCatchingOnNoWaiterSuspend(cont) }
)
@@ -1563,7 +1560,9 @@
@Suppress("UNCHECKED_CAST")
private val onUndeliveredElementReceiveCancellationConstructor: OnCancellationConstructor? = onUndeliveredElement?.let {
{ select: SelectInstance<*>, _: Any?, element: Any? ->
- { if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context) }
+ { _, _, _ ->
+ if (element !== CHANNEL_CLOSED) onUndeliveredElement.callUndeliveredElement(element as E, select.context)
+ }
}
}
@@ -1667,7 +1666,7 @@
onElementRetrieved = { element ->
this.receiveResult = element
this.continuation = null
- cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
+ cont.resume(true, onUndeliveredElement?.bindCancellationFun(element))
},
onClosed = { onClosedHasNextNoWaiterSuspend() }
)
@@ -1717,7 +1716,7 @@
// Try to resume this `hasNext()`. Importantly, the receiver coroutine
// may be cancelled after it is successfully resumed but not dispatched yet.
// In case `onUndeliveredElement` is specified, we need to invoke it in the latter case.
- return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
+ return cont.tryResume0(true, onUndeliveredElement?.bindCancellationFun(element))
}
fun tryResumeHasNextOnClosedChannel() {
@@ -2764,6 +2763,34 @@
segment = segment.next!!
}
}
+
+ private fun OnUndeliveredElement<E>.bindCancellationFunResult() = ::onCancellationChannelResultImplDoNotCall
+
+ /**
+ * Do not call directly. Go through [bindCancellationFunResult] to ensure the callback isn't null.
+ * [bindCancellationFunResult] could have just returned a lambda as well, but there would be a risk of that
+ * lambda capturing the environment.
+ */
+ private fun onCancellationChannelResultImplDoNotCall(
+ cause: Throwable, element: ChannelResult<E>, context: CoroutineContext
+ ) {
+ onUndeliveredElement!!.callUndeliveredElement(element.getOrNull()!!, context)
+ }
+
+ private fun OnUndeliveredElement<E>.bindCancellationFun(element: E):
+ (Throwable, Any?, CoroutineContext) -> Unit =
+ { _: Throwable, _, context: CoroutineContext -> callUndeliveredElement(element, context) }
+
+ private fun OnUndeliveredElement<E>.bindCancellationFun() = ::onCancellationImplDoNotCall
+
+ /**
+ * Do not call directly. Go through [bindCancellationFun] to ensure the callback isn't null.
+ * [bindCancellationFun] could have just returned a lambda as well, but there would be a risk of that
+ * lambda capturing the environment.
+ */
+ private fun onCancellationImplDoNotCall(cause: Throwable, element: E, context: CoroutineContext) {
+ onUndeliveredElement!!.callUndeliveredElement(element, context)
+ }
}
/**
@@ -2923,7 +2950,7 @@
*/
private fun <T> CancellableContinuation<T>.tryResume0(
value: T,
- onCancellation: ((cause: Throwable) -> Unit)? = null
+ onCancellation: ((cause: Throwable, value: T, context: CoroutineContext) -> Unit)? = null
): Boolean =
tryResume(value, null, onCancellation).let { token ->
if (token != null) {
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt
index 5c7f151..0805c7f 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt
@@ -1,13 +1,9 @@
package kotlinx.coroutines.channels
-import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.BufferOverflow.*
-import kotlinx.coroutines.channels.ChannelResult.Companion.closed
import kotlinx.coroutines.channels.ChannelResult.Companion.success
import kotlinx.coroutines.internal.*
-import kotlinx.coroutines.internal.OnUndeliveredElement
import kotlinx.coroutines.selects.*
-import kotlin.coroutines.*
/**
* This is a special [BufferedChannel] extension that supports [DROP_OLDEST] and [DROP_LATEST]
diff --git a/kotlinx-coroutines-core/common/src/internal/OnUndeliveredElement.kt b/kotlinx-coroutines-core/common/src/internal/OnUndeliveredElement.kt
index 11d7d0b..5ed99d3 100644
--- a/kotlinx-coroutines-core/common/src/internal/OnUndeliveredElement.kt
+++ b/kotlinx-coroutines-core/common/src/internal/OnUndeliveredElement.kt
@@ -29,9 +29,6 @@
}
}
-internal fun <E> OnUndeliveredElement<E>.bindCancellationFun(element: E, context: CoroutineContext): (Throwable) -> Unit =
- { _: Throwable -> callUndeliveredElement(element, context) }
-
/**
* Internal exception that is thrown when [OnUndeliveredElement] handler in
* a [kotlinx.coroutines.channels.Channel] throws an exception.
diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt
index 16deb19..73313b5 100644
--- a/kotlinx-coroutines-core/common/src/selects/Select.kt
+++ b/kotlinx-coroutines-core/common/src/selects/Select.kt
@@ -105,7 +105,8 @@
@LowPriorityInOverloadResolution
@Deprecated(
message = "Replaced with the same extension function",
- level = DeprecationLevel.ERROR, replaceWith = ReplaceWith(expression = "onTimeout", imports = ["kotlinx.coroutines.selects.onTimeout"])
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith(expression = "onTimeout", imports = ["kotlinx.coroutines.selects.onTimeout"])
) // Since 1.7.0, was experimental
public fun onTimeout(timeMillis: Long, block: suspend () -> R): Unit = onTimeout(timeMillis, block)
}
@@ -157,7 +158,8 @@
* as [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled.
*/
@InternalCoroutinesApi
-public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) -> (Throwable) -> Unit
+public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) ->
+ (Throwable, Any?, CoroutineContext) -> Unit
/**
* Clause for [select] expression without additional parameters that does not select any value.
@@ -171,6 +173,7 @@
) : SelectClause0 {
override val processResFunc: ProcessResultFunction = DUMMY_PROCESS_RESULT_FUNCTION
}
+
private val DUMMY_PROCESS_RESULT_FUNCTION: ProcessResultFunction = { _, _, _ -> null }
/**
@@ -234,7 +237,8 @@
*/
public fun selectInRegistrationPhase(internalResult: Any?)
}
-internal interface SelectInstanceInternal<R>: SelectInstance<R>, Waiter
+
+internal interface SelectInstanceInternal<R> : SelectInstance<R>, Waiter
@PublishedApi
internal open class SelectImplementation<R>(
@@ -346,6 +350,7 @@
* The state of this `select` operation. See the description above for details.
*/
private val state = atomic<Any>(STATE_REG)
+
/**
* Returns `true` if this `select` instance is in the REGISTRATION phase;
* otherwise, returns `false`.
@@ -354,12 +359,14 @@
get() = state.value.let {
it === STATE_REG || it is List<*>
}
+
/**
* Returns `true` if this `select` is already selected;
* thus, other parties are bound to fail when making a rendezvous with it.
*/
private val isSelected
get() = state.value is SelectImplementation<*>.ClauseData
+
/**
* Returns `true` if this `select` is cancelled.
*/
@@ -447,8 +454,10 @@
override fun SelectClause0.invoke(block: suspend () -> R) =
ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor).register()
+
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) =
ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor).register()
+
override fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R) =
ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor).register()
@@ -549,7 +558,7 @@
* this function performs registration of such clauses. After that, it atomically stores
* the continuation into the [state] field if there is no more clause to be re-registered.
*/
- private suspend fun waitUntilSelected() = suspendCancellableCoroutine<Unit> sc@ { cont ->
+ private suspend fun waitUntilSelected() = suspendCancellableCoroutine<Unit> sc@{ cont ->
// Update the state.
state.loop { curState ->
when {
@@ -762,7 +771,7 @@
/**
* Each `select` clause is internally represented with a [ClauseData] instance.
- */
+ */
internal inner class ClauseData(
@JvmField val clauseObject: Any, // the object of this `select` clause: Channel, Mutex, Job, ...
private val regFunc: RegistrationFunction,
@@ -771,8 +780,10 @@
private val block: Any, // the user-specified block, which should be called if this clause becomes selected
@JvmField val onCancellationConstructor: OnCancellationConstructor?
) {
- @JvmField var disposableHandleOrSegment: Any? = null
- @JvmField var indexInSegment: Int = -1
+ @JvmField
+ var disposableHandleOrSegment: Any? = null
+ @JvmField
+ var indexInSegment: Int = -1
/**
* Tries to register the specified [select] instance in [clauseObject] and check
@@ -843,8 +854,11 @@
}
}
-private fun CancellableContinuation<Unit>.tryResume(onCancellation: ((cause: Throwable) -> Unit)?): Boolean {
- val token = tryResume(Unit, null, onCancellation) ?: return false
+private fun CancellableContinuation<Unit>.tryResume(
+ onCancellation: ((cause: Throwable, value: Any?, context: CoroutineContext) -> Unit)?
+): Boolean {
+ val token =
+ tryResume(Unit, null, onCancellation) ?: return false
completeResume(token)
return true
}
@@ -854,6 +868,7 @@
private const val TRY_SELECT_REREGISTER = 1
private const val TRY_SELECT_CANCELLED = 2
private const val TRY_SELECT_ALREADY_SELECTED = 3
+
// trySelectDetailed(..) results.
internal enum class TrySelectDetailedResult {
SUCCESSFUL, REREGISTER, CANCELLED, ALREADY_SELECTED
@@ -870,9 +885,11 @@
private val STATE_REG = Symbol("STATE_REG")
private val STATE_COMPLETED = Symbol("STATE_COMPLETED")
private val STATE_CANCELLED = Symbol("STATE_CANCELLED")
+
// As the selection result is nullable, we use this special
// marker for the absence of result.
private val NO_RESULT = Symbol("NO_RESULT")
+
// We use this marker parameter objects to distinguish
// SelectClause[0,1,2] and invoke the user-specified block correctly.
internal val PARAM_CLAUSE_0 = Symbol("PARAM_CLAUSE_0")
diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
index 45bd383..43e0deb 100644
--- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
@@ -5,6 +5,7 @@
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
+import kotlin.coroutines.CoroutineContext
import kotlin.jvm.*
/**
@@ -137,7 +138,7 @@
private val onSelectCancellationUnlockConstructor: OnCancellationConstructor =
{ _: SelectInstance<*>, owner: Any?, _: Any? ->
- { unlock(owner) }
+ { _, _, _ -> unlock(owner) }
}
override val isLocked: Boolean get() =
@@ -248,10 +249,14 @@
@JvmField
val owner: Any?
) : CancellableContinuation<Unit> by cont, Waiter by cont {
- override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
+ override fun <R : Unit> tryResume(
+ value: R,
+ idempotent: Any?,
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ ): Any? {
assert { [email protected] === NO_OWNER }
- val token = cont.tryResume(value, idempotent) {
- assert { [email protected] { it === NO_OWNER ||it === owner } }
+ val token = cont.tryResume(value, idempotent) { _, _, _ ->
+ assert { [email protected] { it === NO_OWNER || it === owner } }
[email protected] = owner
unlock(owner)
}
@@ -262,7 +267,10 @@
return token
}
- override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) {
+ override fun <R : Unit> resume(
+ value: R,
+ onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)?
+ ) {
assert { [email protected] === NO_OWNER }
[email protected] = owner
cont.resume(value) { unlock(owner) }
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
index bf568f3..c1bd249 100644
--- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -146,7 +146,7 @@
private val _availablePermits = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(_availablePermits.value, 0)
- private val onCancellationRelease = { _: Throwable -> release() }
+ private val onCancellationRelease = { _: Throwable, _: Unit, _: CoroutineContext -> release() }
override fun tryAcquire(): Boolean {
while (true) {
diff --git a/kotlinx-coroutines-core/common/test/CancellableResumeOldTest.kt b/kotlinx-coroutines-core/common/test/CancellableResumeOldTest.kt
new file mode 100644
index 0000000..501d033
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/CancellableResumeOldTest.kt
@@ -0,0 +1,291 @@
+@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.testing.*
+import kotlin.test.*
+
+/**
+ * Test for [CancellableContinuation.resume] with `onCancellation` parameter.
+ */
+@Suppress("DEPRECATION")
+class CancellableResumeOldTest : TestBase() {
+ @Test
+ fun testResumeImmediateNormally() = runTest {
+ expect(1)
+ val ok = suspendCancellableCoroutine<String> { cont ->
+ expect(2)
+ cont.invokeOnCancellation { expectUnreached() }
+ cont.resume("OK") { expectUnreached() }
+ expect(3)
+ }
+ assertEquals("OK", ok)
+ finish(4)
+ }
+
+ @Test
+ fun testResumeImmediateAfterCancel() = runTest(
+ expected = { it is TestException }
+ ) {
+ expect(1)
+ suspendCancellableCoroutine<String> { cont ->
+ expect(2)
+ cont.invokeOnCancellation { expect(3) }
+ cont.cancel(TestException("FAIL"))
+ expect(4)
+ cont.resume("OK") { cause ->
+ expect(5)
+ assertIs<TestException>(cause)
+ }
+ finish(6)
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testResumeImmediateAfterCancelWithHandlerFailure() = runTest(
+ expected = { it is TestException },
+ unhandled = listOf(
+ { it is CompletionHandlerException && it.cause is TestException2 },
+ { it is CompletionHandlerException && it.cause is TestException3 }
+ )
+ ) {
+ expect(1)
+ suspendCancellableCoroutine<String> { cont ->
+ expect(2)
+ cont.invokeOnCancellation {
+ expect(3)
+ throw TestException2("FAIL") // invokeOnCancellation handler fails with exception
+ }
+ cont.cancel(TestException("FAIL"))
+ expect(4)
+ cont.resume("OK") { cause ->
+ expect(5)
+ assertIs<TestException>(cause)
+ throw TestException3("FAIL") // onCancellation block fails with exception
+ }
+ finish(6)
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testResumeImmediateAfterIndirectCancel() = runTest(
+ expected = { it is CancellationException }
+ ) {
+ expect(1)
+ val ctx = coroutineContext
+ suspendCancellableCoroutine<String> { cont ->
+ expect(2)
+ cont.invokeOnCancellation { expect(3) }
+ ctx.cancel()
+ expect(4)
+ cont.resume("OK") {
+ expect(5)
+ }
+ finish(6)
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testResumeImmediateAfterIndirectCancelWithHandlerFailure() = runTest(
+ expected = { it is CancellationException },
+ unhandled = listOf(
+ { it is CompletionHandlerException && it.cause is TestException2 },
+ { it is CompletionHandlerException && it.cause is TestException3 }
+ )
+ ) {
+ expect(1)
+ val ctx = coroutineContext
+ suspendCancellableCoroutine<String> { cont ->
+ expect(2)
+ cont.invokeOnCancellation {
+ expect(3)
+ throw TestException2("FAIL") // invokeOnCancellation handler fails with exception
+ }
+ ctx.cancel()
+ expect(4)
+ cont.resume("OK") {
+ expect(5)
+ throw TestException3("FAIL") // onCancellation block fails with exception
+ }
+ finish(6)
+ }
+ expectUnreached()
+ }
+
+ @Test
+ fun testResumeLaterNormally() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation<String>
+ launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ val ok = suspendCancellableCoroutine<String> { cont ->
+ expect(3)
+ cont.invokeOnCancellation { expectUnreached() }
+ cc = cont
+ }
+ assertEquals("OK", ok)
+ finish(6)
+ }
+ expect(4)
+ cc.resume("OK") { expectUnreached() }
+ expect(5)
+ }
+
+ @Test
+ fun testResumeLaterAfterCancel() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation<String>
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine<String> { cont ->
+ expect(3)
+ cont.invokeOnCancellation { expect(5) }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ finish(9)
+ }
+ }
+ expect(4)
+ job.cancel(TestCancellationException())
+ expect(6)
+ cc.resume("OK") { cause ->
+ expect(7)
+ assertIs<TestCancellationException>(cause)
+ }
+ expect(8)
+ }
+
+ @Test
+ fun testResumeLaterAfterCancelWithHandlerFailure() = runTest(
+ unhandled = listOf(
+ { it is CompletionHandlerException && it.cause is TestException2 },
+ { it is CompletionHandlerException && it.cause is TestException3 }
+ )
+ ) {
+ expect(1)
+ lateinit var cc: CancellableContinuation<String>
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine<String> { cont ->
+ expect(3)
+ cont.invokeOnCancellation {
+ expect(5)
+ throw TestException2("FAIL") // invokeOnCancellation handler fails with exception
+ }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ finish(9)
+ }
+ }
+ expect(4)
+ job.cancel(TestCancellationException())
+ expect(6)
+ cc.resume("OK") { cause ->
+ expect(7)
+ assertIs<TestCancellationException>(cause)
+ throw TestException3("FAIL") // onCancellation block fails with exception
+ }
+ expect(8)
+ }
+
+ @Test
+ fun testResumeCancelWhileDispatched() = runTest {
+ expect(1)
+ lateinit var cc: CancellableContinuation<String>
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine<String> { cont ->
+ expect(3)
+ // resumed first, dispatched, then cancelled, but still got invokeOnCancellation call
+ cont.invokeOnCancellation { cause ->
+ // Note: invokeOnCancellation is called before cc.resume(value) { ... } handler
+ expect(7)
+ assertIs<TestCancellationException>(cause)
+ }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(9)
+ }
+ }
+ expect(4)
+ cc.resume("OK") { cause ->
+ // Note: this handler is called after invokeOnCancellation handler
+ expect(8)
+ assertIs<TestCancellationException>(cause)
+ }
+ expect(5)
+ job.cancel(TestCancellationException()) // cancel while execution is dispatched
+ expect(6)
+ yield() // to coroutine -- throws cancellation exception
+ finish(10)
+ }
+
+ @Test
+ fun testResumeCancelWhileDispatchedWithHandlerFailure() = runTest(
+ unhandled = listOf(
+ { it is CompletionHandlerException && it.cause is TestException2 },
+ { it is CompletionHandlerException && it.cause is TestException3 }
+ )
+ ) {
+ expect(1)
+ lateinit var cc: CancellableContinuation<String>
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ try {
+ suspendCancellableCoroutine<String> { cont ->
+ expect(3)
+ // resumed first, dispatched, then cancelled, but still got invokeOnCancellation call
+ cont.invokeOnCancellation { cause ->
+ // Note: invokeOnCancellation is called before cc.resume(value) { ... } handler
+ expect(7)
+ assertIs<TestCancellationException>(cause)
+ throw TestException2("FAIL") // invokeOnCancellation handler fails with exception
+ }
+ cc = cont
+ }
+ expectUnreached()
+ } catch (e: CancellationException) {
+ expect(9)
+ }
+ }
+ expect(4)
+ cc.resume("OK") { cause ->
+ // Note: this handler is called after invokeOnCancellation handler
+ expect(8)
+ assertIs<TestCancellationException>(cause)
+ throw TestException3("FAIL") // onCancellation block fails with exception
+ }
+ expect(5)
+ job.cancel(TestCancellationException()) // cancel while execution is dispatched
+ expect(6)
+ yield() // to coroutine -- throws cancellation exception
+ finish(10)
+ }
+
+ @Test
+ fun testResumeUnconfined() = runTest {
+ val outerScope = this
+ withContext(Dispatchers.Unconfined) {
+ val result = suspendCancellableCoroutine<String> {
+ outerScope.launch {
+ it.resume("OK") {
+ expectUnreached()
+ }
+ }
+ }
+ assertEquals("OK", result)
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
index 517b0ce..c048a8d 100644
--- a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
+++ b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
@@ -15,7 +15,7 @@
val ok = suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expectUnreached() }
- cont.resume("OK") { expectUnreached() }
+ cont.resume("OK") { _, _, _ -> expectUnreached() }
expect(3)
}
assertEquals("OK", ok)
@@ -32,8 +32,11 @@
cont.invokeOnCancellation { expect(3) }
cont.cancel(TestException("FAIL"))
expect(4)
- cont.resume("OK") { cause ->
+ val value = "OK"
+ cont.resume(value) { cause, valueToClose, context ->
expect(5)
+ assertSame(value, valueToClose)
+ assertSame(context, cont.context)
assertIs<TestException>(cause)
}
finish(6)
@@ -58,8 +61,11 @@
}
cont.cancel(TestException("FAIL"))
expect(4)
- cont.resume("OK") { cause ->
+ val value = "OK"
+ cont.resume(value) { cause, valueToClose, context ->
expect(5)
+ assertSame(value, valueToClose)
+ assertSame(context, cont.context)
assertIs<TestException>(cause)
throw TestException3("FAIL") // onCancellation block fails with exception
}
@@ -79,8 +85,12 @@
cont.invokeOnCancellation { expect(3) }
ctx.cancel()
expect(4)
- cont.resume("OK") {
+ val value = "OK"
+ cont.resume(value) { cause, valueToClose, context ->
expect(5)
+ assertSame(value, valueToClose)
+ assertSame(context, cont.context)
+ assertIs<CancellationException>(cause)
}
finish(6)
}
@@ -105,8 +115,12 @@
}
ctx.cancel()
expect(4)
- cont.resume("OK") {
+ val value = "OK"
+ cont.resume(value) { cause, valueToClose, context ->
expect(5)
+ assertSame(value, valueToClose)
+ assertSame(context, cont.context)
+ assertIs<CancellationException>(cause)
throw TestException3("FAIL") // onCancellation block fails with exception
}
finish(6)
@@ -129,7 +143,7 @@
finish(6)
}
expect(4)
- cc.resume("OK") { expectUnreached() }
+ cc.resume("OK") { _, _, _ -> expectUnreached() }
expect(5)
}
@@ -146,15 +160,18 @@
cc = cont
}
expectUnreached()
- } catch (e: CancellationException) {
+ } catch (_: CancellationException) {
finish(9)
}
}
expect(4)
job.cancel(TestCancellationException())
expect(6)
- cc.resume("OK") { cause ->
+ val value = "OK"
+ cc.resume(value) { cause, valueToClose, context ->
expect(7)
+ assertSame(value, valueToClose)
+ assertSame(context, cc.context)
assertIs<TestCancellationException>(cause)
}
expect(8)
@@ -181,15 +198,18 @@
cc = cont
}
expectUnreached()
- } catch (e: CancellationException) {
+ } catch (_: CancellationException) {
finish(9)
}
}
expect(4)
job.cancel(TestCancellationException())
expect(6)
- cc.resume("OK") { cause ->
+ val value = "OK"
+ cc.resume(value) { cause, valueToClose, context ->
expect(7)
+ assertSame(value, valueToClose)
+ assertSame(context, cc.context)
assertIs<TestCancellationException>(cause)
throw TestException3("FAIL") // onCancellation block fails with exception
}
@@ -214,14 +234,17 @@
cc = cont
}
expectUnreached()
- } catch (e: CancellationException) {
+ } catch (_: CancellationException) {
expect(9)
}
}
expect(4)
- cc.resume("OK") { cause ->
+ val value = "OK"
+ cc.resume("OK") { cause, valueToClose, context ->
// Note: this handler is called after invokeOnCancellation handler
expect(8)
+ assertSame(value, valueToClose)
+ assertSame(context, cc.context)
assertIs<TestCancellationException>(cause)
}
expect(5)
@@ -255,14 +278,17 @@
cc = cont
}
expectUnreached()
- } catch (e: CancellationException) {
+ } catch (_: CancellationException) {
expect(9)
}
}
expect(4)
- cc.resume("OK") { cause ->
+ val value = "OK"
+ cc.resume(value) { cause, valueToClose, context ->
// Note: this handler is called after invokeOnCancellation handler
expect(8)
+ assertSame(value, valueToClose)
+ assertSame(context, cc.context)
assertIs<TestCancellationException>(cause)
throw TestException3("FAIL") // onCancellation block fails with exception
}
@@ -279,7 +305,7 @@
withContext(Dispatchers.Unconfined) {
val result = suspendCancellableCoroutine<String> {
outerScope.launch {
- it.resume("OK") {
+ it.resume("OK") { _, _, _ ->
expectUnreached()
}
}