Introduce a ProcessingQueue to CameraPipe to bulk process events.
This introduces a ProcessingQueue class that handles a common pattern
of enqueuing and processing lists of events on a background scope
using a channel.
Test: ./gradlew\
:camera:camera-camera2-pipe:testDebugUnitTest\
:camera:camera-camera2-pipe-testing:testDebugUnitTest\
:camera:camera-camera2-pipe-integration:testDebugUnitTest
Change-Id: I5a560a1c37fbb4524737c943165d04d58f7d319a
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt
new file mode 100644
index 0000000..6eb5fe7
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.core
+
+import kotlinx.atomicfu.atomic
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+
+/**
+ * ProcessingQueue handles the sequential aggregation and processing of a running list of elements.
+ * It is designed to iteratively invoke the provided [process] function with a [MutableList] of
+ * elements that have been aggregated from the previous iteration. [process] is guaranteed to be
+ * invoked sequentially. [process] is expected to remove items from the [MutableList] that are no
+ * longer needed (or that have been processed). Items that are not removed will be present in
+ * subsequent [process] invocations.
+ *
+ * A key design consideration for this class, and the reason it does not operate on generic flows,
+ * is the handling of unprocessed elements, which may need to be handled and/or closed. This is
+ * non-trivial for buffered flows. [onUnprocessedElements] will be invoked synchronously with all
+ * un-processed items exactly once if there is a non-zero number of unprocessed elements when the
+ * ProcessingQueue scope is closed.
+ *
+ * Example Usage:
+ * ```
+ * class MyClass(scope: CoroutineScope) {
+ * private val processingQueue = ProcessingQueue<Int>(
+ * onUnprocessedElements = ::onUnprocessedElements
+ * process = ::processInts
+ * ).processIn(scope)
+ *
+ * fun processAnInt(value: Int) {
+ * processingQueue.emitChecked(value)
+ * }
+ *
+ * private suspend fun processInts(items: MutableList<Int>) {
+ * val first = items.removeFirst()
+ * println("Processing: $first")
+ * }
+ *
+ * private fun onUnprocessedElements(items: List<Int>) {
+ * println("Releasing unprocessed items: items")
+ * }
+ * }
+ * ```
+ *
+ * This class is thread safe.
+ */
+internal class ProcessingQueue<T>(
+ val capacity: Int = Channel.UNLIMITED,
+ private val onUnprocessedElements: (List<T>) -> Unit = {},
+ private val process: suspend (MutableList<T>) -> Unit
+) {
+ private val started = atomic(false)
+ private val channel = Channel<T>(capacity = capacity, onUndeliveredElement = { queue.add(it) })
+ private val queue = ArrayDeque<T>()
+
+ /** Emit an element into the queue, suspending if the queue is at capacity. */
+ suspend fun emit(element: T) {
+ channel.send(element)
+ }
+
+ /** Emit an element into the queue, throwing an exception if it is closed or at capacity. */
+ fun emitChecked(element: T) {
+ val result = channel.trySend(element)
+ check(result.isSuccess) { "Failed to emit item to ProcessingQueue!: $result" }
+ }
+
+ /**
+ * Synchronously emit an element into the queue. Returns false if closed or if the queue is at
+ * capacity.
+ */
+ fun tryEmit(element: T): Boolean {
+ return channel.trySend(element).isSuccess
+ }
+
+ private suspend fun processingLoop() {
+ try {
+ // The core loop is:
+ // 1. Wait for a new item in the channel.
+ // 2. Add all items that can be immediately received from the channel into queue.
+ // 3. Process items (maybe suspend)
+ // 4. If the queue of items is the same, assume processing did nothing and jump to 1.
+ // 5. If the queue of items is different, assume processing did something and jump to 2.
+
+ while (true) {
+ // Suspend until we receive a element from the channel
+ val element = channel.receive()
+ queue.add(element)
+
+ while (queue.isNotEmpty()) {
+ // Buffer any additional elements from the inputChannel that may have been sent
+ // during the last call to process
+ var nextResult = channel.tryReceive()
+ while (nextResult.isSuccess) {
+ queue.add(nextResult.getOrThrow())
+ nextResult = channel.tryReceive()
+ }
+
+ // Emit the list of elements. This may suspend, and the consumer may modify the
+ // list, which will be updated and sent back on the next iteration.
+ val size = queue.size
+ process(queue)
+ if (size == queue.size) {
+ break
+ }
+ }
+ }
+ } catch (e: Throwable) {
+ releaseUnprocessedElements(e)
+ throw e
+ }
+ }
+
+ private fun releaseUnprocessedElements(cause: Throwable?) {
+ // If we reach here, it means the scope that was driving the processing loop has been
+ // cancelled. It means that the last call to `processor` has exited. The first time
+ // that channel.close() is called, the `onUndeliveredElement` handler will be invoked
+ // with the item that was pending for delivery. This, however, does not include *all*
+ // of the items, and we may need to iterate and handle the remaining items that may
+ // still be in the channel.
+ if (channel.close(cause)) {
+
+ // After closing the channel, there may be remaining items in the channel that
+ // were sent after the receiving scope was closed. Read these items out and send
+ // them to the onUnpressedElements handler.
+ var nextResult = channel.tryReceive()
+ while (nextResult.isSuccess) {
+ queue.add(nextResult.getOrThrow())
+ nextResult = channel.tryReceive()
+ }
+
+ // Synchronously invoke the onUnprocessedElements handler with the remaining items.
+ if (queue.isNotEmpty()) {
+ onUnprocessedElements(queue.toMutableList())
+ queue.clear()
+ }
+ }
+ }
+
+ internal companion object {
+ /** Launch the processing loop in the provided processing scope. */
+ fun <T> ProcessingQueue<T>.processIn(scope: CoroutineScope): ProcessingQueue<T> {
+ check(started.compareAndSet(expect = false, update = true)) {
+ "ProcessingQueue cannot be re-started!"
+ }
+
+ // Launch the processing loop in the provided scope.
+ val job = scope.launch { processingLoop() }
+
+ // If the scope is already cancelled, then `process` will never be invoked. To ensure
+ // items are released, attempt to close the channel and release any remaining items.
+ if (job.isCancelled) {
+ releaseUnprocessedElements(null)
+ }
+ return this
+ }
+ }
+}
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt
new file mode 100644
index 0000000..60f5233
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt
@@ -0,0 +1,290 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.core
+
+import android.os.Build
+import androidx.camera.camera2.pipe.core.ProcessingQueue.Companion.processIn
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.test.StandardTestDispatcher
+import kotlinx.coroutines.test.TestScope
+import kotlinx.coroutines.test.advanceTimeBy
+import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runTest
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.robolectric.annotation.Config
+
+@OptIn(ExperimentalCoroutinesApi::class)
+@RunWith(JUnit4::class)
+@Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
+class ProcessingQueueTest {
+ private val testScope = TestScope()
+ private val processingScope =
+ CoroutineScope(
+ Job() +
+ StandardTestDispatcher(testScope.testScheduler) +
+ CoroutineExceptionHandler { _, throwable -> lastUncaughtException = throwable }
+ )
+
+ private var lastUncaughtException: Throwable? = null
+ private val unprocessedElements = mutableListOf<List<Int>>()
+ private val processingCalls = mutableListOf<List<Int>>()
+ private val unprocessElementHandler: (List<Int>) -> Unit = {
+ unprocessedElements.add(it.toMutableList())
+ }
+
+ @Test
+ fun processingQueueBuffersItems() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(
+ capacity = 2,
+ onUnprocessedElements = unprocessElementHandler
+ ) {}
+
+ assertThat(processingQueue.tryEmit(1)).isTrue()
+ assertThat(processingQueue.tryEmit(2)).isTrue()
+ assertThat(processingQueue.tryEmit(3)).isFalse() // Queue is full (2 items)
+ }
+
+ @Test
+ fun processInProcessesItems() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(
+ capacity = 2,
+ onUnprocessedElements = unprocessElementHandler
+ ) {
+ processingCalls.add(it.toMutableList())
+ it.removeAt(0)
+ }
+ .processIn(processingScope)
+
+ assertThat(processingQueue.tryEmit(1)).isTrue()
+ assertThat(processingQueue.tryEmit(2)).isTrue()
+ assertThat(processingQueue.tryEmit(3)).isFalse() // Queue is full
+
+ advanceUntilIdle() // Processing loop runs
+
+ // Processing loop receives [1, 2], removes 1, then is re-invoked with [2]
+ assertThat(processingCalls).containsExactly(listOf(1, 2), listOf(2))
+
+ processingScope.cancel()
+ }
+
+ @Test
+ fun processingQueueIterativelyProcessesElements() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(
+ capacity = 2,
+ onUnprocessedElements = unprocessElementHandler
+ ) {
+ processingCalls.add(it.toMutableList())
+ it.removeAt(0) // Mutation works
+ }
+ .processIn(processingScope)
+
+ processingQueue.tryEmit(1)
+ processingQueue.tryEmit(2)
+ advanceUntilIdle()
+
+ processingQueue.tryEmit(3)
+ advanceUntilIdle()
+
+ processingQueue.tryEmit(4)
+ processingQueue.tryEmit(5)
+ advanceUntilIdle()
+
+ // Processing loop run 5 times:
+ // [1, 2] (removes 1)
+ // [2] (removes 2)
+ // [3] (removes 3)
+ // [4, 5] (removes 4)
+ // [5] (removes 5)
+ assertThat(processingCalls)
+ .containsExactly(listOf(1, 2), listOf(2), listOf(3), listOf(4, 5), listOf(5))
+
+ processingScope.cancel()
+ }
+
+ @Test
+ fun processingQueueAggregatesElements() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ }
+ .processIn(processingScope)
+
+ processingQueue.tryEmit(1)
+ processingQueue.tryEmit(2)
+ advanceUntilIdle()
+
+ processingQueue.tryEmit(3)
+ advanceUntilIdle()
+
+ processingQueue.tryEmit(4)
+ processingQueue.tryEmit(5)
+ advanceUntilIdle()
+
+ // Processing loop does not remove anything
+ assertThat(processingCalls)
+ .containsExactly(listOf(1, 2), listOf(1, 2, 3), listOf(1, 2, 3, 4, 5))
+
+ processingScope.cancel()
+ }
+
+ @Test
+ fun processInOnCanceledScopeInvokesOnUnprocessedElements() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ it.clear()
+ }
+
+ processingQueue.tryEmit(1)
+ processingQueue.tryEmit(2)
+
+ processingScope.cancel()
+ processingQueue.processIn(processingScope)
+
+ // Processing loop does not receive anything
+ assertThat(processingCalls).isEmpty()
+ assertThat(unprocessedElements).containsExactly(listOf(1, 2))
+ }
+
+ @Test
+ fun cancellingProcessingScopeStopsProcessing() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ it.clear()
+ }
+ .processIn(processingScope)
+
+ processingQueue.tryEmit(1)
+ processingQueue.tryEmit(2)
+ advanceUntilIdle()
+
+ assertThat(processingQueue.tryEmit(3)).isTrue() // Normal
+ assertThat(processingQueue.tryEmit(4)).isTrue() // Normal
+ processingScope.cancel()
+ assertThat(processingQueue.tryEmit(5)).isTrue() // Channel hasn't been closed
+ assertThat(processingQueue.tryEmit(6)).isTrue() // Channel hasn't been closed
+ advanceUntilIdle()
+
+ assertThat(processingQueue.tryEmit(7)).isFalse() // fails
+ assertThat(processingQueue.tryEmit(8)).isFalse() // fails
+
+ // Processing loop does not remove anything
+ assertThat(processingCalls)
+ .containsExactly(
+ listOf(1, 2),
+ )
+ // Processing loop does not remove anything
+ assertThat(unprocessedElements).containsExactly(listOf(3, 4, 5, 6))
+ }
+
+ @Test
+ fun longProcessingBlocksAggregateItems() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ delay(100)
+ it.clear()
+ }
+ .processIn(processingScope)
+
+ processingQueue.emitChecked(1)
+ processingQueue.emitChecked(2)
+ processingQueue.emitChecked(3)
+ advanceTimeBy(50) // Triggers initial processing call
+
+ processingQueue.emitChecked(4)
+ processingQueue.emitChecked(5)
+ advanceTimeBy(25) // No updates, process function is still suspended
+
+ processingQueue.emitChecked(6)
+ advanceUntilIdle() // Last update includes all previous updates.
+
+ // Processing loop does not remove anything
+ assertThat(processingCalls)
+ .containsExactly(
+ listOf(1, 2, 3),
+ listOf(4, 5, 6),
+ )
+ processingScope.cancel()
+ }
+
+ @Test
+ fun exceptionsDuringProcessingArePropagated() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ it.clear()
+ delay(100)
+ throw RuntimeException("Test")
+ }
+ .processIn(processingScope)
+
+ processingQueue.emitChecked(1)
+ processingQueue.emitChecked(2)
+ processingQueue.emitChecked(3)
+ advanceTimeBy(50) // Triggers initial processing call, but not exception
+
+ processingQueue.emitChecked(4)
+ processingQueue.emitChecked(5)
+ advanceUntilIdle() // Trigger exception.
+
+ assertThat(processingCalls).containsExactly(listOf(1, 2, 3))
+ assertThat(unprocessedElements).containsExactly(listOf(4, 5))
+ assertThat(lastUncaughtException).isInstanceOf(RuntimeException::class.java)
+ }
+
+ @Test
+ fun duplicateItemsAreNotOmitted() =
+ testScope.runTest {
+ val processingQueue =
+ ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+ processingCalls.add(it.toMutableList())
+ it.clear()
+ }
+ .processIn(processingScope)
+
+ processingQueue.emitChecked(1)
+ processingQueue.emitChecked(1)
+ advanceUntilIdle()
+ processingQueue.emitChecked(1)
+ processingQueue.emitChecked(1)
+ processingQueue.emitChecked(1)
+ advanceUntilIdle()
+
+ assertThat(processingCalls).containsExactly(listOf(1, 1), listOf(1, 1, 1))
+ }
+}