类io.reactivex.plugins.RxJavaPlugins源码实例Demo

下面列出了怎么用io.reactivex.plugins.RxJavaPlugins的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: RxAndroidBle   文件: SampleApplication.java
@Override
public void onCreate() {
    super.onCreate();
    rxBleClient = RxBleClient.create(this);
    RxBleClient.updateLogOptions(new LogOptions.Builder()
            .setLogLevel(LogConstants.INFO)
            .setMacAddressLogSetting(LogConstants.MAC_ADDRESS_FULL)
            .setUuidsLogSetting(LogConstants.UUIDS_FULL)
            .setShouldLogAttributeValues(true)
            .build()
    );
    RxJavaPlugins.setErrorHandler(throwable -> {
        if (throwable instanceof UndeliverableException && throwable.getCause() instanceof BleException) {
            Log.v("SampleApplication", "Suppressed UndeliverableException: " + throwable.toString());
            return; // ignore BleExceptions as they were surely delivered at least once
        }
        // add other custom handlers if needed
        throw new RuntimeException("Unexpected Throwable in RxJavaPlugins error handler", throwable);
    });
}
 
源代码2 项目: vertx-rx   文件: WriteStreamObserverImpl.java
private void writeStreamEnd(AsyncResult<Void> result) {
  try {
    Action a;
    if (result.succeeded()) {
      synchronized (this) {
        a = writeStreamEndHandler;
      }
      if (a != null) {
        a.run();
      }
    } else {
      Consumer<? super Throwable> c;
      synchronized (this) {
        c = this.writeStreamEndErrorHandler;
      }
      if (c != null) {
        c.accept(result.cause());
      }
    }
  } catch (Throwable t) {
    Exceptions.throwIfFatal(t);
    RxJavaPlugins.onError(t);
  }
}
 
源代码3 项目: quill   文件: RxSchedulersRule.java
public Statement apply(Statement base, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            Scheduler scheduler = TrampolineScheduler.instance();
            Function<Callable<Scheduler>, Scheduler> schedulerFn = __ -> scheduler;
            RxJavaPlugins.reset();
            RxJavaPlugins.setInitIoSchedulerHandler(schedulerFn);
            RxJavaPlugins.setInitNewThreadSchedulerHandler(schedulerFn);
            RxAndroidPlugins.reset();
            RxAndroidPlugins.setInitMainThreadSchedulerHandler(schedulerFn);

            try {
                base.evaluate();
            } finally {
                RxJavaPlugins.reset();
                RxAndroidPlugins.reset();
            }
        }
    };
}
 
源代码4 项目: Hentoid   文件: AbstractObjectBoxTest.java
@BeforeClass
public static void setUpRxSchedulers() {
    Scheduler immediate = new Scheduler() {
        @Override
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            // this prevents StackOverflowErrors when scheduling with a delay
            return super.scheduleDirect(run, 0, unit);
        }

        @Override
        public Scheduler.Worker createWorker() {
            return new ExecutorScheduler.ExecutorWorker(Runnable::run, false);
        }
    };

    RxJavaPlugins.setInitIoSchedulerHandler(scheduler -> immediate);
    RxJavaPlugins.setInitComputationSchedulerHandler(scheduler -> immediate);
    RxJavaPlugins.setInitNewThreadSchedulerHandler(scheduler -> immediate);
    RxJavaPlugins.setInitSingleSchedulerHandler(scheduler -> immediate);
    RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> immediate);
}
 
源代码5 项目: akarnokd-misc   文件: ParallelInterrupt.java
public static void main(String[] args) {
    RxJavaPlugins.setErrorHandler(e -> { });
    Flowable.range(1, 10)
    .parallel(4)
    .runOn(Schedulers.io())
    .map(v -> {
       if (v == 2) {
          throw new IOException();
       }
       Thread.sleep(2000);
       return v;
    })
    .sequential()
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(IOException.class);
}
 
源代码6 项目: tutorials   文件: RxJavaHooksManualTest.java
@Test
public void givenIOScheduler_whenCalled_shouldExecuteTheHooks() {

    RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
        initHookCalled = true;
        return scheduler.call();
    });

    RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
        hookCalled = true;
        return scheduler;
    });

    Observable.range(1, 10)
        .map(v -> v * 2)
        .subscribeOn(Schedulers.io())
        .test();
    assertTrue(hookCalled && initHookCalled);
}
 
源代码7 项目: vertx-rx   文件: HelperTest.java
@Test
public void testToObservableAssemblyHook() {
  FakeStream<String> stream = new FakeStream<>();
  try {
    final Observable<String> justMe = Observable.just("me");
    RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
      @Override public Observable apply(Observable f) {
        return justMe;
      }
    });
    Observable<String> observable = ObservableHelper.toObservable(stream);
    assertSame(observable, justMe);
    Observable<String> observableFn = ObservableHelper.toObservable(stream, identity());
    assertSame(observableFn, justMe);
  } finally {
    RxJavaPlugins.reset();
  }
}
 
源代码8 项目: akarnokd-misc   文件: AssemblyHooksExample.java
@Test
public void test() {
RxJavaPlugins.setOnObservableAssembly(o -> {
    if (o instanceof ObservableFromArray) {
        return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
    }
    return o;
});

Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);

RxJavaPlugins.setOnObservableAssembly(null);
}
 
源代码9 项目: retrocache   文件: ObservableThrowingTest.java
@Test
public void resultThrowingInOnCompletedDeliveredToPlugin() {
    server.enqueue(new MockResponse());

    final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
    RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            if (!throwableRef.compareAndSet(null, throwable)) {
                throw Exceptions.propagate(throwable);
            }
        }
    });

    RecordingObserver<Result<String>> observer = subscriberRule.create();
    final RuntimeException e = new RuntimeException();
    service.result().subscribe(new ForwardingObserver<Result<String>>(observer) {
        @Override
        public void onComplete() {
            throw e;
        }
    });

    observer.assertAnyValue();
    assertThat(throwableRef.get()).isSameAs(e);
}
 
@Test public void enable_restoresSavedHooks() {
  RxJavaPlugins.reset();

  RxJavaAssemblyTracking.enable();
  try {
    CurrentTraceContextAssemblyTracking.create(currentTraceContext).enable();

    CurrentTraceContextAssemblyTracking.disable();

    Assert.assertNull(RxJavaPlugins.getOnCompletableAssembly());
    Assert.assertNull(RxJavaPlugins.getOnSingleAssembly());
    Assert.assertNull(RxJavaPlugins.getOnMaybeAssembly());
    Assert.assertNull(RxJavaPlugins.getOnObservableAssembly());
    Assert.assertNull(RxJavaPlugins.getOnFlowableAssembly());
    Assert.assertNull(RxJavaPlugins.getOnConnectableFlowableAssembly());
    Assert.assertNull(RxJavaPlugins.getOnConnectableObservableAssembly());
    Assert.assertNull(RxJavaPlugins.getOnParallelAssembly());
  } finally {
    RxJavaAssemblyTracking.disable();
  }
}
 
源代码11 项目: RxCentralBle   文件: CoreConnectionManagerTest.java
@Before
public void setup() {
  MockitoAnnotations.initMocks(this);

  RxJavaPlugins.setComputationSchedulerHandler(schedulerCallable -> testScheduler);

  when(scanner.scan()).thenReturn(scanDataPublishSubject.hide());
  when(bluetoothDetector.enabled()).thenReturn(bluetoothEnabledRelay.hide());
  when(peripheralFactory.produce(any(), any())).thenReturn(peripheral);
  when(peripheral.connect()).thenReturn(connectableStatePublishSubject.hide());
  when(scanData.getBluetoothDevice()).thenReturn(bluetoothDevice);

  coreConnectionManager =
      new CoreConnectionManager(
          context, bluetoothDetector, scanner, peripheralFactory);
}
 
源代码12 项目: tutorials   文件: RxJavaHooksUnitTest.java
@Test
public void givenAnyScheduler_whenCalled_shouldExecuteTheHook() {

    RxJavaPlugins.setScheduleHandler((runnable) -> {
        hookCalled = true;
        return runnable;
    });

    Observable.range(1, 10)
        .map(v -> v * 2)
        .subscribeOn(Schedulers.single())
        .test();
    hookCalled = false;
    Observable.range(1, 10)
        .map(v -> v * 2)
        .subscribeOn(Schedulers.computation())
        .test();
    assertTrue(hookCalled);
}
 
源代码13 项目: RxJava2Debug   文件: RxJavaAssemblyTracking.java
/**
 * Disable the assembly tracking.
 */
public static void disable() {
    if (lock.compareAndSet(false, true)) {

        RxJavaPlugins.setOnCompletableAssembly(null);
        RxJavaPlugins.setOnSingleAssembly(null);
        RxJavaPlugins.setOnMaybeAssembly(null);

        RxJavaPlugins.setOnObservableAssembly(null);
        RxJavaPlugins.setOnFlowableAssembly(null);
        RxJavaPlugins.setOnConnectableObservableAssembly(null);
        RxJavaPlugins.setOnConnectableFlowableAssembly(null);

        RxJavaPlugins.setOnParallelAssembly(null);

        lock.set(false);
    }
}
 
源代码14 项目: armeria   文件: RequestContextAssembly.java
/**
 * Disable {@link RequestContext} during operators.
 */
public static synchronized void disable() {
    if (!enabled) {
        return;
    }
    RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
    oldOnObservableAssembly = null;
    RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
    oldOnConnectableObservableAssembly = null;
    RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
    oldOnCompletableAssembly = null;
    RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
    oldOnSingleAssembly = null;
    RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
    oldOnMaybeAssembly = null;
    RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
    oldOnFlowableAssembly = null;
    RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
    oldOnConnectableFlowableAssembly = null;
    RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
    oldOnParallelAssembly = null;
    enabled = false;
}
 
源代码15 项目: RxCommand   文件: ImmediateSchedulersRule.java
@Override
public Statement apply(final Statement base, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {

            RxAndroidPlugins.setMainThreadSchedulerHandler(scheduler ->  Schedulers.trampoline());
            RxAndroidPlugins.setInitMainThreadSchedulerHandler(schedulerCallable ->  Schedulers.trampoline());

            try {
                base.evaluate();
            } finally {
                RxJavaPlugins.reset();
                RxAndroidPlugins.reset();
            }
        }
    };
}
 
源代码16 项目: tutorials   文件: RxJavaHooksManualTest.java
@Test
public void givenNewThreadScheduler_whenCalled_shouldExecuteTheHook() {

    RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
        initHookCalled = true;
        return scheduler.call();
    });

    RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
        hookCalled = true;
        return scheduler;
    });

    Observable.range(1, 15)
        .map(v -> v * 2)
        .subscribeOn(Schedulers.newThread())
        .test();
    assertTrue(hookCalled && initHookCalled);
}
 
源代码17 项目: armeria   文件: RequestContextAssemblyTest.java
@Test
public void composeWithOtherHook() throws Exception {
    final AtomicInteger calledFlag = new AtomicInteger();
    RxJavaPlugins.setOnSingleAssembly(single -> {
        calledFlag.incrementAndGet();
        return single;
    });
    final WebClient client = WebClient.of(rule.httpUri());
    client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
    assertThat(calledFlag.get()).isEqualTo(3);

    try {
        RequestContextAssembly.enable();
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(6);
    } finally {
        RequestContextAssembly.disable();
    }
    client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
    assertThat(calledFlag.get()).isEqualTo(9);

    RxJavaPlugins.setOnSingleAssembly(null);
    client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
    assertThat(calledFlag.get()).isEqualTo(9);
}
 
Observable<Integer> scalarCallableObservable(TraceContext expectedCallContext,
  RuntimeException exception) {
  return RxJavaPlugins.onAssembly(new ScalarCallableObservable() {
    @Override public Integer call() {
      assertThat(currentTraceContext.get()).isEqualTo(expectedCallContext);
      throw exception;
    }
  });
}
 
源代码19 项目: brave   文件: TraceContextSubscriber.java
@Override public void onError(Throwable t) {
  if (done) {
    RxJavaPlugins.onError(t);
    return;
  }
  done = true;

  Scope scope = contextScoper.maybeScope(assembled);
  try { // retrolambda can't resolve this try/finally
    downstream.onError(t);
  } finally {
    scope.close();
  }
}
 
源代码20 项目: rxjava2-jdbc   文件: MemberSingle.java
@Override
public void run() {
    worker.dispose();
    try {
        observer.child.onSuccess(m);
    } catch (Throwable e) {
        RxJavaPlugins.onError(e);
    } finally {
        observer.dispose();
    }
}
 
源代码21 项目: rxjava-RxLife   文件: LifeMaybeObserver.java
@Override
public void onSuccess(T t) {
    if (isDisposed()) return;
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onSuccess(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        RxJavaPlugins.onError(ex);
    }
}
 
源代码22 项目: rxjava-RxLife   文件: LifeMaybeObserver.java
@Override
public void onError(Throwable t) {
    if (isDisposed()) {
        RxJavaPlugins.onError(t);
        return;
    }
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onError(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(new CompositeException(t, e));
    }
}
 
源代码23 项目: akarnokd-misc   文件: WeakParallelScheduler.java
@Override
public Void call() throws Exception {
    if (!disposed && !cancelled) {
        try {
            run.run();
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            RxJavaPlugins.onError(ex);
        }
    }
    return null;
}
 
源代码24 项目: rxjava-RxLife   文件: LifeCompletableObserver.java
@Override
public void onComplete() {
    if (isDisposed()) return;
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onComplete();
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        RxJavaPlugins.onError(ex);
    }
}
 
源代码25 项目: tutorials   文件: RxJavaHooksUnitTest.java
@Test
public void givenFlowable_whenAssembled_shouldExecuteTheHook() {

    RxJavaPlugins.setOnFlowableAssembly(flowable -> {
        hookCalled = true;
        return flowable;
    });

    Flowable.range(1, 10);
    assertTrue(hookCalled);
}
 
源代码26 项目: rxjava-RxLife   文件: LifeSingleObserver.java
@Override
public void onError(Throwable t) {
    if (isDisposed()) {
        RxJavaPlugins.onError(t);
        return;
    }
    lazySet(DisposableHelper.DISPOSED);
    try {
        removeObserver();
        downstream.onError(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(new CompositeException(t, e));
    }
}
 
源代码27 项目: amazon-kinesis-client   文件: SchedulerTest.java
@Test
public void testErrorHandlerForUndeliverableAsyncTaskExceptions() {
    DiagnosticEventFactory eventFactory = mock(DiagnosticEventFactory.class);
    ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class);
    RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class);

    when(eventFactory.rejectedTaskEvent(any(), any())).thenReturn(rejectedTaskEvent);
    when(eventFactory.executorStateEvent(any(), any())).thenReturn(executorStateEvent);

    Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig,
            lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory);

    Scheduler schedulerSpy = spy(testScheduler);

    // reject task on third loop
    doCallRealMethod()
            .doCallRealMethod()
            .doAnswer(invocation -> {
                // trigger rejected task in RxJava layer
                 RxJavaPlugins.onError(new RejectedExecutionException("Test exception."));
                 return null;
            }).when(schedulerSpy).runProcessLoop();

    // Scheduler sets error handler in initialize method
    schedulerSpy.initialize();
    schedulerSpy.runProcessLoop();
    schedulerSpy.runProcessLoop();
    schedulerSpy.runProcessLoop();

    verify(eventFactory, times(1)).rejectedTaskEvent(eq(executorStateEvent), any());
    verify(rejectedTaskEvent, times(1)).accept(any());
}
 
源代码28 项目: amazon-kinesis-client   文件: Scheduler.java
/**
 * Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions
 * back to the KCL, as is done below.
 */
private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() {
    RxJavaPlugins.setErrorHandler(t -> {
        ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService,
                leaseCoordinator);
        RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(executorStateEvent, t);
        rejectedTaskEvent.accept(diagnosticEventHandler);
    });
}
 
Single<Integer> callableSingle(TraceContext expectedCallContext, RuntimeException exception) {
  return RxJavaPlugins.onAssembly(new CallableSingle() {
    @Override public Integer call() {
      assertThat(currentTraceContext.get()).isEqualTo(expectedCallContext);
      throw exception;
    }
  });
}
 
源代码30 项目: sqlitemagic   文件: OperatorRunListQuery.java
@Override
public void onError(Throwable e) {
  if (isDisposed()) {
    RxJavaPlugins.onError(e);
  } else {
    downstream.onError(e);
  }
}
 
 类所在包
 同包方法