下面列出了 io.reactivex.Observable#subscribeOn() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Decorates an observable with the configured threading and error translation.
 *
 * <p>When {@code asynchronized} is set the source is subscribed on the IO scheduler; when
 * {@code defaultCreator} is present, results are observed on the scheduler it creates. Any
 * upstream error is re-emitted after being passed through
 * {@code ErrorUtils.propagateException}.
 *
 * @param observable the source to decorate; may be {@code null}
 * @return the decorated observable, or {@code null} when {@code observable} is {@code null}
 */
private Observable wrapObservable(Observable observable) {
    if (null == observable) {
        return null;
    }

    Observable wrapped = observable;
    if (asynchronized) {
        wrapped = wrapped.subscribeOn(Schedulers.io());
    }
    if (null != defaultCreator) {
        wrapped = wrapped.observeOn(defaultCreator.create());
    }

    // Translate every upstream failure before it reaches the subscriber.
    Function<Throwable, ObservableSource> errorTranslator = new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable throwable) throws Exception {
            return Observable.error(ErrorUtils.propagateException(throwable));
        }
    };
    return wrapped.onErrorResumeNext(errorTranslator);
}
/**
 * Get data in async mode.
 *
 * <p>The returned observable wraps a call to {@code getData()}. It is subscribed on the IO
 * scheduler when {@code AppConfiguration.isAsynchronized()} is true, and observed on the
 * scheduler created by {@code AppConfiguration.getDefaultScheduler()} when one is configured.
 *
 * @return observable instance.
 */
public Observable<byte[]> getDataInBackground() {
    // Typed (not raw) so the element type is checked by the compiler, matching
    // getDataStreamInBackground().
    Observable<byte[]> observable = Observable.fromCallable(new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
            return getData();
        }
    });
    if (AppConfiguration.isAsynchronized()) {
        observable = observable.subscribeOn(Schedulers.io());
    }
    AppConfiguration.SchedulerCreator defaultScheduler = AppConfiguration.getDefaultScheduler();
    if (null != defaultScheduler) {
        observable = observable.observeOn(defaultScheduler.create());
    }
    return observable;
}
/**
 * Get data stream in async mode.
 *
 * <p>The returned observable wraps a call to {@code getDataStream()}. It is subscribed on the
 * IO scheduler when {@code AppConfiguration.isAsynchronized()} is true, and observed on the
 * scheduler created by {@code AppConfiguration.getDefaultScheduler()} when one is configured.
 *
 * @return observable instance.
 */
public Observable<InputStream> getDataStreamInBackground() {
    Callable<InputStream> openStream = new Callable<InputStream>() {
        @Override
        public InputStream call() throws Exception {
            return getDataStream();
        }
    };

    Observable<InputStream> stream = Observable.fromCallable(openStream);
    if (AppConfiguration.isAsynchronized()) {
        stream = stream.subscribeOn(Schedulers.io());
    }

    AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
    if (creator != null) {
        stream = stream.observeOn(creator.create());
    }
    return stream;
}
/**
 * Applies the configured schedulers and error mapping to the given observable.
 *
 * <p>Subscription moves to the IO scheduler if {@code asynchronized} is true, observation moves
 * to {@code defaultCreator.create()} if a creator is set, and each upstream error is replaced
 * by the result of {@code ErrorUtils.propagateException}.
 *
 * @param observable the source to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} if the input was {@code null}
 */
private Observable wrapObservable(Observable observable) {
    if (observable == null) {
        return null;
    }

    Observable current = asynchronized ? observable.subscribeOn(Schedulers.io()) : observable;
    if (defaultCreator != null) {
        current = current.observeOn(defaultCreator.create());
    }

    return current.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable cause) throws Exception {
            // Re-emit the failure after it has been converted by ErrorUtils.
            return Observable.error(ErrorUtils.propagateException(cause));
        }
    });
}
/**
 * Requests the router endpoints for an application and stores the answer.
 *
 * <p>Each emitted endpoint is logged, assigned to {@code defaultEndpoint}, has the current time
 * in seconds added to its ttl, and, when a default {@code SystemSetting} is available, is
 * serialized to JSON and saved under the zone returned by
 * {@code getPersistenceKeyZone(appId, true)} with {@code appId} as key.
 *
 * @param appId application id passed to the router service and used as the persistence key
 * @return observable emitting the stored {@code defaultEndpoint}
 */
public Observable<AppAccessEndpoint> fetchServerHostsInBackground(final String appId) {
    AppRouterService routerService = retrofit.create(AppRouterService.class);
    Observable<AppAccessEndpoint> endpoints = routerService.getRouter(appId);

    if (AppConfiguration.isAsynchronized()) {
        endpoints = endpoints.subscribeOn(Schedulers.io());
    }
    AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
    if (schedulerCreator != null) {
        endpoints = endpoints.observeOn(schedulerCreator.create());
    }

    Function<AppAccessEndpoint, AppAccessEndpoint> rememberEndpoint =
        new Function<AppAccessEndpoint, AppAccessEndpoint>() {
            @Override
            public AppAccessEndpoint apply(AppAccessEndpoint appAccessEndpoint) throws Exception {
                LOGGER.d(appAccessEndpoint.toString());

                // Keep the result in memory; the ttl has the current time in seconds added.
                AppRouter.this.defaultEndpoint = appAccessEndpoint;
                AppRouter.this.defaultEndpoint.setTtl(
                    appAccessEndpoint.getTtl() + System.currentTimeMillis() / 1000);

                // Persist the result when a settings store is configured.
                SystemSetting setting = AppConfiguration.getDefaultSetting();
                if (setting != null) {
                    String endPoints = JSON.toJSONString(AppRouter.this.defaultEndpoint);
                    setting.saveString(getPersistenceKeyZone(appId, true), appId, endPoints);
                }
                return AppRouter.this.defaultEndpoint;
            }
        };
    return endpoints.map(rememberEndpoint);
}
/**
 * Wraps an observable with the configured schedulers and error conversion.
 *
 * <p>If {@code asynchronized} is true the source is subscribed on the IO scheduler. If
 * {@code defaultCreator} is non-null, emissions are observed on the scheduler it creates.
 * Errors are converted with {@code ErrorUtils.propagateException} and re-emitted.
 *
 * @param observable the observable to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} when given {@code null}
 */
public Observable wrapObservable(Observable observable) {
    if (null == observable) {
        return null;
    }

    Observable result = observable;
    if (asynchronized) {
        result = result.subscribeOn(Schedulers.io());
    }
    if (null != defaultCreator) {
        result = result.observeOn(defaultCreator.create());
    }

    Function<Throwable, ObservableSource> convertError = new Function<Throwable, ObservableSource>() {
        @Override
        public ObservableSource apply(Throwable error) throws Exception {
            return Observable.error(ErrorUtils.propagateException(error));
        }
    };
    result = result.onErrorResumeNext(convertError);
    return result;
}
/**
 * Test helper that subscribes to an observable and returns the values it emitted.
 *
 * <p>For {@code awaitCount == 1} the first element is obtained through
 * {@code testSingle(observable.firstOrError())}. Otherwise the observable is subscribed on the
 * IO scheduler, awaited for {@code awaitCount} items, and asserted to have produced no errors,
 * exactly {@code awaitCount} values, and no {@code null} value.
 *
 * @param observable the observable under test; asserted to be non-null
 * @param awaitCount number of values to wait for and expect
 * @param <T> element type
 * @return the received values
 */
public static <T> List<T> testObservable(Observable<T> observable, int awaitCount) {
    assertNotNull(observable);

    if (awaitCount == 1) {
        T onlyValue = testSingle(observable.firstOrError());
        return Collections.singletonList(onlyValue);
    }

    // If you don't do this it blocks trying to do the operation and therefore can't observe the results
    TestObserver<T> observer = observable.subscribeOn(Schedulers.io()).test();
    observer.awaitCount(awaitCount);
    observer.assertNoErrors();
    assertEquals(awaitCount, observer.valueCount());

    List<T> received = observer.values();
    for (T value : received) {
        Assert.assertNotNull(value);
    }
    return received;
}
/**
 * Asks the given router host for an RTM connection server and caches the answer.
 *
 * <p>A Retrofit instance is derived from {@code retrofit} with {@code routerHost} as base URL.
 * For each non-null response, when a default {@code SystemSetting} exists, the current time in
 * seconds is added to the response ttl and the response is saved as JSON under the zone
 * returned by {@code getPersistenceKeyZone(appId, false)} with {@code routerHost} as key.
 *
 * @param routerHost base URL of the router to query; also the persistence key
 * @param appId application id sent to the router
 * @param installationId installation id sent to the router
 * @param secure value forwarded unchanged to {@code getRTMConnectionServer}
 * @return observable emitting the router response
 */
private Observable<RTMConnectionServerResponse> fetchRTMServerFromRemote(final String routerHost, final String appId,
                                                                         final String installationId, int secure) {
    LOGGER.d("fetchRTMServerFromRemote. router=" + routerHost + ", appId=" + appId
        + ", installationId=" + installationId);

    AppRouterService routerService = retrofit.newBuilder()
        .baseUrl(routerHost)
        .build()
        .create(AppRouterService.class);

    Observable<RTMConnectionServerResponse> responses =
        routerService.getRTMConnectionServer(appId, installationId, secure);
    if (AppConfiguration.isAsynchronized()) {
        responses = responses.subscribeOn(Schedulers.io());
    }
    AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
    if (schedulerCreator != null) {
        responses = responses.observeOn(schedulerCreator.create());
    }

    return responses.map(new Function<RTMConnectionServerResponse, RTMConnectionServerResponse>() {
        @Override
        public RTMConnectionServerResponse apply(RTMConnectionServerResponse response) throws Exception {
            SystemSetting setting = AppConfiguration.getDefaultSetting();
            if (null == response || null == setting) {
                // Nothing to cache, or nowhere to cache it: pass the response through untouched.
                return response;
            }
            response.setTtl(response.getTtl() + System.currentTimeMillis() / 1000);
            String cacheResult = JSON.toJSONString(response);
            setting.saveString(getPersistenceKeyZone(appId, false), routerHost, cacheResult);
            return response;
        }
    });
}
/**
 * Routes an observable onto the IO scheduler.
 *
 * <p>The source is subscribed on the IO scheduler when {@code asynchronized} is true, and
 * observed on that same IO scheduler when {@code defaultCreator} is non-null.
 *
 * @param observable the source to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} when {@code observable} is {@code null}
 */
private Observable wrapObservableInBackground(Observable observable) {
    if (observable == null) {
        return null;
    }

    final Scheduler ioScheduler = Schedulers.io();
    Observable background = observable;
    if (asynchronized) {
        background = background.subscribeOn(ioScheduler);
    }
    // NOTE(review): observation uses the IO scheduler, not defaultCreator.create(); the creator
    // only acts as an on/off switch here. Presumably intentional for background use; confirm.
    if (defaultCreator != null) {
        background = background.observeOn(ioScheduler);
    }
    return background;
}
/**
 * Adapts a call into the reactive type selected by this adapter's flags.
 *
 * <p>The response source is a {@code CallEnqueueObservable} when {@code mAsync} is set and a
 * {@code CallExecuteObservable} otherwise. It is wrapped in a {@code ResultObservable} or a
 * {@code BodyObservable} according to {@code mResult} / {@code mBody}, subscribed on
 * {@code mScheduler} when one is set, and finally converted according to the first matching
 * flag among {@code mFlowable}, {@code mSingle}, {@code mMaybe} and {@code mCompletable}.
 *
 * @param call the call to adapt
 * @return the adapted reactive object; a plain observable when no conversion flag is set
 */
@Override
public Object adapt(Call<R> call) {
    final Observable<Response<R>> responses;
    if (mAsync) {
        responses = new CallEnqueueObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit);
    } else {
        responses = new CallExecuteObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit);
    }

    // Choose what the stream emits: result wrapper, body, or the raw response.
    Observable<?> stream = responses;
    if (mResult) {
        stream = new ResultObservable<>(responses);
    } else if (mBody) {
        stream = new BodyObservable<>(responses);
    }

    if (null != mScheduler) {
        stream = stream.subscribeOn(mScheduler);
    }

    // Convert to the requested reactive type; the first matching flag wins.
    if (mFlowable) {
        return stream.toFlowable(BackpressureStrategy.LATEST);
    }
    if (mSingle) {
        return stream.singleOrError();
    }
    if (mMaybe) {
        return stream.singleElement();
    }
    if (mCompletable) {
        return stream.ignoreElements();
    }
    return stream;
}
/**
 * Loads the website stored for a user and reports it to the view.
 *
 * <p>The lookup goes through {@code PredatorDatabase.getWebsiteForUser}. A non-empty website is
 * passed to {@code mView.websiteAvailable}; an empty or missing one is reported as an error and
 * results in {@code mView.websiteUnavailable}. The subscription is added to
 * {@code mCompositeDisposable}.
 *
 * @param userId id of the user whose website is requested
 */
@Override
public void getWebsite(final int userId) {
    // subscribeOn/observeOn return NEW observables; they must be chained (or reassigned),
    // otherwise the database read and the view callbacks run on the calling thread.
    Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String website = PredatorDatabase.getInstance()
                    .getWebsiteForUser(userId);
            if (TextUtils.isEmpty(website)) {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No website available for this user."));
                return;
            }
            emitter.onNext(website);
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(observableWebsite.subscribeWith(new DisposableObserver<String>() {
        @Override
        public void onNext(String value) {
            Logger.d("onNext: getWebsite: " + value);
            mView.websiteAvailable(value);
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
            mView.websiteUnavailable();
        }

        @Override
        public void onComplete() {
            Logger.d(TAG, "onComplete: fetched user website");
        }
    }));
}
/**
 * Builds an observable that emits the single value produced by a supplier.
 *
 * <p>On subscription the supplier is invoked; its result is emitted and the stream completed,
 * each step only if the emitter has not been disposed. Anything thrown is forwarded as an error
 * unless the emitter is already disposed. With no {@code sc} configured the observable is
 * subscribed on the IO scheduler; otherwise it is both subscribed and observed on {@code sc}.
 *
 * @param supplier produces the value to emit
 * @param <T> element type
 * @return the scheduled observable
 */
private <T> Observable<T> create(Supplier<T> supplier) {
    ObservableOnSubscribe<T> emitSuppliedValue = emitter -> {
        try {
            T response = supplier.get();
            if (!emitter.isDisposed()) {
                emitter.onNext(response);
            }
            // Checked again: the subscriber may have disposed while handling onNext.
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        } catch (Throwable e) {
            if (!emitter.isDisposed()) {
                emitter.onError(e);
            }
        }
    };

    Observable<T> observable = Observable.create(emitSuppliedValue);
    return sc == null
        ? observable.subscribeOn(Schedulers.io())
        : observable.subscribeOn(sc).observeOn(sc);
}
/**
 * Subscribes the given source on the scheduler returned by
 * {@code BurstKitUtils.defaultBurstNodeServiceScheduler()}.
 *
 * @param source the observable to schedule
 * @param <T> element type
 * @return {@code source} subscribed on that scheduler
 */
private <T> Observable<T> assign(Observable<T> source) {
    final Observable<T> scheduled =
        source.subscribeOn(BurstKitUtils.defaultBurstNodeServiceScheduler());
    return scheduled;
}
/**
 * Reads the cached raw content stored under a cache key.
 *
 * <p>The read task is submitted to {@code executor} immediately when this method is called; the
 * returned observable is backed by that task's future. When the cache file is missing, older
 * than {@code maxAgeInMilliseconds}, or unreadable, the task either fails (if {@code isFinal})
 * or yields an empty string.
 *
 * @param className only used in the log message
 * @param cacheKey key identifying the cache file
 * @param maxAgeInMilliseconds maximum accepted file age; values {@code <= 0} disable the check
 * @param isFinal whether a cache miss is reported as an error instead of an empty string
 * @return observable of the cache content decoded as UTF-8
 */
public Observable<String> getCacheRawResult(final String className, final String cacheKey,
                                            final long maxAgeInMilliseconds, final boolean isFinal) {
    LOGGER.d("try to get cache raw result for class:" + className);
    AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
    boolean isAsync = AppConfiguration.isAsynchronized();
    Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            File cacheFile = getCacheFile(cacheKey);
            if (null == cacheFile || !cacheFile.exists()) {
                LOGGER.d("cache file(key=" + cacheKey + ") not existed.");
                if (isFinal) {
                    throw new FileNotFoundException("cache is not existed.");
                } else {
                    return "";
                }
            }
            if (maxAgeInMilliseconds > 0 && (System.currentTimeMillis() - cacheFile.lastModified() > maxAgeInMilliseconds)) {
                LOGGER.d("cache file(key=" + cacheKey + ") is expired.");
                if (isFinal) {
                    throw new FileNotFoundException("cache file is expired.");
                } else {
                    return "";
                }
            }
            byte[] data = readData(cacheFile);
            if (null == data) {
                LOGGER.d("cache file(key=" + cacheKey + ") is empty.");
                if (isFinal) {
                    // NOTE(review): InterruptedException is an odd type for a read failure; kept
                    // because callers may inspect the failure cause.
                    throw new InterruptedException("failed to read cache file.");
                } else {
                    return "";
                }
            }
            String content = new String(data, 0, data.length, "UTF-8");
            LOGGER.d("cache file(key=" + cacheKey + "), content: " + content);
            return content;
        }
    };
    FutureTask<String> futureTask = new FutureTask<>(callable);
    executor.submit(futureTask);
    // Typed (not raw) so the String element type is verified by the compiler.
    Observable<String> result = Observable.fromFuture(futureTask);
    if (isAsync) {
        result = result.subscribeOn(Schedulers.io());
    }
    if (null != creator) {
        result = result.observeOn(creator.create());
    }
    return result;
}
/**
 * Loads the media of a post and hands it to the view.
 *
 * <p>Media is read through {@code PredatorDatabase.getMediaForPost}. When the list is non-empty
 * it is passed to {@code mView.showMedia} together with the index of the entry whose media id
 * equals {@code mediaId} (index 0 when none matches; the last match wins when several do). An
 * empty or missing list is reported through {@code mView.unableToFetchMedia}. The subscription
 * is added to {@code mCompositeDisposable}.
 *
 * @param postId id of the post whose media is requested
 * @param mediaId id of the media entry to select initially
 */
@Override
public void getMedia(final int postId, final int mediaId) {
    // subscribeOn/observeOn return NEW observables; they must be chained (or reassigned),
    // otherwise the database read and the view callbacks run on the calling thread.
    Observable<MediaData> mediaObservable = Observable.create(new ObservableOnSubscribe<MediaData>() {
        @Override
        public void subscribe(ObservableEmitter<MediaData> emitter) throws Exception {
            // Fetch media from database.
            List<Media> media = PredatorDatabase.getInstance()
                    .getMediaForPost(postId);
            if (media == null || media.isEmpty()) {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No media available for this post"));
                return;
            }
            int defaultPosition = 0;
            for (int i = 0; i < media.size(); i++) {
                if (mediaId == media.get(i).getMediaId()) {
                    defaultPosition = i;
                }
            }
            emitter.onNext(new MediaData(media, defaultPosition));
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(mediaObservable.subscribeWith(new DisposableObserver<MediaData>() {
        @Override
        public void onComplete() {
            // Done
        }

        @Override
        public void onError(Throwable e) {
            mView.unableToFetchMedia(e.getMessage());
        }

        @Override
        public void onNext(MediaData mediaData) {
            mView.showMedia(mediaData.getMedia(), mediaData.getDefaultPosition());
        }
    }));
}
/**
 * Wipes the cached data and notifies the view.
 *
 * <p>Deletes all posts, users, comments, install links, media and collections through
 * {@code PredatorDatabase} and clears the Fresco image pipeline caches. Success is reported via
 * {@code mView.cacheCleared}, failure via {@code mView.unableToWipeCache}. The subscription is
 * added to {@code mCompositeDisposable}.
 */
@Override
public void clearCache() {
    // subscribeOn/observeOn return NEW observables; they must be chained (or reassigned),
    // otherwise the deletions and the view callbacks run on the calling thread.
    Observable<Boolean> clearCacheObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            // Delete all values from all tables except CATEGORY TABLE AS IT MUST STAY AS IT IS.
            // Delete all values from posts_table.
            PredatorDatabase.getInstance()
                    .deleteAllPosts();
            // Delete all values from users_table.
            PredatorDatabase.getInstance()
                    .deleteAllUsers();
            // Delete all values from comments_table.
            PredatorDatabase.getInstance()
                    .deleteAllComments();
            // Delete all values from install_links_table.
            PredatorDatabase.getInstance()
                    .deleteAllInstallLinks();
            // Delete all values from media_table.
            PredatorDatabase.getInstance()
                    .deleteAllMedia();
            // Delete all values from collections_table.
            PredatorDatabase.getInstance()
                    .deleteAllCollections();
            // Clear fresco cache.
            Fresco.getImagePipeline().clearCaches();
            emitter.onNext(true);
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(clearCacheObservable.subscribeWith(new DisposableObserver<Boolean>() {
        @Override
        public void onComplete() {
            // Done
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
            mView.unableToWipeCache();
        }

        @Override
        public void onNext(Boolean aBoolean) {
            Logger.d(TAG, "onNext: cache cleared");
            mView.cacheCleared();
        }
    }));
}