下面列出了 io.reactivex.Observer 类的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Operator that unwraps an upstream {@code Response<T>} and forwards its body downstream.
 *
 * <p>The first response is delivered through {@code onNext} and the downstream is completed
 * immediately afterwards. Any later upstream signal is ignored so that the downstream never
 * receives more than one terminal event.
 *
 * @param observer the downstream observer that receives the unwrapped body
 * @return the observer to be subscribed to the upstream of responses
 */
@Override
public Observer<? super Response<T>> apply(Observer<? super T> observer) throws Exception {
    return new DisposableObserver<Response<T>>() {
        /** True once a terminal event has been delivered to the downstream observer. */
        private boolean done;

        @Override
        protected void onStart() {
            // DisposableObserver#onSubscribe is final; onStart() is its hook. Without this the
            // downstream observer would never receive onSubscribe and could not dispose.
            observer.onSubscribe(this);
        }

        @Override
        public void onNext(Response<T> response) {
            if (done) {
                return;
            }
            // NOTE(review): body() may be null for some responses - confirm how the
            // downstream is expected to handle that.
            observer.onNext(response.body());
            done = true;
            observer.onComplete();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // Downstream already terminated; a second terminal event is not allowed.
                return;
            }
            done = true;
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                // Already completed from onNext; avoid a duplicate onComplete.
                return;
            }
            done = true;
            observer.onComplete();
        }
    };
}
/**
 * Requests the next page of feeds and appends the result to the list shown by the adapter.
 *
 * <p>The page counter is advanced before the request. If the request fails, the counter is
 * rolled back so the same page is requested again next time, and the load-more state of the
 * refresh layout is ended just as it is on success.
 */
private void goFindMoreData() {
    mPageNo++;
    socialApiFactory.getImFeedList(mPageNo, PAGE_NUM, new Observer<List<Feed>>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // The subscription is not tracked.
        }

        @Override
        public void onNext(List<Feed> feeds) {
            mFeeds.addAll(feeds);
            mFeedAdapter.setData(mFeeds);
            mRefreshLayout.endLoadingMore();
        }

        @Override
        public void onError(Throwable throwable) {
            // The page was not loaded: undo the increment so it is not skipped, and end the
            // load-more state that would otherwise stay active forever.
            mPageNo--;
            mRefreshLayout.endLoadingMore();
        }

        @Override
        public void onComplete() {
            // Nothing to do; the load-more state is ended in onNext/onError.
        }
    });
}
/**
 * Queries the first "Student" object ordered by creation time (descending), waits on the
 * shared latch for the asynchronous callback and asserts that {@code testSucceed} was set.
 */
public void testFirstQuery() throws Exception {
    AVQuery studentQuery = new AVQuery("Student");
    studentQuery.orderByDescending(AVObject.KEY_CREATED_AT);

    Observer<AVObject> firstResultObserver = new Observer<AVObject>() {
        public void onSubscribe(Disposable disposable) {
            // No subscription bookkeeping in this test.
        }

        public void onError(Throwable throwable) {
            // Release the waiting thread; testSucceed is left untouched.
            latch.countDown();
        }

        public void onComplete() {
            System.out.println("completed.");
            latch.countDown();
        }

        public void onNext(AVObject o) {
            System.out.println(o.toString());
            testSucceed = true;
            latch.countDown();
        }
    };

    studentQuery.getFirstInBackground().subscribe(firstResultObserver);
    latch.await();
    assertTrue(testSucceed);
}
/**
 * Requests the Gank data for {@code date}, reduces it to a list of {@code Gank} items and
 * hands the result to {@code observer}.
 *
 * @param date     the date passed to {@code service.getGankData}
 * @param observer receives the mapped list on the Android main thread
 */
public void getDailyGankDataFromServer(String date, Observer observer) {
    // Step 1: pull the results object out of the response wrapper.
    Function<GankContent, GankContent.Result> toResult =
            new Function<GankContent, GankContent.Result>() {
                @Override
                public GankContent.Result apply(GankContent gankContent) throws Exception {
                    return gankContent.results;
                }
            };
    // Step 2: collect the results into one list via addAllResult.
    Function<GankContent.Result, List<Gank>> toGankList =
            new Function<GankContent.Result, List<Gank>>() {
                @Override
                public List<Gank> apply(GankContent.Result result) throws Exception {
                    return addAllResult(result);
                }
            };

    service.getGankData(date)
            .map(toResult)
            .map(toGankList)
            // Read the data on the io scheduler; this call may appear anywhere in the chain.
            .subscribeOn(Schedulers.io())
            // Handle the data on the main thread. observeOn affects the operators after it,
            // so it can be specified several times when several thread switches are needed.
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}
/**
 * Subscribes to the model's {@code queryForAllSync()} stream and passes each emitted city
 * list to {@code queryFinish}. The subscription is registered with the Rx manager.
 */
public void query() {
    Observer<List<City>> cityListObserver = new Observer<List<City>>() {
        @Override
        public void onSubscribe(Disposable d) {
            getRxManager().add(d);
        }

        @Override
        public void onNext(List<City> data) {
            queryFinish(data);
        }

        @Override
        public void onComplete() {
            // Nothing to do on completion.
        }

        @Override
        public void onError(Throwable e) {
            // Errors are ignored here.
        }
    };
    getModel().queryForAllSync().subscribe(cityListObserver);
}
/**
 * Calls the cloud function "hallo" with an empty parameter map and prints either the
 * returned JSON object or the error.
 */
public void testCloudFunction() {
    String functionName = "hallo";
    Map<String, Object> arguments = new HashMap<String, Object>();
    // Keep the explicitly typed local: it fixes the element type of the observable.
    Observable<JSONObject> call = AVCloud.callFunctionInBackground(functionName, arguments);

    Observer<JSONObject> printer = new Observer<JSONObject>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("error occurred! " + throwable);
        }

        @Override
        public void onNext(JSONObject jsonObject) {
            System.out.println("结果 = " + jsonObject);
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    call.subscribe(printer);
}
/**
 * Adapts an {@link Observer} so it can be passed to code that expects a {@link MaybeConsumer}.
 * The observer may also be receiving values from other sources.
 *
 * <p>A non-null success value is forwarded to {@link Observer#onNext(Object)} followed by
 * {@link Observer#onComplete()}; a null success value only completes the observer. A failure
 * is forwarded to {@link Observer#onError(Throwable)}.
 *
 * @param o the observer to pipe results into
 * @return a {@link MaybeConsumer} that delegates to {@code o}
 */
public static <T> MaybeConsumer<T> fromObserver(Observer<T> o) {
    return new MaybeConsumer<T>() {
        @Override
        public void fail(Exception e) {
            o.onError(e);
        }

        @Override
        public void success(T value) {
            if (value == null) {
                // Null means "empty": report completion without emitting anything.
                o.onComplete();
                return;
            }
            o.onNext(value);
            o.onComplete();
        }
    };
}
/**
 * Queries up to four "Student" objects ordered by creation time (descending) and prints each
 * one; an error callback fails the test.
 */
public void testFindObject() {
    AVQuery studentQuery = new AVQuery("Student");
    studentQuery.limit(4);
    studentQuery.orderByDescending(AVObject.KEY_CREATED_AT);

    Observer<List<AVObject>> printingObserver = new Observer<List<AVObject>>() {
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        public void onError(Throwable throwable) {
            fail();
        }

        public void onComplete() {
            // Nothing to do.
        }

        public void onNext(List<AVObject> o) {
            for (AVObject student : o) {
                System.out.println("Query of Student is: " + student.toString());
            }
        }
    };
    studentQuery.findInBackground().subscribe(printingObserver);
}
/**
 * Requests the given permissions through RxPermissions.
 *
 * @param activity    the activity used to create the RxPermissions instance and the toast
 * @param listener    notified through {@code onGranted()} when the request result is true
 * @param permissions the permissions to request
 */
public static void requestPermission(final Activity activity, final PermissionsListener listener, final String... permissions) {
    new RxPermissions(activity)
            .request(permissions)
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Not tracked.
                }

                @Override
                public void onNext(Boolean aBoolean) {
                    if (!aBoolean) {
                        // Request result was false: tell the user and stop.
                        Toast.makeText(activity, R.string.permission_request_denied, Toast.LENGTH_LONG)
                                .show();
                        return;
                    }
                    listener.onGranted();
                }

                @Override
                public void onComplete() {
                    // Nothing to do.
                }

                @Override
                public void onError(Throwable e) {
                    // Errors are ignored here.
                }
            });
}
/**
 * Creates the "Second" demo observer, which writes every callback it receives to
 * {@code textView} and to the debug log.
 *
 * @return a new observer of integers
 */
private Observer<Integer> getSecondObserver() {
    Observer<Integer> second = new Observer<Integer>() {
        /** Appends {@code text} to the text view followed by the line separator. */
        private void appendLine(String text) {
            textView.append(text);
            textView.append(AppConstant.LINE_SEPARATOR);
        }

        @Override
        public void onSubscribe(Disposable d) {
            textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
            Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
            textView.append(AppConstant.LINE_SEPARATOR);
        }

        @Override
        public void onNext(Integer value) {
            appendLine(" Second onNext : value : " + value);
            Log.d(TAG, " Second onNext value : " + value);
        }

        @Override
        public void onComplete() {
            appendLine(" Second onComplete");
            Log.d(TAG, " Second onComplete");
        }

        @Override
        public void onError(Throwable e) {
            appendLine(" Second onError : " + e.getMessage());
            Log.d(TAG, " Second onError : " + e.getMessage());
        }
    };
    return second;
}
/**
 * Reads {@code inputStream} until end of stream, emitting each chunk read to
 * {@code observer} as a freshly allocated array, then completes the observer.
 *
 * @param inputStream the stream to drain
 * @param observer    receives one {@code byte[]} per read, followed by {@code onComplete}
 * @throws IOException if reading from the stream fails
 */
private static void streamInto(InputStream inputStream, Observer<byte[]> observer) throws IOException {
    final byte[] buffer = new byte[STDIO_BUFFER_LEN_IN_BYTES];
    while (true) {
        int count = inputStream.read(buffer, 0, STDIO_BUFFER_LEN_IN_BYTES);
        if (count == -1) {
            break;
        }
        // Hand out a copy: the shared buffer is overwritten by the next read, while
        // observers may treat emitted arrays as immutable.
        byte[] chunk = new byte[count];
        arraycopy(buffer, 0, chunk, 0, count);
        observer.onNext(chunk);
    }
    observer.onComplete();
}
/**
 * Saves an {@code AVFile} that points at an external URL and shows a toast for the outcome;
 * on success the current thread id and the saved object id are logged as well.
 */
public void testUploaderExternelUrlWithObserver() throws Exception {
    AVFile externalFile = new AVFile("test", "http://cms-bucket.ws.126.net/2020/0401/8666ec9dp00q83fid008oc000m801n8c.png");
    Observable<AVFile> saveCall = externalFile.saveInBackground();

    Observer<AVFile> uploadObserver = new Observer<AVFile>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable e) {
            Toast.makeText(FileDemoActivity.this, "上传失败", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AVFile avFile) {
            log("Thread:" + Thread.currentThread().getId());
            log("保存了一个File:" + avFile.getObjectId());
            Toast.makeText(FileDemoActivity.this, "上传成功", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    saveCall.subscribe(uploadObserver);
}
/**
 * Sends this push notification in the background and reports the outcome to a callback.
 * This is preferable to using send(), unless your code is already running from a
 * background thread.
 *
 * <p>On success the returned JSON is stored into a new "_Notification" object held in
 * {@code notification}, and the callback is invoked with {@code null}. On failure the
 * callback receives the error wrapped in an {@code AVException}.
 *
 * @param callback notified through {@code internalDone} when the send finishes; may be null
 */
public void sendInBackground(final SendCallback callback) {
    Observer<JSONObject> sendObserver = new Observer<JSONObject>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onNext(JSONObject jsonObject) {
            notification = new AVObject("_Notification");
            notification.resetServerData(jsonObject.getInnerMap());
            if (null == callback) {
                return;
            }
            callback.internalDone(null);
        }

        @Override
        public void onError(Throwable throwable) {
            if (null == callback) {
                return;
            }
            callback.internalDone(new AVException(throwable));
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    sendInBackground().subscribe(sendObserver);
}
/**
 * Loads one page of articles for a WeChat chapter and, when the response carries error code
 * 0 and the view is still attached, pushes the data to the view.
 *
 * @param pageNum the page number passed to the model
 * @param cid     the chapter id passed to the model
 */
@Override
public void getArticlesFromWechatChapter(int pageNum, int cid) {
    model.getArticlesFromWechatChapter(pageNum, cid)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(((RxFragment) view).bindToLifecycle())
            .subscribe(new Observer<ArticleDataRes>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Not tracked here.
                }

                @Override
                public void onNext(ArticleDataRes articleDataRes) {
                    boolean deliverable = view != null && articleDataRes.getErrorCode() == 0;
                    if (deliverable) {
                        view.updateArticles(articleDataRes.getData());
                    }
                }

                @Override
                public void onComplete() {
                    // Nothing to do.
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, " getArticlesFromWechatChapter" + e.getMessage());
                }
            });
}
/**
 * Saves a "Student" object carrying a date attribute with fetch-when-save enabled, then
 * checks that the date read back from the saved object equals the one that was stored.
 */
public void testDateAttribute() throws Exception {
    final Date expected = new Date();

    AVObject student = new AVObject("Student");
    student.put("name", "Automatic Tester");
    student.put("age", 18);
    student.put("grade", null);
    student.put("lastOcc", expected);
    student.setFetchWhenSave(true);

    Observer<AVObject> saveObserver = new Observer<AVObject>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable throwable) {
            // Release the waiting thread; testSucceed is left untouched.
            latch.countDown();
        }

        @Override
        public void onNext(AVObject avObject) {
            System.out.println("saveObject field finished.");
            Date roundTripped = avObject.getDate("lastOcc");
            testSucceed = expected.equals(roundTripped);
            latch.countDown();
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    student.saveInBackground().subscribe(saveObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Runs a count on the current user's status query and passes when a count is delivered.
 */
public void testStatusCountQuery() throws Exception {
    AVUser me = AVUser.currentUser();

    Observer<Integer> countObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable throwable) {
            // Release the waiting thread; testSucceed is left untouched.
            latch.countDown();
        }

        @Override
        public void onNext(Integer integer) {
            testSucceed = true;
            latch.countDown();
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    AVStatus.statusQuery(me).countInBackground().subscribe(countObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Queries "Student" objects with the CACHE_ELSE_NETWORK cache policy (limit 5, skip 1,
 * newest first), prints every result and passes when a result list is delivered.
 */
public void testQueryCacheElseNetworking() throws Exception {
    AVQuery cachedQuery = new AVQuery("Student");
    cachedQuery.orderByDescending(AVObject.KEY_CREATED_AT);
    cachedQuery.limit(5);
    cachedQuery.skip(1);
    cachedQuery.setCachePolicy(AVQuery.CachePolicy.CACHE_ELSE_NETWORK);

    Observer<List<AVObject>> resultObserver = new Observer<List<AVObject>>() {
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            latch.countDown();
        }

        public void onNext(List<AVObject> o) {
            for (AVObject found : o) {
                System.out.println("found result: " + found.toString());
            }
            testSucceed = true;
            latch.countDown();
        }

        public void onComplete() {
            // Nothing to do.
        }
    };
    cachedQuery.findInBackground().subscribe(resultObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Loads the WeChat chapter directory. A response with error code 0 is pushed to the view;
 * an error is logged and reported to the view through {@code networkError()}.
 */
@Override
public void getWechatChapters() {
    model.getWechatChapters()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(((RxFragment) view).bindToLifecycle())
            .subscribe(new Observer<PrimaryArticleDirectoryRes>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Not tracked here.
                }

                @Override
                public void onNext(PrimaryArticleDirectoryRes primaryArticleDirectoryRes) {
                    if (view == null) {
                        return;
                    }
                    if (primaryArticleDirectoryRes.getErrorCode() != 0) {
                        return;
                    }
                    view.updateWechatChapter(primaryArticleDirectoryRes.getData());
                }

                @Override
                public void onComplete() {
                    // Nothing to do.
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "getWechatChapters() errors: " + e.getMessage());
                    if (view == null) {
                        return;
                    }
                    view.networkError();
                }
            });
}
/**
 * Logs the current user out, then asks for the unread count of that user's private inbox.
 * The test passes only when the request ends in the error callback.
 */
public void testInboxQueryCountWithoutLogin() throws Exception {
    AVUser loggedInUser = AVUser.currentUser();
    final String ownerId = loggedInUser.getObjectId();
    loggedInUser.logOut();
    AVUser inboxOwner = AVObject.createWithoutData(AVUser.class, ownerId);

    Observer<JSONObject> unreadCountObserver = new Observer<JSONObject>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable throwable) {
            // An error is the expected outcome here.
            throwable.printStackTrace();
            testSucceed = true;
            latch.countDown();
        }

        @Override
        public void onNext(JSONObject jsonObject) {
            // A result releases the latch without marking success.
            latch.countDown();
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    AVStatus.inboxQuery(inboxOwner, AVStatus.INBOX_TYPE.PRIVATE.toString())
            .unreadCountInBackground()
            .subscribe(unreadCountObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Loads the account via {@code getAccountFromDB()} on the io scheduler and, on the Android
 * main thread, installs it as the current account of the {@code AccountManager}.
 */
private void setCurrentAccountFromDB() {
    Observer<Account> accountObserver = new Observer<Account>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Not tracked.
        }

        @Override
        public void onNext(Account account) {
            AccountManager.getInstance().setCurrentAccount(account);
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }

        @Override
        public void onError(Throwable e) {
            // Errors are ignored here.
        }
    };
    this.getAccountFromDB()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(accountObserver);
}
/**
 * Removes the stored user name from shared preferences on the io scheduler and, if the
 * removal reports success, clears the current account on the Android main thread.
 */
public void clearAccount() {
    Observable<Boolean> removal = Observable.create(emitter -> {
        boolean removed = SharedPreferenceUtils.removeData(PREF_NAME, USERNAME);
        emitter.onNext(removed);
        emitter.onComplete();
    });

    removal.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Not tracked.
                }

                @Override
                public void onNext(Boolean aBoolean) {
                    if (!aBoolean) {
                        return;
                    }
                    AccountManager.getInstance().setCurrentAccount(null);
                }

                @Override
                public void onComplete() {
                    // Nothing to do.
                }

                @Override
                public void onError(Throwable e) {
                    // Errors are ignored here.
                }
            });
}
/**
 * Demonstrates {@code Observable.merge}: merges the ranges 1..5 and 100..104 and prints
 * every emitted item, followed by a completion message.
 */
public static void main(String[] args) {
    Observable<Integer> lowRange = Observable.range(1, 5);
    Observable<Integer> highRange = Observable.range(100, 5);

    Observable.merge(lowRange, highRange).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onNext(Integer value) {
            System.out.println("collected item: " + value);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("items merged successfully");
        }
    });
}
/**
 * Advances to the next entry of {@code searchBookBeanList} and starts processing it.
 *
 * <p>Each entry is passed through {@code toBookshelf}, {@code getChapterList} and
 * {@code saveSearchBookBean}. Every outcome (result, error, or the delayed timeout check)
 * calls this method again, so the list is walked one entry at a time until {@code upIndex}
 * reaches the end of the list.
 */
private synchronized void doUpdate() {
// Move to the next entry first; nothing happens once the index is past the end of the list.
upIndex++;
if (upIndex < searchBookBeanList.size()) {
toBookshelf(searchBookBeanList.get(upIndex))
.flatMap(this::getChapterList)
.flatMap(this::saveSearchBookBean)
.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<SearchBookBean>() {
@Override
public void onSubscribe(Disposable d) {
compositeDisposable.add(d);
// Timeout guard: after the delay (20 * 1000, presumably milliseconds), a subscription
// that is still not disposed is disposed and the next entry is started.
// NOTE(review): the delayed task is never cancelled; it relies on d.isDisposed()
// being true once this entry has finished - confirm.
handler.postDelayed(() -> {
if (!d.isDisposed()) {
d.dispose();
doUpdate();
}
}, 20 * 1000);
}
@Override
public void onNext(SearchBookBean searchBookBean) {
// Publish the saved bean on the bus, then continue with the next entry.
RxBus.get().post(RxBusTag.UP_SEARCH_BOOK, searchBookBean);
doUpdate();
}
@Override
public void onError(Throwable e) {
// A failed entry is skipped: continue with the next one.
doUpdate();
}
@Override
public void onComplete() {
}
});
}
}
/**
 * Sends data over the Bluetooth socket.
 *
 * <p>Every chunk in {@code data} is written to the socket's output stream (flushing after
 * each chunk) on the io scheduler. The outcome is reported on the Android main thread:
 * {@code callback.sendSuccess()} once all chunks were written, or
 * {@code callback.sendError(e)} if writing failed.
 *
 * @param data     the chunks to write, in order
 * @param callback receives the success or error notification
 */
public void sendData(final List<byte[]> data, final SendDataCallback callback) {
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            OutputStream outputStream = mBluetoothSocket.getOutputStream();
            for (byte[] chunk : data) {
                outputStream.write(chunk);
                outputStream.flush();
            }
            // Signal success explicitly. Without an emission the observer's onNext, and
            // therefore callback.sendSuccess(), would never run. The value is not used.
            emitter.onNext("success");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    // Not tracked.
                }

                @Override
                public void onNext(String s) {
                    callback.sendSuccess();
                }

                @Override
                public void onError(Throwable e) {
                    callback.sendError(e);
                }

                @Override
                public void onComplete() {
                    // Success was already reported in onNext.
                }
            });
}
/**
 * Queries "Student" objects whose age is greater than 119 and passes when a result list is
 * delivered, printing its size.
 */
public void testQueryWithEmptyResult() throws Exception {
    AVQuery agedQuery = new AVQuery("Student");
    agedQuery.whereGreaterThan("age", 119);
    agedQuery.orderByDescending(AVObject.KEY_CREATED_AT);

    Observer<List<AVObject>> resultObserver = new Observer<List<AVObject>>() {
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        public void onError(Throwable throwable) {
            // Release the waiting thread; testSucceed is left untouched.
            latch.countDown();
        }

        public void onComplete() {
            System.out.println("completed.");
            latch.countDown();
        }

        public void onNext(List<AVObject> list) {
            System.out.println("onNext result size: " + list.size());
            testSucceed = true;
            latch.countDown();
        }
    };
    agedQuery.findInBackground().subscribe(resultObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Fetches the followers and followees of this user and reports them to {@code callback}.
 *
 * <p>Nothing happens when {@code callback} is null or when
 * {@code checkUserAuthentication(callback)} returns false. Otherwise the callback receives
 * the parsed result, {@code (null, null)} for a null response, or an {@code AVException}
 * wrapping the error.
 *
 * @param callback receives the result or the error
 */
public void getFollowersAndFolloweesInBackground(final FollowersAndFolloweesCallback callback) {
    if (null == callback || !checkUserAuthentication(callback)) {
        return;
    }
    Observer<JSONObject> relationObserver = new Observer<JSONObject>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onNext(JSONObject jsonObject) {
            if (null == jsonObject) {
                callback.done(null, null);
                return;
            }
            Map<String, List<AVUser>> relations = parseFollowerAndFollowee(jsonObject);
            callback.done(relations, null);
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }

        @Override
        public void onError(Throwable throwable) {
            callback.done(null, new AVException(throwable));
        }
    };
    PaasClient.getStorageClient().getFollowersAndFollowees(getObjectId()).subscribe(relationObserver);
}
/**
 * Saves an {@code AVFile} created from a local JPEG file and passes when the saved file is
 * delivered to the observer.
 */
public void testLocalFileWithoutKeepFileName() throws Exception {
    File localImage = new File("./20160704174809.jpeg");
    AVFile upload = new AVFile("20160704174809.jpeg", localImage);

    Observer<AVFile> saveObserver = new Observer<AVFile>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Not tracked.
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            latch.countDown();
        }

        @Override
        public void onNext(AVFile avFile) {
            testSucceed = true;
            latch.countDown();
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };
    upload.saveInBackground().subscribe(saveObserver);

    latch.await();
    assertTrue(testSucceed);
}
/**
 * Creates a demo observer of string lists that writes each received list (its size and
 * every value), errors and completion to {@code textView} and to the debug log.
 *
 * @return a new observer of string lists
 */
private Observer<List<String>> getObserver() {
    Observer<List<String>> listObserver = new Observer<List<String>>() {
        /** Appends {@code text} to the text view followed by the line separator. */
        private void appendLine(String text) {
            textView.append(text);
            textView.append(AppConstant.LINE_SEPARATOR);
        }

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onNext(List<String> stringList) {
            appendLine(" onNext size : " + stringList.size());
            Log.d(TAG, " onNext : size :" + stringList.size());
            for (String item : stringList) {
                appendLine(" value : " + item);
                Log.d(TAG, " : value :" + item);
            }
        }

        @Override
        public void onComplete() {
            appendLine(" onComplete");
            Log.d(TAG, " onComplete");
        }

        @Override
        public void onError(Throwable e) {
            appendLine(" onError : " + e.getMessage());
            Log.d(TAG, " onError : " + e.getMessage());
        }
    };
    return listObserver;
}
/**
 * Imports the given files as books.
 *
 * <p>Each newly added book is announced on the RxBus with the HAD_ADD_BOOK tag. The view
 * is told about overall success on completion, or about the error message on failure.
 *
 * @param books the files to import
 */
@Override
public void importBooks(List<File> books) {
    Observer<LocBookShelfBean> importObserver = new Observer<LocBookShelfBean>() {
        @Override
        public void onSubscribe(Disposable d) {
            compositeDisposable.add(d);
        }

        @Override
        public void onNext(LocBookShelfBean value) {
            // Only books reported as new are announced.
            if (!value.getNew()) {
                return;
            }
            RxBus.get().post(RxBusTag.HAD_ADD_BOOK, value.getBookShelfBean());
        }

        @Override
        public void onComplete() {
            mView.addSuccess();
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            mView.addError(e.getMessage());
        }
    };
    Observable.fromIterable(books)
            .flatMap(file -> ImportBookModel.getInstance().importBook(file))
            .compose(RxUtils::toSimpleSingle)
            .subscribe(importObserver);
}
/**
 * Connects {@code observer} to click events of the view held by the click-exposure event.
 *
 * <p>Does nothing when {@code Preconditions.checkMainThread(observer)} returns false. The
 * listener is created on first use and re-targeted on later subscriptions; it is handed to
 * the observer through {@code onSubscribe} before being installed as the click listener.
 *
 * @param observer the observer to attach
 */
@Override
protected void subscribeActual(Observer<? super ClickExposureCellOp> observer) {
    if (!Preconditions.checkMainThread(observer)) {
        return;
    }
    if (mListener != null) {
        // Reuse the existing listener, pointing it at the current event and observer.
        mListener.setRxClickExposureEvent(mRxClickExposureEvent);
        mListener.setObserver(observer);
    } else {
        mListener = new RxClickListener(mRxClickExposureEvent, observer);
    }
    observer.onSubscribe(mListener);
    mRxClickExposureEvent.getArg1().setOnClickListener(mListener);
}