io.reactivex.internal.fuseable.ScalarCallable#io.reactivex.observables.ConnectableObservable源码实例Demo

下面列出了io.reactivex.internal.fuseable.ScalarCallable#io.reactivex.observables.ConnectableObservable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
void hot_publish_Observable() {
    ConnectableObservable<Object> observable = Observable.create(observer -> {
        System.out.println("Establishing connection");
        observer.onNext("处理的数字是: " + Math.random() * 100);
        observer.onNext("处理的数字是: " + Math.random() * 100);
        observer.onComplete();
    }).publish();
    observable.subscribe(consumer -> {
        System.out.println("一郎神: " + consumer);
    });
    observable.subscribe(consumer -> {
        System.out.println("二郎神: " + consumer);
    });
    observable.connect();
}
 
源代码2 项目: dive-into-graphql-in-java   文件: Subscription.java
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            Score s = Score.builder()
                           .title(title)
                           .score(Integer.valueOf((int) Math.floor(Math.random()*10)))
                           .build();
            e.onNext(s);
        }, 0, 2, TimeUnit.SECONDS);
    });

    ConnectableObservable connectableObservable = observable.share().publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
源代码3 项目: dive-into-graphql-in-java   文件: Subscription.java
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            Score s = Score.builder()
                           .title(title)
                           .score(Integer.valueOf((int) Math.floor(Math.random()*10)))
                           .build();
            e.onNext(s);
        }, 0, 2, TimeUnit.SECONDS);
    });

    ConnectableObservable connectableObservable = observable.share().publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
private void doSomeWork() {

        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay
        connectableObservable.connect(); // connecting the connectableObservable

        connectableObservable.subscribe(getFirstObserver());

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onComplete();

        /*
         * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
         */
        connectableObservable.subscribe(getSecondObserver());

    }
 
源代码5 项目: akarnokd-misc   文件: RxLosers.java
@Test
public void test() throws Exception {
    printWithTime("Starting.");
    Observable<String> obs = Observable.<String>create(e -> {
                slowOperation();
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .cache()
            ;
    ConnectableObservable<String> hot = obs.publish();
    hot.connect();
    printWithTime("Published and connected.");
    //Thread.sleep(3000);

    printWithTime("Subscribe.");
    hot.subscribe(this::printWithTime, Throwable::printStackTrace, () -> System.out.println("Done"));
    printWithTime("Subscribe2.");
    hot.subscribe(this::printWithTime, Throwable::printStackTrace, () -> System.out.println("Done"));
    Thread.sleep(3000);
}
 
源代码6 项目: akarnokd-misc   文件: PublishFuncExample.java
@Test
public void test() throws Exception {
    Observable<Integer> source = Observable.range(1, 5)
            .delaySubscription(1, TimeUnit.SECONDS);

        Function<Observable<Integer>, Observable<Integer>> func = o ->
            Observable.merge(o.take(1), o.takeLast(1));

        Observable<Integer> forkAndJoin = Observable.defer(() -> {
            ConnectableObservable<Integer> conn = source
                .doOnSubscribe(s -> System.out.println("Subscribed!"))
                .publish();
            Observable<Integer> result = func.apply(conn);
            conn.connect();
            return result;
        });

        forkAndJoin.subscribe(System.out::println);
        forkAndJoin.subscribe(System.out::println);
        forkAndJoin.subscribe(System.out::println);
    
        Thread.sleep(10000);
}
 
源代码7 项目: poc-graphql   文件: NewAssociationPublisher.java
/**
 * Constructor - Init the publishing of the events
 */
public NewAssociationPublisher(BusinessDataRepository bDataRepository) {
    this.businessDataRepository = bDataRepository;
    Observable<String> stockPriceUpdateObservable = Observable.create(emitter -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(verifyPresenceOfNewAssociation(emitter), 0, 20, TimeUnit.SECONDS);
    });
    ConnectableObservable<String> connectableObservable = stockPriceUpdateObservable.share().publish();
    connectableObservable.connect();
    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
@Test
void infinite_publish_test() {
    ConnectableObservable<Object> observable = Observable.create(observer -> {
        BigInteger i = BigInteger.ZERO;
        while (true) {
            observer.onNext(i);
            i = i.add(BigInteger.ONE);
           /* if (i.compareTo(BigInteger.valueOf(1000))==0) {
                break;
            }*/
        }
    }).publish();
    observable.subscribe(x -> log(x));
    observable.connect();
}
 
源代码9 项目: Learn-Java-12-Programming   文件: HotObservable.java
private static void hot1(){
    ConnectableObservable<Long> hot = Observable.interval(10, TimeUnit.MILLISECONDS).publish();
    hot.connect();

    hot.subscribe(i -> System.out.println("First: " + i));
    pauseMs(25);

    hot.subscribe(i -> System.out.println("Second: " + i));
    pauseMs(55);
}
 
@Override
public ConnectableObservable<String> freeFlowEmps() {
	 List<String> rosterNames = new ArrayList<>();
	 Function<Employee, String> familyNames = (emp) -> emp.getLastName().toUpperCase();
	 ConnectableObservable<String> flowyNames = Observable.fromIterable(employeeDaoImpl.getEmployees()).map(familyNames).cache().publish();
	 
	 flowyNames.subscribe(System.out::println);
	 flowyNames.subscribe((name) ->{
		 rosterNames.add(name);
	 }); 
	 System.out.println(rosterNames);
	return flowyNames;
}
 
源代码11 项目: graphql-java-demo   文件: CommentPublisher.java
public CommentPublisher() {
    Observable<Comment> commentUpdateObservable = Observable.create(emitter -> {
        this.emitter = emitter;
    });

    ConnectableObservable<Comment> connectableObservable = commentUpdateObservable.share().publish();
    connectableObservable.connect();


    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
源代码12 项目: RxCommand   文件: RxCommand.java
/**
 * If the receiver is enabled, this method will:
 * <p>
 * 1. Invoke the `func` given at the time of creation.
 * 2. Multicast the returned observable.
 * 3. Send the multicasted observable on {@link #executionObservables()}.
 * 4. Subscribe (connect) to the original observable on the main thread.
 *
 * @param input The input value to pass to the receiver's `func`. This may be null.
 * @return the multicasted observable, after subscription. If the receiver is not
 * enabled, returns a observable that will send an error.
 */
@MainThread
public final Observable<T> execute(@Nullable Object input) {
    boolean enabled = mImmediateEnabled.blockingFirst();
    if (!enabled) {
        return Observable.error(new IllegalStateException("The command is disabled and cannot be executed"));
    }
    try {
        Observable<T> observable = mFunc.apply(input);
        if (observable == null) {
            throw new RuntimeException(String.format("null Observable returned from observable func for value %s", input));
        }

        // This means that `executing` and `enabled` will send updated values before
        // the observable actually starts performing work.
        final ConnectableObservable<T> connection = observable
                .subscribeOn(AndroidSchedulers.mainThread())
                .replay();

        mAddedExecutionObservableSubject.onNext(connection);
        connection.connect();
        return connection;
    } catch (Exception e) {
        e.printStackTrace();
        return Observable.error(e);
    }
}
 
源代码13 项目: tutorials   文件: RxJavaHooksUnitTest.java
@Test
public void givenConnectableObservable_whenAssembled_shouldExecuteTheHook() {

    RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
        hookCalled = true;
        return connectableObservable;
    });

    ConnectableObservable.range(1, 10)
        .publish()
        .connect();
    assertTrue(hookCalled);
}
 
@Test public void connectableObservable_assembleInScope_subscribeNoScope() {
  ConnectableObservable<Integer> source, errorSource;
  try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
    source = Observable.range(1, 3)
      .doOnNext(e -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
    errorSource = Observable.<Integer>error(new IllegalStateException())
      .doOnError(t -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
  }

  subscribeInNoContext(source.autoConnect(), errorSource.autoConnect()).assertResult(1, 2, 3);
}
 
@Test public void connectableObservable_assembleInScope_subscribeInScope() {
  ConnectableObservable<Integer> source, errorSource;
  try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
    source = Observable.range(1, 3)
      .doOnNext(e -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
    errorSource = Observable.<Integer>error(new IllegalStateException())
      .doOnError(t -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
  }

  subscribeInDifferentContext(source.autoConnect(), errorSource.autoConnect())
    .assertResult(1, 2, 3);
}
 
@Test public void connectableObservable_assembleNoScope_subscribeInScope() {
  ConnectableObservable<Integer> source = Observable.range(1, 3)
    .doOnNext(e -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext).publish();
  ConnectableObservable<Integer> errorSource =
    Observable.<Integer>error(new IllegalStateException())
      .doOnError(t -> assertInSubscribeContext())
      .doOnComplete(this::assertInSubscribeContext).publish();

  subscribeInDifferentContext(source.autoConnect(), errorSource.autoConnect())
    .assertResult(1, 2, 3);
}
 
@Test
void replay_PublishSubject_test() {
    PublishSubject<Object> publishSubject = PublishSubject.create();
    ConnectableObservable<Object> replay = publishSubject.replay();
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    List<Integer> integers = new ArrayList<>();
    for (int i=1;i<10;i++){
        integers.add(i);
    }
    Disposable subscribe1 = replay.subscribe(x -> {
        log("一郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));

    Disposable subscribe2 = replay.subscribe(x -> {
        log("二郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    Disposable subscribe3 = replay.subscribe(x -> {
        log("三郎神: " + x);
    }, Throwable::printStackTrace, () -> System.out.println("Emission completed"));
    AtomicInteger atomicInteger = new AtomicInteger(integers.size());
    try {
        forkJoinPool.submit(() -> {
            integers.forEach(id -> {
                sleep(1,TimeUnit.SECONDS);
                publishSubject.onNext(id);
                if (atomicInteger.decrementAndGet() == 0) {
                    publishSubject.onComplete();
                }
            });
        });
       /* integers.forEach(id -> forkJoinPool.submit(() -> {
            sleep(3,TimeUnit.SECONDS);
            publishSubject.onNext(id);
            if (atomicInteger.decrementAndGet() == 0) {
                publishSubject.onComplete();
            }
        }));*/
        replay.connect();
        sleep(2,TimeUnit.SECONDS);
        subscribe1.dispose();
        sleep(1,TimeUnit.SECONDS);
        //replay.connect(consumer -> consumer.dispose());
        publishSubject.onComplete();
    } finally  {
        try {
            forkJoinPool.shutdown();
            int shutdownDelaySec = 2;
            System.out.println("………………等待 " + shutdownDelaySec + " 秒后结束服务……………… ");
            forkJoinPool.awaitTermination(shutdownDelaySec, TimeUnit.SECONDS);
        } catch (Exception ex) {
            System.out.println("捕获到 forkJoinPool.awaitTermination()方法的异常: " + ex.getClass().getName());
        } finally {
            System.out.println("调用 forkJoinPool.shutdownNow()结束服务...");
            List<Runnable> l = forkJoinPool.shutdownNow();
            System.out.println("还剩 " + l.size() + " 个任务等待被执行,服务已关闭 ");
        }
    }
}
 
源代码18 项目: Spring-5.0-Projects   文件: RxJavaHotObservable2.java
public static void main(String args[]) {
	
	
	Observable<Long> observableInt = Observable.interval(2, TimeUnit.SECONDS);
	
	ConnectableObservable<Long> connectableIntObservable = observableInt.publish();
	connectableIntObservable.subscribe(i -> System.out.println("Observable #1 : "+i));

	connectableIntObservable.connect();
					
	addDelay(7000);
	connectableIntObservable.
		subscribe(i -> System.out.println("Observable #2 : "+i));

	addDelay(10000);

      
}
 
ObservableOnAssemblyConnectable(ConnectableObservable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
 
源代码20 项目: RxGroups   文件: SubscriptionProxy.java
private SubscriptionProxy(Observable<T> sourceObservable, Action onTerminate) {
  final ConnectableObservable<T> replay = sourceObservable.replay();
  sourceDisposable = replay.connect();
  proxy = replay.doOnTerminate(onTerminate);
  disposableList = new CompositeDisposable(sourceDisposable);
}
 
RequestContextConnectableObservable(ConnectableObservable<T> source, RequestContext assemblyContext) {
    this.source = source;
    this.assemblyContext = assemblyContext;
}
 
源代码22 项目: brave   文件: TraceContextConnectableObservable.java
TraceContextConnectableObservable(
  ConnectableObservable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  this.source = source;
  this.contextScoper = contextScoper;
  this.assembled = assembled;
}
 
源代码23 项目: brave   文件: Wrappers.java
public static <T> ConnectableObservable<T> wrap(
  ConnectableObservable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  return new TraceContextConnectableObservable<>(source, contextScoper, assembled);
}
 
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);

    });

    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
    connectableObservable.connect();

    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);

    });

    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
    connectableObservable.connect();

    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
public static void main(String[] args) {
	ConnectableObservable observable = Observable.create(observer -> {
		observer.onNext("I am Hot Observable "+Math.random()*100);
		observer.onComplete();
	}).publish();

	observable.subscribe(consumer -> System.out.println("message:-" + consumer));
	observable.subscribe(consumer -> System.out.println("message:-" + consumer));
	
	observable.connect();

	
}
 
源代码27 项目: rxjava2   文件: HotObservable.java
public static void main(String[] args) throws InterruptedException{

        ConnectableObservable<Long> numbers = (ConnectableObservable<Long>) Observable
                .interval(1, TimeUnit.SECONDS)     // generate numbers
                .publish();                        // make it hot

        numbers.connect();                        // create internal subscribtion
        
        Disposable subscriber1 = numbers
            .subscribe(n ->System.out.println("First subscriber: "+ n ));


        Thread.sleep(3000);

        Disposable subscriber2 = numbers
            .subscribe(n ->System.out.println("  Second subscriber: "+ n ));

        Thread.sleep(5000);
        System.out.println(">>> First subscriber goes for lunch break");
        subscriber1.dispose();

        Thread.sleep(5000);
        System.out.println("<<< First subscriber returned from lunch");
        subscriber1 = numbers.subscribe(n ->System.out.println("First subscriber: "+ n ));

        Thread.sleep(60000); // Just to keep the program running
    }
 
public ConnectableObservable<String> freeFlowEmps();