io.reactivex.Flowable#flatMap ( )源码实例Demo

下面列出了io.reactivex.Flowable#flatMap ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: AcgClub   文件: RxUtil.java
/**
 * 统一返回结果处理
 */
public static <T> FlowableTransformer<BaseResponse<T>, T> handleResult() {
  return new FlowableTransformer<BaseResponse<T>, T>() {
    @Override
    public Flowable<T> apply(Flowable<BaseResponse<T>> httpResponseFlowable) {
      return httpResponseFlowable.flatMap(new Function<BaseResponse<T>, Flowable<T>>() {
        @Override
        public Flowable<T> apply(BaseResponse<T> tBaseResponse) throws Exception {
          /*if (tBaseResponse.code == 200) {
            return createData(tBaseResponse.data);
          } else {
            return Flowable.error(new ApiException(tBaseResponse.message));
          }*/
          return createData(tBaseResponse.data);
        }
      });

    }
  };
}
 
源代码2 项目: dapp-wallet-demo   文件: InitWalletManager.java
public Flowable<HLWallet> importMnemonic(Context context,
                                         String password,
                                         String mnemonics) {
    Flowable<String> flowable = Flowable.just(mnemonics);

    return flowable
            .flatMap(s -> {
                ECKeyPair keyPair = generateKeyPair(s);
                WalletFile walletFile = Wallet.createLight(password, keyPair);
                HLWallet hlWallet = new HLWallet(walletFile);
                if (WalletManager.shared().isWalletExist(hlWallet.getAddress())) {
                    return Flowable.error(new HLError(ReplyCode.walletExisted, new Throwable("Wallet existed!")));
                }
                WalletManager.shared().saveWallet(context, hlWallet);
                return Flowable.just(hlWallet);
            });
}
 
源代码3 项目: dapp-wallet-demo   文件: InitWalletManager.java
public Flowable<HLWallet> importPrivateKey(Context context, String privateKey, String password) {
    if (privateKey.startsWith(Constant.PREFIX_16)) {
        privateKey = privateKey.substring(Constant.PREFIX_16.length());
    }
    Flowable<String> flowable = Flowable.just(privateKey);
    return flowable.flatMap(s -> {
        byte[] privateBytes = Hex.decode(s);
        ECKeyPair ecKeyPair = ECKeyPair.create(privateBytes);
        WalletFile walletFile = Wallet.createLight(password, ecKeyPair);
        HLWallet hlWallet = new HLWallet(walletFile);
        if (WalletManager.shared().isWalletExist(hlWallet.getAddress())) {
            return Flowable.error(new HLError(ReplyCode.walletExisted, new Throwable("Wallet existed!")));
        }
        WalletManager.shared().saveWallet(context, hlWallet);
        return Flowable.just(hlWallet);
    });
}
 
源代码4 项目: XDroidMvp   文件: XApi.java
/**
 * 异常处理变换
 *
 * @return
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {

    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {

                    if (model == null || model.isNull()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.NoDataError));
                    } else if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    } else if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    } else {
                        return Flowable.just(model);
                    }
                }
            });
        }
    };
}
 
源代码5 项目: cyclops   文件: Flowables.java
public static  <T,R> Flowable<R> tailRec(T initial, Function<? super T, ? extends Flowable<? extends Either<T, R>>> fn) {
    Flowable<Either<T, R>> next = Flowable.just(Either.left(initial));

    boolean newValue[] = {true};
    for(;;){

        next = next.flatMap(e -> e.fold(s -> {
                    newValue[0]=true;
                    return fn.apply(s); },
                p -> {
                    newValue[0]=false;
                    return Flowable.just(e);
                }));
        if(!newValue[0])
            break;

    }

    return next.filter(Either::isRight).map(e->e.orElse(null));
}
 
源代码6 项目: micronaut-aws   文件: Route53AutoNamingClient.java
/**
 * Gets a list of instances registered with Route53 given a service ID.
 * @param serviceId The service id
 * @return list of serviceInstances usable by MN.
 */
@Override
public Publisher<List<ServiceInstance>> getInstances(String serviceId) {
    if (serviceId == null) {
        serviceId = getRoute53ClientDiscoveryConfiguration().getAwsServiceId();  // we can default to the config file
    }
    ListInstancesRequest instancesRequest = new ListInstancesRequest().withServiceId(serviceId);
    Future<ListInstancesResult> instanceResult = getDiscoveryClient().listInstancesAsync(instancesRequest);
    Flowable<ListInstancesResult> observableInstanceResult = Flowable.fromFuture(instanceResult);
    return observableInstanceResult.flatMap(this::convertInstancesResulttoServiceInstances);
}
 
源代码7 项目: micronaut-aws   文件: Route53AutoNamingClient.java
/**
 * Gets a list of service IDs from AWS for a given namespace.
 * @return rx java publisher list of the service IDs in string format
 */
@Override
public Publisher<List<String>> getServiceIds() {
    ServiceFilter serviceFilter = new ServiceFilter().withName("NAMESPACE_ID").withValues(getRoute53ClientDiscoveryConfiguration().getNamespaceId());
    ListServicesRequest listServicesRequest = new ListServicesRequest().withFilters(serviceFilter);
    Future<ListServicesResult> response = getDiscoveryClient().listServicesAsync(listServicesRequest);
    Flowable<ListServicesResult> flowableList = Flowable.fromFuture(response);
    return flowableList.flatMap(this::convertServiceIds);
}
 
源代码8 项目: adamant-android   文件: AdamantWalletFacade.java
private Flowable<TransferDetails> mapTransactionDetailsResponseToTransfer(Flowable<TransactionDetailsResponse> response){
    return response.flatMap(transferDetails->{
            Transaction t = transferDetails.getTransaction();
            if (transferDetails.isSuccess()){
                return Flowable.just(transferDetailsFromTransaction(t));
            } else {
                return Flowable.error(new Exception(transferDetails.getError()));
            }
    });
}
 
@Override
public Publisher<?> apply(Flowable<? extends Throwable> inputObservable) {

  // it is critical to use inputObservable in the chain for the result
  // ignoring it and doing your own thing will break the sequence

  return inputObservable.flatMap(
      new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable throwable) {
          if (++_retryCount < _maxRetries) {

            // When this Observable calls onNext, the original
            // Observable will be retried (i.e. re-subscribed)

            Timber.d("Retrying in %d ms", _retryCount * _retryDelayMillis);
            _log(String.format("Retrying in %d ms", _retryCount * _retryDelayMillis));

            return Flowable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS);
          }

          Timber.d("Argh! i give up");

          // Max retries hit. Pass an error so the chain is forcibly completed
          // only onNext triggers a re-subscription (onError + onComplete kills it)
          return Flowable.error(throwable);
        }
      });
}
 
源代码10 项目: RetrofitGO   文件: RxHelper.java
public static <T> Publisher<T> handleResponse(Flowable<Response<T>> upstream) {
    return upstream.flatMap((Function<Response<T>, Publisher<T>>) response -> {
        if (response.getCode() != ErrorHelper.SUCCESS) {
            return Flowable.error(new ApiException(response.getCode(), response.getMessage()));
        }

        if (response.getData() == null) {
            return Flowable.error(new ApiException(response.getCode(), response.getMessage()));
        }

        return Flowable.just(response.getData());
    });
}
 
源代码11 项目: rxjava2-jdbc   文件: Select.java
static <T> Flowable<T> create(Connection con, String sql,
                              Flowable<List<Object>> parameterGroups, int fetchSize,
                              Function<? super ResultSet, T> mapper, boolean eagerDispose, int queryTimeoutSec) {
    log.debug("Select.create called with con={}", con);
    Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql, queryTimeoutSec);
    Function<NamedPreparedStatement, Flowable<T>> observableFactory = ps -> parameterGroups
            .flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize, queryTimeoutSec),
                    true, 1);
    Consumer<NamedPreparedStatement> disposer = Util::closePreparedStatementAndConnection;
    return Flowable.using(initialState, observableFactory, disposer, eagerDispose);
}
 
源代码12 项目: cyclops   文件: Flowables.java
public static <T> Flowable<Single<T>> sequence(final Publisher<? extends Flowable<T>> fts) {

        io.reactivex.functions.BiFunction<Flowable<Single<T>>,Flowable<T>,Flowable<Single<T>>> combineToStream = (acc,next) ->Flowable.merge(acc,next.map(Single::just));
        Single<Flowable<Single<T>>> x = Flowable.fromPublisher(fts).reduce(Flowable.empty(), combineToStream);
        Flowable<Flowable<Single<T>>> r = x.flatMapPublisher(Flowable::just);
        return r.flatMap(i->i);
    }
 
private Flowable<Object> buildSendFlowable(
        MethodInvocationContext<Object, Object> context,
        String topic,
        Producer kafkaProducer,
        List<Header> kafkaHeaders,
        Argument<?> returnType,
        Object key,
        Object value,
        Long timestamp,
        Duration maxBlock) {
    Flowable<?> valueFlowable = Publishers.convertPublisher(value, Flowable.class);
    Class<?> javaReturnType = returnType.getType();

    if (Iterable.class.isAssignableFrom(javaReturnType)) {
        javaReturnType = returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType();
    }

    Class<?> finalJavaReturnType = javaReturnType;
    Flowable<Object> sendFlowable = valueFlowable.flatMap(o -> {
        ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, o, timestamp);

        if (LOG.isTraceEnabled()) {
            LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
        }

        //noinspection unchecked
        return Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                emitter.onError(wrapException(context, exception));
            } else {
                if (RecordMetadata.class.isAssignableFrom(finalJavaReturnType)) {
                    emitter.onNext(metadata);
                } else if (finalJavaReturnType.isInstance(o)) {
                    emitter.onNext(o);
                } else {
                    Optional converted = conversionService.convert(metadata, finalJavaReturnType);
                    if (converted.isPresent()) {
                        emitter.onNext(converted.get());
                    }
                }

                emitter.onComplete();
            }
        }), BackpressureStrategy.BUFFER);
    });

    if (maxBlock != null) {
        sendFlowable = sendFlowable.timeout(maxBlock.toMillis(), TimeUnit.MILLISECONDS);
    }
    return sendFlowable;
}
 
源代码14 项目: vertx-in-action   文件: CongratsTest.java
private Flowable<Long> retryLater(Vertx vertx, Flowable<Throwable> errs) {
  return errs
    .flatMap(d -> Flowable.timer(10, TimeUnit.SECONDS, RxHelper.scheduler(vertx)));
}
 
public Flowable<Book> save(Flowable<Book> books) {
   return books
      .flatMap(book -> save(book).toFlowable());
}
 
源代码16 项目: NetDiscovery   文件: RetryWithDelay.java
@Override
public Publisher apply(Flowable<Throwable> attempts) throws Exception {
    return attempts.flatMap(new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable throwable) throws Exception {
            if (++retryCount <= maxRetries && request!=null) {

                String url = request.getUrl();

                if (Preconditions.isNotBlank(url)) {

                    log.info("url:" + url + " get error, it will try after " + retryDelayMillis
                            + " millisecond, retry count " + retryCount);

                    log.info(request.toString());
                } else {

                    log.info("get error, it will try after " + retryDelayMillis
                            + " millisecond, retry count " + retryCount);

                    return Flowable.error(throwable);
                }

                return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS)
                        .map(new Function<Long, Long>() {
                            @Override
                            public Long apply(Long aLong) throws Exception {

                                Request.BeforeRequest beforeRequest = request.getBeforeRequest();

                                Proxy proxy = ProxyPool.getProxy();

                                if (proxy != null && SpiderUtils.checkProxy(proxy)) { // 如果存在代理,则重试时切换一下代理
                                    request.proxy(proxy);
                                }

                                if (beforeRequest != null) {
                                    beforeRequest.process(request);
                                }
                                return aLong;
                            }
                        });
            }

            Request.OnErrorRequest onErrorRequest = request.getOnErrorRequest();
            if (onErrorRequest != null) {
                onErrorRequest.process(request);
            }

            // Max retries hit. Just pass the error along.
            return Flowable.error(throwable);
        }
    });
}
 
源代码17 项目: cyclops   文件: Flowables.java
/**
     * Perform a For Comprehension over a Flowable, accepting 2 generating functions.
     * This results in a three level nested internal iteration over the provided Publishers.
     * <pre>
     * {@code
     *
     * import static cyclops.companion.reactor.Flowables.forEach;
     *
     * forEach(Flowable.range(1,10),
                   a-> ReactiveSeq.iterate(a,i->i+1).limit(10),
                   (a,b) -> Maybe.<Integer>of(a+b),
                   (a,b,c) ->a+b+c<10,
                   Tuple::tuple).toListX();
     * }
     * </pre>
     *
     * @param value1 top level Flowable
     * @param value2 Nested publisher
     * @param value3 Nested publisher
     * @param filterFunction A filtering function, keeps values where the predicate holds
     * @param yieldingFunction Generates a result per combination
     * @return
     */
public static <T1, T2, R1, R2, R> Flowable<R> forEach3(Flowable<? extends T1> value1,
        Function<? super T1, ? extends Publisher<R1>> value2,
        BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
        Function3<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
        Function3<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {

    return value1.flatMap(in -> {

        Flowable<R1> a = Flowable.fromPublisher(value2.apply(in));
        return a.flatMap(ina -> {
            Flowable<R2> b = Flowable.fromPublisher(value3.apply(in,ina));
            return b.filter(in2->filterFunction.apply(in,ina,in2))
                    .map(in2 -> yieldingFunction.apply(in, ina, in2));
        });



    });

}
 
源代码18 项目: cyclops   文件: Flowables.java
/**
 * Perform a For Comprehension over a Flowable, accepting 2 generating functions.
 * This results in a three level nested internal iteration over the provided Publishers.
 *
 * <pre>
 * {@code
 *
 * import static cyclops.companion.reactor.Flowables.forEach;
 *
 * forEach(Flowable.range(1,10),
                        a-> ReactiveSeq.iterate(a,i->i+1).limit(10),
                        (a,b) -> Maybe.<Integer>of(a+b),
                        Tuple::tuple);
 *
 * }
 * </pre>
 *
 *
 * @param value1 top level Flowable
 * @param value2 Nested publisher
 * @param value3 Nested publisher
 * @param yieldingFunction Generates a result per combination
 * @return Flowable with an element per combination of nested publishers generated by the yielding function
 */
public static <T1, T2, R1, R2, R> Flowable<R> forEach3(Flowable<? extends T1> value1,
        Function<? super T1, ? extends Publisher<R1>> value2,
        BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
        Function3<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {

    return value1.flatMap(in -> {

        Flowable<R1> a = Flowable.fromPublisher(value2.apply(in));
        return a.flatMap(ina -> {
            Flowable<R2> b = Flowable.fromPublisher(value3.apply(in, ina));
            return b.map(in2 -> yieldingFunction.apply(in, ina, in2));
        });


    });

}
 
源代码19 项目: cyclops   文件: Flowables.java
/**
 * Perform a For Comprehension over a Flowable, accepting an additonal generating function.
 * This results in a two level nested internal iteration over the provided Publishers.
 *
 * <pre>
 * {@code
 *
 *  import static cyclops.companion.reactor.Flowables.forEach;
 *  forEach(Flowable.range(1, 10), i -> Flowable.range(i, 10), Tuple::tuple)
          .subscribe(System.out::println);

   //(1, 1)
     (1, 2)
     (1, 3)
     (1, 4)
     ...
 *
 * }</pre>
 *
 * @param value1 top level Flowable
 * @param value2 Nested publisher
 * @param yieldingFunction Generates a result per combination
 * @return
 */
public static <T, R1, R> Flowable<R> forEach(Flowable<? extends T> value1, Function<? super T, Flowable<R1>> value2,
        BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {

    return value1.flatMap(in -> {

        Flowable<R1> a = Flowable.fromPublisher(value2.apply(in));
        return a.map(in2 -> yieldingFunction.apply(in,  in2));
    });

}
 
源代码20 项目: cyclops   文件: Flowables.java
/**
 *
 * <pre>
 * {@code
 *
 *   import static cyclops.companion.reactor.Flowables.forEach;
 *
 *   forEach(Flowable.range(1, 10), i -> Flowable.range(i, 10),(a,b) -> a>2 && b<10,Tuple::tuple)
           .subscribe(System.out::println);

   //(3, 3)
     (3, 4)
     (3, 5)
     (3, 6)
     (3, 7)
     (3, 8)
     (3, 9)
     ...

 *
 * }</pre>
 *
 *
 * @param value1 top level Flowable
 * @param value2 Nested publisher
 * @param filterFunction A filtering function, keeps values where the predicate holds
 * @param yieldingFunction Generates a result per combination
 * @return
 */
public static <T, R1, R> Flowable<R> forEach(Flowable<? extends T> value1,
        Function<? super T, ? extends Publisher<R1>> value2,
        BiFunction<? super T, ? super R1, Boolean> filterFunction,
        BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {

    return value1.flatMap(in -> {

        Flowable<R1> a = Flowable.fromPublisher(value2.apply(in));
        return a.filter(in2->filterFunction.apply(in,in2))
                .map(in2 -> yieldingFunction.apply(in,  in2));
    });

}