io.reactivex.observables.ConnectableObservable#subscribe() 源码实例 Demo

下面列出了 io.reactivex.observables.ConnectableObservable#subscribe() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Publishes a two-item source as a {@link ConnectableObservable}, registers two
 * printing subscribers, and only then calls {@code connect()}.
 */
@Test
void hot_publish_Observable() {
    Observable<Object> randomNumbers = Observable.create(emitter -> {
        System.out.println("Establishing connection");
        // Two independent random readings, followed by completion.
        for (int emitted = 0; emitted < 2; emitted++) {
            emitter.onNext("处理的数字是: " + Math.random() * 100);
        }
        emitter.onComplete();
    });
    ConnectableObservable<Object> observable = randomNumbers.publish();

    // Both subscribers are attached before the connectable is connected.
    observable.subscribe(value -> System.out.println("一郎神: " + value));
    observable.subscribe(value -> System.out.println("二郎神: " + value));

    observable.connect();
}
 
/**
 * Pushes the values 1..4 through a {@link PublishSubject} wrapped by
 * {@code replay(3)}: one observer is attached before the values are pushed and
 * a second one after the subject has completed.
 */
private void doSomeWork() {
    PublishSubject<Integer> subject = PublishSubject.create();

    // bufferSize = 3: the connectable retains three values to replay.
    ConnectableObservable<Integer> replayed = subject.replay(3);
    replayed.connect();

    replayed.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        subject.onNext(value);
    }
    subject.onComplete();

    // The late observer is expected to receive 2, 3, 4 — the three retained values.
    replayed.subscribe(getSecondObserver());
}
 
源代码3 项目: akarnokd-misc   文件: RxLosers.java
/**
 * Builds a cached, scheduler-hopping observable whose emitter only calls
 * {@code slowOperation()}, publishes and connects it, then attaches two
 * subscribers and waits three seconds.
 *
 * @throws Exception if the final {@code Thread.sleep} is interrupted
 */
@Test
public void test() throws Exception {
    printWithTime("Starting.");

    // The emitter body never calls onNext/onComplete itself; it only runs slowOperation().
    Observable<String> source = Observable.<String>create(emitter -> slowOperation());
    Observable<String> onIo = source.subscribeOn(Schedulers.io());
    Observable<String> onComputation = onIo.observeOn(Schedulers.computation());
    Observable<String> obs = onComputation.cache();

    ConnectableObservable<String> hot = obs.publish();
    hot.connect();
    printWithTime("Published and connected.");

    // Attach the two subscribers, announcing each one first.
    for (String announcement : new String[] {"Subscribe.", "Subscribe2."}) {
        printWithTime(announcement);
        hot.subscribe(
                this::printWithTime,
                Throwable::printStackTrace,
                () -> System.out.println("Done"));
    }

    Thread.sleep(3000);
}
 
/**
 * Publishes a source that emits an ever-increasing {@link BigInteger} counter,
 * starting at zero, to a single logging subscriber.
 */
@Test
void infinite_publish_test() {
    ConnectableObservable<Object> observable = Observable.create(observer -> {
        // Unbounded on purpose: there is no exit condition and no onComplete().
        // NOTE(review): the loop never checks observer.isDisposed(); presumably
        // connect() below therefore never returns — confirm this is intended.
        for (BigInteger counter = BigInteger.ZERO; ; counter = counter.add(BigInteger.ONE)) {
            observer.onNext(counter);
        }
    }).publish();

    observable.subscribe(value -> log(value));
    observable.connect();
}
 
源代码5 项目: Learn-Java-12-Programming   文件: HotObservable.java
/**
 * Connects a published 10 ms interval first, then attaches two printing
 * subscribers with pauses of 25 ms and 55 ms after each respectively.
 */
private static void hot1(){
    Observable<Long> ticks = Observable.interval(10, TimeUnit.MILLISECONDS);
    ConnectableObservable<Long> hot = ticks.publish();

    // Connected before anyone subscribes.
    hot.connect();

    hot.subscribe(tick -> System.out.println("First: " + tick));
    pauseMs(25);

    hot.subscribe(tick -> System.out.println("Second: " + tick));
    pauseMs(55);
}
 
/**
 * Builds a published, cached stream of upper-cased employee last names,
 * attaches a printing subscriber and a collecting subscriber, prints the
 * collected list, and returns the connectable.
 *
 * @return the connectable stream of upper-cased last names; this method does
 *         not call {@code connect()} on it
 */
@Override
public ConnectableObservable<String> freeFlowEmps() {
	Function<Employee, String> toUpperLastName = (emp) -> emp.getLastName().toUpperCase();
	ConnectableObservable<String> flowyNames = Observable
			.fromIterable(employeeDaoImpl.getEmployees())
			.map(toUpperLastName)
			.cache()
			.publish();

	List<String> rosterNames = new ArrayList<>();
	flowyNames.subscribe(System.out::println);
	flowyNames.subscribe(rosterNames::add);

	// NOTE(review): connect() has not been called at this point, so the list is
	// presumably still empty when printed — confirm this is intended.
	System.out.println(rosterNames);
	return flowyNames;
}
 
/**
 * Exercises {@code replay()} on a {@link PublishSubject} with three subscribers.
 *
 * <p>Sequence visible in the code: three subscribers are attached to the replaying
 * connectable; a task submitted to the common {@link ForkJoinPool} pushes the
 * integers 1..9 into the subject (calling {@code sleep(1, SECONDS)} before each);
 * the calling thread connects, disposes the first subscription after
 * {@code sleep(2, SECONDS)}, and completes the subject after a further
 * {@code sleep(1, SECONDS)}. The {@code finally} block then attempts to shut the
 * pool down.
 *
 * <p>{@code sleep} and {@code log} are helpers defined outside this snippet;
 * presumably {@code sleep} blocks the current thread for the given duration.
 */
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    // No-arg replay(): no buffer size is given here (contrast with replay(n)).
    ConnectableObservable<Object> replay = publishSubject.replay();
    // NOTE(review): this is the JVM-wide common pool. The JDK documents that its run
    // state is unaffected by shutdown()/shutdownNow(), so the shutdown logic in the
    // finally block below is effectively a no-op for it.
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    // Values 1..9 to be emitted.
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<10;i++){
        integers.add(i);
    }
    // Three identical subscribers that differ only in the label they log.
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    // NOTE(review): subscribe2 and subscribe3 are never read again in this method.
    Disposable subscribe2 = replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    Disposable subscribe3 = replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    // Countdown of remaining emissions; reaching zero triggers onComplete() in the task.
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        // Single background task: emits the values sequentially, one sleep per value.
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1,TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
       /* integers.forEach(id -> forkJoinPool.submit(() -> {
            sleep(3,TimeUnit.SECONDS);
            publishSubject.onNext(id);
            if (atomicInteger.decrementAndGet() == 0) {
                publishSubject.onComplete();
            }
        }));*/
        // Connect after the emitting task has been submitted.
        replay.connect();
        sleep(2,TimeUnit.SECONDS);
        // Only the first subscriber is detached; the other two stay subscribed.
        subscribe1.dispose();
        sleep(1,TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        // NOTE(review): onComplete() is also called by the background task once the
        // countdown hits zero. This call happens after the two sleeps above, which is
        // presumably before the task has emitted all nine values, and it is made from
        // a different thread than the task's onNext() calls — confirm this is intended.
        publishSubject.onComplete();
    } finally  {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            // Broad catch: any exception (including InterruptedException from
            // awaitTermination) is reduced to printing its class name; the interrupt
            // flag is not restored.
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            // Always runs, whether or not awaitTermination threw.
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
/**
 * Publishes a 2-second interval, attaches a first printing subscriber, connects,
 * and attaches a second printing subscriber after a 7000 ms delay; then waits a
 * further 10000 ms before returning.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String args[]) {
	ConnectableObservable<Long> connectableIntObservable =
			Observable.interval(2, TimeUnit.SECONDS).publish();

	// First subscriber is registered before the connection is made.
	connectableIntObservable.subscribe(tick -> System.out.println("Observable #1 : "+tick));
	connectableIntObservable.connect();

	// Second subscriber joins late, after the stream has been running for a while.
	addDelay(7000);
	connectableIntObservable.subscribe(tick -> System.out.println("Observable #2 : "+tick));

	addDelay(10000);
}
 
/**
 * Publishes a one-item source as a {@link ConnectableObservable}, registers two
 * printing subscribers, and then calls {@code connect()}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
	// Parameterized instead of the raw ConnectableObservable type, so the
	// subscribe(...) calls below are type-checked without unchecked warnings.
	ConnectableObservable<Object> observable = Observable.create(observer -> {
		observer.onNext("I am Hot Observable "+Math.random()*100);
		observer.onComplete();
	}).publish();

	// Both subscribers are attached before connect().
	observable.subscribe(consumer -> System.out.println("message:-" + consumer));
	observable.subscribe(consumer -> System.out.println("message:-" + consumer));

	observable.connect();
}
 
源代码10 项目: rxjava2   文件: HotObservable.java
/**
 * Runs a published 1-second interval and walks through a subscribe / dispose /
 * re-subscribe scenario for a "first" subscriber while a "second" subscriber
 * joins three seconds in.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if one of the {@code Thread.sleep} calls is interrupted
 */
public static void main(String[] args) throws InterruptedException{
    // Generate numbers every second and make the stream hot.
    ConnectableObservable<Long> numbers = Observable
            .interval(1, TimeUnit.SECONDS)
            .publish();

    // Create the internal subscription.
    numbers.connect();

    Disposable subscriber1 =
            numbers.subscribe(value -> System.out.println("First subscriber: "+ value ));

    Thread.sleep(3000);

    Disposable subscriber2 =
            numbers.subscribe(value -> System.out.println("  Second subscriber: "+ value ));

    Thread.sleep(5000);
    System.out.println(">>> First subscriber goes for lunch break");
    subscriber1.dispose();

    Thread.sleep(5000);
    System.out.println("<<< First subscriber returned from lunch");
    subscriber1 = numbers.subscribe(value -> System.out.println("First subscriber: "+ value ));

    // Just to keep the program running.
    Thread.sleep(60000);
}