io.reactivex.subjects.PublishSubject#onNext() 源码实例 Demo

下面列出了 io.reactivex.subjects.PublishSubject#onNext() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldOverrideExistingSubscriber() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> sourceObservable = PublishSubject.create();
  TestAutoResubscribingObserver testSubscriber1 = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver testSubscriber2 = new TestAutoResubscribingObserver("foo");

  sourceObservable.compose(group.transform(testSubscriber1)).subscribe(testSubscriber1);
  sourceObservable.compose(group.transform(testSubscriber2)).subscribe(testSubscriber2);

  sourceObservable.onNext("Ruben Aguirre");
  sourceObservable.onComplete();

  testSubscriber1.assertionTarget.assertNotComplete();
  testSubscriber1.assertionTarget.assertNoValues();
  testSubscriber2.assertionTarget.assertComplete();
  testSubscriber2.assertionTarget.assertValue("Ruben Aguirre");
}
 
源代码2 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> observable1 = PublishSubject.create();
  PublishSubject<String> observable2 = PublishSubject.create();
  TestAutoResubscribingObserver observer1 = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver observer2 = new TestAutoResubscribingObserver("foo");
  observable1.compose(group.transform(observer1)).subscribe(observer1);
  observable2.compose(group.transform(observer2)).subscribe(observer2);

  assertThat(group.subscription(fooObserver).isCancelled()).isFalse();
  assertThat(group.hasObservables(fooObserver)).isTrue();

  observable1.onNext("Hello World 1");
  observable1.onComplete();

  observable2.onNext("Hello World 2");
  observable2.onComplete();

  observer2.assertionTarget.awaitTerminalEvent();
  observer2.assertionTarget.assertComplete();
  observer2.assertionTarget.assertValue("Hello World 2");

  observer1.assertionTarget.assertNoValues();
}
 
源代码3 项目: mobius   文件: RxEventSourcesTest.java
@Test
public void disposePreventsFurtherEvents() throws Exception {
  PublishSubject<Integer> subject = PublishSubject.create();
  EventSource<Integer> source = RxEventSources.fromObservables(subject);
  RecordingConsumer<Integer> consumer = new RecordingConsumer<>();

  Disposable d = source.subscribe(consumer);

  subject.onNext(1);
  subject.onNext(2);
  d.dispose();
  subject.onNext(3);

  consumer.waitForChange(50);
  consumer.assertValues(1, 2);
}
 
源代码4 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldQueueMultipleRequests() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> subject1 = PublishSubject.create();
  TestObserver<String> testSubscriber1 = new TestObserver<>();
  PublishSubject<String> subject2 = PublishSubject.create();
  TestObserver<String> testSubscriber2 = new TestObserver<>();

  subject1.compose(group.transform(testSubscriber1)).subscribe(testSubscriber1);
  subject2.compose(group.transform(testSubscriber2)).subscribe(testSubscriber2);
  group.dispose();

  subject1.onNext("Chespirito");
  subject1.onComplete();
  subject2.onNext("Edgar Vivar");
  subject2.onComplete();

  testSubscriber1.assertNotComplete();
  testSubscriber2.assertNotComplete();
  assertThat(group.hasObservables(testSubscriber1)).isEqualTo(true);
  assertThat(group.hasObservables(testSubscriber2)).isEqualTo(true);
}
 
源代码5 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldNotDeliverEventsWhenResubscribedIfLocked() {
  ObservableGroup group = observableManager.newGroup();
  TestAutoResubscribingObserver testObserver = new TestAutoResubscribingObserver("foo");
  PublishSubject<String> sourceObservable = PublishSubject.create();
  sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);
  group.dispose();

  sourceObservable.onNext("Hello World");
  sourceObservable.onComplete();

  group.lock();
  testObserver = new TestAutoResubscribingObserver("foo");
  group.observable(testObserver).subscribe(testObserver);

  testObserver.assertionTarget.assertNotComplete();
  testObserver.assertionTarget.assertNoValues();

  group.unlock();
  testObserver.assertionTarget.assertComplete();
  testObserver.assertionTarget.assertNoErrors();
  testObserver.assertionTarget.assertValue("Hello World");
  assertThat(group.hasObservables(testObserver)).isEqualTo(false);
}
 
源代码6 项目: RxPermissions   文件: RxPermissionsTest.java
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscriptionCombined_trigger_granted() {
    TestObserver<Permission> sub = new TestObserver<>();
    String permission = Manifest.permission.READ_PHONE_STATE;
    when(mRxPermissions.isGranted(permission)).thenReturn(false);
    int[] result = new int[]{PackageManager.PERMISSION_GRANTED};
    PublishSubject<Object> trigger = PublishSubject.create();

    trigger.compose(mRxPermissions.ensureEachCombined(permission)).subscribe(sub);
    trigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{permission}, result);

    sub.assertNoErrors();
    sub.assertNotTerminated();
    sub.assertValue(new Permission(permission, true));
}
 
源代码7 项目: ThirtyInch   文件: RxTiPresenterUtilsTest.java
@Test
public void testDeliverLatestToView_SingleItemViewComesAndGoes() throws Exception {
    mPresenter.create();

    PublishSubject<Integer> source = PublishSubject.create();
    TestObserver<Integer> testObserver = new TestObserver<>();

    source
            .compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
            .subscribe(testObserver);

    source.onNext(1);
    source.onNext(2);
    mPresenter.attachView(mView);
    mPresenter.detachView();
    mPresenter.attachView(mView);
    mPresenter.detachView();
    mPresenter.attachView(mView);

    testObserver.assertNotComplete();
    testObserver.assertNoErrors();
    testObserver.assertValuesOnly(2, 2, 2);
}
 
源代码8 项目: akarnokd-misc   文件: FlatMapWithTwoErrors.java
@Test
public void innerCancelled3() {
    PublishSubject<Integer> pp1 = PublishSubject.create();
    PublishSubject<Integer> pp2 = PublishSubject.create();
    
    pp1
    .flatMap(v -> pp2)
    .test();

    pp1.onNext(1);
    assertTrue("No subscribers?", pp2.hasObservers());

    pp1.onError(new Exception());
    
    assertFalse("Has subscribers?", pp2.hasObservers());
}
 
源代码9 项目: RxDownloader   文件: RxDownloader.java
@Override
public void onReceive(Context context, Intent intent) {
    long id = intent.getLongExtra(DownloadManager.EXTRA_DOWNLOAD_ID, 0L);
    PublishSubject<String> publishSubject = subjectMap.get(id);

    if (publishSubject == null)
        return;

    DownloadManager.Query query = new DownloadManager.Query();
    query.setFilterById(id);
    DownloadManager downloadManager = getDownloadManager();
    Cursor cursor = downloadManager.query(query);

    if (!cursor.moveToFirst()) {
        cursor.close();
        downloadManager.remove(id);
        publishSubject.onError(new IllegalStateException("Cursor empty, this shouldn't happened"));
        subjectMap.remove(id);
        return;
    }

    int statusIndex = cursor.getColumnIndex(DownloadManager.COLUMN_STATUS);
    if (DownloadManager.STATUS_SUCCESSFUL != cursor.getInt(statusIndex)) {
        cursor.close();
        downloadManager.remove(id);
        publishSubject.onError(new IllegalStateException("Download Failed"));
        subjectMap.remove(id);
        return;
    }

    int uriIndex = cursor.getColumnIndex(DownloadManager.COLUMN_LOCAL_URI);
    String downloadedPackageUriString = cursor.getString(uriIndex);
    cursor.close();

    publishSubject.onNext(downloadedPackageUriString);
    publishSubject.onComplete();
    subjectMap.remove(id);
}
 
源代码10 项目: mobius   文件: TransformersTest.java
@Test
public void effectPerformerInvokesConsumerAndPassesTheRequestedEffect() throws Exception {
  PublishSubject<String> upstream = PublishSubject.create();
  TestConsumer<String> consumer = new TestConsumer<>();
  upstream.compose(Transformers.fromConsumer(consumer)).subscribe();

  upstream.onNext("First Time");
  assertThat(consumer.getCurrentValue(), is("First Time"));

  upstream.onNext("Do it again!");
  assertThat(consumer.getCurrentValue(), is("Do it again!"));
}
 
源代码11 项目: mobius   文件: TransformersTest.java
@Test
public void effectPerformerInvokesConsumerOnSchedulerAndPassesTheRequestedEffect()
    throws Exception {
  PublishSubject<String> upstream = PublishSubject.create();
  TestConsumer<String> consumer = new TestConsumer<>();
  TestScheduler scheduler = new TestScheduler();
  upstream.compose(Transformers.fromConsumer(consumer, scheduler)).subscribe();

  upstream.onNext("First Time");
  assertThat(consumer.getCurrentValue(), is(equalTo(null)));
  scheduler.triggerActions();
  assertThat(consumer.getCurrentValue(), is("First Time"));
}
 
源代码12 项目: mobius   文件: TransformersTest.java
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndEmitsReturnedEvents() {
  PublishSubject<String> upstream = PublishSubject.create();
  TestScheduler scheduler = new TestScheduler();
  Function<String, Integer> function = s -> s.length();
  TestObserver<Integer> observer =
      upstream.compose(Transformers.fromFunction(function, scheduler)).test();

  upstream.onNext("Hello");
  scheduler.triggerActions();
  observer.assertValue(5);
}
 
源代码13 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldNotDeliverResultWhileLocked() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  TestObserver<String> testObserver = new TestObserver<>();
  PublishSubject<String> sourceObservable = PublishSubject.create();

  group.lock();
  sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);

  sourceObservable.onNext("Chespirito");
  sourceObservable.onComplete();

  testObserver.assertNotComplete();
  testObserver.assertNoValues();
  assertThat(group.hasObservables(testObserver)).isEqualTo(true);
}
 
源代码14 项目: akarnokd-misc   文件: DebounceTimeDrop.java
@Test
public void test() {
    PublishSubject<Integer> source = PublishSubject.create();
    
    TestScheduler scheduler = new TestScheduler();
    
    source.compose(debounceTime(10, TimeUnit.MILLISECONDS, scheduler, v -> {
        System.out.println(
                "Dropped: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS));
    }))
    .subscribe(v -> System.out.println(
            "Passed: " + v + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)),
            Throwable::printStackTrace, 
            () -> System.out.println(
                    "Done "  + " @ T=" + scheduler.now(TimeUnit.MILLISECONDS)));
    
    source.onNext(1);
    scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
    scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
    source.onNext(2);
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    source.onNext(3);
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    source.onNext(4);
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    source.onNext(5);
    scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
    scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
    source.onNext(6);
    scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
    
    scheduler.advanceTimeBy(20, TimeUnit.MILLISECONDS);
    
    source.onComplete();
}
 
private void demo4() {
    PublishSubject<Integer> observable = PublishSubject.create();

    observable.toFlowable(BackpressureStrategy.MISSING)
            .buffer(10)
            .observeOn(Schedulers.computation())
            .subscribe(v -> log("s", v.toString()), this::log);

    for (int i = 0; i < 1000000; i++) {
        observable.onNext(i);
    }
}
 
源代码16 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldClearQueuedResults() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> sourceObservable = PublishSubject.create();
  TestObserver<String> subscriber1 = new TestObserver<>();

  sourceObservable.compose(group.transform(subscriber1)).subscribe(subscriber1);
  group.dispose();
  sourceObservable.onNext("Hello");
  sourceObservable.onComplete();
  observableManager.destroy(group);

  assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码17 项目: RxGroups   文件: ObservableGroupTest.java
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  TestObserver<String> testObserver = new TestObserver<>();
  PublishSubject<String> sourceObservable = PublishSubject.create();

  sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);
  group.dispose();

  sourceObservable.onNext("Roberto Gomez Bolanos");
  sourceObservable.onComplete();

  testObserver.assertNotComplete();
  assertThat(group.hasObservables(testObserver)).isEqualTo(true);
}
 
@Test
public void testRequestOverflow() {
    PublishSubject<Integer> subject = PublishSubject.create();

    TestSubscriber<Integer> sub = subject.toFlowable(BackpressureStrategy.BUFFER) //
            .to(Transformers.reduce(reducer, 2, 5)) //
            .test(Long.MAX_VALUE - 2) //
            .requestMore(Long.MAX_VALUE - 2);
    subject.onNext(1);
    subject.onNext(2);
    subject.onComplete();
    sub.assertValues(3);
}
 
源代码19 项目: mobius   文件: RxConnectables.java
public static <I, O> Connectable<I, O> fromTransformer(
    final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);

  Connectable<I, O> actualConnectable =
      new Connectable<I, O>() {
        @Nonnull
        @Override
        public Connection<I> connect(final Consumer<O> output) {
          final PublishSubject<I> subject = PublishSubject.create();

          final Disposable disposable =
              subject
                  .compose(transformer)
                  .subscribe(
                      new io.reactivex.functions.Consumer<O>() {
                        @Override
                        public void accept(O e) {
                          output.accept(e);
                        }
                      },
                      new io.reactivex.functions.Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                          RxJavaPlugins.onError(throwable);
                        }
                      },
                      new Action() {
                        @Override
                        public void run() throws Exception {
                          // TODO: complain loudly! shouldn't ever complete
                        }
                      });

          return new Connection<I>() {
            public void accept(I effect) {
              subject.onNext(effect);
            }

            @Override
            public void dispose() {
              disposable.dispose();
            }
          };
        }
      };

  return new DiscardAfterDisposeConnectable<>(actualConnectable);
}
 
源代码20 项目: akarnokd-misc   文件: DebounceRailTest.java
@Test
public void test() {
    PublishSubject<String> subject = PublishSubject.create();
    
    TestScheduler sch = new TestScheduler();
    
    subject.compose(debounceOnly(v -> v.startsWith("A"), 100, TimeUnit.MILLISECONDS, sch))
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
    
    subject.onNext("A1");
    
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
    subject.onNext("B1");
    sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    subject.onNext("C1");
    sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    subject.onNext("A2");
    sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    subject.onNext("A3");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onNext("A4");
    sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    subject.onNext("B2");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onNext("C2");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    
    subject.onComplete();
}