类io.reactivex.rxjava3.subscribers.TestSubscriber源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.subscribers.TestSubscriber的API类实例代码及写法,或者点击链接到github查看源代码。

@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");
  subscriber1.cancel();

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertValues("Foo");
}
 
@Test public void valueMissedWhenNoSubscribers() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();
  subscriber1.cancel();

  subject.onNext("Foo");
  subscriber1.assertNoValues();

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertNoValues();
}
 
@Test public void backpressureHonoredWhenCached() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>(0);
  flowable.subscribe(subscriber2);
  subscriber2.assertNoValues();

  subject.onNext("Bar"); // Replace the cached value...
  subscriber2.request(1); // ...and ensure new requests see it.
  subscriber2.assertValues("Bar");
}
 
@Test public void streamsDoNotShareInstances() {
  PublishProcessor<String> subjectA = PublishProcessor.create();
  Flowable<String> flowableA = subjectA.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> subscriberA1 = new TestSubscriber<>();
  flowableA.subscribe(subscriberA1);

  PublishProcessor<String> subjectB = PublishProcessor.create();
  Flowable<String> flowableB = subjectB.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> subscriberB1 = new TestSubscriber<>();
  flowableB.subscribe(subscriberB1);

  subjectA.onNext("Foo");
  subscriberA1.assertValues("Foo");
  subjectB.onNext("Bar");
  subscriberB1.assertValues("Bar");

  TestSubscriber<String> subscriberA2 = new TestSubscriber<>();
  flowableA.subscribe(subscriberA2);
  subscriberA2.assertValues("Foo");

  TestSubscriber<String> subscriberB2 = new TestSubscriber<>();
  flowableB.subscribe(subscriberB2);
  subscriberB2.assertValues("Bar");
}
 
@Test public void completeClearsCacheAndResubscribes() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("initA");

  upstream.onComplete();
  subscriber1.assertComplete();
  observer2.assertComplete();

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
 
@Test public void errorClearsCacheAndResubscribes() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.startWithIterable(start).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  subscriber1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("initB");
}
 
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("default", "initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("default", "initA");

  upstream.onComplete();
  subscriber1.assertComplete();
  observer2.assertComplete();

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
 
@Test
public void doNotTimeout() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofMinutes(1)));
    TestSubscriber<Long> subscriber = Flowable.interval(1, TimeUnit.SECONDS)
        .take(2)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertValueCount(2)
        .assertComplete();
    then(timeLimiter).should(times(3))
        .onSuccess();
}
 
@Test
public void timeoutAfterInitial() throws InterruptedException {
    int timeout = 2;
    int initialDelay = 1;
    int periodDelay = 3;
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofSeconds(timeout)));
    TestSubscriber<Long> subscriber = Flowable
        .interval(initialDelay, periodDelay, TimeUnit.SECONDS)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.await()
        .assertValueCount(1)
        .assertError(TimeoutException.class);
    then(timeLimiter).should()
        .onSuccess();
    then(timeLimiter).should()
        .onError(any(TimeoutException.class));
}
 
源代码10 项目: cxf   文件: JAXRSRxJava3FlowableTest.java
@Test
public void testGetHelloWorldJson() throws Exception {
    String address = "http://localhost:" + PORT + "/rx3/flowable/textJson";
    List<Object> providers = new LinkedList<>();
    providers.add(new JacksonJsonProvider());
    providers.add(new FlowableRxInvokerProvider());
    WebClient wc = WebClient.create(address, providers);
    Flowable<HelloWorldBean> obs = wc.accept("application/json")
        .rx(FlowableRxInvoker.class)
        .get(HelloWorldBean.class);

    final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
    obs.subscribe(subscriber);

    subscriber.await(3, TimeUnit.SECONDS);
    subscriber.assertResult(new HelloWorldBean("Hello", "World"));
}
 
@Test public void noInitialValue() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber = new TestSubscriber<>();
  flowable.subscribe(subscriber);
  subscriber.assertNoValues();
}
 
@Test public void initialValueToNewSubscriber() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertNoValues();

  subject.onNext("Foo");
  subscriber1.assertValues("Foo");

  TestSubscriber<String> subscriber2 = new TestSubscriber<>();
  flowable.subscribe(subscriber2);
  subscriber2.assertValues("Foo");
}
 
@Test public void refCountToUpstream() {
  PublishProcessor<String> subject = PublishProcessor.create();

  final AtomicInteger count = new AtomicInteger();
  Flowable<String> flowable = subject //
      .doOnSubscribe(new Consumer<Subscription>() {
        @Override public void accept(Subscription subscription) {
          count.incrementAndGet();
        }
      }) //
      .doOnCancel(new Action() {
        @Override public void run() {
          count.decrementAndGet();
        }
      }) //
      .compose(ReplayingShare.<String>instance());

  TestSubscriber<String> disposable1 = flowable.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, count.get());

  TestSubscriber<String> disposable2 = flowable.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, count.get());

  TestSubscriber<String> disposable3 = flowable.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, count.get());

  disposable1.cancel();
  assertEquals(1, count.get());

  disposable3.cancel();
  assertEquals(1, count.get());

  disposable2.cancel();
  assertEquals(0, count.get());
}
 
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed = upstream.compose(ReplayingShare.<String>instance());
  replayed.subscribe();
  upstream.onNext("something to cache");

  TestSubscriber<String> testSubscriber = new TestSubscriber<>();
  testSubscriber.cancel();
  replayed.subscribe(testSubscriber);
  testSubscriber.assertNoValues();
}
 
@Test public void defaultValueOnSubscribe() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertValues("default");

  subject.onNext("Foo");
  subscriber1.assertValues("default", "Foo");
}
 
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  flowable.subscribe(subscriber1);
  subscriber1.assertValues("default");

  subject.onNext("Foo");
  subscriber1.assertValues("default", "Foo");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  flowable.subscribe(observer2);
  observer2.assertValues("Foo");
}
 
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  List<String> start = new ArrayList<>();
  start.add("initA");

  PublishProcessor<String> upstream = PublishProcessor.create();
  Flowable<String> replayed =
      upstream.startWithIterable(start).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> subscriber1 = new TestSubscriber<>();
  replayed.subscribe(subscriber1);
  subscriber1.assertValues("default", "initA");

  TestSubscriber<String> observer2 = new TestSubscriber<>();
  replayed.subscribe(observer2);
  subscriber1.assertValues("default", "initA");

  RuntimeException r = new RuntimeException();
  upstream.onError(r);
  subscriber1.assertError(r);
  observer2.assertError(r);

  start.set(0, "initB");

  TestSubscriber<String> observer3 = new TestSubscriber<>();
  replayed.subscribe(observer3);
  observer3.assertValues("default", "initB");
}
 
@Test
public void otherError() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestSubscriber<Object> subscriber = Flowable.error(new RuntimeException())
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(RuntimeException.class);
    then(timeLimiter).should()
        .onError(any(RuntimeException.class));
}
 
@Test
public void timeout() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestSubscriber<Long> subscriber = Flowable.interval(1, TimeUnit.MINUTES)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should()
        .onError(any(TimeoutException.class));
}
 
@Test
public void timeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ZERO));
    TestSubscriber<Object> subscriber = Flowable.empty()
        .delay(1, TimeUnit.MINUTES)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    subscriber.assertError(TimeoutException.class);
    then(timeLimiter).should()
        .onError(any(TimeoutException.class));
}
 
@Test
public void doNotTimeoutEmpty() {
    given(timeLimiter.getTimeLimiterConfig())
        .willReturn(toConfig(Duration.ofMinutes(1)));

    TestSubscriber<Object> subscriber = Flowable.empty()
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    subscriber.assertComplete();
    then(timeLimiter).should()
        .onSuccess();
}
 
@Test
public void noEvents() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    stream.onNext("1");
    stream.onNext("2");
    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void oneStartEvent() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    lifecycle.onNext("create");
    stream.onNext("1");
    stream.onNext("2");

    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void twoOpenEvents() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    lifecycle.onNext("create");
    stream.onNext("1");
    lifecycle.onNext("start");
    stream.onNext("2");

    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void openAndCloseEvent() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    lifecycle.onNext("create");
    stream.onNext("1");
    lifecycle.onNext("destroy");
    stream.onNext("2");

    testSubscriber.assertValues("1");
    testSubscriber.assertComplete();
}
 
@Test
public void noEvent() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .test();

    stream.onNext("1");
    stream.onNext("2");
    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void oneEvent() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle))
        .test();

    stream.onNext("1");
    lifecycle.onNext("stop");
    stream.onNext("2");

    testSubscriber.assertValues("1");
    testSubscriber.assertComplete();
}
 
@Test
public void noEvents() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .test();

    stream.onNext("1");
    stream.onNext("2");
    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void oneWrongEvent() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .test();

    stream.onNext("1");
    lifecycle.onNext("keep going");
    stream.onNext("2");

    testSubscriber.assertValues("1", "2");
    testSubscriber.assertNotComplete();
}
 
@Test
public void twoEvents() {
    TestSubscriber<String> testSubscriber = stream
        .compose(RxLifecycle.<String, String>bindUntilEvent(lifecycle, "stop"))
        .test();

    stream.onNext("1");
    lifecycle.onNext("keep going");
    stream.onNext("2");
    lifecycle.onNext("stop");
    stream.onNext("3");

    testSubscriber.assertValues("1", "2");
    testSubscriber.assertComplete();
}
 
 类所在包
 同包方法