下面列出了io.reactivex.Observable#observeOn ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
switch (mSchedulerType) {
case _main:
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
return upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
case _io_main:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(AndroidSchedulers.mainThread());
case _io_io:
return upstream
.subscribeOn(RxSchedulerUtils.io(mIOExecutor))
.unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
.observeOn(RxSchedulerUtils.io(mIOExecutor));
default:
break;
}
return upstream;
}
private Observable wrapObservable(Observable observable) {
if (null == observable) {
return null;
}
if (asynchronized) {
observable = observable.subscribeOn(Schedulers.io());
}
if (null != defaultCreator) {
observable = observable.observeOn(defaultCreator.create());
}
observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.error(ErrorUtils.propagateException(throwable));
}
});
return observable;
}
/**
* Get data in async mode.
* @return observable instance.
*/
public Observable<byte[]> getDataInBackground() {
Observable observable = Observable.fromCallable(new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
return getData();
}
});
if (AppConfiguration.isAsynchronized()) {
observable = observable.subscribeOn(Schedulers.io());
}
AppConfiguration.SchedulerCreator defaultScheduler = AppConfiguration.getDefaultScheduler();
if (null != defaultScheduler) {
observable = observable.observeOn(defaultScheduler.create());
}
return observable;
}
/**
* Get data stream in async mode.
* @return observable instance.
*/
public Observable<InputStream> getDataStreamInBackground() {
Observable<InputStream> observable = Observable.fromCallable(new Callable<InputStream>() {
@Override
public InputStream call() throws Exception {
return getDataStream();
}
});
if (AppConfiguration.isAsynchronized()) {
observable = observable.subscribeOn(Schedulers.io());
}
AppConfiguration.SchedulerCreator defaultScheduler = AppConfiguration.getDefaultScheduler();
if (null != defaultScheduler) {
observable = observable.observeOn(defaultScheduler.create());
}
return observable;
}
private Observable wrapObservable(Observable observable) {
if (null == observable) {
return null;
}
if (asynchronized) {
observable = observable.subscribeOn(Schedulers.io());
}
if (null != defaultCreator) {
observable = observable.observeOn(defaultCreator.create());
}
observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.error(ErrorUtils.propagateException(throwable));
}
});
return observable;
}
public Observable<AppAccessEndpoint> fetchServerHostsInBackground(final String appId) {
AppRouterService service = retrofit.create(AppRouterService.class);
Observable<AppAccessEndpoint> result = service.getRouter(appId);
if (AppConfiguration.isAsynchronized()) {
result = result.subscribeOn(Schedulers.io());
}
AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
if (null != creator) {
result = result.observeOn(creator.create());
}
return result.map(new Function<AppAccessEndpoint, AppAccessEndpoint>() {
@Override
public AppAccessEndpoint apply(AppAccessEndpoint appAccessEndpoint) throws Exception {
// save result to local cache.
LOGGER.d(appAccessEndpoint.toString());
AppRouter.this.defaultEndpoint = appAccessEndpoint;
AppRouter.this.defaultEndpoint.setTtl(appAccessEndpoint.getTtl() + System.currentTimeMillis() / 1000);
SystemSetting setting = AppConfiguration.getDefaultSetting();
if (null != setting) {
String endPoints = JSON.toJSONString(AppRouter.this.defaultEndpoint);
setting.saveString(getPersistenceKeyZone(appId, true), appId, endPoints);
}
return AppRouter.this.defaultEndpoint;
}
});
}
public Observable wrapObservable(Observable observable) {
if (null == observable) {
return null;
}
if (asynchronized) {
observable = observable.subscribeOn(Schedulers.io());
}
if (null != defaultCreator) {
observable = observable.observeOn(defaultCreator.create());
}
observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
@Override
public ObservableSource apply(Throwable throwable) throws Exception {
return Observable.error(ErrorUtils.propagateException(throwable));
}
});
return observable;
}
private void subscribeActual(Observer<? super T> observer) {
Observable<T> upStream = this.upStream;
if (onMain) {
upStream = upStream.observeOn(AndroidSchedulers.mainThread());
}
upStream.onTerminateDetach().subscribe(new LifeObserver<>(observer, scope));
}
private Observable<RTMConnectionServerResponse> fetchRTMServerFromRemote(final String routerHost, final String appId,
final String installationId, int secure) {
LOGGER.d("fetchRTMServerFromRemote. router=" + routerHost + ", appId=" + appId
+ ", installationId=" + installationId);
Retrofit tmpRetrofit = retrofit.newBuilder().baseUrl(routerHost).build();
AppRouterService tmpService = tmpRetrofit.create(AppRouterService.class);
Observable<RTMConnectionServerResponse> result = tmpService.getRTMConnectionServer(appId, installationId, secure);
if (AppConfiguration.isAsynchronized()) {
result = result.subscribeOn(Schedulers.io());
}
AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
if (null != creator) {
result = result.observeOn(creator.create());
}
return result.map(new Function<RTMConnectionServerResponse, RTMConnectionServerResponse>() {
@Override
public RTMConnectionServerResponse apply(RTMConnectionServerResponse rtmConnectionServerResponse) throws Exception {
SystemSetting setting = AppConfiguration.getDefaultSetting();
if (null != rtmConnectionServerResponse && null != setting) {
rtmConnectionServerResponse.setTtl(rtmConnectionServerResponse.getTtl() + System.currentTimeMillis() / 1000);
String cacheResult = JSON.toJSONString(rtmConnectionServerResponse);
setting.saveString(getPersistenceKeyZone(appId, false), routerHost, cacheResult);
}
return rtmConnectionServerResponse;
}
});
}
private Observable wrapObservableInBackground(Observable observable) {
if (null == observable) {
return null;
}
Scheduler scheduler = Schedulers.io();
if (asynchronized) {
observable = observable.subscribeOn(scheduler);
}
if (null != defaultCreator) {
observable = observable.observeOn(scheduler);
}
return observable;
}
@Override
public void getWebsite(final int userId) {
Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String website = PredatorDatabase.getInstance()
.getWebsiteForUser(userId);
if (!TextUtils.isEmpty(website)) {
emitter.onNext(website);
} else {
emitter.onError(new NullPointerException("No website available for this user."));
}
emitter.onComplete();
}
});
observableWebsite.subscribeOn(Schedulers.io());
observableWebsite.observeOn(AndroidSchedulers.mainThread());
mCompositeDisposable.add(observableWebsite.subscribeWith(new DisposableObserver<String>(){
@Override
public void onNext(String value) {
Logger.d("onNext: getWebsite: " + value);
mView.websiteAvailable(value);
}
@Override
public void onError(Throwable e) {
Logger.e(TAG, "onError: " + e.getMessage(), e);
mView.websiteUnavailable();
}
@Override
public void onComplete() {
Logger.d(TAG, "onComplete: fetched user website");
}
}));
}
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
Observable<T> tObservable = upstream
.observeOn(AndroidSchedulers.mainThread());
if (provider == null) {
return tObservable;
}
return tObservable.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
}
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.observeOn(AndroidSchedulers.mainThread());
}
};
}
public Observable<String> getCacheRawResult(final String className, final String cacheKey,
final long maxAgeInMilliseconds, final boolean isFinal) {
LOGGER.d("try to get cache raw result for class:" + className);
AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
boolean isAsync = AppConfiguration.isAsynchronized();
Callable<String> callable = new Callable<String>() {
public String call() throws Exception {
File cacheFile = getCacheFile(cacheKey);
if (null == cacheFile || !cacheFile.exists()) {
LOGGER.d("cache file(key=" + cacheKey + ") not existed.");
if (isFinal) {
throw new FileNotFoundException("cache is not existed.");
} else {
return "";
}
}
if (maxAgeInMilliseconds > 0 && (System.currentTimeMillis() - cacheFile.lastModified() > maxAgeInMilliseconds)) {
LOGGER.d("cache file(key=" + cacheKey + ") is expired.");
if (isFinal) {
throw new FileNotFoundException("cache file is expired.");
} else {
return "";
}
}
byte[] data = readData(cacheFile);
if (null == data) {
LOGGER.d("cache file(key=" + cacheKey + ") is empty.");
if (isFinal) {
throw new InterruptedException("failed to read cache file.");
} else {
return "";
}
}
String content = new String(data, 0, data.length, "UTF-8");
LOGGER.d("cache file(key=" + cacheKey + "), content: " + content);
return content;
}
};
FutureTask<String> futureTask = new FutureTask<>(callable);
executor.submit(futureTask);
Observable result = Observable.fromFuture(futureTask);
if (isAsync) {
result = result.subscribeOn(Schedulers.io());
}
if (null != creator) {
result = result.observeOn(creator.create());
}
return result;
}
@Override
public void getMedia(final int postId, final int mediaId) {
Observable<MediaData> mediaObservable = Observable.create(new ObservableOnSubscribe<MediaData>() {
@Override
public void subscribe(ObservableEmitter<MediaData> emitter) throws Exception {
// Fetch media from database.
List<Media> media = PredatorDatabase.getInstance()
.getMediaForPost(postId);
int defaultPosition = 0;
if (media != null && !media.isEmpty()) {
for (int i = 0; i < media.size(); i++) {
if (mediaId == media.get(i).getMediaId()) {
defaultPosition = i;
}
}
emitter.onNext(new MediaData(media, defaultPosition));
} else {
emitter.onError(new NullPointerException("No media available for this post"));
}
emitter.onComplete();
}
});
mediaObservable.subscribeOn(Schedulers.io());
mediaObservable.observeOn(AndroidSchedulers.mainThread());
mCompositeDisposable.add(mediaObservable.subscribeWith(new DisposableObserver<MediaData>() {
@Override
public void onComplete() {
// Done
}
@Override
public void onError(Throwable e) {
mView.unableToFetchMedia(e.getMessage());
}
@Override
public void onNext(MediaData mediaData) {
mView.showMedia(mediaData.getMedia(), mediaData.getDefaultPosition());
}
}));
}
@Override
public void clearCache() {
Observable<Boolean> clearCacheObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
// Delete all values from all tables except CATEGORY TABLE AS IT MUST STAY AS IT IS.
// Delete all values from posts_table.
PredatorDatabase.getInstance()
.deleteAllPosts();
// Delete all values from users_table.
PredatorDatabase.getInstance()
.deleteAllUsers();
// Delete all values from comments_table.
PredatorDatabase.getInstance()
.deleteAllComments();
// Delete all values from install_links_table.
PredatorDatabase.getInstance()
.deleteAllInstallLinks();
// Delete all values from media_table.
PredatorDatabase.getInstance()
.deleteAllMedia();
// Delete all values from collections_table.
PredatorDatabase.getInstance()
.deleteAllCollections();
// Clear fresco cache.
Fresco.getImagePipeline().clearCaches();
emitter.onNext(true);
emitter.onComplete();
}
});
clearCacheObservable.subscribeOn(Schedulers.io());
clearCacheObservable.observeOn(AndroidSchedulers.mainThread());
mCompositeDisposable.add(clearCacheObservable.subscribeWith(new DisposableObserver<Boolean>() {
@Override
public void onComplete() {
// Done
}
@Override
public void onError(Throwable e) {
Logger.e(TAG, "onError: " + e.getMessage(), e);
mView.unableToWipeCache();
}
@Override
public void onNext(Boolean aBoolean) {
Logger.d(TAG, "onNext: cache cleared");
mView.cacheCleared();
}
}));
}
/**
* 回到主线程
*
* @param observable 被观察者
*/
public static <T> Observable<T> toMain(Observable<T> observable) {
return observable.observeOn(AndroidSchedulers.mainThread());
}
/**
* 回到io线程
*
* @param observable 被观察者
*/
public static <T> Observable<T> toIo(Observable<T> observable) {
return observable.observeOn(Schedulers.io());
}