下面列出了 io.reactivex.observables.ConnectableObservable#subscribe() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Builds a connectable observable with {@code publish()}, registers two
 * subscribers, and only then calls {@code connect()}.
 *
 * <p>The source emits two strings, each containing a freshly generated random
 * number, and then completes. Each subscriber prints what it receives with its
 * own prefix.
 */
@Test
void hot_publish_Observable() {
    ConnectableObservable<Object> published = Observable.create(emitter -> {
        System.out.println("Establishing connection");
        // Two emissions, each with its own Math.random() draw, then completion.
        for (int emission = 0; emission < 2; emission++) {
            emitter.onNext("处理的数字是: " + Math.random() * 100);
        }
        emitter.onComplete();
    }).publish();

    published.subscribe(value -> System.out.println("一郎神: " + value));
    published.subscribe(value -> System.out.println("二郎神: " + value));

    // Both subscribers are registered before the connection is established.
    published.connect();
}
/**
 * Pushes the values 1 to 4 through a {@code PublishSubject} wrapped by
 * {@code replay(3)}.
 *
 * <p>The first observer subscribes before any value is pushed; the second one
 * subscribes after the subject has completed.
 */
private void doSomeWork() {
    PublishSubject<Integer> subject = PublishSubject.create();

    // Buffer size 3: only the three most recent values are retained for replay.
    ConnectableObservable<Integer> replayed = subject.replay(3);
    replayed.connect();

    replayed.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        subject.onNext(value);
    }
    subject.onComplete();

    // Late subscriber: with a buffer of 3, the retained values 2, 3, 4 are replayed.
    replayed.subscribe(getSecondObserver());
}
/**
 * Wraps {@code slowOperation()} in an observable that is subscribed on the IO
 * scheduler, observed on the computation scheduler, cached, and then published.
 * Two subscribers are attached after {@code connect()} has been called.
 *
 * <p>The create lambda only invokes {@code slowOperation()}; it does not call any
 * method on the emitter it receives.
 *
 * @throws Exception declared so that {@code Thread.sleep} may be used directly
 */
@Test
public void test() throws Exception {
    printWithTime("Starting.");

    Observable<String> source = Observable.<String>create(emitter -> slowOperation());
    Observable<String> cached = source
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .cache();

    ConnectableObservable<String> hot = cached.publish();
    hot.connect();
    printWithTime("Published and connected.");

    // Announce and attach each of the two subscribers in turn.
    for (String announcement : new String[] {"Subscribe.", "Subscribe2."}) {
        printWithTime(announcement);
        hot.subscribe(
                this::printWithTime,
                Throwable::printStackTrace,
                () -> System.out.println("Done"));
    }

    // Keep the test thread alive while the schedulers do their work.
    Thread.sleep(3000);
}
/**
 * Publishes an endless sequence of consecutive {@link BigInteger} values starting
 * at zero and logs each one through a single subscriber.
 *
 * <p>The emitting loop has no exit condition.
 */
@Test
void infinite_publish_test() {
    ConnectableObservable<Object> counter = Observable.create(emitter -> {
        // Unbounded counter: emit the current value, then advance by one.
        for (BigInteger next = BigInteger.ZERO; ; next = next.add(BigInteger.ONE)) {
            emitter.onNext(next);
        }
    }).publish();

    counter.subscribe(value -> log(value));
    counter.connect();
}
/**
 * Connects a published 10 ms interval first, then attaches two printing
 * subscribers with pauses in between.
 */
private static void hot1() {
    Observable<Long> ticks = Observable.interval(10, TimeUnit.MILLISECONDS);
    ConnectableObservable<Long> hot = ticks.publish();

    // The connection is opened before anyone subscribes.
    hot.connect();

    hot.subscribe(tick -> System.out.println("First: " + tick));
    pauseMs(25);

    hot.subscribe(tick -> System.out.println("Second: " + tick));
    pauseMs(55);
}
/**
 * Builds a connectable stream of upper-cased employee last names.
 *
 * <p>Two subscribers are attached: one prints each name, the other collects
 * names into a local list. That list is printed immediately after subscribing;
 * this method itself never calls {@code connect()}.
 *
 * @return the published (not yet connected by this method) stream of names
 */
@Override
public ConnectableObservable<String> freeFlowEmps() {
    Function<Employee, String> toUpperCaseSurname = (employee) -> employee.getLastName().toUpperCase();

    ConnectableObservable<String> flowyNames = Observable
            .fromIterable(employeeDaoImpl.getEmployees())
            .map(toUpperCaseSurname)
            .cache()
            .publish();

    List<String> collectedNames = new ArrayList<>();
    flowyNames.subscribe(System.out::println);
    flowyNames.subscribe(collectedNames::add);

    System.out.println(collectedNames);
    return flowyNames;
}
/**
 * Exercises {@code PublishSubject.replay()} with three subscribers while values
 * are pushed into the subject from a task submitted to the common ForkJoinPool.
 *
 * <p>Sequence on the test thread: subscribe three observers, submit the emitting
 * task, connect, wait, dispose the first subscription, wait again, and complete
 * the subject. The pool is then asked to shut down in the finally block.
 */
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
// replay() is called without a buffer size argument.
ConnectableObservable<Object> replay = publishSubject.replay();
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// The values 1..9 that the background task will push into the subject.
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
// Three subscribers, all registered before connect(); each logs values with its own prefix.
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
// Countdown of remaining emissions; reaching zero triggers onComplete in the task.
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
// Single background task: for each value, call sleep(1, SECONDS), emit it, and
// complete the subject after the last one.
// NOTE(review): sleep(...) is a helper defined outside this block; presumably it
// blocks for the given duration - confirm.
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
/* integers.forEach(id -> forkJoinPool.submit(() -> {
sleep(3,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
}));*/
replay.connect();
sleep(2,TimeUnit.SECONDS);
// Only the first subscriber is disposed; subscribe2 and subscribe3 stay subscribed.
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
// The subject is completed from the test thread here, independently of the
// countdown-based completion inside the background task.
publishSubject.onComplete();
} finally {
try {
// NOTE(review): forkJoinPool is ForkJoinPool.commonPool(); the JDK Javadoc states
// that shutdown()/shutdownNow() have no effect on the common pool - confirm that
// this shutdown sequence does what is intended.
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
// Any exception from the shutdown/await calls is reported by class name only.
System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
} finally {
// Always request an immediate shutdown and report how many tasks were returned.
System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
}
}
}
/**
 * Publishes a 2-second interval, attaches a first printing subscriber, connects,
 * and attaches a second printing subscriber after a delay.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String args[]) {
    ConnectableObservable<Long> connectableIntObservable =
            Observable.interval(2, TimeUnit.SECONDS).publish();

    connectableIntObservable.subscribe(tick -> System.out.println("Observable #1 : " + tick));
    connectableIntObservable.connect();

    addDelay(7000);

    // Second subscriber joins after the connection has already been opened.
    connectableIntObservable.subscribe(tick -> System.out.println("Observable #2 : " + tick));

    addDelay(10000);
}
/**
 * Publishes a source that emits one message containing a random number and then
 * completes. Two subscribers are registered before {@code connect()} is called,
 * and each prints the message it receives.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Parameterized instead of the raw ConnectableObservable type, so the
    // subscribe(...) calls below are type-checked without unchecked warnings.
    ConnectableObservable<Object> observable = Observable.create(emitter -> {
        emitter.onNext("I am Hot Observable " + Math.random() * 100);
        emitter.onComplete();
    }).publish();

    observable.subscribe(message -> System.out.println("message:-" + message));
    observable.subscribe(message -> System.out.println("message:-" + message));

    // The source lambda is triggered by connecting, after both subscriptions exist.
    observable.connect();
}
/**
 * Connects a published 1-second interval, then lets two subscribers join at
 * different times. The first subscriber is disposed for a while and later
 * subscribes again.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if one of the {@code Thread.sleep} calls is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    Observable<Long> ticks = Observable.interval(1, TimeUnit.SECONDS);
    ConnectableObservable<Long> numbers = ticks.publish();

    // Open the connection before any subscriber is attached.
    numbers.connect();

    Disposable first = numbers.subscribe(n -> System.out.println("First subscriber: " + n));
    Thread.sleep(3000);

    Disposable second = numbers.subscribe(n -> System.out.println("  Second subscriber: " + n));
    Thread.sleep(5000);

    System.out.println(">>> First subscriber goes for lunch break");
    first.dispose();
    Thread.sleep(5000);

    System.out.println("<<< First subscriber returned from lunch");
    first = numbers.subscribe(n -> System.out.println("First subscriber: " + n));

    // Keep the JVM running so the remaining output can be observed.
    Thread.sleep(60000);
}