下面列出了怎么用io.reactivex.rxjava3.subjects.PublishSubject的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Checks that disposing the subscription to an event source built from an observable stops
 * delivery: the value emitted after {@code dispose()} must not be recorded.
 */
@Test
public void disposePreventsFurtherEvents() throws Exception {
  final RecordingConsumer<Integer> recorder = new RecordingConsumer<>();
  final PublishSubject<Integer> emitter = PublishSubject.create();
  final EventSource<Integer> eventSource = RxEventSources.fromObservables(emitter);

  final Disposable subscription = eventSource.subscribe(recorder);

  emitter.onNext(1);
  emitter.onNext(2);

  subscription.dispose();

  // Emitted after disposal; it must never reach the recorder.
  emitter.onNext(3);

  // Give a stray delivery a chance to show up before checking what was recorded.
  recorder.waitForChange(50);
  recorder.assertValues(1, 2);
}
/**
 * Feeds two effects through a function-based transformer running on the io scheduler and waits
 * until the collected results equal {@code expected(effects)}.
 *
 * <p>Each effect sleeps for {@code duration(effect)} before its length is returned. The
 * {@code duration}, {@code durationForEffects} and {@code expected} helpers are declared
 * elsewhere in this test class.
 */
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");
  PublishSubject<String> upstream = PublishSubject.create();
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
          // Do not swallow the interruption: restore the flag so the worker thread can see it.
          Thread.currentThread().interrupt();
        }
        return s.length();
      };
  // Results are appended from scheduler threads while the awaiting thread polls the list, so it
  // has to be safe for concurrent access (a plain ArrayList is not).
  final List<Integer> results = java.util.Collections.synchronizedList(new ArrayList<>());
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);
  Observable.fromIterable(effects).subscribe(upstream);
  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
/**
 * Creates the shared fixtures: a subtype effect handler with handlers registered for the effect
 * types A, B, C, D and E, plus a fresh publish subject whose output, routed through that
 * handler, is observed by a fresh test observer.
 */
@Before
public void setUp() throws Exception {
  // Fresh stream endpoints for every test.
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  // Recording fakes registered below for the C and D effects.
  cConsumer = new TestConsumer<>();
  dAction = new TestAction();

  RxMobius.SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlers =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())))
          .addTransformer(B.class, (Observable<B> bs) -> bs.map(b -> BEvent.create(b.id())))
          .addConsumer(C.class, cConsumer)
          .addAction(D.class, dAction)
          .addFunction(E.class, e -> AEvent.create(e.id()));
  ObservableTransformer<TestEffect, TestEvent> effectRouter = handlers.build();

  publishSubject.compose(effectRouter).subscribe(testSubscriber);
}
/**
 * Checks that a router obtained from {@code build()} is unaffected by handlers added to the
 * builder afterwards: a B effect sent through it must end in an {@code UnknownEffectException}.
 */
@Test
public void effectHandlersShouldBeImmutable() throws Exception {
  RxMobius.SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlerBuilder =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())));

  // Build first ...
  ObservableTransformer<TestEffect, TestEvent> builtRouter = handlerBuilder.build();

  // ... then register a B handler; the already-built router must not pick it up.
  handlerBuilder.addTransformer(B.class, bs -> bs.map(b -> BEvent.create(b.id())));

  // Replace the fixtures from setUp with ones wired to the router built above.
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();
  publishSubject.compose(builtRouter).subscribe(testSubscriber);

  B unhandledEffect = B.create(84);
  publishSubject.onNext(unhandledEffect);
  publishSubject.onComplete();

  testSubscriber.await();
  testSubscriber.assertError(new UnknownEffectException(unhandledEffect));
}
/**
 * Prepares a fresh input subject and a connectable whose connections pass the length of every
 * accepted string to the output consumer, except for the value "crash", which makes the
 * connection throw a {@code RuntimeException}.
 */
@Before
public void setUp() throws Exception {
  input = PublishSubject.create();
  connectable =
      new Connectable<String, Integer>() {
        @Nonnull
        @Override
        public Connection<String> connect(final Consumer<Integer> lengthSink)
            throws ConnectionLimitExceededException {
          return new Connection<String>() {
            @Override
            public void accept(String text) {
              if (!text.equals("crash")) {
                lengthSink.accept(text.length());
                return;
              }
              throw new RuntimeException("crashing!");
            }

            @Override
            public void dispose() {
              // Nothing to release.
            }
          };
        }
      };
}
/**
 * Checks that a value seen while one observer was subscribed is still handed to an observer
 * that subscribes after the first one has been disposed.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");
  first.dispose();

  // Subscribes only after the first observer is gone.
  TestObserver<String> second = shared.subscribeWith(new TestObserver<String>());
  second.assertValues("Foo");
}
/**
 * Checks that a value emitted by the subject while nobody is subscribed is neither delivered to
 * the already-disposed observer nor replayed to an observer that subscribes later.
 */
@Test public void valueMissedWhenNoSubscribers() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertNoValues();
  first.dispose();

  // Nobody is subscribed at this point.
  source.onNext("Foo");
  first.assertNoValues();

  TestObserver<String> second = shared.subscribeWith(new TestObserver<String>());
  second.assertNoValues();
}
/**
 * Checks that an {@code OutOfMemoryError} thrown by a consumer while the cached value is being
 * replayed to it propagates out of {@code subscribe} to the caller.
 */
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Keep one subscription open and emit a value so there is something to replay.
  shared.subscribe();
  source.onNext("Foo");

  Consumer<String> throwingConsumer = new Consumer<String>() {
    @Override public void accept(String ignored) {
      throw new OutOfMemoryError("broken!");
    }
  };

  OutOfMemoryError caught = null;
  try {
    shared.subscribe(throwingConsumer);
  } catch (OutOfMemoryError e) {
    caught = e;
  }

  if (caught == null) {
    // subscribe returned normally, so the error was not propagated.
    fail();
  }
  assertEquals("broken!", caught.getMessage());
}
/**
 * Ignored: Observable has no backpressure. The body is kept with the request-related calls
 * commented out.
 */
@Ignore("No backpressure in Observable")
@Test public void backpressureHonoredWhenCached() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");

  TestObserver<String> second = new TestObserver<>(/*0*/);
  shared.subscribe(second);
  second.assertNoValues();

  // Replace the cached value...
  source.onNext("Bar");
  first.assertValues("Foo", "Bar");

  // ...and ensure new requests see it.
  //second.requestMore(1);
  second.assertValues("Bar");
}
/**
 * Checks that two streams composed with {@code ReplayingShare.instance()} keep separate caches:
 * late observers of each stream receive only the value emitted by their own subject.
 */
@Test public void streamsDoNotShareInstances() {
  // Stream A with its first observer.
  PublishSubject<String> sourceA = PublishSubject.create();
  Observable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
  TestObserver<String> firstOnA = sharedA.subscribeWith(new TestObserver<String>());

  // Stream B with its first observer.
  PublishSubject<String> sourceB = PublishSubject.create();
  Observable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
  TestObserver<String> firstOnB = sharedB.subscribeWith(new TestObserver<String>());

  sourceA.onNext("Foo");
  firstOnA.assertValues("Foo");

  sourceB.onNext("Bar");
  firstOnB.assertValues("Bar");

  // Late observers see the value of their own stream only.
  TestObserver<String> lateOnA = sharedA.subscribeWith(new TestObserver<String>());
  lateOnA.assertValues("Foo");

  TestObserver<String> lateOnB = sharedB.subscribeWith(new TestObserver<String>());
  lateOnB.assertValues("Bar");
}
/**
 * Checks that upstream completion reaches every current observer and that an observer arriving
 * afterwards triggers a fresh upstream subscription: it receives the updated start value
 * ("initB") rather than the previously seen "initA".
 */
@Test public void completeClearsCacheAndResubscribes() {
  // Mutable start list so the value emitted on (re)subscription can be changed later.
  List<String> start = new ArrayList<>();
  start.add("initA");
  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The first observer must not see the start value a second time ...
  observer1.assertValues("initA");
  // ... and the second observer gets it as the replayed value (this check was missing).
  observer2.assertValues("initA");

  upstream.onComplete();
  observer1.assertComplete();
  observer2.assertComplete();

  start.set(0, "initB");
  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
/**
 * Checks that an upstream error reaches every current observer and that an observer arriving
 * afterwards triggers a fresh upstream subscription: it receives the updated start value
 * ("initB") rather than the previously seen "initA".
 */
@Test public void errorClearsCacheAndResubscribes() {
  // Mutable start list so the value emitted on (re)subscription can be changed later.
  List<String> start = new ArrayList<>();
  start.add("initA");
  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The first observer must not see the start value a second time ...
  observer1.assertValues("initA");
  // ... and the second observer gets it as the replayed value (this check was missing).
  observer2.assertValues("initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  observer1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");
  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
/**
 * Same scenario as the completion test without a default, but using
 * {@code ReplayingShare.createWithDefault("default")}: after upstream completion a new observer
 * receives the default value again, followed by the updated start value ("initB").
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable start list so the value emitted on (re)subscription can be changed later.
  List<String> start = new ArrayList<>();
  start.add("initA");
  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("default", "initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The first observer must not receive anything new ...
  observer1.assertValues("default", "initA");
  // ... and the late observer is handed the latest emission, not the default (this check was
  // missing).
  observer2.assertValues("initA");

  upstream.onComplete();
  observer1.assertComplete();
  observer2.assertComplete();

  start.set(0, "initB");
  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
/**
 * Checks that the "destroy" lifecycle event completes the bound stream, so a value emitted
 * afterwards is not delivered.
 */
@Test
public void eventOutOfLifecycle() {
  PublishSubject<String> lifecycleEvents = PublishSubject.create();
  PublishSubject<String> values = PublishSubject.create();

  TestObserver<String> observer =
      values.compose(RxLifecycle.<String, String>bind(lifecycleEvents, CORRESPONDING_EVENTS)).test();

  // Event is out of lifecycle, but this just results in completing the stream
  lifecycleEvents.onNext("destroy");
  values.onNext("1");

  observer.assertNoValues();
  observer.assertComplete();
}
/**
 * Checks that a transformer created from an action runs that action once for every effect
 * emitted upstream.
 */
@Test
public void effectPerformerRunsActionWheneverEffectIsRequested() throws Exception {
  final TestAction countingAction = new TestAction();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction)).subscribe();

  effects.onNext("First Time");
  assertThat(countingAction.getRunCount(), is(1));

  effects.onNext("One more!");
  assertThat(countingAction.getRunCount(), is(2));
}
/**
 * Checks that a transformer created from an action and a scheduler defers the action until the
 * scheduler runs: nothing happens on emission, one run is recorded after triggering actions.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
  final TestScheduler testScheduler = new TestScheduler();
  final TestAction countingAction = new TestAction();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction, testScheduler)).subscribe();

  effects.onNext("First Time");
  // Not executed until the scheduler is told to run its queued work.
  assertThat(countingAction.getRunCount(), is(0));

  testScheduler.triggerActions();
  assertThat(countingAction.getRunCount(), is(1));
}
/**
 * Checks that a transformer created from a consumer hands each emitted effect to that consumer.
 */
@Test
public void effectPerformerInvokesConsumerAndPassesTheRequestedEffect() throws Exception {
  final TestConsumer<String> recordingConsumer = new TestConsumer<>();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(recordingConsumer)).subscribe();

  effects.onNext("First Time");
  assertThat(recordingConsumer.getCurrentValue(), is("First Time"));

  effects.onNext("Do it again!");
  assertThat(recordingConsumer.getCurrentValue(), is("Do it again!"));
}
/**
 * Checks that a transformer created from a consumer and a scheduler delivers the effect only
 * once the scheduler runs: the consumer holds no value before actions are triggered.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  final TestScheduler testScheduler = new TestScheduler();
  final TestConsumer<String> recordingConsumer = new TestConsumer<>();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(recordingConsumer, testScheduler)).subscribe();

  effects.onNext("First Time");
  // Nothing delivered until the scheduler is told to run its queued work.
  assertThat(recordingConsumer.getCurrentValue(), is(equalTo(null)));

  testScheduler.triggerActions();
  assertThat(recordingConsumer.getCurrentValue(), is("First Time"));
}
/**
 * Checks that a transformer created from a function applies it to the received effect and emits
 * the function's result: "Hello" yields 5.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  final TestScheduler testScheduler = new TestScheduler();
  final Function<String, Integer> lengthOf = text -> text.length();
  final PublishSubject<String> effects = PublishSubject.create();

  final TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(lengthOf, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  events.assertValue(5);
}
/**
 * Checks that an exception thrown by the function of a function-based transformer surfaces as
 * an error on the resulting stream.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
  final TestScheduler testScheduler = new TestScheduler();
  final Function<String, Integer> alwaysFailing =
      ignored -> {
        throw new RuntimeException("Something bad happened");
      };
  final PublishSubject<String> effects = PublishSubject.create();

  final TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(alwaysFailing, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  events.assertError(RuntimeException.class);
}
/**
 * Checks that an error arriving on the loop's input is reported downstream wrapped in an
 * {@code UnrecoverableIncomingException}, and that the connection recorded no values.
 *
 * <p>Uses the {@code builder} and {@code connection} fixtures declared elsewhere in this test
 * class.
 */
@Test
public void shouldPropagateIncomingErrorsAsUnrecoverable() throws Exception {
  final PublishSubject<Integer> incoming = PublishSubject.create();
  final RxMobiusLoop<Integer, String, Boolean> loop =
      new RxMobiusLoop<>(builder, "", Collections.emptySet());
  final TestObserver<String> observer = incoming.compose(loop).test();

  final Exception cause = new RuntimeException("expected");
  incoming.onError(cause);

  observer.awaitDone(1, TimeUnit.SECONDS);
  observer.assertError(new UnrecoverableIncomingException(cause));
  assertEquals(0, connection.valueCount());
}
/**
 * Checks that, with no RxJava error handler installed, an exception thrown by an effect handler
 * function does not crash the caller and is forwarded to the observer as the very same
 * exception instance.
 *
 * <p>The RxJava error handler is process-wide state, so the handler that was installed before
 * the test is put back afterwards to avoid leaking the change into other tests.
 */
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
  final io.reactivex.rxjava3.functions.Consumer<? super Throwable> previousHandler =
      RxJavaPlugins.getErrorHandler();
  // given no RxJava error handler
  RxJavaPlugins.setErrorHandler(null);
  try {
    // and a router with a broken effect handler
    publishSubject = PublishSubject.create();
    testSubscriber = TestObserver.create();
    final RuntimeException expected = new RuntimeException("expected!");
    ObservableTransformer<TestEffect, TestEvent> router =
        RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
            .addFunction(
                A.class,
                a -> {
                  throw expected;
                })
            .build();
    publishSubject.compose(router).subscribe(testSubscriber);
    // when an event is sent, it doesn't crash (the exception does get printed to stderr)
    publishSubject.onNext(A.create(1));
    // and the right exception is forwarded to the test subscriber
    testSubscriber.assertError(t -> t == expected);
  } finally {
    // Restore the global handler regardless of the test outcome.
    RxJavaPlugins.setErrorHandler(previousHandler);
  }
}
/** Checks that an observer receives nothing when the subject has not emitted any value. */
@Test public void noInitialValue() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> onlyObserver = shared.subscribeWith(new TestObserver<String>());

  onlyObserver.assertNoValues();
}
/**
 * Checks that an observer subscribing while another is still active immediately receives the
 * most recently emitted value.
 */
@Test public void initialValueToNewSubscriber() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");

  // The first observer is still subscribed when the second one arrives.
  TestObserver<String> second = shared.subscribeWith(new TestObserver<String>());
  second.assertValues("Foo");
}
/**
 * Checks that the upstream is subscribed once no matter how many observers attach, and is
 * disposed only when the last of them is disposed. A counter incremented on upstream subscribe
 * and decremented on upstream dispose tracks this.
 */
@Test public void refCountToUpstream() {
  final AtomicInteger upstreamSubscriptions = new AtomicInteger();

  Consumer<Disposable> countSubscribe = new Consumer<Disposable>() {
    @Override public void accept(Disposable ignored) throws Exception {
      upstreamSubscriptions.incrementAndGet();
    }
  };
  Action countDispose = new Action() {
    @Override public void run() throws Exception {
      upstreamSubscriptions.decrementAndGet();
    }
  };

  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source
      .doOnSubscribe(countSubscribe)
      .doOnDispose(countDispose)
      .compose(ReplayingShare.<String>instance());

  // Attaching more observers never adds upstream subscriptions.
  Disposable first = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  Disposable second = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  Disposable third = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  // The upstream stays subscribed while at least one observer remains.
  first.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  third.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  second.dispose();
  assertEquals(0, upstreamSubscriptions.get());
}
/**
 * Checks that an observer which was disposed before it subscribes does not receive the cached
 * value.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Keep one subscription open and emit a value so there is something cached.
  shared.subscribe();
  source.onNext("something to cache");

  // Dispose first, subscribe afterwards.
  TestObserver<String> alreadyDisposed = new TestObserver<>();
  alreadyDisposed.dispose();
  shared.subscribe(alreadyDisposed);

  alreadyDisposed.assertNoValues();
}
/**
 * Checks that with {@code createWithDefault("default")} the first observer receives the default
 * value on subscription, followed by whatever the subject emits.
 */
@Test public void defaultValueOnSubscribe() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertValues("default");

  source.onNext("Foo");
  first.assertValues("default", "Foo");
}
/**
 * Checks that once the subject has emitted, a newly subscribing observer receives that latest
 * emission instead of the configured default value.
 */
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> first = shared.subscribeWith(new TestObserver<String>());
  first.assertValues("default");

  source.onNext("Foo");
  first.assertValues("default", "Foo");

  // The late observer sees only the latest emission.
  TestObserver<String> second = shared.subscribeWith(new TestObserver<String>());
  second.assertValues("Foo");
}
/**
 * Same scenario as the error test without a default, but using
 * {@code ReplayingShare.createWithDefault("default")}: after an upstream error a new observer
 * receives the default value again, followed by the updated start value ("initB").
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable start list so the value emitted on (re)subscription can be changed later.
  List<String> start = new ArrayList<>();
  start.add("initA");
  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> observer1 = new TestObserver<>();
  replayed.subscribe(observer1);
  observer1.assertValues("default", "initA");

  TestObserver<String> observer2 = new TestObserver<>();
  replayed.subscribe(observer2);
  // The first observer must not receive anything new ...
  observer1.assertValues("default", "initA");
  // ... and the late observer is handed the latest emission, not the default (this check was
  // missing).
  observer2.assertValues("initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  observer1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");
  TestObserver<String> observer3 = new TestObserver<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
/**
 * Checks that a lifecycle event which makes the lifecycle mapping fail ("ick") ends the bound
 * stream with an {@code IllegalArgumentException} and delivers no values.
 *
 * <p>The exception is accepted both when it arrives directly and when it arrives inside a
 * {@code CompositeException}; previously only the wrapped form was recognised.
 */
@Test
public void eventThrowsBadException() {
  PublishSubject<String> stream = PublishSubject.create();
  PublishSubject<String> lifecycle = PublishSubject.create();
  TestObserver<String> testObserver = stream
      .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
      .test();
  // We get an error from the function for this lifecycle event
  lifecycle.onNext("ick");
  stream.onNext("1");
  testObserver.assertNoValues();
  // We only want to check for our IllegalArgumentException, but may have
  // to wade through a CompositeException to get at it.
  testObserver.assertError(new Predicate<Throwable>() {
    @Override
    public boolean test(Throwable throwable) throws Exception {
      // Delivered without wrapping.
      if (throwable instanceof IllegalArgumentException) {
        return true;
      }
      // Delivered as one of several bundled exceptions.
      if (throwable instanceof CompositeException) {
        CompositeException ce = (CompositeException) throwable;
        for (Throwable t : ce.getExceptions()) {
          if (t instanceof IllegalArgumentException) {
            return true;
          }
        }
      }
      return false;
    }
  });
}