下面列出了io.reactivex.observables.ConnectableObservable#toFlowable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Publisher<Score> talkScores(final String title) {
Observable<Score> observable = Observable.create(e -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
Score s = Score.builder()
.title(title)
.score(Integer.valueOf((int) Math.floor(Math.random()*10)))
.build();
e.onNext(s);
}, 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable connectableObservable = observable.share().publish();
connectableObservable.connect();
return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
public Publisher<Score> talkScores(final String title) {
Observable<Score> observable = Observable.create(e -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
Score s = Score.builder()
.title(title)
.score(Integer.valueOf((int) Math.floor(Math.random()*10)))
.build();
e.onNext(s);
}, 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable connectableObservable = observable.share().publish();
connectableObservable.connect();
return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
* Constructor - Init the publishing of the events
*/
public NewAssociationPublisher(BusinessDataRepository bDataRepository) {
this.businessDataRepository = bDataRepository;
Observable<String> stockPriceUpdateObservable = Observable.create(emitter -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(verifyPresenceOfNewAssociation(emitter), 0, 20, TimeUnit.SECONDS);
});
ConnectableObservable<String> connectableObservable = stockPriceUpdateObservable.share().publish();
connectableObservable.connect();
publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
public CommentPublisher() {
Observable<Comment> commentUpdateObservable = Observable.create(emitter -> {
this.emitter = emitter;
});
ConnectableObservable<Comment> connectableObservable = commentUpdateObservable.share().publish();
connectableObservable.connect();
publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
public StockTickerPublisher() {
Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
connectableObservable.connect();
publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
public StockTickerPublisher() {
Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
connectableObservable.connect();
publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}