blob: 89af1f3562341105cd9fab794452421ef340ebe7 [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.room;
import androidx.annotation.RestrictTo;
import androidx.arch.core.executor.ArchTaskExecutor;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.Action;
import io.reactivex.functions.Function;
/**
 * Helper class to add RxJava2 support to Room.
 */
@SuppressWarnings("WeakerAccess")
public class RxRoom {
    /**
     * Data dispatched by the publisher created by
     * {@link #createFlowable(RoomDatabase, String...)}.
     */
    public static final Object NOTHING = new Object();

    /**
     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
     * observed tables is updated.
     * <p>
     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
     * that it re-runs when database is modified.
     * <p>
     * Since database invalidation is batched, multiple changes in the database may result in just
     * 1 emission.
     *
     * @param database   The database instance
     * @param tableNames The list of table names that should be observed
     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
     * is modified (also once when the invalidation tracker connection is established).
     */
    public static Flowable<Object> createFlowable(final RoomDatabase database,
            final String... tableNames) {
        return Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        if (!emitter.isCancelled()) {
                            emitter.onNext(NOTHING);
                        }
                    }
                };
                if (!emitter.isCancelled()) {
                    database.getInvalidationTracker().addObserver(observer);
                    // Unregister the observer as soon as the downstream cancels.
                    emitter.setDisposable(Disposables.fromAction(new Action() {
                        @Override
                        public void run() throws Exception {
                            database.getInvalidationTracker().removeObserver(observer);
                        }
                    }));
                }
                // emit once to avoid missing any data and also easy chaining
                if (!emitter.isCancelled()) {
                    emitter.onNext(NOTHING);
                }
            }
        }, BackpressureStrategy.LATEST);
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava2 does not like null.
     *
     * @param database   The database instance
     * @param tableNames The table names whose invalidation re-runs the callable
     * @param callable   The computation to run on every emission of the invalidation stream
     * @return A {@link Flowable} of the callable's results
     *
     * @hide
     */
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
            final String[] tableNames, final Callable<T> callable) {
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }

    /**
     * Scheduler that hands every task to {@link ArchTaskExecutor}'s disk IO executor.
     * <p>
     * The requested delay is ignored: tasks are always submitted immediately.
     */
    private static final Scheduler sAppToolkitIOScheduler = new Scheduler() {
        @Override
        public Worker createWorker() {
            // Shared by all tasks scheduled through this worker so that disposing the worker
            // prevents every task that has not started yet from running.
            final AtomicBoolean workerDisposed = new AtomicBoolean(false);
            return new Worker() {
                @Override
                public Disposable schedule(@NonNull Runnable run, long delay,
                        @NonNull TimeUnit unit) {
                    DisposableRunnable disposable = new DisposableRunnable(run, workerDisposed);
                    // Submit the wrapper, not the raw runnable; otherwise disposing the
                    // returned Disposable (or the worker) would never stop the task.
                    ArchTaskExecutor.getInstance().executeOnDiskIO(disposable);
                    return disposable;
                }

                @Override
                public void dispose() {
                    workerDisposed.set(true);
                }

                @Override
                public boolean isDisposed() {
                    return workerDisposed.get();
                }
            };
        }
    };

    /**
     * Wraps a {@link Runnable} so that it is skipped if either this wrapper or the owning
     * worker has been disposed before the task starts.
     */
    private static class DisposableRunnable implements Disposable, Runnable {
        private final Runnable mActual;
        private volatile boolean mDisposed = false;
        private final AtomicBoolean mGlobalDisposed;

        DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) {
            mActual = actual;
            mGlobalDisposed = globalDisposed;
        }

        @Override
        public void dispose() {
            mDisposed = true;
        }

        @Override
        public boolean isDisposed() {
            return mDisposed || mGlobalDisposed.get();
        }

        @Override
        public void run() {
            // Disposal is only checked before starting; a running task is not interrupted.
            if (!isDisposed()) {
                mActual.run();
            }
        }
    }

    /** @deprecated This type should not be instantiated as it contains only static methods. */
    @Deprecated
    @SuppressWarnings("PrivateConstructorForUtilityClass")
    public RxRoom() {
    }
}