类io.reactivex.subjects.BehaviorSubject源码实例Demo

下面列出了 io.reactivex.subjects.BehaviorSubject 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

/**
 * Creates the presenter, stores its collaborators and sets up the processors
 * and subjects it uses.
 *
 * @param view              view this presenter drives
 * @param d2                SDK entry point, also used here to resolve the initial program
 * @param searchRepository  repository stored for later use
 * @param schedulerProvider scheduler provider stored for later use
 * @param analyticsHelper   analytics helper stored for later use
 * @param initialProgram    uid of the program to preselect, or {@code null} for none
 */
public SearchTEPresenter(SearchTEContractsModule.View view,
                         D2 d2,
                         SearchRepository searchRepository,
                         SchedulerProvider schedulerProvider,
                         AnalyticsHelper analyticsHelper,
                         @Nullable String initialProgram) {
    // Injected collaborators.
    this.view = view;
    this.d2 = d2;
    this.searchRepository = searchRepository;
    this.schedulerProvider = schedulerProvider;
    this.analyticsHelper = analyticsHelper;

    // Internal state and event streams.
    this.compositeDisposable = new CompositeDisposable();
    this.queryData = new HashMap<>();
    this.queryProcessor = PublishProcessor.create();
    this.mapProcessor = PublishProcessor.create();
    this.enrollmentMapProcessor = PublishProcessor.create();

    if (initialProgram == null) {
        // No preselected program: the subject starts with an empty uid.
        this.selectedProgram = null;
        this.currentProgram = BehaviorSubject.createDefault("");
    } else {
        // Resolve the program synchronously (blocking call) before seeding the subject.
        this.selectedProgram = d2.programModule().programs().uid(initialProgram).blockingGet();
        this.currentProgram = BehaviorSubject.createDefault(initialProgram);
    }
}
 
源代码2 项目: RxCentralBle   文件: CorePeripheral.java
/**
 * Returns the shared connection-state stream, building it on first use.
 *
 * <p>The stream is backed by a new {@link BehaviorSubject}. Each emitted state is
 * mirrored to {@code connectedRelay} as a boolean, {@code processConnect()} runs on
 * subscription and {@code disconnect()} runs when the stream finishes or is disposed.
 * The result is shared through {@code replay(1).refCount()}.
 *
 * @return the cached shared stream if one exists, otherwise the newly built one
 */
@Override
public Observable<ConnectableState> connect() {
  if (sharedConnectionState == null) {
    connectionStateSubject = BehaviorSubject.create();

    // Original author's note on the refCount() step:
    // "Don't do this on normal disconnect status 0 / 8".
    sharedConnectionState =
        connectionStateSubject
            .doOnNext(state -> connectedRelay.accept(state == CONNECTED))
            .doOnSubscribe(disposable -> processConnect())
            .doFinally(this::disconnect)
            .replay(1)
            .refCount();
  }

  return sharedConnectionState;
}
 
源代码3 项目: RxShell   文件: RxCmdShellTest.java
/**
 * Checks that {@code close()} does not finish while the command processor reports
 * "not idle", and finishes with value 0 once it reports idle.
 */
@Test
public void testClose_waitForCommands() {
    // The processor starts out busy.
    BehaviorSubject<Boolean> idleState = BehaviorSubject.createDefault(false);
    when(cmdProcessor.isIdle()).thenReturn(idleState);

    RxCmdShell shell = new RxCmdShell(builder, rxShell);
    shell.open()
            .test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .values()
            .get(0);
    shell.isAlive()
            .test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValue(true);

    // While busy, closing must not finish within the timeout.
    shell.close()
            .test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertTimeout();

    // Flip to idle; closing should now finish with 0.
    idleState.onNext(true);

    shell.close()
            .test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoTimeout()
            .assertValue(0);

    verify(cmdProcessor).isIdle();
    verify(rxShellSession).close();
}
 
源代码4 项目: web3sdk   文件: WebSocketService.java
/**
 * Sends a subscription request and exposes the resulting notifications as a
 * {@link Flowable} with buffering backpressure.
 *
 * @param request           subscription request to send
 * @param unsubscribeMethod method name passed on when the subscription is closed
 * @param responseType      class of the notifications delivered to the caller
 * @return a flowable that closes the subscription when the underlying
 *     subject stream is disposed
 */
@Override
public <T extends Notification<?>> Flowable<T> subscribe(
        Request request, String unsubscribeMethod, Class<T> responseType) {
    // A BehaviorSubject is used instead of a plain observer because "onError"
    // may be signalled before the first client subscribes and must be kept.
    BehaviorSubject<T> subject = BehaviorSubject.create();

    // Subscribe synchronously: handing the Flowable out before the reply has
    // arrived would let a client unsubscribe before the subscription id is
    // known, which can cause a race condition.
    subscribeToEventsStream(request, subject, responseType);

    return subject
            .doOnDispose(
                    () -> {
                        closeSubscription(subject, unsubscribeMethod);
                    })
            .toFlowable(BackpressureStrategy.BUFFER);
}
 
源代码5 项目: web3j   文件: WebSocketService.java
/**
 * Sends a subscription request and exposes the resulting notifications as a
 * {@link Flowable} with buffering backpressure.
 *
 * @param request           subscription request to send
 * @param unsubscribeMethod method name passed on when the subscription is closed
 * @param responseType      class of the notifications delivered to the caller
 * @return a flowable that closes the subscription when the underlying
 *     subject stream is disposed
 */
@Override
public <T extends Notification<?>> Flowable<T> subscribe(
        Request request, String unsubscribeMethod, Class<T> responseType) {
    // A BehaviorSubject is used instead of a plain observer because "onError"
    // may be signalled before the first client subscribes and must be kept.
    BehaviorSubject<T> subject = BehaviorSubject.create();

    // Subscribe synchronously: handing the Flowable out before the reply has
    // arrived would let a client unsubscribe before the subscription id is
    // known, which can cause a race condition.
    subscribeToEventsStream(request, subject, responseType);

    return subject
            .doOnDispose(
                    () -> {
                        closeSubscription(subject, unsubscribeMethod);
                    })
            .toFlowable(BackpressureStrategy.BUFFER);
}
 
源代码6 项目: android-mvvm   文件: RecyclerViewAdapterTest.java
/**
 * Builds the adapter under test on top of a {@link BehaviorSubject} seeded with
 * dummy view models, and registers an observer that counts {@code onChanged} calls.
 */
@Before
public void setUp() throws Exception {
    notifyCallCount = 0;

    // Test doubles handed to the adapter.
    testViewProvider = new TestViewProvider();
    testBinder = new TestViewModelBinder();
    subscriptionCounter = new SubscriptionCounter<>();

    // Source stream seeded with the initial items.
    List<ViewModel> initialViewModels = TestViewModel.dummyViewModels(INITIAL_COUNT);
    viewModelsSource = BehaviorSubject.createDefault(initialViewModels);

    sut = new RecyclerViewAdapter(
            viewModelsSource.compose(subscriptionCounter),
            testViewProvider,
            testBinder);

    // Count every change notification the adapter sends out.
    defaultObserver = new RecyclerView.AdapterDataObserver() {
        @Override
        public void onChanged() {
            notifyCallCount++;
        }
    };
    sut.registerAdapterDataObserver(defaultObserver);
}
 
源代码7 项目: akarnokd-misc   文件: ReplayRefCountSubjectTest.java
/**
 * Demonstrates how {@code replay(1).refCount()} on top of a {@link BehaviorSubject}
 * behaves for a second subscriber after the first one has gone away.
 */
@Test
public void test() {
    BehaviorSubject<Integer> subject = BehaviorSubject.create();

    Observable<Integer> shared = subject
            .doOnNext(e -> System.out.println("This emits for second subscriber"))
            .doOnSubscribe(s -> System.out.println("OnSubscribe"))
            .doOnDispose(() -> System.out.println("OnDispose"))
            .replay(1)
            .refCount()
            .doOnNext(e -> System.out.println("This does NOT emit for second subscriber"));

    System.out.println("Subscribe-1");
    // Original author's note: this line causes the test to fail.
    shared.takeUntil(Observable.just(1)).test();

    subject.onNext(2);

    System.out.println("Subscribe-2");
    TestObserver<Integer> secondSubscriber = shared.take(1).test();
    boolean terminated = secondSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS);
    Assert.assertTrue(terminated);
}
 
源代码8 项目: incubator-taverna-mobile   文件: RxSearch.java
/**
 * Adapts a {@link SearchView} into an observable stream of query strings.
 *
 * <p>Every text change is emitted. On submit, the submitted query is emitted, the
 * stream is completed and the view's focus is cleared.
 *
 * @param searchView view whose query-text listener is replaced by this call
 * @return the stream of query strings
 */
public static Observable<String> fromSearchView(@NonNull final SearchView searchView) {
    final BehaviorSubject<String> queries = BehaviorSubject.create();

    final SearchView.OnQueryTextListener forwardingListener =
            new SearchView.OnQueryTextListener() {

                @Override
                public boolean onQueryTextSubmit(String query) {
                    // Emit the final query, then end the stream.
                    queries.onNext(query);
                    queries.onComplete();
                    searchView.clearFocus();
                    return true;
                }

                @Override
                public boolean onQueryTextChange(String newText) {
                    queries.onNext(newText);
                    return true;
                }
            };
    searchView.setOnQueryTextListener(forwardingListener);

    return queries;
}
 
源代码9 项目: api   文件: RpcRx.java
/**
 * Wraps an RPC core: creates the event emitter, seeds the connection-state
 * subject from the provider's current state, initialises the emitters and
 * builds one interface per RPC section.
 *
 * @param rpc An API provider using HTTP or WebSocket
 */
public RpcRx(RpcCore rpc) {
    api = rpc;
    eventEmitter = new EventEmitter();

    // Seed the subject with whatever the provider reports right now.
    isConnected = BehaviorSubject.createDefault(api.getProvider().isConnected());

    initEmitters(api.getProvider());

    // One interface per RPC section.
    author = createInterface(api.author());
    chain = createInterface(api.chain());
    state = createInterface(api.state());
    system = createInterface(api.system());
}
 
/**
 * Demo: two items are pushed before subscribing, then one more item and a
 * completion afterwards. Per the BehaviorSubject contract, a late subscriber
 * first receives the most recent item and then everything that follows.
 */
@Test
void behaviorSubject_test() {
    BehaviorSubject<Object> subject = BehaviorSubject.create();

    // Pushed before anyone is subscribed.
    subject.onNext(1L);
    subject.onNext(2L);

    subject.subscribe(
            value -> log("一郎神: " + value),
            error -> error.printStackTrace(),
            () -> System.out.println("Emission completed"),
            upstream -> System.out.println("onSubscribe"));

    // Pushed while the subscriber is attached.
    subject.onNext(10L);
    subject.onComplete();
}
 
/**
 * Demo: the subject is terminated with an error before anyone subscribes.
 * Per the BehaviorSubject contract, a late subscriber then only receives the
 * error; the later {@code onNext}/{@code onComplete} calls reach a subject
 * that has already terminated.
 */
@Test
void behaviorSubject_error_test() {
    BehaviorSubject<Object> subject = BehaviorSubject.create();

    // Two items followed by a terminal error, all before subscribing.
    subject.onNext(1L);
    subject.onNext(2L);
    subject.onError(new RuntimeException("我来耍下宝"));

    subject.subscribe(
            value -> log("一郎神: " + value),
            error -> error.printStackTrace(),
            () -> System.out.println("Emission completed"),
            upstream -> System.out.println("onSubscribe"));

    // Signals sent after the subject has already failed.
    subject.onNext(10L);
    subject.onComplete();
}
 
/**
 * Creates the presenter, stores its collaborators and initialises its state
 * holders and reactive streams.
 *
 * @param view                   view this presenter drives
 * @param eventUid               uid of the event being captured
 * @param eventCaptureRepository repository stored for later use
 * @param rulesUtils             rules utilities stored for later use
 * @param valueStore             value store stored for later use
 * @param schedulerProvider      scheduler provider stored for later use
 */
public EventCapturePresenterImpl(EventCaptureContract.View view, String eventUid,
                                 EventCaptureContract.EventCaptureRepository eventCaptureRepository,
                                 RulesUtilsProvider rulesUtils,
                                 ValueStore valueStore, SchedulerProvider schedulerProvider) {
    // Injected collaborators.
    this.view = view;
    this.eventUid = eventUid;
    this.eventCaptureRepository = eventCaptureRepository;
    this.rulesUtils = rulesUtils;
    this.valueStore = valueStore;
    this.schedulerProvider = schedulerProvider;

    // Scalar state.
    this.currentPosition = 0;
    this.canComplete = true;
    this.currentSection = new ObservableField<>("");

    // Collections, all starting empty.
    this.sectionsToHide = new ArrayList<>();
    this.sectionList = new ArrayList<>();
    this.errors = new HashMap<>();
    this.emptyMandatoryFields = new HashMap<>();

    this.compositeDisposable = new CompositeDisposable();

    // Event streams; only the form-fields one carries an initial (empty) value.
    this.currentSectionPosition = PublishProcessor.create();
    this.sectionProcessor = PublishProcessor.create();
    this.showCalculationProcessor = PublishProcessor.create();
    this.progressProcessor = PublishProcessor.create();
    this.sectionAdjustProcessor = PublishProcessor.create();
    this.formAdjustProcessor = PublishProcessor.create();
    this.notesCounterProcessor = PublishProcessor.create();
    this.formFieldsProcessor = BehaviorSubject.createDefault(new ArrayList<>());
}
 
源代码13 项目: ground-android   文件: AuthenticationManager.java
@Inject
public AuthenticationManager(Application application, ActivityStreams activityStreams) {
  this.signInState = BehaviorSubject.create();
  this.firebaseAuth = FirebaseAuth.getInstance();
  this.googleSignInOptions =
      new GoogleSignInOptions.Builder(GoogleSignInOptions.DEFAULT_SIGN_IN)
          .requestIdToken(application.getResources().getString(R.string.default_web_client_id))
          .requestEmail()
          .requestProfile()
          .build();
  this.activityStreams = activityStreams;
  this.activityResultsSubscription =
      activityStreams.getActivityResults(SIGN_IN_REQUEST_CODE).subscribe(this::onActivityResult);
}
 
源代码14 项目: ground-android   文件: LocationManager.java
/**
 * Creates the manager, stores its collaborators and wires a location callback
 * to a fresh {@link BehaviorSubject} of location updates.
 *
 * @param app                accepted but not used in this constructor
 * @param permissionsManager stored for later use
 * @param settingsManager    stored for later use
 * @param locationClient     stored for later use
 */
@Inject
public LocationManager(
    Application app,
    PermissionsManager permissionsManager,
    SettingsManager settingsManager,
    RxFusedLocationProviderClient locationClient) {
  // The subject must exist before the callback that feeds it is created.
  locationUpdates = BehaviorSubject.create();
  locationUpdateCallback = RxLocationCallback.create(locationUpdates);

  this.locationClient = locationClient;
  this.settingsManager = settingsManager;
  this.permissionsManager = permissionsManager;
}
 
源代码15 项目: 2018-TowerDefence   文件: RunnerRoundProcessor.java
/**
 * Processes the current round once and publishes the resulting error list.
 *
 * @param addToConsoleOutput subject that receives the "Error List: [...]" line
 * @return the value returned by the game round processor
 * @throws InvalidOperationException if this round was already processed
 * @throws Exception declared by the signature; the visible code does not show the source
 */
boolean processRound(BehaviorSubject<String> addToConsoleOutput) throws Exception {
    if (roundProcessed) {
        throw new InvalidOperationException("This round has already been processed!");
    }

    final boolean processed = gameRoundProcessor.processRound(gameMap, commandsToProcess);

    // Publish the errors collected while processing.
    final ArrayList<String> roundErrors = gameRoundProcessor.getErrorList();
    addToConsoleOutput.onNext("Error List: " + Arrays.toString(roundErrors.toArray()));

    roundProcessed = true;
    return processed;
}
 
源代码16 项目: 2018-TowerDefence   文件: GameEngineRunner.java
/**
 * Creates the runner and starts appending every emitted console line to
 * {@code consoleOutput} until the {@code unsubscribe} subject emits.
 */
public GameEngineRunner() {
    unsubscribe = BehaviorSubject.create();
    addToConsoleOutput = BehaviorSubject.create();

    addToConsoleOutput
            .takeUntil(unsubscribe)
            .subscribe(text -> {
                consoleOutput += text;
            });
}
 
源代码17 项目: RxShell   文件: RxCmdShellTest.java
/**
 * Prepares the mocks used by the shell tests: the command processor factory,
 * an idle-state subject that starts as {@code true}, and the open / waitFor /
 * isAlive / cancel / close behaviour of the shell session.
 */
@Before
public void setup() throws Exception {
    super.setup();
    // The builder hands out a factory that always returns the mocked processor.
    when(builder.getProcessorFactory()).thenReturn(commandProcessorFactory);
    when(commandProcessorFactory.create()).thenReturn(cmdProcessor);
    // Idle state reported by the processor; starts as idle.
    BehaviorSubject<Boolean> idlePub = BehaviorSubject.createDefault(true);
    when(cmdProcessor.isIdle()).thenReturn(idlePub);

    // When the open() Single is subscribed, waitFor() is re-stubbed with a Single
    // whose emitter is captured in waitForEmitter, then the session is emitted.
    when(rxShell.open()).thenReturn(Single.create(emitter -> {
        when(rxShellSession.waitFor()).thenReturn(Single.create(e -> waitForEmitter = e));
        emitter.onSuccess(rxShellSession);
    }));

    // Defaults that apply until one of the lambdas in this method re-stubs them.
    when(rxShellSession.waitFor()).thenReturn(Single.just(0));
    when(rxShellSession.isAlive()).thenReturn(Single.just(true));
    // cancel(): marks the session dead, emits 1 through the captured waitFor
    // emitter, publishes idle and completes the idle subject, then completes.
    when(rxShellSession.cancel()).thenReturn(Completable.create(e -> {
        when(rxShellSession.isAlive()).thenReturn(Single.just(false));
        waitForEmitter.onSuccess(1);
        idlePub.onNext(true);
        idlePub.onComplete();
        e.onComplete();
    }));
    // close(): marks the session dead, emits 0 through the captured waitFor
    // emitter and as its own result, then publishes idle and completes the subject.
    when(rxShellSession.close()).thenReturn(Single.create(e -> {
        when(rxShellSession.isAlive()).thenReturn(Single.just(false));
        waitForEmitter.onSuccess(0);
        e.onSuccess(0);
        idlePub.onNext(true);
        idlePub.onComplete();
    }));
}
 
源代码18 项目: web3sdk   文件: WebSocketService.java
/**
 * Routes a subscription reply: an error reply is reported to the subject,
 * any other reply establishes the subscription.
 *
 * @param subscriptionReply reply received for the subscription request
 * @param subject           subject tied to the subscription
 * @param responseType      class of the notifications for this subscription
 * @throws IOException declared by the signature; neither visible branch throws it directly
 */
private <T extends Notification<?>> void processSubscriptionResponse(
        BcosSubscribe subscriptionReply, BehaviorSubject<T> subject, Class<T> responseType)
        throws IOException {
    if (subscriptionReply.hasError()) {
        reportSubscriptionError(subject, subscriptionReply);
        return;
    }
    establishSubscription(subject, responseType, subscriptionReply);
}
 
源代码19 项目: web3sdk   文件: WebSocketService.java
/**
 * Records an established subscription under the id returned by the server.
 *
 * @param subject           subject tied to the subscription
 * @param responseType      class of the notifications for this subscription
 * @param subscriptionReply reply carrying the subscription id
 */
private <T extends Notification<?>> void establishSubscription(
        BehaviorSubject<T> subject, Class<T> responseType, BcosSubscribe subscriptionReply) {
    log.info("Subscribed to RPC events with id {}", subscriptionReply.getSubscriptionId());

    WebSocketSubscription<T> subscription = new WebSocketSubscription<>(subject, responseType);
    subscriptionForId.put(subscriptionReply.getSubscriptionId(), subscription);
}
 
源代码20 项目: web3sdk   文件: WebSocketService.java
/**
 * Looks up the subscription id registered for the given subject.
 *
 * @param subject subject to search for, matched by reference identity
 * @return the first matching id, or {@code null} if no entry holds this subject
 */
private <T extends Notification<?>> String getSubscriptionId(BehaviorSubject<T> subject) {
    return subscriptionForId.entrySet().stream()
            // Identity comparison: the very same subject instance must be registered.
            .filter(candidate -> candidate.getValue().getSubject() == subject)
            .map(candidate -> candidate.getKey())
            .findFirst()
            .orElse(null);
}
 
源代码21 项目: web3sdk   文件: WebSocketService.java
/**
 * Logs the error carried by a subscription reply and signals it to the subject
 * as an {@link IOException}.
 *
 * @param subject           subject that receives the error
 * @param subscriptionReply reply whose error is reported
 */
private <T extends Notification<?>> void reportSubscriptionError(
        BehaviorSubject<T> subject, BcosSubscribe subscriptionReply) {
    Response.Error replyError = subscriptionReply.getError();
    log.error("Subscription request returned error: {}", replyError.getMessage());

    IOException failure =
            new IOException(
                    "Subscription request failed with error: " + replyError.getMessage());
    subject.onError(failure);
}
 
源代码22 项目: web3sdk   文件: WebSocketService.java
/**
 * Registers a pending subscription under the request id and sends the
 * subscription request.
 *
 * <p>If sending fails with an {@link IOException}, the failure is logged
 * together with its cause and forwarded to the subject.
 *
 * @param request      subscription request to send
 * @param subject      subject that receives an error if sending fails
 * @param responseType class of the notifications for this subscription
 */
private <T extends Notification<?>> void subscribeToEventsStream(
        Request request, BehaviorSubject<T> subject, Class<T> responseType) {

    // Registered before sending; presumably so the reply handler can find the
    // pending subscription by request id (verify against the reply path).
    subscriptionRequestForId.put(
            request.getId(), new WebSocketSubscription<>(subject, responseType));
    try {
        send(request, BcosSubscribe.class);
    } catch (IOException e) {
        // Pass the exception as the trailing argument so the logger records the
        // cause and stack trace instead of only the request id.
        log.error("Failed to subscribe to RPC events with request id {}", request.getId(), e);
        subject.onError(e);
    }
}
 
源代码23 项目: web3sdk   文件: WebSocketService.java
/**
 * Closes the subscription tied to the given subject, if one is registered.
 *
 * @param subject           subject whose subscription should be closed
 * @param unsubscribeMethod method name passed on to the unsubscribe call
 */
private <T extends Notification<?>> void closeSubscription(
        BehaviorSubject<T> subject, String unsubscribeMethod) {
    final String subscriptionId = getSubscriptionId(subject);
    if (subscriptionId == null) {
        // Nothing registered for this subject.
        log.warn("Trying to unsubscribe from a non-existing subscription. Race condition?");
        return;
    }

    subscriptionForId.remove(subscriptionId);
    unsubscribeFromEventsStream(subscriptionId, unsubscribeMethod);
}
 
源代码24 项目: web3j   文件: WebSocketService.java
/**
 * Routes a subscription reply: an error reply is reported to the subject,
 * any other reply establishes the subscription.
 *
 * @param subscriptionReply reply received for the subscription request
 * @param subject           subject tied to the subscription
 * @param responseType      class of the notifications for this subscription
 */
private <T extends Notification<?>> void processSubscriptionResponse(
        EthSubscribe subscriptionReply, BehaviorSubject<T> subject, Class<T> responseType) {
    if (subscriptionReply.hasError()) {
        reportSubscriptionError(subject, subscriptionReply);
        return;
    }
    establishSubscription(subject, responseType, subscriptionReply);
}
 
源代码25 项目: web3j   文件: WebSocketService.java
/**
 * Records an established subscription under the id returned by the server.
 *
 * @param subject           subject tied to the subscription
 * @param responseType      class of the notifications for this subscription
 * @param subscriptionReply reply carrying the subscription id
 */
private <T extends Notification<?>> void establishSubscription(
        BehaviorSubject<T> subject, Class<T> responseType, EthSubscribe subscriptionReply) {
    log.debug("Subscribed to RPC events with id {}", subscriptionReply.getSubscriptionId());

    WebSocketSubscription<T> subscription = new WebSocketSubscription<>(subject, responseType);
    subscriptionForId.put(subscriptionReply.getSubscriptionId(), subscription);
}
 
源代码26 项目: web3j   文件: WebSocketService.java
/**
 * Looks up the subscription id registered for the given subject.
 *
 * @param subject subject to search for, matched by reference identity
 * @return the first matching id, or {@code null} if no entry holds this subject
 */
private <T extends Notification<?>> String getSubscriptionId(BehaviorSubject<T> subject) {
    return subscriptionForId
            .entrySet()
            .stream()
            // Identity comparison: the very same subject instance must be registered.
            .filter(candidate -> candidate.getValue().getSubject() == subject)
            .map(candidate -> candidate.getKey())
            .findFirst()
            .orElse(null);
}
 
源代码27 项目: web3j   文件: WebSocketService.java
/**
 * Logs the error carried by a subscription reply and signals it to the subject
 * as an {@link IOException}.
 *
 * @param subject           subject that receives the error
 * @param subscriptionReply reply whose error is reported
 */
private <T extends Notification<?>> void reportSubscriptionError(
        BehaviorSubject<T> subject, EthSubscribe subscriptionReply) {
    Response.Error replyError = subscriptionReply.getError();
    log.error("Subscription request returned error: {}", replyError.getMessage());

    IOException failure =
            new IOException(
                    "Subscription request failed with error: " + replyError.getMessage());
    subject.onError(failure);
}
 
源代码28 项目: web3j   文件: WebSocketService.java
/**
 * Registers a pending subscription under the request id and sends the
 * subscription request.
 *
 * <p>If sending fails with an {@link IOException}, the failure is logged
 * together with its cause and forwarded to the subject.
 *
 * @param request      subscription request to send
 * @param subject      subject that receives an error if sending fails
 * @param responseType class of the notifications for this subscription
 */
private <T extends Notification<?>> void subscribeToEventsStream(
        Request request, BehaviorSubject<T> subject, Class<T> responseType) {

    // Registered before sending; presumably so the reply handler can find the
    // pending subscription by request id (verify against the reply path).
    subscriptionRequestForId.put(
            request.getId(), new WebSocketSubscription<>(subject, responseType));
    try {
        send(request, EthSubscribe.class);
    } catch (IOException e) {
        // Pass the exception as the trailing argument so the logger records the
        // cause and stack trace instead of only the request id.
        log.error("Failed to subscribe to RPC events with request id {}", request.getId(), e);
        subject.onError(e);
    }
}
 
源代码29 项目: web3j   文件: WebSocketService.java
/**
 * Closes the subscription tied to the given subject, if one is registered.
 *
 * @param subject           subject whose subscription should be closed
 * @param unsubscribeMethod method name passed on to the unsubscribe call
 */
private <T extends Notification<?>> void closeSubscription(
        BehaviorSubject<T> subject, String unsubscribeMethod) {
    final String subscriptionId = getSubscriptionId(subject);
    if (subscriptionId == null) {
        // Nothing registered for this subject.
        log.warn("Trying to unsubscribe from a non-existing subscription. Race condition?");
        return;
    }

    subscriptionForId.remove(subscriptionId);
    unsubscribeFromEventsStream(subscriptionId, unsubscribeMethod);
}
 
源代码30 项目: science-journal   文件: GoogleAccount.java
/**
 * Constructs a new GoogleAccount instance.
 *
 * @param context             passed to the superclass constructor
 * @param account             initial account; when non-null it seeds the account subject
 * @param googleSignInAccount sign-in account used for the {@code account} field and the account key
 */
GoogleAccount(Context context, @Nullable Account account, GoogleSignInAccount googleSignInAccount) {
  super(context);

  // Only seed the subject when there is an initial account to emit.
  if (account == null) {
    accountSubject = BehaviorSubject.create();
  } else {
    accountSubject = BehaviorSubject.createDefault(account);
  }

  this.googleSignInAccount = googleSignInAccount;
  this.account = googleSignInAccount.getAccount();
  // Note: this passes the constructor parameter, not the field assigned just above.
  setAccount(account);
  this.accountKey = AccountsUtils.makeAccountKey(NAMESPACE, this.googleSignInAccount.getId());
}
 
 类所在包
 同包方法