下面列出了 io.reactivex.observers.DisposableObserver 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Requests an auth token and, when one is emitted, asks the collections presenter for the
 * latest collections.
 *
 * <p>The token request is subscribed on a new thread and observed on the Android main thread.
 * A failure is logged and reported through {@code unableToFetchCollections} with the
 * authentication-failure string.
 */
private void getLatestCollections() {
    final DisposableObserver<String> tokenObserver = new DisposableObserver<String>() {
        @Override
        public void onNext(String authToken) {
            mCollectionsPresenter.getLatestCollections(authToken, true);
        }

        @Override
        public void onError(Throwable error) {
            Logger.e(TAG, "onError: " + error.getMessage(), error);
            unableToFetchCollections(false, false, getString(R.string.authentication_failure));
        }

        @Override
        public void onComplete() {
            // Nothing to do when the token stream finishes.
        }
    };

    PredatorAccount.getAuthToken(getActivity(),
            Constants.Authenticator.PREDATOR_ACCOUNT_TYPE,
            PredatorSharedPreferences.getAuthTokenType(getContext().getApplicationContext()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tokenObserver);
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Creates an operator that runs {@code action} when the upstream completes without having
 * emitted any item.
 *
 * <p>Items and errors are passed through unchanged. If {@code action} throws, the exception is
 * delivered downstream through {@code onError} instead of {@code onComplete}. The downstream
 * receives its {@code onSubscribe} call with the wrapping observer as the {@code Disposable}.
 *
 * @param action the callback to run on an empty completion
 * @param <T> the item type, unchanged by this operator
 * @return the operator, suitable for use with {@code lift}
 */
public static <T> ObservableOperator<T,T> doOnEmpty(Action action){
    return observer -> new DisposableObserver<T>() {
        /** Stays true until the first item is seen. */
        boolean isEmpty = true;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); forward the Disposable
            // so the downstream is properly subscribed and able to cancel.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(T value) {
            isEmpty = false;
            observer.onNext(value);
        }

        @Override
        public void onError(Throwable t) {
            observer.onError(t);
        }

        @Override
        public void onComplete() {
            if (isEmpty) {
                try {
                    action.run();
                } catch (Exception e) {
                    // The action failed: signal the failure instead of a normal completion.
                    onError(e);
                    return;
                }
            }
            observer.onComplete();
        }
    };
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super retrofit2.Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Builds the observer that reacts to text changes in the username field.
 *
 * <p>Each change is trimmed and lower-cased. A non-empty term with the selection end past the
 * start of the field triggers {@code onSearchingUser()} and is published on
 * {@code publishSubject}; otherwise the suggestions view is hidden. Errors and completion are
 * ignored.
 *
 * @return a new observer for text-change events
 */
private DisposableObserver<TextViewTextChangeEvent> userNameTextWatcher() {
    return new DisposableObserver<TextViewTextChangeEvent>() {
        @Override
        public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) {
            // Lower-case with a fixed locale: the default-locale variant maps 'I' to a
            // dotless 'i' under e.g. a Turkish locale, which would corrupt the search term.
            String searchTerm = textViewTextChangeEvent.text().toString().trim()
                    .toLowerCase(java.util.Locale.ROOT);
            if (searchTerm.length() > 0 && usernameEt.getSelectionEnd() > 0) {
                onSearchingUser();
                publishSubject.onNext(searchTerm);
            } else {
                userMentionsSuggestionsView.setVisibility(View.GONE);
            }
        }

        @Override
        public void onError(Throwable e) {
            // Errors from the text-change stream are ignored.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
}
/**
 * Builds the observer that receives account lookup results and passes the result list to
 * {@code onUserSuggestionsAvailable}. Errors and completion are ignored.
 *
 * @return a new observer for lookup responses
 */
private DisposableObserver<LookupAccount> usernamesResponseObserver() {
    final DisposableObserver<LookupAccount> lookupObserver = new DisposableObserver<LookupAccount>() {
        @Override
        public void onComplete() {
            // No completion handling.
        }

        @Override
        public void onError(Throwable error) {
            // Lookup failures are ignored.
        }

        @Override
        public void onNext(LookupAccount lookupResult) {
            onUserSuggestionsAvailable(lookupResult.getmResult());
        }
    };
    return lookupObserver;
}
/**
 * Requests an auth token and, when one is emitted, asks the collection-details presenter for
 * the posts of the collection whose id is stored in the fragment arguments.
 *
 * <p>The token request is subscribed on a new thread and observed on the Android main thread.
 * A failure is logged and reported through {@code unableToGetPosts}.
 */
private void getLatestPosts() {
    final DisposableObserver<String> tokenObserver = new DisposableObserver<String>() {
        @Override
        public void onNext(String authToken) {
            mCollectionDetailsPresenter.getPosts(
                    authToken,
                    getArguments().getInt(ARG_COLLECTION_TABLE_COLLECTION_ID));
        }

        @Override
        public void onError(Throwable error) {
            Logger.e(TAG, "onError: " + error.getMessage(), error);
            unableToGetPosts(false, getString(R.string.authentication_failure));
        }

        @Override
        public void onComplete() {
            // Nothing to do when the token stream finishes.
        }
    };

    PredatorAccount.getAuthToken(getActivity(),
            Constants.Authenticator.PREDATOR_ACCOUNT_TYPE,
            PredatorSharedPreferences.getAuthTokenType(getContext().getApplicationContext()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tokenObserver);
}
/**
 * Creates an observer that hands every account lookup result list to
 * {@code onUserSuggestionsAvailable}. Errors and completion are ignored.
 *
 * @return a new observer for lookup responses
 */
private DisposableObserver<LookupAccount> getUsernameObserver() {
    final DisposableObserver<LookupAccount> suggestionObserver = new DisposableObserver<LookupAccount>() {
        @Override
        public void onComplete() {
            // No completion handling.
        }

        @Override
        public void onError(Throwable error) {
            // Lookup failures are ignored.
        }

        @Override
        public void onNext(LookupAccount result) {
            onUserSuggestionsAvailable(result.getmResult());
        }
    };
    return suggestionObserver;
}
/**
 * Requests an auth token and, when one is emitted, asks the collections presenter to load
 * more collections.
 *
 * <p>The token request is subscribed on a new thread and observed on the Android main thread.
 * A failure is logged and reported through {@code unableToFetchCollections}.
 */
private void loadMoreCollections() {
    final DisposableObserver<String> tokenObserver = new DisposableObserver<String>() {
        @Override
        public void onNext(String authToken) {
            mCollectionsPresenter.loadMoreCollections(authToken);
        }

        @Override
        public void onError(Throwable error) {
            Logger.e(TAG, "onError: " + error.getMessage(), error);
            unableToFetchCollections(true, false, getString(R.string.authentication_failure));
        }

        @Override
        public void onComplete() {
            // Nothing to do when the token stream finishes.
        }
    };

    PredatorAccount.getAuthToken(getActivity(),
            Constants.Authenticator.PREDATOR_ACCOUNT_TYPE,
            PredatorSharedPreferences.getAuthTokenType(getContext().getApplicationContext()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tokenObserver);
}
/**
 * Queries the articles repository and shows the result in the view.
 *
 * <p>The query is subscribed on the IO scheduler and observed on the Android main thread. The
 * subscription is added to {@code compositeDisposable}. Both the result and the error callback
 * touch the view only while {@code view.isActive()} reports true.
 *
 * @param page the page to query
 * @param keyWords the search key words
 * @param forceUpdate passed through to the repository query
 * @param clearCache passed through to the repository query
 */
@Override
public void searchArticles(@NonNull int page, @NonNull String keyWords, final boolean forceUpdate, final boolean clearCache) {
    Disposable disposable = articlesDataRepository.queryArticles(page, keyWords, forceUpdate, clearCache)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<List<ArticleDetailData>>() {
                @Override
                public void onNext(List<ArticleDetailData> value) {
                    if (view.isActive()) {
                        view.showArticles(value);
                    }
                }

                @Override
                public void onError(Throwable e) {
                    // Apply the same guard as onNext so an inactive view is never updated.
                    if (view.isActive()) {
                        view.showEmptyView(true);
                    }
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            });
    compositeDisposable.add(disposable);
}
/**
 * Requests an auth token and, when one is emitted, asks the user-profile presenter for the
 * latest data of the given user.
 *
 * <p>The token request is subscribed on a new thread and observed on the Android main thread.
 * A failure is logged and reported through {@code unableToFetchDataOnline}.
 *
 * @param userId id passed to the presenter
 * @param refresh refresh flag passed to the presenter
 */
private void loadLatestDetails(final int userId, final boolean refresh) {
    final DisposableObserver<String> tokenObserver = new DisposableObserver<String>() {
        @Override
        public void onNext(String authToken) {
            final String threadName = Thread.currentThread().getName();
            Logger.d(TAG, "onNext: token fetched ; Thread: " + threadName);
            mUserProfilePresenter.getLatestData(authToken, userId, refresh);
        }

        @Override
        public void onError(Throwable error) {
            Logger.e(TAG, "onError: " + error.getMessage(), error);
            unableToFetchDataOnline(getString(R.string.authentication_failure));
        }

        @Override
        public void onComplete() {
            // Nothing to do when the token stream finishes.
        }
    };

    PredatorAccount.getAuthToken(UserProfileActivity.this,
            Constants.Authenticator.PREDATOR_ACCOUNT_TYPE,
            PredatorSharedPreferences.getAuthTokenType(getApplicationContext()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tokenObserver);
}
/**
 * Creates an observer that relays every event to {@code emitter}, skipping any event that
 * arrives after the emitter has been disposed.
 *
 * @param emitter the emitter that receives the relayed events
 * @return a new relaying observer
 */
DisposableObserver<? super T> disposableWrapper(final ObservableEmitter<? super T> emitter) {
    return new DisposableObserver<T>() {
        @Override public void onNext(@NonNull T item) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onNext(item);
        }

        @Override public void onError(@NonNull Throwable error) {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onError(error);
        }

        @Override public void onComplete() {
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onComplete();
        }
    };
}
/**
 * Loads the next page of the news list and binds it to the view.
 *
 * <p>The request uses {@code Constants.TOPIC_PAGE_SIZE} as page size and is composed with
 * {@code RxSchedulers.observableIo2Main()}. On success the result is bound with the second
 * argument {@code false}; on error {@code null} is bound instead. The subscription is
 * registered through {@code addSubscribe}.
 *
 * @param type the news type to request
 * @param lastCursor the cursor passed to the data manager
 */
@Override
public void doLoadMore(@Constants.Type String type, Long lastCursor) {
    final DisposableObserver<DataListBean<NewsBean>> pageObserver =
            new DisposableObserver<DataListBean<NewsBean>>() {
                @Override
                public void onNext(DataListBean<NewsBean> page) {
                    getView().bindData(page, false);
                }

                @Override
                public void onError(Throwable error) {
                    getView().bindData(null, false);
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getNewsList(type, lastCursor, Constants.TOPIC_PAGE_SIZE)
            .compose(RxSchedulers.observableIo2Main())
            .subscribeWith(pageObserver));
}
/**
 * Requests an auth token and, when one is emitted, asks the categories presenter to fetch the
 * categories.
 *
 * <p>The token request is subscribed on a new thread and observed on the Android main thread.
 * A failure is logged and reported through {@code unableToFetchAuthToken}. Neither parameter
 * is read by this method.
 *
 * @param args unused here
 * @param message unused here
 */
@Override
public void onAuthTokenRetrieved(Bundle args, String message) {
    final DisposableObserver<String> tokenObserver = new DisposableObserver<String>() {
        @Override
        public void onNext(String authToken) {
            mCategoriesPresenter.fetchCategories(getApplicationContext(), authToken);
        }

        @Override
        public void onError(Throwable error) {
            Logger.e(TAG, "onError: " + error.getMessage(), error);
            unableToFetchAuthToken(getString(R.string.authentication_failure));
        }

        @Override
        public void onComplete() {
            // Nothing to do when the token stream finishes.
        }
    };

    PredatorAccount.getAuthToken(this,
            Constants.Authenticator.PREDATOR_ACCOUNT_TYPE,
            PredatorSharedPreferences.getAuthTokenType(getApplicationContext()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tokenObserver);
}
/**
 * Loads the instant-read data of a topic and reports the outcome to the view.
 *
 * <p>The view is told about the start of the request on subscription, receives the result via
 * {@code onRequestEnd} and a failure via {@code onRequestError}. The subscription is
 * registered through {@code addSubscribe}.
 *
 * @param topicId id of the topic to load
 */
@Override
public void getTopicInstantRead(String topicId) {
    final DisposableObserver<InstantReadBean> resultObserver = new DisposableObserver<InstantReadBean>() {
        @Override
        public void onNext(InstantReadBean result) {
            getView().onRequestEnd(result);
        }

        @Override
        public void onError(Throwable error) {
            getView().onRequestError();
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };

    addSubscribe(mDataManager.getTopicInstantRead(topicId)
            .compose(RxSchedulers.observableIo2Main())
            .doOnSubscribe(subscription -> getView().onRequestStart())
            .subscribeWith(resultObserver));
}
/**
 * Loads the favourite status of a workflow and passes it to the view.
 *
 * <p>The request is subscribed on the IO scheduler and observed on the Android main thread.
 * A failure shows an error snack bar. The subscription is added to
 * {@code compositeDisposable}.
 *
 * @param id id of the workflow
 */
public void getFavourite(String id) {
    checkViewAttached();

    final DisposableObserver<Boolean> statusObserver = new DisposableObserver<Boolean>() {
        @Override
        public void onNext(Boolean isFavourite) {
            getMvpView().getFavouriteIcon(isFavourite);
        }

        @Override
        public void onError(Throwable error) {
            getMvpView().showErrorSnackBar("Something went wrong please try after sometime");
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };

    compositeDisposable.add(mDataManager.getFavoriteWorkflow(id)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribeWith(statusObserver));
}
/**
 * Calculates the diff between two topic lists and hands it, together with the new list, to
 * the view.
 *
 * <p>The diff is calculated (with move detection disabled) before the observable is created,
 * so the calculation runs on the calling thread. Errors and completion are ignored.
 *
 * @param oldData the list currently shown
 * @param newData the list to show next
 */
@Override
public void getDiffResult(List<TopicBean> oldData, List<TopicBean> newData) {
    final DiffUtil.DiffResult calculated =
            DiffUtil.calculateDiff(new DiffCallback(oldData, newData), false);

    final DisposableObserver<DiffUtil.DiffResult> diffObserver =
            new DisposableObserver<DiffUtil.DiffResult>() {
                @Override
                public void onNext(DiffUtil.DiffResult result) {
                    getView().onDiffResult(result, newData);
                }

                @Override
                public void onError(Throwable error) {
                    // Errors are ignored.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(Observable.just(calculated).subscribeWith(diffObserver));
}
/**
 * Loads the topics related to a topic and reports the outcome to the view.
 *
 * <p>The view is told about the start of the request on subscription, receives the list via
 * {@code onRequestTopicEnd} and a failure via {@code onRequestError}. The subscription is
 * registered through {@code addSubscribe}.
 *
 * @param topicId id of the topic
 * @param eventType passed through to the data manager
 * @param order passed through to the data manager
 * @param timeStamp passed through to the data manager
 */
@Override
public void getRelateTopic(String topicId, int eventType, long order, long timeStamp) {
    final DisposableObserver<List<RelevantTopicBean>> topicsObserver =
            new DisposableObserver<List<RelevantTopicBean>>() {
                @Override
                public void onNext(List<RelevantTopicBean> relatedTopics) {
                    getView().onRequestTopicEnd(relatedTopics);
                }

                @Override
                public void onError(Throwable error) {
                    getView().onRequestError();
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

    addSubscribe(mDataManager.getRelateTopic(topicId, eventType, order, timeStamp)
            .compose(RxSchedulers.observableIo2Main())
            .doOnSubscribe(subscription -> getView().onRequestStart())
            .subscribeWith(topicsObserver));
}
/**
 * Loads the detail of a topic and binds it to the view.
 *
 * <p>On subscription the view shows its loading state. A result is bound with the second
 * argument {@code true}; completion switches the view to its content state; an error switches
 * it to its error state and prints the stack trace. The subscription is registered through
 * {@code addSubscribe}.
 *
 * @param topicId id of the topic to load
 * @param isPullToRefresh passed to {@code showLoading} on the view
 */
@Override
public void getTopicDetail(String topicId, boolean isPullToRefresh) {
    final DisposableObserver<TopicDetailBean> detailObserver = new DisposableObserver<TopicDetailBean>() {
        @Override
        public void onNext(TopicDetailBean detail) {
            getView().bindData(detail, true);
        }

        @Override
        public void onError(Throwable error) {
            getView().showError();
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            getView().showContent();
        }
    };

    addSubscribe(mDataManager.getTopicDetail(topicId)
            .compose(RxSchedulers.observableIo2Main())
            .doOnSubscribe(subscription -> getView().showLoading(isPullToRefresh))
            .subscribeWith(detailObserver));
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Wraps the downstream observer so that an upstream {@code Response<T>} is unwrapped to its body.
 *
 * <p>The body of the first response is forwarded downstream and the downstream is completed
 * immediately afterwards. The downstream receives its {@code onSubscribe} call (with this
 * wrapper as the {@code Disposable}) and at most one terminal event.
 *
 * @param observer the downstream observer that expects unwrapped bodies
 * @return an observer of responses to be subscribed to the upstream
 * @throws Exception declared by the operator interface; not thrown by this implementation
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered downstream. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver calls onStart() from onSubscribe(); hand the downstream
            // its Disposable so that it can cancel and so the Observer protocol is honoured.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): if body() can be null here (e.g. an error or empty response),
            // a null is passed downstream - confirm callers handle this.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; a second one would break the protocol.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext(); do not complete the downstream twice.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Creates an observer that relays every event it receives to the given emitter.
 *
 * <p>Items and completion are passed on with {@code onNext} and {@code onComplete}; errors are
 * passed on with {@code tryOnError}.
 *
 * @param emitter the emitter that receives the relayed events
 * @param <T> the item type
 * @return a new relaying observer
 */
public static <T> DisposableObserver<T> disposableObserverFromEmitter(final ObservableEmitter<T> emitter) {
    final DisposableObserver<T> relay = new DisposableObserver<T>() {
        @Override
        public void onComplete() {
            emitter.onComplete();
        }

        @Override
        public void onError(Throwable error) {
            emitter.tryOnError(error);
        }

        @Override
        public void onNext(T item) {
            emitter.onNext(item);
        }
    };
    return relay;
}
/**
 * Loads the detail of a workflow and shows it in the view.
 *
 * <p>The progress bar is shown before the request starts and hidden on error or completion.
 * For each received workflow the view is updated, then the uploader's details and the
 * favourite status are requested. The request is subscribed on the IO scheduler, observed on
 * the Android main thread and added to {@code compositeDisposable}.
 *
 * @param id id of the workflow to load
 */
public void loadWorkflowDetail(String id) {
    checkViewAttached();
    getMvpView().showProgressbar(true);

    final DisposableObserver<Workflow> detailObserver = new DisposableObserver<Workflow>() {
        @Override
        public void onNext(Workflow detail) {
            getMvpView().showWorkflowDetail(detail);
            loadUserDetail(detail.getUploader().getId());
            getFavourite(detail.getId());
        }

        @Override
        public void onError(Throwable error) {
            getMvpView().showProgressbar(false);
        }

        @Override
        public void onComplete() {
            getMvpView().showProgressbar(false);
        }
    };

    compositeDisposable.add(mDataManager.getDetailWorkflow(id, getDetailQueryOptions())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribeWith(detailObserver));
}
/**
 * Requests {@code HomeIndex} data for the resource name {@code "recommend.txt"} and passes
 * the result to the home view as recommended wares. Errors and completion are ignored.
 */
@Override
public void getRecommendedWares() {
    final DisposableObserver<HomeIndex> waresObserver = new DisposableObserver<HomeIndex>() {
        @Override
        public void onNext(HomeIndex recommended) {
            mHomeView.setRecommendedWares(recommended);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };

    addDisposabe(mDataManager.getData(waresObserver, HomeIndex.class, "recommend.txt"));
}
/**
 * Requests {@code HomeIndex} data for the resource name {@code "recommended.txt"} and passes
 * the result to the home view as additional recommended wares. Errors and completion are
 * ignored.
 */
@Override
public void getMoreRecommendedWares() {
    final DisposableObserver<HomeIndex> moreWaresObserver = new DisposableObserver<HomeIndex>() {
        @Override
        public void onNext(HomeIndex moreRecommended) {
            mHomeView.setMoreRecommendedWares(moreRecommended);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };

    addDisposabe(mDataManager.getData(moreWaresObserver, HomeIndex.class, "recommended.txt"));
}
/**
 * Requests {@code FindsBean} data for the resource name {@code "find.txt"} and passes the
 * result to the find view as additional data. Errors and completion are ignored.
 */
@Override
public void getMoreFindData() {
    final DisposableObserver<FindsBean> findObserver = new DisposableObserver<FindsBean>() {
        @Override
        public void onNext(FindsBean moreFinds) {
            mFindView.setMoreData(moreFinds);
        }

        @Override
        public void onError(Throwable error) {
            // Errors are ignored.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };

    addDisposabe(mDataManager.getData(findObserver, FindsBean.class, "find.txt"));
}
/**
 * Loads the detail of a favourite workflow and shows it in the view.
 *
 * <p>The progress bar is shown before the request starts and hidden on error or completion.
 * For each received workflow the view is updated, then the uploader's details and the
 * favourite status are requested. The request is subscribed on the IO scheduler, observed on
 * the Android main thread and added to {@code compositeDisposable}.
 *
 * @param id id of the workflow to load
 */
public void loadWorkflowDetail(String id) {
    checkViewAttached();
    getMvpView().showProgressbar(true);

    final DisposableObserver<Workflow> detailObserver = new DisposableObserver<Workflow>() {
        @Override
        public void onNext(Workflow detail) {
            getMvpView().showWorkflowDetail(detail);
            loadUserDetail(detail.getUploader().getId());
            getFavourite(detail.getId());
        }

        @Override
        public void onError(Throwable error) {
            getMvpView().showProgressbar(false);
        }

        @Override
        public void onComplete() {
            getMvpView().showProgressbar(false);
        }
    };

    compositeDisposable.add(mDataManager.getFavoriteDetailWorkflow(id)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribeWith(detailObserver));
}
/**
 * Builds the observer that reacts to text changes in the search field.
 *
 * <p>Each change is trimmed; the clear button is made invisible for an empty query and
 * visible otherwise, and the query is then passed to {@code onQueryTextChange}. Errors and
 * completion are ignored.
 *
 * @return a new observer for text-change events
 */
private DisposableObserver<TextViewTextChangeEvent> getSearchObserver() {
    return new DisposableObserver<TextViewTextChangeEvent>() {
        @Override
        public void onNext(TextViewTextChangeEvent changeEvent) {
            final String trimmedQuery = changeEvent.text().toString().trim();
            final int clearButtonVisibility = trimmedQuery.isEmpty() ? View.INVISIBLE : View.VISIBLE;
            mImageButtonClear.setVisibility(clearButtonVisibility);
            onQueryTextChange(trimmedQuery);
        }

        @Override
        public void onError(Throwable error) {
            // Errors from the text-change stream are ignored.
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }
    };
}