类io.reactivex.subscribers.DisposableSubscriber源码实例Demo

下面列出了 io.reactivex.subscribers.DisposableSubscriber 类的 API 使用实例代码及写法，也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: JReadHub   文件: SearchPresenter.java
/**
 * Loads the stored search history and hands the result to the view.
 *
 * <p>Subscribes to {@code mDataManager.getAllSearchHistroy()} after applying
 * {@code RxSchedulers.flowableIo2Main()} (presumably IO-to-main-thread scheduling;
 * confirm against RxSchedulers) and registers the subscription via
 * {@code addSubscribe}. A non-empty list is bound to the view; a {@code null} or
 * empty list switches the view to its empty state. Errors and completion are ignored.
 */
@Override
public void loadAllHistory() {
    DisposableSubscriber<List<SearchHistoryBean>> historySubscriber =
            new DisposableSubscriber<List<SearchHistoryBean>>() {
                @Override
                public void onNext(List<SearchHistoryBean> searchHistoryBeans) {
                    boolean nothingStored =
                            searchHistoryBeans == null || searchHistoryBeans.isEmpty();
                    if (nothingStored) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(searchHistoryBeans);
                }

                @Override
                public void onError(Throwable t) {
                    // Errors are not surfaced to the view.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getAllSearchHistroy()
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(historySubscriber));
}
 
源代码2 项目: JReadHub   文件: StarCommonPresenter.java
/**
 * Queries news entries matching {@code keyWord} and renders the outcome on the view.
 *
 * <p>On subscription the view is asked to show its loading state (with {@code false}).
 * A non-empty result is bound with {@code bindData(list, true)} followed by
 * {@code showContent()}; a {@code null} or empty result shows the empty state; an
 * error shows the error state. The subscription is registered via {@code addSubscribe}.
 *
 * @param keyWord the search keyword passed to {@code mDataManager.getNewsByKeyword}
 */
@Override
public void getDataWithKeyword(String keyWord) {
    DisposableSubscriber<List<NewsBean>> resultSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getNewsByKeyword(keyWord)
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(false))
            .subscribeWith(resultSubscriber));
}
 
源代码3 项目: JReadHub   文件: StarCommonPresenter.java
/**
 * Reloads all news entries from {@code mDataManager.getAllNews()} and renders them.
 *
 * <p>On subscription the view is asked to show its loading state, forwarding
 * {@code isPullToRefresh}. A non-empty result is bound with
 * {@code bindData(list, true)} followed by {@code showContent()}; a {@code null} or
 * empty result shows the empty state; an error shows the error state.
 *
 * @param isPullToRefresh forwarded unchanged to {@code getView().showLoading(...)}
 */
@Override
public void doRefresh(boolean isPullToRefresh) {
    DisposableSubscriber<List<NewsBean>> refreshSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getAllNews()
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(isPullToRefresh))
            .subscribeWith(refreshSubscriber));
}
 
源代码4 项目: JReadHub   文件: StarTopicPresenter.java
/**
 * Queries topics matching {@code keyWord} and renders the outcome on the view.
 *
 * <p>On subscription the view is asked to show its loading state (with {@code false}).
 * A non-empty result is bound with {@code bindData(list, true)} followed by
 * {@code showContent()}; a {@code null} or empty result shows the empty state; an
 * error shows the error state. The subscription is registered via {@code addSubscribe}.
 *
 * @param keyWord the search keyword passed to {@code mDataManager.getTopicsByKeyword}
 */
@Override
public void getDataWithKeyword(String keyWord) {
    DisposableSubscriber<List<TopicDetailBean>> resultSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getTopicsByKeyword(keyWord)
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(false))
            .subscribeWith(resultSubscriber));
}
 
源代码5 项目: JReadHub   文件: StarTopicPresenter.java
/**
 * Reloads all topics from {@code mDataManager.getAllTopic()} and renders them.
 *
 * <p>On subscription the view is asked to show its loading state, forwarding
 * {@code isPullToRefresh}. A non-empty result is bound with
 * {@code bindData(list, true)} followed by {@code showContent()}; a {@code null} or
 * empty result shows the empty state; an error shows the error state.
 *
 * @param isPullToRefresh forwarded unchanged to {@code getView().showLoading(...)}
 */
@Override
public void doRefresh(boolean isPullToRefresh) {
    DisposableSubscriber<List<TopicDetailBean>> refreshSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getAllTopic()
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(isPullToRefresh))
            .subscribeWith(refreshSubscriber));
}
 
源代码6 项目: RxBus2   文件: RxBusBuilder.java
/**
 * Builds the bus flowable, optionally transforms it, and subscribes the given subscriber.
 *
 * <p>Steps, in order: {@code build(false)}; {@code compose(transformer)} when a
 * transformer is supplied; wrapping of the subscriber through
 * {@code RxBusUtil.wrapSubscriber} when a queuer is set and the queue safety check is
 * enabled; {@code applySchedular}; subscription. When {@code mBoundObject} is set, the
 * resulting disposable is also registered with {@code RxDisposableManager}.
 *
 * @param subscriber  the subscriber receiving events
 * @param transformer optional transformer applied to the flowable; may be {@code null}
 * @return the disposable of the subscriber that was actually subscribed (the wrapper
 *         when wrapping took place)
 */
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
    Flowable source = build(false);
    if (transformer != null)
    {
        source = source.compose(transformer);
    }

    final boolean wrapForQueue = mQueuer != null && mQueueSubscriptionSafetyCheckEnabled;
    Subscriber<R> target = wrapForQueue
            ? RxBusUtil.wrapSubscriber(subscriber, mQueuer)
            : subscriber;

    source = applySchedular(source);
    Disposable subscription = (DisposableSubscriber)source.subscribeWith(target);
    if (mBoundObject != null)
    {
        RxDisposableManager.addDisposable(mBoundObject, subscription);
    }
    return subscription;
}
 
源代码7 项目: RxBus2   文件: RxBusUtil.java
/**
 * Wraps a subscriber so that items only reach it when the queue safety check passes.
 *
 * <p>Terminal events are forwarded unconditionally. Each item is forwarded only when
 * {@code RxUtil.safetyQueueCheck(item, isResumedProvider)} returns {@code true};
 * otherwise it is dropped by this wrapper.
 *
 * @param subscriber        the delegate that receives forwarded events
 * @param isResumedProvider passed to {@code RxUtil.safetyQueueCheck} for every item
 * @return a new subscriber delegating to {@code subscriber}
 */
protected static <T> DisposableSubscriber<T> wrapSubscriber(DisposableSubscriber<T> subscriber, IRxBusQueue isResumedProvider)
{
    return new DisposableSubscriber<T>()
    {
        @Override
        public void onNext(T item)
        {
            if (!RxUtil.safetyQueueCheck(item, isResumedProvider))
            {
                return;
            }
            subscriber.onNext(item);
        }

        @Override
        public void onError(Throwable error)
        {
            subscriber.onError(error);
        }

        @Override
        public void onComplete()
        {
            subscriber.onComplete();
        }
    };
}
 
源代码8 项目: Android-AudioRecorder-App   文件: AudioRecorder.java
/**
 * Connects the audio source to the publish processor and starts saving recorded data.
 *
 * <p>{@code audioDataFlowable} is subscribed on the IO scheduler and feeds
 * {@code recordDataPublishProcessor}. A second subscription on the processor (buffered
 * for backpressure, observed on the IO scheduler) forwards data to
 * {@code audioSaveHelper} and, on completion, reports the stop time and resets
 * {@code recorderState} to idle while notifying waiters on {@code recorderStateMonitor}.
 */
private void startRecordThread() {
  audioDataFlowable.subscribeOn(Schedulers.io()).subscribe(recordDataPublishProcessor);

  DisposableSubscriber<byte[]> saveSubscriber = new DisposableSubscriber<byte[]>() {
    @Override public void onNext(byte[] bytes) {
      // NOTE(review): the emitted chunk is ignored and the recordBuffer field is
      // passed instead; confirm that the source emits that same array.
      audioSaveHelper.onDataReady(recordBuffer);
    }

    @Override public void onComplete() {
      audioSaveHelper.onRecordingStopped(currentRecordTime);
      synchronized (recorderStateMonitor) {
        recorderState = RECORDER_STATE_IDLE;
        recorderStateMonitor.notifyAll();
      }
    }

    @Override public void onError(Throwable t) {
      // Errors are not handled here.
    }
  };

  compositeDisposable.add(recordDataPublishProcessor.onBackpressureBuffer()
      .observeOn(Schedulers.io())
      .subscribeWith(saveSubscriber));
}
 
/**
 * Starts consuming audio data for visualisation.
 *
 * <p>Subscribes to {@code audioRecorder.getAudioDataFlowable()} with
 * {@code onBackpressureLatest()}, observing on a new thread. Non-empty chunks are
 * passed to {@code onDataReceived}; errors and completion are only logged. The
 * resulting subscriber is stored in {@code disposableSubscriber}.
 */
@Override public void startDbmThread() {
  DisposableSubscriber<byte[]> visualiserSubscriber = new DisposableSubscriber<byte[]>() {
    @Override public void onNext(byte[] moreData) {
      if (moreData == null || moreData.length == 0) {
        return;
      }
      onDataReceived(moreData);
    }

    @Override public void onComplete() {
      Log.i("Visualise ", "complete");
    }

    @Override public void onError(Throwable t) {
      Log.i("Visual Error ", t.getMessage() + " ");
    }
  };

  disposableSubscriber = audioRecorder.getAudioDataFlowable()
      .onBackpressureLatest()
      .observeOn(Schedulers.newThread())
      .subscribeWith(visualiserSubscriber);
}
 
/**
 * Subscribes to the given integer stream and logs every event it produces.
 *
 * <p>A log line is written when the subscription starts, for each emitted value, on
 * error (also reported through Timber) and on completion. The subscriber is added to
 * {@code _disposables} after subscribing.
 *
 * @param intStream the stream to observe
 */
@Override
public void setStream(Flowable<Integer> intStream) {
  DisposableSubscriber<Integer> workerSubscriber =
      new DisposableSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
          _log(String.format("Worker frag spits out - %d", integer));
        }

        @Override
        public void onComplete() {
          _log("Observable is complete");
        }

        @Override
        public void onError(Throwable e) {
          Timber.e(e, "Error in worker demo frag observable");
          _log("Dang! something went wrong.");
        }
      };

  Flowable<Integer> announced =
      intStream.doOnSubscribe(subscription -> _log("Subscribing to intsObservable"));
  announced.subscribe(workerSubscriber);

  _disposables.add(workerSubscriber);
}
 
源代码11 项目: RxAndroid-Examples   文件: MainActivity.java
/**
 * Restarts the periodic timer demo.
 *
 * <p>Any still-active previous subscriber is disposed first. A fresh subscriber that
 * logs every tick, error and completion is then subscribed to
 * {@code Flowable.interval(POLL_INTERVAL, SECONDS)}, observed on the Android main thread.
 *
 * @param view the clicked view (unused)
 */
@OnClick(R.id.btnInterval)
public void startTimerInterval(View view) {
    boolean previousStillRunning =
            subscriberInterval != null && !subscriberInterval.isDisposed();
    if (previousStillRunning) {
        subscriberInterval.dispose();
    }

    subscriberInterval = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            addLogMessage("Timer interval:  " + TimeUtil.getCurrentTime());
        }

        @Override
        public void onComplete() {
            addLogMessage("Timer interval completed !!");
        }

        @Override
        public void onError(Throwable t) {
            addLogMessage("ERROR Timer interval: " + t.getMessage());
        }
    };

    addLogMessage("START 2s timer interval...");

    Flowable<Long> ticks = Flowable.interval(POLL_INTERVAL, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
    ticks.subscribe(subscriberInterval);
}
 
源代码12 项目: RxAndroid-Examples   文件: MainActivity.java
/**
 * Restarts the delayed periodic timer demo.
 *
 * <p>Any still-active previous subscriber is disposed first. A fresh subscriber that
 * logs every tick, error and completion is then subscribed to
 * {@code Flowable.interval(DELAY_TIME, POLL_INTERVAL, SECONDS)}, observed on the
 * Android main thread.
 *
 * @param view the clicked view (unused)
 */
@OnClick(R.id.btnDelayInterval)
public void startDelayTimer(View view) {
    boolean previousStillRunning =
            subscriberDelayInterval != null && !subscriberDelayInterval.isDisposed();
    if (previousStillRunning) {
        subscriberDelayInterval.dispose();
    }

    subscriberDelayInterval = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            addLogMessage("Delay Timer interval: " + TimeUtil.getCurrentTime());
        }

        @Override
        public void onComplete() {
            addLogMessage("Delay timer completed !!");
        }

        @Override
        public void onError(Throwable t) {
            addLogMessage("ERROR delay timer: " + t.getMessage());
        }
    };

    addLogMessage("START timer interval after " + DELAY_TIME + "s !!");

    Flowable<Long> delayedTicks = Flowable.interval(DELAY_TIME, POLL_INTERVAL, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
    delayedTicks.subscribe(subscriberDelayInterval);
}
 
/**
 * Demonstrates a {@code DisposableSubscriber} consuming {@code Flowable.rangeLong(5, 4)}.
 *
 * <p>The subscriber requests {@code Long.MAX_VALUE} items when it starts, prints every
 * value it receives, and disposes itself when it sees the value 7 (that value is still
 * printed). The subscriber is disposed again once {@code subscribe} returns.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    DisposableSubscriber<Long> printer = new DisposableSubscriber<Long>() {

        @Override
        protected void onStart() {
            request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Long value_long) {
            if (value_long == 7) {
                dispose();
            }
            System.out.println("value :-" + value_long);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Its Done!!!");
        }
    };

    Flowable.rangeLong(5, 4).subscribe(printer);
    printer.dispose();
}
 
源代码14 项目: rxjava2   文件: FlowableRange.java
/**
 * Demonstrates manual backpressure with a {@code DisposableSubscriber}.
 *
 * <p>The subscriber requests 5 items up front and then one more item per second to
 * emulate slow processing. It disposes itself when it receives the value 8.
 *
 * <p>The request loop runs on the thread that calls {@code subscribe} and stops once
 * the subscriber has been disposed (or the thread is interrupted), so the program can
 * terminate instead of spinning forever on a cancelled subscription.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

    subscriber = new DisposableSubscriber<Integer>() {

        @Override
        public void onStart() {
            request(5);

            // Emulate some processing: ask for one more item per second until disposed.
            while (!isDisposed()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop requesting.
                    Thread.currentThread().interrupt();
                    return;
                }
                request(1);
            }
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("processing "+ t);

            if (t==8) {
                subscriber.dispose();
            }
        }

        @Override
        public void onError(Throwable thr) {
            System.err.println("In onError(): " + thr.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    };

    Flowable.range(1, 10)
            .delay(1, TimeUnit.SECONDS)
            .subscribe(subscriber);
}
 
源代码15 项目: IncDec   文件: IncDecCircular.java
/** Rx Java **/

    /**
     * Starts a repeating tick that performs {@code IncrementAction()} while either
     * button is long-pressed.
     *
     * <p>Ticks come from {@code Flowable.interval(0, seconds, MILLISECONDS)} and are
     * observed on the Android main thread. The first tick that finds neither button
     * pressed disposes the subscriber.
     *
     * <p>The subscriber disposes <em>itself</em> rather than whatever
     * {@code _Incrementsubscriber} currently references: if this method is called again
     * before the previous interval stopped, the field already points at the newer
     * subscriber, and disposing through the field would kill the new interval while
     * leaking the old one.
     */
    private void startIncrementObserver(){

        _Incrementsubscriber=new DisposableSubscriber<Long>() {

            @Override
            public void onNext(Long aLong) {
                if(isLeftButtonLongPressed||isRightButtonLongPressed) {
                    IncrementAction();
                }
                else {
                    dispose();
                }
            }

            @Override
            public void onError(Throwable t) {
                // Log.i throws NullPointerException for a null message, and
                // Throwable.getMessage() may be null, so convert it defensively.
                Log.i(TAG, String.valueOf(t.getMessage()), t);
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        };

        Flowable.interval(0,seconds,TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(_Incrementsubscriber);

    }
    
 
源代码16 项目: IncDec   文件: IncDecCircular.java
/**
 * Starts a repeating tick that performs {@code DecrementAction()} while either
 * button is long-pressed.
 *
 * <p>Ticks come from {@code Flowable.interval(0, seconds, MILLISECONDS)} and are
 * observed on the Android main thread. The first tick that finds neither button
 * pressed disposes the subscriber.
 *
 * <p>The subscriber disposes <em>itself</em> rather than whatever
 * {@code _Decrementsubscriber} currently references: if this method is called again
 * before the previous interval stopped, the field already points at the newer
 * subscriber, and disposing through the field would kill the new interval while
 * leaking the old one.
 */
private void startDecrementObserver(){

        _Decrementsubscriber=new DisposableSubscriber<Long>() {

            @Override
            public void onNext(Long aLong) {
                if(isLeftButtonLongPressed||isRightButtonLongPressed) {
                    DecrementAction();
                }
                else {
                    dispose();
                }
            }

            @Override
            public void onError(Throwable t) {
                // Log.i throws NullPointerException for a null message, and
                // Throwable.getMessage() may be null, so convert it defensively.
                Log.i(TAG, String.valueOf(t.getMessage()), t);
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        };

        Flowable.interval(0,seconds,TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(_Decrementsubscriber);

    }
 
源代码17 项目: IncDec   文件: IncDecImageButton.java
/** Rx Java **/

    /**
     * Starts a repeating tick that performs {@code IncrementAction()} while either
     * button is long-pressed.
     *
     * <p>Ticks come from {@code Flowable.interval(0, seconds, MILLISECONDS)} and are
     * observed on the Android main thread. The first tick that finds neither button
     * pressed disposes the subscriber.
     *
     * <p>The subscriber disposes <em>itself</em> rather than whatever
     * {@code _Incrementsubscriber} currently references: if this method is called again
     * before the previous interval stopped, the field already points at the newer
     * subscriber, and disposing through the field would kill the new interval while
     * leaking the old one.
     */
    private void startIncrementObserver(){

        _Incrementsubscriber=new DisposableSubscriber<Long>() {

            @Override
            public void onNext(Long aLong) {
                if(isLeftButtonLongPressed||isRightButtonLongPressed) {
                    IncrementAction();
                }
                else {
                    dispose();
                }
            }

            @Override
            public void onError(Throwable t) {
                // Log.i throws NullPointerException for a null message, and
                // Throwable.getMessage() may be null, so convert it defensively.
                Log.i(TAG, String.valueOf(t.getMessage()), t);
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        };

        Flowable.interval(0,seconds, TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(_Incrementsubscriber);

    }
    
 
源代码18 项目: IncDec   文件: IncDecImageButton.java
/**
 * Starts a repeating tick that performs {@code DecrementAction()} while either
 * button is long-pressed.
 *
 * <p>Ticks come from {@code Flowable.interval(0, seconds, MILLISECONDS)} and are
 * observed on the Android main thread. The first tick that finds neither button
 * pressed disposes the subscriber.
 *
 * <p>The subscriber disposes <em>itself</em> rather than whatever
 * {@code _Decrementsubscriber} currently references: if this method is called again
 * before the previous interval stopped, the field already points at the newer
 * subscriber, and disposing through the field would kill the new interval while
 * leaking the old one.
 */
private void startDecrementObserver(){

        _Decrementsubscriber=new DisposableSubscriber<Long>() {

            @Override
            public void onNext(Long aLong) {
                if(isLeftButtonLongPressed||isRightButtonLongPressed) {
                    DecrementAction();
                }
                else {
                    dispose();
                }
            }

            @Override
            public void onError(Throwable t) {
                // Log.i throws NullPointerException for a null message, and
                // Throwable.getMessage() may be null, so convert it defensively.
                Log.i(TAG, String.valueOf(t.getMessage()), t);
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        };

        Flowable.interval(0,seconds,TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(_Decrementsubscriber);

    }
 
/**
 * Subscribes to "comment added" events for {@code repoFullName} and shows new comments.
 *
 * <p>The Apollo subscription call is converted with {@code Rx2Apollo.from}, subscribed
 * on the IO scheduler and observed on the Android main thread. For each response that
 * carries data, a non-null added comment is appended to
 * {@code commentsListViewAdapter} (a null comment is logged as a warning) and a toast
 * is shown. Errors and completion are logged and reported with a toast. The
 * subscription is added to {@code disposables}.
 */
private void subscribeRepoCommentAdded() {
  ApolloSubscriptionCall<RepoCommentAddedSubscription.Data> subscriptionCall = application.apolloClient()
      .subscribe(new RepoCommentAddedSubscription(repoFullName));

  DisposableSubscriber<Response<RepoCommentAddedSubscription.Data>> commentSubscriber =
      new DisposableSubscriber<Response<RepoCommentAddedSubscription.Data>>() {
        @Override public void onNext(Response<RepoCommentAddedSubscription.Data> response) {
          final RepoCommentAddedSubscription.Data data = response.data();
          if (data == null) {
            return;
          }

          RepoCommentAddedSubscription.CommentAdded newComment = data.commentAdded();
          if (newComment == null) {
            Log.w(TAG, "Comment added subscription data is null.");
          } else {
            commentsListViewAdapter.addItem(newComment.content());
          }
          Toast.makeText(GitHuntEntryDetailActivity.this, "Subscription response received", Toast.LENGTH_SHORT)
              .show();
        }

        @Override public void onComplete() {
          Log.d(TAG, "Subscription exhausted");
          Toast.makeText(GitHuntEntryDetailActivity.this, "Subscription complete", Toast.LENGTH_SHORT).show();
        }

        @Override public void onError(Throwable e) {
          Log.e(TAG, e.getMessage(), e);
          Toast.makeText(GitHuntEntryDetailActivity.this, "Subscription failure", Toast.LENGTH_SHORT).show();
        }
      };

  disposables.add(Rx2Apollo.from(subscriptionCall)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeWith(commentSubscriber));
}
 
/**
 * Subscribes to the given integer stream and logs every event it produces.
 *
 * <p>A log line is written when the subscription starts, for each emitted value, on
 * error (also reported through Timber) and on completion. The subscriber is added to
 * {@code _disposables} after subscribing.
 *
 * @param intsFlowable the stream to observe
 */
@Override
public void observeResults(Flowable<Integer> intsFlowable) {

  DisposableSubscriber<Integer> resultSubscriber =
      new DisposableSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
          _log(String.format("Worker frag spits out - %d", integer));
        }

        @Override
        public void onComplete() {
          _log("Observable is complete");
        }

        @Override
        public void onError(Throwable e) {
          Timber.e(e, "Error in worker demo frag observable");
          _log("Dang! something went wrong.");
        }
      };

  Flowable<Integer> announced =
      intsFlowable.doOnSubscribe(subscription -> _log("Subscribing to intsObservable"));
  announced.subscribe(resultSubscriber);

  _disposables.add(resultSubscriber);
}
 
/**
 * Runs the retry demo: a flowable that always fails is retried through
 * {@code new RetryWithDelay(5, 1000)} before the error reaches the subscriber.
 *
 * <p>The log list and adapter are reset first. The subscriber logs a final
 * "give up" message on error and is added to {@code _disposables} after subscribing.
 */
@OnClick(R.id.btn_eb_retry)
public void startRetryingWithExponentialBackoffStrategy() {
  _logs = new ArrayList<>();
  _adapter.clear();

  DisposableSubscriber<Object> retrySubscriber =
      new DisposableSubscriber<Object>() {
        @Override
        public void onNext(Object aVoid) {
          Timber.d("on Next");
        }

        @Override
        public void onError(Throwable e) {
          _log("Error: I give up!");
        }

        @Override
        public void onComplete() {
          Timber.d("on Completed");
        }
      };

  // The source always fails; retryWhen is only driven by onError signals, so any
  // onNext values are irrelevant to the retry logic.
  Flowable.error(new RuntimeException("testing"))
      .retryWhen(new RetryWithDelay(5, 1000))
      .doOnSubscribe(subscription -> _log("Attempting the impossible 5 times in intervals of 1s"))
      .subscribe(retrySubscriber);

  _disposables.add(retrySubscriber);
}
 
/**
 * Toggles the one-second interval demo (first tick after one second).
 *
 * <p>If a previous subscriber is still active it is disposed, a "killed" line is
 * logged, and nothing new is started. Otherwise a new subscriber that logs each tick
 * and completion is subscribed to {@code Flowable.interval(1, SECONDS)}.
 */
@OnClick(R.id.btn_demo_timing_2)
public void btn2_RunTask_IntervalOf1s() {
  boolean alreadyRunning = _subscriber1 != null && !_subscriber1.isDisposed();
  if (alreadyRunning) {
    _subscriber1.dispose();
    _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
    return;
  }

  _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp()));

  _subscriber1 =
      new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long number) {
          _log(String.format("B2 [%s]     NEXT", _getCurrentTimestamp()));
        }

        @Override
        public void onError(Throwable e) {
          Timber.e(e, "something went wrong in TimingDemoFragment example");
        }

        @Override
        public void onComplete() {
          _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
        }
      };

  Flowable<Long> everySecond = Flowable.interval(1, TimeUnit.SECONDS);
  everySecond.subscribe(_subscriber1);
}
 
/**
 * Toggles the one-second interval demo that starts immediately (initial delay 0).
 *
 * <p>If a previous subscriber is still active it is disposed, a "killed" line is
 * logged, and nothing new is started. Otherwise a new subscriber that logs each tick
 * and completion is subscribed to {@code Flowable.interval(0, 1, SECONDS)}.
 */
@OnClick(R.id.btn_demo_timing_3)
public void btn3_RunTask_IntervalOf1s_StartImmediately() {
  boolean alreadyRunning = _subscriber2 != null && !_subscriber2.isDisposed();
  if (alreadyRunning) {
    _subscriber2.dispose();
    _log(String.format("C3 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
    return;
  }

  _log(String.format("C3 [%s] --- BTN click", _getCurrentTimestamp()));

  _subscriber2 =
      new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long number) {
          _log(String.format("C3 [%s]     NEXT", _getCurrentTimestamp()));
        }

        @Override
        public void onError(Throwable e) {
          Timber.e(e, "something went wrong in TimingDemoFragment example");
        }

        @Override
        public void onComplete() {
          _log(String.format("C3 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
        }
      };

  Flowable<Long> immediateTicks = Flowable.interval(0, 1, TimeUnit.SECONDS);
  immediateTicks.subscribe(_subscriber2);
}
 
源代码24 项目: JReadHub   文件: WebViewPresenter.java
/**
 * Checks whether the news item with the given id is stored and reports it to the view.
 *
 * <p>Each emission from {@code mDataManager.getNewsById(newsId)} is reported through
 * {@code getView().onCheckStarResult(...)}: {@code true} for a non-empty list,
 * {@code false} for a {@code null} or empty one. Errors and completion are ignored.
 *
 * <p>NOTE: an earlier variant (previously kept here as commented-out code) used
 * {@code mDataManager.getSingleBean(NewsBean.class, newsId)} with a
 * {@code SingleObserver} and treated {@code EmptyResultSetException} as "not starred".
 *
 * @param newsId id of the news item to look up
 */
@Override
public void checkStar(String newsId) {
    DisposableSubscriber<List<NewsBean>> starSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBean) {
                    boolean starred = topicBean != null && topicBean.size() > 0;
                    getView().onCheckStarResult(starred);
                }

                @Override
                public void onError(Throwable t) {
                    // Errors are not surfaced to the view.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getNewsById(newsId)
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(starSubscriber));
}
 
源代码25 项目: JReadHub   文件: TopicDetailPresenter.java
/**
 * Checks whether the topic with the given id is stored and reports it to the view.
 *
 * <p>Each emission from {@code mDataManager.getTopicById(topicId)} is reported through
 * {@code getView().onCheckStarResult(...)}: {@code true} for a non-empty list,
 * {@code false} for a {@code null} or empty one. Errors and completion are ignored.
 *
 * <p>NOTE: an earlier variant (previously kept here as commented-out code) used
 * {@code mDataManager.getSingleBean(TopicBean.class, topicId)} with a
 * {@code SingleObserver} and treated {@code EmptyResultSetException} as "not starred".
 *
 * @param topicId id of the topic to look up
 */
@Override
public void checkStar(String topicId) {
    DisposableSubscriber<List<TopicDetailBean>> starSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBean) {
                    boolean starred = topicBean != null && topicBean.size() > 0;
                    getView().onCheckStarResult(starred);
                }

                @Override
                public void onError(Throwable t) {
                    // Errors are not surfaced to the view.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getTopicById(topicId)
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(starSubscriber));
}
 
源代码26 项目: RxAndroid-Examples   文件: MainActivity.java
/**
 * Wires form validation: combines the latest first-name, last-name and email values
 * and colours the submit button according to the overall validity.
 *
 * <p>Validation rules, as implemented in the combiner: first and last name must be
 * non-null and longer than 3 characters; the email must be non-null and accepted by
 * {@code Validator.isValidEmailAddress}. Each failing field gets an error message set
 * on its input. The button is blue when all three are valid and gray otherwise.
 * Errors and completion of the combined stream are only logged.
 */
private void combineLatestEvents() {
    disposableSubscriber = new DisposableSubscriber<Boolean>() {
        @Override
        public void onNext(Boolean isValidForm) {
            if (isValidForm) {
                btnSubmit.setBackgroundColor(ContextCompat.getColor(MainActivity.this, R.color.blue));
            } else {
                btnSubmit.setBackgroundColor(ContextCompat.getColor(MainActivity.this, R.color.gray));
            }
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError       ==/   " + t.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete       ==/    called");
        }
    };

    Flowable.combineLatest(firstNameChangeObservable, lastNameChangeObservable, emailChangeObservable,
            (firstName, lastName, email) -> {
                boolean isValidFirstName = firstName != null && firstName.length() > 3;
                if (!isValidFirstName) {
                    etFirstName.setError("Invalid first name");
                }

                boolean isValidLastName = lastName != null && lastName.length() > 3;
                if (!isValidLastName) {
                    etLastName.setError("Invalid last name");
                }

                boolean isValidEmail = email != null && Validator.isValidEmailAddress(String.valueOf(email));
                if (!isValidEmail) {
                    // User-facing message; spelling corrected from "addess".
                    etEmail.setError("Invalid email address");
                }

                return isValidFirstName && isValidLastName && isValidEmail;
            })
            .subscribe(disposableSubscriber);
}
 
源代码27 项目: CrazyDaily   文件: UseCase.java
/**
 * Builds the use-case flowable for {@code params} and subscribes {@code subscriber} to it.
 *
 * <p>The subscriber is null-checked before the flowable is built; the resulting
 * disposable is registered via {@code addDisposable}.
 *
 * @param params     input forwarded to {@code buildUseCaseObservable}
 * @param subscriber the subscriber to attach; checked with {@code Preconditions.checkNotNull}
 */
public void execute(Params params, DisposableSubscriber<T> subscriber) {
    Preconditions.checkNotNull(subscriber);
    addDisposable(this.buildUseCaseObservable(params).subscribeWith(subscriber));
}
 
源代码28 项目: CrazyDaily   文件: UseCase.java
/**
 * Subscribes {@code subscriber} to an externally supplied flowable and tracks the
 * resulting disposable via {@code addDisposable}.
 *
 * @param flowable   the source to subscribe to; checked with {@code Preconditions.checkNotNull}
 * @param subscriber the subscriber to attach; checked with {@code Preconditions.checkNotNull}
 */
public <P> void execute(Flowable<P> flowable, DisposableSubscriber<P> subscriber) {
    Preconditions.checkNotNull(flowable);
    Preconditions.checkNotNull(subscriber);
    final DisposableSubscriber<P> attached = flowable.subscribeWith(subscriber);
    addDisposable(attached);
}
 
源代码29 项目: CrazyDaily   文件: UseCase.java
/**
 * Executes the use case without parameters by delegating to
 * {@code execute(Params, DisposableSubscriber)} with {@code null} params.
 *
 * @param subscriber the subscriber to attach
 */
public void execute(DisposableSubscriber<T> subscriber) {
    final Params noParams = null;
    execute(noParams, subscriber);
}
 
/**
 * Wires form validation: combines the latest email, password and number values and
 * colours the indicator button according to the overall validity.
 *
 * <p>Validation rules, as implemented in the combiner: the email must be non-empty and
 * match {@code EMAIL_ADDRESS}; the password must be non-empty and longer than 8
 * characters; the number must be non-empty and parse to an integer in 1..100. Each
 * failing field gets an error message set on its input.
 *
 * <p>Text that cannot be parsed as an {@code int} (non-numeric input, or more digits
 * than an int can hold) is treated as an invalid number. Letting the
 * {@link NumberFormatException} escape the combiner would terminate the combined
 * stream and stop all further validation.
 */
private void _combineLatestEvents() {

  _disposableObserver =
      new DisposableSubscriber<Boolean>() {
        @Override
        public void onNext(Boolean formValid) {
          if (formValid) {
            _btnValidIndicator.setBackgroundColor(
                ContextCompat.getColor(getContext(), R.color.blue));
          } else {
            _btnValidIndicator.setBackgroundColor(
                ContextCompat.getColor(getContext(), R.color.gray));
          }
        }

        @Override
        public void onError(Throwable e) {
          Timber.e(e, "there was an error");
        }

        @Override
        public void onComplete() {
          Timber.d("completed");
        }
      };

  Flowable.combineLatest(
          _emailChangeObservable,
          _passwordChangeObservable,
          _numberChangeObservable,
          (newEmail, newPassword, newNumber) -> {
            boolean emailValid = !isEmpty(newEmail) && EMAIL_ADDRESS.matcher(newEmail).matches();
            if (!emailValid) {
              _email.setError("Invalid Email!");
            }

            boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8;
            if (!passValid) {
              _password.setError("Invalid Password!");
            }

            boolean numValid = !isEmpty(newNumber);
            if (numValid) {
              try {
                int num = Integer.parseInt(newNumber.toString());
                numValid = num > 0 && num <= 100;
              } catch (NumberFormatException ignored) {
                // Unparseable or out-of-int-range text is simply an invalid number.
                numValid = false;
              }
            }
            if (!numValid) {
              _number.setError("Invalid Number!");
            }

            return emailValid && passValid && numValid;
          })
      .subscribe(_disposableObserver);
}
 
 类所在包
 类方法
 同包方法