下面列出了怎么用io.reactivex.subjects.BehaviorSubject的API类实例代码及写法,或者点击链接到github查看源代码。
public SearchTEPresenter(SearchTEContractsModule.View view,
D2 d2,
SearchRepository searchRepository,
SchedulerProvider schedulerProvider,
AnalyticsHelper analyticsHelper,
@Nullable String initialProgram) {
this.view = view;
this.searchRepository = searchRepository;
this.d2 = d2;
this.schedulerProvider = schedulerProvider;
this.analyticsHelper = analyticsHelper;
compositeDisposable = new CompositeDisposable();
queryData = new HashMap<>();
queryProcessor = PublishProcessor.create();
mapProcessor = PublishProcessor.create();
enrollmentMapProcessor = PublishProcessor.create();
selectedProgram = initialProgram != null ? d2.programModule().programs().uid(initialProgram).blockingGet() : null;
currentProgram = BehaviorSubject.createDefault(initialProgram != null ? initialProgram : "");
}
@Override
public Observable<ConnectableState> connect() {
if (sharedConnectionState != null) {
return sharedConnectionState;
}
connectionStateSubject = BehaviorSubject.create();
sharedConnectionState =
connectionStateSubject
.doOnNext(state -> connectedRelay.accept(state == CONNECTED))
.doOnSubscribe(disposable -> processConnect())
.doFinally(() -> disconnect())
.replay(1)
.refCount(); // Don't do this on normal disconnect status 0 / 8
return sharedConnectionState;
}
@Test
public void testClose_waitForCommands() {
BehaviorSubject<Boolean> idler = BehaviorSubject.createDefault(false);
when(cmdProcessor.isIdle()).thenReturn(idler);
RxCmdShell shell = new RxCmdShell(builder, rxShell);
shell.open().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().values().get(0);
shell.isAlive().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(true);
shell.close().test().awaitDone(1, TimeUnit.SECONDS).assertTimeout();
idler.onNext(true);
shell.close().test().awaitDone(1, TimeUnit.SECONDS).assertNoTimeout().assertValue(0);
verify(cmdProcessor).isIdle();
verify(rxShellSession).close();
}
@Override
public <T extends Notification<?>> Flowable<T> subscribe(
Request request, String unsubscribeMethod, Class<T> responseType) {
// We can't use usual Observer since we can call "onError"
// before first client is subscribed and we need to
// preserve it
BehaviorSubject<T> subject = BehaviorSubject.create();
// We need to subscribe synchronously, since if we return
// an Flowable to a client before we got a reply
// a client can unsubscribe before we know a subscription
// id and this can cause a race condition
subscribeToEventsStream(request, subject, responseType);
return subject.doOnDispose(() -> closeSubscription(subject, unsubscribeMethod))
.toFlowable(BackpressureStrategy.BUFFER);
}
@Override
public <T extends Notification<?>> Flowable<T> subscribe(
Request request, String unsubscribeMethod, Class<T> responseType) {
// We can't use usual Observer since we can call "onError"
// before first client is subscribed and we need to
// preserve it
BehaviorSubject<T> subject = BehaviorSubject.create();
// We need to subscribe synchronously, since if we return
// an Flowable to a client before we got a reply
// a client can unsubscribe before we know a subscription
// id and this can cause a race condition
subscribeToEventsStream(request, subject, responseType);
return subject.doOnDispose(() -> closeSubscription(subject, unsubscribeMethod))
.toFlowable(BackpressureStrategy.BUFFER);
}
@Before
public void setUp() throws Exception {
List<ViewModel> vms = TestViewModel.dummyViewModels(INITIAL_COUNT);
viewModelsSource = BehaviorSubject.createDefault(vms);
testViewProvider = new TestViewProvider();
testBinder = new TestViewModelBinder();
subscriptionCounter = new SubscriptionCounter<>();
sut = new RecyclerViewAdapter(viewModelsSource.compose(subscriptionCounter),
testViewProvider, testBinder);
notifyCallCount = 0;
defaultObserver = new RecyclerView.AdapterDataObserver() {
@Override
public void onChanged() {
notifyCallCount++;
}
};
sut.registerAdapterDataObserver(defaultObserver);
}
@Test
public void test() {
BehaviorSubject<Integer> subject = BehaviorSubject.create();
Observable<Integer> observable = subject
.doOnNext(e -> {
System.out.println("This emits for second subscriber");
})
.doOnSubscribe(s -> System.out.println("OnSubscribe"))
.doOnDispose(() -> System.out.println("OnDispose"))
.replay(1)
.refCount()
.doOnNext(e -> { System.out.println("This does NOT emit for second subscriber"); });
System.out.println("Subscribe-1");
// This line causes the test to fail.
observable.takeUntil(Observable.just(1)).test();
subject.onNext(2);
System.out.println("Subscribe-2");
TestObserver<Integer> subscriber = observable.take(1).test();
Assert.assertTrue(subscriber.awaitTerminalEvent(2, TimeUnit.SECONDS));
}
public static Observable<String> fromSearchView(@NonNull final SearchView searchView) {
final BehaviorSubject<String> subject = BehaviorSubject.create();
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
subject.onNext(query);
subject.onComplete();
searchView.clearFocus();
return true;
}
@Override
public boolean onQueryTextChange(String newText) {
subject.onNext(newText);
return true;
}
});
return subject;
}
/**
* @param rpc An API provider using HTTP or WebSocket
*/
public RpcRx(RpcCore rpc) {
this.api = rpc;
this.eventEmitter = new EventEmitter();
this.isConnected = BehaviorSubject.createDefault(this.api.getProvider().isConnected());
this.initEmitters(this.api.getProvider());
this.author = this.createInterface(this.api.author());
this.chain = this.createInterface(this.api.chain());
this.state = this.createInterface(this.api.state());
this.system = this.createInterface(this.api.system());
}
@Test
void behaviorSubject_test() {
BehaviorSubject<Object> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext(1L);
behaviorSubject.onNext(2L);
behaviorSubject.subscribe(x -> log("一郎神: " + x),
Throwable::printStackTrace,
() -> System.out.println("Emission completed"),
disposable -> System.out.println("onSubscribe")
);
behaviorSubject.onNext(10L);
behaviorSubject.onComplete();
}
@Test
void behaviorSubject_error_test() {
BehaviorSubject<Object> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext(1L);
behaviorSubject.onNext(2L);
behaviorSubject.onError(new RuntimeException("我来耍下宝"));
behaviorSubject.subscribe(x -> log("一郎神: " + x),
Throwable::printStackTrace,
() -> System.out.println("Emission completed"),
disposable -> System.out.println("onSubscribe")
);
behaviorSubject.onNext(10L);
behaviorSubject.onComplete();
}
public EventCapturePresenterImpl(EventCaptureContract.View view, String eventUid,
EventCaptureContract.EventCaptureRepository eventCaptureRepository,
RulesUtilsProvider rulesUtils,
ValueStore valueStore, SchedulerProvider schedulerProvider) {
this.view = view;
this.eventUid = eventUid;
this.eventCaptureRepository = eventCaptureRepository;
this.rulesUtils = rulesUtils;
this.valueStore = valueStore;
this.schedulerProvider = schedulerProvider;
this.currentPosition = 0;
this.sectionsToHide = new ArrayList<>();
this.currentSection = new ObservableField<>("");
this.errors = new HashMap<>();
this.emptyMandatoryFields = new HashMap<>();
this.canComplete = true;
this.sectionList = new ArrayList<>();
this.compositeDisposable = new CompositeDisposable();
currentSectionPosition = PublishProcessor.create();
sectionProcessor = PublishProcessor.create();
showCalculationProcessor = PublishProcessor.create();
progressProcessor = PublishProcessor.create();
sectionAdjustProcessor = PublishProcessor.create();
formAdjustProcessor = PublishProcessor.create();
notesCounterProcessor = PublishProcessor.create();
formFieldsProcessor = BehaviorSubject.createDefault(new ArrayList<>());
}
@Inject
public AuthenticationManager(Application application, ActivityStreams activityStreams) {
this.signInState = BehaviorSubject.create();
this.firebaseAuth = FirebaseAuth.getInstance();
this.googleSignInOptions =
new GoogleSignInOptions.Builder(GoogleSignInOptions.DEFAULT_SIGN_IN)
.requestIdToken(application.getResources().getString(R.string.default_web_client_id))
.requestEmail()
.requestProfile()
.build();
this.activityStreams = activityStreams;
this.activityResultsSubscription =
activityStreams.getActivityResults(SIGN_IN_REQUEST_CODE).subscribe(this::onActivityResult);
}
@Inject
public LocationManager(
Application app,
PermissionsManager permissionsManager,
SettingsManager settingsManager,
RxFusedLocationProviderClient locationClient) {
this.permissionsManager = permissionsManager;
this.settingsManager = settingsManager;
this.locationClient = locationClient;
this.locationUpdates = BehaviorSubject.create();
this.locationUpdateCallback = RxLocationCallback.create(locationUpdates);
}
boolean processRound(BehaviorSubject<String> addToConsoleOutput) throws Exception {
if (roundProcessed) {
throw new InvalidOperationException("This round has already been processed!");
}
boolean processed = gameRoundProcessor.processRound(gameMap, commandsToProcess);
ArrayList<String> errorList = gameRoundProcessor.getErrorList();
String errorListText = "Error List: " + Arrays.toString(errorList.toArray());
addToConsoleOutput.onNext(errorListText);
roundProcessed = true;
return processed;
}
public GameEngineRunner() {
this.unsubscribe = BehaviorSubject.create();
this.addToConsoleOutput = BehaviorSubject.create();
this.addToConsoleOutput
.takeUntil(this.unsubscribe)
.subscribe(text -> consoleOutput += text);
}
@Before
public void setup() throws Exception {
super.setup();
when(builder.getProcessorFactory()).thenReturn(commandProcessorFactory);
when(commandProcessorFactory.create()).thenReturn(cmdProcessor);
BehaviorSubject<Boolean> idlePub = BehaviorSubject.createDefault(true);
when(cmdProcessor.isIdle()).thenReturn(idlePub);
when(rxShell.open()).thenReturn(Single.create(emitter -> {
when(rxShellSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
emitter.onSuccess(rxShellSession);
}));
when(rxShellSession.waitFor()).thenReturn(Single.just(0));
when(rxShellSession.isAlive()).thenReturn(Single.just(true));
when(rxShellSession.cancel()).thenReturn(Completable.create(e -> {
when(rxShellSession.isAlive()).thenReturn(Single.just(false));
waitForEmitter.onSuccess(1);
idlePub.onNext(true);
idlePub.onComplete();
e.onComplete();
}));
when(rxShellSession.close()).thenReturn(Single.create(e -> {
when(rxShellSession.isAlive()).thenReturn(Single.just(false));
waitForEmitter.onSuccess(0);
e.onSuccess(0);
idlePub.onNext(true);
idlePub.onComplete();
}));
}
private <T extends Notification<?>> void processSubscriptionResponse(
BcosSubscribe subscriptionReply, BehaviorSubject<T> subject, Class<T> responseType)
throws IOException {
if (!subscriptionReply.hasError()) {
establishSubscription(subject, responseType, subscriptionReply);
} else {
reportSubscriptionError(subject, subscriptionReply);
}
}
private <T extends Notification<?>> void establishSubscription(
BehaviorSubject<T> subject, Class<T> responseType, BcosSubscribe subscriptionReply) {
log.info("Subscribed to RPC events with id {}", subscriptionReply.getSubscriptionId());
subscriptionForId.put(
subscriptionReply.getSubscriptionId(),
new WebSocketSubscription<>(subject, responseType));
}
private <T extends Notification<?>> String getSubscriptionId(BehaviorSubject<T> subject) {
return subscriptionForId
.entrySet()
.stream()
.filter(entry -> entry.getValue().getSubject() == subject)
.map(Map.Entry::getKey)
.findFirst()
.orElse(null);
}
private <T extends Notification<?>> void reportSubscriptionError(
BehaviorSubject<T> subject, BcosSubscribe subscriptionReply) {
Response.Error error = subscriptionReply.getError();
log.error("Subscription request returned error: {}", error.getMessage());
subject.onError(
new IOException(
String.format(
"Subscription request failed with error: %s", error.getMessage())));
}
private <T extends Notification<?>> void subscribeToEventsStream(
Request request, BehaviorSubject<T> subject, Class<T> responseType) {
subscriptionRequestForId.put(
request.getId(), new WebSocketSubscription<>(subject, responseType));
try {
send(request, BcosSubscribe.class);
} catch (IOException e) {
log.error("Failed to subscribe to RPC events with request id {}", request.getId());
subject.onError(e);
}
}
private <T extends Notification<?>> void closeSubscription(
BehaviorSubject<T> subject, String unsubscribeMethod) {
String subscriptionId = getSubscriptionId(subject);
if (subscriptionId != null) {
subscriptionForId.remove(subscriptionId);
unsubscribeFromEventsStream(subscriptionId, unsubscribeMethod);
} else {
log.warn("Trying to unsubscribe from a non-existing subscription. Race condition?");
}
}
private <T extends Notification<?>> void processSubscriptionResponse(
EthSubscribe subscriptionReply, BehaviorSubject<T> subject, Class<T> responseType) {
if (!subscriptionReply.hasError()) {
establishSubscription(subject, responseType, subscriptionReply);
} else {
reportSubscriptionError(subject, subscriptionReply);
}
}
private <T extends Notification<?>> void establishSubscription(
BehaviorSubject<T> subject, Class<T> responseType, EthSubscribe subscriptionReply) {
log.debug("Subscribed to RPC events with id {}", subscriptionReply.getSubscriptionId());
subscriptionForId.put(
subscriptionReply.getSubscriptionId(),
new WebSocketSubscription<>(subject, responseType));
}
private <T extends Notification<?>> String getSubscriptionId(BehaviorSubject<T> subject) {
return subscriptionForId.entrySet().stream()
.filter(entry -> entry.getValue().getSubject() == subject)
.map(Map.Entry::getKey)
.findFirst()
.orElse(null);
}
private <T extends Notification<?>> void reportSubscriptionError(
BehaviorSubject<T> subject, EthSubscribe subscriptionReply) {
Response.Error error = subscriptionReply.getError();
log.error("Subscription request returned error: {}", error.getMessage());
subject.onError(
new IOException(
String.format(
"Subscription request failed with error: %s", error.getMessage())));
}
private <T extends Notification<?>> void subscribeToEventsStream(
Request request, BehaviorSubject<T> subject, Class<T> responseType) {
subscriptionRequestForId.put(
request.getId(), new WebSocketSubscription<>(subject, responseType));
try {
send(request, EthSubscribe.class);
} catch (IOException e) {
log.error("Failed to subscribe to RPC events with request id {}", request.getId());
subject.onError(e);
}
}
private <T extends Notification<?>> void closeSubscription(
BehaviorSubject<T> subject, String unsubscribeMethod) {
String subscriptionId = getSubscriptionId(subject);
if (subscriptionId != null) {
subscriptionForId.remove(subscriptionId);
unsubscribeFromEventsStream(subscriptionId, unsubscribeMethod);
} else {
log.warn("Trying to unsubscribe from a non-existing subscription. Race condition?");
}
}
/**
* Constructs a new GoogleAccount instance.
*/
GoogleAccount(Context context, @Nullable Account account, GoogleSignInAccount googleSignInAccount) {
super(context);
accountSubject =
(account != null) ? BehaviorSubject.createDefault(account) : BehaviorSubject.create();
this.googleSignInAccount = googleSignInAccount;
this.account = googleSignInAccount.getAccount();
setAccount(account);
this.accountKey = AccountsUtils.makeAccountKey(NAMESPACE, this.googleSignInAccount.getId());
}