io.reactivex.Flowable#fromCallable() 源码实例 Demo

下面列出了 io.reactivex.Flowable#fromCallable() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: immutables   文件: GeodeBackend.java
/**
 * Maps a backend operation onto the matching synchronous task and wraps it in a
 * {@link Flowable} so the work is only performed on subscription.
 *
 * <p>{@code Select} and {@code GetByKey} results are flattened with
 * {@code flatMapIterable}, so each element is emitted individually. {@code Watch}
 * is delegated to {@code watch(...)}. Any other operation type produces a publisher
 * that fails with {@link UnsupportedOperationException}.
 *
 * @param operation the operation to execute
 * @return a publisher for the outcome of the operation
 */
private Publisher<?> executeInternal(Operation operation) {
  // The checks are kept in their original order; each one returns immediately on a match.
  if (operation instanceof StandardOperations.Select) {
    return Flowable.fromCallable(new SyncSelect(this, (StandardOperations.Select) operation))
        .flatMapIterable(x -> x);
  }
  if (operation instanceof StandardOperations.Update) {
    return Flowable.fromCallable(new SyncUpdate(this, (StandardOperations.Update) operation));
  }
  if (operation instanceof StandardOperations.Insert) {
    return Flowable.fromCallable(new SyncInsert(this, (StandardOperations.Insert) operation));
  }
  if (operation instanceof StandardOperations.Delete) {
    return Flowable.fromCallable(new SyncDelete(this, (StandardOperations.Delete) operation));
  }
  if (operation instanceof StandardOperations.Watch) {
    return watch((StandardOperations.Watch) operation);
  }
  if (operation instanceof StandardOperations.DeleteByKey) {
    return Flowable.fromCallable(
        new SyncDeleteByKey(this, (StandardOperations.DeleteByKey) operation));
  }
  if (operation instanceof StandardOperations.GetByKey) {
    return Flowable.fromCallable(new SyncGetByKey(this, (StandardOperations.GetByKey) operation))
        .flatMapIterable(x -> x);
  }

  String message = String.format("Operation %s not supported by %s",
      operation, GeodeBackend.class.getSimpleName());
  return Flowable.error(new UnsupportedOperationException(message));
}
 
/**
 * Produces the error response for a failed request by delegating to the supplied
 * exception handler.
 *
 * <p>The work is deferred until subscription. The handler result is converted with
 * {@code errorResultToResponse}; its status, plus content type and body when present,
 * are copied onto {@code containerResponse} before the converted response is emitted.
 *
 * @param containerRequest  the request that triggered the failure
 * @param containerResponse the container response that receives status, content type and body
 * @param throwable         the failure to handle
 * @param exceptionHandler  the handler invoked with the request and the failure
 * @return a flowable emitting the response built from the handler result
 */
private Flowable<? extends MutableHttpResponse<?>> handleException(
        MicronautAwsProxyRequest<?> containerRequest,
        MicronautAwsProxyResponse<?> containerResponse,
        Throwable throwable,
        ExceptionHandler exceptionHandler) {
    return Flowable.fromCallable(() -> {
        Object handlerResult = exceptionHandler.handle(containerRequest, throwable);
        MutableHttpResponse<?> errorResponse = errorResultToResponse(handlerResult);

        // Mirror the handler's outcome onto the container response.
        containerResponse.status(errorResponse.getStatus());
        errorResponse.getContentType().ifPresent(containerResponse::contentType);
        errorResponse.getBody().ifPresent(((MutableHttpResponse) containerResponse)::body);

        return errorResponse;
    });
}
 
/**
 * Looks up a task by id in {@code tasks}.
 *
 * <p>When the {@code fail} flag is set, the returned flowable errors with a
 * {@link RuntimeException} instead. Otherwise the search runs on subscription and
 * emits the first task whose id equals {@code taskId}, or an absent optional when
 * none matches.
 *
 * @param taskId the id to search for
 * @return a flowable emitting the matching task wrapped in an optional
 */
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
  if (fail) {
    return Flowable.error(new RuntimeException("Could not load task"));
  }
  return Flowable.fromCallable(
      () -> {
        Optional<Task> match = Optional.absent();
        for (Task candidate : tasks) {
          if (candidate.id().equals(taskId)) {
            // The first hit wins; stop scanning.
            match = Optional.of(candidate);
            break;
          }
        }
        return match;
      });
}
 
/**
 * Sets the status of the given enrollment to {@code COMPLETED} and then emits the
 * enrollment as read back from the enrollment repository.
 *
 * <p>Both steps run when the flowable is subscribed to.
 *
 * @param enrollmentUid uid of the enrollment to complete
 * @return a flowable emitting the enrollment fetched after the status change
 */
@Override
public Flowable<Enrollment> completeEnrollment(@NonNull String enrollmentUid) {
    return Flowable.fromCallable(() -> {
        // Write the new status first ...
        d2.enrollmentModule()
                .enrollments()
                .uid(enrollmentUid)
                .setStatus(EnrollmentStatus.COMPLETED);

        // ... then re-read the enrollment so the emitted value is fetched after the update.
        return d2.enrollmentModule()
                .enrollments()
                .uid(enrollmentUid)
                .blockingGet();
    });
}
 
/**
 * Checks whether a complete-registration exists for the current data set
 * ({@code dataSetUid}) with the given organisation unit, period and attribute option
 * combo, and is not flagged as deleted.
 *
 * @param orgUnitUid        organisation unit uid to match
 * @param periodInitialDate value matched against the registration's period
 * @param catCombo          attribute option combo uid to match
 * @return a flowable emitting {@code true} when a non-deleted registration is found
 */
@Override
public Flowable<Boolean> isCompleted(String orgUnitUid, String periodInitialDate, String catCombo) {
    return Flowable.fromCallable(() -> {
        DataSetCompleteRegistration registration = d2.dataSetModule()
                .dataSetCompleteRegistrations()
                .byDataSetUid().eq(dataSetUid)
                .byAttributeOptionComboUid().eq(catCombo)
                .byPeriod().eq(periodInitialDate)
                .byOrganisationUnitUid().eq(orgUnitUid)
                .one()
                .blockingGet();

        if (registration == null) {
            // No registration at all means the data set has not been completed.
            return false;
        }
        return !registration.deleted();
    });
}
 
/**
 * Checks whether the data approval matching the given organisation unit, period and
 * attribute option combo is in state {@code APPROVED_HERE}.
 *
 * <p>A missing approval record, or a record whose state is {@code null}, yields
 * {@code false}.
 *
 * @param orgUnit              organisation unit uid to match
 * @param period               period id to match
 * @param attributeOptionCombo attribute option combo uid to match
 * @return a flowable emitting {@code true} only when the approval state is
 *         {@code APPROVED_HERE}
 */
@Override
public Flowable<Boolean> isApproval(String orgUnit, String period, String attributeOptionCombo) {
    return Flowable.fromCallable(() -> {
        DataApproval dataApproval = d2.dataSetModule().dataApprovals()
                .byOrganisationUnitUid().eq(orgUnit)
                .byPeriodId().eq(period)
                .byAttributeOptionComboUid().eq(attributeOptionCombo)
                .one().blockingGet();
        // Constant-first comparison: a null state counts as "not approved" rather than
        // raising a NullPointerException inside the callable.
        return dataApproval != null
                && DataApprovalState.APPROVED_HERE.equals(dataApproval.state());
    });
}
 
/**
 * Resolves the period id for the given period type and date.
 *
 * <p>If the period helper has no period for the inputs, periods for the current data
 * set ({@code dataSetUid}) are requested first and the lookup is then repeated.
 *
 * @param periodType type of the period to resolve
 * @param date       date used for the lookup
 * @return a flowable emitting the id of the resolved period
 */
@NonNull
@Override
public Flowable<String> getPeriodId(PeriodType periodType, Date date) {
    return Flowable.fromCallable(() -> {
        boolean periodMissing =
                d2.periodModule().periodHelper().getPeriod(periodType, date) == null;
        if (periodMissing) {
            // Presumably this makes the missing period available to the lookup below;
            // confirm against the period helper's contract.
            d2.periodModule().periodHelper().blockingGetPeriodsForDataSet(dataSetUid);
        }

        return d2.periodModule().periodHelper().getPeriod(periodType, date).periodId();
    });
}
 
源代码8 项目: RxCupboard   文件: RxContentProvider.java
/**
 * Fetches a single entity by id from the content provider when subscribed to.
 *
 * @param entityClass type of the entity to load
 * @param id          id appended to the base {@code uri} to address the entity
 * @return a flowable emitting whatever {@code provider.get} returns for the item uri
 */
public <T> Flowable<T> get(final Class<T> entityClass, final long id) {
	return Flowable.fromCallable(new Callable<T>() {
		@Override
		public T call() throws Exception {
			// The item uri is the base uri with the id appended.
			return provider.get(ContentUris.withAppendedId(uri, id), entityClass);
		}
	});
}
 
源代码9 项目: cyclops   文件: FlowableIO.java
/**
 * Creates an {@code IO} that obtains its value from the supplier on subscription,
 * with the subscription performed on the given scheduler.
 *
 * @param s  source of the value
 * @param ex scheduler passed to {@code subscribeOn}
 * @return an IO backed by the scheduled flowable
 */
public static <T> IO<T> of(Supplier<? extends T> s, Scheduler ex){
    return new FlowableIO<T>(
            Flowable.<T>fromCallable(() -> s.get())
                    .subscribeOn(ex));
}
 
/**
 * Looks up the category option combo with the given uid and emits its display name.
 *
 * @param catcomboUid uid of the category option combo
 * @return a flowable emitting the display name, evaluated on subscription
 */
@Override
public Flowable<String> getCatComboName(String catcomboUid) {
    return Flowable.fromCallable(() ->
            d2.categoryModule()
                    .categoryOptionCombos()
                    .uid(catcomboUid)
                    .blockingGet()
                    .displayName());
}
 
/**
 * Fetches the period whose period id equals the given value.
 *
 * @param periodId period id to match
 * @return a flowable emitting the single matching period, evaluated on subscription
 */
@Override
public Flowable<Period> getPeriod(String periodId) {
    return Flowable.fromCallable(() ->
            d2.periodModule()
                    .periods()
                    .byPeriodId().eq(periodId)
                    .one()
                    .blockingGet());
}
 
/**
 * Loads the current data set ({@code dataSetUid}) together with its data input
 * periods and emits those periods.
 *
 * @return a flowable emitting the data set's data input periods, evaluated on subscription
 */
@Override
public Flowable<List<DataInputPeriod>> getDataInputPeriod() {
    return Flowable.fromCallable(() ->
            d2.dataSetModule()
                    .dataSets()
                    .withDataInputPeriods()
                    .byUid().eq(dataSetUid)
                    .one()
                    .blockingGet()
                    .dataInputPeriods());
}
 
/**
 * Fetches the data set identified by {@code dataSetUid}.
 *
 * @return a flowable emitting the matching data set, evaluated on subscription
 */
@Override
public Flowable<DataSet> getDataSet() {
    return Flowable.fromCallable(() ->
            d2.dataSetModule()
                    .dataSets()
                    .byUid().eq(dataSetUid)
                    .one()
                    .blockingGet());
}
 
/**
 * Reads the user credentials from the user module.
 *
 * @return a flowable emitting the credentials, evaluated on subscription
 */
@NonNull
@Override
public Flowable<UserCredentials> credentials() {
    return Flowable.fromCallable(() ->
            d2.userModule()
                    .userCredentials()
                    .blockingGet());
}
 
源代码15 项目: adamant-android   文件: AccountInteractor.java
/**
 * Exposes {@code getBalance()} as a flowable; the balance is computed each time the
 * flowable is subscribed to.
 *
 * @return a flowable emitting the value returned by {@code getBalance()}
 */
public Flowable<BigDecimal> getAdamantBalance() {
    return Flowable.fromCallable(() -> getBalance());
}
 
源代码16 项目: adamant-android   文件: ChatHistoryInteractor.java
/**
 * Loads the next page of chat history and emits the chat's messages from storage.
 *
 * <ul>
 *   <li>If {@code chatId} is not set, the returned flowable errors with
 *       {@link IllegalStateException}.</li>
 *   <li>If {@code haveMoreMessages()} is false, the messages already held in
 *       {@code chatsStorage} are emitted without touching {@code historySource}.</li>
 *   <li>Otherwise a shared loading pipeline is built once, cached in
 *       {@code loadingMessagesFlowable} and returned; while that field is non-null,
 *       further calls return the same instance.</li>
 * </ul>
 *
 * @return a flowable emitting the messages stored for {@code chatId}
 */
@MainThread
public Flowable<List<MessageListContent>> loadMoreMessages() {
    if (chatId == null) {
        return Flowable.error(new IllegalStateException("Chat setChatId must be called"));
    }
    if (!haveMoreMessages()) {
        // Nothing left to fetch: serve what is already stored.
        return Flowable.fromCallable(() -> chatsStorage.getMessagesByCompanionId(chatId));
    }
    if (loadingMessagesFlowable == null) {
        // The offset is evaluated once here, when the pipeline is assembled.
        loadingMessagesFlowable = historySource.execute(chatId, getCurrentOffset(), PAGE_SIZE)
                // Height bookkeeping is done on the Android main thread.
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(transaction -> updateHeight(transaction.getHeight()))
                // Key lookup and message mapping run on the computation scheduler.
                .observeOn(Schedulers.computation())
                .map(transaction -> keyStorage.combinePublicKeyWithTransaction(transaction))
                // Map each item in its own inner flowable so that a mapping failure is
                // replaced by a fallback message instead of failing the whole page.
                .flatMap(pair -> Flowable.just(pair)
                        .map(transaction -> (MessageListContent) messageMapper.apply(transaction))
                        .onErrorReturn(FallbackMessage::createMessageFromThrowable)
                )
                // Collect the whole page into one list before storing it.
                .toList()
                .toFlowable()
                .doOnNext(messages -> {
                    for (MessageListContent messageListContent : messages) {
                        chatsStorage.addMessageToChat(messageListContent);
                    }
                })
                .doOnComplete(() -> chatsStorage.updateLastMessages())
                // Back on the main thread: clear the cached pipeline and advance the page.
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(ignored -> {
                    loadingMessagesFlowable = null;
                    currentPage++;
                })
                // Emit the full stored message list rather than just the fetched page.
                .observeOn(Schedulers.computation())
                .map(ignored -> chatsStorage.getMessagesByCompanionId(chatId))
                // Resubscribe whenever the failure is an IOException.
                .retry(throwable -> throwable instanceof IOException)
                // NOTE(review): this callback sits after observeOn(Schedulers.computation()),
                // so these field writes presumably happen off the main thread - confirm.
                // NOTE(review): retry(...) above resubscribes on every IOException, so the
                // instanceof check below looks like it is always true here - confirm.
                .doOnError(e -> {
                    loadingMessagesFlowable = null;
                    if (!(e instanceof IOException)) {
                        currentPage++;
                    }
                })
                // Share one upstream subscription among concurrent subscribers.
                .share();
    }
    return loadingMessagesFlowable;
}
 
源代码17 项目: cyclops   文件: FlowableIO.java
/**
 * Creates an {@code IO} that obtains its value from the supplier on subscription.
 *
 * @param s source of the value
 * @return an IO backed by a flowable that calls {@code s.get()} when subscribed to
 */
public static <T> IO<T> of(Supplier<? extends T> s){
    Flowable<T> deferred = Flowable.fromCallable(() -> s.get());
    return new FlowableIO<T>(deferred);
}
 
源代码18 项目: HighLite   文件: GetSingleOperation.java
/**
 * Wraps {@link #executeBlocking()} in a {@link Flowable} so that the call is made
 * when the flowable is subscribed to rather than when this method is invoked.
 *
 * @param strategy the requested {@link BackpressureStrategy}; this implementation
 *                 does not reference it
 * @return a {@link Flowable} emitting the object of type {@code T} returned by
 * {@link #executeBlocking()}
 */
@Override
public Flowable<T> asFlowable(BackpressureStrategy strategy) {
    final Callable<T> fetchSingle = new Callable<T>() {
        @Override
        public T call() throws Exception {
            return executeBlocking();
        }
    };
    return Flowable.fromCallable(fetchSingle);
}
 
源代码19 项目: web3sdk   文件: RemoteCall.java
/**
 * Provides a flowable that emits the result of {@code send()}.
 *
 * <p>{@code send()} is invoked when the flowable is subscribed to.
 *
 * @return a flowable emitting the value returned by {@code send()}
 */
public Flowable<T> flowable() {
    return Flowable.fromCallable(() -> send());
}
 
源代码20 项目: web3j   文件: RemoteCall.java
/**
 * Provides a flowable that emits the result of {@code send()}.
 *
 * <p>{@code send()} is invoked when the flowable is subscribed to.
 *
 * @return a flowable emitting the value returned by {@code send()}
 */
public Flowable<T> flowable() {
    return Flowable.fromCallable(() -> send());
}