io.reactivex.Observable#observeOn() 源码实例 Demo

下面列出了 io.reactivex.Observable#observeOn() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: Collection-Android   文件: SchedulerTransformer.java
/**
 * Applies the threading policy selected by {@code mSchedulerType} to the upstream.
 *
 * <ul>
 *   <li>{@code _main}: observe on the Android main thread.</li>
 *   <li>{@code _io}: observe on the scheduler returned by
 *       {@code RxSchedulerUtils.io(mIOExecutor)}.</li>
 *   <li>{@code _io_main}: subscribe and unsubscribe on the IO scheduler, observe on the
 *       Android main thread.</li>
 *   <li>{@code _io_io}: subscribe, unsubscribe and observe on the IO scheduler.</li>
 *   <li>any other value: the upstream is returned untouched.</li>
 * </ul>
 *
 * @param upstream the observable to decorate
 * @return the decorated observable, or {@code upstream} itself when no policy matches
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    // Start from the untouched upstream; only the matching case replaces it.
    Observable<T> scheduled = upstream;
    switch (mSchedulerType) {
        case _main:
            scheduled = upstream.observeOn(AndroidSchedulers.mainThread());
            break;
        case _io:
            scheduled = upstream.observeOn(RxSchedulerUtils.io(mIOExecutor));
            break;
        case _io_main:
            scheduled = upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
            break;
        case _io_io:
            scheduled = upstream
                    .subscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .unsubscribeOn(RxSchedulerUtils.io(mIOExecutor))
                    .observeOn(RxSchedulerUtils.io(mIOExecutor));
            break;
        default:
            break;
    }
    return scheduled;
}
 
源代码2 项目: java-unified-sdk   文件: RealtimeClient.java
/**
 * Decorates an observable with this client's scheduling and error-mapping conventions.
 *
 * <p>When {@code asynchronized} is set, subscription is moved to {@code Schedulers.io()}.
 * When {@code defaultCreator} is non-null, results are observed on the scheduler it creates.
 * Any upstream error is replaced by {@code ErrorUtils.propagateException(throwable)}.
 *
 * @param observable the observable to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} when {@code observable} is {@code null}
 */
private Observable wrapObservable(Observable observable) {
  if (null == observable) {
    return null;
  }

  Observable wrapped = observable;
  if (asynchronized) {
    wrapped = wrapped.subscribeOn(Schedulers.io());
  }
  if (null != defaultCreator) {
    wrapped = wrapped.observeOn(defaultCreator.create());
  }

  // Re-emit every failure through ErrorUtils so callers see the converted exception.
  final Function<Throwable, ObservableSource> errorMapper = new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable cause) throws Exception {
      return Observable.error(ErrorUtils.propagateException(cause));
    }
  };
  return wrapped.onErrorResumeNext(errorMapper);
}
 
源代码3 项目: java-unified-sdk   文件: AVFile.java
/**
 * Get data in async mode.
 *
 * <p>The returned observable calls {@link #getData()} lazily, once per subscription. When
 * {@code AppConfiguration.isAsynchronized()} is true the call is subscribed on
 * {@code Schedulers.io()}; when a default scheduler creator is configured the result is
 * observed on the scheduler it creates.
 *
 * @return observable instance emitting the file content as a byte array.
 */
public Observable<byte[]> getDataInBackground() {
  // Typed (not raw) so the element type is checked at compile time and no unchecked
  // conversion happens on return.
  Observable<byte[]> observable = Observable.fromCallable(new Callable<byte[]>() {
    @Override
    public byte[] call() throws Exception {
      return getData();
    }
  });
  if (AppConfiguration.isAsynchronized()) {
    observable = observable.subscribeOn(Schedulers.io());
  }
  AppConfiguration.SchedulerCreator defaultScheduler = AppConfiguration.getDefaultScheduler();
  if (null != defaultScheduler) {
    observable = observable.observeOn(defaultScheduler.create());
  }
  return observable;
}
 
源代码4 项目: java-unified-sdk   文件: AVFile.java
/**
 * Get data stream in async mode.
 *
 * <p>The stream is obtained from {@code getDataStream()} lazily, once per subscription.
 * Subscription moves to {@code Schedulers.io()} when {@code AppConfiguration.isAsynchronized()}
 * is true, and emissions are observed on the configured default scheduler when one exists.
 *
 * @return observable instance.
 */
public Observable<InputStream> getDataStreamInBackground() {
  final Callable<InputStream> streamOpener = new Callable<InputStream>() {
    @Override
    public InputStream call() throws Exception {
      return getDataStream();
    }
  };

  Observable<InputStream> source = Observable.fromCallable(streamOpener);
  if (AppConfiguration.isAsynchronized()) {
    source = source.subscribeOn(Schedulers.io());
  }

  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (schedulerCreator == null) {
    return source;
  }
  return source.observeOn(schedulerCreator.create());
}
 
源代码5 项目: java-unified-sdk   文件: PushClient.java
/**
 * Applies the client's scheduler settings and error conversion to an observable.
 *
 * @param observable the observable to wrap; may be {@code null}
 * @return {@code null} for a {@code null} input; otherwise the observable subscribed on
 *     {@code Schedulers.io()} when {@code asynchronized} is set, observed on the scheduler
 *     from {@code defaultCreator} when that is non-null, and with every error replaced by
 *     {@code ErrorUtils.propagateException(throwable)}
 */
private Observable wrapObservable(Observable observable) {
  if (observable == null) {
    return null;
  }

  Observable scheduled = asynchronized ? observable.subscribeOn(Schedulers.io()) : observable;
  if (defaultCreator != null) {
    scheduled = scheduled.observeOn(defaultCreator.create());
  }

  return scheduled.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable failure) throws Exception {
      // Surface the converted exception instead of the raw upstream error.
      return Observable.error(ErrorUtils.propagateException(failure));
    }
  });
}
 
源代码6 项目: java-unified-sdk   文件: AppRouter.java
/**
 * Requests the access endpoints for an application from the router service and caches them.
 *
 * <p>Each emitted endpoint is logged, stored in {@code defaultEndpoint}, has its TTL shifted
 * by the current time in seconds, and, when a default {@code SystemSetting} is available,
 * is persisted as JSON under the application's persistence key zone.
 *
 * @param appId the application id to look up
 * @return an observable emitting the cached {@code defaultEndpoint}
 */
public Observable<AppAccessEndpoint> fetchServerHostsInBackground(final String appId) {
  final AppRouterService routerService = retrofit.create(AppRouterService.class);
  Observable<AppAccessEndpoint> endpoints = routerService.getRouter(appId);

  if (AppConfiguration.isAsynchronized()) {
    endpoints = endpoints.subscribeOn(Schedulers.io());
  }
  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (schedulerCreator != null) {
    endpoints = endpoints.observeOn(schedulerCreator.create());
  }

  final Function<AppAccessEndpoint, AppAccessEndpoint> cacheEndpoint =
      new Function<AppAccessEndpoint, AppAccessEndpoint>() {
        @Override
        public AppAccessEndpoint apply(AppAccessEndpoint appAccessEndpoint) throws Exception {
          // save result to local cache.
          LOGGER.d(appAccessEndpoint.toString());
          AppRouter.this.defaultEndpoint = appAccessEndpoint;
          // TTL becomes the received TTL plus "now" in seconds.
          AppRouter.this.defaultEndpoint.setTtl(appAccessEndpoint.getTtl() + System.currentTimeMillis() / 1000);
          final SystemSetting setting = AppConfiguration.getDefaultSetting();
          if (setting != null) {
            final String endPoints = JSON.toJSONString(AppRouter.this.defaultEndpoint);
            setting.saveString(getPersistenceKeyZone(appId, true), appId, endPoints);
          }
          return AppRouter.this.defaultEndpoint;
        }
      };
  return endpoints.map(cacheEndpoint);
}
 
源代码7 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Wraps an observable with the storage client's threading and error handling.
 *
 * <p>Steps, in order: subscribe on {@code Schedulers.io()} if {@code asynchronized} is set;
 * observe on the scheduler built by {@code defaultCreator} if it is non-null; replace any
 * error with the result of {@code ErrorUtils.propagateException(throwable)}.
 *
 * @param observable the observable to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} if {@code observable} is {@code null}
 */
public Observable wrapObservable(Observable observable) {
  if (null == observable) {
    return null;
  }

  Observable pipeline = observable;
  if (asynchronized) {
    pipeline = pipeline.subscribeOn(Schedulers.io());
  }
  if (null != defaultCreator) {
    pipeline = pipeline.observeOn(defaultCreator.create());
  }

  final Function<Throwable, ObservableSource> rethrowConverted = new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable error) throws Exception {
      return Observable.error(ErrorUtils.propagateException(error));
    }
  };
  pipeline = pipeline.onErrorResumeNext(rethrowConverted);
  return pipeline;
}
 
源代码8 项目: rxjava-RxLife   文件: ObservableLife.java
/**
 * Subscribes the given observer to the upstream through a {@code LifeObserver}.
 *
 * <p>When {@code onMain} is set, emissions are first moved to the Android main thread.
 * {@code onTerminateDetach()} is applied before subscribing in both cases.
 *
 * @param observer the downstream observer to wrap together with {@code scope}
 */
private void subscribeActual(Observer<? super T> observer) {
    final Observable<T> source = onMain
            ? this.upStream.observeOn(AndroidSchedulers.mainThread())
            : this.upStream;
    source.onTerminateDetach().subscribe(new LifeObserver<>(observer, scope));
}
 
源代码9 项目: java-unified-sdk   文件: AppRouter.java
/**
 * Fetches the RTM connection server from the given router host and caches the response.
 *
 * <p>A Retrofit instance bound to {@code routerHost} is built for the call. Each non-null
 * response, when a default {@code SystemSetting} exists, has its TTL shifted by the current
 * time in seconds and is persisted as JSON keyed by {@code routerHost}.
 *
 * @param routerHost base URL of the router to query
 * @param appId application id
 * @param installationId installation id passed to the router service
 * @param secure flag forwarded unchanged to the router service
 * @return an observable emitting the (possibly TTL-adjusted) server response
 */
private Observable<RTMConnectionServerResponse> fetchRTMServerFromRemote(final String routerHost, final String appId,
                                                                         final String installationId, int secure) {
  LOGGER.d("fetchRTMServerFromRemote. router=" + routerHost + ", appId=" + appId
          + ", installationId=" + installationId);

  final AppRouterService routerService = retrofit.newBuilder()
          .baseUrl(routerHost)
          .build()
          .create(AppRouterService.class);
  Observable<RTMConnectionServerResponse> responses =
          routerService.getRTMConnectionServer(appId, installationId, secure);

  if (AppConfiguration.isAsynchronized()) {
    responses = responses.subscribeOn(Schedulers.io());
  }
  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (schedulerCreator != null) {
    responses = responses.observeOn(schedulerCreator.create());
  }

  return responses.map(new Function<RTMConnectionServerResponse, RTMConnectionServerResponse>() {
    @Override
    public RTMConnectionServerResponse apply(RTMConnectionServerResponse response) throws Exception {
      final SystemSetting setting = AppConfiguration.getDefaultSetting();
      // Nothing to cache without both a response and a settings store.
      if (null == response || null == setting) {
        return response;
      }
      response.setTtl(response.getTtl() + System.currentTimeMillis() / 1000);
      final String cacheResult = JSON.toJSONString(response);
      setting.saveString(getPersistenceKeyZone(appId, false), routerHost, cacheResult);
      return response;
    }
  });
}
 
源代码10 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Wraps an observable so that it stays on the IO scheduler.
 *
 * <p>Subscription moves to {@code Schedulers.io()} when {@code asynchronized} is set, and
 * emissions are observed on that same IO scheduler (not on the one produced by
 * {@code defaultCreator}) whenever {@code defaultCreator} is non-null.
 *
 * @param observable the observable to wrap; may be {@code null}
 * @return the wrapped observable, or {@code null} if {@code observable} is {@code null}
 */
private Observable wrapObservableInBackground(Observable observable) {
  if (observable == null) {
    return null;
  }

  final Scheduler ioScheduler = Schedulers.io();
  Observable background = asynchronized ? observable.subscribeOn(ioScheduler) : observable;
  if (defaultCreator != null) {
    background = background.observeOn(ioScheduler);
  }
  return background;
}
 
源代码11 项目: Capstone-Project   文件: UserProfilePresenter.java
/**
 * Loads the website stored for a user and reports the outcome to the view.
 *
 * <p>The database lookup is subscribed on {@code Schedulers.io()} and the view callbacks are
 * delivered on the Android main thread. An empty or missing website is reported through
 * {@code mView.websiteUnavailable()}.
 *
 * @param userId id of the user whose website is requested
 */
@Override
public void getWebsite(final int userId) {
    // subscribeOn()/observeOn() return NEW observables; they must be chained (not called and
    // discarded) for the schedulers to take effect.
    Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String website = PredatorDatabase.getInstance()
                    .getWebsiteForUser(userId);
            if (!TextUtils.isEmpty(website)) {
                emitter.onNext(website);
                emitter.onComplete();
            } else {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No website available for this user."));
            }
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(observableWebsite.subscribeWith(new DisposableObserver<String>(){
        @Override
        public void onNext(String value) {
            Logger.d("onNext: getWebsite: " + value);
            mView.websiteAvailable(value);
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
            mView.websiteUnavailable();
        }

        @Override
        public void onComplete() {
            Logger.d(TAG, "onComplete: fetched user website");
        }
    }));
}
 
源代码12 项目: pandroid   文件: MainObserverTransformer.java
/**
 * Moves upstream emissions to the Android main thread and, when a lifecycle
 * {@code provider} is set, composes the result with
 * {@code RxLifecycleDelegate.bindLifecycle(provider)}.
 *
 * @param upstream the observable to transform
 * @return the main-thread observable, lifecycle-bound when {@code provider} is non-null
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    final Observable<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
    if (provider != null) {
        return onMainThread.compose(RxLifecycleDelegate.<T>bindLifecycle(provider));
    }
    return onMainThread;
}
 
源代码13 项目: RxJava2-Android-Samples   文件: RxSchedulers.java
/**
 * Creates a transformer that observes its upstream on the Android main thread.
 *
 * @param <T> the element type of the observable
 * @return a new transformer applying {@code observeOn(AndroidSchedulers.mainThread())}
 */
public <T> ObservableTransformer<T, T> applyObservableMainThread() {
    final ObservableTransformer<T, T> mainThreadTransformer = new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            final Observable<T> onMainThread = upstream.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
    return mainThreadTransformer;
}
 
源代码14 项目: java-unified-sdk   文件: QueryResultCache.java
/**
 * Reads the cached raw query result for a cache key.
 *
 * <p>The file read is wrapped in a {@link FutureTask} that is submitted to {@code executor}
 * as soon as this method is called, i.e. before anyone subscribes; the returned observable
 * only relays the future's outcome. Subscription moves to {@code Schedulers.io()} when
 * {@code AppConfiguration.isAsynchronized()} is true, and the result is observed on the
 * configured default scheduler when one exists.
 *
 * <p>When the cache file is missing, expired or unreadable, the task returns an empty string
 * if {@code isFinal} is false and throws otherwise.
 *
 * @param className class name, used for logging only
 * @param cacheKey key identifying the cache file
 * @param maxAgeInMilliseconds maximum accepted age of the cache file; values {@code <= 0}
 *     disable the expiry check
 * @param isFinal whether a cache miss is reported as an error instead of an empty string
 * @return an observable emitting the cached content decoded as UTF-8, or an empty string
 */
public Observable<String> getCacheRawResult(final String className, final String cacheKey,
                                            final long maxAgeInMilliseconds, final boolean isFinal) {
  LOGGER.d("try to get cache raw result for class:" + className);
  AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
  boolean isAsync = AppConfiguration.isAsynchronized();

  Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
      File cacheFile = getCacheFile(cacheKey);
      if (null == cacheFile || !cacheFile.exists()) {
        LOGGER.d("cache file(key=" + cacheKey + ") not existed.");
        if (isFinal) {
          throw new FileNotFoundException("cache is not existed.");
        } else {
          return "";
        }
      }
      if (maxAgeInMilliseconds > 0 && (System.currentTimeMillis() - cacheFile.lastModified() > maxAgeInMilliseconds)) {
        LOGGER.d("cache file(key=" + cacheKey + ") is expired.");
        if (isFinal) {
          throw new FileNotFoundException("cache file is expired.");
        } else {
          return "";
        }
      }
      byte[] data = readData(cacheFile);
      if (null == data) {
        LOGGER.d("cache file(key=" + cacheKey + ") is empty.");
        if (isFinal) {
          throw new InterruptedException("failed to read cache file.");
        } else {
          return "";
        }
      }
      String content = new String(data, 0, data.length, "UTF-8");
      LOGGER.d("cache file(key=" + cacheKey + "), content: " + content);
      return content;
    }
  };
  FutureTask<String> futureTask = new FutureTask<>(callable);
  // Starts the read immediately on the cache executor, independent of subscription.
  executor.submit(futureTask);
  // Typed (not raw) so the String element type is checked and no unchecked conversion
  // happens on return.
  Observable<String> result = Observable.fromFuture(futureTask);
  if (isAsync) {
    result = result.subscribeOn(Schedulers.io());
  }
  if (null != creator) {
    result = result.observeOn(creator.create());
  }
  return result;
}
 
源代码15 项目: Capstone-Project   文件: MediaDetailsPresenter.java
/**
 * Loads the media of a post and shows it in the view, positioned on the requested item.
 *
 * <p>The database lookup is subscribed on {@code Schedulers.io()} and the view callbacks are
 * delivered on the Android main thread. When the post has no media the view receives
 * {@code unableToFetchMedia} with the error message.
 *
 * @param postId id of the post whose media is loaded
 * @param mediaId id of the media item to select; position 0 is used when it is not found
 */
@Override
public void getMedia(final int postId, final int mediaId) {
    // subscribeOn()/observeOn() return NEW observables; they must be chained (not called and
    // discarded) for the schedulers to take effect.
    Observable<MediaData> mediaObservable = Observable.create(new ObservableOnSubscribe<MediaData>() {
        @Override
        public void subscribe(ObservableEmitter<MediaData> emitter) throws Exception {
            // Fetch media from database.
            List<Media> media = PredatorDatabase.getInstance()
                    .getMediaForPost(postId);

            int defaultPosition = 0;

            if (media != null && !media.isEmpty()) {
                // No early exit: if several items share the id, the last match wins.
                for (int i = 0; i < media.size(); i++) {
                    if (mediaId == media.get(i).getMediaId()) {
                        defaultPosition = i;
                    }
                }
                emitter.onNext(new MediaData(media, defaultPosition));
                emitter.onComplete();
            } else {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No media available for this post"));
            }
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(mediaObservable.subscribeWith(new DisposableObserver<MediaData>() {
        @Override
        public void onComplete() {
            // Done
        }

        @Override
        public void onError(Throwable e) {
            mView.unableToFetchMedia(e.getMessage());
        }

        @Override
        public void onNext(MediaData mediaData) {
            mView.showMedia(mediaData.getMedia(), mediaData.getDefaultPosition());
        }
    }));
}
 
源代码16 项目: Capstone-Project   文件: SettingsPresenter.java
/**
 * Wipes the locally cached data and notifies the view of the outcome.
 *
 * <p>Deletes posts, users, comments, install links, media and collections from
 * {@code PredatorDatabase} and clears the Fresco image caches. The work is subscribed on
 * {@code Schedulers.io()} and the view callbacks are delivered on the Android main thread.
 */
@Override
public void clearCache() {
    // subscribeOn()/observeOn() return NEW observables; they must be chained (not called and
    // discarded) for the schedulers to take effect.
    Observable<Boolean> clearCacheObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            // Delete all values from all tables except CATEGORY TABLE AS IT MUST STAY AS IT IS.
            // Delete all values from posts_table.
            PredatorDatabase.getInstance()
                    .deleteAllPosts();
            // Delete all values from users_table.
            PredatorDatabase.getInstance()
                    .deleteAllUsers();
            // Delete all values from comments_table.
            PredatorDatabase.getInstance()
                    .deleteAllComments();
            // Delete all values from install_links_table.
            PredatorDatabase.getInstance()
                    .deleteAllInstallLinks();
            // Delete all values from media_table.
            PredatorDatabase.getInstance()
                    .deleteAllMedia();
            // Delete all values from collections_table.
            PredatorDatabase.getInstance()
                    .deleteAllCollections();

            // Clear fresco cache.
            Fresco.getImagePipeline().clearCaches();

            emitter.onNext(true);
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    mCompositeDisposable.add(clearCacheObservable.subscribeWith(new DisposableObserver<Boolean>() {
        @Override
        public void onComplete() {
            // Done
        }

        @Override
        public void onError(Throwable e) {
            Logger.e(TAG, "onError: " + e.getMessage(), e);
            mView.unableToWipeCache();
        }

        @Override
        public void onNext(Boolean aBoolean) {
            Logger.d(TAG, "onNext: cache cleared");
            mView.cacheCleared();
        }
    }));
}
 
源代码17 项目: Collection-Android   文件: RxSchedulerUtils.java
/**
 * Switches downstream emissions back to the Android main thread.
 *
 * @param observable the source observable
 * @return the source observed on {@code AndroidSchedulers.mainThread()}
 */
public static <T> Observable<T> toMain(Observable<T> observable) {
    final Observable<T> onMainThread = observable.observeOn(AndroidSchedulers.mainThread());
    return onMainThread;
}
 
源代码18 项目: Collection-Android   文件: RxSchedulerUtils.java
/**
 * Switches downstream emissions to the IO scheduler.
 *
 * @param observable the source observable
 * @return the source observed on {@code Schedulers.io()}
 */
public static <T> Observable<T> toIo(Observable<T> observable) {
    final Observable<T> onIoThread = observable.observeOn(Schedulers.io());
    return onIoThread;
}