Introduce a ProcessingQueue to CameraPipe to bulk process events.

This introduces a ProcessingQueue class that handles a common pattern
of enqueuing and processing lists of events on a background scope
using a channel.

Test: ./gradlew\
      :camera:camera-camera2-pipe:testDebugUnitTest\
      :camera:camera-camera2-pipe-testing:testDebugUnitTest\
      :camera:camera-camera2-pipe-integration:testDebugUnitTest

Change-Id: I5a560a1c37fbb4524737c943165d04d58f7d319a
diff --git a/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt
new file mode 100644
index 0000000..6eb5fe7
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/main/java/androidx/camera/camera2/pipe/core/ProcessingQueue.kt
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.core
+
+import kotlinx.atomicfu.atomic
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+
+/**
+ * ProcessingQueue handles the sequential aggregation and processing of a running list of elements.
+ * It is designed to iteratively invoke the provided [process] function with a [MutableList] of
+ * elements that have been aggregated from the previous iteration. [process] is guaranteed to be
+ * invoked sequentially. [process] is expected to remove items from the [MutableList] that are no
+ * longer needed (or that have been processed). Items that are not removed will be present in
+ * subsequent [process] invocations.
+ *
+ * A key design consideration for this class, and the reason it does not operate on generic flows,
+ * is the handling of unprocessed elements, which may need to be handled and/or closed. This is
+ * non-trivial for buffered flows. [onUnprocessedElements] will be invoked synchronously with all
+ * un-processed items exactly once if there is a non-zero number of unprocessed elements when the
+ * ProcessingQueue scope is closed.
+ *
+ * Example Usage:
+ * ```
+ * class MyClass(scope: CoroutineScope) {
+ *   private val processingQueue = ProcessingQueue<Int>(
+ *     onUnprocessedElements = ::onUnprocessedElements
+ *     process = ::processInts
+ *   ).processIn(scope)
+ *
+ *   fun processAnInt(value: Int) {
+ *     processingQueue.emitChecked(value)
+ *   }
+ *
+ *   private suspend fun processInts(items: MutableList<Int>) {
+ *     val first = items.removeFirst()
+ *     println("Processing: $first")
+ *   }
+ *
+ *   private fun onUnprocessedElements(items: List<Int>) {
+ *     println("Releasing unprocessed items: items")
+ *   }
+ * }
+ * ```
+ *
+ * This class is thread safe.
+ */
+internal class ProcessingQueue<T>(
+    val capacity: Int = Channel.UNLIMITED,
+    private val onUnprocessedElements: (List<T>) -> Unit = {},
+    private val process: suspend (MutableList<T>) -> Unit
+) {
+    private val started = atomic(false)
+    private val channel = Channel<T>(capacity = capacity, onUndeliveredElement = { queue.add(it) })
+    private val queue = ArrayDeque<T>()
+
+    /** Emit an element into the queue, suspending if the queue is at capacity. */
+    suspend fun emit(element: T) {
+        channel.send(element)
+    }
+
+    /** Emit an element into the queue, throwing an exception if it is closed or at capacity. */
+    fun emitChecked(element: T) {
+        val result = channel.trySend(element)
+        check(result.isSuccess) { "Failed to emit item to ProcessingQueue!: $result" }
+    }
+
+    /**
+     * Synchronously emit an element into the queue. Returns false if closed or if the queue is at
+     * capacity.
+     */
+    fun tryEmit(element: T): Boolean {
+        return channel.trySend(element).isSuccess
+    }
+
+    private suspend fun processingLoop() {
+        try {
+            // The core loop is:
+            // 1. Wait for a new item in the channel.
+            // 2. Add all items that can be immediately received from the channel into queue.
+            // 3. Process items (maybe suspend)
+            // 4. If the queue of items is the same, assume processing did nothing and jump to 1.
+            // 5. If the queue of items is different, assume processing did something and jump to 2.
+
+            while (true) {
+                // Suspend until we receive a element from the channel
+                val element = channel.receive()
+                queue.add(element)
+
+                while (queue.isNotEmpty()) {
+                    // Buffer any additional elements from the inputChannel that may have been sent
+                    // during the last call to process
+                    var nextResult = channel.tryReceive()
+                    while (nextResult.isSuccess) {
+                        queue.add(nextResult.getOrThrow())
+                        nextResult = channel.tryReceive()
+                    }
+
+                    // Emit the list of elements. This may suspend, and the consumer may modify the
+                    // list, which will be updated and sent back on the next iteration.
+                    val size = queue.size
+                    process(queue)
+                    if (size == queue.size) {
+                        break
+                    }
+                }
+            }
+        } catch (e: Throwable) {
+            releaseUnprocessedElements(e)
+            throw e
+        }
+    }
+
+    private fun releaseUnprocessedElements(cause: Throwable?) {
+        // If we reach here, it means the scope that was driving the processing loop has been
+        // cancelled. It means that the last call to `processor` has exited. The first time
+        // that channel.close() is called, the `onUndeliveredElement` handler will be invoked
+        // with the item that was pending for delivery. This, however, does not include *all*
+        // of the items, and we may need to iterate and handle the remaining items that may
+        // still be in the channel.
+        if (channel.close(cause)) {
+
+            // After closing the channel, there may be remaining items in the channel that
+            // were sent after the receiving scope was closed. Read these items out and send
+            // them to the onUnpressedElements handler.
+            var nextResult = channel.tryReceive()
+            while (nextResult.isSuccess) {
+                queue.add(nextResult.getOrThrow())
+                nextResult = channel.tryReceive()
+            }
+
+            // Synchronously invoke the onUnprocessedElements handler with the remaining items.
+            if (queue.isNotEmpty()) {
+                onUnprocessedElements(queue.toMutableList())
+                queue.clear()
+            }
+        }
+    }
+
+    internal companion object {
+        /** Launch the processing loop in the provided processing scope. */
+        fun <T> ProcessingQueue<T>.processIn(scope: CoroutineScope): ProcessingQueue<T> {
+            check(started.compareAndSet(expect = false, update = true)) {
+                "ProcessingQueue cannot be re-started!"
+            }
+
+            // Launch the processing loop in the provided scope.
+            val job = scope.launch { processingLoop() }
+
+            // If the scope is already cancelled, then `process` will never be invoked. To ensure
+            // items are released, attempt to close the channel and release any remaining items.
+            if (job.isCancelled) {
+                releaseUnprocessedElements(null)
+            }
+            return this
+        }
+    }
+}
diff --git a/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt
new file mode 100644
index 0000000..60f5233
--- /dev/null
+++ b/camera/camera-camera2-pipe/src/test/java/androidx/camera/camera2/pipe/core/ProcessingQueueTest.kt
@@ -0,0 +1,290 @@
+/*
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.camera.camera2.pipe.core
+
+import android.os.Build
+import androidx.camera.camera2.pipe.core.ProcessingQueue.Companion.processIn
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.CoroutineExceptionHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.test.StandardTestDispatcher
+import kotlinx.coroutines.test.TestScope
+import kotlinx.coroutines.test.advanceTimeBy
+import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runTest
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.robolectric.annotation.Config
+
+@OptIn(ExperimentalCoroutinesApi::class)
+@RunWith(JUnit4::class)
+@Config(minSdk = Build.VERSION_CODES.LOLLIPOP)
+class ProcessingQueueTest {
+    private val testScope = TestScope()
+    private val processingScope =
+        CoroutineScope(
+            Job() +
+                StandardTestDispatcher(testScope.testScheduler) +
+                CoroutineExceptionHandler { _, throwable -> lastUncaughtException = throwable }
+        )
+
+    private var lastUncaughtException: Throwable? = null
+    private val unprocessedElements = mutableListOf<List<Int>>()
+    private val processingCalls = mutableListOf<List<Int>>()
+    private val unprocessElementHandler: (List<Int>) -> Unit = {
+        unprocessedElements.add(it.toMutableList())
+    }
+
+    @Test
+    fun processingQueueBuffersItems() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(
+                    capacity = 2,
+                    onUnprocessedElements = unprocessElementHandler
+                ) {}
+
+            assertThat(processingQueue.tryEmit(1)).isTrue()
+            assertThat(processingQueue.tryEmit(2)).isTrue()
+            assertThat(processingQueue.tryEmit(3)).isFalse() // Queue is full (2 items)
+        }
+
+    @Test
+    fun processInProcessesItems() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(
+                        capacity = 2,
+                        onUnprocessedElements = unprocessElementHandler
+                    ) {
+                        processingCalls.add(it.toMutableList())
+                        it.removeAt(0)
+                    }
+                    .processIn(processingScope)
+
+            assertThat(processingQueue.tryEmit(1)).isTrue()
+            assertThat(processingQueue.tryEmit(2)).isTrue()
+            assertThat(processingQueue.tryEmit(3)).isFalse() // Queue is full
+
+            advanceUntilIdle() // Processing loop runs
+
+            // Processing loop receives [1, 2], removes 1, then is re-invoked with [2]
+            assertThat(processingCalls).containsExactly(listOf(1, 2), listOf(2))
+
+            processingScope.cancel()
+        }
+
+    @Test
+    fun processingQueueIterativelyProcessesElements() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(
+                        capacity = 2,
+                        onUnprocessedElements = unprocessElementHandler
+                    ) {
+                        processingCalls.add(it.toMutableList())
+                        it.removeAt(0) // Mutation works
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.tryEmit(1)
+            processingQueue.tryEmit(2)
+            advanceUntilIdle()
+
+            processingQueue.tryEmit(3)
+            advanceUntilIdle()
+
+            processingQueue.tryEmit(4)
+            processingQueue.tryEmit(5)
+            advanceUntilIdle()
+
+            // Processing loop run 5 times:
+            // [1, 2] (removes 1)
+            // [2] (removes 2)
+            // [3] (removes 3)
+            // [4, 5] (removes 4)
+            // [5] (removes 5)
+            assertThat(processingCalls)
+                .containsExactly(listOf(1, 2), listOf(2), listOf(3), listOf(4, 5), listOf(5))
+
+            processingScope.cancel()
+        }
+
+    @Test
+    fun processingQueueAggregatesElements() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                        processingCalls.add(it.toMutableList())
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.tryEmit(1)
+            processingQueue.tryEmit(2)
+            advanceUntilIdle()
+
+            processingQueue.tryEmit(3)
+            advanceUntilIdle()
+
+            processingQueue.tryEmit(4)
+            processingQueue.tryEmit(5)
+            advanceUntilIdle()
+
+            // Processing loop does not remove anything
+            assertThat(processingCalls)
+                .containsExactly(listOf(1, 2), listOf(1, 2, 3), listOf(1, 2, 3, 4, 5))
+
+            processingScope.cancel()
+        }
+
+    @Test
+    fun processInOnCanceledScopeInvokesOnUnprocessedElements() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                    processingCalls.add(it.toMutableList())
+                    it.clear()
+                }
+
+            processingQueue.tryEmit(1)
+            processingQueue.tryEmit(2)
+
+            processingScope.cancel()
+            processingQueue.processIn(processingScope)
+
+            // Processing loop does not receive anything
+            assertThat(processingCalls).isEmpty()
+            assertThat(unprocessedElements).containsExactly(listOf(1, 2))
+        }
+
+    @Test
+    fun cancellingProcessingScopeStopsProcessing() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                        processingCalls.add(it.toMutableList())
+                        it.clear()
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.tryEmit(1)
+            processingQueue.tryEmit(2)
+            advanceUntilIdle()
+
+            assertThat(processingQueue.tryEmit(3)).isTrue() // Normal
+            assertThat(processingQueue.tryEmit(4)).isTrue() // Normal
+            processingScope.cancel()
+            assertThat(processingQueue.tryEmit(5)).isTrue() // Channel hasn't been closed
+            assertThat(processingQueue.tryEmit(6)).isTrue() // Channel hasn't been closed
+            advanceUntilIdle()
+
+            assertThat(processingQueue.tryEmit(7)).isFalse() // fails
+            assertThat(processingQueue.tryEmit(8)).isFalse() // fails
+
+            // Processing loop does not remove anything
+            assertThat(processingCalls)
+                .containsExactly(
+                    listOf(1, 2),
+                )
+            // Processing loop does not remove anything
+            assertThat(unprocessedElements).containsExactly(listOf(3, 4, 5, 6))
+        }
+
+    @Test
+    fun longProcessingBlocksAggregateItems() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                        processingCalls.add(it.toMutableList())
+                        delay(100)
+                        it.clear()
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.emitChecked(1)
+            processingQueue.emitChecked(2)
+            processingQueue.emitChecked(3)
+            advanceTimeBy(50) // Triggers initial processing call
+
+            processingQueue.emitChecked(4)
+            processingQueue.emitChecked(5)
+            advanceTimeBy(25) // No updates, process function is still suspended
+
+            processingQueue.emitChecked(6)
+            advanceUntilIdle() // Last update includes all previous updates.
+
+            // Processing loop does not remove anything
+            assertThat(processingCalls)
+                .containsExactly(
+                    listOf(1, 2, 3),
+                    listOf(4, 5, 6),
+                )
+            processingScope.cancel()
+        }
+
+    @Test
+    fun exceptionsDuringProcessingArePropagated() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                        processingCalls.add(it.toMutableList())
+                        it.clear()
+                        delay(100)
+                        throw RuntimeException("Test")
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.emitChecked(1)
+            processingQueue.emitChecked(2)
+            processingQueue.emitChecked(3)
+            advanceTimeBy(50) // Triggers initial processing call, but not exception
+
+            processingQueue.emitChecked(4)
+            processingQueue.emitChecked(5)
+            advanceUntilIdle() // Trigger exception.
+
+            assertThat(processingCalls).containsExactly(listOf(1, 2, 3))
+            assertThat(unprocessedElements).containsExactly(listOf(4, 5))
+            assertThat(lastUncaughtException).isInstanceOf(RuntimeException::class.java)
+        }
+
+    @Test
+    fun duplicateItemsAreNotOmitted() =
+        testScope.runTest {
+            val processingQueue =
+                ProcessingQueue<Int>(onUnprocessedElements = unprocessElementHandler) {
+                        processingCalls.add(it.toMutableList())
+                        it.clear()
+                    }
+                    .processIn(processingScope)
+
+            processingQueue.emitChecked(1)
+            processingQueue.emitChecked(1)
+            advanceUntilIdle()
+            processingQueue.emitChecked(1)
+            processingQueue.emitChecked(1)
+            processingQueue.emitChecked(1)
+            advanceUntilIdle()
+
+            assertThat(processingCalls).containsExactly(listOf(1, 1), listOf(1, 1, 1))
+        }
+}