Implement LimitOffsetListenableFuturePagingSource
An abstract implementation of ListenableFuturePagingSource to be
implemented by Room. Supports Guava operations through ListenableFuture.
Test: ./gradlew room:room-paging-guava:cC
Bug: 203666906
Change-Id: I3ec746a072d28e856018d03aeb08931a8d42fa93
diff --git a/room/room-paging-guava/build.gradle b/room/room-paging-guava/build.gradle
index 5db83cb..8f23959 100644
--- a/room/room-paging-guava/build.gradle
+++ b/room/room-paging-guava/build.gradle
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import androidx.build.LibraryType
import androidx.build.Publish
@@ -23,9 +24,49 @@
id("org.jetbrains.kotlin.android")
}
+// If set to `true`, we'll use KSP instead of KAPT.
+// Note that the CI does not run tests with KSP yet so this is only for local usage.
+// Once variants are properly supported by both ksp and AndroidX, we'll add support for this.
+// (b/153917176)
+def useKsp = project.properties.getOrDefault("useKsp", "false").toBoolean()
+if (useKsp) {
+ apply plugin: "com.google.devtools.ksp"
+} else {
+ apply plugin: "kotlin-kapt"
+}
+
dependencies {
api(libs.kotlinStdlib)
- // Add dependencies here
+ implementation(project(":room:room-paging"))
+ implementation(project(":room:room-guava"))
+ implementation(projectOrArtifact(":paging:paging-guava"))
+
+ androidTestImplementation(libs.truth)
+ androidTestImplementation(libs.testExtJunitKtx)
+ androidTestImplementation(libs.testRunner)
+ androidTestImplementation(libs.kotlinTestJunit)
+ androidTestImplementation(libs.kotlinCoroutinesTest)
+ androidTestImplementation(libs.kotlinCoroutinesGuava)
+ androidTestImplementation("androidx.arch.core:core-testing:2.0.1")
+ androidTestImplementation(project(":internal-testutils-common"))
+ // depend on the shadowed version so that it tests with the shipped artifact
+ // this is a temporary attribute until KSP and AndroidX plugin supports variants.
+ if(useKsp) {
+ kspAndroidTest(
+ project(path: ":room:room-compiler", configuration: "shadowAndImplementation")
+ )
+ } else {
+ kaptAndroidTest(
+ project(path: ":room:room-compiler", configuration: "shadowAndImplementation")
+ )
+ }
+}
+
+// Allow usage of Kotlin's @OptIn.
+tasks.withType(KotlinCompile).configureEach {
+ kotlinOptions {
+ freeCompilerArgs += ["-opt-in=kotlin.RequiresOptIn"]
+ }
}
androidx {
diff --git a/room/room-paging-guava/src/androidTest/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSourceTest.kt b/room/room-paging-guava/src/androidTest/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSourceTest.kt
new file mode 100644
index 0000000..a9fd319
--- /dev/null
+++ b/room/room-paging-guava/src/androidTest/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSourceTest.kt
@@ -0,0 +1,886 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.paging.guava
+
+import android.database.Cursor
+import androidx.arch.core.executor.testing.CountingTaskExecutorRule
+import androidx.paging.LoadType
+import androidx.paging.PagingConfig
+import androidx.paging.PagingSource
+import androidx.paging.PagingSource.LoadResult
+import androidx.room.Dao
+import androidx.room.Database
+import androidx.room.Entity
+import androidx.room.Insert
+import androidx.room.PrimaryKey
+import androidx.room.Room
+import androidx.room.RoomDatabase
+import androidx.room.RoomSQLiteQuery
+import androidx.room.util.getColumnIndexOrThrow
+import androidx.sqlite.db.SimpleSQLiteQuery
+import androidx.test.core.app.ApplicationProvider
+import androidx.test.ext.junit.runners.AndroidJUnit4
+import androidx.test.filters.SmallTest
+import androidx.testutils.TestExecutor
+import com.google.common.truth.Truth.assertThat
+import com.google.common.truth.Truth.assertWithMessage
+import com.google.common.util.concurrent.FutureCallback
+import com.google.common.util.concurrent.Futures.addCallback
+import com.google.common.util.concurrent.ListenableFuture
+import java.util.LinkedList
+import java.util.concurrent.CancellationException
+import java.util.concurrent.Executor
+import java.util.concurrent.TimeUnit
+import kotlin.test.assertFailsWith
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.guava.await
+import kotlinx.coroutines.test.runTest
+import org.junit.Rule
+import org.junit.Test
+import org.junit.runner.RunWith
+
+private const val tableName: String = "TestItem"
+
+@OptIn(ExperimentalCoroutinesApi::class)
+@RunWith(AndroidJUnit4::class)
+@SmallTest
+class LimitOffsetListenableFuturePagingSourceTest {
+
+ @JvmField
+ @Rule
+ val countingTaskExecutorRule = CountingTaskExecutorRule()
+
+ @Test
+ fun initialEmptyLoad_futureIsDone() = setupAndRun { db ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ runTest {
+ val listenableFuture = pagingSource.refresh()
+ val page = listenableFuture.await() as LoadResult.Page
+
+ assertThat(page.data).isEmpty()
+ assertTrue(listenableFuture.isDone)
+ }
+ }
+
+ @Test
+ fun initialLoad_returnsFutureImmediately() =
+ setupAndRunWithTestExecutor { db, _, transactionExecutor ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ val listenableFuture = pagingSource.refresh()
+ // ensure future is returned even as its result is still pending
+ assertFalse(listenableFuture.isDone)
+ assertThat(pagingSource.itemCount.get()).isEqualTo(-1)
+
+ // now execute db queries
+ transactionExecutor.executeAll() // initial transactional refresh load
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(0, 15)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun append_returnsFutureImmediately() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.append(key = 20)
+ // ensure future is returned even as its result is still pending
+ assertFalse(listenableFuture.isDone)
+
+ // load append
+ queryExecutor.executeNext()
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(20, 25)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun prepend_returnsFutureImmediately() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.prepend(key = 20)
+ // ensure future is returned even as its result is still pending
+ assertFalse(listenableFuture.isDone)
+
+ // load prepend
+ queryExecutor.executeNext()
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(15, 20)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun append_returnsInvalid() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.append(key = 50)
+
+ pagingSource.invalidate() // imitate refreshVersionsAsync invalidating the PagingSource
+ assertTrue(pagingSource.invalid)
+
+ // executing the load Callable
+ queryExecutor.executeNext()
+
+ val result = listenableFuture.await()
+ assertThat(result).isInstanceOf(LoadResult.Invalid::class.java)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun prepend_returnsInvalid() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.prepend(key = 50)
+
+ pagingSource.invalidate() // imitate refreshVersionsAsync invalidating the PagingSource
+ assertTrue(pagingSource.invalid)
+
+ // executing the load Callable
+ queryExecutor.executeNext()
+
+ val result = listenableFuture.await()
+ assertThat(result).isInstanceOf(LoadResult.Invalid::class.java)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun refresh_consecutively() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ val pagingSource2 = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ val listenableFuture1 = pagingSource.refresh(key = 10)
+ val listenableFuture2 = pagingSource2.refresh(key = 15)
+
+ // check that first Future completes first. If the first future didn't complete first,
+ // this await() would not return.
+ val page1 = listenableFuture1.await() as LoadResult.Page
+ assertThat(page1.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(10, 25)
+ )
+
+ val page2 = listenableFuture2.await() as LoadResult.Page
+ assertThat(page2.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(15, 30)
+ )
+ }
+
+ @Test
+ fun append_consecutively() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+
+ val listenableFuture1 = pagingSource.append(key = 10)
+ val listenableFuture2 = pagingSource.append(key = 15)
+
+ // both appends should be queued
+ assertThat(queryExecutor.queuedSize()).isEqualTo(2)
+
+ // run next append in queue and make sure it is the first append
+ queryExecutor.executeNext()
+ val page1 = listenableFuture1.await() as LoadResult.Page
+ assertThat(page1.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(10, 15)
+ )
+
+ // now run the second append
+ queryExecutor.executeNext()
+ val page2 = listenableFuture2.await() as LoadResult.Page
+ assertThat(page2.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(15, 20)
+ )
+ }
+
+ @Test
+ fun prepend_consecutively() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+
+ val listenableFuture1 = pagingSource.prepend(key = 30)
+ val listenableFuture2 = pagingSource.prepend(key = 25)
+
+ // both prepends should be queued
+ assertThat(queryExecutor.queuedSize()).isEqualTo(2)
+
+ // run next prepend in queue and make sure it is the first prepend
+ queryExecutor.executeNext()
+ val page1 = listenableFuture1.await() as LoadResult.Page
+ assertThat(page1.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(25, 30)
+ )
+
+ // now run the second prepend
+ queryExecutor.executeNext()
+ val page2 = listenableFuture2.await() as LoadResult.Page
+ assertThat(page2.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(20, 25)
+ )
+ }
+
+ @Test
+ fun refresh_onSuccess() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ val listenableFuture = pagingSource.refresh(key = 30)
+
+ var onSuccessReceived = false
+ val callbackExecutor = TestExecutor()
+ listenableFuture.onSuccess(callbackExecutor) { result ->
+ val page = result as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(30, 45)
+ )
+ onSuccessReceived = true
+ }
+
+ // wait until Room db's refresh load is complete
+ countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+ assertTrue(listenableFuture.isDone)
+
+ callbackExecutor.executeAll()
+
+ // make sure onSuccess callback was executed
+ assertTrue(onSuccessReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun append_onSuccess() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.append(key = 20)
+ // ensure future is returned even as its result is still pending
+ assertFalse(listenableFuture.isDone)
+
+ var onSuccessReceived = false
+ val callbackExecutor = TestExecutor()
+ listenableFuture.onSuccess(callbackExecutor) { result ->
+ val page = result as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(20, 25)
+ )
+ onSuccessReceived = true
+ }
+ // let room db complete load
+ countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+ callbackExecutor.executeAll()
+
+ // make sure onSuccess callback was executed
+ assertTrue(onSuccessReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun prepend_onSuccess() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.prepend(key = 40)
+ // ensure future is returned even as its result is still pending
+ assertFalse(listenableFuture.isDone)
+
+ var onSuccessReceived = false
+ val callbackExecutor = TestExecutor()
+ listenableFuture.onSuccess(callbackExecutor) { result ->
+ val page = result as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(35, 40)
+ )
+ onSuccessReceived = true
+ }
+ // let room db complete load
+ countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+ callbackExecutor.executeAll()
+
+ // make sure onSuccess callback was executed
+ assertTrue(onSuccessReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun refresh_awaitThrowsCancellationException() =
+ setupAndRunWithTestExecutor { db, queryExecutor, transactionExecutor ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ val listenableFuture = pagingSource.refresh(key = 50)
+ // the initial runInTransaction load
+ assertThat(transactionExecutor.queuedSize()).isEqualTo(1)
+
+ listenableFuture.cancel(true)
+
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+ assertThat(transactionExecutor.queuedSize()).isEqualTo(1)
+
+ transactionExecutor.executeNext() // initial load
+ queryExecutor.executeNext() // refreshVersionsAsync from the end runInTransaction
+
+ // await() should throw after cancellation
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+
+ // executors should be idle
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+ assertThat(transactionExecutor.queuedSize()).isEqualTo(0)
+ assertTrue(listenableFuture.isDone)
+ // even though initial refresh load is cancelled, the paging source itself
+ // is NOT invalidated
+ assertFalse(pagingSource.invalid)
+ }
+
+ @Test
+ fun append_awaitThrowsCancellationException() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ // queue up the append first
+ val listenableFuture = pagingSource.append(key = 20)
+ assertThat(queryExecutor.queuedSize()).isEqualTo(1)
+
+ listenableFuture.cancel(true)
+ queryExecutor.executeNext()
+
+ // await() should throw after cancellation
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+
+ // although query was executed, it should not complete due to the cancellation signal.
+ // If query was completed, paging source would call refreshVersionsAsync manually
+ // and queuedSize() would be 1 instead of 0 with InvalidationTracker queued up
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+ }
+
+ @Test
+ fun prepend_awaitThrowsCancellationException() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ // queue up the prepend first
+ val listenableFuture = pagingSource.prepend(key = 30)
+ assertThat(queryExecutor.queuedSize()).isEqualTo(1)
+
+ listenableFuture.cancel(true)
+ queryExecutor.executeNext()
+
+ // await() should throw after cancellation
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+
+ // although query was executed, it should not complete due to the cancellation signal.
+ // If query was completed, paging source would call refreshVersionsAsync manually
+ // and queuedSize() would be 1 instead of 0 with InvalidationTracker queued up
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+ }
+
+ @Test
+ fun refresh_canceledFutureRunsOnFailureCallback() =
+ setupAndRunWithTestExecutor { db, _, transactionExecutor ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+
+ val listenableFuture = pagingSource.refresh(key = 30)
+ assertThat(transactionExecutor.queuedSize()).isEqualTo(1)
+
+ val callbackExecutor = TestExecutor()
+ var onFailureReceived = false
+ listenableFuture.onFailure(callbackExecutor) { throwable ->
+ assertThat(throwable).isInstanceOf(CancellationException::class.java)
+ onFailureReceived = true
+ }
+
+ // now cancel future and execute the refresh load. The refresh should not complete.
+ listenableFuture.cancel(true)
+ transactionExecutor.executeNext()
+ assertThat(transactionExecutor.queuedSize()).isEqualTo(0)
+
+ callbackExecutor.executeAll()
+
+ // make sure onFailure callback was executed
+ assertTrue(onFailureReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun append_canceledFutureRunsOnFailureCallback() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ // queue up the append first
+ val listenableFuture = pagingSource.append(key = 20)
+ assertThat(queryExecutor.queuedSize()).isEqualTo(1)
+
+ val callbackExecutor = TestExecutor()
+ var onFailureReceived = false
+ listenableFuture.onFailure(callbackExecutor) { throwable ->
+ assertThat(throwable).isInstanceOf(CancellationException::class.java)
+ onFailureReceived = true
+ }
+
+ // now cancel future and execute the append load. The append should not complete.
+ listenableFuture.cancel(true)
+ queryExecutor.executeNext()
+ // if load was erroneously completed, InvalidationTracker would be queued
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+
+ callbackExecutor.executeAll()
+
+ // make sure onFailure callback was executed
+ assertTrue(onFailureReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun prepend_canceledFutureRunsOnFailureCallback() =
+ setupAndRunWithTestExecutor { db, queryExecutor, _ ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ // queue up the prepend first
+ val listenableFuture = pagingSource.prepend(key = 30)
+ assertThat(queryExecutor.queuedSize()).isEqualTo(1)
+
+ val callbackExecutor = TestExecutor()
+ var onFailureReceived = false
+ listenableFuture.onFailure(callbackExecutor) { throwable ->
+ assertThat(throwable).isInstanceOf(CancellationException::class.java)
+ onFailureReceived = true
+ }
+
+ // now cancel future and execute the prepend which should not complete.
+ listenableFuture.cancel(true)
+ queryExecutor.executeNext()
+ // if load was erroneously completed, InvalidationTracker would be queued
+ assertThat(queryExecutor.queuedSize()).isEqualTo(0)
+
+ callbackExecutor.executeAll()
+
+ // make sure onFailure callback was executed
+ assertTrue(onFailureReceived)
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun refresh_AfterCancellation() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.prepend(key = 50)
+
+ listenableFuture.cancel(true)
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+
+ // new gen after query from previous gen was cancelled
+ val pagingSource2 = LimitOffsetListenableFuturePagingSourceImpl(db)
+ val listenableFuture2 = pagingSource2.refresh()
+ val result = listenableFuture2.await() as LoadResult.Page
+
+ // the new generation should load as usual
+ assertThat(result.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(0, 15)
+ )
+ }
+
+ @Test
+ fun appendAgain_afterFutureCanceled() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.append(key = 30)
+
+ listenableFuture.cancel(true)
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+ assertTrue(listenableFuture.isDone)
+ assertFalse(pagingSource.invalid)
+
+ val listenableFuture2 = pagingSource.append(key = 30)
+
+ val result = listenableFuture2.await() as LoadResult.Page
+ assertThat(result.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(30, 35)
+ )
+ assertTrue(listenableFuture2.isDone)
+ }
+
+ @Test
+ fun prependAgain_afterFutureCanceled() = setupAndRun { db ->
+ db.dao.addAllItems(ITEMS_LIST)
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ pagingSource.itemCount.set(100) // bypass check for initial load
+
+ val listenableFuture = pagingSource.prepend(key = 30)
+
+ listenableFuture.cancel(true)
+ assertFailsWith<CancellationException> {
+ listenableFuture.await()
+ }
+ assertFalse(pagingSource.invalid)
+ assertTrue(listenableFuture.isDone)
+
+ val listenableFuture2 = pagingSource.prepend(key = 30)
+
+ val result = listenableFuture2.await() as LoadResult.Page
+ assertThat(result.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(25, 30)
+ )
+ assertTrue(listenableFuture2.isDone)
+ }
+
+ @Test
+ fun test_jumpSupport() = setupAndRun { db ->
+ val pagingSource = LimitOffsetListenableFuturePagingSourceImpl(db)
+ assertTrue(pagingSource.jumpingSupported)
+ }
+
+ @Test
+ fun refresh_secondaryConstructor() = setupAndRun { db ->
+ val pagingSource = object : LimitOffsetListenableFuturePagingSource<TestItem>(
+ db = db,
+ supportSQLiteQuery = SimpleSQLiteQuery(
+ "SELECT * FROM $tableName ORDER BY id ASC"
+ )
+ ) {
+ override fun convertRows(cursor: Cursor): List<TestItem> {
+ return convertRowsHelper(cursor)
+ }
+ }
+
+ db.dao.addAllItems(ITEMS_LIST)
+ val listenableFuture = pagingSource.refresh()
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(0, 15)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun append_secondaryConstructor() = setupAndRun { db ->
+ val pagingSource = object : LimitOffsetListenableFuturePagingSource<TestItem>(
+ db = db,
+ supportSQLiteQuery = SimpleSQLiteQuery(
+ "SELECT * FROM $tableName ORDER BY id ASC"
+ )
+ ) {
+ override fun convertRows(cursor: Cursor): List<TestItem> {
+ return convertRowsHelper(cursor)
+ }
+ }
+
+ db.dao.addAllItems(ITEMS_LIST)
+ pagingSource.itemCount.set(100)
+ val listenableFuture = pagingSource.append(key = 50)
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(50, 55)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ @Test
+ fun prepend_secondaryConstructor() = setupAndRun { db ->
+ val pagingSource = object : LimitOffsetListenableFuturePagingSource<TestItem>(
+ db = db,
+ supportSQLiteQuery = SimpleSQLiteQuery(
+ "SELECT * FROM $tableName ORDER BY id ASC"
+ )
+ ) {
+ override fun convertRows(cursor: Cursor): List<TestItem> {
+ return convertRowsHelper(cursor)
+ }
+ }
+
+ db.dao.addAllItems(ITEMS_LIST)
+ pagingSource.itemCount.set(100)
+ val listenableFuture = pagingSource.prepend(key = 50)
+
+ val page = listenableFuture.await() as LoadResult.Page
+ assertThat(page.data).containsExactlyElementsIn(
+ ITEMS_LIST.subList(45, 50)
+ )
+ assertTrue(listenableFuture.isDone)
+ }
+
+ private fun setupAndRun(
+ test: suspend (LimitOffsetTestDb) -> Unit
+ ) {
+ val db = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(),
+ LimitOffsetTestDb::class.java
+ ).build()
+
+ runTest {
+ test(db)
+ }
+ tearDown(db)
+ }
+
+ private fun setupAndRunWithTestExecutor(
+ test: suspend (LimitOffsetTestDb, TestExecutor, TestExecutor) -> Unit
+ ) {
+ val queryExecutor = TestExecutor()
+ val transactionExecutor = TestExecutor()
+ val db = Room.inMemoryDatabaseBuilder(
+ ApplicationProvider.getApplicationContext(),
+ LimitOffsetTestDb::class.java
+ )
+ .setTransactionExecutor(transactionExecutor)
+ .setQueryExecutor(queryExecutor)
+ .build()
+
+ runTest {
+ db.dao.addAllItems(ITEMS_LIST)
+ queryExecutor.executeNext() // InvalidationTracker from the addAllItems
+ test(db, queryExecutor, transactionExecutor)
+ }
+ tearDown(db)
+ }
+
+ private fun tearDown(db: LimitOffsetTestDb) {
+ if (db.isOpen) db.close()
+ countingTaskExecutorRule.drainTasks(500, TimeUnit.MILLISECONDS)
+ assertThat(countingTaskExecutorRule.isIdle).isTrue()
+ }
+}
+
+private class LimitOffsetListenableFuturePagingSourceImpl(
+ db: RoomDatabase,
+ queryString: String = "SELECT * FROM $tableName ORDER BY id ASC",
+) : LimitOffsetListenableFuturePagingSource<TestItem>(
+ sourceQuery = RoomSQLiteQuery.acquire(
+ queryString,
+ 0
+ ),
+ db = db,
+ tables = arrayOf(tableName)
+) {
+ override fun convertRows(cursor: Cursor): List<TestItem> {
+ return convertRowsHelper(cursor)
+ }
+}
+
+private fun convertRowsHelper(cursor: Cursor): List<TestItem> {
+ val cursorIndexOfId = getColumnIndexOrThrow(cursor, "id")
+ val data = mutableListOf<TestItem>()
+ while (cursor.moveToNext()) {
+ val tmpId = cursor.getInt(cursorIndexOfId)
+ data.add(TestItem(tmpId))
+ }
+ return data
+}
+
+@Suppress("UNCHECKED_CAST")
+private fun TestExecutor.executeNext() {
+ val tasks = javaClass.getDeclaredField("mTasks").let {
+ it.isAccessible = true
+ it.get(this)
+ } as LinkedList<Runnable>
+
+ if (!tasks.isEmpty()) {
+ val task = tasks.poll()
+ task?.run()
+ }
+}
+
+@Suppress("UNCHECKED_CAST")
+private fun TestExecutor.queuedSize(): Int {
+ val tasks = javaClass.getDeclaredField("mTasks").let {
+ it.isAccessible = true
+ it.get(this)
+ } as LinkedList<Runnable>
+
+ return tasks.size
+}
+
+private fun LimitOffsetListenableFuturePagingSource<TestItem>.refresh(
+ key: Int? = null,
+): ListenableFuture<LoadResult<Int, TestItem>> {
+ return loadFuture(
+ createLoadParam(
+ loadType = LoadType.REFRESH,
+ key = key,
+ )
+ )
+}
+
+private fun LimitOffsetListenableFuturePagingSource<TestItem>.append(
+ key: Int? = -1,
+): ListenableFuture<LoadResult<Int, TestItem>> {
+ return loadFuture(
+ createLoadParam(
+ loadType = LoadType.APPEND,
+ key = key,
+ )
+ )
+}
+
+private fun LimitOffsetListenableFuturePagingSource<TestItem>.prepend(
+ key: Int? = -1,
+): ListenableFuture<LoadResult<Int, TestItem>> {
+ return loadFuture(
+ createLoadParam(
+ loadType = LoadType.PREPEND,
+ key = key,
+ )
+ )
+}
+
+private val CONFIG = PagingConfig(
+ pageSize = 5,
+ enablePlaceholders = true,
+ initialLoadSize = 15
+)
+
+private val ITEMS_LIST = createItemsForDb(0, 100)
+
+private fun createItemsForDb(startId: Int, count: Int): List<TestItem> {
+ return List(count) {
+ TestItem(
+ id = it + startId,
+ )
+ }
+}
+
+private fun createLoadParam(
+ loadType: LoadType,
+ key: Int? = null,
+ initialLoadSize: Int = CONFIG.initialLoadSize,
+ pageSize: Int = CONFIG.pageSize,
+ placeholdersEnabled: Boolean = CONFIG.enablePlaceholders
+): PagingSource.LoadParams<Int> {
+ return when (loadType) {
+ LoadType.REFRESH -> {
+ PagingSource.LoadParams.Refresh(
+ key = key,
+ loadSize = initialLoadSize,
+ placeholdersEnabled = placeholdersEnabled
+ )
+ }
+ LoadType.APPEND -> {
+ PagingSource.LoadParams.Append(
+ key = key ?: -1,
+ loadSize = pageSize,
+ placeholdersEnabled = placeholdersEnabled
+ )
+ }
+ LoadType.PREPEND -> {
+ PagingSource.LoadParams.Prepend(
+ key = key ?: -1,
+ loadSize = pageSize,
+ placeholdersEnabled = placeholdersEnabled
+ )
+ }
+ }
+}
+
+private fun ListenableFuture<LoadResult<Int, TestItem>>.onSuccess(
+ executor: Executor,
+ onSuccessCallback: (LoadResult<Int, TestItem>?) -> Unit,
+) {
+ addCallback(
+ this,
+ object : FutureCallback<LoadResult<Int, TestItem>> {
+ override fun onSuccess(result: LoadResult<Int, TestItem>?) {
+ onSuccessCallback(result)
+ }
+
+ override fun onFailure(t: Throwable) {
+ assertWithMessage("Expected onSuccess callback instead of onFailure, " +
+ "received ${t.localizedMessage}").fail()
+ }
+ },
+ executor
+ )
+}
+
+private fun ListenableFuture<LoadResult<Int, TestItem>>.onFailure(
+ executor: Executor,
+ onFailureCallback: (Throwable) -> Unit,
+) {
+ addCallback(
+ this,
+ object : FutureCallback<LoadResult<Int, TestItem>> {
+ override fun onSuccess(result: LoadResult<Int, TestItem>?) {
+ assertWithMessage("Expected onFailure callback instead of onSuccess, " +
+ "received result $result").fail()
+ }
+
+ override fun onFailure(t: Throwable) {
+ onFailureCallback(t)
+ }
+ },
+ executor
+ )
+}
+
+@Database(entities = [TestItem::class], version = 1, exportSchema = false)
+abstract class LimitOffsetTestDb : RoomDatabase() {
+ abstract val dao: TestItemDao
+}
+
+@Entity(tableName = "TestItem")
+data class TestItem(
+ @PrimaryKey val id: Int,
+ val value: String = "item $id"
+)
+
+@Dao
+interface TestItemDao {
+ @Insert
+ fun addAllItems(testItems: List<TestItem>)
+}
diff --git a/room/room-paging-guava/src/main/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSource.kt b/room/room-paging-guava/src/main/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSource.kt
new file mode 100644
index 0000000..fb2d209
--- /dev/null
+++ b/room/room-paging-guava/src/main/kotlin/androidx/room/paging/guava/LimitOffsetListenableFuturePagingSource.kt
@@ -0,0 +1,151 @@
+ /*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package androidx.room.paging.guava
+
+import android.database.Cursor
+import androidx.annotation.NonNull
+import androidx.annotation.RestrictTo
+import androidx.paging.ListenableFuturePagingSource
+import androidx.paging.PagingState
+import androidx.room.RoomDatabase
+import androidx.room.RoomSQLiteQuery
+import androidx.room.guava.GuavaRoom.createListenableFuture
+import androidx.room.paging.util.INITIAL_ITEM_COUNT
+import androidx.room.paging.util.INVALID
+import androidx.room.paging.util.ThreadSafeInvalidationObserver
+import androidx.room.paging.util.getClippedRefreshKey
+import androidx.room.paging.util.queryDatabase
+import androidx.room.paging.util.queryItemCount
+import androidx.room.util.createCancellationSignal
+import androidx.sqlite.db.SupportSQLiteQuery
+import com.google.common.util.concurrent.ListenableFuture
+import java.util.concurrent.Callable
+import java.util.concurrent.atomic.AtomicInteger
+
+@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
+abstract class LimitOffsetListenableFuturePagingSource<Value : Any>(
+ private val sourceQuery: RoomSQLiteQuery,
+ private val db: RoomDatabase,
+ vararg tables: String
+) : ListenableFuturePagingSource<Int, Value>() {
+
+ constructor(
+ supportSQLiteQuery: SupportSQLiteQuery,
+ db: RoomDatabase,
+ vararg tables: String,
+ ) : this(
+ sourceQuery = RoomSQLiteQuery.copyFrom(supportSQLiteQuery),
+ db = db,
+ tables = tables,
+ )
+
+ // internal for testing visibility
+ internal val itemCount: AtomicInteger = AtomicInteger(INITIAL_ITEM_COUNT)
+ private val observer = ThreadSafeInvalidationObserver(tables = tables, ::invalidate)
+
+ /**
+ * Returns a [ListenableFuture] immediately before loading from the database completes
+ *
+ * If PagingSource is invalidated while the [ListenableFuture] is still pending, the
+ * invalidation will cancel the load() coroutine that calls await() on this future. The
+ * cancellation of await() will transitively cancel this future as well.
+ */
+ override fun loadFuture(params: LoadParams<Int>): ListenableFuture<LoadResult<Int, Value>> {
+ observer.registerIfNecessary(db)
+ val tempCount = itemCount.get()
+ return if (tempCount == INITIAL_ITEM_COUNT) {
+ initialLoad(params)
+ } else {
+ nonInitialLoad(params, tempCount)
+ }
+ }
+
+ /**
+ * For refresh loads
+ *
+ * To guarantee a valid initial load, it is run in transaction so that db writes cannot
+ * happen in between [queryItemCount] and [queryDatabase] to ensure a valid [itemCount].
+ * [itemCount] must be correct in order to calculate correct LIMIT and OFFSET for the query.
+ *
+ *
+ * However, the database load will be canceled via the cancellation signal if the future
+ * it returned has been canceled before it has completed.
+ */
+ private fun initialLoad(params: LoadParams<Int>): ListenableFuture<LoadResult<Int, Value>> {
+ val cancellationSignal = createCancellationSignal()
+ val loadCallable = Callable<LoadResult<Int, Value>> {
+ db.runInTransaction(
+ Callable {
+ val tempCount = queryItemCount(sourceQuery, db)
+ itemCount.set(tempCount)
+ queryDatabase(
+ params, sourceQuery, db, tempCount, cancellationSignal, ::convertRows
+ )
+ }
+ )
+ }
+
+ return createListenableFuture(
+ db,
+ true,
+ loadCallable,
+ sourceQuery,
+ false,
+ cancellationSignal,
+ )
+ }
+
+ /**
+ * For append and prepend loads
+ *
+ * The cancellation signal cancels room database operation if its running, or cancels it
+ * the moment it starts. The signal is triggered when the future is canceled.
+ */
+ private fun nonInitialLoad(
+ params: LoadParams<Int>,
+ tempCount: Int
+ ): ListenableFuture<LoadResult<Int, Value>> {
+ val cancellationSignal = createCancellationSignal()
+ val loadCallable = Callable<LoadResult<Int, Value>> {
+ val result = queryDatabase(
+ params, sourceQuery, db, tempCount, cancellationSignal, ::convertRows
+ )
+ db.invalidationTracker.refreshVersionsAsync()
+ @Suppress("UNCHECKED_CAST")
+ if (invalid) INVALID as LoadResult.Invalid<Int, Value> else result
+ }
+
+ return createListenableFuture(
+ db,
+ false,
+ loadCallable,
+ sourceQuery,
+ false,
+ cancellationSignal
+ )
+ }
+
+ @NonNull
+ protected abstract fun convertRows(cursor: Cursor): List<Value>
+
+ override val jumpingSupported: Boolean
+ get() = true
+
+ override fun getRefreshKey(state: PagingState<Int, Value>): Int? {
+ return state.getClippedRefreshKey()
+ }
+}
\ No newline at end of file
diff --git a/room/room-paging/build.gradle b/room/room-paging/build.gradle
index 8e6600f..84367bf 100644
--- a/room/room-paging/build.gradle
+++ b/room/room-paging/build.gradle
@@ -45,7 +45,7 @@
dependencies {
api(libs.kotlinStdlib)
- implementation(project(":room:room-runtime"))
+ api(project(":room:room-runtime"))
implementation(project(":room:room-ktx"))
api("androidx.paging:paging-common:3.1.0")
diff --git a/room/room-paging/src/main/kotlin/androidx/room/paging/LimitOffsetPagingSource.kt b/room/room-paging/src/main/kotlin/androidx/room/paging/LimitOffsetPagingSource.kt
index 023e5b6..6634131 100644
--- a/room/room-paging/src/main/kotlin/androidx/room/paging/LimitOffsetPagingSource.kt
+++ b/room/room-paging/src/main/kotlin/androidx/room/paging/LimitOffsetPagingSource.kt
@@ -24,6 +24,8 @@
import androidx.room.RoomDatabase
import androidx.room.RoomSQLiteQuery
import androidx.room.getQueryDispatcher
+import androidx.room.paging.util.INITIAL_ITEM_COUNT
+import androidx.room.paging.util.INVALID
import androidx.room.paging.util.ThreadSafeInvalidationObserver
import androidx.room.paging.util.getClippedRefreshKey
import androidx.room.paging.util.queryDatabase
@@ -40,9 +42,6 @@
* for Pager's consumption. Registers observers on tables lazily and automatically invalidates
* itself when data changes.
*/
-
-private val INVALID = PagingSource.LoadResult.Invalid<Any, Any>()
-
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
abstract class LimitOffsetPagingSource<Value : Any>(
private val sourceQuery: RoomSQLiteQuery,
@@ -60,7 +59,7 @@
tables = tables,
)
- internal val itemCount: AtomicInteger = AtomicInteger(-1)
+ internal val itemCount: AtomicInteger = AtomicInteger(INITIAL_ITEM_COUNT)
private val observer = ThreadSafeInvalidationObserver(
tables = tables,
@@ -72,7 +71,7 @@
observer.registerIfNecessary(db)
val tempCount = itemCount.get()
// if itemCount is < 0, then it is initial load
- if (tempCount < 0) {
+ if (tempCount == INITIAL_ITEM_COUNT) {
initialLoad(params)
} else {
nonInitialLoad(params, tempCount)
@@ -94,7 +93,13 @@
return db.withTransaction {
val tempCount = queryItemCount(sourceQuery, db)
itemCount.set(tempCount)
- queryDatabase(params, sourceQuery, db, tempCount, ::convertRows)
+ queryDatabase(
+ params = params,
+ sourceQuery = sourceQuery,
+ db = db,
+ itemCount = tempCount,
+ convertRows = ::convertRows
+ )
}
}
@@ -102,8 +107,14 @@
params: LoadParams<Int>,
tempCount: Int,
): LoadResult<Int, Value> {
- val loadResult = queryDatabase(params, sourceQuery, db, tempCount, ::convertRows)
- // manually check if database has been updated. If so, the observers's
+ val loadResult = queryDatabase(
+ params = params,
+ sourceQuery = sourceQuery,
+ db = db,
+ itemCount = tempCount,
+ convertRows = ::convertRows
+ )
+ // manually check if database has been updated. If so, the observer's
// invalidation callback will invalidate this paging source
db.invalidationTracker.refreshVersionsSync()
@Suppress("UNCHECKED_CAST")
diff --git a/room/room-paging/src/main/kotlin/androidx/room/paging/util/RoomPagingUtil.kt b/room/room-paging/src/main/kotlin/androidx/room/paging/util/RoomPagingUtil.kt
index 2511fc6..1722c11 100644
--- a/room/room-paging/src/main/kotlin/androidx/room/paging/util/RoomPagingUtil.kt
+++ b/room/room-paging/src/main/kotlin/androidx/room/paging/util/RoomPagingUtil.kt
@@ -18,6 +18,7 @@
package androidx.room.paging.util
import android.database.Cursor
+import android.os.CancellationSignal
import androidx.annotation.RestrictTo
import androidx.paging.PagingSource
import androidx.paging.PagingSource.LoadParams
@@ -30,6 +31,18 @@
import androidx.room.RoomSQLiteQuery
/**
+ * A [LoadResult] that can be returned to trigger a new generation of PagingSource
+ *
+ * Any loaded data or queued loads prior to returning INVALID will be discarded
+ */
+val INVALID = LoadResult.Invalid<Any, Any>()
+
+/**
+ * The default itemCount value
+ */
+const val INITIAL_ITEM_COUNT = -1
+
+/**
* Calculates query limit based on LoadType.
*
* Prepend: If requested loadSize is larger than available number of items to prepend, it will
@@ -102,6 +115,8 @@
* @param itemCount the db row count, triggers a new PagingSource generation if itemCount changes,
* i.e. items are added / removed
*
+ * @param cancellationSignal the signal to cancel the query if the query hasn't yet completed
+ *
* @param convertRows the function to iterate data with provided [Cursor] to return List<Value>
*/
fun <Value : Any> queryDatabase(
@@ -109,6 +124,7 @@
sourceQuery: RoomSQLiteQuery,
db: RoomDatabase,
itemCount: Int,
+ cancellationSignal: CancellationSignal? = null,
convertRows: (Cursor) -> List<Value>,
): LoadResult<Int, Value> {
val key = params.key ?: 0
@@ -121,7 +137,7 @@
sourceQuery.argCount
)
sqLiteQuery.copyArgumentsFrom(sourceQuery)
- val cursor = db.query(sqLiteQuery)
+ val cursor = db.query(sqLiteQuery, cancellationSignal)
val data: List<Value>
try {
data = convertRows(cursor)