下面列出了 io.reactivex.Observable#flatMap() 的实例代码;您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Maps every upstream error either to a delayed tick or back to the same error.
 *
 * <p>A {@link SocketTimeoutException}, either received directly or found inside a
 * {@link CompositeException}, yields {@code Observable.timer(mInterval, SECONDS)} as long as the
 * incremented {@code tryTimes} counter does not exceed {@code maxTryTime}. Any other error, or an
 * exhausted counter, is re-emitted via {@code Observable.error}.
 *
 * <p>NOTE(review): presumably used as a {@code retryWhen} handler - confirm against the caller.
 *
 * @param throwableObservable stream of errors to inspect
 * @return a stream that emits a timer tick for retryable errors and fails otherwise
 */
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) throws Exception {
            Logger.t(TAG).d("Error:::" + throwable);

            if (claimRetry(throwable)) {
                Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
                return Observable.timer(mInterval, TimeUnit.SECONDS);
            }

            if (throwable instanceof CompositeException) {
                // When combined with RxCache the real failure arrives wrapped, so unwrap and inspect it.
                for (Throwable wrapped : ((CompositeException) throwable).getExceptions()) {
                    if (claimRetry(wrapped)) {
                        Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
                        return Observable.timer(mInterval, TimeUnit.SECONDS);
                    }
                }
            }

            return Observable.error(throwable);
        }

        /**
         * Returns true when the candidate is a socket timeout and, after counting this attempt,
         * the attempt counter is still within the allowed maximum. The counter is only touched
         * for socket timeouts.
         */
        private boolean claimRetry(Throwable candidate) {
            return candidate instanceof SocketTimeoutException && ++tryTimes <= maxTryTime;
        }
    });
}
/**
 * Builds an {@link ObservableTransformer} that turns each upstream effect into the value returned
 * by {@code function}.
 *
 * <p>For every effect the transformer creates an {@link Observable} that calls
 * {@code function.apply(effect)} on subscription and flattens it into the resulting stream. When
 * a {@code scheduler} is supplied, that per-effect observable is subscribed on it; when it is
 * {@code null}, no scheduler is applied.
 *
 * @param function the {@link Function} to invoke for each incoming effect
 * @param scheduler the {@link Scheduler} to subscribe each invocation on, or {@code null}
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream.flatMap(
          new Function<F, ObservableSource<E>>() {
            @Override
            public ObservableSource<E> apply(final F effect) {
              // Deferred: the function only runs once this observable is subscribed to.
              final Observable<E> deferredCall =
                  Observable.fromCallable(
                      new Callable<E>() {
                        @Override
                        public E call() throws Exception {
                          return function.apply(effect);
                        }
                      });
              if (scheduler == null) {
                return deferredCall;
              }
              return deferredCall.subscribeOn(scheduler);
            }
          });
    }
  };
}
/**
 * Decides for each upstream error whether to emit a delayed tick or to propagate the error.
 *
 * <p>Socket timeouts (direct, or nested in a {@link CompositeException}) produce
 * {@code Observable.timer(mInterval, SECONDS)} while the incremented {@code tryTimes} stays within
 * {@code maxTryTime}; everything else is passed on through {@code Observable.error}.
 *
 * @param throwableObservable stream of errors to inspect
 * @return a stream that emits a timer tick for retryable errors and fails otherwise
 */
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) throws Exception {
            Logger.t(TAG).d("Error:::" + throwable);

            // Non-null once a retry has been granted; holds the log prefix for that case.
            String retryLogPrefix = null;
            if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                retryLogPrefix = "超时重试第【";
            } else if (throwable instanceof CompositeException) {
                // With RxCache the exception is wrapped before being returned; dig out the inner ones.
                for (Throwable nested : ((CompositeException) throwable).getExceptions()) {
                    if (nested instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                        retryLogPrefix = "带Rxcache超时重试第【";
                        break;
                    }
                }
            }

            if (retryLogPrefix == null) {
                return Observable.error(throwable);
            }
            Logger.t(TAG).d(retryLogPrefix + (tryTimes - 1) + "】次");
            return Observable.timer(mInterval, TimeUnit.SECONDS);
        }
    });
}
/**
 * Saves this object, after first saving the objects returned by
 * {@code generateCascadingSaveObjects()}.
 *
 * <p>Fails immediately with {@code AVException.CIRCLE_REFERENCE} when
 * {@code hasCircleReference} reports a cycle.
 *
 * @param option save option forwarded to {@code saveSelfOperations}.
 * @return observable instance.
 */
public Observable<? extends AVObject> saveInBackground(final AVSaveOption option) {
    final Map<AVObject, Boolean> visited = new HashMap<>();
    if (hasCircleReference(visited)) {
        return Observable.error(new AVException(AVException.CIRCLE_REFERENCE, "Found a circular dependency when saving."));
    }

    return generateCascadingSaveObjects().flatMap(new Function<List<AVObject>, Observable<? extends AVObject>>() {
        @Override
        public Observable<? extends AVObject> apply(List<AVObject> dependencies) throws Exception {
            logger.d("First, try to execute save operations in thread: " + Thread.currentThread());
            // Each dependency is saved with the synchronous save() before this object itself.
            for (AVObject dependency : dependencies) {
                dependency.save();
            }
            logger.d("Second, save object itself...");
            return saveSelfOperations(option);
        }
    });
}
/**
 * Pre-processes a response: unwraps the payload when the error code is {@code 0}, otherwise
 * fails the stream with an {@code ApiException} carrying the response's code and message.
 *
 * @param <T> payload type carried by the response
 * @return transformer from {@code BaseResponse<T>} to {@code T}
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult(){
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
                    if (response.getErrorCode() != 0) {
                        // Non-zero code: surface it as an exception.
                        return Observable.error(
                                new ApiException(response.getErrorCode(), response.getErrorMsg()));
                    }
                    // Success: emit the data we actually need.
                    return createObservable(response.getData());
                }
            });
        }
    };
}
/**
 * Unified handling of response results.
 *
 * <p>A response whose error code equals {@code BaseResponse.SUCCESS} and whose data is non-null
 * is unwrapped through {@code createData}; any other response fails the stream with an
 * {@link Exception} built from the response's error message.
 *
 * @param <T> the payload type
 * @return ObservableTransformer
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
                    final boolean usable = response.getErrorCode() == BaseResponse.SUCCESS
                            && response.getData() != null;
                    if (!usable) {
                        return Observable.error(new Exception(response.getErrorMsg()));
                    }
                    return createData(response.getData());
                }
            });
        }
    };
}
/**
 * Maps each upstream error to either a delayed tick or the error itself.
 *
 * <p>The error is propagated unchanged when the network is not connected, when it is not one of
 * the connection-related exception types, or when the retry counter has passed
 * {@code mRetryMaxTime}. Otherwise a timer of {@code mRetryDelay} milliseconds is returned.
 *
 * @param observable stream of errors to inspect
 * @return stream emitting a timer tick for retryable failures, or failing with the error
 */
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
    return observable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) {
            // No network connection: hand the error straight back.
            if (!NetworkUtil.isConnected(mContext)) {
                return Observable.error(throwable);
            }

            // Only connection-failure style errors are eligible for a retry.
            final boolean connectionFailure = throwable instanceof ConnectException
                    || throwable instanceof UnknownHostException
                    || throwable instanceof SocketTimeoutException
                    || throwable instanceof SocketException
                    || throwable instanceof TimeoutException;

            // Short-circuit keeps the counter untouched for non-eligible errors.
            if (connectionFailure && ++mRetryCount <= mRetryMaxTime) {
                LoggerManager.e(TAG, "网络请求错误,将在 " + mRetryDelay + " ms后进行重试, 重试次数 " + mRetryCount + ";throwable:" + throwable);
                return Observable.timer(mRetryDelay, TimeUnit.MILLISECONDS);
            }
            return Observable.error(throwable);
        }
    });
}
/**
 * Passes every item of {@code source} through {@code saveCacheSync} and emits the results.
 *
 * @param rxCache cache instance forwarded to {@code saveCacheSync}
 * @param key cache key forwarded to {@code saveCacheSync}
 * @param source stream of items to store
 * @param target cache target forwarded to {@code saveCacheSync}
 * @param needEmpty when {@code true}, any error is replaced by an empty completion
 * @param <T> item type
 * @return stream of cache results
 */
public static <T> Observable<CacheResult<T>> loadRemoteSync(final RxCache rxCache, final String key, Observable<T> source, final CacheTarget target, final boolean needEmpty) {
    final Observable<CacheResult<T>> stored = source
            .flatMap(new Function<T, ObservableSource<CacheResult<T>>>() {
                @Override
                public ObservableSource<CacheResult<T>> apply(@NonNull T item) throws Exception {
                    return saveCacheSync(rxCache, key, item, target);
                }
            });

    if (!needEmpty) {
        return stored;
    }

    // Swallow the failure and complete without emitting anything.
    return stored.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
        @Override
        public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable throwable) throws Exception {
            return Observable.empty();
        }
    });
}
/**
 * Combines the given sources into arrays holding one value from each source.
 *
 * <p>Element {@code k} of every emitted array comes from {@code sources.get(k)}. Sources after
 * the first are wrapped in {@code cache()} because each is subscribed to once per partial
 * combination.
 *
 * @param sources the integer sources to combine
 * @return stream of combinations, or an empty stream when {@code sources} is empty
 */
static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
    if (sources.isEmpty()) {
        return Observable.<int[]>empty();
    }

    Observable<int[]> combinations = sources.get(0).map(first -> new int[] { first });

    for (int index = 1; index < sources.size(); index++) {
        final int slot = index;
        final Observable<Integer> replayable = sources.get(index).cache();
        combinations = combinations.flatMap(prefix -> replayable.map(value -> {
            // Copy so that each combination owns its own array.
            int[] extended = Arrays.copyOf(prefix, slot + 1);
            extended[slot] = value;
            return extended;
        }));
    }
    return combinations;
}
/**
 * Announces an aggregate bonded transaction and then waits on the listener for it.
 *
 * <p>After the repository emits its announce response, the result stream switches to
 * {@code listener.aggregateBondedAddedOrError} for the signer's address and the transaction hash.
 *
 * @param listener listener queried once the announce call has emitted
 * @param signedAggregateTransaction the signed transaction; must be non-null and of type
 *     {@code AGGREGATE_BONDED}
 * @return the stream returned by the listener
 */
@Override
public Observable<AggregateTransaction> announceAggregateBonded(
    Listener listener, SignedTransaction signedAggregateTransaction) {
    Validate.notNull(signedAggregateTransaction, "signedAggregateTransaction is required");
    Validate.isTrue(signedAggregateTransaction.getType() == TransactionType.AGGREGATE_BONDED,
        "signedAggregateTransaction type must be AGGREGATE_BONDED");

    return transactionRepository
        .announceAggregateBonded(signedAggregateTransaction)
        .flatMap(announceResponse -> listener.aggregateBondedAddedOrError(
            signedAggregateTransaction.getSigner().getAddress(),
            signedAggregateTransaction.getHash()));
}
/**
 * Pre-processes the result by unwrapping the response data through {@code createObservable}.
 * The response's error code is not inspected here.
 *
 * @param <T> payload type carried by the response
 * @return transformer from {@code BaseResponse<T>} to {@code T}
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleRequest2(){
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> response) throws Exception {
                    final T payload = response.getData();
                    return createObservable(payload);
                }
            });
        }
    };
}
/**
 * Maps each upstream error to a signal stream.
 *
 * <p>Errors that are not {@code HttpException}s are re-emitted immediately as items. An
 * {@code HttpException} is turned into a 5-second timer, unless the shared counter reads 5 at
 * the moment it is checked, in which case nothing is emitted for that error.
 *
 * <p>NOTE(review): presumably used as a {@code retryWhen} handler - confirm against the caller.
 *
 * @param throwableObservable stream of errors to inspect
 * @return the signal stream described above
 */
private ObservableSource<?> handleWsError(Observable<Throwable> throwableObservable) {
    final AtomicInteger httpFailures = new AtomicInteger();
    return throwableObservable.flatMap(error -> {
        if (!(error instanceof HttpException)) {
            return Observable.just(error);
        }
        return Observable.just(error)
            .takeWhile(ignored -> httpFailures.getAndIncrement() != 5)
            .flatMap(ignored -> Observable.timer(5, TimeUnit.SECONDS));
    });
}
/**
 * Requests one page from the discover endpoint sorted by {@code "popularity.desc"} and emits
 * the result list of each response.
 *
 * @param page page number passed to the API
 * @return stream of movie lists
 */
@Override
public Observable<List<Movie>> getPopularMovies(int page) {
    return apiService
            .discover("popularity.desc", page, ApiUtils.getApiKey())
            .flatMap(new Function<DiscoverMoviesResponse, ObservableSource<? extends List<Movie>>>() {
                @Override
                public ObservableSource<? extends List<Movie>> apply(DiscoverMoviesResponse response)
                        throws Exception {
                    final List<Movie> movies = response.getResults();
                    return Observable.just(movies);
                }
            });
}
/**
 * Maps each upstream error to a delayed tick while the incremented {@code mRetryCount} is still
 * below {@code mMaxRetryCount}; once it reaches the limit the error is propagated instead.
 *
 * @param observable stream of errors to inspect
 * @return stream emitting a timer tick per allowed retry, or failing with the error
 */
@Override
public ObservableSource<?> apply(final Observable<? extends Throwable> observable)
        throws Exception {
    return observable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable failure) throws Exception {
            // The counter is bumped on every error, including the one that exhausts the budget.
            if (++mRetryCount >= mMaxRetryCount) {
                return Observable.error(failure);
            }
            return Observable.timer(mRetryDelay, mTimeUnit);
        }
    });
}
/**
 * Converts every emitted {@code LongWriteFailure} into a stream error carrying the failure's
 * cause, so no failure item is ever passed downstream.
 *
 * @param observable stream of long-write failures
 * @return stream that fails with the cause of the first failure received
 */
@Override
public Observable<LongWriteFailure> apply(Observable<LongWriteFailure> observable) {
    return observable.flatMap(new Function<LongWriteFailure, Observable<LongWriteFailure>>() {
        @Override
        public Observable<LongWriteFailure> apply(LongWriteFailure failure) {
            final Throwable cause = failure.getCause();
            return Observable.error(cause);
        }
    });
}
/**
 * Loads the media whose {@code MediaStore.Audio.Media.DATA} column equals the path of the URI.
 *
 * <p>When the provider emits an empty list, the result of {@code queueFromFile(uri)} is emitted
 * instead.
 *
 * @param uri the file URI; its path is used as the selection argument
 * @return stream of media lists
 */
@NonNull
@Override
public Observable<List<Media>> fromFile(@NonNull final Uri uri) {
    final String selection = MediaStore.Audio.Media.DATA.concat("=?");
    final String[] selectionArgs = new String[]{uri.getPath()};

    return mMediaProvider
            .load(selection, selectionArgs, null, null)
            .flatMap(queue -> {
                if (queue.isEmpty()) {
                    // Nothing from the provider: fall back to reading the file directly.
                    return Observable.fromCallable(() -> queueFromFile(uri));
                }
                return Observable.just(queue);
            });
}
/**
 * Demonstrates switching to a fallback source from inside {@code flatMap} when an emitted list
 * is empty. The resulting observable is only assembled here; it is never subscribed to.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Observable<List<Integer>> source = Observable.empty();
    Observable<List<Integer>> fallbackSource = Observable.empty();

    source.flatMap(items -> items.isEmpty() ? fallbackSource : Observable.just(items));
}
/**
 * Checks whether the given transaction involves the given address.
 *
 * <p>The checks run in the order written below and the first one that applies decides the
 * result: signer, then aggregate (cosigners and inner transactions), public key link, metadata,
 * target address, multisig modification, account address restriction, and recipient.
 * Transactions matching none of these yield {@code false}.
 *
 * @param transaction the transaction to inspect
 * @param address the address to look for
 * @param namespaceIdsObservable stream of namespace id lists, consulted when a transaction
 *     refers to its target through something other than a plain address
 * @return an observable emitting whether the transaction matches
 */
public Observable<Boolean> transactionFromAddress(final Transaction transaction, final Address address,
final Observable<List<NamespaceId>> namespaceIdsObservable) {
// A present signer whose address equals the given address is an immediate match.
if (transaction.getSigner().filter(s -> s.getAddress().equals(address)).isPresent()) {
return Observable.just(true);
}
if (transaction instanceof AggregateTransaction) {
final AggregateTransaction aggregateTransaction = (AggregateTransaction) transaction;
// Any cosigner with the given address is a match.
if (aggregateTransaction.getCosignatures().stream()
.anyMatch(c -> c.getSigner().getAddress().equals(address))) {
return Observable.just(true);
}
// Recurse into the inner transactions: keep only positive results and emit the first one,
// defaulting to false when no inner transaction matches.
Observable<Transaction> innerTransactionObservable = Observable
.fromIterable(aggregateTransaction.getInnerTransactions());
return innerTransactionObservable
.flatMap(t -> this.transactionFromAddress(t, address, namespaceIdsObservable).filter(a -> a))
.first(false).toObservable();
}
if (transaction instanceof PublicKeyLinkTransaction) {
// Compare against the address derived from the linked public key and the transaction's network type.
return Observable.just(Address
.createFromPublicKey(((PublicKeyLinkTransaction) transaction).getLinkedPublicKey().toHex(),
transaction.getNetworkType()).equals(address));
}
if (transaction instanceof MetadataTransaction) {
MetadataTransaction metadataTransaction = (MetadataTransaction) transaction;
return Observable.just(metadataTransaction.getTargetAddress().equals(address));
}
if (transaction instanceof TargetAddressTransaction) {
TargetAddressTransaction targetAddressTransaction = (TargetAddressTransaction) transaction;
// A plain Address target is compared directly; any other target is looked up in the namespace ids.
if (targetAddressTransaction.getTargetAddress() instanceof Address) {
return Observable.just(targetAddressTransaction.getTargetAddress().equals(address));
}
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(targetAddressTransaction.getTargetAddress()));
}
if (transaction instanceof MultisigAccountModificationTransaction) {
MultisigAccountModificationTransaction multisigAccountModificationTransaction = (MultisigAccountModificationTransaction) transaction;
// Matches when the address is among either the additions or the deletions.
if (multisigAccountModificationTransaction.getAddressAdditions().stream()
.anyMatch(a -> a.equals(address))) {
return Observable.just(true);
}
return Observable.just(
multisigAccountModificationTransaction.getAddressDeletions().stream().anyMatch(a -> a.equals(address)));
}
if (transaction instanceof AccountAddressRestrictionTransaction) {
AccountAddressRestrictionTransaction accountAddressRestrictionTransaction = (AccountAddressRestrictionTransaction) transaction;
// First look for the address itself among the restriction additions and deletions.
if (accountAddressRestrictionTransaction.getRestrictionAdditions().contains(address)) {
return Observable.just(true);
}
if (accountAddressRestrictionTransaction.getRestrictionDeletions().contains(address)) {
return Observable.just(true);
}
// Otherwise match when any of the namespace ids appears among the additions or deletions.
return namespaceIdsObservable.flatMap(namespaceIds -> {
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionAdditions()
.contains(namespaceId))) {
return Observable.just(true);
}
if (namespaceIds.stream().anyMatch(
namespaceId -> accountAddressRestrictionTransaction.getRestrictionDeletions()
.contains(namespaceId))) {
return Observable.just(true);
}
return Observable.just(false);
});
}
if (transaction instanceof RecipientTransaction) {
RecipientTransaction recipientTransaction = (RecipientTransaction) transaction;
// A NamespaceId recipient is looked up in the namespace ids; any other recipient is compared to the address.
if (recipientTransaction.getRecipient() instanceof NamespaceId) {
return namespaceIdsObservable
.map(namespaceIds -> namespaceIds.contains(recipientTransaction.getRecipient()));
}
return Observable.just(recipientTransaction.getRecipient().equals(address));
}
// No check applied to this transaction type.
return Observable.just(false);
}
/**
 * Re-emits every upstream item after a random delay of {@code random.nextInt(maxTimeMs)}
 * milliseconds, drawn separately for each item.
 *
 * @param upstream the items to delay
 * @return stream emitting the same items, each after its own delay
 */
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
    return upstream.flatMap(client -> {
        final long delayMs = random.nextInt(maxTimeMs);
        return Observable.timer(delayMs, TimeUnit.MILLISECONDS).map(tick -> client);
    });
}