下面列出了 io.reactivex.subjects.AsyncSubject 这个 API 类的用法示例代码,您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Demonstrates an {@link AsyncSubject} observed through the four-callback
 * {@code subscribe} overload (value, error, completion, subscription).
 *
 * <p>Three values are pushed and the subject is then completed. Per the
 * AsyncSubject contract, only the final value (10) should reach the value
 * callback, and only once completion is signalled.
 */
@Test
void asyncSubject_test() {
    AsyncSubject<Object> subject = AsyncSubject.create();

    subject.subscribe(
            value -> log("一郎神: " + value),
            error -> error.printStackTrace(),
            () -> System.out.println("Emission completed"),
            subscription -> System.out.println("onSubscribe"));

    // Values are boxed to Long on the way into the Object-typed subject.
    for (long value : new long[] {1L, 2L, 10L}) {
        subject.onNext(value);
    }

    // To try the error path instead, emit an error before completing:
    // subject.onError(new RuntimeException("just goofing around"));
    subject.onComplete();
}
/**
 * Entry point: attaches a full {@link Observer} to an {@link AsyncSubject},
 * feeds it three values and completes it.
 *
 * <p>The observer prints a line for each callback it receives, so the console
 * output shows which signals the subject actually forwards.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    AsyncSubject<Long> subject = AsyncSubject.create();

    // Callbacks are listed in the order of the observer lifecycle.
    Observer<Long> printer = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onNext(Long item) {
            System.out.println(":" + item);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("It's Done");
        }
    };
    subject.subscribe(printer);

    for (long item : new long[] {1L, 2L, 10L}) {
        subject.onNext(item);
    }
    subject.onComplete();
}
/**
 * Shows that an {@link AsyncSubject} delivers the same result to observers
 * regardless of when they subscribe before completion.
 *
 * <p>The first observer subscribes before any value is pushed, the second
 * after three values have already gone through. Both are expected to receive
 * only the last value (4) followed by the completion signal.
 */
private void doSomeWork() {
    AsyncSubject<Integer> subject = AsyncSubject.create();

    // Early subscriber: expected to see just 4 and onComplete.
    subject.subscribe(getFirstObserver());

    for (int value = 1; value <= 3; value++) {
        subject.onNext(value);
    }

    // Late subscriber: expected to see 4 and onComplete as well.
    subject.subscribe(getSecondObserver());

    subject.onNext(4);
    subject.onComplete();
}
/**
 * Feeds an {@link AsyncSubject} from a timed source and subscribes to it both
 * before and after the source has finished.
 *
 * <p>The source ticks once per second starting immediately, is limited to four
 * ticks, and each tick is converted to its string form. The method then blocks
 * for 5100 ms, which is longer than the source needs to finish, before the
 * second subscriber is attached.
 *
 * @throws InterruptedException if the waiting thread is interrupted
 */
private static void demo5() throws InterruptedException {
    Subject<String> subject = AsyncSubject.create();

    Observable<String> ticks = Observable.interval(0, 1, TimeUnit.SECONDS)
            .take(4)
            .map(tick -> Objects.toString(tick));
    ticks.subscribe(subject);

    // Subscriber attached while the source is still running.
    subject.subscribe(item -> log(item));

    // Wait past the end of the source before attaching the late subscriber.
    TimeUnit.MILLISECONDS.sleep(5100);

    // Subscriber attached after the wait; presumably the subject has already
    // completed by now, so it should receive the cached last value at once.
    subject.subscribe(item -> log(item));
}