类io.reactivex.rxjava3.plugins.RxJavaPlugins源码实例Demo

下面列出了 io.reactivex.rxjava3.plugins.RxJavaPlugins 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: resilience4j   文件: TestSchedulerRule.java
/**
 * Wraps {@code statement} so that it is evaluated while the new-thread, computation and IO
 * scheduler hooks all hand back {@code testScheduler}.
 *
 * <p>{@link RxJavaPlugins#reset()} is called once the wrapped statement has finished, whether
 * it returned normally or threw.
 *
 * @param statement the statement to run with the scheduler hooks installed
 * @param description not used by this rule
 * @return a statement that installs the hooks, delegates, and then resets the plugins
 */
@Override
public Statement apply(final Statement statement, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            // Each hook ignores the scheduler it is offered and substitutes the test one.
            RxJavaPlugins.setNewThreadSchedulerHandler(ignored -> testScheduler);
            RxJavaPlugins.setComputationSchedulerHandler(ignored -> testScheduler);
            RxJavaPlugins.setIoSchedulerHandler(ignored -> testScheduler);
            try {
                statement.evaluate();
            } finally {
                // Runs even when the wrapped statement fails.
                RxJavaPlugins.reset();
            }
        }
    };
}
 
源代码2 项目: armeria   文件: RequestContextAssembly.java
/**
 * Disable {@link RequestContext} during operators.
 *
 * <p>Has no effect unless the {@code enabled} flag is currently set. Otherwise each assembly
 * hook is set back to the value held in the matching {@code old...} field, that field is
 * cleared, and finally the {@code enabled} flag is lowered.
 */
public static synchronized void disable() {
    if (enabled) {
        // Observable hooks.
        RxJavaPlugins.setOnObservableAssembly(oldOnObservableAssembly);
        oldOnObservableAssembly = null;
        RxJavaPlugins.setOnConnectableObservableAssembly(oldOnConnectableObservableAssembly);
        oldOnConnectableObservableAssembly = null;

        // Completable, Single and Maybe hooks.
        RxJavaPlugins.setOnCompletableAssembly(oldOnCompletableAssembly);
        oldOnCompletableAssembly = null;
        RxJavaPlugins.setOnSingleAssembly(oldOnSingleAssembly);
        oldOnSingleAssembly = null;
        RxJavaPlugins.setOnMaybeAssembly(oldOnMaybeAssembly);
        oldOnMaybeAssembly = null;

        // Flowable and parallel hooks.
        RxJavaPlugins.setOnFlowableAssembly(oldOnFlowableAssembly);
        oldOnFlowableAssembly = null;
        RxJavaPlugins.setOnConnectableFlowableAssembly(oldOnConnectableFlowableAssembly);
        oldOnConnectableFlowableAssembly = null;
        RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly);
        oldOnParallelAssembly = null;

        enabled = false;
    }
}
 
源代码3 项目: armeria   文件: RequestContextAssemblyTest.java
/**
 * Checks that a user-installed {@code onSingleAssembly} hook keeps being invoked before,
 * during and after {@link RequestContextAssembly} is enabled, and stops being invoked once
 * it has been removed.
 *
 * <p>The assertions expect every request to {@code /single} to raise the counter by three
 * while the hook is installed.
 */
@Test
void composeWithOtherHook() throws Exception {
    final AtomicInteger calledFlag = new AtomicInteger();
    RxJavaPlugins.setOnSingleAssembly(single -> {
        calledFlag.incrementAndGet();
        return single;
    });
    try {
        final WebClient client = WebClient.of(server.httpUri());
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(3);

        try {
            RequestContextAssembly.enable();
            client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
            assertThat(calledFlag.get()).isEqualTo(6);
        } finally {
            RequestContextAssembly.disable();
        }
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(9);

        // With the hook removed the counter must no longer move.
        RxJavaPlugins.setOnSingleAssembly(null);
        client.execute(RequestHeaders.of(HttpMethod.GET, "/single")).aggregate().get();
        assertThat(calledFlag.get()).isEqualTo(9);
    } finally {
        // The hook is global state: clear it even when an assertion or request above fails,
        // so it cannot leak into whichever test runs next.
        RxJavaPlugins.setOnSingleAssembly(null);
    }
}
 
源代码4 项目: mobius   文件: RxEventSources.java
/**
 * Builds an {@link EventSource} on top of a set of RxJava streams.
 *
 * <p>The given sources are combined with {@code Observable.mergeArray}. Each subscription to
 * the returned event source subscribes to that merged stream: emitted values are passed to
 * the supplied consumer and errors are passed to {@code RxJavaPlugins.onError}. Disposing the
 * returned handle disposes the underlying RxJava subscription.
 *
 * <p>All streams must already be mapped to your event type.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> merged = Observable.mergeArray(sources);
  return new EventSource<E>() {

    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      // Values go to the Mobius consumer; stream errors go to the RxJava plugin error hook.
      final Disposable subscription =
          merged.subscribe(
              value -> eventConsumer.accept(value), error -> RxJavaPlugins.onError(error));

      // Adapt the RxJava disposable to the Mobius one.
      return new com.spotify.mobius.disposables.Disposable() {
        @Override
        public void dispose() {
          subscription.dispose();
        }
      };
    }
  };
}
 
源代码5 项目: mobius   文件: RxMobius.java
/**
 * Creates the error consumer used for the given effect handler.
 *
 * <p>The returned consumer wraps whatever it receives in a {@code ConnectionException} whose
 * message names the effect handler's class, and passes that to {@code RxJavaPlugins.onError}.
 *
 * @param effectHandler the transformer whose class is reported in the message
 * @param <F> the effect type
 * @param <E> the event type
 * @return a consumer that reports wrapped errors through the RxJava plugin error hook
 */
private static <F, E> Consumer<Throwable> defaultOnError(
    final ObservableTransformer<? extends F, E> effectHandler) {
  return throwable -> {
    final String context = "in effect handler: " + effectHandler.getClass().toString();
    RxJavaPlugins.onError(new ConnectionException(context, throwable));
  };
}
 
源代码6 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * With no RxJava error handler installed, an effect handler that throws must not crash the
 * caller; the thrown exception must instead reach the downstream observer.
 */
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
  // given: the global RxJava error handler is cleared
  RxJavaPlugins.setErrorHandler(null);

  // and: a fresh subject / observer pair
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  // and: a router whose handler for A always throws the same exception instance
  final RuntimeException failure = new RuntimeException("expected!");
  final ObservableTransformer<TestEffect, TestEvent> brokenRouter =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addFunction(
              A.class,
              effect -> {
                throw failure;
              })
          .build();

  publishSubject.compose(brokenRouter).subscribe(testSubscriber);

  // when: an effect is pushed through, the call returns normally
  // (the exception does get printed to stderr)
  publishSubject.onNext(A.create(1));

  // then: the observer received exactly that exception instance
  testSubscriber.assertError(thrown -> thrown == failure);
}
 
源代码7 项目: zap-android   文件: App.java
public App() {
    mContext = this;

    RxJavaPlugins.setErrorHandler(e -> {
        if (e.getMessage() != null && e.getMessage().contains("shutdownNow")) {
            // Is propagated from gRPC when shutting down channel
        } else {
            ZapLog.debug("RxJava", e.getMessage());
        }
    });
}
 
源代码8 项目: mobius   文件: RxConnectables.java
/**
 * Adapts an {@link ObservableTransformer} to a {@link Connectable}.
 *
 * <p>Every call to {@code connect} creates a new {@code PublishSubject}, applies the
 * transformer to it and subscribes: transformed values are passed to {@code output}, errors
 * are passed to {@code RxJavaPlugins.onError}, and completion is ignored. Values accepted by
 * the returned connection are pushed into the subject; disposing the connection disposes the
 * subscription.
 *
 * @param transformer the transformer to wrap; checked with {@code checkNotNull}
 * @param <I> the input type
 * @param <O> the output type
 * @return a connectable backed by the transformer
 */
public static <I, O> Connectable<I, O> fromTransformer(
    @NonNull final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);
  return new Connectable<I, O>() {
    @Nonnull
    @Override
    public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
      final PublishSubject<I> inputs = PublishSubject.create();

      final Disposable subscription =
          inputs
              .compose(transformer)
              .subscribe(
                  value -> output.accept(value),
                  error -> RxJavaPlugins.onError(error),
                  () -> {}); // completion needs no action

      return new Connection<I>() {
        public void accept(I effect) {
          inputs.onNext(effect);
        }

        @Override
        public void dispose() {
          subscription.dispose();
        }
      };
    }
  };
}
 
源代码9 项目: akarnokd-misc   文件: UndeliverableTest.java
/**
 * Demonstrates an error that is held back by {@code switchMapDelayError} while the consumer,
 * which has seen nothing yet, cancels.
 *
 * <p>A global error handler that prints to standard output is installed for the duration of
 * the test and removed again afterwards.
 */
@Test
public void test() {
    RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
    try {
        PublishProcessor<Integer> main = PublishProcessor.create();
        PublishProcessor<Integer> inner = PublishProcessor.create();

        // switchMapDelayError will delay all errors
        TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();

        main.onNext(1);

        // the inner fails
        inner.onError(new IOException());

        // the consumer is still clueless
        ts.assertEmpty();

        // the consumer cancels
        ts.cancel();
    } finally {
        // The error handler is process-wide state; clear it so that it does not stay
        // installed for tests that run after this one, even if an assertion above fails.
        RxJavaPlugins.setErrorHandler(null);
    }
}
 
 类所在包
 类方法
 同包方法