下面列出了io.reactivex.Observable#fromCallable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Observable<AdamantApi> build(int index) {
return Observable.fromCallable(() -> {
OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
addInterceptors(httpClient);
if (currentServerNode != null){
currentServerNode.setStatus(ServerNode.Status.CONNECTING);
}
currentServerNode = nodes.get(index);
currentServerNode.setStatus(ServerNode.Status.CONNECTED);
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(currentServerNode.getUrl() + BuildConfig.API_BASE)
.addConverterFactory(GsonConverterFactory.create(gson))
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(httpClient.build())
.build();
return retrofit.create(AdamantApi.class);
});
}
public Observable<EosPrivateKey[]> createKey( int count ) {
return Observable.fromCallable( () -> {
EosPrivateKey[] retKeys = new EosPrivateKey[ count ];
for ( int i = 0; i < count; i++) {
retKeys[i] = new EosPrivateKey();
}
return retKeys;
} );
}
@Override
public Observable<Enrollment> getEnrollment(String programUid, String teiUid) {
String progId = programUid == null ? "" : programUid;
String teiId = teiUid == null ? "" : teiUid;
return Observable.fromCallable(() -> d2.enrollmentModule().enrollments().byTrackedEntityInstance().eq(teiId)
.byProgram().eq(progId).one().blockingGet());
}
@Override
public Observable<Boolean> saveQuestionList(final List<Question> questionList) {
return Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
mAppDatabase.questionDao().insertAll(questionList);
return true;
}
});
}
private Observable<String> getObservable() {
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(1000); // 假设此处是耗时操作
} catch (Exception e) {
e.printStackTrace();
}
return "3";
}
});
}
public <T> Observable<T> insertOrReplaceRx(Class<T> entityClass, T entity) {
return Observable.fromCallable(() -> {
AbstractDao<T, ?> abstractDao = getAbstractDao(entityClass);
abstractDao.insertOrReplace(entity);
return entity;
});
}
@Override
public Observable<List<User>> getAllUsers() {
return Observable.fromCallable(new Callable<List<User>>() {
@Override
public List<User> call() throws Exception {
return mDaoSession.getUserDao().loadAll();
}
});
}
public Observable<Boolean> saveOptionList(final List<Option> optionList) {
return Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
mDaoSession.getOptionDao().insertInTx(optionList);
return true;
}
});
}
/**
* Deletes one or more records from a table, non-blocking operation.
*
* @return a {@link Observable<Integer>} where the number of records deleted is passed
* as the parameter to {@link io.reactivex.observers.DisposableObserver#onNext(Object)}
*/
@Override
public Observable<Integer> asObservable() {
return Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() {
return executeBlocking();
}
});
}
public Observable<Boolean> saveQuestion(final Question question) {
return Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
mDaoSession.getQuestionDao().insert(question);
return true;
}
});
}
@NonNull
@Override
public Observable<List<Media>> fromTracksSearch(@Nullable final String query) {
return Observable.fromCallable(() -> queueFromTracksSearch(query));
}
public Observable<QmsChatModel> getChat(final int userId, final int themeId) {
return Observable.fromCallable(() -> transform(Api.Qms().getChat(userId, themeId), false));
}
public Observable<Object> markRead(int id) {
return Observable.fromCallable(() -> Api.Forum().markRead(id));
}
public Observable<ArrayList<QmsMessage>> sendMessage(int userId, int themeId, String text) {
return Observable.fromCallable(() -> Api.Qms().sendMessage(userId, themeId, text));
}
private Observable<TokenScriptResult.Attribute> resultFromDatabase(TransactionResult transactionResult, Attribute attr)
{
return Observable.fromCallable(() -> parseFunctionResult(transactionResult, attr));
}
public Observable<Boolean> likeComment(int articleId, int commentId) {
return Observable.fromCallable(() -> Api.NewsApi().likeComment(articleId, commentId));
}
public Observable<EditPostForm> loadForm(int postId) {
return Observable.fromCallable(() -> Api.EditPost().loadForm(postId));
}
public Observable<Object> markAllRead() {
return Observable.fromCallable(() -> Api.Forum().markAllRead());
}
public Observable<Device> getDevice(String devId) {
return Observable.fromCallable(() -> Api.DevDb().getDevice(devId));
}
@Override
public Object adapt(HttpLite lite, RequestCreator creator, Type returnType, Object... args) throws Exception {
return Observable.fromCallable(CallOnSubscribe.newInstance(lite,creator,returnType,args));
}