io.reactivex.subjects.PublishSubject#onComplete() 源码实例 Demo

下面列出了 io.reactivex.subjects.PublishSubject#onComplete() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Locks the group, subscribes, emits one value plus completion, then unlocks and checks that the
 * observer ends up with exactly that value and a clean completion.
 */
@Test public void shouldAutoResubscribeAfterUnlock() throws InterruptedException {
  PublishSubject<String> subject = PublishSubject.create();
  TestObserver<String> observer = new TestObserver<>();
  ObservableGroup lockedGroup = observableManager.newGroup();

  // The subscription is made only after the lock is in place.
  lockedGroup.lock();
  subject.compose(lockedGroup.transform(observer)).subscribe(observer);

  // Both events are produced while the group is still locked.
  subject.onNext("Chespirito");
  subject.onComplete();

  lockedGroup.unlock();

  observer.assertComplete();
  observer.assertNoErrors();
  observer.assertValue("Chespirito");
  // NOTE(review): this checks fooObserver (declared outside this method) rather than the local
  // observer -- confirm that is the intended target.
  assertThat(lockedGroup.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码2 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Two different observers may register the same observable tag in one group: both subscriptions
 * stay active and both observers receive the emitted value and the completion.
 */
@Test public void shouldNotReplaceObservableOfSameTagAndSameGroupIdAndDifferentObservers() {
  String commonTag = "sharedTag";
  TestObserver<String> firstObserver = new TestObserver<>();
  TestObserver<String> secondObserver = new TestObserver<>();
  PublishSubject<String> source = PublishSubject.create();
  ObservableGroup group = observableManager.newGroup();

  source.compose(group.transform(firstObserver, commonTag)).subscribe(firstObserver);
  source.compose(group.transform(secondObserver, commonTag)).subscribe(secondObserver);

  // Neither registration cancelled the other.
  assertThat(group.subscription(firstObserver, commonTag).isCancelled()).isFalse();
  assertThat(group.hasObservables(firstObserver)).isTrue();

  assertThat(group.subscription(secondObserver, commonTag).isCancelled()).isFalse();
  assertThat(group.hasObservables(secondObserver)).isTrue();

  source.onNext("Hello World 1");
  source.onComplete();

  secondObserver.assertComplete();
  secondObserver.assertValue("Hello World 1");

  firstObserver.assertComplete();
  firstObserver.assertValue("Hello World 1");
}
 
源代码3 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Registers two independent subjects with one group, disposes the group, lets both subjects emit
 * and complete, and checks that neither observer saw completion while the group still reports
 * observables for both of them.
 */
@Test public void shouldQueueMultipleRequests() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> firstSource = PublishSubject.create();
  PublishSubject<String> secondSource = PublishSubject.create();
  TestObserver<String> firstObserver = new TestObserver<>();
  TestObserver<String> secondObserver = new TestObserver<>();

  firstSource.compose(group.transform(firstObserver)).subscribe(firstObserver);
  secondSource.compose(group.transform(secondObserver)).subscribe(secondObserver);
  group.dispose();

  // Everything below is emitted after the group has been disposed.
  firstSource.onNext("Chespirito");
  firstSource.onComplete();
  secondSource.onNext("Edgar Vivar");
  secondSource.onComplete();

  firstObserver.assertNotComplete();
  secondObserver.assertNotComplete();
  assertThat(group.hasObservables(firstObserver)).isEqualTo(true);
  assertThat(group.hasObservables(secondObserver)).isEqualTo(true);
}
 
源代码4 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Subscribes two observers, both constructed with "foo", to two different subjects in the same
 * group and checks that only the second observer receives its subject's value and completion
 * while the first one receives no values.
 */
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
  ObservableGroup group = observableManager.newGroup();
  TestAutoResubscribingObserver earlierObserver = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver laterObserver = new TestAutoResubscribingObserver("foo");
  PublishSubject<String> earlierSource = PublishSubject.create();
  PublishSubject<String> laterSource = PublishSubject.create();

  earlierSource.compose(group.transform(earlierObserver)).subscribe(earlierObserver);
  laterSource.compose(group.transform(laterObserver)).subscribe(laterObserver);

  // fooObserver is declared outside this method.
  assertThat(group.subscription(fooObserver).isCancelled()).isFalse();
  assertThat(group.hasObservables(fooObserver)).isTrue();

  earlierSource.onNext("Hello World 1");
  earlierSource.onComplete();

  laterSource.onNext("Hello World 2");
  laterSource.onComplete();

  laterObserver.assertionTarget.awaitTerminalEvent();
  laterObserver.assertionTarget.assertComplete();
  laterObserver.assertionTarget.assertValue("Hello World 2");

  earlierObserver.assertionTarget.assertNoValues();
}
 
源代码5 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Subscribes, then locks and disposes the group before the source emits. After unlocking, the
 * observer must have received neither a value nor completion, and the group must still report
 * observables for it.
 */
@Test public void testUnsubscribeWhenLocked() {
  PublishSubject<String> subject = PublishSubject.create();
  TestObserver<String> observer = new TestObserver<>();
  ObservableGroup group = observableManager.newGroup();

  subject.compose(group.transform(observer)).subscribe(observer);
  group.lock();
  group.dispose();

  // Emitted while the group is both locked and disposed.
  subject.onNext("Chespirito");
  subject.onComplete();

  group.unlock();

  observer.assertNotComplete();
  observer.assertNoValues();
  assertThat(group.hasObservables(observer)).isEqualTo(true);
}
 
/**
 * Demonstrates replay with a bounded buffer: one observer subscribes before the source emits,
 * a second one subscribes after the source has completed.
 */
private void doSomeWork() {
    PublishSubject<Integer> source = PublishSubject.create();

    // bufferSize = 3, so the last 3 values are retained for replay
    ConnectableObservable<Integer> replayed = source.replay(3);
    replayed.connect();

    replayed.subscribe(getFirstObserver());

    // Emit 1, 2, 3, 4 and then terminate the source.
    for (int value = 1; value <= 4; value++) {
        source.onNext(value);
    }
    source.onComplete();

    // The late subscriber is replayed 2, 3, 4 (the 3 retained values).
    replayed.subscribe(getSecondObserver());
}
 
源代码7 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Disposes the proxy right after subscribing and checks that later emissions from the subject
 * are not delivered to the observer.
 */
@Test public void testUnsubscribe() {
  PublishSubject<String> source = PublishSubject.create();
  SubscriptionProxy<String> subscriptionProxy = SubscriptionProxy.create(source);
  TestObserver<String> testObserver = new TestObserver<>();

  subscriptionProxy.subscribe(testObserver);
  subscriptionProxy.dispose();

  // Emitted after the proxy has been disposed.
  source.onNext("Avanti!");
  source.onComplete();

  assertThat(subscriptionProxy.isDisposed()).isEqualTo(true);
  // Short bounded wait before asserting that nothing arrived.
  testObserver.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
  testObserver.assertNotComplete();
  testObserver.assertNoValues();
}
 
源代码8 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Subscribes the same observer to the proxy twice, disposes the proxy, and checks that later
 * emissions from the subject are not delivered to the observer.
 */
@Test public void multipleSubscribesForSameObserverShouldBeIgnored() {
  PublishSubject<String> source = PublishSubject.create();
  SubscriptionProxy<String> subscriptionProxy = SubscriptionProxy.create(source);
  TestObserver<String> testObserver = new TestObserver<>();

  // Same observer, subscribed twice in a row.
  subscriptionProxy.subscribe(testObserver);
  subscriptionProxy.subscribe(testObserver);
  subscriptionProxy.dispose();

  source.onNext("Avanti!");
  source.onComplete();

  assertThat(subscriptionProxy.isDisposed()).isEqualTo(true);
  // Short bounded wait before asserting that nothing arrived.
  testObserver.awaitTerminalEvent(10, TimeUnit.MILLISECONDS);
  testObserver.assertNotComplete();
  testObserver.assertNoValues();
}
 
源代码9 项目: RxPermissions   文件: RxPermissionsFragment.java
/**
 * Forwards each permission result to the subject registered for that permission and completes it.
 *
 * <p>Processing stops at the first permission that has no registered subject.
 *
 * @param permissions the permission names being reported
 * @param grantResults per-permission results, compared against
 *     {@code PackageManager.PERMISSION_GRANTED}
 * @param shouldShowRequestPermissionRationale per-permission rationale flags, index-aligned with
 *     {@code permissions}
 */
void onRequestPermissionsResult(String[] permissions, int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
    for (int index = 0; index < permissions.length; index++) {
        String permission = permissions[index];
        log("onRequestPermissionsResult  " + permission);

        // Look up the subject waiting on this permission.
        PublishSubject<Permission> pending = mSubjects.get(permission);
        if (pending == null) {
            // Nothing registered for this permission: report it and abandon the rest.
            Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
            return;
        }

        // The subject is single-use, so unregister it before emitting.
        mSubjects.remove(permission);
        boolean isGranted = PackageManager.PERMISSION_GRANTED == grantResults[index];
        Permission result = new Permission(permission, isGranted, shouldShowRequestPermissionRationale[index]);
        pending.onNext(result);
        pending.onComplete();
    }
}
 
源代码10 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Emits a value and completion after the group has been disposed, verifies that the original
 * observer saw nothing, then subscribes a fresh observer through {@code group.observable(...)}
 * and verifies that it receives the value and the completion.
 */
@Test public void shouldDeliverQueuedEventsWhenResubscribed() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  TestAutoResubscribingObserver initialObserver = new TestAutoResubscribingObserver("foo");
  PublishSubject<String> source = PublishSubject.create();
  source.compose(group.transform(initialObserver)).subscribe(initialObserver);
  group.dispose();

  source.onNext("Hello World");
  source.onComplete();

  initialObserver.assertionTarget.assertNotComplete();
  initialObserver.assertionTarget.assertNoValues();

  // TestObserver cannot be reused after being disposed in RxJava2, so a new instance is used.
  TestAutoResubscribingObserver freshObserver = new TestAutoResubscribingObserver("foo");
  group.observable(freshObserver).subscribe(freshObserver);

  freshObserver.assertionTarget.assertComplete();
  freshObserver.assertionTarget.assertValue("Hello World");
  assertThat(group.hasObservables(freshObserver)).isEqualTo(false);
}
 
源代码11 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Subscribes two observers, both constructed with "foo", to the same subject through one group
 * and checks that only the second one receives the value and the completion.
 */
@Test public void shouldOverrideExistingSubscriber() throws Exception {
  TestAutoResubscribingObserver overriddenObserver = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver activeObserver = new TestAutoResubscribingObserver("foo");
  PublishSubject<String> source = PublishSubject.create();
  ObservableGroup group = observableManager.newGroup();

  source.compose(group.transform(overriddenObserver)).subscribe(overriddenObserver);
  source.compose(group.transform(activeObserver)).subscribe(activeObserver);

  source.onNext("Ruben Aguirre");
  source.onComplete();

  // The first observer must have been left without any events.
  overriddenObserver.assertionTarget.assertNotComplete();
  overriddenObserver.assertionTarget.assertNoValues();
  activeObserver.assertionTarget.assertComplete();
  activeObserver.assertionTarget.assertValue("Ruben Aguirre");
}
 
源代码12 项目: GankGirl   文件: RxPermissionsFragment.java
/**
 * Delivers every reported permission result to the subject stored for that permission name and
 * then completes the subject.
 *
 * <p>If a permission has no stored subject, an error is logged and the remaining permissions are
 * not processed.
 *
 * @param permissions names of the permissions being reported
 * @param grantResults results, one per permission, compared against
 *     {@code PackageManager.PERMISSION_GRANTED}
 * @param shouldShowRequestPermissionRationale rationale flags, one per permission
 */
void onRequestPermissionsResult(String[] permissions, int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
    final int count = permissions.length;
    int position = 0;
    while (position < count) {
        final String name = permissions[position];
        log("onRequestPermissionsResult  " + name);

        // Find the corresponding subject
        final PublishSubject<Permission> target = mSubjects.get(name);
        if (target == null) {
            Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
            return;
        }

        // Drop the stored subject first; it is completed right below.
        mSubjects.remove(name);
        final boolean granted = grantResults[position] == PackageManager.PERMISSION_GRANTED;
        target.onNext(new Permission(name, granted, shouldShowRequestPermissionRationale[position]));
        target.onComplete();

        position++;
    }
}
 
源代码13 项目: RxDownloader   文件: RxDownloader.java
/**
 * Handles a download broadcast: looks up the subject registered for the download id carried by
 * the intent and either emits the downloaded file's local URI followed by completion, or signals
 * an {@link IllegalStateException} when the download row is missing or not successful.
 *
 * <p>The query cursor is always closed, even if reading a column throws, and it is closed before
 * any event is emitted to the subject. In every handled case the subject is removed from
 * {@code subjectMap} afterwards.
 *
 * @param context the context the receiver is running in (unused here)
 * @param intent the broadcast intent; its {@code DownloadManager.EXTRA_DOWNLOAD_ID} extra
 *     identifies the download (defaults to {@code 0L} when absent)
 */
@Override
public void onReceive(Context context, Intent intent) {
    long id = intent.getLongExtra(DownloadManager.EXTRA_DOWNLOAD_ID, 0L);
    PublishSubject<String> publishSubject = subjectMap.get(id);

    // Broadcast for a download nobody is waiting on: nothing to do.
    if (publishSubject == null) {
        return;
    }

    DownloadManager.Query query = new DownloadManager.Query();
    query.setFilterById(id);
    DownloadManager downloadManager = getDownloadManager();

    // Exactly one of these is set once the cursor has been inspected.
    String failureMessage = null;
    String downloadedPackageUriString = null;

    Cursor cursor = downloadManager.query(query);
    try {
        if (!cursor.moveToFirst()) {
            failureMessage = "Cursor empty, this shouldn't happened";
        } else {
            int statusIndex = cursor.getColumnIndex(DownloadManager.COLUMN_STATUS);
            if (DownloadManager.STATUS_SUCCESSFUL != cursor.getInt(statusIndex)) {
                failureMessage = "Download Failed";
            } else {
                int uriIndex = cursor.getColumnIndex(DownloadManager.COLUMN_LOCAL_URI);
                downloadedPackageUriString = cursor.getString(uriIndex);
            }
        }
    } finally {
        // Release the cursor on every path, including when a column read throws.
        cursor.close();
    }

    if (failureMessage != null) {
        // Failed or missing download: drop it from the DownloadManager and report the error.
        downloadManager.remove(id);
        publishSubject.onError(new IllegalStateException(failureMessage));
    } else {
        publishSubject.onNext(downloadedPackageUriString);
        publishSubject.onComplete();
    }
    subjectMap.remove(id);
}
 
源代码14 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Emits a value and completion after the group has been disposed, destroys the group through
 * the manager, and checks the group's {@code hasObservables} result afterwards.
 */
@Test public void shouldClearQueuedResults() throws Exception {
  TestObserver<String> observer = new TestObserver<>();
  PublishSubject<String> source = PublishSubject.create();
  ObservableGroup group = observableManager.newGroup();

  source.compose(group.transform(observer)).subscribe(observer);
  group.dispose();

  // Emitted after dispose, before the group is destroyed.
  source.onNext("Hello");
  source.onComplete();
  observableManager.destroy(group);

  // NOTE(review): this checks fooObserver (declared outside this method) rather than the local
  // observer -- confirm that is the intended target.
  assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码15 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Lets the source emit and complete while subscribed through the group, then checks that the
 * observer completed and inspects the group's {@code hasObservables} result.
 */
@Test public void shouldRemoveObservablesAfterTermination() throws Exception {
  TestObserver<String> observer = new TestObserver<>();
  PublishSubject<String> source = PublishSubject.create();
  ObservableGroup group = observableManager.newGroup();
  source.compose(group.transform(observer)).subscribe(observer);

  source.onNext("Roberto Gomez Bolanos is king");
  source.onComplete();

  observer.assertComplete();
  // NOTE(review): this checks fooObserver (declared outside this method) rather than the local
  // observer -- confirm that is the intended target.
  assertThat(group.hasObservables(fooObserver)).isEqualTo(false);
}
 
源代码16 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Disposes the group before the source emits and checks that the observer does not see the
 * completion while the group still reports observables for it.
 */
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
  PublishSubject<String> source = PublishSubject.create();
  TestObserver<String> observer = new TestObserver<>();
  ObservableGroup group = observableManager.newGroup();

  source.compose(group.transform(observer)).subscribe(observer);
  group.dispose();

  // Emitted after the group has been disposed.
  source.onNext("Roberto Gomez Bolanos");
  source.onComplete();

  observer.assertNotComplete();
  assertThat(group.hasObservables(observer)).isEqualTo(true);
}
 
/**
 * Manual experiment with {@code PublishSubject.replay()}: three subscribers attach to the
 * connectable observable before it is connected, a background task feeds the integers 1..9 into
 * the subject, and the calling thread disposes the first subscriber and then completes the
 * subject. Output goes to {@code log(...)} and {@code System.out}; nothing is asserted.
 *
 * <p>{@code log} and {@code sleep} are helpers defined outside this method; {@code sleep} is
 * presumably a blocking delay -- TODO confirm.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    // Values 1..9 to be pushed through the subject.
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<10;i++){
        integers.add(i);
    }
    // Three subscribers are registered before connect() is called further down.
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    Disposable subscribe2 = replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    Disposable subscribe3 = replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    // Counts down once per emitted value so the producer can complete after the last one.
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // Single background task: emits the values in order, calling sleep(1, SECONDS) before each.
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1,TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
        // Disabled alternative: one pool task per value instead of a single sequential task.
       /* integers.forEach(id -> forkJoinPool.submit(() -> {
            sleep(3,TimeUnit.SECONDS);
            publishSubject.onNext(id);
            if (atomicInteger.decrementAndGet() == 0) {
                publishSubject.onComplete();
            }
        }));*/
        replay.connect();
        sleep(2,TimeUnit.SECONDS);
        // Only the first subscriber is disposed; the other two stay subscribed.
        subscribe1.dispose();
        sleep(1,TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        // Completes the subject from this thread while the background task may still be emitting.
        publishSubject.onComplete();
    } finally  {
        try {
            // NOTE(review): per the ForkJoinPool Javadoc, shutdown()/shutdownNow() do not affect
            // the common pool -- confirm this cleanup does what is intended.
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
/**
 * Demonstrates that a PublishSubject only delivers items emitted after an observer subscribes:
 * the first observer subscribes up front, the second one joins after three values have already
 * been emitted.
 */
private void doSomeWork() {
    PublishSubject<Integer> source = PublishSubject.create();

    // The first observer will get 1, 2, 3, 4 and onComplete.
    source.subscribe(getFirstObserver());

    for (int value = 1; value <= 3; value++) {
        source.onNext(value);
    }

    // The second observer joins late, so it will only get 4 and onComplete.
    source.subscribe(getSecondObserver());

    source.onNext(4);
    source.onComplete();
}
 
源代码19 项目: akarnokd-misc   文件: BufferDebounce.java
/**
 * Drives {@code bufferDebounce(200 ms)} with a TestScheduler: values 1..7 are pushed at virtual
 * times 0, 0, 100, 150, 400, 450 and 800 ms, the subject completes at 800 ms, and the clock is
 * finally advanced to 850 ms. Each emitted item is printed with the scheduler's current time;
 * nothing is asserted.
 */
@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> subject = PublishSubject.create();

    subject.compose(bufferDebounce(200, TimeUnit.MILLISECONDS, scheduler))
            .subscribe(
                    item -> System.out.println(scheduler.now(TimeUnit.MILLISECONDS) + ": " + item),
                    error -> error.printStackTrace(),
                    () -> System.out.println("Done"));

    // t = 0 ms
    subject.onNext(1);
    subject.onNext(2);

    scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
    subject.onNext(3);

    scheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
    subject.onNext(4);

    scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
    subject.onNext(5);

    scheduler.advanceTimeTo(450, TimeUnit.MILLISECONDS);
    subject.onNext(6);

    scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
    subject.onNext(7);
    subject.onComplete();

    scheduler.advanceTimeTo(850, TimeUnit.MILLISECONDS);
}
 
源代码20 项目: akarnokd-misc   文件: DebounceRailTest.java
/**
 * Drives {@code debounceOnly} (predicate: value starts with "A", 100 ms window) with a
 * TestScheduler, interleaving "A", "B" and "C" values with virtual-time advances. Results are
 * printed to standard output; nothing is asserted.
 */
@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<String> source = PublishSubject.create();

    source.compose(debounceOnly(value -> value.startsWith("A"), 100, TimeUnit.MILLISECONDS, scheduler))
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("Done"));

    source.onNext("A1");
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    source.onNext("B1");
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    source.onNext("C1");
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    source.onNext("A2");
    scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    source.onNext("A3");
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    source.onNext("A4");
    scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    source.onNext("B2");
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    source.onNext("C2");
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    source.onComplete();
}