类io.reactivex.flowables.ConnectableFlowable源码实例Demo

下面列出了怎么用io.reactivex.flowables.ConnectableFlowable的API类实例代码及写法,或者点击链接到github查看源代码。

private ConnectableFlowable<List<FieldViewModel>> getFieldFlowable() {
    return showCalculationProcessor
            .startWith(true)
            .filter(newCalculation -> newCalculation)
            .flatMap(newCalculation -> Flowable.zip(
                    eventCaptureRepository.list(),
                    eventCaptureRepository.calculate(),
                    this::applyEffects)
            ).map(fields ->
                    {
                        emptyMandatoryFields = new HashMap<>();
                        for (FieldViewModel fieldViewModel : fields) {
                            if (fieldViewModel.mandatory() && DhisTextUtils.Companion.isEmpty(fieldViewModel.value()) && !sectionsToHide.contains(fieldViewModel.programStageSection()))
                                emptyMandatoryFields.put(fieldViewModel.uid(), fieldViewModel);
                        }
                        return fields;
                    }
            )
            .publish();

}
 
源代码2 项目: akarnokd-misc   文件: ConnectConnect.java
@Test(timeout = 5000)
public void schedulerTestRx() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(2);

  ConnectableFlowable<Integer> connectableFlux = Flowable.just(1, 2, 3, 4, 5).publish();

  connectableFlux
      .doOnEach(System.out::println)
      .subscribe(v -> {} , e -> {}, latch::countDown);

  connectableFlux
      .observeOn(Schedulers.computation())
      .doOnEach(System.out::println)
      .map(v -> v * 10)
      .observeOn(Schedulers.single())
      .doOnEach(System.out::println)
      .subscribeOn(Schedulers.io())
      .subscribe(v -> {} , e -> {}, latch::countDown);

  Thread.sleep(100);
  
  connectableFlux.connect();
  latch.await();
}
 
@Provides
@Singleton
@AppForeground
public ConnectableFlowable<String> providesAppForegroundEventStream(Application application) {
  ForegroundNotifier notifier = new ForegroundNotifier();
  ConnectableFlowable<String> foregroundFlowable = notifier.foregroundFlowable();
  foregroundFlowable.connect();

  application.registerActivityLifecycleCallbacks(notifier);
  return foregroundFlowable;
}
 
@Provides
@AnalyticsListener
@Singleton
ConnectableFlowable<String> providesAnalyticsConnectorEvents(
    AnalyticsEventsManager analyticsEventsManager) {
  return analyticsEventsManager.getAnalyticsEventsFlowable();
}
 
@Provides
@Singleton
@ProgrammaticTrigger
public ConnectableFlowable<String> providesProgramaticContextualTriggerStream() {

  ConnectableFlowable<String> flowable =
      Flowable.<String>create(e -> triggers.setListener((trigger) -> e.onNext(trigger)), BUFFER)
          .publish();

  flowable.connect();
  // We ignore the subscription since this connected flowable is expected to last the lifetime of
  // the app.
  return flowable;
}
 
@Inject
public InAppMessageStreamManager(
    @AppForeground ConnectableFlowable<String> appForegroundEventFlowable,
    @ProgrammaticTrigger ConnectableFlowable<String> programmaticTriggerEventFlowable,
    CampaignCacheClient campaignCacheClient,
    Clock clock,
    ApiClient apiClient,
    AnalyticsEventsManager analyticsEventsManager,
    Schedulers schedulers,
    ImpressionStorageClient impressionStorageClient,
    RateLimiterClient rateLimiterClient,
    @AppForeground RateLimit appForegroundRateLimit,
    TestDeviceHelper testDeviceHelper,
    FirebaseInstallationsApi firebaseInstallations,
    DataCollectionHelper dataCollectionHelper,
    AbtIntegrationHelper abtIntegrationHelper) {
  this.appForegroundEventFlowable = appForegroundEventFlowable;
  this.programmaticTriggerEventFlowable = programmaticTriggerEventFlowable;
  this.campaignCacheClient = campaignCacheClient;
  this.clock = clock;
  this.apiClient = apiClient;
  this.analyticsEventsManager = analyticsEventsManager;
  this.schedulers = schedulers;
  this.impressionStorageClient = impressionStorageClient;
  this.rateLimiterClient = rateLimiterClient;
  this.appForegroundRateLimit = appForegroundRateLimit;
  this.testDeviceHelper = testDeviceHelper;
  this.dataCollectionHelper = dataCollectionHelper;
  this.firebaseInstallations = firebaseInstallations;
  this.abtIntegrationHelper = abtIntegrationHelper;
}
 
@Provides
@Singleton
@AppForeground
public ConnectableFlowable<String> providesAppForegroundEventStream(Application application) {
  ConnectableFlowable<String> foregroundFlowable = foregroundNotifier.foregroundFlowable();
  foregroundFlowable.connect();

  application.registerActivityLifecycleCallbacks(foregroundNotifier);
  return foregroundFlowable;
}
 
@Before
public void setup() {
  foregroundNotifier = new ForegroundNotifier();
  ConnectableFlowable<String> foregroundFlowable = foregroundNotifier.foregroundFlowable();
  foregroundFlowable.connect();
  subscriber = foregroundFlowable.test();
}
 
源代码9 项目: J2ME-Loader   文件: AppsListFragment.java
@SuppressLint("CheckResult")
private void initDb() {
	appRepository = new AppRepository(getActivity().getApplication(), appSort.equals("date"));
	ConnectableFlowable<List<AppItem>> listConnectableFlowable = appRepository.getAll()
			.subscribeOn(Schedulers.io()).publish();
	listConnectableFlowable
			.firstElement()
			.subscribe(list -> AppUtils.updateDb(appRepository, list));
	listConnectableFlowable
			.observeOn(AndroidSchedulers.mainThread())
			.subscribe(list -> adapter.setItems(list));
	compositeDisposable.add(listConnectableFlowable.connect());
}
 
源代码10 项目: akarnokd-misc   文件: DoOnErrorFusion.java
public static void main(String[] args) {
    ConnectableFlowable<Integer> f = Flowable.just(1)
            .doOnNext(i -> {
                throw new IllegalArgumentException();
            })
            .doOnError(e -> {
                throw new IllegalStateException(e);
            }).publish();
    f.subscribe(
            i -> { throw new AssertionError(); }, 
            e -> e.printStackTrace());
    f.connect();
}
 
@Override
public void onStart() {
  super.onStart();
  _disposables = new CompositeDisposable();

  ConnectableFlowable<Object> tapEventEmitter = _rxBus.asFlowable().publish();

  _disposables //
      .add(
      tapEventEmitter.subscribe(
          event -> {
            if (event instanceof RxBusDemoFragment.TapEvent) {
              _showTapText();
            }
          }));

  _disposables.add(
      tapEventEmitter
          .publish(stream -> stream.buffer(stream.debounce(1, TimeUnit.SECONDS)))
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(
              taps -> {
                _showTapCount(taps.size());
              }));

  _disposables.add(tapEventEmitter.connect());
}
 
源代码12 项目: contentful.java   文件: Callbacks.java
static <O, C> CDACallback<C> subscribeAsync(
    Flowable<O> flowable, CDACallback<C> callback, CDAClient client) {
  ConnectableFlowable<O> connectable = flowable.observeOn(Schedulers.io()).publish();

  callback.setSubscription(connectable.subscribe(
      new SuccessAction<>(callback, client),
      new FailureAction(callback, client)));

  connectable.connect();

  return callback;
}
 
源代码13 项目: tutorials   文件: RxJavaHooksUnitTest.java
@Test
public void givenConnectableFlowable_whenAssembled_shouldExecuteTheHook() {

    RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
        hookCalled = true;
        return connectableFlowable;
    });

    ConnectableFlowable.range(1, 10)
        .publish()
        .connect();
    assertTrue(hookCalled);
}
 
@Test public void connectableFlowable_assembleInScope_subscribeNoScope() {
  ConnectableFlowable<Integer> source, errorSource;
  try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
    source = Flowable.range(1, 3)
      .doOnNext(e -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
    errorSource = Flowable.<Integer>error(new IllegalStateException())
      .doOnError(t -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
  }

  subscribeInNoContext(
    source.autoConnect().toObservable(), errorSource.autoConnect().toObservable()
  ).assertResult(1, 2, 3);
}
 
@Test public void connectableFlowable_assembleInScope_subscribeInScope() {
  ConnectableFlowable<Integer> source, errorSource;
  try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
    source = Flowable.range(1, 3)
      .doOnNext(e -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
    errorSource = Flowable.<Integer>error(new IllegalStateException())
      .doOnError(t -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext).publish();
  }

  subscribeInDifferentContext(
    source.autoConnect().toObservable(), errorSource.autoConnect().toObservable()
  ).assertResult(1, 2, 3);
}
 
@Test public void connectableFlowable_assembleNoScope_subscribeInScope() {
  ConnectableFlowable<Integer> source = Flowable.range(1, 3)
    .doOnNext(e -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext).publish();
  ConnectableFlowable<Integer> errorSource = Flowable.<Integer>error(new IllegalStateException())
    .doOnError(t -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext).publish();

  subscribeInDifferentContext(
    source.autoConnect().toObservable(), errorSource.autoConnect().toObservable()
  ).assertResult(1, 2, 3);
}
 
源代码17 项目: firebase-android-sdk   文件: ForegroundNotifier.java
/** @returns a {@link ConnectableFlowable} representing a stream of foreground events */
public ConnectableFlowable<String> foregroundFlowable() {
  return foregroundSubject.toFlowable(BackpressureStrategy.BUFFER).publish();
}
 
源代码18 项目: firebase-android-sdk   文件: UniversalComponent.java
@AppForeground
ConnectableFlowable<String> appForegroundEventFlowable();
 
源代码19 项目: firebase-android-sdk   文件: UniversalComponent.java
@ProgrammaticTrigger
ConnectableFlowable<String> programmaticContextualTriggerFlowable();
 
源代码20 项目: firebase-android-sdk   文件: UniversalComponent.java
@AnalyticsListener
ConnectableFlowable<String> analyticsEventsFlowable();
 
public ConnectableFlowable<String> getAnalyticsEventsFlowable() {
  return flowable;
}
 
@Override
public ConnectableFlowable<String> foregroundFlowable() {
  return foregroundSubject.toFlowable(BackpressureStrategy.BUFFER).publish();
}
 
@Before
public void setup() {
  MockitoAnnotations.initMocks(this);
  ConnectableFlowable<String> appForegroundEventFlowable =
      Flowable.<String>create(e -> appForegroundEmitter = e, BUFFER).publish();
  appForegroundEventFlowable.connect();

  ConnectableFlowable<String> analyticsEventsFlowable =
      Flowable.<String>create(e -> analyticsEmitter = e, BUFFER).publish();
  analyticsEventsFlowable.connect();
  when(analyticsEventsManager.getAnalyticsEventsFlowable()).thenReturn(analyticsEventsFlowable);

  ConnectableFlowable<String> programmaticTriggerFlowable =
      Flowable.<String>create(e -> programmaticTriggerEmitter = e, BUFFER).publish();
  programmaticTriggerFlowable.connect();

  InAppMessageStreamManager streamManager =
      new InAppMessageStreamManager(
          appForegroundEventFlowable,
          programmaticTriggerFlowable,
          campaignCacheClient,
          new FakeClock(NOW),
          mockApiClient,
          analyticsEventsManager,
          schedulers,
          impressionStorageClient,
          rateLimiterClient,
          appForegroundRateLimit,
          testDeviceHelper,
          firebaseInstallations,
          dataCollectionHelper,
          abtIntegrationHelper);
  subscriber = streamManager.createFirebaseInAppMessageStream().test();
  when(application.getApplicationContext()).thenReturn(application);
  when(dataCollectionHelper.isAutomaticDataCollectionEnabled()).thenReturn(true);
  when(impressionStorageClient.clearImpressions(any(FetchEligibleCampaignsResponse.class)))
      .thenReturn(Completable.complete());
  when(rateLimiterClient.isRateLimited(appForegroundRateLimit)).thenReturn(Single.just(false));
  when(campaignCacheClient.get()).thenReturn(Maybe.empty());
  when(campaignCacheClient.put(any(FetchEligibleCampaignsResponse.class)))
      .thenReturn(Completable.complete());
  when(impressionStorageClient.isImpressed(any(ThickContent.class)))
      .thenReturn(Single.just(false));
  when(impressionStorageClient.getAllImpressions()).thenReturn(Maybe.just(CAMPAIGN_IMPRESSIONS));
  when(firebaseInstallations.getId()).thenReturn(Tasks.forResult(INSTALLATION_ID));
  when(firebaseInstallations.getToken(false))
      .thenReturn(Tasks.forResult(INSTALLATION_TOKEN_RESULT));
}
 
FlowableOnAssemblyConnectable(ConnectableFlowable<T> source) {
    this.source = source;
    this.assembled = new RxJavaAssemblyException();
}
 
RequestContextConnectableFlowable(ConnectableFlowable<T> source, RequestContext assemblyContext) {
    this.source = source;
    this.assemblyContext = assemblyContext;
}
 
源代码26 项目: incubator-gobblin   文件: StreamModelTaskRunner.java
protected void run() throws Exception {
  long maxWaitInMinute = taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);

  // Get the fork operator. By default IdentityForkOperator is used with a single branch.
  ForkOperator forkOperator = closer.register(this.taskContext.getForkOperator());

  RecordStreamWithMetadata<?, ?> stream = this.extractor.recordStream(this.shutdownRequested);
  // This prevents emitting records until a connect() call is made on the connectable stream
  ConnectableFlowable connectableStream = stream.getRecordStream().publish();

  // The cancel is not propagated to the extractor's record generator when it has been turned into a hot Flowable
  // by publish, so set the shutdownRequested flag on cancel to stop the extractor
  Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() -> this.shutdownRequested.set(true));

  stream = stream.withRecordStream(streamWithShutdownOnCancel);

  stream = stream.mapRecords(r -> {
    this.task.onRecordExtract();
    return r;
  });
  if (this.task.isStreamingTask()) {

    // Start watermark manager and tracker
    if (this.watermarkTracker.isPresent()) {
      this.watermarkTracker.get().start();
    }
    this.watermarkManager.get().start();

    ((StreamingExtractor) this.taskContext.getRawSourceExtractor()).start(this.watermarkStorage.get());

    stream = stream.mapRecords(r -> {
      AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(r.getWatermark());
      if (watermarkTracker.isPresent()) {
        watermarkTracker.get().track(ackableWatermark);
      }
      r.addCallBack(ackableWatermark);
      return r;
    });
  }

  // Use the recordStreamProcessor list if it is configured. This list can contain both all RecordStreamProcessor types
  if (!this.recordStreamProcessors.isEmpty()) {
    for (RecordStreamProcessor streamProcessor : this.recordStreamProcessors) {
      stream = streamProcessor.processStream(stream, this.taskState);
    }
  } else {
    if (this.converter instanceof MultiConverter) {
      // if multiconverter, unpack it
      for (Converter cverter : ((MultiConverter) this.converter).getConverters()) {
        stream = cverter.processStream(stream, this.taskState);
      }
    } else {
      stream = this.converter.processStream(stream, this.taskState);
    }
  }
  stream = this.rowChecker.processStream(stream, this.taskState);

  Forker.ForkedStream<?, ?> forkedStreams = new Forker().forkStream(stream, forkOperator, this.taskState);

  boolean isForkAsync = !this.task.areSingleBranchTasksSynchronous(this.taskContext) || forkedStreams.getForkedStreams().size() > 1;
  int bufferSize =
      this.taskState.getPropAsInt(ConfigurationKeys.FORK_RECORD_QUEUE_CAPACITY_KEY, ConfigurationKeys.DEFAULT_FORK_RECORD_QUEUE_CAPACITY);

  for (int fidx = 0; fidx < forkedStreams.getForkedStreams().size(); fidx ++) {
    RecordStreamWithMetadata<?, ?> forkedStream = forkedStreams.getForkedStreams().get(fidx);
    if (forkedStream != null) {
      if (isForkAsync) {
        forkedStream = forkedStream.mapStream(f -> f.observeOn(Schedulers.from(this.taskExecutor.getForkExecutor()), false, bufferSize));
      }
      Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
      fork.consumeRecordStream(forkedStream);
      this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
      this.task.configureStreamingFork(fork);
    }
  }

  connectableStream.connect();

  if (!ExponentialBackoff.awaitCondition().callable(() -> this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
      initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await()) {
    throw new TimeoutException("Forks did not finish withing specified timeout.");
  }
}
 
源代码27 项目: brave   文件: TraceContextConnectableFlowable.java
TraceContextConnectableFlowable(
  ConnectableFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  this.source = source;
  this.contextScoper = contextScoper;
  this.assembled = assembled;
}
 
源代码28 项目: brave   文件: Wrappers.java
public static <T> ConnectableFlowable<T> wrap(
  ConnectableFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  return new TraceContextConnectableFlowable<>(source, contextScoper, assembled);
}
 
 类所在包
 类方法
 同包方法