/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17package androidx.lifecycle;
18
19import static org.hamcrest.CoreMatchers.is;
20import static org.hamcrest.MatcherAssert.assertThat;
21import static org.junit.Assert.assertEquals;
22import static org.junit.Assert.fail;
23
24import androidx.annotation.Nullable;
25import androidx.arch.core.executor.testing.InstantTaskExecutorRule;
26
27import org.junit.Before;
28import org.junit.Rule;
29import org.junit.Test;
30import org.junit.rules.TestRule;
31import org.reactivestreams.Subscriber;
32import org.reactivestreams.Subscription;
33
34import java.util.ArrayList;
35import java.util.Arrays;
36import java.util.Collections;
37import java.util.List;
38import java.util.concurrent.TimeUnit;
39
40import io.reactivex.Flowable;
41import io.reactivex.disposables.Disposable;
42import io.reactivex.functions.Consumer;
43import io.reactivex.processors.PublishProcessor;
44import io.reactivex.processors.ReplayProcessor;
45import io.reactivex.schedulers.TestScheduler;
46import io.reactivex.subjects.AsyncSubject;
47
48public class LiveDataReactiveStreamsTest {
49 @Rule public final TestRule instantTaskExecutorRule = new InstantTaskExecutorRule();
50
51 private LifecycleOwner mLifecycleOwner;
52
53 private final List<String> mLiveDataOutput = new ArrayList<>();
54 private final Observer<String> mObserver = new Observer<String>() {
55 @Override
56 public void onChanged(@Nullable String s) {
57 mLiveDataOutput.add(s);
58 }
59 };
60
61 private final ReplayProcessor<String> mOutputProcessor = ReplayProcessor.create();
62
63 private static final TestScheduler sBackgroundScheduler = new TestScheduler();
64
65 @Before
66 public void init() {
67 mLifecycleOwner = new LifecycleOwner() {
68 LifecycleRegistry mRegistry = new LifecycleRegistry(this);
69 {
70 mRegistry.handleLifecycleEvent(Lifecycle.Event.ON_RESUME);
71 }
72
73 @Override
74 public Lifecycle getLifecycle() {
75 return mRegistry;
76 }
77 };
78 }
79
80 @Test
81 public void convertsFromPublisher() {
82 PublishProcessor<String> processor = PublishProcessor.create();
83 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
84
85 liveData.observe(mLifecycleOwner, mObserver);
86
87 processor.onNext("foo");
88 processor.onNext("bar");
89 processor.onNext("baz");
90
91 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
92 }
93
94 @Test
95 public void convertsFromPublisherSubscribeWithDelay() {
96 PublishProcessor<String> processor = PublishProcessor.create();
97 processor.delaySubscription(100, TimeUnit.SECONDS, sBackgroundScheduler);
98 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
99
100 liveData.observe(mLifecycleOwner, mObserver);
101
102 processor.onNext("foo");
103 liveData.removeObserver(mObserver);
104 sBackgroundScheduler.triggerActions();
105 liveData.observe(mLifecycleOwner, mObserver);
106
107 processor.onNext("bar");
108 processor.onNext("baz");
109
110 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "bar", "baz")));
111 }
112
113 @Test
114 public void convertsFromPublisherThrowsException() {
115 PublishProcessor<String> processor = PublishProcessor.create();
116 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
117
118 liveData.observe(mLifecycleOwner, mObserver);
119
120 IllegalStateException exception = new IllegalStateException("test exception");
121 try {
122 processor.onError(exception);
123 fail("Runtime Exception expected");
124 } catch (RuntimeException ex) {
125 assertEquals(ex.getCause(), exception);
126 }
127 }
128
129 @Test
130 public void convertsFromPublisherWithMultipleObservers() {
131 final List<String> output2 = new ArrayList<>();
132 PublishProcessor<String> processor = PublishProcessor.create();
133 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
134
135 liveData.observe(mLifecycleOwner, mObserver);
136
137 processor.onNext("foo");
138 processor.onNext("bar");
139
140 // The second observer should only get the newest value and any later values.
141 liveData.observe(mLifecycleOwner, new Observer<String>() {
142 @Override
143 public void onChanged(@Nullable String s) {
144 output2.add(s);
145 }
146 });
147
148 processor.onNext("baz");
149
150 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
151 assertThat(output2, is(Arrays.asList("bar", "baz")));
152 }
153
154 @Test
155 public void convertsFromPublisherWithMultipleObserversAfterInactive() {
156 final List<String> output2 = new ArrayList<>();
157 PublishProcessor<String> processor = PublishProcessor.create();
158 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
159
160 liveData.observe(mLifecycleOwner, mObserver);
161
162 processor.onNext("foo");
163 processor.onNext("bar");
164
165 // The second observer should only get the newest value and any later values.
166 liveData.observe(mLifecycleOwner, new Observer<String>() {
167 @Override
168 public void onChanged(@Nullable String s) {
169 output2.add(s);
170 }
171 });
172
173 liveData.removeObserver(mObserver);
174 processor.onNext("baz");
175
176 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar")));
177 assertThat(output2, is(Arrays.asList("bar", "baz")));
178 }
179
180 @Test
181 public void convertsFromPublisherAfterInactive() {
182 PublishProcessor<String> processor = PublishProcessor.create();
183 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
184
185 liveData.observe(mLifecycleOwner, mObserver);
186 processor.onNext("foo");
187 liveData.removeObserver(mObserver);
188 processor.onNext("bar");
189
190 liveData.observe(mLifecycleOwner, mObserver);
191 processor.onNext("baz");
192
193 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "foo", "baz")));
194 }
195
196 @Test
197 public void convertsFromPublisherManagesSubscriptions() {
198 PublishProcessor<String> processor = PublishProcessor.create();
199 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(processor);
200
201 assertThat(processor.hasSubscribers(), is(false));
202 liveData.observe(mLifecycleOwner, mObserver);
203
204 // once the live data is active, there's a subscriber
205 assertThat(processor.hasSubscribers(), is(true));
206
207 liveData.removeObserver(mObserver);
208 // once the live data is inactive, the subscriber is removed
209 assertThat(processor.hasSubscribers(), is(false));
210 }
211
212 @Test
213 public void convertsFromAsyncPublisher() {
214 Flowable<String> input = Flowable.just("foo")
215 .concatWith(Flowable.just("bar", "baz").observeOn(sBackgroundScheduler));
216 LiveData<String> liveData = LiveDataReactiveStreams.fromPublisher(input);
217
218 liveData.observe(mLifecycleOwner, mObserver);
219
220 assertThat(mLiveDataOutput, is(Collections.singletonList("foo")));
221 sBackgroundScheduler.triggerActions();
222 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
223 }
224
225 @Test
226 public void convertsToPublisherWithSyncData() {
227 MutableLiveData<String> liveData = new MutableLiveData<>();
228 liveData.setValue("foo");
229 assertThat(liveData.getValue(), is("foo"));
230
231 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
232 .subscribe(mOutputProcessor);
233
234 liveData.setValue("bar");
235 liveData.setValue("baz");
236
237 assertThat(
238 mOutputProcessor.getValues(new String[]{}),
239 is(new String[]{"foo", "bar", "baz"}));
240 }
241
242 @Test
243 public void convertingToPublisherIsCancelable() {
244 MutableLiveData<String> liveData = new MutableLiveData<>();
245 liveData.setValue("foo");
246 assertThat(liveData.getValue(), is("foo"));
247
248 Disposable disposable = Flowable
249 .fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
250 .subscribe(new Consumer<String>() {
251 @Override
252 public void accept(String s) throws Exception {
253 mLiveDataOutput.add(s);
254 }
255 });
256
257 liveData.setValue("bar");
258 liveData.setValue("baz");
259
260 assertThat(liveData.hasObservers(), is(true));
261 disposable.dispose();
262
263 liveData.setValue("fizz");
264 liveData.setValue("buzz");
265
266 assertThat(mLiveDataOutput, is(Arrays.asList("foo", "bar", "baz")));
267 // Canceling disposable should also remove livedata mObserver.
268 assertThat(liveData.hasObservers(), is(false));
269 }
270
271 @Test
272 public void convertsToPublisherWithBackpressure() {
273 MutableLiveData<String> liveData = new MutableLiveData<>();
274
275 final AsyncSubject<Subscription> subscriptionSubject = AsyncSubject.create();
276
277 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
278 .subscribe(new Subscriber<String>() {
279 @Override
280 public void onSubscribe(Subscription s) {
281 subscriptionSubject.onNext(s);
282 subscriptionSubject.onComplete();
283 }
284
285 @Override
286 public void onNext(String s) {
287 mOutputProcessor.onNext(s);
288 }
289
290 @Override
291 public void onError(Throwable t) {
292 throw new RuntimeException(t);
293 }
294
295 @Override
296 public void onComplete() {
297 }
298 });
299
300 // Subscription should have happened synchronously. If it didn't, this will deadlock.
301 final Subscription subscription = subscriptionSubject.blockingSingle();
302
303 subscription.request(1);
304 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{}));
305
306 liveData.setValue("foo");
307 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
308
309 subscription.request(2);
310 liveData.setValue("baz");
311 liveData.setValue("fizz");
312
313 assertThat(
314 mOutputProcessor.getValues(new String[]{}),
315 is(new String[]{"foo", "baz", "fizz"}));
316
317 // 'nyan' will be dropped as there is nothing currently requesting a stream.
318 liveData.setValue("nyan");
319 liveData.setValue("cat");
320
321 assertThat(
322 mOutputProcessor.getValues(new String[]{}),
323 is(new String[]{"foo", "baz", "fizz"}));
324
325 // When a new request comes in, the latest value will be pushed.
326 subscription.request(1);
327 assertThat(
328 mOutputProcessor.getValues(new String[]{}),
329 is(new String[]{"foo", "baz", "fizz", "cat"}));
330 }
331
332 @Test
333 public void convertsToPublisherWithAsyncData() {
334 MutableLiveData<String> liveData = new MutableLiveData<>();
335
336 Flowable.fromPublisher(LiveDataReactiveStreams.toPublisher(mLifecycleOwner, liveData))
337 .observeOn(sBackgroundScheduler)
338 .subscribe(mOutputProcessor);
339
340 liveData.setValue("foo");
341
342 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{}));
343 sBackgroundScheduler.triggerActions();
344 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
345
346 liveData.setValue("bar");
347 liveData.setValue("baz");
348
349 assertThat(mOutputProcessor.getValues(new String[]{}), is(new String[]{"foo"}));
350 sBackgroundScheduler.triggerActions();
351 assertThat(mOutputProcessor.getValues(
352 new String[]{}),
353 is(new String[]{"foo", "bar", "baz"}));
354 }
355}