下面列出了 io.reactivex.processors.BehaviorProcessor 这个 API 类的用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds the presenter by storing every collaborator it is handed.
 *
 * <p>In addition to keeping the supplied dependencies, the constructor creates an empty
 * {@code CompositeDisposable} and an empty {@code BehaviorProcessor} that is stored in
 * {@code groupingProcessor}.
 *
 * @param view                 view contract this presenter is attached to
 * @param d2                   D2 instance stored for later use
 * @param dashboardRepository  dashboard repository
 * @param teiDataRepository    TEI data repository
 * @param ruleEngineRepository rule engine repository
 * @param programUid           program uid
 * @param teiUid               tracked entity instance uid
 * @param enrollmentUid        enrollment uid
 * @param schedulerProvider    scheduler provider
 * @param preferenceProvider   preference provider, stored in {@code preferences}
 * @param analyticsHelper      analytics helper
 * @param filterManager        filter manager
 */
public TEIDataPresenterImpl(
        TEIDataContracts.View view,
        D2 d2,
        DashboardRepository dashboardRepository,
        TeiDataRepository teiDataRepository,
        RuleEngineRepository ruleEngineRepository,
        String programUid,
        String teiUid,
        String enrollmentUid,
        SchedulerProvider schedulerProvider,
        PreferenceProvider preferenceProvider,
        AnalyticsHelper analyticsHelper,
        FilterManager filterManager) {
    // Identifiers.
    this.programUid = programUid;
    this.teiUid = teiUid;
    this.enrollmentUid = enrollmentUid;

    // View and SDK entry point.
    this.view = view;
    this.d2 = d2;

    // Repositories.
    this.dashboardRepository = dashboardRepository;
    this.teiDataRepository = teiDataRepository;
    this.ruleEngineRepository = ruleEngineRepository;

    // Supporting services.
    this.schedulerProvider = schedulerProvider;
    this.preferences = preferenceProvider;
    this.analyticsHelper = analyticsHelper;
    this.filterManager = filterManager;

    // Reactive state owned by this presenter.
    this.groupingProcessor = BehaviorProcessor.create();
    this.compositeDisposable = new CompositeDisposable();
}
/**
 * Wires the view model's LiveData fields to a stream of observation load results.
 *
 * <p>Every value pushed into {@code argsProcessor} is switched into a single repository lookup;
 * the lookup result is wrapped with {@code Loadable::loaded}, and a failure is converted with
 * {@code Loadable::error} instead of terminating the stream.
 *
 * <p>NOTE(review): each LiveData below is created from the same cold stream via a separate
 * {@code fromPublisher} call, so the repository lookup presumably runs once per subscribed
 * LiveData — confirm whether the stream should be shared.
 *
 * @param observationRepository repository used to fetch the requested observation
 */
@Inject
ObservationDetailsViewModel(ObservationRepository observationRepository) {
  argsProcessor = BehaviorProcessor.create();

  Flowable<Loadable<Observation>> loadResults =
      argsProcessor.switchMapSingle(
          request ->
              observationRepository
                  .getObservation(
                      request.getProjectId(),
                      request.getFeatureId(),
                      request.getObservationId())
                  .map(Loadable::loaded)
                  .onErrorReturn(Loadable::error));

  // TODO: Refactor to expose the fetched observation directly.
  feature =
      LiveDataReactiveStreams.fromPublisher(
          loadResults.map(ObservationDetailsViewModel::getFeature));
  progressBarVisibility =
      LiveDataReactiveStreams.fromPublisher(
          loadResults.map(ObservationDetailsViewModel::getProgressBarVisibility));
  observations = LiveDataReactiveStreams.fromPublisher(loadResults);
}
/**
 * Stores the collaborators and exposes two LiveData streams driven by {@code argsProcessor}.
 *
 * <ul>
 *   <li>{@code offlineArea}: for each args value, the offline area looked up by its id.
 *   <li>{@code areaStorageSize}: for each args value, the looked-up area is mapped to its
 *       intersecting downloaded tiles stream and then reduced with
 *       {@code tilesToTotalStorageSize}.
 * </ul>
 *
 * @param offlineAreaRepository repository used for offline area and tile lookups
 * @param context context, held only through a {@link WeakReference}
 */
@Inject
public OfflineAreaViewerViewModel(OfflineAreaRepository offlineAreaRepository, Context context) {
  this.offlineAreaRepository = offlineAreaRepository;
  this.context = new WeakReference<>(context);
  this.argsProcessor = BehaviorProcessor.create();

  // The requested area itself.
  this.offlineArea =
      LiveDataReactiveStreams.fromPublisher(
          this.argsProcessor.switchMap(
              viewerArgs ->
                  this.offlineAreaRepository
                      .getOfflineArea(viewerArgs.getOfflineAreaId())
                      .toFlowable()));

  // Storage size derived from the tiles intersecting the requested area.
  this.areaStorageSize =
      LiveDataReactiveStreams.fromPublisher(
          this.argsProcessor.switchMap(
              viewerArgs ->
                  this.offlineAreaRepository
                      .getOfflineArea(viewerArgs.getOfflineAreaId())
                      .toFlowable()
                      .flatMap(offlineAreaRepository::getIntersectingDownloadedTilesOnceAndStream)
                      .map(this::tilesToTotalStorageSize)));
}
/**
 * Subscribes twice to a {@code replay(1).refCount()} view of a {@code BehaviorProcessor}.
 *
 * <p>The first subscription is cut short by {@code takeUntil(Flowable.just(1))}; a value is then
 * pushed into the processor and a second subscriber takes one item. The test asserts that the
 * second subscriber reaches a terminal event within two seconds. Console output traces the
 * subscribe/cancel/emission order.
 */
@Test
public void test2() {
    BehaviorProcessor<Integer> source = BehaviorProcessor.create();

    Flowable<Integer> shared =
            source
                    .doOnNext(item -> System.out.println("This emits for second subscriber"))
                    .doOnSubscribe(subscription -> System.out.println("OnSubscribe"))
                    .doOnCancel(() -> System.out.println("OnDispose"))
                    .replay(1)
                    .refCount()
                    .doOnNext(item -> System.out.println("This does NOT emit for second subscriber"));

    System.out.println("Subscribe-1");
    // This line causes the test to fail.
    shared.takeUntil(Flowable.just(1)).test();

    source.onNext(2);

    System.out.println("Subscribe-2");
    TestSubscriber<Integer> secondSubscriber = shared.take(1).test();
    boolean terminated = secondSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
    Assert.assertTrue(terminated);
}
/**
 * Stores the data stores and builds the shared stream of the active project's loading state.
 *
 * @param localDataStore local data store
 * @param remoteDataStore remote data store
 * @param cache in-memory cache
 * @param localValueStore local value store
 */
@Inject
public ProjectRepository(
    LocalDataStore localDataStore,
    RemoteDataStore remoteDataStore,
    InMemoryCache cache,
    LocalValueStore localValueStore) {
  this.localValueStore = localValueStore;
  this.cache = cache;
  this.remoteDataStore = remoteDataStore;
  this.localDataStore = localDataStore;

  // A BehaviorProcessor hands the most recently requested project id to late subscribers.
  this.activateProjectRequests = BehaviorProcessor.create();

  // Only forward a request when the requested id differs from the previous one.
  Flowable<Optional<String>> projectIdChanges =
      activateProjectRequests
          .distinctUntilChanged()
          .doOnNext(id -> Log.v(TAG, "Requested project id changed: " + id));

  // Each id change switches to a fresh load; under backpressure only the newest loading state
  // is kept.
  Flowable<Loadable<Project>> loadingStates =
      projectIdChanges.switchMap(this::loadProject).onBackpressureLatest();

  // Share a single upstream subscription so loadProject() runs once per project change rather
  // than once per subscriber; the last emitted state is replayed to each new subscriber. This
  // is needed in addition to onBackpressureLatest() above.
  this.activeProjectStream = loadingStates.replay(1).refCount();
}
/**
 * Initializes the observable fields and the selected-feature processor.
 *
 * <p>{@code selectedFeature} starts with {@code Optional.empty()} as its default value.
 */
@Inject
public FeatureDetailsViewModel() {
  this.selectedFeature = BehaviorProcessor.createDefault(Optional.empty());

  this.featureTitle = new ObservableField<>();
  this.featureSubtitle = new ObservableField<>();
  this.featureSubtitleVisibility = new ObservableInt();
}
/**
 * Creates the MQTT server and exposes the messages published by clients as a reactive source.
 *
 * <p>Handlers are registered on the server and on each connecting endpoint; every PUBLISH
 * received from a client is wrapped in an {@code MqttMessage} and pushed into a
 * {@code BehaviorProcessor}. The resulting source only subscribes to that processor once the
 * publisher returned by {@code mqttServer.listen()} has signalled.
 *
 * <p>Client passwords are never written to the log: only a redaction placeholder is passed to
 * the logger when a password was supplied.
 *
 * @param vertx  Vert.x instance used to create the MQTT server
 * @param config incoming connector configuration (broadcast flag and server options)
 */
MqttServerSource(Vertx vertx, MqttServerConnectorIncomingConfiguration config) {
    this.broadcast = config.getBroadcast();
    final MqttServerOptions options = mqttServerOptions(config);
    this.mqttServer = MqttServer.create(vertx, options);
    final BehaviorProcessor<MqttMessage> processor = BehaviorProcessor.create();

    // A server-level failure is logged and terminates the message stream with that error.
    mqttServer.exceptionHandler(error -> {
        log.exceptionThrown(error);
        processor.onError(error);
    });

    mqttServer.endpointHandler(endpoint -> {
        log.requestToConnect(endpoint.clientIdentifier(), endpoint.isCleanSession());
        if (endpoint.auth() != null) {
            // Do not leak credentials into the logs: keep the user name, redact the password
            // (null is preserved so "no password supplied" stays distinguishable).
            log.requestToConnectUserName(endpoint.auth().getUsername(),
                    endpoint.auth().getPassword() == null ? null : "<redacted>");
        }
        if (endpoint.will() != null) {
            log.requestToConnectWill(endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(),
                    endpoint.will().getWillQos(), endpoint.will().isWillRetain());
        }
        log.requestToConnectKeepAlive(endpoint.keepAliveTimeSeconds());

        // Per-client diagnostics only; these handlers do not touch the message stream.
        endpoint.exceptionHandler(
                error -> log.errorWithClient(endpoint.clientIdentifier(), error));
        endpoint.disconnectHandler(
                v -> log.clientDisconnected(endpoint.clientIdentifier()));
        endpoint.pingHandler(
                v -> log.pingReceived(endpoint.clientIdentifier()));

        endpoint.publishHandler(message -> {
            log.receivedMessageFromClient(message.payload(), message.qosLevel(), endpoint.clientIdentifier());
            // The third argument is the acknowledgement callback: it answers the client with
            // PUBACK for AT_LEAST_ONCE or PUBREC for EXACTLY_ONCE, and does nothing otherwise.
            processor.onNext(new MqttMessage(message, endpoint.clientIdentifier(), () -> {
                if (message.qosLevel() == AT_LEAST_ONCE) {
                    log.sendToClient("PUBACK", endpoint.clientIdentifier(), message.messageId());
                    endpoint.publishAcknowledge(message.messageId());
                } else if (message.qosLevel() == EXACTLY_ONCE) {
                    log.sendToClient("PUBREC", endpoint.clientIdentifier(), message.messageId());
                    endpoint.publishReceived(message.messageId());
                }
                return CompletableFuture.completedFuture(null);
            }));
        });

        // A release from the client is answered with PUBCOMP.
        endpoint.publishReleaseHandler(messageId -> {
            log.sendToClient("PUBCOMP", endpoint.clientIdentifier(), messageId);
            endpoint.publishComplete(messageId);
        });

        // A client that tries to subscribe is logged and then disconnected.
        endpoint.subscribeHandler(subscribeMessage -> {
            log.receivedSubscription(subscribeMessage, endpoint.clientIdentifier());
            endpoint.close();
        });

        // accept connection from the remote client
        // this implementation doesn't keep track of sessions
        endpoint.accept(false);
    });

    // Subscription to the processor is delayed until the listen() publisher signals; when
    // broadcast is enabled that listen stream is broadcast to all subscribers.
    this.source = ReactiveStreams.fromPublisher(processor
            .delaySubscription(mqttServer.listen()
                    .onItem().invoke(ignored -> log.serverListeningOn(options.getHost(), mqttServer.actualPort()))
                    .onFailure().invoke(throwable -> log.failedToStart(throwable))
                    .toMulti()
                    .then(flow -> {
                        if (broadcast) {
                            return flow.broadcast().toAllSubscribers();
                        } else {
                            return flow;
                        }
                    }))
            .doOnSubscribe(subscription -> log.newSubscriberAdded(subscription)));
}
/**
 * Creates a {@code GCounter} for the given node and CRDT ids.
 *
 * <p>Delegates to the superclass constructor, passing a newly created, empty
 * {@code BehaviorProcessor} as the third argument.
 *
 * @param nodeId id of the node, forwarded to the superclass
 * @param crdtId id of this CRDT, forwarded to the superclass
 */
public GCounter(String nodeId, String crdtId) {
super(nodeId, crdtId, BehaviorProcessor.create());
}
/**
 * Creates a {@code PNCounter} for the given node and CRDT ids.
 *
 * <p>Delegates to the superclass constructor, passing a newly created, empty
 * {@code BehaviorProcessor} as the third argument.
 *
 * @param nodeId id of the node, forwarded to the superclass
 * @param crtdId id of this CRDT, forwarded to the superclass
 */
public PNCounter(String nodeId, String crtdId) {
super(nodeId, crtdId, BehaviorProcessor.create());
}
/**
 * Creates an {@code LWWRegister} for the given node and CRDT ids.
 *
 * <p>Delegates to the superclass constructor with a newly created, empty
 * {@code BehaviorProcessor}, then initializes {@code clock} with a new
 * {@code StrictVectorClock} built from {@code nodeId}.
 *
 * @param nodeId id of the node, forwarded to the superclass and used to create the clock
 * @param crdtId id of this CRDT, forwarded to the superclass
 */
public LWWRegister(String nodeId, String crdtId) {
super(nodeId, crdtId, BehaviorProcessor.create());
this.clock = new StrictVectorClock(nodeId);
}
/**
 * Creates a proxy around a new, empty {@code BehaviorProcessor} using
 * {@code Roxy.TePolicy.PASS}.
 *
 * <p>NOTE(review): the meaning of the {@code PASS} policy is defined by {@code Roxy} and is not
 * visible here.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy behaviorProcessorProxy() {
return new RxJava2ProcProxy(BehaviorProcessor.create(), Roxy.TePolicy.PASS);
}
/**
 * Creates a proxy around a new {@code BehaviorProcessor} that has been wrapped with
 * {@code toSerialized()}, using {@code Roxy.TePolicy.PASS}.
 *
 * <p>NOTE(review): the meaning of the {@code PASS} policy is defined by {@code Roxy} and is not
 * visible here.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy serializedBehaviorProcessorProxy() {
return new RxJava2ProcProxy(BehaviorProcessor.create().toSerialized(), Roxy.TePolicy.PASS);
}
/**
 * Creates a proxy around a new, empty {@code BehaviorProcessor} using
 * {@code Roxy.TePolicy.WRAP}.
 *
 * <p>NOTE(review): the meaning of the {@code WRAP} policy is defined by {@code Roxy} and is not
 * visible here.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy safeBehaviorProcessorProxy() {
return new RxJava2ProcProxy(BehaviorProcessor.create(), Roxy.TePolicy.WRAP);
}
/**
 * Creates a proxy around a new {@code BehaviorProcessor} that has been wrapped with
 * {@code toSerialized()}, using {@code Roxy.TePolicy.WRAP}.
 *
 * <p>NOTE(review): the meaning of the {@code WRAP} policy is defined by {@code Roxy} and is not
 * visible here.
 *
 * @return a new {@code RxJava2ProcProxy} instance
 */
public static RxJava2ProcProxy safeSerializedBehaviorProcessorProxy() {
return new RxJava2ProcProxy(BehaviorProcessor.create().toSerialized(), Roxy.TePolicy.WRAP);
}