blob: 89af1f3562341105cd9fab794452421ef340ebe7 [file] [log] [blame]
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package androidx.room;
18
19import androidx.annotation.RestrictTo;
20import androidx.arch.core.executor.ArchTaskExecutor;
21
22import java.util.Set;
23import java.util.concurrent.Callable;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.atomic.AtomicBoolean;
26
27import io.reactivex.BackpressureStrategy;
28import io.reactivex.Flowable;
29import io.reactivex.FlowableEmitter;
30import io.reactivex.FlowableOnSubscribe;
31import io.reactivex.Maybe;
32import io.reactivex.MaybeSource;
33import io.reactivex.Scheduler;
34import io.reactivex.annotations.NonNull;
35import io.reactivex.disposables.Disposable;
36import io.reactivex.disposables.Disposables;
37import io.reactivex.functions.Action;
38import io.reactivex.functions.Function;
39
40/**
41 * Helper class to add RxJava2 support to Room.
42 */
43@SuppressWarnings("WeakerAccess")
44public class RxRoom {
45 /**
46 * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
47 */
48 public static final Object NOTHING = new Object();
49
50 /**
51 * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
52 * observed tables is updated.
53 * <p>
54 * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
55 * that it re-runs when database is modified.
56 * <p>
57 * Since database invalidation is batched, multiple changes in the database may results in just
58 * 1 emission.
59 *
60 * @param database The database instance
61 * @param tableNames The list of table names that should be observed
62 * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
63 * is modified (also once when the invalidation tracker connection is established).
64 */
65 public static Flowable<Object> createFlowable(final RoomDatabase database,
66 final String... tableNames) {
67 return Flowable.create(new FlowableOnSubscribe<Object>() {
68 @Override
69 public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
70 final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
71 tableNames) {
72 @Override
73 public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
74 if (!emitter.isCancelled()) {
75 emitter.onNext(NOTHING);
76 }
77 }
78 };
79 if (!emitter.isCancelled()) {
80 database.getInvalidationTracker().addObserver(observer);
81 emitter.setDisposable(Disposables.fromAction(new Action() {
82 @Override
83 public void run() throws Exception {
84 database.getInvalidationTracker().removeObserver(observer);
85 }
86 }));
87 }
88
89 // emit once to avoid missing any data and also easy chaining
90 if (!emitter.isCancelled()) {
91 emitter.onNext(NOTHING);
92 }
93 }
94 }, BackpressureStrategy.LATEST);
95 }
96
97 /**
98 * Helper method used by generated code to bind a Callable such that it will be run in
99 * our disk io thread and will automatically block null values since RxJava2 does not like null.
100 *
101 * @hide
102 */
103 @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
104 public static <T> Flowable<T> createFlowable(final RoomDatabase database,
105 final String[] tableNames, final Callable<T> callable) {
106 final Maybe<T> maybe = Maybe.fromCallable(callable);
107 return createFlowable(database, tableNames).observeOn(sAppToolkitIOScheduler)
108 .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
109 @Override
110 public MaybeSource<T> apply(Object o) throws Exception {
111 return maybe;
112 }
113 });
114 }
115
116 private static Scheduler sAppToolkitIOScheduler = new Scheduler() {
117 @Override
118 public Worker createWorker() {
119 final AtomicBoolean mDisposed = new AtomicBoolean(false);
120 return new Worker() {
121 @Override
122 public Disposable schedule(@NonNull Runnable run, long delay,
123 @NonNull TimeUnit unit) {
124 DisposableRunnable disposable = new DisposableRunnable(run, mDisposed);
125 ArchTaskExecutor.getInstance().executeOnDiskIO(run);
126 return disposable;
127 }
128
129 @Override
130 public void dispose() {
131 mDisposed.set(true);
132 }
133
134 @Override
135 public boolean isDisposed() {
136 return mDisposed.get();
137 }
138 };
139 }
140 };
141
142 private static class DisposableRunnable implements Disposable, Runnable {
143 private final Runnable mActual;
144 private volatile boolean mDisposed = false;
145 private final AtomicBoolean mGlobalDisposed;
146
147 DisposableRunnable(Runnable actual, AtomicBoolean globalDisposed) {
148 mActual = actual;
149 mGlobalDisposed = globalDisposed;
150 }
151
152 @Override
153 public void dispose() {
154 mDisposed = true;
155 }
156
157 @Override
158 public boolean isDisposed() {
159 return mDisposed || mGlobalDisposed.get();
160 }
161
162 @Override
163 public void run() {
164 if (!isDisposed()) {
165 mActual.run();
166 }
167 }
168 }
169
170 /** @deprecated This type should not be instantiated as it contains only static methods. */
171 @Deprecated
172 @SuppressWarnings("PrivateConstructorForUtilityClass")
173 public RxRoom() {
174 }
175}