类io.reactivex.processors.BehaviorProcessor源码实例Demo

下面列出了怎么用io.reactivex.processors.BehaviorProcessor的API类实例代码及写法,或者点击链接到github查看源代码。

public TEIDataPresenterImpl(TEIDataContracts.View view, D2 d2,
                            DashboardRepository dashboardRepository,
                            TeiDataRepository teiDataRepository,
                            RuleEngineRepository ruleEngineRepository,
                            String programUid, String teiUid, String enrollmentUid,
                            SchedulerProvider schedulerProvider,
                            PreferenceProvider preferenceProvider,
                            AnalyticsHelper analyticsHelper,
                            FilterManager filterManager) {
    this.view = view;
    this.d2 = d2;
    this.dashboardRepository = dashboardRepository;
    this.teiDataRepository = teiDataRepository;
    this.ruleEngineRepository = ruleEngineRepository;
    this.programUid = programUid;
    this.teiUid = teiUid;
    this.enrollmentUid = enrollmentUid;
    this.schedulerProvider = schedulerProvider;
    this.preferences = preferenceProvider;
    this.analyticsHelper = analyticsHelper;
    this.filterManager = filterManager;
    this.compositeDisposable = new CompositeDisposable();
    this.groupingProcessor = BehaviorProcessor.create();
}
 
@Inject
ObservationDetailsViewModel(ObservationRepository observationRepository) {
  this.argsProcessor = BehaviorProcessor.create();

  Flowable<Loadable<Observation>> observationStream =
      argsProcessor.switchMapSingle(
          args ->
              observationRepository
                  .getObservation(
                      args.getProjectId(), args.getFeatureId(), args.getObservationId())
                  .map(Loadable::loaded)
                  .onErrorReturn(Loadable::error));

  // TODO: Refactor to expose the fetched observation directly.
  this.observations = LiveDataReactiveStreams.fromPublisher(observationStream);

  this.progressBarVisibility =
      LiveDataReactiveStreams.fromPublisher(
          observationStream.map(ObservationDetailsViewModel::getProgressBarVisibility));

  this.feature =
      LiveDataReactiveStreams.fromPublisher(
          observationStream.map(ObservationDetailsViewModel::getFeature));
}
 
@Inject
public OfflineAreaViewerViewModel(OfflineAreaRepository offlineAreaRepository, Context context) {
  this.argsProcessor = BehaviorProcessor.create();
  this.offlineAreaRepository = offlineAreaRepository;
  this.context = new WeakReference<>(context);
  this.areaStorageSize =
      LiveDataReactiveStreams.fromPublisher(
          this.argsProcessor.switchMap(
              args ->
                  this.offlineAreaRepository
                      .getOfflineArea(args.getOfflineAreaId())
                      .toFlowable()
                      .flatMap(offlineAreaRepository::getIntersectingDownloadedTilesOnceAndStream)
                      .map(this::tilesToTotalStorageSize)));
  this.offlineArea =
      LiveDataReactiveStreams.fromPublisher(
          this.argsProcessor.switchMap(
              args ->
                  this.offlineAreaRepository
                      .getOfflineArea(args.getOfflineAreaId())
                      .toFlowable()));
}
 
源代码4 项目: akarnokd-misc   文件: ReplayRefCountSubjectTest.java
@Test
public void test2() {
    BehaviorProcessor<Integer> subject = BehaviorProcessor.create();

    Flowable<Integer> observable = subject
            .doOnNext(e -> { 
                System.out.println("This emits for second subscriber"); 
            })
            .doOnSubscribe(s -> System.out.println("OnSubscribe"))
            .doOnCancel(() -> System.out.println("OnDispose"))
            .replay(1)
            .refCount()
            .doOnNext(e -> { System.out.println("This does NOT emit for second subscriber"); });

    System.out.println("Subscribe-1");
    // This line causes the test to fail.
    observable.takeUntil(Flowable.just(1)).test();

    subject.onNext(2);

    System.out.println("Subscribe-2");
    TestSubscriber<Integer> subscriber = observable.take(1).test();
    Assert.assertTrue(subscriber.awaitTerminalEvent(2, TimeUnit.SECONDS));
}
 
源代码5 项目: ground-android   文件: ProjectRepository.java
@Inject
public ProjectRepository(
    LocalDataStore localDataStore,
    RemoteDataStore remoteDataStore,
    InMemoryCache cache,
    LocalValueStore localValueStore) {
  this.localDataStore = localDataStore;
  this.remoteDataStore = remoteDataStore;
  this.cache = cache;
  this.localValueStore = localValueStore;

  // BehaviorProcessor re-emits last requested project id to late subscribers.
  this.activateProjectRequests = BehaviorProcessor.create();

  // Stream that emits a value whenever the user changes projects.
  Flowable<Optional<String>> distinctActivateProjectRequests =
      activateProjectRequests
          .distinctUntilChanged()
          .doOnNext(id -> Log.v(TAG, "Requested project id changed: " + id));

  // Stream that emits project loading state when requested id changes. Late subscribers receive
  // the last project or loading state.
  Flowable<Loadable<Project>> activeProject =
      distinctActivateProjectRequests.switchMap(this::loadProject).onBackpressureLatest();

  // Convert project loading state stream to Connectable to prevent loadProject() from being
  // called once for each subscription. Instead, it will be called once on each project change,
  // with each subscriber receiving a cached copy of the result. This is required in addition
  // to onBackpressureLatest() above.
  this.activeProjectStream = activeProject.replay(1).refCount();
}
 
源代码6 项目: ground-android   文件: FeatureDetailsViewModel.java
@Inject
public FeatureDetailsViewModel() {
  featureTitle = new ObservableField<>();
  featureSubtitle = new ObservableField<>();
  featureSubtitleVisibility = new ObservableInt();
  selectedFeature = BehaviorProcessor.createDefault(Optional.empty());
}
 
MqttServerSource(Vertx vertx, MqttServerConnectorIncomingConfiguration config) {
    this.broadcast = config.getBroadcast();
    final MqttServerOptions options = mqttServerOptions(config);
    this.mqttServer = MqttServer.create(vertx, options);
    final BehaviorProcessor<MqttMessage> processor = BehaviorProcessor.create();

    mqttServer.exceptionHandler(error -> {
        log.exceptionThrown(error);
        processor.onError(error);
    });

    mqttServer.endpointHandler(endpoint -> {
        log.requestToConnect(endpoint.clientIdentifier(), endpoint.isCleanSession());

        if (endpoint.auth() != null) {
            log.requestToConnectUserName(endpoint.auth().getUsername(), endpoint.auth().getPassword());
        }
        if (endpoint.will() != null) {
            log.requestToConnectWill(endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(),
                    endpoint.will().getWillQos(), endpoint.will().isWillRetain());
        }

        log.requestToConnectKeepAlive(endpoint.keepAliveTimeSeconds());

        endpoint.exceptionHandler(
                error -> log.errorWithClient(endpoint.clientIdentifier(), error));

        endpoint.disconnectHandler(
                v -> log.clientDisconnected(endpoint.clientIdentifier()));

        endpoint.pingHandler(
                v -> log.pingReceived(endpoint.clientIdentifier()));

        endpoint.publishHandler(message -> {
            log.receivedMessageFromClient(message.payload(), message.qosLevel(), endpoint.clientIdentifier());

            processor.onNext(new MqttMessage(message, endpoint.clientIdentifier(), () -> {
                if (message.qosLevel() == AT_LEAST_ONCE) {
                    log.sendToClient("PUBACK", endpoint.clientIdentifier(), message.messageId());
                    endpoint.publishAcknowledge(message.messageId());
                } else if (message.qosLevel() == EXACTLY_ONCE) {
                    log.sendToClient("PUBREC", endpoint.clientIdentifier(), message.messageId());
                    endpoint.publishReceived(message.messageId());
                }
                return CompletableFuture.completedFuture(null);
            }));
        });

        endpoint.publishReleaseHandler(messageId -> {
            log.sendToClient("PUBCOMP", endpoint.clientIdentifier(), messageId);
            endpoint.publishComplete(messageId);
        });

        endpoint.subscribeHandler(subscribeMessage -> {
            log.receivedSubscription(subscribeMessage, endpoint.clientIdentifier());
            endpoint.close();
        });

        // accept connection from the remote client
        // this implementation doesn't keep track of sessions
        endpoint.accept(false);
    });

    this.source = ReactiveStreams.fromPublisher(processor
            .delaySubscription(mqttServer.listen()
                    .onItem().invoke(ignored -> log.serverListeningOn(options.getHost(), mqttServer.actualPort()))
                    .onFailure().invoke(throwable -> log.failedToStart(throwable))
                    .toMulti()
                    .then(flow -> {
                        if (broadcast) {
                            return flow.broadcast().toAllSubscribers();
                        } else {
                            return flow;
                        }
                    }))
            .doOnSubscribe(subscription -> log.newSubscriberAdded(subscription)));
}
 
源代码8 项目: wurmloch-crdt   文件: GCounter.java
public GCounter(String nodeId, String crdtId) {
    super(nodeId, crdtId, BehaviorProcessor.create());
}
 
源代码9 项目: wurmloch-crdt   文件: PNCounter.java
public PNCounter(String nodeId, String crtdId) {
    super(nodeId, crtdId, BehaviorProcessor.create());
}
 
源代码10 项目: wurmloch-crdt   文件: LWWRegister.java
public LWWRegister(String nodeId, String crdtId) {
    super(nodeId, crdtId, BehaviorProcessor.create());
    this.clock = new StrictVectorClock(nodeId);
}
 
源代码11 项目: RHub   文件: RxJava2Proxies.java
public static RxJava2ProcProxy behaviorProcessorProxy() {
    return new RxJava2ProcProxy(BehaviorProcessor.create(), Roxy.TePolicy.PASS);
}
 
源代码12 项目: RHub   文件: RxJava2Proxies.java
public static RxJava2ProcProxy serializedBehaviorProcessorProxy() {
    return new RxJava2ProcProxy(BehaviorProcessor.create().toSerialized(), Roxy.TePolicy.PASS);
}
 
源代码13 项目: RHub   文件: RxJava2Proxies.java
public static RxJava2ProcProxy safeBehaviorProcessorProxy() {
    return new RxJava2ProcProxy(BehaviorProcessor.create(), Roxy.TePolicy.WRAP);
}
 
源代码14 项目: RHub   文件: RxJava2Proxies.java
public static RxJava2ProcProxy safeSerializedBehaviorProcessorProxy() {
    return new RxJava2ProcProxy(BehaviorProcessor.create().toSerialized(), Roxy.TePolicy.WRAP);
}
 
 类所在包
 类方法
 同包方法