Improve behavior and test coverage  of CameraGraph.useSessionIn

The primary goal of this CL is to address delay() and exception
handling when using a session. Without this change, running
tests inside `runTest` or `runBlocking` with useSessionIn can
cause the test method to timeout or stall. This change also
includes a test to ensure this behavior functions correctly:

```
@Test
fun useSessionInWithRunBlockingDoesNotStall() = runBlocking {
    val deferred = cameraGraph.useSessionIn(this) { delay(1) }
    deferred.await() // Make sure this does not block.
}
```

Test: ./gradlew\
 :camera:camera-camera2-pipe:testDebugUnitTest\
 :camera:camera-camera2-pipe-testing:testDebugUnitTest\
 :camera:camera-camera2-pipe-integration:testDebugUnitTest

Change-Id: I11fa030607ec27e116947724e562deaed04f6398
diff --git a/camera/camera-camera2-pipe-testing/src/main/java/androidx/camera/camera2/pipe/testing/CameraGraphSimulator.kt b/camera/camera-camera2-pipe-testing/src/main/java/androidx/camera/camera2/pipe/testing/CameraGraphSimulator.kt
index df26736..5c70bb3 100644
--- a/camera/camera-camera2-pipe-testing/src/main/java/androidx/camera/camera2/pipe/testing/CameraGraphSimulator.kt
+++ b/camera/camera-camera2-pipe-testing/src/main/java/androidx/camera/camera2/pipe/testing/CameraGraphSimulator.kt
@@ -33,7 +33,6 @@
 import androidx.camera.camera2.pipe.RequestFailure
 import androidx.camera.camera2.pipe.StreamId
 import kotlinx.atomicfu.atomic
-import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.test.StandardTestDispatcher
 import kotlinx.coroutines.test.TestScope
 import kotlinx.coroutines.withTimeout
@@ -70,7 +69,6 @@
          * test completes and allows the test to provide more fine grained control over the
          * interactions.
          */
-        @OptIn(ExperimentalCoroutinesApi::class)
         fun create(
             scope: TestScope,
             context: Context,
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/graph/CameraGraphImpl.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/graph/CameraGraphImpl.kt
index 5db75d8..3b93611 100644
--- a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/graph/CameraGraphImpl.kt
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/graph/CameraGraphImpl.kt
@@ -42,6 +42,7 @@
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.CoroutineStart
 import kotlinx.coroutines.Deferred
+import kotlinx.coroutines.Job
 import kotlinx.coroutines.async
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.ensureActive
@@ -172,29 +173,37 @@
     override fun <T> useSessionIn(
         scope: CoroutineScope,
         action: suspend CoroutineScope.(CameraGraph.Session) -> T
-    ): Deferred<T> =
-        scope.async(start = CoroutineStart.UNDISPATCHED) {
-            ensureActive() // Exit early if the parent scope has been canceled.
+    ): Deferred<T> {
+        // https://github.com/Kotlin/kotlinx.coroutines/issues/1578
+        // To handle `runBlocking` we need to use `job.complete()` in `result.invokeOnCompletion`.
+        // However, if we do this directly on the scope that is provided it will cause
+        // SupervisorScopes to block and never complete. To work around this, we create a childJob,
+        // propagate the existing context, and use that as the context for scope.async.
+        val childJob = Job(scope.coroutineContext[Job])
+        val context = scope.coroutineContext + childJob
+        val result =
+            scope.async(context = context, start = CoroutineStart.UNDISPATCHED) {
+                ensureActive() // Exit early if the parent scope has been canceled.
 
-            // It is very important to acquire *and* suspend here. Invoking a coroutine using
-            // UNDISPATCHED will execute on the current thread until the suspension point, and this
-            // will
-            // force the execution to switch to the provided scope after ensuring the lock is
-            // acquired
-            // or in the queue. This guarantees exclusion, ordering, and execution within the
-            // correct
-            // scope.
-            val token = sessionMutex.acquireTokenAndSuspend()
+                // It is very important to acquire *and* suspend here. Invoking a coroutine using
+                // UNDISPATCHED will execute on the current thread until the suspension point, and
+                // this will force the execution to switch to the provided scope after ensuring the
+                // lock is acquired or in the queue. This guarantees exclusion, ordering, and
+                // execution within the correct scope.
+                val token = sessionMutex.acquireTokenAndSuspend()
 
-            // Create and use the session.
-            createSessionFromToken(token).use {
-                // Wrap the block in a coroutineScope to ensure all operations are completed before
-                // exiting and releasing the lock. The lock can be released early if the calling
-                // action
-                // decided to call session.close() early.
-                coroutineScope { action(it) }
+                // Create and use the session
+                createSessionFromToken(token).use {
+                    // Wrap the block in a coroutineScope to ensure all operations are completed
+                    // before exiting and releasing the lock. The lock can be released early if the
+                    // calling action decides to call session.close() early.
+                    coroutineScope { action(it) }
+                }
             }
-        }
+
+        result.invokeOnCompletion { childJob.complete() }
+        return result
+    }
 
     private fun createSessionFromToken(token: Token) =
         CameraGraphSessionImpl(token, graphProcessor, controller3A, frameCaptureQueue)
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/graph/CameraGraphImplTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/graph/CameraGraphImplTest.kt
index 8df2b23..78ebf5d 100644
--- a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/graph/CameraGraphImplTest.kt
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/graph/CameraGraphImplTest.kt
@@ -42,11 +42,21 @@
 import androidx.camera.camera2.pipe.testing.FakeThreads
 import androidx.camera.camera2.pipe.testing.RobolectricCameraPipeTestRunner
 import androidx.test.core.app.ApplicationProvider
+import androidx.testutils.assertThrows
 import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.test.TestScope
 import kotlinx.coroutines.test.advanceUntilIdle
 import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.yield
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.mockito.kotlin.eq
@@ -61,6 +71,8 @@
 @DoNotInstrument
 @Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
 internal class CameraGraphImplTest {
+    private val testScope = TestScope()
+
     private val context = ApplicationProvider.getApplicationContext() as Context
     private val metadata =
         FakeCameraMetadata(
@@ -77,184 +89,374 @@
     private val stream2Config =
         CameraStream.Config.create(Size(1920, 1080), StreamFormat.YUV_420_888)
 
-    private lateinit var cameraController: CameraControllerSimulator
-    private lateinit var stream1: CameraStream
-    private lateinit var stream2: CameraStream
+    private val graphConfig =
+        CameraGraph.Config(
+            camera = metadata.camera,
+            streams = listOf(stream1Config, stream2Config),
+        )
+    private val threads = FakeThreads.fromTestScope(testScope)
+    private val backend = FakeCameraBackend(fakeCameras = mapOf(metadata.camera to metadata))
+    private val backends =
+        CameraBackendsImpl(
+            defaultBackendId = backend.id,
+            cameraBackends = mapOf(backend.id to CameraBackendFactory { backend }),
+            context,
+            threads
+        )
+    private val cameraContext = CameraBackendsImpl.CameraBackendContext(context, threads, backends)
+    private val graphLifecycleManager = GraphLifecycleManager(threads)
+    private val streamGraph = StreamGraphImpl(metadata, graphConfig)
+    private val imageSourceMap = ImageSourceMap(graphConfig, streamGraph, threads)
+    private val frameCaptureQueue = FrameCaptureQueue()
+    private val frameDistributor =
+        FrameDistributor(imageSourceMap.imageSources, frameCaptureQueue) {}
+    private val cameraController =
+        CameraControllerSimulator(cameraContext, graphConfig, fakeGraphProcessor, streamGraph)
 
-    private fun initializeCameraGraphImpl(scope: TestScope): CameraGraphImpl {
-        val graphConfig =
-            CameraGraph.Config(
-                camera = metadata.camera,
-                streams = listOf(stream1Config, stream2Config),
-            )
-        val threads = FakeThreads.fromTestScope(scope)
-        val backend = FakeCameraBackend(fakeCameras = mapOf(metadata.camera to metadata))
-        val backends =
-            CameraBackendsImpl(
-                defaultBackendId = backend.id,
-                cameraBackends = mapOf(backend.id to CameraBackendFactory { backend }),
-                context,
-                threads
-            )
-        val cameraContext = CameraBackendsImpl.CameraBackendContext(context, threads, backends)
-        val graphLifecycleManager = GraphLifecycleManager(threads)
-        val streamGraph = StreamGraphImpl(metadata, graphConfig)
-        val imageSourceMap = ImageSourceMap(graphConfig, streamGraph, threads)
-        val frameCaptureQueue = FrameCaptureQueue()
-        val frameDistributor = FrameDistributor(imageSourceMap.imageSources, frameCaptureQueue) {}
-        cameraController =
-            CameraControllerSimulator(cameraContext, graphConfig, fakeGraphProcessor, streamGraph)
+    private val surfaceGraph =
+        SurfaceGraph(streamGraph, cameraController, cameraSurfaceManager, emptyMap())
+    private val audioRestriction = FakeAudioRestrictionController()
+    private val cameraGraphId = CameraGraphId.nextId()
+    private val cameraGraph =
+        CameraGraphImpl(
+            graphConfig,
+            metadata,
+            cameraGraphId,
+            graphLifecycleManager,
+            fakeGraphProcessor,
+            fakeGraphProcessor,
+            streamGraph,
+            surfaceGraph,
+            backend,
+            cameraController,
+            GraphState3A(),
+            Listener3A(),
+            frameDistributor,
+            frameCaptureQueue,
+            audioRestriction
+        )
+    private val stream1: CameraStream =
+        checkNotNull(cameraGraph.streams[stream1Config]) {
+            "Failed to find stream for $stream1Config!"
+        }
+
+    private val stream2 =
+        checkNotNull(cameraGraph.streams[stream2Config]) {
+            "Failed to find stream for $stream2Config!"
+        }
+
+    init {
         cameraSurfaceManager.addListener(fakeSurfaceListener)
-        val surfaceGraph =
-            SurfaceGraph(streamGraph, cameraController, cameraSurfaceManager, emptyMap())
-        val audioRestriction = FakeAudioRestrictionController()
-        val cameraGraphId = CameraGraphId.nextId()
-        val graph =
-            CameraGraphImpl(
-                graphConfig,
-                metadata,
-                cameraGraphId,
-                graphLifecycleManager,
-                fakeGraphProcessor,
-                fakeGraphProcessor,
-                streamGraph,
-                surfaceGraph,
-                backend,
-                cameraController,
-                GraphState3A(),
-                Listener3A(),
-                frameDistributor,
-                frameCaptureQueue,
-                audioRestriction
-            )
-        stream1 =
-            checkNotNull(graph.streams[stream1Config]) {
-                "Failed to find stream for $stream1Config!"
-            }
+    }
 
-        stream2 =
-            checkNotNull(graph.streams[stream2Config]) {
-                "Failed to find stream for $stream2Config!"
-            }
-        return graph
+    @Test fun createCameraGraphImpl() = testScope.runTest { assertThat(cameraGraph).isNotNull() }
+
+    @Test
+    fun testAcquireSession() =
+        testScope.runTest {
+            val session = cameraGraph.acquireSession()
+            assertThat(session).isNotNull()
+        }
+
+    @Test
+    fun testAcquireSessionOrNull() =
+        testScope.runTest {
+            val session = cameraGraph.acquireSessionOrNull()
+            assertThat(session).isNotNull()
+        }
+
+    @Test
+    fun testAcquireSessionOrNullAfterAcquireSession() =
+        testScope.runTest {
+            val session = cameraGraph.acquireSession()
+            assertThat(session).isNotNull()
+
+            // Since a session is already active, an attempt to acquire another session will fail.
+            val session1 = cameraGraph.acquireSessionOrNull()
+            assertThat(session1).isNull()
+
+            // Closing an active session should allow a new session instance to be created.
+            session.close()
+
+            val session2 = cameraGraph.acquireSessionOrNull()
+            assertThat(session2).isNotNull()
+        }
+
+    @Test
+    fun sessionSubmitsRequestsToGraphProcessor() =
+        testScope.runTest {
+            val session = checkNotNull(cameraGraph.acquireSessionOrNull())
+            val request = Request(listOf())
+            session.submit(request)
+            advanceUntilIdle()
+
+            assertThat(fakeGraphProcessor.requestQueue).contains(listOf(request))
+        }
+
+    @Test
+    fun sessionSetsRepeatingRequestOnGraphProcessor() =
+        testScope.runTest {
+            val session = checkNotNull(cameraGraph.acquireSessionOrNull())
+            val request = Request(listOf())
+            session.startRepeating(request)
+            advanceUntilIdle()
+
+            assertThat(fakeGraphProcessor.repeatingRequest).isSameInstanceAs(request)
+        }
+
+    @Test
+    fun sessionAbortsRequestOnGraphProcessor() =
+        testScope.runTest {
+            val session = checkNotNull(cameraGraph.acquireSessionOrNull())
+            val request = Request(listOf())
+            session.submit(request)
+            session.abort()
+            advanceUntilIdle()
+
+            assertThat(fakeGraphProcessor.requestQueue).isEmpty()
+        }
+
+    @Test
+    fun closingSessionDoesNotCloseGraphProcessor() =
+        testScope.runTest {
+            val session = cameraGraph.acquireSessionOrNull()
+            checkNotNull(session).close()
+            advanceUntilIdle()
+
+            assertThat(fakeGraphProcessor.closed).isFalse()
+        }
+
+    @Test
+    fun closingCameraGraphClosesGraphProcessor() =
+        testScope.runTest {
+            cameraGraph.close()
+            assertThat(fakeGraphProcessor.closed).isTrue()
+        }
+
+    @Test
+    fun stoppingCameraGraphStopsGraphProcessor() =
+        testScope.runTest {
+            assertThat(cameraController.started).isFalse()
+            assertThat(fakeGraphProcessor.closed).isFalse()
+            cameraGraph.start()
+            assertThat(cameraController.started).isTrue()
+            cameraGraph.stop()
+            assertThat(cameraController.started).isFalse()
+            assertThat(fakeGraphProcessor.closed).isFalse()
+            cameraGraph.start()
+            assertThat(cameraController.started).isTrue()
+            cameraGraph.close()
+            assertThat(cameraController.started).isFalse()
+            assertThat(fakeGraphProcessor.closed).isTrue()
+        }
+
+    @Test
+    fun closingCameraGraphClosesAssociatedSurfaces() =
+        testScope.runTest {
+            cameraGraph.setSurface(stream1.id, imageReader1.surface)
+            cameraGraph.setSurface(stream2.id, imageReader2.surface)
+            cameraGraph.close()
+
+            verify(fakeSurfaceListener, times(1)).onSurfaceActive(eq(imageReader1.surface))
+            verify(fakeSurfaceListener, times(1)).onSurfaceActive(eq(imageReader2.surface))
+            verify(fakeSurfaceListener, times(1)).onSurfaceInactive(eq(imageReader1.surface))
+            verify(fakeSurfaceListener, times(1)).onSurfaceInactive(eq(imageReader1.surface))
+        }
+
+    @Test
+    fun useSessionInOperatesInOrder() =
+        testScope.runTest {
+            val events = mutableListOf<Int>()
+            val job1 =
+                cameraGraph.useSessionIn(testScope) {
+                    yield()
+                    events += 2
+                }
+            val job2 =
+                cameraGraph.useSessionIn(testScope) {
+                    delay(100)
+                    events += 3
+                }
+            val job3 =
+                cameraGraph.useSessionIn(testScope) {
+                    yield()
+                    events += 4
+                }
+
+            events += 1
+            job1.join()
+            job2.join()
+            job3.join()
+
+            assertThat(events).containsExactly(1, 2, 3, 4).inOrder()
+        }
+
+    @Test
+    fun useSessionWithEarlyCloseAllowsInterleavedExecution() =
+        testScope.runTest {
+            val events = mutableListOf<Int>()
+            val job1 =
+                cameraGraph.useSessionIn(testScope) { session ->
+                    yield()
+                    events += 2
+                    session.close()
+                    delay(1000)
+                    events += 5
+                }
+            val job2 =
+                cameraGraph.useSessionIn(testScope) {
+                    delay(100)
+                    events += 3
+                }
+            val job3 =
+                cameraGraph.useSessionIn(testScope) {
+                    yield()
+                    events += 4
+                }
+
+            events += 1
+            job1.join()
+            job2.join()
+            job3.join()
+
+            assertThat(events).containsExactly(1, 2, 3, 4, 5).inOrder()
+        }
+
+    @Test
+    fun useSessionInWithRunBlockingDoesNotStall() = runBlocking {
+        val deferred = cameraGraph.useSessionIn(this) { delay(1) }
+        deferred.await() // Make sure this does not block.
     }
 
     @Test
-    fun createCameraGraphImpl() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        assertThat(cameraGraphImpl).isNotNull()
-    }
+    fun coroutineScope_isCanceledWithException() =
+        testScope.runTest {
+            val scope = CoroutineScope(Job())
+
+            val deferred = scope.async { throw RuntimeException() }
+            deferred.join()
+
+            // Ensure the deferred is completed with an exception, and that the scope is NOT active.
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThrows<RuntimeException> { deferred.await() }
+            assertThat(scope.isActive).isFalse()
+        }
 
     @Test
-    fun testAcquireSession() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = cameraGraphImpl.acquireSession()
-        assertThat(session).isNotNull()
-    }
+    fun coroutineSupervisorScope_isNotCanceledWithException() =
+        testScope.runTest {
+            val scope = CoroutineScope(SupervisorJob())
+
+            val deferred = scope.async { throw RuntimeException() }
+            deferred.join()
+
+            // Ensure the deferred is completed with an exception, and that the scope remains
+            // active.
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThrows<RuntimeException> { deferred.await() }
+            assertThat(scope.isActive).isTrue()
+        }
 
     @Test
-    fun testAcquireSessionOrNull() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = cameraGraphImpl.acquireSessionOrNull()
-        assertThat(session).isNotNull()
-    }
+    fun useSessionIn_scopeIsCanceledWithException() =
+        testScope.runTest {
+            val scope = CoroutineScope(Job())
+
+            val deferred = cameraGraph.useSessionIn(scope) { throw RuntimeException() }
+            deferred.join()
+
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThrows<RuntimeException> { deferred.await() }
+            assertThat(scope.isActive).isFalse() // Regular scopes are canceled
+        }
 
     @Test
-    fun testAcquireSessionOrNullAfterAcquireSession() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = cameraGraphImpl.acquireSession()
-        assertThat(session).isNotNull()
+    fun useSessionIn_supervisorScopeIsNotCanceledWithException() =
+        testScope.runTest {
+            val scope = CoroutineScope(SupervisorJob())
+            val deferred = cameraGraph.useSessionIn(scope) { throw RuntimeException() }
+            deferred.join()
 
-        // Since a session is already active, an attempt to acquire another session will fail.
-        val session1 = cameraGraphImpl.acquireSessionOrNull()
-        assertThat(session1).isNull()
-
-        // Closing an active session should allow a new session instance to be created.
-        session.close()
-
-        val session2 = cameraGraphImpl.acquireSessionOrNull()
-        assertThat(session2).isNotNull()
-    }
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThrows<RuntimeException> { deferred.await() }
+            assertThat(scope.isActive).isTrue() // Supervisor scopes are not canceled
+        }
 
     @Test
-    fun sessionSubmitsRequestsToGraphProcessor() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = checkNotNull(cameraGraphImpl.acquireSessionOrNull())
-        val request = Request(listOf())
-        session.submit(request)
-        advanceUntilIdle()
+    fun coroutineSupervisorTestScope_isNotCanceledWithException() =
+        testScope.runTest {
+            // This illustrates the correct way to create a scope that uses the testScope
+            // dispatcher, does delay skipping, but also does not fail the test if an exception
+            // occurs when doing scope.async. This is useful if, for example, in a real environment
+            // scope represents a supervisor job that will not crash if a coroutine fails and if
+            // some other system is handling the result of the deferred.
+            val scope = CoroutineScope(testScope.coroutineContext + Job())
 
-        assertThat(fakeGraphProcessor.requestQueue).contains(listOf(request))
-    }
+            val deferred =
+                scope.async {
+                    delay(100000) // Delay skipping
+                    throw RuntimeException()
+                }
+            deferred.join()
+
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThrows<RuntimeException> { deferred.await() }
+            assertThat(scope.isActive).isFalse()
+            assertThat(testScope.isActive).isTrue()
+        }
 
     @Test
-    fun sessionSetsRepeatingRequestOnGraphProcessor() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = checkNotNull(cameraGraphImpl.acquireSessionOrNull())
-        val request = Request(listOf())
-        session.startRepeating(request)
-        advanceUntilIdle()
+    fun useSessionIn_withSupervisorTestScopeDoesNotCancelTestScope() =
+        testScope.runTest {
+            // Create a scope that uses the testScope dispatcher and delaySkipping, but does not
+            // fail
+            // the test if an exception occurs in useSessionIn.
+            val scope = CoroutineScope(testScope.coroutineContext + SupervisorJob())
 
-        assertThat(fakeGraphProcessor.repeatingRequest).isSameInstanceAs(request)
-    }
+            // If you pass in a testScope to useSessionIn, any exception will cause the test to
+            // fail. If, instead, you want to test that the deferred handles the exception, you must
+            // pass in an independent CoroutineScope.
+            val deferred = cameraGraph.useSessionIn(scope) { throw RuntimeException() }
+            deferred.join()
+
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(RuntimeException::class.java)
+            assertThat(scope.isActive).isTrue() // Supervisor scopes are not canceled
+            assertThat(testScope.isActive).isTrue()
+        }
 
     @Test
-    fun sessionAbortsRequestOnGraphProcessor() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = checkNotNull(cameraGraphImpl.acquireSessionOrNull())
-        val request = Request(listOf())
-        session.submit(request)
-        session.abort()
-        advanceUntilIdle()
+    fun useSessionIn_withCancellationDoesNotFailTest() =
+        testScope.runTest {
+            val deferred =
+                cameraGraph.useSessionIn(testScope) {
+                    throw CancellationException() // Throwing cancellation does not cause the test
+                    // to fail.
+                }
+            deferred.join()
 
-        assertThat(fakeGraphProcessor.requestQueue).isEmpty()
-    }
+            assertThat(deferred.isActive).isFalse()
+            assertThat(deferred.isCompleted).isTrue()
+            assertThat(deferred.isCancelled).isTrue()
+            assertThat(deferred.getCompletionExceptionOrNull())
+                .isInstanceOf(CancellationException::class.java)
+            assertThat(testScope.isActive).isTrue()
+        }
 
     @Test
-    fun closingSessionDoesNotCloseGraphProcessor() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        val session = cameraGraphImpl.acquireSessionOrNull()
-        checkNotNull(session).close()
-        advanceUntilIdle()
-
-        assertThat(fakeGraphProcessor.closed).isFalse()
-    }
-
-    @Test
-    fun closingCameraGraphClosesGraphProcessor() = runTest {
-        val cameraGraphImpl = initializeCameraGraphImpl(this)
-        cameraGraphImpl.close()
-        assertThat(fakeGraphProcessor.closed).isTrue()
-    }
-
-    @Test
-    fun stoppingCameraGraphStopsGraphProcessor() = runTest {
-        val cameraGraph = initializeCameraGraphImpl(this)
-
-        assertThat(cameraController.started).isFalse()
-        assertThat(fakeGraphProcessor.closed).isFalse()
-        cameraGraph.start()
-        assertThat(cameraController.started).isTrue()
-        cameraGraph.stop()
-        assertThat(cameraController.started).isFalse()
-        assertThat(fakeGraphProcessor.closed).isFalse()
-        cameraGraph.start()
-        assertThat(cameraController.started).isTrue()
-        cameraGraph.close()
-        assertThat(cameraController.started).isFalse()
-        assertThat(fakeGraphProcessor.closed).isTrue()
-    }
-
-    @Test
-    fun closingCameraGraphClosesAssociatedSurfaces() = runTest {
-        val cameraGraph = initializeCameraGraphImpl(this)
-        cameraGraph.setSurface(stream1.id, imageReader1.surface)
-        cameraGraph.setSurface(stream2.id, imageReader2.surface)
-        cameraGraph.close()
-
-        verify(fakeSurfaceListener, times(1)).onSurfaceActive(eq(imageReader1.surface))
-        verify(fakeSurfaceListener, times(1)).onSurfaceActive(eq(imageReader2.surface))
-        verify(fakeSurfaceListener, times(1)).onSurfaceInactive(eq(imageReader1.surface))
-        verify(fakeSurfaceListener, times(1)).onSurfaceInactive(eq(imageReader1.surface))
-    }
+    fun useSession_throwsExceptions() =
+        testScope.runTest {
+            assertThrows<RuntimeException> { cameraGraph.useSession { throw RuntimeException() } }
+        }
 }