下面列出了怎么用io.reactivex.rxjava3.plugins.RxJavaPlugins的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public Statement apply(final Statement statement, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
RxJavaPlugins.setIoSchedulerHandler(scheduler -> testScheduler);
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> testScheduler);
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> testScheduler);
try {
statement.evaluate();
} finally {
RxJavaPlugins.reset();
}
}
};
}
/**
* Disable {@link RequestContext} during operators.
*/
public static synchronized void disable() {
if (!enabled) {
return;
}
RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
oldOnObservableAssembly = null;
RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
oldOnConnectableObservableAssembly = null;
RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
oldOnCompletableAssembly = null;
RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
oldOnSingleAssembly = null;
RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
oldOnMaybeAssembly = null;
RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
oldOnFlowableAssembly = null;
RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
oldOnConnectableFlowableAssembly = null;
RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
oldOnParallelAssembly = null;
enabled = false;
}
@Test
void composeWithOtherHook() throws Exception {
final AtomicInteger calledFlag = new AtomicInteger();
RxJavaPlugins.setOnSingleAssembly(single -> {
calledFlag.incrementAndGet();
return single;
});
final WebClient client = WebClient.of(server.httpUri());
client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
assertThat(calledFlag.get()).isEqualTo(3);
try {
RequestContextAssembly.enable();
client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
assertThat(calledFlag.get()).isEqualTo(6);
} finally {
RequestContextAssembly.disable();
}
client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
assertThat(calledFlag.get()).isEqualTo(9);
RxJavaPlugins.setOnSingleAssembly(null);
client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
assertThat(calledFlag.get()).isEqualTo(9);
}
/**
* Create an event source from the given RxJava streams.
*
* <p>All streams must be mapped to your event type.
*
* @param sources the observables you want to include in this event source
* @param <E> the event type
* @return an EventSource based on the provided observables
*/
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
final Observable<E> eventSource = Observable.mergeArray(sources);
return new EventSource<E>() {
@Nonnull
@Override
public com.spotify.mobius.disposables.Disposable subscribe(
com.spotify.mobius.functions.Consumer<E> eventConsumer) {
final Disposable disposable =
eventSource.subscribe(
new Consumer<E>() {
@Override
public void accept(E value) throws Throwable {
eventConsumer.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
RxJavaPlugins.onError(error);
}
});
return new com.spotify.mobius.disposables.Disposable() {
@Override
public void dispose() {
disposable.dispose();
}
};
}
};
}
private static <F, E> Consumer<Throwable> defaultOnError(
final ObservableTransformer<? extends F, E> effectHandler) {
return new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
RxJavaPlugins.onError(
new ConnectionException(
"in effect handler: " + effectHandler.getClass().toString(), throwable));
}
};
}
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
// given no RxJava error handler
RxJavaPlugins.setErrorHandler(null);
// and a router with a broken effect handler
publishSubject = PublishSubject.create();
testSubscriber = TestObserver.create();
final RuntimeException expected = new RuntimeException("expected!");
ObservableTransformer<TestEffect, TestEvent> router =
RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
.addFunction(
A.class,
a -> {
throw expected;
})
.build();
publishSubject.compose(router).subscribe(testSubscriber);
// when an event is sent, it doesn't crash (the exception does get printed to stderr)
publishSubject.onNext(A.create(1));
// and the right exception is forwarded to the test subscriber
testSubscriber.assertError(t -> t == expected);
}
public App() {
mContext = this;
RxJavaPlugins.setErrorHandler(e -> {
if (e.getMessage() != null && e.getMessage().contains("shutdownNow")) {
// Is propagated from gRPC when shutting down channel
} else {
ZapLog.debug("RxJava", e.getMessage());
}
});
}
public static <I, O> Connectable<I, O> fromTransformer(
@NonNull final ObservableTransformer<I, O> transformer) {
checkNotNull(transformer);
return new Connectable<I, O>() {
@Nonnull
@Override
public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
final PublishSubject<I> subject = PublishSubject.create();
final Disposable disposable =
subject
.compose(transformer)
.subscribe(
new Consumer<O>() {
@Override
public void accept(O value) throws Throwable {
output.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
RxJavaPlugins.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {}
});
return new Connection<I>() {
public void accept(I effect) {
subject.onNext(effect);
}
@Override
public void dispose() {
disposable.dispose();
}
};
}
};
}
@Test
public void test() {
RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> inner = PublishProcessor.create();
// switchMapDelayError will delay all errors
TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();
main.onNext(1);
// the inner fails
inner.onError(new IOException());
// the consumer is still clueless
ts.assertEmpty();
// the consumer cancels
ts.cancel();
}