下面列出了 io.reactivex.Flowable#fromCallable() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Routes an operation to the handler for its concrete type and returns the outcome as a
 * reactive stream.
 *
 * <p>Select and GetByKey wrap their synchronous callable and then flatten the iterable it
 * produces into individual elements. Update, Insert, Delete and DeleteByKey wrap their
 * synchronous callable without flattening. Watch is delegated to {@code watch(...)}.
 *
 * @param operation the operation to execute
 * @return a publisher for the operation; for any other operation type, a Flowable that
 *         signals {@link UnsupportedOperationException}
 */
private Publisher<?> executeInternal(Operation operation) {
    if (operation instanceof StandardOperations.Select) {
        final StandardOperations.Select selectOp = (StandardOperations.Select) operation;
        return Flowable.fromCallable(new SyncSelect(this, selectOp)).flatMapIterable(x -> x);
    }
    if (operation instanceof StandardOperations.Update) {
        final StandardOperations.Update updateOp = (StandardOperations.Update) operation;
        return Flowable.fromCallable(new SyncUpdate(this, updateOp));
    }
    if (operation instanceof StandardOperations.Insert) {
        final StandardOperations.Insert insertOp = (StandardOperations.Insert) operation;
        return Flowable.fromCallable(new SyncInsert(this, insertOp));
    }
    if (operation instanceof StandardOperations.Delete) {
        final StandardOperations.Delete deleteOp = (StandardOperations.Delete) operation;
        return Flowable.fromCallable(new SyncDelete(this, deleteOp));
    }
    if (operation instanceof StandardOperations.Watch) {
        final StandardOperations.Watch watchOp = (StandardOperations.Watch) operation;
        return watch(watchOp);
    }
    if (operation instanceof StandardOperations.DeleteByKey) {
        final StandardOperations.DeleteByKey deleteByKeyOp = (StandardOperations.DeleteByKey) operation;
        return Flowable.fromCallable(new SyncDeleteByKey(this, deleteByKeyOp));
    }
    if (operation instanceof StandardOperations.GetByKey) {
        final StandardOperations.GetByKey getByKeyOp = (StandardOperations.GetByKey) operation;
        return Flowable.fromCallable(new SyncGetByKey(this, getByKeyOp)).flatMapIterable(x -> x);
    }

    // No handler matched: report the failure through the stream instead of throwing.
    final String message = String.format("Operation %s not supported by %s",
            operation, GeodeBackend.class.getSimpleName());
    return Flowable.error(new UnsupportedOperationException(message));
}
/**
 * Creates a Flowable whose callable hands the failure to the given exception handler,
 * converts the handler's result into a response, and mirrors that response's status,
 * content type (if present) and body (if present) onto the container response.
 *
 * @param containerRequest  the request passed to the exception handler
 * @param containerResponse the response that receives status, content type and body
 * @param throwable         the failure being handled
 * @param exceptionHandler  the handler invoked with the request and the failure
 * @return a Flowable emitting the response built from the handler's result
 */
private Flowable<? extends MutableHttpResponse<?>> handleException(
        MicronautAwsProxyRequest<?> containerRequest,
        MicronautAwsProxyResponse<?> containerResponse,
        Throwable throwable,
        ExceptionHandler exceptionHandler) {
    return Flowable.fromCallable(() -> {
        Object handlerResult = exceptionHandler.handle(containerRequest, throwable);
        MutableHttpResponse<?> errorResponse = errorResultToResponse(handlerResult);

        containerResponse.status(errorResponse.getStatus());
        errorResponse.getContentType().ifPresent(containerResponse::contentType);
        // Raw cast is kept so that body(...) accepts the wildcard-typed body value.
        errorResponse.getBody().ifPresent(((MutableHttpResponse) containerResponse)::body);

        return errorResponse;
    });
}
/**
 * Looks up a task by id among {@code tasks}.
 *
 * @param taskId the id to search for
 * @return a Flowable that signals a RuntimeException when {@code fail} is set; otherwise one
 *         emitting the first task whose id equals {@code taskId}, or an absent Optional when
 *         none matches
 */
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
    if (fail) {
        return Flowable.error(new RuntimeException("Could not load task"));
    }
    return Flowable.fromCallable(() -> {
        Optional<Task> match = Optional.absent();
        for (Task candidate : tasks) {
            if (candidate.id().equals(taskId)) {
                match = Optional.of(candidate);
                break; // first match wins
            }
        }
        return match;
    });
}
/**
 * Sets the status of the enrollment with the given uid to COMPLETED and then reads the
 * enrollment back.
 *
 * @param enrollmentUid uid of the enrollment to complete
 * @return a Flowable emitting the enrollment as read after the status change
 */
@Override
public Flowable<Enrollment> completeEnrollment(@NonNull String enrollmentUid) {
    return Flowable.fromCallable(() -> {
        d2.enrollmentModule()
                .enrollments()
                .uid(enrollmentUid)
                .setStatus(EnrollmentStatus.COMPLETED);

        final Enrollment completed = d2.enrollmentModule()
                .enrollments()
                .uid(enrollmentUid)
                .blockingGet();
        return completed;
    });
}
/**
 * Checks whether a complete-registration exists for this data set with the given
 * organisation unit, period and attribute option combo, and is not flagged as deleted.
 *
 * @param orgUnitUid        organisation unit uid to match
 * @param periodInitialDate period value to match
 * @param catCombo          attribute option combo uid to match
 * @return a Flowable emitting {@code true} when such a registration is found and it is not
 *         deleted, {@code false} otherwise
 */
@Override
public Flowable<Boolean> isCompleted(String orgUnitUid, String periodInitialDate, String catCombo) {
    return Flowable.fromCallable(() -> {
        DataSetCompleteRegistration registration = d2.dataSetModule()
                .dataSetCompleteRegistrations()
                .byDataSetUid().eq(dataSetUid)
                .byAttributeOptionComboUid().eq(catCombo)
                .byPeriod().eq(periodInitialDate)
                .byOrganisationUnitUid().eq(orgUnitUid)
                .one()
                .blockingGet();

        if (registration == null) {
            return false;
        }
        return !registration.deleted();
    });
}
/**
 * Checks whether a data approval matching the given organisation unit, period and attribute
 * option combo exists and is in state {@code APPROVED_HERE}.
 *
 * @param orgUnit              organisation unit uid to match
 * @param period               period id to match
 * @param attributeOptionCombo attribute option combo uid to match
 * @return a Flowable emitting {@code true} when a matching approval is found and its state
 *         is {@code APPROVED_HERE}; {@code false} when none is found or its state differs
 *         (including a null state)
 */
@Override
public Flowable<Boolean> isApproval(String orgUnit, String period, String attributeOptionCombo) {
    return Flowable.fromCallable(() -> {
        DataApproval dataApproval = d2.dataSetModule().dataApprovals()
                .byOrganisationUnitUid().eq(orgUnit)
                .byPeriodId().eq(period)
                .byAttributeOptionComboUid().eq(attributeOptionCombo)
                .one().blockingGet();
        // Constant-first equals: a null state() yields false instead of a NullPointerException.
        return dataApproval != null
                && DataApprovalState.APPROVED_HERE.equals(dataApproval.state());
    });
}
/**
 * Resolves the period id for a period type and date.
 *
 * <p>If the period helper has no period for the pair, the periods for {@code dataSetUid} are
 * loaded through {@code blockingGetPeriodsForDataSet} before the period is looked up again.
 *
 * @param periodType the period type to look up
 * @param date       the date to look up
 * @return a Flowable emitting the id of the period found by the second lookup
 */
@NonNull
@Override
public Flowable<String> getPeriodId(PeriodType periodType, Date date) {
    return Flowable.fromCallable(() -> {
        final boolean periodMissing =
                d2.periodModule().periodHelper().getPeriod(periodType, date) == null;
        if (periodMissing) {
            d2.periodModule().periodHelper().blockingGetPeriodsForDataSet(dataSetUid);
        }
        return d2.periodModule()
                .periodHelper()
                .getPeriod(periodType, date)
                .periodId();
    });
}
/**
 * Creates a Flowable that fetches one entity through the provider.
 *
 * <p>The item URI is {@code uri} with {@code id} appended; the lookup runs inside the
 * callable rather than when this method is called.
 *
 * @param entityClass class of the entity to fetch
 * @param id          id appended to the base {@code uri}
 * @param <T>         entity type
 * @return a Flowable emitting what {@code provider.get} returns for that URI
 */
public <T> Flowable<T> get(final Class<T> entityClass, final long id) {
    final Callable<T> lookup = new Callable<T>() {
        @Override
        public T call() throws Exception {
            return provider.get(ContentUris.withAppendedId(uri, id), entityClass);
        }
    };
    return Flowable.fromCallable(lookup);
}
/**
 * Creates an IO backed by a Flowable that obtains its value from the supplier and is
 * subscribed on the given scheduler.
 *
 * @param s   supplier queried for the value
 * @param ex  scheduler passed to {@code subscribeOn}
 * @param <T> value type
 * @return a FlowableIO wrapping the scheduled Flowable
 */
public static <T> IO<T> of(Supplier<? extends T> s, Scheduler ex) {
    final Flowable<T> deferred = Flowable.fromCallable(() -> s.get());
    final Flowable<T> scheduled = deferred.subscribeOn(ex);
    return new FlowableIO<T>(scheduled);
}
/**
 * Reads the display name of the category option combo with the given uid.
 *
 * @param catcomboUid uid of the category option combo
 * @return a Flowable emitting that combo's display name
 */
@Override
public Flowable<String> getCatComboName(String catcomboUid) {
    return Flowable.fromCallable(() -> {
        final String displayName = d2.categoryModule()
                .categoryOptionCombos()
                .uid(catcomboUid)
                .blockingGet()
                .displayName();
        return displayName;
    });
}
/**
 * Looks up a single period by its period id.
 *
 * @param periodId the period id to match
 * @return a Flowable emitting the result of the one-element query for that id
 */
@Override
public Flowable<Period> getPeriod(String periodId) {
    return Flowable.fromCallable(() -> {
        final Period period = d2.periodModule()
                .periods()
                .byPeriodId().eq(periodId)
                .one()
                .blockingGet();
        return period;
    });
}
/**
 * Reads the data input periods of the data set identified by {@code dataSetUid}.
 *
 * @return a Flowable emitting the data input periods of that data set, queried with its
 *         data input periods included
 */
@Override
public Flowable<List<DataInputPeriod>> getDataInputPeriod() {
    return Flowable.fromCallable(() -> {
        final List<DataInputPeriod> inputPeriods = d2.dataSetModule()
                .dataSets()
                .withDataInputPeriods()
                .byUid().eq(dataSetUid)
                .one()
                .blockingGet()
                .dataInputPeriods();
        return inputPeriods;
    });
}
/**
 * Reads the data set identified by {@code dataSetUid}.
 *
 * @return a Flowable emitting the result of the one-element query for that uid
 */
@Override
public Flowable<DataSet> getDataSet() {
    return Flowable.fromCallable(() -> {
        final DataSet dataSet = d2.dataSetModule()
                .dataSets()
                .byUid().eq(dataSetUid)
                .one()
                .blockingGet();
        return dataSet;
    });
}
/**
 * Reads the user credentials from the user module.
 *
 * @return a Flowable emitting what {@code userCredentials().blockingGet()} returns
 */
@NonNull
@Override
public Flowable<UserCredentials> credentials() {
    return Flowable.fromCallable(() -> {
        final UserCredentials userCredentials = d2.userModule()
                .userCredentials()
                .blockingGet();
        return userCredentials;
    });
}
/**
 * Exposes {@code getBalance()} as a Flowable.
 *
 * @return a Flowable emitting the value returned by {@code getBalance()}
 */
public Flowable<BigDecimal> getAdamantBalance() {
    return Flowable.fromCallable(() -> getBalance());
}
/**
 * Returns a stream of the chat's stored messages, loading a further page of history first
 * when more is available.
 *
 * <ul>
 * <li>If {@code chatId} is null, the returned Flowable signals an {@link IllegalStateException}.</li>
 * <li>If {@code haveMoreMessages()} is false, the returned Flowable emits what
 *     {@code chatsStorage.getMessagesByCompanionId(chatId)} returns and {@code historySource}
 *     is not used.</li>
 * <li>Otherwise a loading pipeline is built (only if {@code loadingMessagesFlowable} is
 *     currently null), stored in that field and returned, so calls made while the field is
 *     non-null receive the same instance.</li>
 * </ul>
 *
 * @return a Flowable of the message list for {@code chatId}, or an error Flowable as described above
 */
@MainThread
public Flowable<List<MessageListContent>> loadMoreMessages() {
if (chatId == null) {
return Flowable.error(new IllegalStateException("Chat setChatId must be called"));
}
if (!haveMoreMessages()) {
// Nothing further to load: serve what storage already holds.
return Flowable.fromCallable(() -> chatsStorage.getMessagesByCompanionId(chatId));
}
// Build the pipeline only when none is currently cached in the field.
if (loadingMessagesFlowable == null) {
loadingMessagesFlowable = historySource.execute(chatId, getCurrentOffset(), PAGE_SIZE)
// updateHeight is invoked after switching to the Android main-thread scheduler.
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(transaction -> updateHeight(transaction.getHeight()))
.observeOn(Schedulers.computation())
.map(transaction -> keyStorage.combinePublicKeyWithTransaction(transaction))
// Each item is mapped inside its own inner Flowable so that a mapping failure is
// replaced by a fallback message instead of failing the outer stream.
.flatMap(pair -> Flowable.just(pair)
.map(transaction -> (MessageListContent) messageMapper.apply(transaction))
.onErrorReturn(FallbackMessage::createMessageFromThrowable)
)
// Collect all mapped messages into a single list emission.
.toList()
.toFlowable()
// Store every loaded message in chat storage.
.doOnNext(messages -> {
for (MessageListContent messageListContent : messages) {
chatsStorage.addMessageToChat(messageListContent);
}
})
.doOnComplete(() -> chatsStorage.updateLastMessages())
.observeOn(AndroidSchedulers.mainThread())
// After a successful load: clear the cached pipeline and advance the page counter.
.doOnNext(ignored -> {
loadingMessagesFlowable = null;
currentPage++;
})
.observeOn(Schedulers.computation())
// Emit the messages as read back from storage rather than the freshly mapped list.
.map(ignored -> chatsStorage.getMessagesByCompanionId(chatId))
// Resubscribe to the upstream whenever the failure is an IOException.
.retry(throwable -> throwable instanceof IOException)
// NOTE(review): this runs downstream of retry(...), which already resubscribes on
// IOException, so the instanceof check below presumably only ever sees non-IO errors — confirm.
.doOnError(e -> {
loadingMessagesFlowable = null;
if (!(e instanceof IOException)) {
currentPage++;
}
})
// share(): subscribers of the cached instance use one common upstream subscription.
.share();
}
return loadingMessagesFlowable;
}
/**
 * Creates an IO backed by a Flowable that obtains its value from the supplier.
 *
 * @param s   supplier queried for the value
 * @param <T> value type
 * @return a FlowableIO wrapping that Flowable
 */
public static <T> IO<T> of(Supplier<? extends T> s) {
    final Flowable<T> deferred = Flowable.fromCallable(() -> s.get());
    return new FlowableIO<T>(deferred);
}
/**
 * Wraps {@code executeBlocking()} in a {@link Flowable}, so the single database row it
 * fetches and maps to an object of type {@code T} is delivered through the stream.
 *
 * <p>{@code executeBlocking()} is called from inside the callable, not when this method
 * returns.
 *
 * @param strategy a {@link BackpressureStrategy}; accepted for the overridden signature but
 *                 not consulted by this implementation
 * @return a {@code Flowable<T>} emitting the object returned by {@code executeBlocking()}
 */
@Override
public Flowable<T> asFlowable(BackpressureStrategy strategy) {
    final Callable<T> fetch = new Callable<T>() {
        @Override
        public T call() throws Exception {
            return executeBlocking();
        }
    };
    return Flowable.fromCallable(fetch);
}
/**
 * Provides a Flowable that emits the result of {@code send()}.
 *
 * @return a Flowable whose callable invokes {@code send()}
 */
public Flowable<T> flowable() {
    return Flowable.fromCallable(() -> send());
}
/**
 * Exposes {@code send()} as a Flowable.
 *
 * @return a Flowable emitting the value returned by {@code send()}
 */
public Flowable<T> flowable() {
    final Flowable<T> result = Flowable.fromCallable(() -> send());
    return result;
}