类io.reactivex.subjects.PublishSubject源码实例Demo

下面列出了 io.reactivex.subjects.PublishSubject 的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: RxPermissions   文件: RxPermissionsTest.java
/**
 * Fires the trigger for a permission that {@code isGranted} reports as not granted, then
 * delivers a "granted" permission result, and expects {@code ensure(permission)} to emit
 * {@code true} without errors and without terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void subscription_trigger_granted() {
    final String requested = Manifest.permission.READ_PHONE_STATE;
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    when(mRxPermissions.isGranted(requested)).thenReturn(false);

    final PublishSubject<Object> trigger = PublishSubject.create();
    final TestObserver<Boolean> observer = new TestObserver<>();
    trigger.compose(mRxPermissions.ensure(requested)).subscribe(observer);

    trigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{requested}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(true);
}
 
源代码2 项目: ThirtyInch   文件: RxTiPresenterUtilsTest.java
/**
 * Emits values while the view is attached, detached and attached again, and checks which of
 * them pass through {@code deliverLatestToView}: exactly 2, 3, 5 and 6 are expected, with
 * neither completion nor error.
 */
@Test
public void testDeliverLatestToView_ViewComesAndGoes() throws Exception {
    mPresenter.create();

    final TestObserver<Integer> observer = new TestObserver<>();
    final PublishSubject<Integer> upstream = PublishSubject.create();
    upstream.compose(RxTiPresenterUtils.<Integer>deliverLatestToView(mPresenter))
            .subscribe(observer);

    // emitted before any view is attached
    upstream.onNext(1);
    upstream.onNext(2);

    mPresenter.attachView(mView);
    upstream.onNext(3);

    // emitted while the view is detached
    mPresenter.detachView();
    upstream.onNext(4);
    upstream.onNext(5);

    mPresenter.attachView(mView);
    upstream.onNext(6);

    observer.assertNotComplete();
    observer.assertNoErrors();
    observer.assertValuesOnly(2, 3, 5, 6);
}
 
源代码3 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * Checks that a transformer added to the builder after {@code build()} has been called does
 * not become part of the router that was already built: a {@code B} effect sent through that
 * router is expected to end in an {@code UnknownEffectException}.
 */
@Test
public void effectHandlersShouldBeImmutable() throws Exception {
  // redo some test setup for test case specific conditions
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  SubtypeEffectHandlerBuilder<TestEffect, TestEvent> handlerBuilder =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addTransformer(
              A.class, (Observable<A> aEffects) -> aEffects.map(a -> AEvent.create(a.id())));

  ObservableTransformer<TestEffect, TestEvent> builtRouter = handlerBuilder.build();

  // this should not lead to the effects router being capable of handling B effects
  handlerBuilder.addTransformer(B.class, bEffects -> bEffects.map(b -> BEvent.create(b.id())));

  publishSubject.compose(builtRouter).subscribe(testSubscriber);

  B unhandledEffect = B.create(84);
  publishSubject.onNext(unhandledEffect);
  publishSubject.onComplete();

  testSubscriber.awaitTerminalEvent();
  testSubscriber.assertError(new UnknownEffectException(unhandledEffect));
}
 
源代码4 项目: RxGroups   文件: GroupLifecycleManagerTest.java
/**
 * Simulates a rotation (save instance state, then destroy with a non-finishing activity) and
 * checks that the group stops reporting observables for an observer that was registered as a
 * plain {@code TestObserver}.
 */
@Test public void testNonResubscribableObservablesRemovedAfterNonFinishingDestroy() {
  when(observableManager.newGroup()).thenReturn(new ObservableGroup(1));
  GroupLifecycleManager manager = GroupLifecycleManager.onCreate(observableManager, null, target);

  TestObserver<String> plainObserver = new TestObserver<>();
  PublishSubject<String> silentSource = PublishSubject.create();
  silentSource
      .compose(manager.group().transform(plainObserver))
      .subscribe(plainObserver);
  assertThat(manager.group().hasObservables(plainObserver)).isTrue();

  // Simulate a rotation
  Activity rotatingActivity = mock(Activity.class);
  when(rotatingActivity.isFinishing()).thenReturn(false);
  manager.onSaveInstanceState(new Bundle());
  manager.onDestroy(rotatingActivity);

  assertThat(manager.group().hasObservables(plainObserver)).isFalse();
}
 
/**
 * Subscribes to {@code api.updateBalance()} and stores the resulting disposable in
 * {@code compositeDisposable}.
 *
 * <p>The chain applies, in this order: a {@code repeatWhen} and a {@code retryWhen} whose
 * handlers delay their signal by {@code BuildConfig.UPDATE_BALANCE_SECONDS_DELAY} seconds,
 * followed by a second {@code repeatWhen} whose handler flat-maps each signal into
 * {@code subject} viewed as a {@code Flowable} with {@code BackpressureStrategy.LATEST}.
 *
 * @param subject source whose emissions feed the outer repeat handler
 */
private void startListenBalance(final PublishSubject<Object> subject){
    compositeDisposable.add(
            api.updateBalance()
                    .repeatWhen(completions -> completions.delay(
                            BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
                    .retryWhen(errors -> errors.delay(
                            BuildConfig.UPDATE_BALANCE_SECONDS_DELAY, TimeUnit.SECONDS))
                    .repeatWhen(completions -> completions.flatMap(
                            ignored -> subject.toFlowable(BackpressureStrategy.LATEST)))
                    .subscribe());
}
 
源代码6 项目: SweetMusicPlayer   文件: SubscriberEvent.java
/**
 * Creates the {@code subject} and subscribes a consumer to it on the scheduler that
 * {@code EventThread.getScheduler(thread)} returns.
 *
 * <p>The consumer forwards each event to {@code handleEvent} only while {@code valid} is
 * true. An {@code InvocationTargetException} from the handler is passed to
 * {@code throwRuntimeException} together with a descriptive message.
 */
private void initObservable() {
    subject = PublishSubject.create();
    subject.observeOn(EventThread.getScheduler(thread))
            .subscribe(new Consumer() {
                @Override
                public void accept(Object event) {
                    // Nothing to dispatch once this subscriber is no longer valid.
                    if (!valid) {
                        return;
                    }
                    try {
                        handleEvent(event);
                    } catch (InvocationTargetException e) {
                        throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberEvent.this, e);
                    }
                }
            });
}
 
源代码7 项目: samples-android   文件: RxWebAuthClientImpl.java
/**
 * Replaces {@code submitResults} with a new {@code PublishSubject}, registers the activity
 * lifecycle hook, and registers a callback on {@code mSyncAuthClient} that routes sign-in
 * results to {@code processLogInResult} and sign-out results to
 * {@code processSignOutResult}.
 *
 * @param activity the activity passed to the lifecycle registration and to the auth client
 * @return the newly created {@code submitResults} subject
 */
@Override
public Observable<RxResult> registerListener(FragmentActivity activity) {
    // A fresh subject is created on every call; the same instance is returned below.
    submitResults = PublishSubject.create();

    registerActivityLifeCycle(activity);
    // NOTE(review): a new single-thread executor is created per call and is not shut down
    // in this method - confirm that the auth client (or another owner) releases it.
    mSyncAuthClient.registerCallbackIfInterrupt(activity, (result, type) -> {
        switch (type) {
            case SIGN_IN:
                processLogInResult(result);
                break;
            case SIGN_OUT:
                processSignOutResult(result);
                break;
            default:
                // Any other result type is ignored.
                break;
        }
    }, Executors.newSingleThreadExecutor());

    return submitResults;
}
 
源代码8 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Emits a value and completion after the group has been disposed, verifies that the original
 * observer saw nothing, then resubscribes a second observer with the same tag and expects it
 * to receive the value and the completion, after which the group reports no observables.
 */
@Test public void shouldDeliverQueuedEventsWhenResubscribed() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> source = PublishSubject.create();
  TestAutoResubscribingObserver firstObserver = new TestAutoResubscribingObserver("foo");
  source.compose(group.transform(firstObserver)).subscribe(firstObserver);
  group.dispose();

  source.onNext("Hello World");
  source.onComplete();

  firstObserver.assertionTarget.assertNotComplete();
  firstObserver.assertionTarget.assertNoValues();

  // TestObserver cannot be reused after being disposed in RxJava2
  TestAutoResubscribingObserver secondObserver = new TestAutoResubscribingObserver("foo");
  group.observable(secondObserver).subscribe(secondObserver);

  secondObserver.assertionTarget.assertComplete();
  secondObserver.assertionTarget.assertValue("Hello World");
  assertThat(group.hasObservables(secondObserver)).isEqualTo(false);
}
 
/**
 * Demonstrates {@code replay(3)}: the first observer subscribes before any emission, the
 * source then emits 1 to 4 and completes, and a second observer subscribes afterwards.
 */
private void doSomeWork() {
    final PublishSubject<Integer> source = PublishSubject.create();

    // bufferSize = 3 to retain 3 values to replay
    final ConnectableObservable<Integer> replayed = source.replay(3);
    replayed.connect(); // connecting the connectableObservable

    replayed.subscribe(getFirstObserver());

    for (int value = 1; value <= 4; value++) {
        source.onNext(value);
    }
    source.onComplete();

    /*
     * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
     */
    replayed.subscribe(getSecondObserver());
}
 
源代码10 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Registers two sources in the same group through observers that share the tag "foo" and
 * expects only the second observer to receive its value and completion; the first observer
 * must see no values.
 */
@Test public void shouldReplaceObservablesOfSameTagAndSameGroupId() {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> firstSource = PublishSubject.create();
  PublishSubject<String> secondSource = PublishSubject.create();
  TestAutoResubscribingObserver firstObserver = new TestAutoResubscribingObserver("foo");
  TestAutoResubscribingObserver secondObserver = new TestAutoResubscribingObserver("foo");
  firstSource.compose(group.transform(firstObserver)).subscribe(firstObserver);
  secondSource.compose(group.transform(secondObserver)).subscribe(secondObserver);

  assertThat(group.subscription(fooObserver).isCancelled()).isFalse();
  assertThat(group.hasObservables(fooObserver)).isTrue();

  firstSource.onNext("Hello World 1");
  firstSource.onComplete();

  secondSource.onNext("Hello World 2");
  secondSource.onComplete();

  secondObserver.assertionTarget.awaitTerminalEvent();
  secondObserver.assertionTarget.assertComplete();
  secondObserver.assertionTarget.assertValue("Hello World 2");

  firstObserver.assertionTarget.assertNoValues();
}
 
源代码11 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Disposes one of two groups and checks that the observer registered in the other group
 * still receives its value without error, while the observer of the disposed group does not
 * complete.
 */
@Test public void shouldNotRemoveSubscribersForOtherIds() throws Exception {
  ObservableGroup disposedGroup = observableManager.newGroup();
  ObservableGroup liveGroup = observableManager.newGroup();
  PublishSubject<String> disposedSource = PublishSubject.create();
  TestAutoResubscribingObserver disposedObserver = new TestAutoResubscribingObserver("foo");
  PublishSubject<String> liveSource = PublishSubject.create();
  TestAutoResubscribingObserver liveObserver = new TestAutoResubscribingObserver("bar");

  disposedSource.compose(disposedGroup.transform(disposedObserver)).subscribe(disposedObserver);
  liveSource.compose(liveGroup.transform(liveObserver)).subscribe(liveObserver);
  disposedGroup.dispose();

  disposedSource.onNext("Florinda Mesa");
  disposedSource.onComplete();
  liveSource.onNext("Carlos Villagran");
  liveSource.onComplete();

  disposedObserver.assertionTarget.assertNotComplete();
  liveObserver.assertionTarget.assertNoErrors();
  liveObserver.assertionTarget.assertValue("Carlos Villagran");
}
 
源代码12 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Registers two sources in one group, disposes the group, and then lets both sources emit and
 * complete: neither observer may complete, and the group must still report observables for
 * both of them.
 */
@Test public void shouldQueueMultipleRequests() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> firstSource = PublishSubject.create();
  TestObserver<String> firstObserver = new TestObserver<>();
  PublishSubject<String> secondSource = PublishSubject.create();
  TestObserver<String> secondObserver = new TestObserver<>();

  firstSource.compose(group.transform(firstObserver)).subscribe(firstObserver);
  secondSource.compose(group.transform(secondObserver)).subscribe(secondObserver);
  group.dispose();

  firstSource.onNext("Chespirito");
  firstSource.onComplete();
  secondSource.onNext("Edgar Vivar");
  secondSource.onComplete();

  firstObserver.assertNotComplete();
  secondObserver.assertNotComplete();
  assertThat(group.hasObservables(firstObserver)).isEqualTo(true);
  assertThat(group.hasObservables(secondObserver)).isEqualTo(true);
}
 
源代码13 项目: jobson   文件: JobExecutorTest.java
/**
 * Checks that the stderr subject handed to the executor is completed once the executed
 * application has finished.
 *
 * <p>An interruption during the short grace period no longer passes silently: the interrupt
 * flag is restored and the test fails, because otherwise the completion assertion would be
 * skipped and the test would succeed without having checked anything.
 */
@Test
public void testExecuteStderrListenerIsCompletedOnceApplicationExecutionEnds() throws Throwable {
    final JobExecutor jobExecutor = getInstance();
    final AtomicBoolean completedCalled = new AtomicBoolean(false);
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    stderrSubject.doOnComplete(() -> completedCalled.set(true)).subscribe();
    final JobEventListeners listeners = createStderrListener(stderrSubject);
    final CancelablePromise<JobExecutionResult> ret =
            jobExecutor.execute(STANDARD_REQUEST, listeners);

    promiseAssert(ret, result -> {
        try {
            // The stderr thread can race with the exit thread
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Restore the flag for the caller and fail loudly instead of skipping the check.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted before stderr completion could be verified", e);
        }
        assertThat(completedCalled.get()).isTrue();
    });
}
 
源代码14 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Subscribes through a locked group, lets the source emit and complete while locked, and then
 * unlocks the group: the observer is expected to receive the value and the completion.
 *
 * <p>The final check is made against {@code testObserver}, the observer this test actually
 * registered. Checking an unrelated observer would hold regardless of what the group did.
 */
@Test public void shouldAutoResubscribeAfterUnlock() throws InterruptedException {
  ObservableGroup group = observableManager.newGroup();
  TestObserver<String> testObserver = new TestObserver<>();
  PublishSubject<String> sourceObservable = PublishSubject.create();

  group.lock();
  sourceObservable.compose(group.transform(testObserver)).subscribe(testObserver);

  sourceObservable.onNext("Chespirito");
  sourceObservable.onComplete();

  group.unlock();

  testObserver.assertComplete();
  testObserver.assertNoErrors();
  testObserver.assertValue("Chespirito");
  assertThat(group.hasObservables(testObserver)).isEqualTo(false);
}
 
源代码15 项目: jobson   文件: JobExecutorTest.java
/**
 * Runs an {@code echo} job whose argument is a {@code join(...)} template expression and
 * checks that the text echoed to stdout is the joined list from the input fixture.
 *
 * <p>The wait for stdout completion is asserted, so a timeout is reported as such instead of
 * surfacing later as a confusing string mismatch.
 */
@Test
public void testExecuteEvaluatesJoinAsExpected() throws InterruptedException {
    final JobExecutor jobExecutor = getInstance();
    final PersistedJob req =
            standardRequestWithCommand("echo", "${join(',', inputs.someList)}");
    final AtomicReference<byte[]> bytesEchoedToStdout = new AtomicReference<>(new byte[]{});
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    // Accumulate every stdout chunk into a single byte array.
    stdoutSubject.subscribe(bytes ->
            bytesEchoedToStdout.getAndUpdate(existingBytes ->
                    Bytes.concat(existingBytes, bytes)));

    // The single permit is taken here and released again when stdout completes.
    final Semaphore s = new Semaphore(1);
    s.acquire();
    stdoutSubject.doOnComplete(s::release).subscribe();

    final JobEventListeners listeners =
            createStdoutListener(stdoutSubject);

    jobExecutor.execute(req, listeners);

    // Fail here if stdout did not complete within the timeout.
    assertThat(s.tryAcquire(TestConstants.DEFAULT_TIMEOUT, MILLISECONDS)).isTrue();

    final String stringFromStdout = new String(bytesEchoedToStdout.get()).trim();

    assertThat(stringFromStdout).isEqualTo("a,b,c,d"); // From the input fixture
}
 
源代码16 项目: jobson   文件: JobManagerTest.java
/**
 * Submits a job to a manager backed by a mock executor, subscribes to the job's stdout
 * updates, pushes bytes through the executor's stdout subject, and expects those same bytes
 * to have been observed once the job has finished.
 */
@Test
public void testGetStdoutUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final Subject<byte[]> stdoutSubject = PublishSubject.create();
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            stdoutSubject,
            Observable.just(TestHelpers.generateRandomBytes()));
    final JobManager jobManager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            jobManager.submit(STANDARD_VALID_REQUEST);

    final AtomicReference<byte[]> lastObserved = new AtomicReference<>();
    final Observable<byte[]> stdoutUpdates =
            jobManager.stdoutUpdates(submission.getLeft()).get();
    stdoutUpdates.subscribe(lastObserved::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    stdoutSubject.onNext(expected);

    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastObserved.get()).isEqualTo(expected);
}
 
源代码17 项目: jobson   文件: JobManagerTest.java
/**
 * Submits a job to a manager backed by a mock executor, subscribes to the job's stderr
 * updates, pushes bytes through the executor's stderr subject, and expects those same bytes
 * to have been observed once the job has finished.
 */
@Test
public void testGetStderrUpdatesEchoesUpdatesFromExecutorObservers() throws InterruptedException, ExecutionException, TimeoutException {
    final Subject<byte[]> stderrSubject = PublishSubject.create();
    final CancelablePromise<JobExecutionResult> executorPromise = new SimpleCancelablePromise<>();
    final JobExecutor executor = MockJobExecutor.thatUses(
            executorPromise,
            Observable.just(TestHelpers.generateRandomBytes()),
            stderrSubject);
    final JobManager jobManager = createManagerWith(executor);

    final Pair<JobId, CancelablePromise<FinalizedJob>> submission =
            jobManager.submit(STANDARD_VALID_REQUEST);

    final AtomicReference<byte[]> lastObserved = new AtomicReference<>();
    final Observable<byte[]> stderrUpdates =
            jobManager.stderrUpdates(submission.getLeft()).get();
    stderrUpdates.subscribe(lastObserved::set);

    final byte[] expected = TestHelpers.generateRandomBytes();
    stderrSubject.onNext(expected);

    executorPromise.complete(new JobExecutionResult(FINISHED));
    submission.getRight().get(DEFAULT_TIMEOUT, MILLISECONDS);

    assertThat(lastObserved.get()).isEqualTo(expected);
}
 
源代码18 项目: AvoidOnResult   文件: AvoidOnResultFragment.java
/**
 * Forwards an activity result to whichever consumer was registered for {@code requestCode}.
 * Both registries are consulted and the matching entry is removed from each.
 *
 * @param requestCode key used to look up (and remove) the pending subject and callback
 * @param resultCode  result code passed on to the consumer
 * @param data        result intent passed on to the consumer
 */
@Override
public void onActivityResult(int requestCode, int resultCode, Intent data) {
    super.onActivityResult(requestCode, resultCode, data);

    // RxJava-style handling: emit the result once, then complete the subject.
    final PublishSubject<ActivityResultInfo> pendingSubject = mSubjects.remove(requestCode);
    if (pendingSubject != null) {
        pendingSubject.onNext(new ActivityResultInfo(resultCode, data));
        pendingSubject.onComplete();
    }

    // Callback-style handling.
    final AvoidOnResult.Callback pendingCallback = mCallbacks.remove(requestCode);
    if (pendingCallback != null) {
        pendingCallback.onActivityResult(resultCode, data);
    }
}
 
源代码19 项目: RxPermissions   文件: RxPermissionsTest.java
/**
 * Fires the trigger for a permission that {@code isGranted} reports as not granted, then
 * delivers a "granted" permission result, and expects {@code ensureEach(permission)} to emit
 * a granted {@code Permission} without errors and without terminating.
 */
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscription_trigger_granted() {
    final String requested = Manifest.permission.READ_PHONE_STATE;
    final int[] grantResults = {PackageManager.PERMISSION_GRANTED};
    when(mRxPermissions.isGranted(requested)).thenReturn(false);

    final PublishSubject<Object> trigger = PublishSubject.create();
    final TestObserver<Permission> observer = new TestObserver<>();
    trigger.compose(mRxPermissions.ensureEach(requested)).subscribe(observer);

    trigger.onNext(1);
    mRxPermissions.onRequestPermissionsResult(new String[]{requested}, grantResults);

    observer.assertNoErrors();
    observer.assertNotTerminated();
    observer.assertValue(new Permission(requested, true));
}
 
源代码20 项目: akarnokd-misc   文件: FlatMapWithTwoErrors.java
/**
 * Checks that an error on the outer subject of a {@code concatMap} leaves the inner subject
 * without observers, after confirming that the inner subject had observers beforehand.
 */
@Test
public void innerCancelled4() {
    PublishSubject<Integer> outer = PublishSubject.create();
    PublishSubject<Integer> inner = PublishSubject.create();

    outer.concatMap(v -> inner).test();

    outer.onNext(1);
    assertTrue("No subscribers?", inner.hasObservers());

    outer.onError(new Exception());
    assertFalse("Has subscribers?", inner.hasObservers());
}
 
源代码21 项目: java-debug   文件: ProcessConsole.java
/**
 * Reads characters from {@code input}, decoded with the {@code encoding} field, and pushes
 * every chunk that was read to {@code subject} as a string.
 *
 * <p>The subject is completed when the stream reports end of input or when the current thread
 * is found interrupted before a read; an {@code IOException} is forwarded through
 * {@code onError}. In all three cases the method returns.
 *
 * @param input   stream to read from
 * @param subject receiver of the text chunks and of the terminal signal
 */
private void monitor(InputStream input, PublishSubject<String> subject) {
    final char[] chunk = new char[4096];
    final BufferedReader reader = new BufferedReader(new InputStreamReader(input, encoding));
    try {
        // Thread.interrupted() is evaluated (and clears the flag) before every read.
        while (!Thread.interrupted()) {
            final int count = reader.read(chunk, 0, chunk.length);
            if (count == -1) {
                break;
            }
            subject.onNext(new String(chunk, 0, count));
        }
        subject.onComplete();
    } catch (IOException e) {
        subject.onError(e);
    }
}
 
源代码22 项目: StompProtocolAndroid   文件: StompClient.java
/**
 * Creates a client on top of the given connection provider and initializes its stream map,
 * lifecycle subject, path matcher and heartbeat task.
 *
 * <p>The heartbeat task is given {@code sendHeartBeat} as its first callback; its second
 * callback publishes a {@code FAILED_SERVER_HEARTBEAT} lifecycle event.
 *
 * @param connectionProvider provider stored for later use by this client
 */
public StompClient(ConnectionProvider connectionProvider) {
    this.connectionProvider = connectionProvider;
    this.streamMap = new ConcurrentHashMap<>();
    this.lifecyclePublishSubject = PublishSubject.create();
    this.pathMatcher = new SimplePathMatcher();
    this.heartBeatTask = new HeartBeatTask(
            this::sendHeartBeat,
            () -> {
                lifecyclePublishSubject.onNext(
                        new LifecycleEvent(LifecycleEvent.Type.FAILED_SERVER_HEARTBEAT));
            });
}
 
源代码23 项目: mobius   文件: TransformersTest.java
/**
 * Expects the action wrapped by {@code Transformers.fromAction} to have run once after the
 * first upstream item and twice after the second.
 */
@Test
public void effectPerformerRunsActionWheneverEffectIsRequested() throws Exception {
  TestAction countingAction = new TestAction();
  PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction)).subscribe();

  effects.onNext("First Time");
  assertThat(countingAction.getRunCount(), is(1));

  effects.onNext("One more!");
  assertThat(countingAction.getRunCount(), is(2));
}
 
源代码24 项目: mobius   文件: TransformersTest.java
/**
 * Expects the action wrapped by {@code Transformers.fromAction(action, scheduler)} not to
 * have run until the test scheduler's actions are triggered, and to have run once afterwards.
 */
@Test
public void effectPerformerRunsActionOnSchedulerWheneverEffectIsRequested() throws Exception {
  TestScheduler testScheduler = new TestScheduler();
  TestAction countingAction = new TestAction();
  PublishSubject<String> effects = PublishSubject.create();
  effects.compose(Transformers.fromAction(countingAction, testScheduler)).subscribe();

  effects.onNext("First Time");
  assertThat(countingAction.getRunCount(), is(0));

  testScheduler.triggerActions();
  assertThat(countingAction.getRunCount(), is(1));
}
 
源代码25 项目: mobius   文件: TransformersTest.java
/**
 * Expects a {@code RuntimeException} thrown by the function wrapped in
 * {@code Transformers.fromFunction} to reach the observer as an error once the scheduler's
 * actions have been triggered.
 */
@Test
public void effectPerformerInvokesFunctionWithReceivedEffectAndErrorsForUnhandledExceptions() {
  Function<String, Integer> alwaysFailing =
      input -> {
        throw new RuntimeException("Something bad happened");
      };
  TestScheduler testScheduler = new TestScheduler();
  PublishSubject<String> effects = PublishSubject.create();
  TestObserver<Integer> results =
      effects.compose(Transformers.fromFunction(alwaysFailing, testScheduler)).test();

  effects.onNext("Hello");
  testScheduler.triggerActions();

  results.assertError(RuntimeException.class);
}
 
源代码26 项目: mobius   文件: MobiusEffectRouterTest.java
/**
 * With the RxJava error handler set to {@code null}, sends an effect to a router whose
 * handler always throws and expects that exact exception instance to be delivered to the
 * test subscriber as an error.
 */
@Test
public void shouldHandleNullRxJavaErrorHandler() throws Exception {
  // given no RxJava error handler
  RxJavaPlugins.setErrorHandler(null);

  // and a router with a broken effect handler
  publishSubject = PublishSubject.create();
  testSubscriber = TestObserver.create();

  final RuntimeException failure = new RuntimeException("expected!");
  ObservableTransformer<TestEffect, TestEvent> brokenRouter =
      RxMobius.<TestEffect, TestEvent>subtypeEffectHandler()
          .addFunction(
              A.class,
              effect -> {
                throw failure;
              })
          .build();

  publishSubject.compose(brokenRouter).subscribe(testSubscriber);

  // when an event is sent, it doesn't crash (the exception does get printed to stderr)
  publishSubject.onNext(A.create(1));

  // and the right exception is forwarded to the test subscriber
  testSubscriber.assertError(thrown -> thrown == failure);
}
 
源代码27 项目: AutoAdapter   文件: BaseAutoAdapter.java
/**
 * Creates a new subject, stores it in {@code mItemViewLongClickBinding} under {@code clazz},
 * and returns it. Every call creates a new subject and puts it under the given key.
 *
 * @param clazz renderer class used as the key of the binding
 * @return the newly created subject
 */
@SuppressWarnings("unused")
public final <X extends AutoViewHolder, Y extends IRenderer<X>> PublishSubject<ItemInfo<Y, X>>
longClicks(@NonNull final Class<Y> clazz) {
    final PublishSubject<ItemInfo<Y, X>> longClickStream = PublishSubject.create();
    mItemViewLongClickBinding.put(clazz, longClickStream);
    return longClickStream;
}
 
源代码28 项目: RxGroups   文件: ObservableGroupTest.java
/**
 * Disposes the group before the source emits and completes, then expects the observer not to
 * have completed while the group still reports observables for it.
 */
@Test public void shouldNotDeliverResultWhileUnsubscribed() throws Exception {
  ObservableGroup group = observableManager.newGroup();
  PublishSubject<String> source = PublishSubject.create();
  TestObserver<String> observer = new TestObserver<>();
  source.compose(group.transform(observer)).subscribe(observer);

  group.dispose();
  source.onNext("Roberto Gomez Bolanos");
  source.onComplete();

  observer.assertNotComplete();
  assertThat(group.hasObservables(observer)).isEqualTo(true);
}
 
源代码29 项目: ground-android   文件: MapContainerViewModel.java
/**
 * Wires up the view model: stores the injected collaborators, creates the subjects that carry
 * location-lock change requests and camera updates, and exposes the resulting streams as
 * {@code LiveData} via {@code LiveDataReactiveStreams.fromPublisher}.
 *
 * @param projectRepository source of the active-project stream
 * @param featureRepository stored in a field by this constructor
 * @param locationManager stored in a field by this constructor
 * @param offlineAreaRepository source of the downloaded-tiles stream
 */
@Inject
MapContainerViewModel(
    ProjectRepository projectRepository,
    FeatureRepository featureRepository,
    LocationManager locationManager,
    OfflineAreaRepository offlineAreaRepository) {
  this.featureRepository = featureRepository;
  this.locationManager = locationManager;
  // NOTE(review): the subjects are created before the flowables below are built; presumably
  // the create*Flowable helpers read them - confirm before reordering.
  this.locationLockChangeRequests = PublishSubject.create();
  this.cameraUpdateSubject = PublishSubject.create();

  // share() lets the lock-state LiveData and the camera-update flowable use the same stream.
  Flowable<BooleanOrError> locationLockStateFlowable = createLocationLockStateFlowable().share();
  // The lock-state LiveData starts with an explicit "false" value.
  this.locationLockState =
      LiveDataReactiveStreams.fromPublisher(
          locationLockStateFlowable.startWith(BooleanOrError.falseValue()));
  this.cameraUpdateRequests =
      LiveDataReactiveStreams.fromPublisher(
          createCameraUpdateFlowable(locationLockStateFlowable));
  this.cameraPosition = new MutableLiveData<>();
  this.activeProject =
      LiveDataReactiveStreams.fromPublisher(projectRepository.getActiveProjectOnceAndStream());
  // TODO: Clear feature markers when project is deactivated.
  // TODO: Since we depend on project stream from repo anyway, this transformation can be moved
  // into the repo?
  // Map pins follow the active project: each project value is switched to its feature stream.
  this.mapPins =
      LiveDataReactiveStreams.fromPublisher(
          projectRepository
              .getActiveProjectOnceAndStream()
              .map(Loadable::value)
              .switchMap(this::getFeaturesStream)
              .map(MapContainerViewModel::toMapPins));
  // Each emitted set of downloaded tiles is reduced to an immutable set of their paths.
  this.mbtilesFilePaths =
      LiveDataReactiveStreams.fromPublisher(
          offlineAreaRepository
              .getDownloadedTilesOnceAndStream()
              .map(set -> stream(set).map(Tile::getPath).collect(toImmutableSet())));
}
 
源代码30 项目: ground-android   文件: HomeScreenViewModel.java
/**
 * Wires up the view model: stores the injected collaborators, creates the mutable
 * {@code LiveData} holders and the add-feature click subject, and subscribes the pipeline
 * that saves each clicked feature and then shows the bottom sheet on the UI scheduler.
 *
 * @param projectRepository source of the active-project stream
 * @param featureRepository used to save newly added features
 * @param authManager supplies the current user passed to {@code saveFeature}
 * @param navigator stored in a field by this constructor
 * @param schedulers provides the UI scheduler used to observe save results
 */
@Inject
HomeScreenViewModel(
    ProjectRepository projectRepository,
    FeatureRepository featureRepository,
    AuthenticationManager authManager,
    Navigator navigator,
    Schedulers schedulers) {
  this.projectRepository = projectRepository;
  this.addFeatureDialogRequests = new MutableLiveData<>();
  this.openDrawerRequests = new MutableLiveData<>();
  this.bottomSheetState = new MutableLiveData<>();
  this.activeProject =
      LiveDataReactiveStreams.fromPublisher(projectRepository.getActiveProjectOnceAndStream());
  this.navigator = navigator;
  this.addFeatureClicks = PublishSubject.create();

  // The subscription is handed to disposeOnClear.
  disposeOnClear(
      addFeatureClicks
          .switchMapSingle(
              newFeature ->
                  featureRepository
                      .saveFeature(newFeature, authManager.getCurrentUser())
                      // Re-emit the saved feature once the save has completed.
                      .toSingleDefault(newFeature)
                      // A failed save is reported through onAddFeatureError and then replaced
                      // by a Single that never emits, so the click stream itself does not
                      // terminate with an error.
                      .doOnError(this::onAddFeatureError)
                      .onErrorResumeNext(Single.never())) // Prevent from breaking upstream.
          .observeOn(schedulers.ui())
          .subscribe(this::showBottomSheet));
}
 
 类所在包
 同包方法