io.reactivex.Observable#flatMap ( )源码实例Demo

下面列出了io.reactivex.Observable#flatMap ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: v9porn   文件: RetryWhenProcess.java
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {

    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) throws Exception {
            Logger.t(TAG).d("Error:::" + throwable);
            if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
                return Observable.timer(mInterval, TimeUnit.SECONDS);
            } else if (throwable instanceof CompositeException) {
                CompositeException compositeException = (CompositeException) throwable;
                //结合rxcache会把异常进行包裹才会返回,需要解析提取
                for (Throwable innerthrowable : compositeException.getExceptions()) {
                    if (innerthrowable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                        Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
                        return Observable.timer(mInterval, TimeUnit.SECONDS);
                    }
                }
            }
            return Observable.error(throwable);
        }

    });
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Function} into
 * the stream as an {@link Observable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the function on the specified scheduler, and passing it
 * the requested effect object then emitting its returned value.
 *
 * @param function the {@link Function} to be invoked every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the function
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream.flatMap(
          new Function<F, ObservableSource<E>>() {
            @Override
            public ObservableSource<E> apply(final F f) {
              Observable<E> eventObservable =
                  Observable.fromCallable(
                      new Callable<E>() {
                        @Override
                        public E call() throws Exception {
                          return function.apply(f);
                        }
                      });
              return scheduler == null ? eventObservable : eventObservable.subscribeOn(scheduler);
            }
          });
    }
  };
}
 
源代码3 项目: v9porn   文件: RetryWhenProcess.java
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {

    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) throws Exception {
            Logger.t(TAG).d("Error:::" + throwable);
            if (throwable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                Logger.t(TAG).d("超时重试第【" + (tryTimes - 1) + "】次");
                return Observable.timer(mInterval, TimeUnit.SECONDS);
            } else if (throwable instanceof CompositeException) {
                CompositeException compositeException = (CompositeException) throwable;
                //结合rxcache会把异常进行包裹才会返回,需要解析提取
                for (Throwable innerthrowable : compositeException.getExceptions()) {
                    if (innerthrowable instanceof SocketTimeoutException && ++tryTimes <= maxTryTime) {
                        Logger.t(TAG).d("带Rxcache超时重试第【" + (tryTimes - 1) + "】次");
                        return Observable.timer(mInterval, TimeUnit.SECONDS);
                    }
                }
            }
            return Observable.error(throwable);
        }

    });
}
 
源代码4 项目: java-unified-sdk   文件: AVObject.java
/**
 * Save object in background.
 * @param option save option.
 * @return observable instance.
 */
public Observable<? extends AVObject> saveInBackground(final AVSaveOption option) {
  Map<AVObject, Boolean> markMap = new HashMap<>();
  if (hasCircleReference(markMap)) {
    return Observable.error(new AVException(AVException.CIRCLE_REFERENCE, "Found a circular dependency when saving."));
  }

  Observable<List<AVObject>> needSaveFirstly = generateCascadingSaveObjects();
  return needSaveFirstly.flatMap(new Function<List<AVObject>, Observable<? extends AVObject>>() {
    @Override
    public Observable<? extends AVObject> apply(List<AVObject> objects) throws Exception {
      logger.d("First, try to execute save operations in thread: " + Thread.currentThread());
      for (AVObject o: objects) {
        o.save();
      }
      logger.d("Second, save object itself...");
      return saveSelfOperations(option);
    }
  });
}
 
源代码5 项目: WanAndroid   文件: RxUtils.java
/**
 * 对结果进行预处理
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult(){
    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> tBaseResponse) throws Exception {
                    if(tBaseResponse.getErrorCode() == 0){
                        return createObservable(tBaseResponse.getData());//创建我们需要的数据
                    }
                    return Observable.error(
                            new ApiException(tBaseResponse.getErrorCode(), tBaseResponse.getErrorMsg())//创建一个异常
                    );
                }
            });
        }
    };
}
 
源代码6 项目: ViewPagerHelper   文件: RxUtils.java
/**
 * 统一返回结果处理
 * @param <T> 指定的泛型类型
 * @return ObservableTransformer
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() {

    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> baseResponse) throws Exception {
                    if (baseResponse.getErrorCode() == BaseResponse.SUCCESS
                            && baseResponse.getData() != null
                    ) {
                        return createData(baseResponse.getData());
                    } else {
                        return Observable.error(new Exception(baseResponse.getErrorMsg()));
                    }
                }
            });

        }
    };
}
 
源代码7 项目: FastLib   文件: FastRetryWhen.java
/**
 * Applies this function to the given argument.
 *
 * @param observable the function argument
 * @return the function result
 */
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
    return observable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Throwable throwable) {
            //未连接网络直接返回异常
            if (!NetworkUtil.isConnected(mContext)) {
                return Observable.error(throwable);
            }
            //仅仅对连接失败相关错误进行重试
            if (throwable instanceof ConnectException
                    || throwable instanceof UnknownHostException
                    || throwable instanceof SocketTimeoutException
                    || throwable instanceof SocketException
                    || throwable instanceof TimeoutException) {
                if (++mRetryCount <= mRetryMaxTime) {
                    LoggerManager.e(TAG, "网络请求错误,将在 " + mRetryDelay + " ms后进行重试, 重试次数 " + mRetryCount + ";throwable:" + throwable);
                    return Observable.timer(mRetryDelay, TimeUnit.MILLISECONDS);
                }
            }
            return Observable.error(throwable);
        }
    });
}
 
源代码8 项目: RxCache   文件: RxCacheHelper.java
public static <T> Observable<CacheResult<T>> loadRemoteSync(final RxCache rxCache, final String key, Observable<T> source, final CacheTarget target, final boolean needEmpty) {
    Observable<CacheResult<T>> observable = source
            .flatMap(new Function<T, ObservableSource<CacheResult<T>>>() {
                @Override
                public ObservableSource<CacheResult<T>> apply(@NonNull T t) throws Exception {
                    return saveCacheSync(rxCache, key, t, target);
                }
            });
    if (needEmpty) {
        observable = observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends CacheResult<T>>>() {
            @Override
            public ObservableSource<? extends CacheResult<T>> apply(@NonNull Throwable throwable) throws Exception {
                return Observable.empty();
            }
        });
    }
    return observable;

}
 
源代码9 项目: akarnokd-misc   文件: Cartesian.java
static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
    if (sources.size() == 0) {
        return Observable.<int[]>empty();
    }
    Observable<int[]> main = sources.get(0).map(v -> new int[] { v });
    
    for (int i = 1; i < sources.size(); i++) {
        int j = i;
        Observable<Integer> o = sources.get(i).cache();
        main = main.flatMap(v -> {
            return o.map(w -> {
                int[] arr = Arrays.copyOf(v, j + 1);
                arr[j] = w;
                return arr;
            });
        });
    }
    
    return main;
}
 
源代码10 项目: symbol-sdk-java   文件: TransactionServiceImpl.java
@Override
public Observable<AggregateTransaction> announceAggregateBonded(
    Listener listener, SignedTransaction signedAggregateTransaction) {
    Validate.notNull(signedAggregateTransaction, "signedAggregateTransaction is required");
    Validate.isTrue(signedAggregateTransaction.getType() == TransactionType.AGGREGATE_BONDED,
        "signedAggregateTransaction type must be AGGREGATE_BONDED");
    Observable<TransactionAnnounceResponse> announce = transactionRepository
        .announceAggregateBonded(signedAggregateTransaction);
    return announce.flatMap(
        r -> listener.aggregateBondedAddedOrError(signedAggregateTransaction.getSigner().getAddress(),
            signedAggregateTransaction.getHash()));
}
 
源代码11 项目: WanAndroid   文件: RxUtils.java
/**
 * 对结果进行预处理,只返回成功的结果
 */
public static <T> ObservableTransformer<BaseResponse<T>, T> handleRequest2(){

    return new ObservableTransformer<BaseResponse<T>, T>() {
        @Override
        public ObservableSource<T> apply(Observable<BaseResponse<T>> upstream) {
            return upstream.flatMap(new Function<BaseResponse<T>, ObservableSource<T>>() {
                @Override
                public ObservableSource<T> apply(BaseResponse<T> tBaseResponse) throws Exception {
                    return createObservable(tBaseResponse.getData());
                }
            });
        }
    };
}
 
源代码12 项目: asf-sdk   文件: BDSChannelImpl.java
private ObservableSource<?> handleWsError(Observable<Throwable> throwableObservable) {
  AtomicInteger counter = new AtomicInteger();

  return throwableObservable.flatMap(throwable -> {
    if (throwable instanceof HttpException) {
      return Observable.just(throwable)
          .takeWhile(__ -> counter.getAndIncrement() != 5)
          .flatMap(__ -> Observable.timer(5, TimeUnit.SECONDS));
    } else {
      return Observable.just(throwable);
    }
  });
}
 
@Override
public Observable<List<Movie>> getPopularMovies(int page) {
    Observable<DiscoverMoviesResponse> discoverMoviesResponseObservable =
            apiService.discover("popularity.desc", page, ApiUtils.getApiKey());
    return discoverMoviesResponseObservable
            .flatMap(new Function<DiscoverMoviesResponse, ObservableSource<? extends List<Movie>>>() {
                @Override
                public ObservableSource<? extends List<Movie>> apply(DiscoverMoviesResponse discoverMoviesResponse)
                        throws Exception {
                    return Observable.just(discoverMoviesResponse.getResults());
                }
            });
}
 
源代码14 项目: BaseProject   文件: RetryFunc.java
@Override public ObservableSource<?> apply(final Observable<? extends Throwable> observable)
        throws Exception {
    return observable.flatMap(new Function<Throwable, ObservableSource<?>>() {
        @Override public ObservableSource<?> apply(Throwable throwable) throws Exception {
            if (++mRetryCount < mMaxRetryCount) {
                return Observable.timer(mRetryDelay, mTimeUnit);
            }
            return Observable.error(throwable);
        }
    });
}
 
源代码15 项目: RxAndroidBle   文件: NoRetryStrategy.java
@Override
public Observable<LongWriteFailure> apply(Observable<LongWriteFailure> observable) {
    return observable.flatMap(new Function<LongWriteFailure, Observable<LongWriteFailure>>() {
        @Override
        public Observable<LongWriteFailure> apply(LongWriteFailure longWriteFailure) {
            return Observable.error(longWriteFailure.getCause());
        }
    });
}
 
@NonNull
@Override
public Observable<List<Media>> fromFile(@NonNull final Uri uri) {
    final Observable<List<Media>> fromProvider = mMediaProvider.load(
            MediaStore.Audio.Media.DATA.concat("=?"),
            new String[]{uri.getPath()},
            null,
            null);

    return fromProvider.flatMap(queue -> queue.isEmpty()
                ? Observable.fromCallable(() -> queueFromFile(uri))
                : Observable.just(queue));
}
 
源代码17 项目: akarnokd-misc   文件: SwitchFallback.java
public static void main(String[] args) {
    Observable<List<Integer>> source = Observable.empty();

            Observable<List<Integer>> fallbackSource = Observable.empty();

            source.flatMap(list -> {
                if (list.isEmpty()) {
                    return fallbackSource;
                }
                return Observable.just(list);
            });
}
 
源代码18 项目: symbol-sdk-java   文件: ListenerBase.java
public Observable<Boolean> transactionFromAddress(final Transaction transaction, final Address address,
    final Observable<List<NamespaceId>> namespaceIdsObservable) {
    if (transaction.getSigner().filter(s -> s.getAddress().equals(address)).isPresent()) {
        return Observable.just(true);
    }
    if (transaction instanceof AggregateTransaction) {
        final AggregateTransaction aggregateTransaction = (AggregateTransaction) transaction;
        if (aggregateTransaction.getCosignatures().stream()
            .anyMatch(c -> c.getSigner().getAddress().equals(address))) {
            return Observable.just(true);
        }
        //Recursion...
        Observable<Transaction> innerTransactionObservable = Observable
            .fromIterable(aggregateTransaction.getInnerTransactions());

        return innerTransactionObservable
            .flatMap(t -> this.transactionFromAddress(t, address, namespaceIdsObservable).filter(a -> a))
            .first(false).toObservable();
    }
    if (transaction instanceof PublicKeyLinkTransaction) {
        return Observable.just(Address
            .createFromPublicKey(((PublicKeyLinkTransaction) transaction).getLinkedPublicKey().toHex(),
                transaction.getNetworkType()).equals(address));
    }

    if (transaction instanceof MetadataTransaction) {
        MetadataTransaction metadataTransaction = (MetadataTransaction) transaction;
        return Observable.just(metadataTransaction.getTargetAddress().equals(address));
    }

    if (transaction instanceof TargetAddressTransaction) {
        TargetAddressTransaction targetAddressTransaction = (TargetAddressTransaction) transaction;
        if (targetAddressTransaction.getTargetAddress() instanceof Address) {
            return Observable.just(targetAddressTransaction.getTargetAddress().equals(address));
        }
        return namespaceIdsObservable
            .map(namespaceIds -> namespaceIds.contains(targetAddressTransaction.getTargetAddress()));
    }

    if (transaction instanceof MultisigAccountModificationTransaction) {
        MultisigAccountModificationTransaction multisigAccountModificationTransaction = (MultisigAccountModificationTransaction) transaction;
        if (multisigAccountModificationTransaction.getAddressAdditions().stream()
            .anyMatch(a -> a.equals(address))) {
            return Observable.just(true);
        }

        return Observable.just(
            multisigAccountModificationTransaction.getAddressDeletions().stream().anyMatch(a -> a.equals(address)));

    }

    if (transaction instanceof AccountAddressRestrictionTransaction) {
        AccountAddressRestrictionTransaction accountAddressRestrictionTransaction = (AccountAddressRestrictionTransaction) transaction;
        if (accountAddressRestrictionTransaction.getRestrictionAdditions().contains(address)) {
            return Observable.just(true);
        }
        if (accountAddressRestrictionTransaction.getRestrictionDeletions().contains(address)) {
            return Observable.just(true);
        }
        return namespaceIdsObservable.flatMap(namespaceIds -> {
            if (namespaceIds.stream().anyMatch(
                namespaceId -> accountAddressRestrictionTransaction.getRestrictionAdditions()
                    .contains(namespaceId))) {
                return Observable.just(true);
            }
            if (namespaceIds.stream().anyMatch(
                namespaceId -> accountAddressRestrictionTransaction.getRestrictionDeletions()
                    .contains(namespaceId))) {
                return Observable.just(true);
            }
            return Observable.just(false);
        });
    }

    if (transaction instanceof RecipientTransaction) {
        RecipientTransaction recipientTransaction = (RecipientTransaction) transaction;
        if (recipientTransaction.getRecipient() instanceof NamespaceId) {
            return namespaceIdsObservable
                .map(namespaceIds -> namespaceIds.contains(recipientTransaction.getRecipient()));
        }
        return Observable.just(recipientTransaction.getRecipient().equals(address));

    }

    return Observable.just(false);
}
 
源代码19 项目: xio   文件: ClientPoolTest.java
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
  return upstream.flatMap(
      client ->
          Observable.timer(random.nextInt(maxTimeMs), TimeUnit.MILLISECONDS).map(t -> client));
}