下面列出了 io.reactivex.plugins.RxJavaPlugins 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the shared {@code RxBleClient}, configures its log options and installs a
 * global RxJava error handler.
 *
 * <p>The error handler only logs an {@code UndeliverableException} whose cause is a
 * {@code BleException}; any other throwable is rethrown wrapped in a
 * {@link RuntimeException}.
 */
@Override
public void onCreate() {
    super.onCreate();
    rxBleClient = RxBleClient.create(this);

    final LogOptions logOptions = new LogOptions.Builder()
            .setLogLevel(LogConstants.INFO)
            .setMacAddressLogSetting(LogConstants.MAC_ADDRESS_FULL)
            .setUuidsLogSetting(LogConstants.UUIDS_FULL)
            .setShouldLogAttributeValues(true)
            .build();
    RxBleClient.updateLogOptions(logOptions);

    RxJavaPlugins.setErrorHandler(error -> {
        final boolean undeliverableBleError =
                error instanceof UndeliverableException && error.getCause() instanceof BleException;
        if (!undeliverableBleError) {
            // add other custom handlers if needed
            throw new RuntimeException("Unexpected Throwable in RxJavaPlugins error handler", error);
        }
        // ignore BleExceptions as they were surely delivered at least once
        Log.v("SampleApplication", "Suppressed UndeliverableException: " + error.toString());
    });
}
/**
 * Dispatches the outcome of ending the write stream to the matching callback.
 *
 * <p>On success the {@code writeStreamEndHandler} action is run; on failure the
 * {@code writeStreamEndErrorHandler} consumer receives the failure cause. Each handler
 * reference is read while holding this object's monitor and invoked outside of it.
 * Non-fatal throwables raised by a handler are forwarded to {@code RxJavaPlugins.onError}.
 *
 * @param result the asynchronous result of ending the write stream
 */
private void writeStreamEnd(AsyncResult<Void> result) {
    try {
        if (!result.succeeded()) {
            final Consumer<? super Throwable> errorHandler;
            synchronized (this) {
                errorHandler = this.writeStreamEndErrorHandler;
            }
            if (errorHandler != null) {
                errorHandler.accept(result.cause());
            }
            return;
        }

        final Action endHandler;
        synchronized (this) {
            endHandler = writeStreamEndHandler;
        }
        if (endHandler != null) {
            endHandler.run();
        }
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        RxJavaPlugins.onError(failure);
    }
}
/**
 * Wraps {@code base} so that it is evaluated with the IO, new-thread and Android
 * main-thread init scheduler hooks all returning the trampoline scheduler instance.
 *
 * <p>Both plugin registries are reset before the hooks are installed and again after
 * {@code base} has been evaluated, whether or not it threw.
 *
 * @param base the statement to wrap
 * @param description not used by this implementation
 * @return a statement that installs the hooks, evaluates {@code base} and resets the plugins
 */
public Statement apply(Statement base, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            final Scheduler trampoline = TrampolineScheduler.instance();
            final Function<Callable<Scheduler>, Scheduler> useTrampoline = ignored -> trampoline;

            RxJavaPlugins.reset();
            RxJavaPlugins.setInitIoSchedulerHandler(useTrampoline);
            RxJavaPlugins.setInitNewThreadSchedulerHandler(useTrampoline);

            RxAndroidPlugins.reset();
            RxAndroidPlugins.setInitMainThreadSchedulerHandler(useTrampoline);

            try {
                base.evaluate();
            } finally {
                RxJavaPlugins.reset();
                RxAndroidPlugins.reset();
            }
        }
    };
}
/**
 * Installs init-scheduler hooks so that the IO, computation, new-thread, single and
 * Android main-thread schedulers all resolve to one scheduler whose workers run tasks
 * directly via {@code Runnable::run}.
 */
@BeforeClass
public static void setUpRxSchedulers() {
    final Scheduler inline = new Scheduler() {
        @Override
        public Scheduler.Worker createWorker() {
            return new ExecutorScheduler.ExecutorWorker(Runnable::run, false);
        }

        @Override
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            // this prevents StackOverflowErrors when scheduling with a delay
            return super.scheduleDirect(run, 0, unit);
        }
    };

    RxJavaPlugins.setInitIoSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitComputationSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitNewThreadSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitSingleSchedulerHandler(ignored -> inline);
    RxAndroidPlugins.setInitMainThreadSchedulerHandler(ignored -> inline);
}
/**
 * Runs a parallel flow over the range 1..10 on the IO scheduler in which the value 2
 * throws an {@link IOException} and every other value sleeps for two seconds, then
 * asserts that the sequential flow fails with {@code IOException}.
 *
 * <p>A no-op global error handler is installed first, so throwables routed to
 * {@code RxJavaPlugins.onError} are silently dropped.
 *
 * @param args not used
 */
public static void main(String[] args) {
    RxJavaPlugins.setErrorHandler(ignored -> { });

    Flowable.range(1, 10)
            .parallel(4)
            .runOn(Schedulers.io())
            .map(value -> {
                if (value != 2) {
                    Thread.sleep(2000);
                    return value;
                }
                throw new IOException();
            })
            .sequential()
            .test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertFailure(IOException.class);
}
/**
 * Installs an init hook and a regular hook for the IO scheduler, subscribes a small
 * observable on {@code Schedulers.io()} and asserts that both hook flags were set.
 */
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {
    RxJavaPlugins.setInitIoSchedulerHandler(defaultSupplier -> {
        initHookCalled = true;
        return defaultSupplier.call();
    });
    RxJavaPlugins.setIoSchedulerHandler(ioScheduler -> {
        hookCalled = true;
        return ioScheduler;
    });

    Observable.range(1, 10)
            .map(value -> value * 2)
            .subscribeOn(Schedulers.io())
            .test();

    assertTrue(hookCalled && initHookCalled);
}
/**
 * Verifies that both {@code ObservableHelper.toObservable} overloads return whatever
 * the observable assembly hook returns.
 *
 * <p>The hook is replaced with one that always yields a fixed observable; the plugins
 * are reset in the {@code finally} block.
 */
@Test
public void testToObservableAssemblyHook() {
    FakeStream<String> stream = new FakeStream<>();
    try {
        final Observable<String> justMe = Observable.just("me");
        // Raw types mirror the hook signature: every assembled observable becomes justMe.
        RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
            @Override
            public Observable apply(Observable assembled) {
                return justMe;
            }
        });

        Observable<String> plain = ObservableHelper.toObservable(stream);
        assertSame(plain, justMe);

        Observable<String> mapped = ObservableHelper.toObservable(stream, identity());
        assertSame(mapped, justMe);
    } finally {
        RxJavaPlugins.reset();
    }
}
/**
 * Demonstrates replacing an assembled observable through the assembly hook.
 *
 * <p>The hook swaps every {@code ObservableFromArray} for one emitting 4, 5 and 6, so
 * the filtered sequence is asserted to produce exactly those values.
 *
 * <p>The hook is global state, so it is cleared in a {@code finally} block; otherwise a
 * failing assertion would leave it installed for whatever runs next.
 */
@Test
public void test() {
    RxJavaPlugins.setOnObservableAssembly(o -> {
        if (o instanceof ObservableFromArray) {
            return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
        }
        return o;
    });
    try {
        Observable.just(1, 2, 3)
                .filter(v -> v > 3)
                .test()
                .assertResult(4, 5, 6);
    } finally {
        // Always remove the hook, even when the assertion above fails.
        RxJavaPlugins.setOnObservableAssembly(null);
    }
}
/**
 * Verifies that an exception thrown from an observer's {@code onComplete} is handed to
 * the RxJava error handler.
 *
 * <p>The installed handler records the first throwable it receives and rethrows any
 * subsequent one via {@code Exceptions.propagate}.
 */
@Test
public void resultThrowingInOnCompletedDeliveredToPlugin() {
    server.enqueue(new MockResponse());

    final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
    RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            if (throwableRef.compareAndSet(null, throwable)) {
                return;
            }
            throw Exceptions.propagate(throwable);
        }
    });

    RecordingObserver<Result<String>> observer = subscriberRule.create();
    final RuntimeException thrown = new RuntimeException();
    service.result().subscribe(new ForwardingObserver<Result<String>>(observer) {
        @Override
        public void onComplete() {
            throw thrown;
        }
    });

    observer.assertAnyValue();
    assertThat(throwableRef.get()).isSameAs(thrown);
}
/**
 * With {@code RxJavaAssemblyTracking} enabled, enables and then disables
 * {@code CurrentTraceContextAssemblyTracking} and asserts that every assembly hook
 * getter on {@code RxJavaPlugins} returns {@code null} afterwards.
 *
 * <p>{@code RxJavaAssemblyTracking} is disabled again in the {@code finally} block.
 */
@Test public void enable_restoresSavedHooks() {
    RxJavaPlugins.reset();
    RxJavaAssemblyTracking.enable();
    try {
        CurrentTraceContextAssemblyTracking.create(currentTraceContext).enable();
        CurrentTraceContextAssemblyTracking.disable();

        // Reactive base types.
        Assert.assertNull(RxJavaPlugins.getOnFlowableAssembly());
        Assert.assertNull(RxJavaPlugins.getOnObservableAssembly());
        Assert.assertNull(RxJavaPlugins.getOnSingleAssembly());
        Assert.assertNull(RxJavaPlugins.getOnMaybeAssembly());
        Assert.assertNull(RxJavaPlugins.getOnCompletableAssembly());

        // Connectable and parallel variants.
        Assert.assertNull(RxJavaPlugins.getOnConnectableFlowableAssembly());
        Assert.assertNull(RxJavaPlugins.getOnConnectableObservableAssembly());
        Assert.assertNull(RxJavaPlugins.getOnParallelAssembly());
    } finally {
        RxJavaAssemblyTracking.disable();
    }
}
/**
 * Prepares the fixture: initializes the Mockito mocks, routes the computation scheduler
 * to {@code testScheduler}, stubs the collaborators and builds the
 * {@code CoreConnectionManager} under test.
 */
@Before
public void setup() {
    MockitoAnnotations.initMocks(this);

    // Every use of the computation scheduler resolves to the test scheduler.
    RxJavaPlugins.setComputationSchedulerHandler(ignored -> testScheduler);

    // Collaborators expose their subjects/relays through hide().
    when(scanner.scan()).thenReturn(scanDataPublishSubject.hide());
    when(bluetoothDetector.enabled()).thenReturn(bluetoothEnabledRelay.hide());
    when(peripheralFactory.produce(any(), any())).thenReturn(peripheral);
    when(peripheral.connect()).thenReturn(connectableStatePublishSubject.hide());
    when(scanData.getBluetoothDevice()).thenReturn(bluetoothDevice);

    coreConnectionManager =
            new CoreConnectionManager(context, bluetoothDetector, scanner, peripheralFactory);
}
/**
 * Installs a schedule hook that sets {@code hookCalled}, subscribes once on the single
 * scheduler, clears the flag, subscribes again on the computation scheduler and asserts
 * that the flag was set by the second subscription.
 */
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {
    RxJavaPlugins.setScheduleHandler(task -> {
        hookCalled = true;
        return task;
    });

    Observable.range(1, 10)
            .map(value -> value * 2)
            .subscribeOn(Schedulers.single())
            .test();

    // Clear the flag so the assertion only reflects the computation scheduler run.
    hookCalled = false;

    Observable.range(1, 10)
            .map(value -> value * 2)
            .subscribeOn(Schedulers.computation())
            .test();

    assertTrue(hookCalled);
}
/**
 * Disable the assembly tracking.
 *
 * <p>Clears every assembly hook on {@code RxJavaPlugins}. The work is guarded by the
 * {@code lock} flag: if the flag cannot be flipped from {@code false} to {@code true},
 * this call returns without touching the hooks.
 *
 * <p>The flag is released in a {@code finally} block so that an exception thrown by one
 * of the setters (for example when the plugins have been locked down) cannot leave it
 * stuck at {@code true}, which would turn every later call into a no-op.
 */
public static void disable() {
    if (lock.compareAndSet(false, true)) {
        try {
            RxJavaPlugins.setOnCompletableAssembly(null);
            RxJavaPlugins.setOnSingleAssembly(null);
            RxJavaPlugins.setOnMaybeAssembly(null);
            RxJavaPlugins.setOnObservableAssembly(null);
            RxJavaPlugins.setOnFlowableAssembly(null);
            RxJavaPlugins.setOnConnectableObservableAssembly(null);
            RxJavaPlugins.setOnConnectableFlowableAssembly(null);
            RxJavaPlugins.setOnParallelAssembly(null);
        } finally {
            lock.set(false);
        }
    }
}
/**
 * Disable {@link RequestContext} during operators.
 *
 * <p>When currently enabled, reinstalls each saved {@code old*} assembly hook on
 * {@code RxJavaPlugins}, clears the saved reference and marks the feature as disabled.
 * When not enabled, this method does nothing.
 */
public static synchronized void disable() {
    if (enabled) {
        RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
        oldOnObservableAssembly = null;

        RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
        oldOnConnectableObservableAssembly = null;

        RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
        oldOnCompletableAssembly = null;

        RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
        oldOnSingleAssembly = null;

        RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
        oldOnMaybeAssembly = null;

        RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
        oldOnFlowableAssembly = null;

        RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
        oldOnConnectableFlowableAssembly = null;

        RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
        oldOnParallelAssembly = null;

        enabled = false;
    }
}
/**
 * Wraps {@code base} so that it is evaluated with both Android main-thread scheduler
 * hooks returning {@code Schedulers.trampoline()}.
 *
 * <p>After evaluation, whether or not it threw, the RxJava and RxAndroid plugins are reset.
 *
 * @param base the statement to wrap
 * @param description not used by this implementation
 * @return a statement that installs the hooks, evaluates {@code base} and resets the plugins
 */
@Override
public Statement apply(final Statement base, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            RxAndroidPlugins.setMainThreadSchedulerHandler(ignored -> Schedulers.trampoline());
            RxAndroidPlugins.setInitMainThreadSchedulerHandler(ignored -> Schedulers.trampoline());

            try {
                base.evaluate();
            } finally {
                RxJavaPlugins.reset();
                RxAndroidPlugins.reset();
            }
        }
    };
}
/**
 * Installs an init hook and a regular hook for the new-thread scheduler, subscribes a
 * small observable on {@code Schedulers.newThread()} and asserts that both hook flags
 * were set.
 */
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {
    RxJavaPlugins.setInitNewThreadSchedulerHandler(defaultSupplier -> {
        initHookCalled = true;
        return defaultSupplier.call();
    });
    RxJavaPlugins.setNewThreadSchedulerHandler(newThreadScheduler -> {
        hookCalled = true;
        return newThreadScheduler;
    });

    Observable.range(1, 15)
            .map(value -> value * 2)
            .subscribeOn(Schedulers.newThread())
            .test();

    assertTrue(hookCalled && initHookCalled);
}
/**
 * Verifies that {@code RequestContextAssembly} composes with a previously installed
 * single-assembly hook.
 *
 * <p>A counting hook is installed and the {@code /single} endpoint is requested before
 * enabling, while enabled, and after disabling {@code RequestContextAssembly}; the
 * counter is expected to grow by three each time. Once the hook is removed, a further
 * request must leave the counter unchanged.
 *
 * <p>The counting hook is global state, so it is also cleared in an outer
 * {@code finally} block; otherwise a failure before the explicit removal would leave it
 * installed for whatever runs next.
 *
 * @throws Exception if waiting for an aggregated response fails
 */
@Test
public void composeWithOtherHook() throws Exception {
    final AtomicInteger calledFlag = new AtomicInteger();
    RxJavaPlugins.setOnSingleAssembly(single -> {
        calledFlag.incrementAndGet();
        return single;
    });
    try {
        final WebClient client = WebClient.of(rule.httpUri());
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(3);
        try {
            RequestContextAssembly.enable();
            client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
            assertThat(calledFlag.get()).isEqualTo(6);
        } finally {
            RequestContextAssembly.disable();
        }
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(9);
        // With the hook removed the counter must no longer change.
        RxJavaPlugins.setOnSingleAssembly(null);
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(9);
    } finally {
        // Always remove the counting hook, even when a request or assertion above fails.
        RxJavaPlugins.setOnSingleAssembly(null);
    }
}
/**
 * Builds an assembled {@code ScalarCallableObservable} whose {@code call()} asserts the
 * current trace context and then throws the supplied exception.
 *
 * @param expectedCallContext the trace context expected to be current inside {@code call()}
 * @param exception the exception thrown from {@code call()}
 * @return the result of passing the observable through {@code RxJavaPlugins.onAssembly}
 */
Observable<Integer> scalarCallableObservable(TraceContext expectedCallContext,
    RuntimeException exception) {
    final Observable<Integer> source = new ScalarCallableObservable() {
        @Override public Integer call() {
            assertThat(currentTraceContext.get()).isEqualTo(expectedCallContext);
            throw exception;
        }
    };
    return RxJavaPlugins.onAssembly(source);
}
/**
 * Forwards the error downstream inside the scope obtained from
 * {@code contextScoper.maybeScope(assembled)}, closing that scope afterwards.
 *
 * <p>If this observer is already marked done, the error goes to
 * {@code RxJavaPlugins.onError} instead.
 *
 * @param t the error signalled by upstream
 */
@Override public void onError(Throwable t) {
    if (!done) {
        done = true;
        Scope scope = contextScoper.maybeScope(assembled);
        try { // retrolambda can't resolve this try/finally
            downstream.onError(t);
        } finally {
            scope.close();
        }
    } else {
        RxJavaPlugins.onError(t);
    }
}
/**
 * Disposes the worker, delivers {@code m} to the child observer's {@code onSuccess}
 * and finally disposes the observer.
 *
 * <p>Any throwable raised while delivering is forwarded to {@code RxJavaPlugins.onError}.
 */
@Override
public void run() {
    worker.dispose();
    try {
        observer.child.onSuccess(m);
    } catch (Throwable deliveryFailure) {
        RxJavaPlugins.onError(deliveryFailure);
    } finally {
        // Runs whether or not delivery succeeded.
        observer.dispose();
    }
}
/**
 * Marks this observer as disposed, removes it via {@code removeObserver()} and passes
 * the value downstream. Does nothing when already disposed.
 *
 * <p>Non-fatal throwables raised during removal or delivery are forwarded to
 * {@code RxJavaPlugins.onError}.
 *
 * @param t the success value
 */
@Override
public void onSuccess(T t) {
    if (isDisposed()) {
        return;
    }
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onSuccess(t);
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        RxJavaPlugins.onError(failure);
    }
}
/**
 * Marks this observer as disposed, removes it via {@code removeObserver()} and passes
 * the error downstream.
 *
 * <p>If already disposed, the error goes to {@code RxJavaPlugins.onError}. A non-fatal
 * throwable raised during removal or delivery is reported together with the original
 * error as a {@code CompositeException}.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
    if (!isDisposed()) {
        lazySet(DisposableHelper.DISPOSED);
        try {
            removeObserver();
            downstream.onError(t);
        } catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            RxJavaPlugins.onError(new CompositeException(t, failure));
        }
    } else {
        RxJavaPlugins.onError(t);
    }
}
/**
 * Runs the wrapped task unless this instance is disposed or cancelled.
 *
 * <p>Non-fatal throwables raised by the task are forwarded to
 * {@code RxJavaPlugins.onError}.
 *
 * @return always {@code null}
 * @throws Exception declared by the overridden signature
 */
@Override
public Void call() throws Exception {
    if (disposed || cancelled) {
        return null;
    }
    try {
        run.run();
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        RxJavaPlugins.onError(failure);
    }
    return null;
}
/**
 * Marks this observer as disposed, removes it via {@code removeObserver()} and signals
 * completion downstream. Does nothing when already disposed.
 *
 * <p>Non-fatal throwables raised during removal or delivery are forwarded to
 * {@code RxJavaPlugins.onError}.
 */
@Override
public void onComplete() {
    if (isDisposed()) {
        return;
    }
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onComplete();
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        RxJavaPlugins.onError(failure);
    }
}
/**
 * Installs a flowable assembly hook that sets {@code hookCalled}, creates a flowable
 * with {@code Flowable.range} and asserts that the flag was set.
 */
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {
    RxJavaPlugins.setOnFlowableAssembly(assembled -> {
        hookCalled = true;
        return assembled;
    });

    // The flowable is only created, never subscribed to.
    Flowable.range(1, 10);

    assertTrue(hookCalled);
}
/**
 * Marks this observer as disposed, removes it via {@code removeObserver()} and passes
 * the error downstream.
 *
 * <p>If already disposed, the error goes to {@code RxJavaPlugins.onError}. A non-fatal
 * throwable raised during removal or delivery is reported together with the original
 * error as a {@code CompositeException}.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
    if (!isDisposed()) {
        lazySet(DisposableHelper.DISPOSED);
        try {
            removeObserver();
            downstream.onError(t);
        } catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            RxJavaPlugins.onError(new CompositeException(t, failure));
        }
    } else {
        RxJavaPlugins.onError(t);
    }
}
/**
 * Verifies that a {@link RejectedExecutionException} passed to
 * {@code RxJavaPlugins.onError} results in exactly one rejected-task event being
 * created from the mocked executor state event and accepted.
 *
 * <p>The spied scheduler calls the real {@code runProcessLoop()} twice; the third
 * invocation is stubbed to raise the error through the RxJava plugin layer.
 */
@Test
public void testErrorHandlerForUndeliverableAsyncTaskExceptions() {
    final DiagnosticEventFactory eventFactory = mock(DiagnosticEventFactory.class);
    final ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class);
    final RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class);
    when(eventFactory.rejectedTaskEvent(any(), any())).thenReturn(rejectedTaskEvent);
    when(eventFactory.executorStateEvent(any(), any())).thenReturn(executorStateEvent);

    final Scheduler schedulerSpy = spy(new Scheduler(checkpointConfig, coordinatorConfig,
            leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig,
            eventFactory));

    // reject task on third loop
    doCallRealMethod()
            .doCallRealMethod()
            .doAnswer(invocation -> {
                // trigger rejected task in RxJava layer
                RxJavaPlugins.onError(new RejectedExecutionException("Test exception."));
                return null;
            }).when(schedulerSpy).runProcessLoop();

    // Scheduler sets error handler in initialize method
    schedulerSpy.initialize();
    for (int loop = 0; loop < 3; loop++) {
        schedulerSpy.runProcessLoop();
    }

    verify(eventFactory, times(1)).rejectedTaskEvent(eq(executorStateEvent), any());
    verify(rejectedTaskEvent, times(1)).accept(any());
}
/**
 * Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions
 * back to the KCL, as is done below.
 *
 * <p>The handler builds an executor state event from {@code executorService} and
 * {@code leaseCoordinator}, wraps it with the throwable in a rejected-task event and
 * passes {@code diagnosticEventHandler} to that event's {@code accept}.
 */
private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
    RxJavaPlugins.setErrorHandler(undeliverable ->
            diagnosticEventFactory
                    .rejectedTaskEvent(
                            diagnosticEventFactory.executorStateEvent(executorService, leaseCoordinator),
                            undeliverable)
                    .accept(diagnosticEventHandler));
}
/**
 * Builds an assembled {@code CallableSingle} whose {@code call()} asserts the current
 * trace context and then throws the supplied exception.
 *
 * @param expectedCallContext the trace context expected to be current inside {@code call()}
 * @param exception the exception thrown from {@code call()}
 * @return the result of passing the single through {@code RxJavaPlugins.onAssembly}
 */
Single<Integer> callableSingle(TraceContext expectedCallContext, RuntimeException exception) {
    final Single<Integer> source = new CallableSingle() {
        @Override public Integer call() {
            assertThat(currentTraceContext.get()).isEqualTo(expectedCallContext);
            throw exception;
        }
    };
    return RxJavaPlugins.onAssembly(source);
}
/**
 * Passes the error downstream, or to {@code RxJavaPlugins.onError} when this observer
 * is already disposed.
 *
 * @param e the error signalled by upstream
 */
@Override
public void onError(Throwable e) {
    if (isDisposed()) {
        RxJavaPlugins.onError(e);
        return;
    }
    downstream.onError(e);
}