Add LoadResult.Invalid support for Paging2

This change adds LoadResult.Invalid support for Paging2 leveraging
PagingSource, such as LivePagedList and RxPagedList.

In the event of a PagingSource returning LoadResult.Invalid to its
PagedList, paging will detach the PagedList to stop attempts to load
on this PagedList and invalidate the PagingSource.

If no initial page is provided to PagedList.Builder and the initial
load fails with a LoadResult.Invalid, an IllegalStateException
will be thrown. To use a PagingSource that supports invalidation, use a
PagedList builder that accepts a factory method for PagingSource or
DataSource.Factory such as LivePagedList.

Bug: 192013267
Test: ./gradlew :paging:paging-common:test
Test: ./gradlew :paging:paging-runtime:cC
Test: ./gradlew :paging:paging-rxjava2:test
Test: ./gradlew :paging:paging-rxjava3:test
Change-Id: I97de7e67cc5fe01b2c5c5a8e13bd1926c5ca2cde
diff --git a/paging/rxjava3/build.gradle b/paging/rxjava3/build.gradle
index 5b5bf91..5755ccd 100644
--- a/paging/rxjava3/build.gradle
+++ b/paging/rxjava3/build.gradle
@@ -33,9 +33,11 @@
+    testImplementation(project(":internal-testutils-ktx"))
+    testImplementation(libs.truth)
diff --git a/paging/rxjava3/src/main/java/androidx/paging/rxjava3/RxPagedListBuilder.kt b/paging/rxjava3/src/main/java/androidx/paging/rxjava3/RxPagedListBuilder.kt
index 289b9b4a..d720c98 100644
--- a/paging/rxjava3/src/main/java/androidx/paging/rxjava3/RxPagedListBuilder.kt
+++ b/paging/rxjava3/src/main/java/androidx/paging/rxjava3/RxPagedListBuilder.kt
@@ -412,6 +412,13 @@
                 val lastKey = currentData.lastKey as Key?
                 val params = config.toRefreshLoadParams(lastKey)
                 when (val initialResult = pagingSource.load(params)) {
+                    is PagingSource.LoadResult.Invalid -> {
+                        currentData.setInitialLoadState(
+                            LoadType.REFRESH,
+                            LoadState.NotLoading(endOfPaginationReached = false)
+                        )
+                        pagingSource.invalidate()
+                    }
                     is PagingSource.LoadResult.Error -> {
diff --git a/paging/rxjava3/src/test/java/androidx/paging/RxPagedListBuilderTest.kt b/paging/rxjava3/src/test/java/androidx/paging/RxPagedListBuilderTest.kt
index 091672a..3e371e3 100644
--- a/paging/rxjava3/src/test/java/androidx/paging/RxPagedListBuilderTest.kt
+++ b/paging/rxjava3/src/test/java/androidx/paging/RxPagedListBuilderTest.kt
@@ -23,6 +23,8 @@
 import androidx.paging.LoadState.NotLoading
 import androidx.paging.LoadType.REFRESH
 import androidx.paging.rxjava3.RxPagedListBuilder
+import androidx.testutils.DirectDispatcher
+import androidx.testutils.TestDispatcher
 import io.reactivex.rxjava3.core.Observable
 import io.reactivex.rxjava3.observers.TestObserver
 import io.reactivex.rxjava3.schedulers.Schedulers
@@ -32,6 +34,10 @@
 import org.junit.runner.RunWith
 import org.junit.runners.JUnit4
 import kotlin.test.assertTrue
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.asExecutor
+import kotlinx.coroutines.withContext
 class RxPagedListBuilderTest {
@@ -55,8 +61,10 @@
     class MockDataSourceFactory {
-        fun create(): PagingSource<Int, String> {
-            return MockPagingSource()
+        fun create(
+            loadDispatcher: CoroutineDispatcher = DirectDispatcher
+        ): PagingSource<Int, String> {
+            return MockPagingSource(loadDispatcher)
         var throwable: Throwable? = null
@@ -65,10 +73,25 @@
             throwable = EXCEPTION
-        private inner class MockPagingSource : PagingSource<Int, String>() {
-            override suspend fun load(params: LoadParams<Int>) = when (params) {
-                is LoadParams.Refresh -> loadInitial(params)
-                else -> loadRange()
+        inner class MockPagingSource(
+            // Allow explicit control of load calls outside of fetch / notify. Note: This is
+            // different from simply setting fetchDispatcher because PagingObservableOnSubscribe
+            // init happens on fetchDispatcher which makes it difficult to differentiate
+            // InitialPagedList.
+            val loadDispatcher: CoroutineDispatcher
+        ) : PagingSource<Int, String>() {
+            var invalidInitialLoad = false
+            override suspend fun load(params: LoadParams<Int>): LoadResult<Int, String> {
+                return withContext(loadDispatcher) {
+                    if (invalidInitialLoad) {
+                        invalidInitialLoad = false
+                        LoadResult.Invalid()
+                    } else when (params) {
+                        is LoadParams.Refresh -> loadInitial(params)
+                        else -> loadRange()
+                    }
+                }
             override fun getRefreshKey(state: PagingState<Int, String>): Int? = null
@@ -258,6 +281,105 @@
+    fun observablePagedList_invalidInitialResult() {
+        // this TestDispatcher is used to queue up pagingSource.load(). This allows us to control
+        // and assert against each load() attempt outside of fetch/notify dispatcher
+        val loadDispatcher = TestDispatcher()
+        val pagingSources = mutableListOf<MockDataSourceFactory.MockPagingSource>()
+        val factory = {
+            MockDataSourceFactory().create(loadDispatcher).also {
+                val source = it as MockDataSourceFactory.MockPagingSource
+                if (pagingSources.size == 0) source.invalidInitialLoad = true
+                pagingSources.add(source)
+            }
+        }
+        // this is essentially a direct scheduler so jobs are run immediately
+        val scheduler = Schedulers.from(DirectDispatcher.asExecutor())
+        val observable = RxPagedListBuilder(factory, 2)
+            .setFetchScheduler(scheduler)
+            .setNotifyScheduler(scheduler)
+            .buildObservable()
+        val observer = TestObserver<PagedList<String>>()
+        // subscribe triggers the PagingObservableOnSubscribe's invalidate() to create first
+        // pagingSource
+        observable.subscribe(observer)
+        // ensure the InitialPagedList with empty data is observed
+        observer.assertValueCount(1)
+        val initPagedList = observer.values()[0]!!
+        assertThat(initPagedList).isInstanceOf(
+        assertThat(initPagedList).isEmpty()
+        // ensure first pagingSource is also created at this point
+        assertThat(pagingSources.size).isEqualTo(1)
+        val loadStates = mutableListOf<LoadStateEvent>()
+        val loadStateChangedCallback = { type: LoadType, state: LoadState ->
+            if (type == REFRESH) {
+                loadStates.add(LoadStateEvent(type, state))
+            }
+        }
+        initPagedList.addWeakLoadStateListener(loadStateChangedCallback)
+        assertThat(loadStates).containsExactly(
+            // before first load() is called, REFRESH is set to loading, represents load
+            // attempt on first pagingSource
+            LoadStateEvent(REFRESH, Loading)
+        )
+        // execute first load, represents load attempt on first paging source
+        //
+        // using poll().run() instead of executeAll(), because executeAll() + immediate schedulers
+        // result in first load + subsequent loads executing immediately and we won't be able to
+        // assert the pagedLists/loads incrementally
+        loadDispatcher.queue.poll()?.run()
+        // the load failed so there should still be only one PagedList, but the first
+        // pagingSource should invalidated, and the second pagingSource is created
+        observer.assertValueCount(1)
+        assertTrue(pagingSources[0].invalid)
+        assertThat(pagingSources.size).isEqualTo(2)
+        assertThat(loadStates).containsExactly(
+            // the first load attempt
+            LoadStateEvent(REFRESH, Loading),
+            // LoadResult.Invalid resets RERFRESH state
+            LoadStateEvent(
+                REFRESH,
+                NotLoading(endOfPaginationReached = false)
+            ),
+            // before second load() is called, REFRESH is set to loading, represents load
+            // attempt on second pagingSource
+            LoadStateEvent(REFRESH, Loading),
+        )
+        // execute the load attempt on second pagingSource which succeeds
+        loadDispatcher.queue.poll()?.run()
+        // ensure second pagedList created with the correct data loaded
+        observer.assertValueCount(2)
+        val secondPagedList = observer.values()[1]
+        assertThat(secondPagedList).containsExactly("a", "b", null, null)
+        assertThat(secondPagedList).isNotInstanceOf(
+        assertThat(secondPagedList).isInstanceOf(
+        secondPagedList.addWeakLoadStateListener(loadStateChangedCallback)
+        assertThat(loadStates).containsExactly(
+            LoadStateEvent(REFRESH, Loading), // first load
+            LoadStateEvent(
+                REFRESH,
+                NotLoading(endOfPaginationReached = false)
+            ), // first load reset
+            LoadStateEvent(REFRESH, Loading), // second load
+            LoadStateEvent(
+                REFRESH,
+                NotLoading(endOfPaginationReached = false)
+            ), // second load succeeds
+        )
+    }
+    @Test
     fun instantiatesPagingSourceOnFetchDispatcher() {
         var pagingSourcesCreated = 0
         val pagingSourceFactory = {