io.reactivex.Observable#subscribeOn() 源码实例 Demo

下面列出了 io.reactivex.Observable#subscribeOn() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: java-unified-sdk   文件: RealtimeClient.java
/**
 * Decorates an observable with this client's scheduling settings and error translation.
 *
 * <p>When {@code asynchronized} is set the subscription is moved to {@code Schedulers.io()};
 * when {@code defaultCreator} is non-null, emissions are observed on the scheduler it creates.
 * Any upstream error is passed through {@code ErrorUtils.propagateException} and re-emitted.
 *
 * @param observable the observable to decorate; may be null
 * @return null when {@code observable} is null, otherwise the decorated observable
 */
private Observable wrapObservable(Observable observable) {
  if (null == observable) {
    return null;
  }
  Observable wrapped = observable;
  if (asynchronized) {
    wrapped = wrapped.subscribeOn(Schedulers.io());
  }
  if (null != defaultCreator) {
    wrapped = wrapped.observeOn(defaultCreator.create());
  }
  // Translate every upstream failure before it reaches the subscriber.
  return wrapped.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable cause) throws Exception {
      return Observable.error(ErrorUtils.propagateException(cause));
    }
  });
}
 
源代码2 项目: java-unified-sdk   文件: AVFile.java
/**
 * Get data in async mode.
 *
 * <p>The returned observable calls {@code getData()} when it is subscribed. If
 * {@code AppConfiguration.isAsynchronized()} is true the subscription runs on
 * {@code Schedulers.io()}; if a default scheduler creator is configured, results are observed
 * on the scheduler it creates.
 *
 * @return observable instance.
 */
public Observable<byte[]> getDataInBackground() {
  // Typed (not raw) so the declared return type is checked by the compiler.
  Observable<byte[]> observable = Observable.fromCallable(new Callable<byte[]>() {
    @Override
    public byte[] call() throws Exception {
      return getData();
    }
  });
  if (AppConfiguration.isAsynchronized()) {
    observable = observable.subscribeOn(Schedulers.io());
  }
  AppConfiguration.SchedulerCreator defaultScheduler = AppConfiguration.getDefaultScheduler();
  if (null != defaultScheduler) {
    observable = observable.observeOn(defaultScheduler.create());
  }
  return observable;
}
 
源代码3 项目: java-unified-sdk   文件: AVFile.java
/**
 * Get data stream in async mode.
 *
 * <p>The stream is obtained from {@code getDataStream()} when the returned observable is
 * subscribed. Scheduling follows {@code AppConfiguration}: subscription moves to
 * {@code Schedulers.io()} when asynchronized, and emissions are observed on the default
 * scheduler when one is configured.
 *
 * @return observable instance.
 */
public Observable<InputStream> getDataStreamInBackground() {
  final Callable<InputStream> streamLoader = new Callable<InputStream>() {
    @Override
    public InputStream call() throws Exception {
      return getDataStream();
    }
  };
  Observable<InputStream> stream = Observable.fromCallable(streamLoader);
  if (AppConfiguration.isAsynchronized()) {
    stream = stream.subscribeOn(Schedulers.io());
  }
  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (null == schedulerCreator) {
    return stream;
  }
  return stream.observeOn(schedulerCreator.create());
}
 
源代码4 项目: java-unified-sdk   文件: PushClient.java
/**
 * Applies the configured schedulers and error translation to the given observable.
 *
 * @param observable source observable; may be null
 * @return null for a null input; otherwise the source with optional
 *         {@code subscribeOn(Schedulers.io())}, optional {@code observeOn(defaultCreator.create())}
 *         and an error handler that re-emits {@code ErrorUtils.propagateException(throwable)}
 */
private Observable wrapObservable(Observable observable) {
  if (null == observable) {
    return null;
  }
  // Re-emits each failure after passing it through ErrorUtils.
  final Function<Throwable, ObservableSource> translateError = new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable throwable) throws Exception {
      return Observable.error(ErrorUtils.propagateException(throwable));
    }
  };
  Observable result = asynchronized ? observable.subscribeOn(Schedulers.io()) : observable;
  if (null != defaultCreator) {
    result = result.observeOn(defaultCreator.create());
  }
  return result.onErrorResumeNext(translateError);
}
 
源代码5 项目: java-unified-sdk   文件: AppRouter.java
/**
 * Requests the router endpoints for an application and caches the response.
 *
 * <p>On each emitted endpoint the mapper logs it, stores it in {@code defaultEndpoint}, adds the
 * current time in seconds to its TTL, and, when a default {@code SystemSetting} is available,
 * persists the JSON form under the key zone for {@code appId}.
 *
 * @param appId application id passed to the router service and used as the persistence key
 * @return observable emitting the cached endpoint instance
 */
public Observable<AppAccessEndpoint> fetchServerHostsInBackground(final String appId) {
  final AppRouterService routerService = retrofit.create(AppRouterService.class);
  Observable<AppAccessEndpoint> endpoints = routerService.getRouter(appId);
  if (AppConfiguration.isAsynchronized()) {
    endpoints = endpoints.subscribeOn(Schedulers.io());
  }
  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (null != schedulerCreator) {
    endpoints = endpoints.observeOn(schedulerCreator.create());
  }
  final Function<AppAccessEndpoint, AppAccessEndpoint> cacheEndpoint =
      new Function<AppAccessEndpoint, AppAccessEndpoint>() {
        @Override
        public AppAccessEndpoint apply(AppAccessEndpoint fetched) throws Exception {
          // Keep the fetched endpoint in memory and, if possible, in local settings.
          LOGGER.d(fetched.toString());
          AppRouter.this.defaultEndpoint = fetched;
          // The TTL is shifted by the current time in seconds.
          AppRouter.this.defaultEndpoint.setTtl(fetched.getTtl() + System.currentTimeMillis() / 1000);
          final SystemSetting localSetting = AppConfiguration.getDefaultSetting();
          if (null != localSetting) {
            final String serialized = JSON.toJSONString(AppRouter.this.defaultEndpoint);
            localSetting.saveString(getPersistenceKeyZone(appId, true), appId, serialized);
          }
          return AppRouter.this.defaultEndpoint;
        }
      };
  return endpoints.map(cacheEndpoint);
}
 
源代码6 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Wraps an observable with the storage client's scheduling and error handling.
 *
 * <p>Steps, in order: optional {@code subscribeOn(Schedulers.io())} when {@code asynchronized}
 * is set, optional {@code observeOn} with the scheduler from {@code defaultCreator}, then an
 * error handler that re-emits the result of {@code ErrorUtils.propagateException}.
 *
 * @param observable observable to wrap; may be null
 * @return the wrapped observable, or null if {@code observable} is null
 */
public Observable wrapObservable(Observable observable) {
  if (null == observable) {
    return null;
  }
  Observable scheduled = observable;
  if (asynchronized) {
    scheduled = scheduled.subscribeOn(Schedulers.io());
  }
  if (null != defaultCreator) {
    scheduled = scheduled.observeOn(defaultCreator.create());
  }
  final Observable guarded = scheduled.onErrorResumeNext(new Function<Throwable, ObservableSource>() {
    @Override
    public ObservableSource apply(Throwable failure) throws Exception {
      return Observable.error(ErrorUtils.propagateException(failure));
    }
  });
  return guarded;
}
 
源代码7 项目: burstkit4j   文件: RxTestUtils.java
/**
 * Subscribes to an observable in a test, waits for a number of items and returns them.
 *
 * <p>For {@code awaitCount == 1} the first item is obtained through {@code testSingle}. Otherwise
 * the observable is subscribed on the I/O scheduler, and the test asserts that there are no
 * errors, that exactly {@code awaitCount} values arrived, and that none of them is null.
 *
 * @param observable the observable under test; asserted to be non-null
 * @param awaitCount number of items to wait for
 * @param <T> item type
 * @return the received values
 */
public static <T> List<T> testObservable(Observable<T> observable, int awaitCount) {
    assertNotNull(observable);
    if (awaitCount == 1) {
        return Collections.singletonList(testSingle(observable.firstOrError()));
    }
    // Subscribing on another scheduler keeps the operation from blocking this thread,
    // which would otherwise prevent the results from being observed.
    TestObserver<T> observer = observable.subscribeOn(Schedulers.io()).test();
    observer.awaitCount(awaitCount);
    observer.assertNoErrors();
    assertEquals(awaitCount, observer.valueCount());
    for (T value : observer.values()) {
        Assert.assertNotNull(value);
    }
    return observer.values();
}
 
源代码8 项目: java-unified-sdk   文件: AppRouter.java
/**
 * Requests the RTM connection server from the given router host and caches the response.
 *
 * <p>A Retrofit instance derived from {@code retrofit} with {@code routerHost} as base URL is
 * used for the call. When both the response and the default {@code SystemSetting} are non-null,
 * the response TTL is increased by the current time in seconds and its JSON form is saved under
 * the key zone for {@code appId}, keyed by {@code routerHost}.
 *
 * @param routerHost base URL of the router; also the cache key
 * @param appId application id
 * @param installationId installation id forwarded to the service
 * @param secure value forwarded to the service as-is
 * @return observable emitting the (possibly TTL-adjusted) response
 */
private Observable<RTMConnectionServerResponse> fetchRTMServerFromRemote(final String routerHost, final String appId,
                                                                         final String installationId, int secure) {
  LOGGER.d("fetchRTMServerFromRemote. router=" + routerHost + ", appId=" + appId
          + ", installationId=" + installationId);
  final AppRouterService routerService = retrofit.newBuilder()
          .baseUrl(routerHost)
          .build()
          .create(AppRouterService.class);
  Observable<RTMConnectionServerResponse> response =
          routerService.getRTMConnectionServer(appId, installationId, secure);
  if (AppConfiguration.isAsynchronized()) {
    response = response.subscribeOn(Schedulers.io());
  }
  final AppConfiguration.SchedulerCreator schedulerCreator = AppConfiguration.getDefaultScheduler();
  if (null != schedulerCreator) {
    response = response.observeOn(schedulerCreator.create());
  }
  return response.map(new Function<RTMConnectionServerResponse, RTMConnectionServerResponse>() {
    @Override
    public RTMConnectionServerResponse apply(RTMConnectionServerResponse serverResponse) throws Exception {
      final SystemSetting localSetting = AppConfiguration.getDefaultSetting();
      if (null == serverResponse || null == localSetting) {
        // Nothing to cache, or nowhere to cache it.
        return serverResponse;
      }
      serverResponse.setTtl(serverResponse.getTtl() + System.currentTimeMillis() / 1000);
      final String serialized = JSON.toJSONString(serverResponse);
      localSetting.saveString(getPersistenceKeyZone(appId, false), routerHost, serialized);
      return serverResponse;
    }
  });
}
 
源代码9 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Wraps an observable so that its work and its emissions use the I/O scheduler.
 *
 * <p>No error translation is applied here.
 *
 * @param observable observable to wrap; may be null
 * @return null for a null input; otherwise the observable with {@code subscribeOn} applied when
 *         {@code asynchronized} is set and {@code observeOn} applied when {@code defaultCreator}
 *         is non-null, both using the same {@code Schedulers.io()} instance
 */
private Observable wrapObservableInBackground(Observable observable) {
  if (null == observable) {
    return null;
  }
  final Scheduler ioScheduler = Schedulers.io();
  Observable wrapped = asynchronized ? observable.subscribeOn(ioScheduler) : observable;
  // NOTE(review): observes on the I/O scheduler, not defaultCreator.create() - presumably
  // intentional for a "background" variant; confirm.
  if (null != defaultCreator) {
    wrapped = wrapped.observeOn(ioScheduler);
  }
  return wrapped;
}
 
源代码10 项目: retrocache   文件: RxJava2CachedCallAdapter.java
/**
 * Adapts a Retrofit call into the reactive type selected by this adapter's flags.
 *
 * <p>The response source is an enqueue- or execute-based observable depending on {@code mAsync}.
 * It is then wrapped as a result or body observable ({@code mResult} / {@code mBody}),
 * optionally subscribed on {@code mScheduler}, and finally converted according to the first
 * flag that is set among {@code mFlowable}, {@code mSingle}, {@code mMaybe} and
 * {@code mCompletable}. With none set, the observable itself is returned.
 *
 * @param call the call to adapt
 * @return the adapted reactive object
 */
@Override
public Object adapt(Call<R> call) {
    final Observable<Response<R>> responses;
    if (mAsync) {
        responses = new CallEnqueueObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit);
    } else {
        responses = new CallExecuteObservable<>(mCachingSystem, call, mResponseType, mAnnotations, mRetrofit);
    }

    Observable<?> adapted = responses;
    if (mResult) {
        adapted = new ResultObservable<>(responses);
    } else if (mBody) {
        adapted = new BodyObservable<>(responses);
    }

    if (mScheduler != null) {
        adapted = adapted.subscribeOn(mScheduler);
    }

    // The first matching flag decides the returned reactive type.
    if (mFlowable) {
        return adapted.toFlowable(BackpressureStrategy.LATEST);
    } else if (mSingle) {
        return adapted.singleOrError();
    } else if (mMaybe) {
        return adapted.singleElement();
    } else if (mCompletable) {
        return adapted.ignoreElements();
    }
    return adapted;
}
 
源代码11 项目: Capstone-Project   文件: UserProfilePresenter.java
/**
 * Looks up the website stored for a user and reports the outcome to the view.
 *
 * <p>The database lookup is subscribed on the I/O scheduler and the view callbacks are delivered
 * on the Android main thread. The subscription is added to {@code mCompositeDisposable}.
 *
 * @param userId id passed to {@code PredatorDatabase.getWebsiteForUser}
 */
@Override
public void getWebsite(final int userId) {
    Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String website = PredatorDatabase.getInstance()
                    .getWebsiteForUser(userId);
            if (TextUtils.isEmpty(website)) {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No website available for this user."));
                return;
            }
            emitter.onNext(website);
            emitter.onComplete();
        }
    });

    // Observable operators return a new instance; the schedulers only take effect when they are
    // part of the chain that is actually subscribed.
    mCompositeDisposable.add(observableWebsite
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<String>() {
                @Override
                public void onNext(String value) {
                    Logger.d("onNext: getWebsite: " + value);
                    mView.websiteAvailable(value);
                }

                @Override
                public void onError(Throwable e) {
                    Logger.e(TAG, "onError: " + e.getMessage(), e);
                    mView.websiteUnavailable();
                }

                @Override
                public void onComplete() {
                    Logger.d(TAG, "onComplete: fetched user website");
                }
            }));
}
 
源代码12 项目: cxf   文件: ObservableRxInvokerImpl.java
/**
 * Builds an observable that emits the single value produced by a supplier.
 *
 * <p>The supplier is invoked on subscription. The value, completion, and any thrown
 * {@code Throwable} are forwarded only while the emitter is not disposed. When {@code sc} is
 * null the observable is subscribed on {@code Schedulers.io()}; otherwise {@code sc} is used for
 * both subscribing and observing.
 *
 * @param supplier produces the value to emit
 * @param <T> value type
 * @return the scheduled observable
 */
private <T> Observable<T> create(Supplier<T> supplier) {
    final ObservableOnSubscribe<T> invokeSupplier = emitter -> {
        try {
            final T response = supplier.get();
            if (!emitter.isDisposed()) {
                emitter.onNext(response);
            }

            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        } catch (Throwable failure) {
            if (!emitter.isDisposed()) {
                emitter.onError(failure);
            }
        }
    };
    final Observable<T> observable = Observable.create(invokeSupplier);

    return (sc == null)
        ? observable.subscribeOn(Schedulers.io())
        : observable.subscribeOn(sc).observeOn(sc);
}
 
源代码13 项目: burstkit4j   文件: HttpBurstNodeService.java
/**
 * Subscribes the given observable on the scheduler returned by
 * {@code BurstKitUtils.defaultBurstNodeServiceScheduler()}.
 *
 * @param source observable to schedule
 * @param <T> item type
 * @return {@code source} with {@code subscribeOn} applied
 */
private <T> Observable<T> assign(Observable<T> source) {
    return source
            .subscribeOn(BurstKitUtils.defaultBurstNodeServiceScheduler());
}
 
源代码14 项目: java-unified-sdk   文件: QueryResultCache.java
/**
 * Reads a cached query result from disk and exposes it as an observable.
 *
 * <p>The read task is submitted to {@code executor} as soon as this method is called; the
 * returned observable is built from that task's future. A missing file, an expired file, or a
 * null read result either fails the task (when {@code isFinal} is true) or yields an empty
 * string (when it is false).
 *
 * @param className class name, used only for logging
 * @param cacheKey key used to locate the cache file
 * @param maxAgeInMilliseconds maximum accepted file age; values {@code <= 0} disable the check
 * @param isFinal whether a cache miss is reported as an error instead of an empty string
 * @return observable emitting the cache content decoded as UTF-8, or an empty string on a
 *         non-final miss
 */
public Observable<String> getCacheRawResult(final String className, final String cacheKey,
                                            final long maxAgeInMilliseconds, final boolean isFinal) {
  LOGGER.d("try to get cache raw result for class:" + className);
  AppConfiguration.SchedulerCreator creator = AppConfiguration.getDefaultScheduler();
  boolean isAsync = AppConfiguration.isAsynchronized();

  Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
      File cacheFile = getCacheFile(cacheKey);
      if (null == cacheFile || !cacheFile.exists()) {
        LOGGER.d("cache file(key=" + cacheKey + ") not existed.");
        if (isFinal) {
          throw new FileNotFoundException("cache is not existed.");
        } else {
          return "";
        }
      }
      // Age is measured from the file's last-modified timestamp.
      if (maxAgeInMilliseconds > 0 && (System.currentTimeMillis() - cacheFile.lastModified() > maxAgeInMilliseconds)) {
        LOGGER.d("cache file(key=" + cacheKey + ") is expired.");
        if (isFinal) {
          throw new FileNotFoundException("cache file is expired.");
        } else {
          return "";
        }
      }
      byte[] data = readData(cacheFile);
      if (null == data) {
        LOGGER.d("cache file(key=" + cacheKey + ") is empty.");
        if (isFinal) {
          // NOTE(review): InterruptedException is used to signal a read failure here; kept
          // because callers may depend on the exception type.
          throw new InterruptedException("failed to read cache file.");
        } else {
          return "";
        }
      }
      String content = new String(data, 0, data.length, "UTF-8");
      LOGGER.d("cache file(key=" + cacheKey + "), content: " + content);
      return content;
    }
  };
  FutureTask<String> futureTask = new FutureTask<>(callable);
  executor.submit(futureTask);
  // Typed (not raw) so the declared return type is checked by the compiler.
  Observable<String> result = Observable.fromFuture(futureTask);
  if (isAsync) {
    result = result.subscribeOn(Schedulers.io());
  }
  if (null != creator) {
    result = result.observeOn(creator.create());
  }
  return result;
}
 
源代码15 项目: Capstone-Project   文件: MediaDetailsPresenter.java
/**
 * Loads the media of a post from the database and shows it in the view.
 *
 * <p>The lookup is subscribed on the I/O scheduler and the view callbacks are delivered on the
 * Android main thread. The subscription is added to {@code mCompositeDisposable}.
 *
 * @param postId id passed to {@code PredatorDatabase.getMediaForPost}
 * @param mediaId id of the media item whose index becomes the default position; position 0 is
 *                used when no item matches
 */
@Override
public void getMedia(final int postId, final int mediaId) {
    Observable<MediaData> mediaObservable = Observable.create(new ObservableOnSubscribe<MediaData>() {
        @Override
        public void subscribe(ObservableEmitter<MediaData> emitter) throws Exception {
            // Fetch media from database.
            List<Media> media = PredatorDatabase.getInstance()
                    .getMediaForPost(postId);

            if (media == null || media.isEmpty()) {
                // onError is terminal, so no onComplete follows it.
                emitter.onError(new NullPointerException("No media available for this post"));
                return;
            }

            int defaultPosition = 0;
            // No early exit: when several items share the id, the last match wins.
            for (int i = 0; i < media.size(); i++) {
                if (mediaId == media.get(i).getMediaId()) {
                    defaultPosition = i;
                }
            }
            emitter.onNext(new MediaData(media, defaultPosition));
            emitter.onComplete();
        }
    });

    // Observable operators return a new instance; the schedulers only take effect when they are
    // part of the chain that is actually subscribed.
    mCompositeDisposable.add(mediaObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<MediaData>() {
                @Override
                public void onComplete() {
                    // Done
                }

                @Override
                public void onError(Throwable e) {
                    mView.unableToFetchMedia(e.getMessage());
                }

                @Override
                public void onNext(MediaData mediaData) {
                    mView.showMedia(mediaData.getMedia(), mediaData.getDefaultPosition());
                }
            }));
}
 
源代码16 项目: Capstone-Project   文件: SettingsPresenter.java
/**
 * Deletes cached database content and clears the Fresco image caches, then notifies the view.
 *
 * <p>The deletion work is subscribed on the I/O scheduler and the view callbacks are delivered
 * on the Android main thread. The subscription is added to {@code mCompositeDisposable}.
 */
@Override
public void clearCache() {
    Observable<Boolean> clearCacheObservable = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            // Delete all values from all tables except CATEGORY TABLE AS IT MUST STAY AS IT IS.
            // Delete all values from posts_table.
            PredatorDatabase.getInstance()
                    .deleteAllPosts();
            // Delete all values from users_table.
            PredatorDatabase.getInstance()
                    .deleteAllUsers();
            // Delete all values from comments_table.
            PredatorDatabase.getInstance()
                    .deleteAllComments();
            // Delete all values from install_links_table.
            PredatorDatabase.getInstance()
                    .deleteAllInstallLinks();
            // Delete all values from media_table.
            PredatorDatabase.getInstance()
                    .deleteAllMedia();
            // Delete all values from collections_table.
            PredatorDatabase.getInstance()
                    .deleteAllCollections();

            // Clear fresco cache.
            Fresco.getImagePipeline().clearCaches();

            emitter.onNext(true);
            emitter.onComplete();
        }
    });

    // Observable operators return a new instance; the schedulers only take effect when they are
    // part of the chain that is actually subscribed.
    mCompositeDisposable.add(clearCacheObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<Boolean>() {
                @Override
                public void onComplete() {
                    // Done
                }

                @Override
                public void onError(Throwable e) {
                    Logger.e(TAG, "onError: " + e.getMessage(), e);
                    mView.unableToWipeCache();
                }

                @Override
                public void onNext(Boolean aBoolean) {
                    Logger.d(TAG, "onNext: cache cleared");
                    mView.cacheCleared();
                }
            }));
}