下面列出了 io.reactivex.SingleObserver API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Requests the leaderboard list and hands a successful response to
 * {@code handleLeaderBoardList}.
 * <p>
 * The request is subscribed on the io scheduler and observed on the Android main thread.
 * Errors are ignored.
 */
private void loadLeaders() {
    SingleObserver<LeaderboardModel> leaderboardObserver = new SingleObserver<LeaderboardModel>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Register the subscription with compositeDisposable.
            compositeDisposable.add(disposable);
        }

        @Override
        public void onError(Throwable error) {
            // Intentionally empty in the original: failures are not reported.
        }

        @Override
        public void onSuccess(LeaderboardModel model) {
            handleLeaderBoardList(model);
        }
    };

    RetrofitServiceGenerator.getService()
        .getLeaderboardList()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(leaderboardObserver);
}
/**
 * Records a search-history entry.
 * <p>
 * Looks up the entry stored under the new bean's keyword. When one is found, its time is
 * overwritten with the new bean's time and it is passed to {@code updateHistory}. When the
 * lookup fails with {@code EmptyResultSetException}, the new bean is inserted instead. Any
 * other error is ignored.
 *
 * @param newDataBean the history entry to add or merge
 */
@Override
public void addHistory(SearchHistoryBean newDataBean) {
    SingleObserver<SearchHistoryBean> historyObserver = new SingleObserver<SearchHistoryBean>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            addSubscribe(disposable);
        }

        @Override
        public void onError(Throwable error) {
            if (!(error instanceof EmptyResultSetException)) {
                return;
            }
            // No stored row for this keyword: store the new entry.
            mDataManager.insert(newDataBean);
        }

        @Override
        public void onSuccess(SearchHistoryBean existing) {
            existing.setTime(newDataBean.getTime());
            updateHistory(existing);
        }
    };

    mDataManager.getSingleHistory(newDataBean.getKeyWord())
        .compose(RxSchedulers.singleIo2Main())
        .subscribe(historyObserver);
}
/**
 * Demonstrates {@code contains}: asks whether the sequence 1..6 contains 10 and prints the
 * answer. Errors are ignored.
 */
@Test
public void testContainsObservable() {
    SingleObserver<Boolean> printer = new SingleObserver<Boolean>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // nothing to do
        }

        @Override
        public void onError(Throwable error) {
            // nothing to do
        }

        @Override
        public void onSuccess(Boolean found) {
            System.out.println("Does list contain value 10: " + found);
        }
    };

    Observable.just(1, 2, 3, 4, 5, 6)
        .contains(10)
        .subscribe(printer);
}
/**
 * Demonstrates {@code count}: counts the items of the sequence 1..5 and prints the result.
 * Errors are ignored.
 */
@Test
public void testCountObservable() {
    SingleObserver<Long> printer = new SingleObserver<Long>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // nothing to do
        }

        @Override
        public void onError(Throwable error) {
            // nothing to do
        }

        @Override
        public void onSuccess(Long total) {
            System.out.println("Count: " + total);
        }
    };

    Observable.just(1, 2, 3, 4, 5)
        .count()
        .subscribe(printer);
}
/**
 * Wraps {@code observer} so that only the first {@code onSubscribe} is forwarded.
 * <p>
 * A second {@code onSubscribe} on any observer produced by this operator instance throws a
 * {@link NullPointerException}; {@code onSuccess} and {@code onError} are passed through.
 *
 * @param observer the downstream observer
 * @return the guarding observer
 */
@Override
public SingleObserver<? super T> apply(final SingleObserver<? super T> observer) {
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // getAndSet returns the previous flag: false means this is the first subscription.
            if (!subscribedOnce.getAndSet(true)) {
                observer.onSubscribe(disposable);
                return;
            }
            // NOTE(review): the NPE type looks deliberate (a sibling test asserts it) -
            // presumably because RxJava rethrows NPEs from subscribe unchanged; confirm
            // before changing it.
            throw new NullPointerException("You cannot directly subscribe to a gRPC service multiple times " +
                "concurrently. Use Flowable.share() instead.");
        }

        @Override
        public void onError(Throwable error) {
            observer.onError(error);
        }

        @Override
        public void onSuccess(T value) {
            observer.onSuccess(value);
        }
    };
}
/**
 * Verifies that an observer produced by {@code SubscribeOnlyOnceSingleOperator} forwards the
 * first {@code onSubscribe} and throws a {@link NullPointerException} on the second one.
 */
@Test
public void subscribeOnlyOnceSingleOperatorErrorsWhenMultipleSubscribe() {
    SubscribeOnlyOnceSingleOperator<Object> operator = new SubscribeOnlyOnceSingleOperator<Object>();
    SingleObserver<Object> downstream = mock(SingleObserver.class);
    final Disposable subscription = mock(Disposable.class);
    final SingleObserver<Object> guarded = operator.apply(downstream);

    // First subscription goes through.
    guarded.onSubscribe(subscription);

    // Second subscription must be rejected.
    ThrowableAssert.ThrowingCallable secondSubscribe = new ThrowableAssert.ThrowingCallable() {
        @Override
        public void call() {
            guarded.onSubscribe(subscription);
        }
    };
    assertThatThrownBy(secondSubscribe)
        .isInstanceOf(NullPointerException.class)
        .hasMessageContaining("cannot directly subscribe to a gRPC service multiple times");

    // Only the first call reached the downstream observer.
    verify(downstream, times(1)).onSubscribe(subscription);
}
/**
 * Reloads the detail of the book identified by {@code bookId} and pushes the outcome to the
 * view: {@code finishRefresh} followed by {@code complete} on success, {@code showError} on
 * failure. The request is subscribed on the io scheduler and observed on the main thread.
 */
private void refreshBook() {
    SingleObserver<BookDetailBean> detailObserver = new SingleObserver<BookDetailBean>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            addDisposable(disposable);
        }

        @Override
        public void onError(Throwable error) {
            mView.showError();
        }

        @Override
        public void onSuccess(BookDetailBean detail) {
            mView.finishRefresh(detail);
            mView.complete();
        }
    };

    RemoteRepository.getInstance()
        .getBookDetail(bookId)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(detailObserver);
}
/**
 * Renames the recording at {@code adapterPosition} and updates the attached view: the list
 * item at the emitted position is refreshed on success, the error message is shown on failure.
 *
 * @param adapterPosition index of the recording in {@code recordingItems}
 * @param value           value passed on to {@code rename}
 */
@Override
public void renameFile(int adapterPosition, String value) {
    SingleObserver<Integer> renameObserver = new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable error) {
            getAttachedView().showError(error.getMessage());
        }

        @Override
        public void onSuccess(Integer changedPosition) {
            getAttachedView().notifyListItemChange(changedPosition);
        }
    };

    rename(recordingItems.get(adapterPosition), adapterPosition, value)
        .subscribe(renameObserver);
}
/**
 * Takes a screenshot of the current canvas via {@code microLoader} and reports the outcome in
 * a toast: the "screenshot saved" text followed by the emitted string on success, a generic
 * error text on failure. The work is subscribed on the computation scheduler and observed on
 * the main thread.
 */
@SuppressLint("CheckResult")
private void takeScreenshot() {
    SingleObserver<String> screenshotObserver = new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
            Toast.makeText(MicroActivity.this, R.string.error, Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onSuccess(String result) {
            String message = getString(R.string.screenshot_saved) + " " + result;
            Toast.makeText(MicroActivity.this, message, Toast.LENGTH_LONG).show();
        }
    };

    microLoader.takeScreenshot((Canvas) current)
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(screenshotObserver);
}
/**
 * Subscribes {@code observer} by wrapping it in a {@code MemberSingleObserver}, registering
 * that wrapper, incrementing the requested count and then draining.
 * <p>
 * If the pool reports closed, the observer receives a {@code PoolClosedException} right after
 * {@code onSubscribe} and the wrapper is not registered.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(SingleObserver<? super Member<T>> observer) {
// the action of checking out a member from the pool is implemented as a
// subscription to the singleton MemberSingle
MemberSingleObserver<T> m = new MemberSingleObserver<T>(observer, this);
// onSubscribe is signalled before any terminal event, including the closed-pool error below
observer.onSubscribe(m);
if (pool.isClosed()) {
observer.onError(new PoolClosedException());
return;
}
add(m);
// if m is already disposed at this point (presumably disposed during onSubscribe or
// concurrently - TODO confirm), undo the registration instead of requesting a member
if (m.isDisposed()) {
remove(m);
} else {
// atomically change requested
// retry until compareAndSet succeeds against the snapshot read in the same iteration
while (true) {
Observers<T> a = observers.get();
if (observers.compareAndSet(a, a.withRequested(a.requested + 1))) {
break;
}
}
}
log.debug("subscribed");
// drain runs on both branches above (disposed or requested)
drain();
}
/**
 * Fetches the history record for a thread and reports its unread count to {@code callback}.
 * The fetch is subscribed on the computation scheduler and observed on the main thread; a
 * failure is only logged.
 *
 * @param boardName board the thread belongs to
 * @param threadId  id of the thread
 * @param callback  receives {@code history.unreadCount} on success
 */
private void postUread(String boardName, long threadId, @NonNull UnreadCountUpdate callback) {
    SingleObserver<History> historyObserver = new SingleObserver<History>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // no op
        }

        @Override
        public void onError(Throwable error) {
            Log.e(LOG_TAG, "Error setting unread count badge", error);
        }

        @Override
        public void onSuccess(History record) {
            callback.OnUnreadCountUpdate(record.unreadCount);
        }
    };

    HistoryTableConnection.fetchPost(boardName, threadId)
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(historyObserver);
}
/**
 * Runs {@code Helper.autoDisable} for the current package, then sets {@code autoDisabled} and
 * calls {@code load()} regardless of whether the operation succeeded or failed.
 */
void autoDisable() {
    SingleObserver<SparseIntArray> doneObserver = new SingleObserver<SparseIntArray>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onSuccess(SparseIntArray result) {
            markDisabledAndReload();
        }

        @Override
        public void onError(Throwable error) {
            markDisabledAndReload();
        }

        // Shared tail of both terminal callbacks.
        private void markDisabledAndReload() {
            autoDisabled = true;
            load();
        }
    };

    Helper.autoDisable(context, appInfo.packageName)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(doneObserver);
}
/**
 * Runs {@code Helper.resetMode} for the current package and calls {@code load()} when it
 * succeeds. Errors are ignored.
 */
void reset() {
    SingleObserver<OpsResult> resetObserver = new SingleObserver<OpsResult>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(@NonNull Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(@NonNull OpsResult result) {
            load();
        }
    };

    Helper.resetMode(context, appInfo.packageName)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(resetObserver);
}
/**
 * Loads the {@code AppInfo} for {@code pkgName} and uses its {@code appName} as the title.
 * Errors are ignored.
 *
 * @param pkgName package name to look up
 */
private void loadAppinfo(String pkgName) {
    SingleObserver<AppInfo> titleObserver = new SingleObserver<AppInfo>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(@NonNull Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(@NonNull AppInfo info) {
            setTitle(info.appName);
        }
    };

    Helper.getAppInfo(getApplicationContext(), pkgName)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(titleObserver);
}
/**
 * Loads the user list, stores it in {@code Users} and invalidates the options menu.
 * Errors are ignored.
 */
private void loadUsers() {
    SingleObserver<List<UserInfo>> usersObserver = new SingleObserver<List<UserInfo>>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(List<UserInfo> loaded) {
            Users.getInstance().updateUsers(loaded);
            invalidateOptionsMenu();
        }
    };

    Helper.getUsers(getApplicationContext(), true)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(usersObserver);
}
/**
 * Asks {@code Helper} to close the background server and, on success, shows a short toast if
 * an activity is still available at that time. Errors are ignored.
 */
private void closeServer() {
    SingleObserver<Boolean> closedObserver = new SingleObserver<Boolean>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(Boolean closed) {
            Activity host = getActivity();
            if (host == null) {
                return;
            }
            Toast.makeText(host, R.string.bg_closed, Toast.LENGTH_SHORT).show();
        }
    };

    Helper.closeBgServer(getActivity().getApplicationContext())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(closedObserver);
}
/**
 * Loads the {@code AppInfo} for {@code pkg} and starts {@code AlertInstalledPremActivity}
 * with it as {@code EXTRA_APP}, in a new task and without animation. Errors are ignored.
 *
 * @param context context used for the lookup and to start the activity
 * @param pkg     package name to look up
 */
private void showDlg(final Context context, String pkg) {
    SingleObserver<AppInfo> launchObserver = new SingleObserver<AppInfo>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(AppInfo info) {
            Intent launch = new Intent(context, AlertInstalledPremActivity.class);
            launch.putExtra(AlertInstalledPremActivity.EXTRA_APP, info);
            launch.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK | Intent.FLAG_ACTIVITY_NO_ANIMATION);
            context.startActivity(launch);
        }
    };

    Helper.getAppInfo(context, pkg)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(launchObserver);
}
/**
 * Inverse of {@link MaybeConsumers#fromSingleObserver(SingleObserver)}: exposes a
 * {@code MaybeConsumer} as a {@link SingleObserver}.
 * <p>
 * A success value is forwarded to {@code c.success}; an error is wrapped in a
 * {@link RuntimeException} and forwarded to {@code c.fail}. The subscription is ignored.
 *
 * @param c the consumer to adapt
 * @return an observer delegating to {@code c}
 */
public static <T> SingleObserver<T> toSingleObserver(final MaybeConsumer<T> c) {
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            // do nothing
        }

        @Override
        public void onError(@NonNull Throwable error) {
            RuntimeException wrapped = new RuntimeException(error);
            c.fail(wrapped);
        }

        @Override
        public void onSuccess(@NonNull T value) {
            c.success(value);
        }
    };
}
/**
 * Builds a {@link SingleObserver} that registers its subscription with
 * {@code compositeDisposable} and relays the outcome to {@code tSingleCallback}, tagging each
 * call with {@code apiNames}. Either collaborator may be {@code null}, in which case the
 * corresponding step is skipped.
 *
 * @param compositeDisposable container the subscription is added to, or {@code null}
 * @param apiNames            identifier passed through to the callback
 * @param tSingleCallback     receiver of success/failure, or {@code null}
 * @return SingleObserver with which observable will subscribe
 */
public static <T> SingleObserver<T> getSingleObserver(final CompositeDisposable compositeDisposable,
                                                      final WebserviceBuilder.ApiNames apiNames,
                                                      final SingleCallback tSingleCallback) {
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            if (compositeDisposable == null) {
                return;
            }
            compositeDisposable.add(disposable);
        }

        @Override
        public void onError(Throwable error) {
            if (tSingleCallback == null) {
                return;
            }
            tSingleCallback.onFailure(error, apiNames);
        }

        @Override
        public void onSuccess(@NonNull T value) {
            if (tSingleCallback == null) {
                return;
            }
            tSingleCallback.onSingleSuccess(value, apiNames);
        }
    };
}
/**
 * Creates an observer that writes each callback to the log and, for terminal events, also
 * appends the same text plus a line separator to {@code textView}.
 *
 * @return the logging observer
 */
private SingleObserver<Integer> getObserver() {
    return new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, " onSubscribe : " + disposable.isDisposed());
        }

        @Override
        public void onError(Throwable error) {
            String line = " onError : " + error.getMessage();
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }

        @Override
        public void onSuccess(Integer value) {
            String line = " onSuccess : value : " + value;
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }
    };
}
/**
 * Creates an observer that writes each callback to the log and, for terminal events, also
 * appends a message plus a line separator to {@code textView}.
 *
 * @return the logging observer
 */
private SingleObserver<String> getSingleObserver() {
    return new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, " onSubscribe : " + disposable.isDisposed());
        }

        @Override
        public void onError(Throwable error) {
            String line = " onError : " + error.getMessage();
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }

        @Override
        public void onSuccess(String value) {
            // The on-screen and log texts differ slightly; both are kept as they were.
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext value : " + value);
        }
    };
}
/**
 * Creates an observer that writes each callback to the log and, for terminal events, also
 * appends a message plus a line separator to {@code textView}.
 *
 * @return the logging observer
 */
private SingleObserver<String> getObserver() {
    return new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, " onSubscribe : " + disposable.isDisposed());
        }

        @Override
        public void onError(Throwable error) {
            String line = " onError : " + error.getMessage();
            textView.append(line);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, line);
        }

        @Override
        public void onSuccess(String value) {
            // The on-screen and log texts differ slightly; both are kept as they were.
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext value : " + value);
        }
    };
}
/**
 * Creates an observer that counts {@code latch} down once the observed source terminates.
 * <p>
 * On success the latch is simply released. On error the stack trace is printed and
 * {@code Assert.fail} is raised with the error's message; the latch is released in a
 * {@code finally} block because {@code Assert.fail} always throws, which previously made the
 * trailing {@code countDown()} unreachable and left waiters blocked.
 *
 * @param latch the latch to release on termination
 * @param <T>   the value type of the observed source
 * @return the latch-releasing observer
 */
protected <T> SingleObserver<T> countdownLatchHandler(final CountDownLatch latch) {
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(Disposable d) {
            // subscription is not tracked
        }

        @Override
        public void onSuccess(T t) {
            latch.countDown();
        }

        @Override
        public void onError(Throwable x) {
            x.printStackTrace();
            try {
                Assert.fail(x.getMessage());
            } finally {
                // Must run even though fail() throws, otherwise the waiting thread never wakes.
                latch.countDown();
            }
        }
    };
}
/**
 * Loads the story and shows it: on success its URL is loaded into the web view and the
 * "article unavailable" state is hidden; on failure that state is shown and the swipe-refresh
 * indicator is stopped. The load is subscribed on a new thread and observed on the main thread.
 */
private void loadArticle() {
    // Typed (not raw) so the compiler checks the observer against the source's value type.
    SingleObserver<Story> observer = new SingleObserver<Story>() {
        @Override
        public void onSubscribe(Disposable d) {
            // subscription is not tracked
        }

        @Override
        public void onSuccess(Story story) {
            mWebView.loadUrl(story.getUrl());
            hideArticleUnavailable();
        }

        @Override
        public void onError(Throwable e) {
            showArticleUnavailable();
            mSwipeRefreshLayout.setRefreshing(false);
        }
    };
    getStory().subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
}
/**
 * Loads the story from {@code mGetStory} and uses its title as the action-bar title.
 * Nothing happens when the activity has no support action bar; errors are ignored. The load
 * is subscribed on a new thread and observed on the main thread.
 */
private void loadArticleTitle() {
    // Typed (not raw) so the compiler checks the observer against the source's value type.
    SingleObserver<Story> observer = new SingleObserver<Story>() {
        @Override
        public void onSubscribe(Disposable d) {
            // subscription is not tracked
        }

        @Override
        public void onSuccess(Story story) {
            // getSupportActionBar() may return null (e.g. a theme without an action bar).
            if (getSupportActionBar() != null) {
                getSupportActionBar().setTitle(story.getTitle());
            }
        }

        @Override
        public void onError(Throwable e) {
            // failures are not reported
        }
    };
    mGetStory.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
}
/**
 * Demonstrates {@code Single}: subscribes to {@code Single.just(1)} and logs the disposed
 * state on subscribe and the value on success. Errors are ignored.
 */
private void single() {
    SingleObserver<Integer> loggingObserver = new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            Log.d(TAG, "onSubscribe: " + disposable.isDisposed());
        }

        @Override
        public void onError(Throwable error) {
            // failures are not reported
        }

        @Override
        public void onSuccess(Integer value) {
            Log.d(TAG, "onSuccess: " + value);
        }
    };

    Single.just(1).subscribe(loggingObserver);
}
/**
 * Bridges {@code valueSource} to {@code observer}.
 * <p>
 * {@code onSubscribe} is signalled first, before the callbacks are registered, so that a
 * value source which invokes its callbacks immediately cannot deliver a terminal event ahead
 * of {@code onSubscribe}. Both the success and the error callback are suppressed once the
 * disposable handed to the observer has been disposed.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    ValueSourceDisposable disposable = new ValueSourceDisposable();
    // The SingleObserver protocol requires onSubscribe before onSuccess/onError.
    observer.onSubscribe(disposable);
    valueSource.addCallbacks(
        result -> {
            if (!disposable.isDisposed()) {
                try {
                    observer.onSuccess(result);
                } catch (Throwable e) {
                    // Kept from the original: a failure inside onSuccess is routed to onError.
                    observer.onError(e);
                }
            }
        },
        ex -> {
            if (!disposable.isDisposed()) {
                observer.onError(ex);
            }
        }
    );
}
/**
 * Adapts a Vert.x {@code Handler<AsyncResult<T>>} to an RxJava2 {@link SingleObserver}.
 * <p>
 * The returned observer can be passed to {@link Single#subscribe(SingleObserver)}. The handler
 * is invoked at most once: with a succeeded future for {@code onSuccess} or a failed future
 * for {@code onError}, whichever arrives first.
 *
 * @param handler the handler to adapt
 * @return the observer
 */
public static <T> SingleObserver<T> toObserver(Handler<AsyncResult<T>> handler) {
    // Flips to true on the first terminal event; later events are dropped.
    AtomicBoolean terminated = new AtomicBoolean();
    return new SingleObserver<T>() {
        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            // subscription is not tracked
        }

        @Override
        public void onError(Throwable failure) {
            if (!terminated.compareAndSet(false, true)) {
                return;
            }
            handler.handle(Future.failedFuture(failure));
        }

        @Override
        public void onSuccess(@NonNull T value) {
            if (!terminated.compareAndSet(false, true)) {
                return;
            }
            handler.handle(Future.succeededFuture(value));
        }
    };
}
/**
 * Subscribes {@code observer} to the upstream through a {@code LifeSingleObserver} bound to
 * {@code scope}. When {@code onMain} is set, the upstream is first moved to the Android main
 * thread; in either case {@code onTerminateDetach()} is applied before subscribing.
 *
 * @param observer the downstream observer
 */
private void subscribeActual(SingleObserver<? super T> observer) {
    Single<T> source = onMain
        ? this.upStream.observeOn(AndroidSchedulers.mainThread())
        : this.upStream;
    source.onTerminateDetach().subscribe(new LifeSingleObserver<>(observer, scope));
}
/**
 * Registers the winner-declaration permlink for the current competition.
 * <p>
 * On success the progress dialog is hidden, the last-post timestamp is stored, a confirmation
 * toast is shown and the activity finishes. On failure only the stack trace is printed. The
 * request is subscribed on the io scheduler and observed on the main thread.
 */
private void registerPostPermlink() {
    // The observer was previously captured in an unused local via subscribeWith; plain
    // subscribe is equivalent and leaves no discarded return value.
    RetrofitServiceGenerator.getService().registerCompetitionPermlink(
        mCompetitionId,
        REGISTER_PERMLINK_WINNER_DECLARE_TYPE,
        getFullpermlink())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new SingleObserver<CompetitionCreateResponse>() {
            @Override
            public void onSubscribe(Disposable d) {
                // subscription is not tracked
            }

            @Override
            public void onSuccess(CompetitionCreateResponse competitionCreateResponse) {
                showProgressDialog(false, "");
                HaprampPreferenceManager.getInstance().setLastPostCreatedAt(MomentsUtils.getCurrentTime());
                Toast.makeText(WinnerDeclarationActivity.this, "Winners blog posted!", Toast.LENGTH_LONG).show();
                finish();
            }

            @Override
            public void onError(Throwable e) {
                // NOTE(review): the progress dialog is only hidden on success - confirm
                // whether it should also be dismissed here.
                e.printStackTrace();
            }
        });
}