类io.reactivex.rxjava3.subjects.PublishSubject源码实例Demo

下面列出了如何使用 io.reactivex.rxjava3.subjects.PublishSubject 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: mobius   文件: RxEventSourcesTest.java
/**
 * Checks that a value pushed into the subject after the subscription has been disposed is not
 * recorded by the consumer: only the two values sent before disposal are expected.
 */
@Test
public void disposePreventsFurtherEvents() throws Exception {
  final PublishSubject<Integer> upstream = PublishSubject.create();
  final RecordingConsumer<Integer> recorder = new RecordingConsumer<>();
  final EventSource<Integer> eventSource = RxEventSources.fromObservables(upstream);

  final Disposable subscription = eventSource.subscribe(recorder);

  // Two values while the subscription is live...
  upstream.onNext(1);
  upstream.onNext(2);

  // ...and one more after disposal, which must not show up in the recorded values.
  subscription.dispose();
  upstream.onNext(3);

  recorder.waitForChange(50);
  recorder.assertValues(1, 2);
}
 
源代码2 项目: mobius   文件: TransformersTest.java
/**
 * Feeds two effects through a transformer built from a sleeping function scheduled on
 * {@code Schedulers.io()} and waits until the collected results equal {@code expected(effects)}.
 *
 * <p>The sleep time per effect comes from {@code duration(s)}; both that helper and
 * {@code expected}/{@code durationForEffects} are defined elsewhere in the test class.
 */
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");

  PublishSubject<String> upstream = PublishSubject.create();
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
          // Do not swallow the interrupt: restore the flag so the owning worker can observe it.
          Thread.currentThread().interrupt();
        }
        return s.length();
      };

  // The function is scheduled on Schedulers.io(), so results are presumably appended from worker
  // threads while the awaiting test thread reads the list; a plain ArrayList is not safe for that.
  final List<Integer> results = java.util.Collections.synchronizedList(new ArrayList<>());
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);

  Observable.fromIterable(effects).subscribe(upstream);

  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
 
源代码3 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * Prepares the shared fixtures: a fresh consumer and action, a router that handles effect types
 * A through E, and a subject whose routed output is observed by {@code testSubscriber}.
 */
@Before
public void setUp() throws Exception {
  cConsumer = new TestConsumer<>();
  dAction = new TestAction();

  // One handler per effect subtype: transformers for A and B, a consumer for C,
  // an action for D and a plain function for E.
  final RxMobius.SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlers =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())))
          .addTransformer(B.class, (Observable<B> bs) -> bs.map(b -> BEvent.create(b.id())))
          .addConsumer(C.class, cConsumer)
          .addAction(D.class, dAction)
          .addFunction(E.class, e -> AEvent.create(e.id()));
  final ObservableTransformer<TestEffect, TestEvent> router = handlers.build();

  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  // Wire the subject through the router into the observer used by the tests.
  publishSubject.compose(router).subscribe(testSubscriber);
}
 
源代码4 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * Builds a router that only knows about A effects, registers a B handler on the builder
 * afterwards, and asserts that sending a B effect through the already-built router still ends in
 * an {@code UnknownEffectException}.
 */
@Test
public void effectHandlersShouldBeImmutable() throws Exception {
  // This case needs its own subject/observer pair instead of the ones wired up in setUp.
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  RxMobius.SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlerBuilder =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(A.class, (Observable<A> as) -> as.map(a -> AEvent.create(a.id())));

  // Snapshot the router before the builder is touched again.
  ObservableTransformer<TestEffect, TestEvent> builtRouter = handlerBuilder.build();

  // Registering a B handler now must not make the already-built router able to handle B.
  handlerBuilder.addTransformer(B.class, bs -> bs.map(b -> BEvent.create(b.id())));

  publishSubject.compose(builtRouter).subscribe(testSubscriber);

  B unhandled = B.create(84);
  publishSubject.onNext(unhandled);
  publishSubject.onComplete();

  testSubscriber.await();
  testSubscriber.assertError(new UnknownEffectException(unhandled));
}
 
源代码5 项目: mobius   文件: RxConnectablesTest.java
/**
 * Creates the input subject and a {@code Connectable} whose connections forward the length of each
 * received string to the output consumer, except for the literal "crash", which makes
 * {@code accept} throw a {@code RuntimeException}.
 */
@Before
public void setUp() throws Exception {
  input = PublishSubject.create();
  connectable =
      new Connectable<String, Integer>() {
        @Nonnull
        @Override
        public Connection<String> connect(final Consumer<Integer> sink)
            throws ConnectionLimitExceededException {
          return new Connection<String>() {
            @Override
            public void accept(String value) {
              final boolean crashRequested = value.equals("crash");
              if (crashRequested) {
                throw new RuntimeException("crashing!");
              }
              // Normal path: emit the string's length downstream.
              final int length = value.length();
              sink.accept(length);
            }

            @Override
            public void dispose() {
              // Nothing to release for this test connection.
            }
          };
        }
      };
}
 
/**
 * Asserts that an observer subscribing after the first observer was disposed still receives the
 * value emitted while the first one was subscribed.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // First observer: nothing emitted yet, then it sees "Foo" and goes away.
  TestObserver<String> early = new TestObserver<>();
  shared.subscribe(early);
  early.assertNoValues();

  source.onNext("Foo");
  early.assertValues("Foo");
  early.dispose();

  // Second observer arrives afterwards and is expected to get "Foo" as well.
  TestObserver<String> late = new TestObserver<>();
  shared.subscribe(late);
  late.assertValues("Foo");
}
 
/**
 * Asserts that a value emitted by the subject while no observer is subscribed is seen neither by
 * the already-disposed observer nor by an observer that subscribes afterwards.
 */
@Test public void valueMissedWhenNoSubscribers() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Subscribe and immediately leave, so nobody is listening when "Foo" is emitted.
  TestObserver<String> departed = new TestObserver<>();
  shared.subscribe(departed);
  departed.assertNoValues();
  departed.dispose();

  source.onNext("Foo");
  departed.assertNoValues();

  // A newcomer must not get the missed value replayed.
  TestObserver<String> newcomer = new TestObserver<>();
  shared.subscribe(newcomer);
  newcomer.assertNoValues();
}
 
/**
 * Asserts that an {@code OutOfMemoryError} thrown by a consumer while it is handed the cached
 * value propagates out of {@code subscribe} to the caller with its message intact.
 */
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Prime the cache with a value via a throwaway subscription.
  shared.subscribe();
  source.onNext("Foo");

  Consumer<String> throwingConsumer = new Consumer<String>() {
    @Override public void accept(String ignoredValue) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    shared.subscribe(throwingConsumer);
    // Reaching this line means the error was not rethrown.
    fail();
  } catch (OutOfMemoryError thrown) {
    assertEquals("broken!", thrown.getMessage());
  }
}
 
/**
 * Ignored test (reason given in the annotation: Observable has no backpressure). It keeps the
 * shape of a request-based scenario; the initial-request argument and the {@code requestMore}
 * call are left out because they do not apply here.
 */
@Ignore("No backpressure in Observable")
@Test public void backpressureHonoredWhenCached() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> eager = new TestObserver<>();
  shared.subscribe(eager);
  eager.assertNoValues();

  source.onNext("Foo");
  eager.assertValues("Foo");

  // Originally constructed with an initial request of 0.
  TestObserver<String> throttled = new TestObserver<>();
  shared.subscribe(throttled);
  throttled.assertNoValues();

  // Replace the cached value...
  source.onNext("Bar");
  eager.assertValues("Foo", "Bar");

  // ...and (after what used to be a requestMore(1) call) expect the new value to be seen.
  throttled.assertValues("Bar");
}
 
/**
 * Asserts that two streams composed with {@code ReplayingShare.instance()} keep separate cached
 * values: late observers on stream A get "Foo" and late observers on stream B get "Bar".
 */
@Test public void streamsDoNotShareInstances() {
  // Stream A with its first observer.
  PublishSubject<String> sourceA = PublishSubject.create();
  Observable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
  TestObserver<String> firstOnA = new TestObserver<>();
  sharedA.subscribe(firstOnA);

  // Stream B with its first observer.
  PublishSubject<String> sourceB = PublishSubject.create();
  Observable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
  TestObserver<String> firstOnB = new TestObserver<>();
  sharedB.subscribe(firstOnB);

  sourceA.onNext("Foo");
  firstOnA.assertValues("Foo");
  sourceB.onNext("Bar");
  firstOnB.assertValues("Bar");

  // Late observers must each receive only their own stream's value.
  TestObserver<String> secondOnA = new TestObserver<>();
  sharedA.subscribe(secondOnA);
  secondOnA.assertValues("Foo");

  TestObserver<String> secondOnB = new TestObserver<>();
  sharedB.subscribe(secondOnB);
  secondOnB.assertValues("Bar");
}
 
/**
 * Asserts that after the upstream completes, a new observer gets the then-current first element
 * of the start list ("initB") rather than the earlier "initA".
 */
@Test public void completeClearsCacheAndResubscribes() {
  // Mutable so the start value can be swapped between subscriptions.
  List<String> initialValues = new ArrayList<>();
  initialValues.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(initialValues).compose(ReplayingShare.<String>instance());

  TestObserver<String> first = new TestObserver<>();
  replayed.subscribe(first);
  first.assertValues("initA");

  TestObserver<String> second = new TestObserver<>();
  replayed.subscribe(second);
  // The first observer is checked again here; it must still hold exactly "initA".
  first.assertValues("initA");

  upstream.onComplete();
  first.assertComplete();
  second.assertComplete();

  initialValues.set(0, "initB");

  TestObserver<String> afterCompletion = new TestObserver<>();
  replayed.subscribe(afterCompletion);
  afterCompletion.assertValues("initB");
}
 
/**
 * Asserts that after the upstream fails, both observers see that error and a new observer gets
 * the then-current first element of the start list ("initB") rather than the earlier "initA".
 */
@Test public void errorClearsCacheAndResubscribes() {
  // Mutable so the start value can be swapped between subscriptions.
  List<String> initialValues = new ArrayList<>();
  initialValues.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream.startWithIterable(initialValues).compose(ReplayingShare.<String>instance());

  TestObserver<String> first = new TestObserver<>();
  replayed.subscribe(first);
  first.assertValues("initA");

  TestObserver<String> second = new TestObserver<>();
  replayed.subscribe(second);
  // The first observer is checked again here; it must still hold exactly "initA".
  first.assertValues("initA");

  RuntimeException failure = new RuntimeException();
  upstream.onError(failure);
  first.assertError(failure);
  second.assertError(failure);

  initialValues.set(0, "initB");

  TestObserver<String> afterError = new TestObserver<>();
  replayed.subscribe(afterError);
  afterError.assertValues("initB");
}
 
/**
 * Same scenario as the completion test, but with {@code ReplayingShare.createWithDefault}: asserts
 * that an observer subscribing after completion receives "default" followed by "initB".
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable so the start value can be swapped between subscriptions.
  List<String> initialValues = new ArrayList<>();
  initialValues.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream
          .startWithIterable(initialValues)
          .compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> first = new TestObserver<>();
  replayed.subscribe(first);
  first.assertValues("default", "initA");

  TestObserver<String> second = new TestObserver<>();
  replayed.subscribe(second);
  // The first observer is checked again here; its values must be unchanged.
  first.assertValues("default", "initA");

  upstream.onComplete();
  first.assertComplete();
  second.assertComplete();

  initialValues.set(0, "initB");

  TestObserver<String> afterCompletion = new TestObserver<>();
  replayed.subscribe(afterCompletion);
  afterCompletion.assertValues("default", "initB");
}
 
/**
 * Asserts that when the lifecycle emits "destroy" before the stream emits anything, the bound
 * stream delivers no values and simply completes.
 */
@Test
public void eventOutOfLifecycle() {
    PublishSubject<String> lifecycle = PublishSubject.create();
    PublishSubject<String> stream = PublishSubject.create();

    TestObserver<String> boundObserver = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    // "destroy" is outside the lifecycle; the expected outcome is completion, not an error.
    lifecycle.onNext("destroy");
    stream.onNext("1");

    boundObserver.assertNoValues();
    boundObserver.assertComplete();
}
 
源代码15 项目: mobius   文件: TransformersTest.java
/** Asserts that the action's run count goes up by one for each effect pushed upstream. */
@Test
public void effectPerformerRunsActionWheneverEffectIsRequested() throws Exception {
  final TestAction countingAction = new TestAction();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction)).subscribe();

  // First effect -> one run.
  effects.onNext("First Time");
  assertThat(countingAction.getRunCount(), is(1));

  // Second effect -> two runs in total.
  effects.onNext("One more!");
  assertThat(countingAction.getRunCount(), is(2));
}
 
源代码16 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that, with a {@code TestScheduler} supplied, the action has not run right after the
 * effect is pushed and has run once after the scheduler's actions are triggered.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
  final TestScheduler testScheduler = new TestScheduler();
  final TestAction countingAction = new TestAction();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction, testScheduler)).subscribe();

  effects.onNext("First Time");
  // Nothing runs until the scheduler is told to.
  assertThat(countingAction.getRunCount(), is(0));

  testScheduler.triggerActions();
  assertThat(countingAction.getRunCount(), is(1));
}
 
源代码17 项目: mobius   文件: TransformersTest.java
/** Asserts that the consumer's current value tracks each effect pushed upstream. */
@Test
public void effectPerformerInvokesConsumerAndPassesTheRequestedEffect() throws Exception {
  final TestConsumer<String> sink = new TestConsumer<>();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(sink)).subscribe();

  effects.onNext("First Time");
  assertThat(sink.getCurrentValue(), is("First Time"));

  // A second effect replaces the value held by the consumer.
  effects.onNext("Do it again!");
  assertThat(sink.getCurrentValue(), is("Do it again!"));
}
 
源代码18 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that, with a {@code TestScheduler} supplied, the consumer holds no value right after
 * the effect is pushed and holds the effect once the scheduler's actions are triggered.
 */
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  final TestScheduler testScheduler = new TestScheduler();
  final TestConsumer<String> sink = new TestConsumer<>();
  final PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromConsumer(sink, testScheduler)).subscribe();

  effects.onNext("First Time");
  // Not delivered yet: the scheduler has not been advanced.
  assertThat(sink.getCurrentValue(), is(equalTo(null)));

  testScheduler.triggerActions();
  assertThat(sink.getCurrentValue(), is("First Time"));
}
 
源代码19 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that a transformer built from {@code String::length} emits 5 for the effect "Hello"
 * once the test scheduler's actions are triggered.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  final TestScheduler testScheduler = new TestScheduler();
  final Function<String, Integer> lengthOf = String::length;
  final PublishSubject<String> effects = PublishSubject.create();

  final TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(lengthOf, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  // "Hello" has five characters.
  events.assertValue(5);
}
 
源代码20 项目: mobius   文件: TransformersTest.java
/**
 * Asserts that when the function throws a {@code RuntimeException}, the observer of the
 * transformed stream receives an error of that type.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
  final TestScheduler testScheduler = new TestScheduler();
  final Function<String, Integer> alwaysFails =
      effect -> {
        throw new RuntimeException("Something bad happened");
      };
  final PublishSubject<String> effects = PublishSubject.create();

  final TestObserver<Integer> events =
      effects.compose(Transformers.fromFunction(alwaysFails, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  events.assertError(RuntimeException.class);
}
 
源代码21 项目: mobius   文件: RxMobiusLoopTest.java
/**
 * Asserts that an error on the loop's input is surfaced to the subscriber wrapped in an
 * {@code UnrecoverableIncomingException}, and that the connection recorded no values.
 */
@Test
public void shouldPropagateIncomingErrorsAsUnrecoverable() throws Exception {
  final RxMobiusLoop<Integer, String, Boolean> loop =
      new RxMobiusLoop<>(builder, "", Collections.emptySet());

  final PublishSubject<Integer> events = PublishSubject.create();
  final TestObserver<String> models = events.compose(loop).test();

  // Fail the input stream instead of sending any event.
  final Exception cause = new RuntimeException("expected");
  events.onError(cause);

  models.awaitDone(1, TimeUnit.SECONDS);
  models.assertError(new UnrecoverableIncomingException(cause));
  assertEquals(0, connection.valueCount());
}
 
源代码22 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * With the RxJava plugin error handler set to {@code null}, sends an A effect through a router
 * whose A handler throws, and asserts that the very same exception instance reaches the observer.
 */
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
  // Given: no RxJava error handler installed.
  RxJavaPlugins.setErrorHandler(null);

  // And: a fresh subject/observer pair plus a router whose only handler always throws.
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  final RuntimeException thrownByHandler = new RuntimeException("expected!");
  final ObservableTransformer<TestEffect, TestEvent> brokenRouter =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addFunction(
              A.class,
              effect -> {
                throw thrownByHandler;
              })
          .build();

  publishSubject.compose(brokenRouter).subscribe(testSubscriber);

  // When: an effect is sent. Per the original author's note this must not crash, although the
  // exception is printed to stderr.
  publishSubject.onNext(A.create(1));

  // Then: the observer receives exactly that exception (identity comparison).
  testSubscriber.assertError(error -> error == thrownByHandler);
}
 
/** Asserts that an observer receives nothing when the subject has not emitted anything. */
@Test public void noInitialValue() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> onlyObserver = new TestObserver<>();
  shared.subscribe(onlyObserver);

  // No emission has happened, so there is nothing to replay.
  onlyObserver.assertNoValues();
}
 
/**
 * Asserts that an observer subscribing while the first observer is still active immediately
 * receives the value that was emitted earlier.
 */
@Test public void initialValueToNewSubscriber() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestObserver<String> early = new TestObserver<>();
  shared.subscribe(early);
  early.assertNoValues();

  source.onNext("Foo");
  early.assertValues("Foo");

  // The first observer stays subscribed; the second one should still get "Foo".
  TestObserver<String> late = new TestObserver<>();
  shared.subscribe(late);
  late.assertValues("Foo");
}
 
/**
 * Tracks upstream subscribe/dispose callbacks in a counter and asserts that it stays at 1 while
 * at least one of three observers is subscribed and drops to 0 only when the last one is disposed.
 */
@Test public void refCountToUpstream() {
  PublishSubject<String> source = PublishSubject.create();

  // Incremented on upstream subscribe, decremented on upstream dispose.
  final AtomicInteger upstreamSubscriptions = new AtomicInteger();
  Observable<String> shared = source //
      .doOnSubscribe(new Consumer<Disposable>() {
        @Override public void accept(Disposable ignoredDisposable) throws Exception {
          upstreamSubscriptions.incrementAndGet();
        }
      }) //
      .doOnDispose(new Action() {
        @Override public void run() throws Exception {
          upstreamSubscriptions.decrementAndGet();
        }
      }) //
      .compose(ReplayingShare.<String>instance());

  // Adding observers never raises the count above one.
  Disposable first = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  Disposable second = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  Disposable third = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamSubscriptions.get());

  // Removing observers keeps it at one until the last is gone.
  first.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  third.dispose();
  assertEquals(1, upstreamSubscriptions.get());

  second.dispose();
  assertEquals(0, upstreamSubscriptions.get());
}
 
/**
 * Asserts that an observer which was disposed before it subscribed does not receive the value
 * emitted earlier.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed = upstream.compose(ReplayingShare.<String>instance());

  // Keep a subscription open and emit one value through it.
  replayed.subscribe();
  upstream.onNext("something to cache");

  // Dispose first, subscribe second.
  TestObserver<String> alreadyDisposed = new TestObserver<>();
  alreadyDisposed.dispose();
  replayed.subscribe(alreadyDisposed);

  alreadyDisposed.assertNoValues();
}
 
/**
 * Asserts that with {@code createWithDefault("default")} the first observer gets "default" on
 * subscribe and then the subject's emission after it.
 */
@Test public void defaultValueOnSubscribe() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> onlyObserver = new TestObserver<>();
  shared.subscribe(onlyObserver);
  onlyObserver.assertValues("default");

  // A real emission is appended after the default.
  source.onNext("Foo");
  onlyObserver.assertValues("default", "Foo");
}
 
/**
 * Asserts that once the subject has emitted "Foo", a newly subscribing observer receives only
 * "Foo" and not the configured default.
 */
@Test public void defaultValueIsOverriddenByLatestEmissionForNewSubscriber() {
  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source.compose(ReplayingShare.createWithDefault("default"));

  // The first observer sees the default, then the emission.
  TestObserver<String> early = new TestObserver<>();
  shared.subscribe(early);
  early.assertValues("default");

  source.onNext("Foo");
  early.assertValues("default", "Foo");

  // The second observer sees just the latest emission.
  TestObserver<String> late = new TestObserver<>();
  shared.subscribe(late);
  late.assertValues("Foo");
}
 
/**
 * Same scenario as the error test, but with {@code ReplayingShare.createWithDefault}: asserts
 * that an observer subscribing after the failure receives "default" followed by "initB".
 */
@Test public void errorClearsCacheAndResubscribesStartingWithDefault() {
  // Mutable so the start value can be swapped between subscriptions.
  List<String> initialValues = new ArrayList<>();
  initialValues.add("initA");

  PublishSubject<String> upstream = PublishSubject.create();
  Observable<String> replayed =
      upstream
          .startWithIterable(initialValues)
          .compose(ReplayingShare.createWithDefault("default"));

  TestObserver<String> first = new TestObserver<>();
  replayed.subscribe(first);
  first.assertValues("default", "initA");

  TestObserver<String> second = new TestObserver<>();
  replayed.subscribe(second);
  // The first observer is checked again here; its values must be unchanged.
  first.assertValues("default", "initA");

  RuntimeException failure = new RuntimeException();
  upstream.onError(failure);
  first.assertError(failure);
  second.assertError(failure);

  initialValues.set(0, "initB");

  TestObserver<String> afterError = new TestObserver<>();
  replayed.subscribe(afterError);
  afterError.assertValues("default", "initB");
}
 
/**
 * Sends the lifecycle event "ick" and asserts that the bound stream delivers no values and fails
 * with a {@code CompositeException} containing an {@code IllegalArgumentException}.
 */
@Test
public void eventThrowsBadException() {
    PublishSubject<String> lifecycle = PublishSubject.create();
    PublishSubject<String> stream = PublishSubject.create();

    TestObserver<String> boundObserver = stream
        .compose(RxLifecycle.<String, String>bind(lifecycle, CORRESPONDING_EVENTS))
        .test();

    // This lifecycle event makes the correspondence function raise an error.
    lifecycle.onNext("ick");
    stream.onNext("1");

    boundObserver.assertNoValues();

    // Only the IllegalArgumentException matters, and it is looked for inside a
    // CompositeException. Any error that is not a CompositeException is rejected.
    boundObserver.assertError(new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable error) throws Exception {
            if (!(error instanceof CompositeException)) {
                return false;
            }

            for (Throwable nested : ((CompositeException) error).getExceptions()) {
                if (nested instanceof IllegalArgumentException) {
                    return true;
                }
            }
            return false;
        }
    });
}
 
 类所在包
 同包方法