下面列出了 io.reactivex.subjects.ReplaySubject#onNext() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Demonstrates a {@link ReplaySubject} whose replay buffer is limited by both age and size.
 *
 * <p>Nine values (1..9) are pushed into the subject before anyone subscribes, with a call to
 * {@code sleep(1, TimeUnit.SECONDS)} after each one. A subscriber is attached afterwards, one
 * more value (10) is emitted and the subject is completed. Which of the early values the late
 * subscriber receives is governed by the buffer limits configured below.
 */
@Test
void replaySubject_conf_test() {
    // Alternative buffer configurations, kept for comparison:
    // ReplaySubject<Object> replaySubject = ReplaySubject.createWithSize(5); // caches only the last 5 items emitted before subscription
    // ReplaySubject<Object> replaySubject = ReplaySubject.createWithTime(5, TimeUnit.SECONDS, Schedulers.computation()); // caches only items emitted within the 5 seconds before subscription

    // Combines the two limits above: items are bounded by a 5-second age and a size of 3.
    ReplaySubject<Object> replaySubject =
            ReplaySubject.createWithTimeAndSize(5, TimeUnit.SECONDS, Schedulers.computation(), 3);

    // Primitive counter with an upper-case 'L' suffix: avoids unboxing/re-boxing a Long on every
    // iteration and keeps the literals from being misread as "11" / "101".
    for (long i = 1L; i < 10L; i++) {
        replaySubject.onNext(i);
        // NOTE(review): sleep(...) is a helper defined outside this snippet; presumably it pauses
        // the current thread for one second so that older items can age out of the buffer.
        sleep(1, TimeUnit.SECONDS);
    }

    // Late subscriber: receives whatever the bounded buffer still holds, then the live events.
    replaySubject.subscribe(
            value -> log("一郎神: " + value),
            Throwable::printStackTrace,
            () -> System.out.println("Emission completed"),
            disposable -> System.out.println("onSubscribe"));

    replaySubject.onNext(10L);
    replaySubject.onComplete();
}
/**
 * Shows that an unbounded {@link ReplaySubject} hands the same items to an observer that
 * subscribes before the emissions and to one that subscribes after the subject has completed.
 */
private void doSomeWork() {
    final ReplaySubject<Integer> source = ReplaySubject.create();

    // Subscribed up front, so the first observer gets 1, 2, 3, 4 as they are emitted.
    source.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        source.onNext(value);
    }
    source.onComplete();

    // Subscribed after completion: because the subject replays, the second observer
    // is also handed 1, 2, 3, 4.
    source.subscribe(getSecondObserver());
}
/**
 * Disposes the proxy before the subject emits anything, then checks that a fresh observer
 * subscribed to the same proxy afterwards sees the value and the completion.
 */
@Test public void testUnsubscribeBeforeEmit() {
    final ReplaySubject<String> subject = ReplaySubject.create();
    final SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    // First observer: subscribed and disposed before any emission, so it must stay empty.
    final TestObserver<String> disposedObserver = new TestObserver<>();
    proxy.subscribe(disposedObserver);
    proxy.dispose();
    disposedObserver.assertNotComplete();
    disposedObserver.assertNoValues();

    // Emit and complete while nothing is subscribed through the proxy.
    subject.onNext("Avanti!");
    subject.onComplete();

    // Disposed observers may not be reused in RxJava2, hence a new instance.
    final TestObserver<String> resubscribedObserver = new TestObserver<>();
    proxy.subscribe(resubscribedObserver);
    resubscribedObserver.assertComplete();
    resubscribedObserver.assertValue("Avanti!");
}
/**
 * Emits a value while the proxy is disposed and checks that an observer subscribed to the
 * proxy afterwards still receives it.
 */
@Test public void shouldCacheResultsWhileUnsubscribedAndDeliverAfterResubscription() {
    final ReplaySubject<String> subject = ReplaySubject.create();
    final SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    // Subscribe and immediately dispose; nothing has been emitted yet.
    final TestObserver<String> initialObserver = new TestObserver<>();
    proxy.subscribe(initialObserver);
    proxy.dispose();
    initialObserver.assertNoValues();

    // The result is produced while no observer is attached through the proxy.
    subject.onNext("Avanti!");
    subject.onComplete();

    // Disposed observers may not be reused in RxJava2, hence a new instance.
    final TestObserver<String> lateObserver = new TestObserver<>();
    proxy.subscribe(lateObserver);
    // Wait (at most 3 seconds) for a terminal event before checking the delivered value.
    lateObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);
    lateObserver.assertValue("Avanti!");
}
/**
 * Checks that results already delivered to one observer are delivered again to a different
 * observer that subscribes to the same proxy later.
 *
 * <p>Use case: when an activity is rotated, ObservableManager re-subscribes the original
 * request's Observable to a new Observer belonging to the new activity instance. In that
 * situation the previous results may need to be redelivered (provided the request is still
 * managed by ObservableManager).
 */
@Test public void shouldRedeliverSameResultsToDifferentSubscriber() {
    final ReplaySubject<String> subject = ReplaySubject.create();
    final SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    // The first observer is attached while the value and completion are emitted.
    final TestObserver<String> firstObserver = new TestObserver<>();
    proxy.subscribe(firstObserver);
    subject.onNext("Avanti!");
    subject.onComplete();
    proxy.dispose();

    // A second, independent observer subscribes after the proxy was disposed.
    final TestObserver<String> secondObserver = new TestObserver<>();
    proxy.subscribe(secondObserver);
    secondObserver.awaitTerminalEvent(3, TimeUnit.SECONDS);

    // Both observers must have seen the same value followed by completion.
    secondObserver.assertComplete();
    secondObserver.assertValue("Avanti!");
    firstObserver.assertComplete();
    firstObserver.assertValue("Avanti!");
}
/**
 * Emits two values into an unbounded {@link ReplaySubject} before subscribing, then subscribes,
 * emits one more value and completes the subject.
 */
@Test
void replaySubject_test() {
    final ReplaySubject<Object> subject = ReplaySubject.create();

    // Emitted before any subscriber exists.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(
            item -> log("一郎神: " + item),
            error -> error.printStackTrace(),
            () -> System.out.println("Emission completed"),
            subscription -> System.out.println("onSubscribe"));

    // Emitted while the subscriber is attached.
    subject.onNext(10L);
    subject.onComplete();
}
/**
 * Checks that an observer subscribed to the proxy after a dispose receives both the value
 * emitted before the dispose and the value emitted after the re-subscription.
 */
@Test public void shouldKeepDeliveringEventsAfterResubscribed() {
    final ReplaySubject<String> subject = ReplaySubject.create();
    final SubscriptionProxy<String> proxy = SubscriptionProxy.create(subject);

    // First subscription: one value is emitted, then the proxy is disposed.
    final TestObserver<String> beforeDispose = new TestObserver<>();
    proxy.subscribe(beforeDispose);
    subject.onNext("Avanti 1");
    proxy.dispose();

    // Second subscription with a fresh observer; another value follows.
    final TestObserver<String> afterResubscribe = new TestObserver<>();
    proxy.subscribe(afterResubscribe);
    subject.onNext("Avanti!");

    afterResubscribe.assertValues("Avanti 1", "Avanti!");
}
/**
 * Console demo: emits two values into a {@link ReplaySubject} before subscribing a printing
 * observer, then emits one more value and completes the subject.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Observer that prints every callback it receives to standard output.
    final Observer<Long> printingObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long item) {
            System.out.println(":" + item);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };

    final ReplaySubject<Long> subject = ReplaySubject.create();

    // Emitted before the observer subscribes.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(printingObserver);

    // Emitted while the observer is subscribed.
    subject.onNext(10L);
    subject.onComplete();
}