下面列出了io.reactivex.internal.fuseable.ScalarCallable#io.reactivex.observables.ConnectableObservable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Demonstrates a hot observable built with {@code publish()}: two subscribers are
 * registered first, and the source lambda runs only when {@code connect()} is called.
 */
@Test
void hot_publish_Observable() {
    Observable<Object> randomNumbers = Observable.create(emitter -> {
        System.out.println("Establishing connection");
        // Emit two independently generated random values, then terminate.
        for (int i = 0; i < 2; i++) {
            emitter.onNext("处理的数字是: " + Math.random() * 100);
        }
        emitter.onComplete();
    });
    ConnectableObservable<Object> hot = randomNumbers.publish();

    hot.subscribe(value -> System.out.println("一郎神: " + value));
    hot.subscribe(value -> System.out.println("二郎神: " + value));

    // Both subscribers are attached; start the single shared subscription.
    hot.connect();
}
/**
 * Builds a hot stream of randomly generated scores for the given title.
 *
 * <p>A single-threaded scheduler creates a new {@code Score} every two seconds, carrying
 * {@code title} and a random integer score between 0 and 9. The stream is published and
 * connected immediately, so generation starts as soon as this method is called.
 *
 * @param title the title set on every generated score
 * @return a publisher of the generated scores using {@code BackpressureStrategy.BUFFER}
 */
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Release the scheduler thread when the stream is disposed or terminates;
        // without this the executor would keep running with nobody listening.
        e.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(() -> {
            Score s = Score.builder()
                .title(title)
                .score(Integer.valueOf((int) Math.floor(Math.random() * 10)))
                .build();
            e.onNext(s);
        }, 0, 2, TimeUnit.SECONDS);
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    // The variable is fully typed so the returned Flowable is a Flowable<Score>, not a raw type.
    ConnectableObservable<Score> connectableObservable = observable.publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Builds a hot stream of randomly generated scores for the given title.
 *
 * <p>A single-threaded scheduler creates a new {@code Score} every two seconds, carrying
 * {@code title} and a random integer score between 0 and 9. The stream is published and
 * connected immediately, so generation starts as soon as this method is called.
 *
 * @param title the title set on every generated score
 * @return a publisher of the generated scores using {@code BackpressureStrategy.BUFFER}
 */
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Release the scheduler thread when the stream is disposed or terminates;
        // without this the executor would keep running with nobody listening.
        e.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(() -> {
            Score s = Score.builder()
                .title(title)
                .score(Integer.valueOf((int) Math.floor(Math.random() * 10)))
                .build();
            e.onNext(s);
        }, 0, 2, TimeUnit.SECONDS);
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    // The variable is fully typed so the returned Flowable is a Flowable<Score>, not a raw type.
    ConnectableObservable<Score> connectableObservable = observable.publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Shows {@code replay(3)} on a {@code PublishSubject}: the first observer subscribes before
 * any value is pushed, the second one subscribes only after the subject has completed.
 */
private void doSomeWork() {
    final PublishSubject<Integer> subject = PublishSubject.create();
    // Buffer size 3: at most the three most recent values are kept for replay.
    final ConnectableObservable<Integer> replayed = subject.replay(3);
    replayed.connect();
    replayed.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        subject.onNext(value);
    }
    subject.onComplete();

    // Late subscriber: with a buffer of 3 it is replayed 2, 3 and 4.
    replayed.subscribe(getSecondObserver());
}
/**
 * Publishes and connects a cached source before anyone subscribes, then attaches two
 * subscribers and waits three seconds so asynchronous output can appear.
 */
@Test
public void test() throws Exception {
    printWithTime("Starting.");

    // The source lambda only runs slowOperation(); it never calls onNext or onComplete itself.
    Observable<String> slowSource = Observable.<String>create(emitter -> {
        slowOperation();
    });
    Observable<String> obs = slowSource
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .cache();

    ConnectableObservable<String> published = obs.publish();
    published.connect();
    printWithTime("Published and connected.");

    printWithTime("Subscribe.");
    published.subscribe(
        this::printWithTime,
        error -> error.printStackTrace(),
        () -> System.out.println("Done"));

    printWithTime("Subscribe2.");
    published.subscribe(
        this::printWithTime,
        error -> error.printStackTrace(),
        () -> System.out.println("Done"));

    Thread.sleep(3000);
}
/**
 * Fork-and-join demo: each subscription to {@code forkAndJoin} publishes the delayed range
 * once, derives its first and last element from that shared stream and merges the two.
 */
@Test
public void test() throws Exception {
    final Observable<Integer> delayedRange =
        Observable.range(1, 5).delaySubscription(1, TimeUnit.SECONDS);

    // Takes the first and the last element of the stream it is given.
    final Function<Observable<Integer>, Observable<Integer>> firstAndLast =
        shared -> Observable.merge(shared.take(1), shared.takeLast(1));

    final Observable<Integer> forkAndJoin = Observable.defer(() -> {
        ConnectableObservable<Integer> published = delayedRange
            .doOnSubscribe(d -> System.out.println("Subscribed!"))
            .publish();
        Observable<Integer> merged = firstAndLast.apply(published);
        // Connect only after both branches have been wired to the published stream.
        published.connect();
        return merged;
    });

    // Three independent subscriptions, each triggering its own deferred pipeline.
    for (int i = 0; i < 3; i++) {
        forkAndJoin.subscribe(System.out::println);
    }

    Thread.sleep(10000);
}
/**
 * Constructor - initialises the publishing of the events.
 *
 * <p>Stores the repository, then schedules the runnable returned by
 * {@code verifyPresenceOfNewAssociation(emitter)} every 20 seconds on a single-threaded
 * scheduler. The resulting stream is published, connected immediately and exposed through
 * {@code publisher} with {@code BackpressureStrategy.BUFFER}.
 *
 * @param bDataRepository repository assigned to {@code businessDataRepository}
 */
public NewAssociationPublisher(BusinessDataRepository bDataRepository) {
    this.businessDataRepository = bDataRepository;
    Observable<String> stockPriceUpdateObservable = Observable.create(emitter -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Release the scheduler thread when the stream is disposed or terminates;
        // otherwise the periodic task would keep running with nobody listening.
        emitter.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(verifyPresenceOfNewAssociation(emitter), 0, 20, TimeUnit.SECONDS);
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    ConnectableObservable<String> connectableObservable = stockPriceUpdateObservable.publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Publishes an unbounded counter (0, 1, 2, ...) as {@code BigInteger} values and logs every
 * value through a single subscriber.
 *
 * <p>The source emits synchronously inside {@code connect()}, so this method keeps running
 * for as long as the emitter is not disposed.
 */
@Test
void infinite_publish_test() {
    ConnectableObservable<Object> observable = Observable.create(observer -> {
        BigInteger i = BigInteger.ZERO;
        // Check for disposal so the loop can be stopped from downstream
        // instead of spinning forever after cancellation.
        while (!observer.isDisposed()) {
            observer.onNext(i);
            i = i.add(BigInteger.ONE);
        }
    }).publish();
    observable.subscribe(x -> log(x));
    observable.connect();
}
/**
 * Hot-observable demo: a 10 ms interval is published and connected before anyone listens;
 * a first subscriber joins immediately and a second one after a {@code pauseMs(25)} call.
 */
private static void hot1(){
    Observable<Long> ticks = Observable.interval(10, TimeUnit.MILLISECONDS);
    ConnectableObservable<Long> published = ticks.publish();
    // Connect first: emission does not wait for subscribers.
    published.connect();

    published.subscribe(tick -> System.out.println("First: " + tick));
    pauseMs(25);

    published.subscribe(tick -> System.out.println("Second: " + tick));
    pauseMs(55);
}
/**
 * Builds a connectable stream of upper-cased employee last names read from
 * {@code employeeDaoImpl.getEmployees()}.
 *
 * <p>Two subscribers are attached here: one prints each name, the other collects the names
 * into a local list. The stream is returned unconnected.
 *
 * @return the published (not yet connected) stream of upper-cased last names
 */
@Override
public ConnectableObservable<String> freeFlowEmps() {
    final List<String> collected = new ArrayList<>();
    final Function<Employee, String> toUpperLastName =
        (employee) -> employee.getLastName().toUpperCase();

    final ConnectableObservable<String> published = Observable
        .fromIterable(employeeDaoImpl.getEmployees())
        .map(toUpperLastName)
        .cache()
        .publish();

    published.subscribe(System.out::println);
    published.subscribe((name) -> collected.add(name));

    // NOTE(review): connect() is not called in this method, so the list is printed before
    // the published stream has been started - confirm whether that is intended.
    System.out.println(collected);
    return published;
}
/**
 * Creates the publisher.
 *
 * <p>The source observable captures its emitter in the {@code emitter} field when it is
 * subscribed. The stream is published and connected immediately, and is exposed through
 * {@code publisher} with {@code BackpressureStrategy.BUFFER}.
 */
public CommentPublisher() {
    Observable<Comment> commentUpdateObservable = Observable.create(emitter -> {
        // Keep the emitter so it can be used outside this lambda.
        this.emitter = emitter;
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    ConnectableObservable<Comment> connectableObservable = commentUpdateObservable.publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Executes the command with the given input, provided the receiver is currently enabled.
 * <p>
 * When enabled, this method will:
 * <ol>
 * <li>Invoke the {@code func} given at the time of creation.</li>
 * <li>Multicast the returned observable (via {@code replay()}).</li>
 * <li>Send the multicasted observable on {@link #executionObservables()}.</li>
 * <li>Subscribe (connect) to the original observable on the main thread.</li>
 * </ol>
 *
 * @param input The input value to pass to the receiver's {@code func}. This may be null.
 * @return the multicasted observable, after subscription. If the receiver is not
 * enabled, or {@code func} fails or returns null, an observable that sends an error.
 */
@MainThread
public final Observable<T> execute(@Nullable Object input) {
    // Guard clause: a disabled command never invokes the func.
    if (!mImmediateEnabled.blockingFirst()) {
        final IllegalStateException disabled =
            new IllegalStateException("The command is disabled and cannot be executed");
        return Observable.error(disabled);
    }

    try {
        final Observable<T> work = mFunc.apply(input);
        if (work == null) {
            throw new RuntimeException(String.format("null Observable returned from observable func for value %s", input));
        }

        // The replayed observable is announced before connect(), so `executing` and
        // `enabled` send updated values before the observable actually starts work.
        final ConnectableObservable<T> replayed =
            work.subscribeOn(AndroidSchedulers.mainThread()).replay();
        mAddedExecutionObservableSubject.onNext(replayed);
        replayed.connect();
        return replayed;
    } catch (Exception failure) {
        // Failures (including the null check above) are reported through the returned stream.
        failure.printStackTrace();
        return Observable.error(failure);
    }
}
/**
 * Installs a connectable-observable assembly hook that sets {@code hookCalled}, publishes
 * and connects a range, and asserts that the flag was set.
 */
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {
    RxJavaPlugins.setOnConnectableObservableAssembly(assembled -> {
        hookCalled = true;
        // Hand back the same instance unchanged.
        return assembled;
    });

    ConnectableObservable<Integer> published = ConnectableObservable.range(1, 10).publish();
    published.connect();

    assertTrue(hookCalled);
}
/**
 * Assembles a value source and an error source inside a scope opened for
 * {@code assemblyContext}, then subscribes via {@code subscribeInNoContext}; the callbacks
 * assert the assembly context and the result must be 1, 2, 3.
 */
@Test public void connectableObservable_assembleInScope_subscribeNoScope() {
    ConnectableObservable<Integer> values;
    ConnectableObservable<Integer> failing;

    try (Scope ignored = currentTraceContext.newScope(assemblyContext)) {
        values = Observable.range(1, 3)
            .doOnNext(item -> assertInAssemblyContext())
            .doOnComplete(() -> assertInAssemblyContext())
            .publish();
        failing = Observable.<Integer>error(new IllegalStateException())
            .doOnError(error -> assertInAssemblyContext())
            .doOnComplete(() -> assertInAssemblyContext())
            .publish();
    }

    subscribeInNoContext(values.autoConnect(), failing.autoConnect())
        .assertResult(1, 2, 3);
}
/**
 * Assembles a value source and an error source inside a scope opened for
 * {@code assemblyContext}, then subscribes via {@code subscribeInDifferentContext}; the
 * callbacks assert the assembly context and the result must be 1, 2, 3.
 */
@Test public void connectableObservable_assembleInScope_subscribeInScope() {
    ConnectableObservable<Integer> values;
    ConnectableObservable<Integer> failing;

    try (Scope ignored = currentTraceContext.newScope(assemblyContext)) {
        values = Observable.range(1, 3)
            .doOnNext(item -> assertInAssemblyContext())
            .doOnComplete(() -> assertInAssemblyContext())
            .publish();
        failing = Observable.<Integer>error(new IllegalStateException())
            .doOnError(error -> assertInAssemblyContext())
            .doOnComplete(() -> assertInAssemblyContext())
            .publish();
    }

    subscribeInDifferentContext(values.autoConnect(), failing.autoConnect())
        .assertResult(1, 2, 3);
}
/**
 * Assembles a value source and an error source without opening any scope, then subscribes
 * via {@code subscribeInDifferentContext}; the callbacks assert the subscribe context and
 * the result must be 1, 2, 3.
 */
@Test public void connectableObservable_assembleNoScope_subscribeInScope() {
    final ConnectableObservable<Integer> values = Observable.range(1, 3)
        .doOnNext(item -> assertInSubscribeContext())
        .doOnComplete(() -> assertInSubscribeContext())
        .publish();

    final ConnectableObservable<Integer> failing = Observable
        .<Integer>error(new IllegalStateException())
        .doOnError(error -> assertInSubscribeContext())
        .doOnComplete(() -> assertInSubscribeContext())
        .publish();

    subscribeInDifferentContext(values.autoConnect(), failing.autoConnect())
        .assertResult(1, 2, 3);
}
/**
 * Exercises {@code replay()} on a {@code PublishSubject}: three subscribers are attached
 * before {@code connect()}, a background task pushes the integers 1..9 into the subject,
 * and the first subscriber is disposed part-way through. The pool is shut down in the
 * {@code finally} block.
 */
@Test
void replay_PublishSubject_test() {
PublishSubject<Object> publishSubject = PublishSubject.create();
ConnectableObservable<Object> replay = publishSubject.replay();
// The JVM-wide common pool is used to emit values off the test thread.
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// Values 1..9 that the background task will emit.
List<Integer> integers = new ArrayList<>();
for (int i=1;i<10;i++){
integers.add(i);
}
// All three subscribers are registered before connect() is called further down.
Disposable subscribe1 = replay.subscribe(x -> {
log("一郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
// The Disposables of the second and third subscriber are never used below.
Disposable subscribe2 = replay.subscribe(x -> {
log("二郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
Disposable subscribe3 = replay.subscribe(x -> {
log("三郎神: " + x);
}, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
// Countdown that lets the emitting task complete the subject after the last value.
AtomicInteger atomicInteger = new AtomicInteger(integers.size());
try {
// A single background task emits the values one by one, calling sleep(1, SECONDS) before each.
forkJoinPool.submit(() -> {
integers.forEach(id -> {
sleep(1,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
});
});
/* integers.forEach(id -> forkJoinPool.submit(() -> {
sleep(3,TimeUnit.SECONDS);
publishSubject.onNext(id);
if (atomicInteger.decrementAndGet() == 0) {
publishSubject.onComplete();
}
}));*/
// Subscribes the replay operator to the subject.
replay.connect();
sleep(2,TimeUnit.SECONDS);
// Detach only the first subscriber; the other two stay subscribed.
subscribe1.dispose();
sleep(1,TimeUnit.SECONDS);
//replay.connect(consumer -> consumer.dispose());
// NOTE(review): this completes the subject from the test thread while the background
// task may still be calling onNext - confirm the early completion is intended.
publishSubject.onComplete();
} finally {
// NOTE(review): forkJoinPool is ForkJoinPool.commonPool(); its Javadoc says the common
// pool is unaffected by shutdown()/shutdownNow() - confirm these calls do what is expected.
try {
forkJoinPool.shutdown();
int shutdownDelaySec = 2;
System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
} catch (Exception ex) {
System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
} finally {
System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
List<Runnable> l = forkJoinPool.shutdownNow();
System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
}
}
}
/**
 * Hot-observable demo: a 2-second interval is published; the first subscriber is attached
 * before {@code connect()}, the second one after an {@code addDelay(7000)} call.
 */
public static void main(String args[]) {
    ConnectableObservable<Long> ticker =
        Observable.interval(2, TimeUnit.SECONDS).publish();

    ticker.subscribe(tick -> System.out.println("Observable #1 : "+tick));
    ticker.connect();
    addDelay(7000);

    // Late subscriber joins the already running stream.
    ticker.subscribe(tick -> System.out.println("Observable #2 : "+tick));
    addDelay(10000);
}
/**
 * Stores the wrapped connectable source and creates a fresh
 * {@code RxJavaAssemblyException} at construction time.
 *
 * @param source the connectable observable to wrap
 */
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) {
    // Create the exception first; it does not depend on the source.
    this.assembled = new RxJavaAssemblyException();
    this.source = source;
}
/**
 * Replays the source, connects to it immediately and exposes the replayed stream through
 * {@code proxy}, with {@code onTerminate} attached via {@code doOnTerminate}.
 *
 * @param sourceObservable the observable to replay
 * @param onTerminate action registered with {@code doOnTerminate} on the replayed stream
 */
private SubscriptionProxy(Observable<T> sourceObservable, Action onTerminate) {
    final ConnectableObservable<T> replayed = sourceObservable.replay();

    // Connect right away and keep the connection so it can be disposed with the list.
    this.sourceDisposable = replayed.connect();
    this.disposableList = new CompositeDisposable(this.sourceDisposable);

    this.proxy = replayed.doOnTerminate(onTerminate);
}
/**
 * Stores the wrapped connectable source together with the given request context.
 *
 * @param source the connectable observable to wrap
 * @param assemblyContext the request context kept alongside the source
 */
RequestContextConnectableObservable(ConnectableObservable<T> source, RequestContext assemblyContext) {
    this.assemblyContext = assemblyContext;
    this.source = source;
}
/**
 * Stores the wrapped connectable source, the context scoper and the trace context.
 *
 * @param source the connectable observable to wrap
 * @param contextScoper the {@code CurrentTraceContext} kept for later use
 * @param assembled the trace context kept alongside the source
 */
TraceContextConnectableObservable(
    ConnectableObservable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
    this.assembled = assembled;
    this.contextScoper = contextScoper;
    this.source = source;
}
/**
 * Wraps {@code source} in a new {@code TraceContextConnectableObservable}.
 *
 * @param source the connectable observable to wrap
 * @param contextScoper the {@code CurrentTraceContext} passed to the wrapper
 * @param assembled the trace context passed to the wrapper
 * @param <T> the element type of the stream
 * @return the newly created wrapper
 */
public static <T> ConnectableObservable<T> wrap(
    ConnectableObservable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
    final ConnectableObservable<T> wrapped =
        new TraceContextConnectableObservable<T>(source, contextScoper, assembled);
    return wrapped;
}
/**
 * Creates the publisher and starts producing stock price updates.
 *
 * <p>The runnable returned by {@code newStockTick(emitter)} is scheduled every two seconds
 * on a single-threaded scheduler. The stream is published, connected immediately and
 * exposed through {@code publisher} with {@code BackpressureStrategy.BUFFER}.
 */
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Release the scheduler thread when the stream is disposed or terminates;
        // otherwise the periodic task would keep running with nobody listening.
        emitter.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Creates the publisher and starts producing stock price updates.
 *
 * <p>The runnable returned by {@code newStockTick(emitter)} is scheduled every two seconds
 * on a single-threaded scheduler. The stream is published, connected immediately and
 * exposed through {@code publisher} with {@code BackpressureStrategy.BUFFER}.
 */
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Release the scheduler thread when the stream is disposed or terminates;
        // otherwise the periodic task would keep running with nobody listening.
        emitter.setCancellable(executorService::shutdownNow);
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
    });
    // publish() already multicasts, so an extra share() in front of it adds nothing.
    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
 * Hot-observable demo: a source emitting one random message is published, two subscribers
 * are attached, and {@code connect()} then runs the source once for both of them.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Fully typed instead of a raw ConnectableObservable, so the subscribe calls are
    // type-checked and compile without unchecked warnings.
    ConnectableObservable<Object> observable = Observable.create(observer -> {
        observer.onNext("I am Hot Observable "+Math.random()*100);
        observer.onComplete();
    }).publish();
    observable.subscribe(consumer -> System.out.println("message:-" + consumer));
    observable.subscribe(consumer -> System.out.println("message:-" + consumer));
    observable.connect();
}
/**
 * Hot-observable demo: a 1-second interval is published and connected; a first subscriber
 * joins at once, a second after 3 s, and the first one is disposed and later re-subscribed.
 *
 * @param args unused
 * @throws InterruptedException if one of the sleeps is interrupted
 */
public static void main(String[] args) throws InterruptedException{
    // publish() makes the interval hot; connect() creates the internal subscription.
    ConnectableObservable<Long> hotNumbers =
        Observable.interval(1, TimeUnit.SECONDS).publish();
    hotNumbers.connect();

    Disposable first = hotNumbers.subscribe(
        value -> System.out.println("First subscriber: "+ value ));
    Thread.sleep(3000);

    Disposable second = hotNumbers.subscribe(
        value -> System.out.println("  Second subscriber: "+ value ));
    Thread.sleep(5000);

    System.out.println(">>> First subscriber goes for lunch break");
    first.dispose();
    Thread.sleep(5000);

    System.out.println("<<< First subscriber returned from lunch");
    first = hotNumbers.subscribe(
        value -> System.out.println("First subscriber: "+ value ));

    // Just to keep the program running.
    Thread.sleep(60000);
}
/**
 * Returns a {@code ConnectableObservable} of strings.
 * NOTE(review): presumably employee names, judging by the method name - confirm against
 * the implementing class, and document whether callers are expected to call
 * {@code connect()} themselves.
 *
 * @return the connectable stream of strings
 */
public ConnectableObservable<String> freeFlowEmps();