下面列出了怎么用io.reactivex.subscribers.DisposableSubscriber的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Loads every stored search-history entry and hands the result to the view.
 *
 * <p>The stream from {@code mDataManager.getAllSearchHistroy()} is passed through
 * {@code RxSchedulers.flowableIo2Main()} and registered with {@code addSubscribe}. A non-empty
 * list is bound via {@code bindData}; a {@code null} or empty list switches the view to its
 * empty state. Errors and completion are not reported to the view.
 */
@Override
public void loadAllHistory() {
    final DisposableSubscriber<List<SearchHistoryBean>> historySubscriber =
            new DisposableSubscriber<List<SearchHistoryBean>>() {
                @Override
                public void onNext(List<SearchHistoryBean> searchHistoryBeans) {
                    if (searchHistoryBeans == null || searchHistoryBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(searchHistoryBeans);
                }

                @Override
                public void onError(Throwable t) {
                    // errors are currently dropped without notifying the view
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getAllSearchHistroy()
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(historySubscriber));
}
/**
 * Searches news by keyword and renders the outcome on the view.
 *
 * <p>On subscription the view is asked to show loading (with {@code false}). A non-empty result
 * is bound and the content shown; a {@code null} or empty result shows the empty state; an error
 * shows the error state.
 *
 * @param keyWord keyword forwarded to {@code mDataManager.getNewsByKeyword}
 */
@Override
public void getDataWithKeyword(String keyWord) {
    final DisposableSubscriber<List<NewsBean>> resultSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getNewsByKeyword(keyWord)
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(false))
            .subscribeWith(resultSubscriber));
}
/**
 * Reloads all news and renders the outcome on the view.
 *
 * <p>A non-empty result is bound and the content shown; a {@code null} or empty result shows the
 * empty state; an error shows the error state.
 *
 * @param isPullToRefresh flag handed to the view's {@code showLoading} when the subscription starts
 */
@Override
public void doRefresh(boolean isPullToRefresh) {
    final DisposableSubscriber<List<NewsBean>> refreshSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getAllNews()
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(isPullToRefresh))
            .subscribeWith(refreshSubscriber));
}
/**
 * Searches topics by keyword and renders the outcome on the view.
 *
 * <p>On subscription the view is asked to show loading (with {@code false}). A non-empty result
 * is bound and the content shown; a {@code null} or empty result shows the empty state; an error
 * shows the error state.
 *
 * @param keyWord keyword forwarded to {@code mDataManager.getTopicsByKeyword}
 */
@Override
public void getDataWithKeyword(String keyWord) {
    final DisposableSubscriber<List<TopicDetailBean>> resultSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getTopicsByKeyword(keyWord)
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(false))
            .subscribeWith(resultSubscriber));
}
/**
 * Reloads all topics and renders the outcome on the view.
 *
 * <p>A non-empty result is bound and the content shown; a {@code null} or empty result shows the
 * empty state; an error shows the error state.
 *
 * @param isPullToRefresh flag handed to the view's {@code showLoading} when the subscription starts
 */
@Override
public void doRefresh(boolean isPullToRefresh) {
    final DisposableSubscriber<List<TopicDetailBean>> refreshSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBeans) {
                    if (topicBeans == null || topicBeans.isEmpty()) {
                        getView().showEmpty();
                        return;
                    }
                    getView().bindData(topicBeans, true);
                    getView().showContent();
                }

                @Override
                public void onError(Throwable t) {
                    getView().showError();
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getAllTopic()
            .compose(RxSchedulers.flowableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(isPullToRefresh))
            .subscribeWith(refreshSubscriber));
}
/**
 * Builds the flowable, optionally transforms it, and subscribes the given subscriber to it.
 *
 * <p>When a queuer is set and the queue-subscription safety check is enabled, the subscriber is
 * first wrapped via {@code RxBusUtil.wrapSubscriber}. If a bound object is present, the resulting
 * disposable is also registered with {@code RxDisposableManager}.
 *
 * @param subscriber  subscriber that receives the events
 * @param transformer optional transformer applied to the built flowable; may be {@code null}
 * @param <R>         element type delivered to the subscriber
 * @return the disposable of the subscriber that was actually subscribed (the wrapper, if one was
 *         created)
 */
public <R> Disposable subscribe(DisposableSubscriber<R> subscriber, FlowableTransformer<T, R> transformer)
{
    // raw type on purpose: without a transformer the element type stays T, not R
    Flowable source = build(false);
    if (transformer != null)
    {
        source = source.compose(transformer);
    }

    Subscriber<R> target = subscriber;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
    {
        target = RxBusUtil.wrapSubscriber(subscriber, mQueuer);
    }

    source = applySchedular(source);
    Disposable handle = (DisposableSubscriber) source.subscribeWith(target);

    if (mBoundObject != null)
    {
        RxDisposableManager.addDisposable(mBoundObject, handle);
    }
    return handle;
}
/**
 * Wraps a subscriber so that each item is only forwarded when
 * {@code RxUtil.safetyQueueCheck(item, isResumedProvider)} returns {@code true}.
 * Error and completion signals are always passed straight through.
 *
 * @param subscriber        subscriber that receives the forwarded signals
 * @param isResumedProvider queue object handed to the safety check for every item
 * @param <T>               element type
 * @return a new subscriber delegating to {@code subscriber}
 */
protected static <T> DisposableSubscriber<T> wrapSubscriber(DisposableSubscriber<T> subscriber, IRxBusQueue isResumedProvider)
{
    return new DisposableSubscriber<T>()
    {
        @Override
        public void onNext(T t)
        {
            if (!RxUtil.safetyQueueCheck(t, isResumedProvider))
            {
                // item did not pass the check: do not deliver it
                return;
            }
            subscriber.onNext(t);
        }

        @Override
        public void onError(Throwable e)
        {
            subscriber.onError(e);
        }

        @Override
        public void onComplete()
        {
            subscriber.onComplete();
        }
    };
}
/**
 * Connects the audio source to {@code recordDataPublishProcessor} and attaches a consumer that
 * feeds {@code audioSaveHelper}.
 *
 * <p>On completion the save helper is told that recording stopped, the recorder state is set to
 * {@code RECORDER_STATE_IDLE} under {@code recorderStateMonitor}, and all waiters on that monitor
 * are notified.
 */
private void startRecordThread() {
    final DisposableSubscriber<byte[]> saveSubscriber = new DisposableSubscriber<byte[]>() {
        @Override
        public void onNext(byte[] bytes) {
            // NOTE(review): the emitted array is ignored and the recordBuffer field is passed
            // instead - presumably the same array; confirm against the producer.
            audioSaveHelper.onDataReady(recordBuffer);
        }

        @Override
        public void onError(Throwable t) {
            // NOTE(review): errors are dropped and the recorder state is left untouched here.
        }

        @Override
        public void onComplete() {
            audioSaveHelper.onRecordingStopped(currentRecordTime);
            synchronized (recorderStateMonitor) {
                recorderState = RECORDER_STATE_IDLE;
                recorderStateMonitor.notifyAll();
            }
        }
    };

    // the processor is subscribed to the source first, then the consumer to the processor
    audioDataFlowable.subscribeOn(Schedulers.io()).subscribe(recordDataPublishProcessor);
    compositeDisposable.add(
            recordDataPublishProcessor
                    .onBackpressureBuffer()
                    .observeOn(Schedulers.io())
                    .subscribeWith(saveSubscriber));
}
/**
 * Subscribes to the recorder's audio data, keeping only the latest chunk under backpressure and
 * observing on a new thread; each non-empty chunk is passed to {@code onDataReceived}.
 * The subscriber is stored in {@code disposableSubscriber}.
 */
@Override
public void startDbmThread() {
    final DisposableSubscriber<byte[]> chunkSubscriber = new DisposableSubscriber<byte[]>() {
        @Override
        public void onNext(byte[] moreData) {
            if (moreData == null || moreData.length == 0) {
                // nothing usable in this chunk
                return;
            }
            onDataReceived(moreData);
        }

        @Override
        public void onError(Throwable t) {
            Log.i("Visual Error ", t.getMessage() + " ");
        }

        @Override
        public void onComplete() {
            Log.i("Visualise ", "complete");
        }
    };

    disposableSubscriber = audioRecorder.getAudioDataFlowable()
            .onBackpressureLatest()
            .observeOn(Schedulers.newThread())
            .subscribeWith(chunkSubscriber);
}
/**
 * Subscribes a logging subscriber to the given integer stream and tracks it in
 * {@code _disposables}.
 *
 * @param intStream stream whose items, error and completion are written to the log
 */
@Override
public void setStream(Flowable<Integer> intStream) {
    final DisposableSubscriber<Integer> logSubscriber = new DisposableSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
            _log(String.format("Worker frag spits out - %d", integer));
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "Error in worker demo frag observable");
            _log("Dang! something went wrong.");
        }

        @Override
        public void onComplete() {
            _log("Observable is complete");
        }
    };

    intStream
            .doOnSubscribe(s -> _log("Subscribing to intsObservable"))
            .subscribe(logSubscriber);
    // registered only after subscribing, as before
    _disposables.add(logSubscriber);
}
/**
 * Click handler that (re)starts the periodic timer.
 *
 * <p>A previous, not yet disposed {@code subscriberInterval} is disposed first. A new subscriber
 * is then stored in that field and subscribed to an interval with period {@code POLL_INTERVAL}
 * seconds; every signal is written through {@code addLogMessage}.
 *
 * @param view the clicked view (unused)
 */
@OnClick(R.id.btnInterval)
public void startTimerInterval(View view) {
    final boolean previousStillRunning =
            subscriberInterval != null && !subscriberInterval.isDisposed();
    if (previousStillRunning) {
        subscriberInterval.dispose();
    }

    subscriberInterval = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            addLogMessage("Timer interval: " + TimeUtil.getCurrentTime());
        }

        @Override
        public void onError(Throwable t) {
            addLogMessage("ERROR Timer interval: " + t.getMessage());
        }

        @Override
        public void onComplete() {
            addLogMessage("Timer interval completed !!");
        }
    };

    addLogMessage("START 2s timer interval...");

    final Flowable<Long> ticks = Flowable.interval(POLL_INTERVAL, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
    ticks.subscribe(subscriberInterval);
}
/**
 * Click handler that (re)starts the delayed periodic timer.
 *
 * <p>A previous, not yet disposed {@code subscriberDelayInterval} is disposed first. A new
 * subscriber is then stored in that field and subscribed to an interval with initial delay
 * {@code DELAY_TIME} and period {@code POLL_INTERVAL} (both in seconds); every signal is written
 * through {@code addLogMessage}.
 *
 * @param view the clicked view (unused)
 */
@OnClick(R.id.btnDelayInterval)
public void startDelayTimer(View view) {
    final boolean previousStillRunning =
            subscriberDelayInterval != null && !subscriberDelayInterval.isDisposed();
    if (previousStillRunning) {
        subscriberDelayInterval.dispose();
    }

    subscriberDelayInterval = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            addLogMessage("Delay Timer interval: " + TimeUtil.getCurrentTime());
        }

        @Override
        public void onError(Throwable t) {
            addLogMessage("ERROR delay timer: " + t.getMessage());
        }

        @Override
        public void onComplete() {
            addLogMessage("Delay timer completed !!");
        }
    };

    addLogMessage("START timer interval after " + DELAY_TIME + "s !!");

    final Flowable<Long> ticks = Flowable.interval(DELAY_TIME, POLL_INTERVAL, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
    ticks.subscribe(subscriberDelayInterval);
}
/**
 * Demo: subscribes a printing {@code DisposableSubscriber} to {@code Flowable.rangeLong(5, 4)}.
 *
 * <p>The subscriber requests everything in {@code onStart}, prints each value, and disposes
 * itself when it sees 7 (that value is still printed). It is disposed again after the subscribe
 * call returns.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final DisposableSubscriber<Long> printer = new DisposableSubscriber<Long>() {
        @Override
        protected void onStart() {
            request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Long value) {
            if (value == 7) {
                dispose();
            }
            System.out.println("value :-" + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Its Done!!!");
        }
    };

    Flowable.rangeLong(5, 4).subscribe(printer);
    printer.dispose();
}
/**
 * Demo: a subscriber that requests five items up front and then one more item per second,
 * disposing itself once it has processed the value 8.
 *
 * <p>The request loop in {@code onStart} runs on the thread that performs the subscription. It
 * now stops as soon as the subscriber is disposed or a terminal signal arrived, instead of
 * spinning forever, and an interrupt ends the loop with the interrupt flag restored.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    subscriber = new DisposableSubscriber<Integer>() {
        /** Set once onError/onComplete was delivered so the request loop can stop. */
        private volatile boolean terminated;

        @Override
        public void onStart() {
            request(5);
            // Emulate some processing: ask for one more item per second while still active
            while (!terminated && !isDisposed()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // keep the interrupt visible to the caller and stop requesting
                    Thread.currentThread().interrupt();
                    break;
                }
                request(1);
            }
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("processing "+ t);
            if (t == 8) {
                dispose();
            }
        }

        @Override
        public void onError(Throwable thr) {
            terminated = true;
            System.err.println("In onError(): " + thr.getMessage());
        }

        @Override
        public void onComplete() {
            terminated = true;
            System.out.println("Done");
        }
    };

    Flowable.range(1, 10)
            .delay(1, TimeUnit.SECONDS)
            .subscribe(subscriber);
}
/**
 * Starts a repeating ticker that calls {@code IncrementAction()} on the main thread for as long
 * as the left or right button is long-pressed.
 *
 * <p>The ticker fires immediately and then every {@code seconds} milliseconds (the field is used
 * with {@code TimeUnit.MILLISECONDS}). The first tick that finds neither button pressed disposes
 * the subscriber that received it.
 */
private void startIncrementObserver() {
    _Incrementsubscriber = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            if (isLeftButtonLongPressed || isRightButtonLongPressed) {
                IncrementAction();
            } else {
                // dispose this subscriber itself: the field may already point at a newer one
                // if the method was called again in the meantime
                dispose();
            }
        }

        @Override
        public void onError(Throwable t) {
            // getMessage() may be null and android.util.Log throws on a null message
            Log.i(TAG, String.valueOf(t.getMessage()), t);
        }

        @Override
        public void onComplete() {
            // nothing to do on completion
        }
    };

    Flowable.interval(0, seconds, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(_Incrementsubscriber);
}
/**
 * Starts a repeating ticker that calls {@code DecrementAction()} on the main thread for as long
 * as the left or right button is long-pressed.
 *
 * <p>The ticker fires immediately and then every {@code seconds} milliseconds (the field is used
 * with {@code TimeUnit.MILLISECONDS}). The first tick that finds neither button pressed disposes
 * the subscriber that received it.
 */
private void startDecrementObserver() {
    _Decrementsubscriber = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            if (isLeftButtonLongPressed || isRightButtonLongPressed) {
                DecrementAction();
            } else {
                // dispose this subscriber itself: the field may already point at a newer one
                // if the method was called again in the meantime
                dispose();
            }
        }

        @Override
        public void onError(Throwable t) {
            // getMessage() may be null and android.util.Log throws on a null message
            Log.i(TAG, String.valueOf(t.getMessage()), t);
        }

        @Override
        public void onComplete() {
            // nothing to do on completion
        }
    };

    Flowable.interval(0, seconds, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(_Decrementsubscriber);
}
/**
 * Starts a repeating ticker that calls {@code IncrementAction()} on the main thread for as long
 * as the left or right button is long-pressed.
 *
 * <p>The ticker fires immediately and then every {@code seconds} milliseconds (the field is used
 * with {@code TimeUnit.MILLISECONDS}). The first tick that finds neither button pressed disposes
 * the subscriber that received it.
 */
private void startIncrementObserver() {
    _Incrementsubscriber = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            if (isLeftButtonLongPressed || isRightButtonLongPressed) {
                IncrementAction();
            } else {
                // dispose this subscriber itself: the field may already point at a newer one
                // if the method was called again in the meantime
                dispose();
            }
        }

        @Override
        public void onError(Throwable t) {
            // getMessage() may be null and android.util.Log throws on a null message
            Log.i(TAG, String.valueOf(t.getMessage()), t);
        }

        @Override
        public void onComplete() {
            // nothing to do on completion
        }
    };

    Flowable.interval(0, seconds, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(_Incrementsubscriber);
}
/**
 * Starts a repeating ticker that calls {@code DecrementAction()} on the main thread for as long
 * as the left or right button is long-pressed.
 *
 * <p>The ticker fires immediately and then every {@code seconds} milliseconds (the field is used
 * with {@code TimeUnit.MILLISECONDS}). The first tick that finds neither button pressed disposes
 * the subscriber that received it.
 */
private void startDecrementObserver() {
    _Decrementsubscriber = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            if (isLeftButtonLongPressed || isRightButtonLongPressed) {
                DecrementAction();
            } else {
                // dispose this subscriber itself: the field may already point at a newer one
                // if the method was called again in the meantime
                dispose();
            }
        }

        @Override
        public void onError(Throwable t) {
            // getMessage() may be null and android.util.Log throws on a null message
            Log.i(TAG, String.valueOf(t.getMessage()), t);
        }

        @Override
        public void onComplete() {
            // nothing to do on completion
        }
    };

    Flowable.interval(0, seconds, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(_Decrementsubscriber);
}
/**
 * Opens the "comment added" GraphQL subscription for {@code repoFullName} and tracks it in
 * {@code disposables}.
 *
 * <p>Responses are observed on the Android main thread. A response without data is ignored; one
 * with data either appends the new comment's content to the list adapter or logs a warning when
 * the comment is {@code null}, and then shows a toast. Errors and completion are logged and
 * announced with a toast.
 */
private void subscribeRepoCommentAdded() {
    final ApolloSubscriptionCall<RepoCommentAddedSubscription.Data> call =
            application.apolloClient().subscribe(new RepoCommentAddedSubscription(repoFullName));

    final DisposableSubscriber<Response<RepoCommentAddedSubscription.Data>> commentSubscriber =
            new DisposableSubscriber<Response<RepoCommentAddedSubscription.Data>>() {
                @Override
                public void onNext(Response<RepoCommentAddedSubscription.Data> response) {
                    final RepoCommentAddedSubscription.Data data = response.data();
                    if (data == null) {
                        return;
                    }

                    final RepoCommentAddedSubscription.CommentAdded newComment = data.commentAdded();
                    if (newComment == null) {
                        Log.w(TAG, "Comment added subscription data is null.");
                    } else {
                        commentsListViewAdapter.addItem(newComment.content());
                    }

                    Toast.makeText(
                            GitHuntEntryDetailActivity.this,
                            "Subscription response received",
                            Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, e.getMessage(), e);
                    Toast.makeText(
                            GitHuntEntryDetailActivity.this,
                            "Subscription failure",
                            Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "Subscription exhausted");
                    Toast.makeText(
                            GitHuntEntryDetailActivity.this,
                            "Subscription complete",
                            Toast.LENGTH_SHORT).show();
                }
            };

    disposables.add(Rx2Apollo.from(call)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(commentSubscriber));
}
/**
 * Subscribes a logging subscriber to the given integer stream and tracks it in
 * {@code _disposables}.
 *
 * @param intsFlowable stream whose items, error and completion are written to the log
 */
@Override
public void observeResults(Flowable<Integer> intsFlowable) {
    final DisposableSubscriber<Integer> logSubscriber = new DisposableSubscriber<Integer>() {
        @Override
        public void onNext(Integer integer) {
            _log(String.format("Worker frag spits out - %d", integer));
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "Error in worker demo frag observable");
            _log("Dang! something went wrong.");
        }

        @Override
        public void onComplete() {
            _log("Observable is complete");
        }
    };

    intsFlowable
            .doOnSubscribe(s -> _log("Subscribing to intsObservable"))
            .subscribe(logSubscriber);
    // registered only after subscribing, as before
    _disposables.add(logSubscriber);
}
/**
 * Click handler for the retry demo.
 *
 * <p>Resets the log list and adapter, then subscribes to a flowable that always errors and is
 * retried through {@code new RetryWithDelay(5, 1000)}. The subscriber is tracked in
 * {@code _disposables}; when the error finally reaches it, "Error: I give up!" is logged.
 */
@OnClick(R.id.btn_eb_retry)
public void startRetryingWithExponentialBackoffStrategy() {
    _logs = new ArrayList<>();
    _adapter.clear();

    final DisposableSubscriber<Object> giveUpSubscriber = new DisposableSubscriber<Object>() {
        @Override
        public void onNext(Object aVoid) {
            Timber.d("on Next");
        }

        @Override
        public void onError(Throwable e) {
            _log("Error: I give up!");
        }

        @Override
        public void onComplete() {
            Timber.d("on Completed");
        }
    };

    // The source fails every time; retryWhen is driven only by those errors
    // (values sent through onNext are ignored).
    Flowable.error(new RuntimeException("testing"))
            .retryWhen(new RetryWithDelay(5, 1000))
            .doOnSubscribe(s -> _log("Attempting the impossible 5 times in intervals of 1s"))
            .subscribe(giveUpSubscriber);

    _disposables.add(giveUpSubscriber);
}
/**
 * Click handler that toggles a one-second interval task.
 *
 * <p>If {@code _subscriber1} exists and is not disposed, it is disposed, the kill is logged and
 * the method returns. Otherwise a new logging subscriber is stored in {@code _subscriber1} and
 * subscribed to {@code Flowable.interval(1, TimeUnit.SECONDS)}.
 */
@OnClick(R.id.btn_demo_timing_2)
public void btn2_RunTask_IntervalOf1s() {
    final boolean running = _subscriber1 != null && !_subscriber1.isDisposed();
    if (running) {
        _subscriber1.dispose();
        _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
        return;
    }

    _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp()));

    _subscriber1 = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long number) {
            _log(String.format("B2 [%s] NEXT", _getCurrentTimestamp()));
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "something went wrong in TimingDemoFragment example");
        }

        @Override
        public void onComplete() {
            _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
        }
    };

    final Flowable<Long> everySecond = Flowable.interval(1, TimeUnit.SECONDS);
    everySecond.subscribe(_subscriber1);
}
/**
 * Click handler that toggles a one-second interval task with no initial delay.
 *
 * <p>If {@code _subscriber2} exists and is not disposed, it is disposed, the kill is logged and
 * the method returns. Otherwise a new logging subscriber is stored in {@code _subscriber2} and
 * subscribed to {@code Flowable.interval(0, 1, TimeUnit.SECONDS)}.
 */
@OnClick(R.id.btn_demo_timing_3)
public void btn3_RunTask_IntervalOf1s_StartImmediately() {
    final boolean running = _subscriber2 != null && !_subscriber2.isDisposed();
    if (running) {
        _subscriber2.dispose();
        _log(String.format("C3 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
        return;
    }

    _log(String.format("C3 [%s] --- BTN click", _getCurrentTimestamp()));

    _subscriber2 = new DisposableSubscriber<Long>() {
        @Override
        public void onNext(Long number) {
            _log(String.format("C3 [%s] NEXT", _getCurrentTimestamp()));
        }

        @Override
        public void onError(Throwable e) {
            Timber.e(e, "something went wrong in TimingDemoFragment example");
        }

        @Override
        public void onComplete() {
            _log(String.format("C3 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
        }
    };

    final Flowable<Long> everySecond = Flowable.interval(0, 1, TimeUnit.SECONDS);
    everySecond.subscribe(_subscriber2);
}
/**
 * Looks up news entries by id and reports to the view whether any were found.
 *
 * <p>{@code onCheckStarResult(true)} is sent for a non-empty list, {@code false} for a
 * {@code null} or empty one. Errors and completion are not reported to the view.
 *
 * @param newsId id forwarded to {@code mDataManager.getNewsById}
 */
@Override
public void checkStar(String newsId) {
    final DisposableSubscriber<List<NewsBean>> starSubscriber =
            new DisposableSubscriber<List<NewsBean>>() {
                @Override
                public void onNext(List<NewsBean> topicBean) {
                    final boolean found = null != topicBean && topicBean.size() > 0;
                    getView().onCheckStarResult(found);
                }

                @Override
                public void onError(Throwable t) {
                    // errors are currently dropped without notifying the view
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getNewsById(newsId)
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(starSubscriber));
}
/**
 * Looks up topic entries by id and reports to the view whether any were found.
 *
 * <p>{@code onCheckStarResult(true)} is sent for a non-empty list, {@code false} for a
 * {@code null} or empty one. Errors and completion are not reported to the view.
 *
 * @param topicId id forwarded to {@code mDataManager.getTopicById}
 */
@Override
public void checkStar(String topicId) {
    final DisposableSubscriber<List<TopicDetailBean>> starSubscriber =
            new DisposableSubscriber<List<TopicDetailBean>>() {
                @Override
                public void onNext(List<TopicDetailBean> topicBean) {
                    final boolean found = null != topicBean && topicBean.size() > 0;
                    getView().onCheckStarResult(found);
                }

                @Override
                public void onError(Throwable t) {
                    // errors are currently dropped without notifying the view
                }

                @Override
                public void onComplete() {
                    // nothing to do on completion
                }
            };

    addSubscribe(mDataManager.getTopicById(topicId)
            .compose(RxSchedulers.flowableIo2Main())
            .subscribeWith(starSubscriber));
}
/**
 * Combines the latest first-name, last-name and email values into one validity flag and colours
 * the submit button accordingly (blue when valid, gray otherwise).
 *
 * <p>Inside the combiner each invalid field gets an error set on its input widget. A name is
 * valid when non-null and longer than 3 characters; the email is valid when non-null and accepted
 * by {@code Validator.isValidEmailAddress}. The subscriber is kept in {@code disposableSubscriber}.
 */
private void combineLatestEvents() {
    disposableSubscriber = new DisposableSubscriber<Boolean>() {
        @Override
        public void onNext(Boolean isValidForm) {
            final int colorRes = isValidForm ? R.color.blue : R.color.gray;
            btnSubmit.setBackgroundColor(ContextCompat.getColor(MainActivity.this, colorRes));
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError ==/ " + t.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete ==/ called");
        }
    };

    Flowable.combineLatest(
            firstNameChangeObservable,
            lastNameChangeObservable,
            emailChangeObservable,
            (firstName, lastName, email) -> {
                final boolean firstNameOk = firstName != null && firstName.length() > 3;
                if (!firstNameOk) {
                    etFirstName.setError("Invalid first name");
                }

                final boolean lastNameOk = lastName != null && lastName.length() > 3;
                if (!lastNameOk) {
                    etLastName.setError("Invalid last name");
                }

                final boolean emailOk =
                        email != null && Validator.isValidEmailAddress(String.valueOf(email));
                if (!emailOk) {
                    etEmail.setError("Invalid email addess");
                }

                return firstNameOk && lastNameOk && emailOk;
            })
            .subscribe(disposableSubscriber);
}
/**
 * Builds the use-case flowable for the given parameters, subscribes the supplied subscriber to
 * it and registers the result through {@code addDisposable}.
 *
 * @param params     value handed to {@code buildUseCaseObservable}
 * @param subscriber subscriber to attach; passed through {@code Preconditions.checkNotNull} first
 */
public void execute(Params params, DisposableSubscriber<T> subscriber) {
    Preconditions.checkNotNull(subscriber);
    addDisposable(this.buildUseCaseObservable(params).subscribeWith(subscriber));
}
/**
 * Subscribes the supplied subscriber to an already built flowable and registers the result
 * through {@code addDisposable}.
 *
 * @param flowable   stream to subscribe to; passed through {@code Preconditions.checkNotNull}
 * @param subscriber subscriber to attach; passed through {@code Preconditions.checkNotNull}
 * @param <P>        element type of the stream
 */
public <P> void execute(Flowable<P> flowable, DisposableSubscriber<P> subscriber) {
    Preconditions.checkNotNull(flowable);
    Preconditions.checkNotNull(subscriber);
    final DisposableSubscriber<P> attached = flowable.subscribeWith(subscriber);
    addDisposable(attached);
}
/**
 * Runs the use case without parameters by delegating to
 * {@code execute(Params, DisposableSubscriber)} with a {@code null} {@code Params}.
 *
 * @param subscriber subscriber to attach
 */
public void execute(DisposableSubscriber<T> subscriber) {
    // typed local keeps overload resolution on the (Params, subscriber) variant
    final Params noParams = null;
    execute(noParams, subscriber);
}
/**
 * Combines the latest email, password and number inputs into one validity flag and colours the
 * indicator button accordingly (blue when valid, gray otherwise).
 *
 * <p>Inside the combiner each invalid field gets an error set on its input widget:
 * the email must be non-empty and match {@code EMAIL_ADDRESS}, the password must be non-empty
 * and longer than 8 characters, and the number must parse as an int in the range 1..100.
 * Text that cannot be parsed as an int counts as an invalid number instead of raising an
 * exception out of the combiner. The subscriber is kept in {@code _disposableObserver}.
 */
private void _combineLatestEvents() {
    _disposableObserver =
            new DisposableSubscriber<Boolean>() {
                @Override
                public void onNext(Boolean formValid) {
                    final int colorRes = formValid ? R.color.blue : R.color.gray;
                    _btnValidIndicator.setBackgroundColor(
                            ContextCompat.getColor(getContext(), colorRes));
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e(e, "there was an error");
                }

                @Override
                public void onComplete() {
                    Timber.d("completed");
                }
            };

    Flowable.combineLatest(
            _emailChangeObservable,
            _passwordChangeObservable,
            _numberChangeObservable,
            (newEmail, newPassword, newNumber) -> {
                boolean emailValid = !isEmpty(newEmail) && EMAIL_ADDRESS.matcher(newEmail).matches();
                if (!emailValid) {
                    _email.setError("Invalid Email!");
                }

                boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8;
                if (!passValid) {
                    _password.setError("Invalid Password!");
                }

                boolean numValid = !isEmpty(newNumber);
                if (numValid) {
                    try {
                        int num = Integer.parseInt(newNumber.toString());
                        numValid = num > 0 && num <= 100;
                    } catch (NumberFormatException notAnInt) {
                        // non-numeric text or a value beyond int range: mark as invalid rather
                        // than letting the exception end the stream through onError
                        numValid = false;
                    }
                }
                if (!numValid) {
                    _number.setError("Invalid Number!");
                }

                return emailValid && passValid && numValid;
            })
            .subscribe(_disposableObserver);
}