io.reactivex.subjects.ReplaySubject#onComplete() 源码实例 Demo

下面列出了 io.reactivex.subjects.ReplaySubject#onComplete() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Demonstrates a {@code ReplaySubject} whose replay buffer is bounded by both age and size.
 *
 * <p>Nine values are emitted one second apart before anything subscribes. An observer is then
 * attached, one more value is emitted, and the subject is completed.
 *
 * <p>Alternative bounded factories (left here for comparison):
 * <ul>
 *   <li>{@code ReplaySubject.createWithSize(5)} - only caches the last 5 items emitted before
 *       subscription.</li>
 *   <li>{@code ReplaySubject.createWithTime(5, TimeUnit.SECONDS, Schedulers.computation())} -
 *       only caches the items emitted within the 5 seconds before subscription.</li>
 * </ul>
 */
@Test
void replaySubject_conf_test() {
    // Combines the two limits described above: a 5-second window and at most 3 cached items.
    ReplaySubject<Object> replaySubject = ReplaySubject.createWithTimeAndSize(
            5, TimeUnit.SECONDS, Schedulers.computation(), 3);

    // Primitive counter: a boxed Long would be unboxed and re-boxed on every comparison and
    // increment. The value is still boxed to Long once when handed to onNext(Object).
    for (long i = 1L; i < 10L; i++) {
        replaySubject.onNext(i);
        sleep(1, TimeUnit.SECONDS);
    }

    // Subscribe only after the first nine emissions so that the buffer limits take effect.
    replaySubject.subscribe(x -> log("一郎神: " + x),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            disposable -> System.out.println("onSubscribe")
    );

    // Emitted after subscription, followed by the terminal event.
    replaySubject.onNext(10L);
    replaySubject.onComplete();
}
 
/**
 * Pushes the values 1 through 4 and a completion signal through an unbounded
 * {@code ReplaySubject}, attaching one observer before the emissions and one after completion.
 */
private void doSomeWork() {
    final ReplaySubject<Integer> source = ReplaySubject.create();

    // Attached before any emission: it will get 1, 2, 3, 4.
    source.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        source.onNext(value);
    }
    source.onComplete();

    // Attached after completion: because the subject replays, this observer is also
    // expected to get 1, 2, 3, 4.
    source.subscribe(getSecondObserver());
}
 
源代码3 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Disposes the proxy before the subject emits anything, then checks that a fresh observer
 * subscribed afterwards sees the value and the completion that were emitted in between.
 */
@Test public void testUnsubscribeBeforeEmit() {
  ReplaySubject<String> subject = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
  TestObserver<String> disposedObserver = new TestObserver<>();

  proxy.subscribe(disposedObserver);
  proxy.dispose();

  // Nothing has been emitted yet, so the first observer must have seen nothing.
  disposedObserver.assertNotComplete();
  disposedObserver.assertNoValues();

  subject.onNext("Avanti!");
  subject.onComplete();

  // A disposed observer may not be reused in RxJava 2, so resubscribe with a new instance.
  TestObserver<String> freshObserver = new TestObserver<>();
  proxy.subscribe(freshObserver);
  freshObserver.assertComplete();
  freshObserver.assertValue("Avanti!");
}
 
源代码4 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Emits a value and completes the subject while the proxy is disposed, then checks that a
 * fresh observer subscribed afterwards receives that value.
 */
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
  ReplaySubject<String> subject = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
  TestObserver<String> disposedObserver = new TestObserver<>();

  proxy.subscribe(disposedObserver);
  proxy.dispose();

  disposedObserver.assertNoValues();

  // Emitted while no observer is attached through the proxy.
  subject.onNext("Avanti!");
  subject.onComplete();

  // A disposed observer may not be reused in RxJava 2, so resubscribe with a new instance.
  TestObserver<String> freshObserver = new TestObserver<>();
  proxy.subscribe(freshObserver);

  // Wait up to three seconds for the terminal event before checking the delivered value.
  freshObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
  freshObserver.assertValue("Avanti!");
}
 
源代码5 项目: RxGroups   文件: SubscriptionProxyTest.java
/**
 * Checks that results already delivered to one observer are delivered again to a second
 * observer that subscribes through the same proxy later.
 *
 * <p>Use case: when rotating an activity, ObservableManager will re-subscribe the original
 * request's Observable to a new Observer, which is a member of the new activity instance. In
 * this case, we may want to redeliver any previous results (if the request is still being
 * managed by ObservableManager).
 */
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() {
  ReplaySubject<String> subject = ReplaySubject.create();
  SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);
  TestObserver<String> firstObserver = new TestObserver<>();

  proxy.subscribe(firstObserver);

  // The first observer is still attached while the value and completion are emitted.
  subject.onNext("Avanti!");
  subject.onComplete();

  proxy.dispose();

  TestObserver<String> secondObserver = new TestObserver<>();
  proxy.subscribe(secondObserver);

  // Wait up to three seconds for the terminal event before checking the second observer.
  secondObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
  secondObserver.assertComplete();
  secondObserver.assertValue("Avanti!");

  // The first observer must have seen the same events.
  firstObserver.assertComplete();
  firstObserver.assertValue("Avanti!");
}
 
/**
 * Demonstrates an unbounded {@code ReplaySubject}: two values are emitted before the observer
 * subscribes, one more value is emitted afterwards, and then the subject is completed.
 */
@Test
void replaySubject_test() {
    ReplaySubject<Object> subject = ReplaySubject.create();

    // Emitted before anyone subscribes; the late subscriber is expected to get these replayed.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(
            item -> log("一郎神: " + item),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            d -> System.out.println("onSubscribe")
    );

    // Emitted after subscription, followed by the terminal event.
    subject.onNext(10L);
    subject.onComplete();
}
 
/**
 * Demonstrates an unbounded {@code ReplaySubject} with an explicit {@code Observer}: two values
 * are emitted before the observer subscribes, one more afterwards, and then the subject is
 * completed. Every callback prints to standard output.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	// Callbacks are listed in lifecycle order: subscribe, values, then one terminal event.
	Observer<Long> printer = new Observer<Long>() {

		@Override
		public void onSubscribe(Disposable d) {
			System.out.println("onSubscribe");
		}

		@Override
		public void onNext(Long item) {
			System.out.println(":" + item);
		}

		@Override
		public void onError(Throwable error) {
			error.printStackTrace();
		}

		@Override
		public void onComplete() {
			System.out.println("It's Done");
		}
	};

	ReplaySubject<Long> subject = ReplaySubject.create();

	// Emitted before subscription; the late subscriber is expected to get these replayed.
	subject.onNext(1L);
	subject.onNext(2L);

	subject.subscribe(printer);

	// Emitted after subscription, followed by the terminal event.
	subject.onNext(10L);
	subject.onComplete();
}