下面列出了 io.reactivex.Flowable#flatMap() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds a transformer that unwraps each {@code BaseResponse} into its payload.
 *
 * <p>Every upstream response is flat-mapped to {@code createData(response.data)}.
 *
 * <p>NOTE(review): a status check ({@code code == 200}, otherwise an error carrying the
 * response message) used to sit in this method as commented-out code, so no response is
 * rejected here at the moment. Confirm whether that check should be restored.
 *
 * @param <T> payload type carried by the response
 * @return a transformer turning {@code BaseResponse<T>} items into {@code T} items
 */
public static <T> FlowableTransformer<BaseResponse<T>, T> handleResult() {
    // The mapper holds no state, so one instance can serve every application of the transformer.
    final Function<BaseResponse<T>, Flowable<T>> unwrapPayload =
            new Function<BaseResponse<T>, Flowable<T>>() {
                @Override
                public Flowable<T> apply(BaseResponse<T> response) throws Exception {
                    return createData(response.data);
                }
            };

    return new FlowableTransformer<BaseResponse<T>, T>() {
        @Override
        public Flowable<T> apply(Flowable<BaseResponse<T>> upstream) {
            return upstream.flatMap(unwrapPayload);
        }
    };
}
/**
 * Imports a wallet from a mnemonic phrase.
 *
 * <p>The work runs inside the returned {@code Flowable}: a key pair is generated from the
 * mnemonic, a light wallet file is created with {@code password}, and the resulting wallet is
 * saved through {@code WalletManager} unless a wallet with the same address already exists.
 *
 * @param context   passed through to {@code WalletManager.saveWallet}
 * @param password  password used to create the wallet file
 * @param mnemonics mnemonic phrase the key pair is generated from
 * @return a Flowable emitting the saved wallet, or failing with an {@code HLError}
 *         ({@code ReplyCode.walletExisted}) when the address is already known
 */
public Flowable<HLWallet> importMnemonic(Context context,
                                         String password,
                                         String mnemonics) {
    return Flowable.just(mnemonics).flatMap(phrase -> {
        ECKeyPair derivedKeys = generateKeyPair(phrase);
        WalletFile lightWallet = Wallet.createLight(password, derivedKeys);
        HLWallet imported = new HLWallet(lightWallet);

        if (!WalletManager.shared().isWalletExist(imported.getAddress())) {
            WalletManager.shared().saveWallet(context, imported);
            return Flowable.just(imported);
        }
        return Flowable.error(new HLError(ReplyCode.walletExisted, new Throwable("Wallet existed!")));
    });
}
/**
 * Imports a wallet from a hex-encoded private key.
 *
 * <p>A leading {@code Constant.PREFIX_16} is stripped from the key if present. Inside the
 * returned {@code Flowable} the key is hex-decoded, a light wallet file is created with
 * {@code password}, and the wallet is saved through {@code WalletManager} unless a wallet with
 * the same address already exists.
 *
 * @param context    passed through to {@code WalletManager.saveWallet}
 * @param privateKey hex-encoded private key, with or without the {@code Constant.PREFIX_16} prefix
 * @param password   password used to create the wallet file
 * @return a Flowable emitting the saved wallet, or failing with an {@code HLError}
 *         ({@code ReplyCode.walletExisted}) when the address is already known
 */
public Flowable<HLWallet> importPrivateKey(Context context, String privateKey, String password) {
    final String unprefixedKey = privateKey.startsWith(Constant.PREFIX_16)
            ? privateKey.substring(Constant.PREFIX_16.length())
            : privateKey;

    return Flowable.just(unprefixedKey).flatMap(hexKey -> {
        byte[] keyBytes = Hex.decode(hexKey);
        ECKeyPair keyPair = ECKeyPair.create(keyBytes);
        WalletFile lightWallet = Wallet.createLight(password, keyPair);
        HLWallet imported = new HLWallet(lightWallet);

        if (!WalletManager.shared().isWalletExist(imported.getAddress())) {
            WalletManager.shared().saveWallet(context, imported);
            return Flowable.just(imported);
        }
        return Flowable.error(new HLError(ReplyCode.walletExisted, new Throwable("Wallet existed!")));
    });
}
/**
 * Builds a transformer that converts error-carrying models into stream errors.
 *
 * <p>Each upstream model is inspected in this order:
 * <ol>
 *   <li>{@code null}, or {@code isNull()} is true: fails with {@code NetError.NoDataError};</li>
 *   <li>{@code isAuthError()} is true: fails with {@code NetError.AuthError};</li>
 *   <li>{@code isBizError()} is true: fails with {@code NetError.BusinessError};</li>
 *   <li>otherwise the model is passed through unchanged.</li>
 * </ol>
 *
 * @param <T> model type
 * @return a transformer that either re-emits the model or signals a {@code NetError}
 */
public static <T extends IModel> FlowableTransformer<T, T> getApiTransformer() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.flatMap(new Function<T, Publisher<T>>() {
                @Override
                public Publisher<T> apply(T model) throws Exception {
                    if (model == null || model.isNull()) {
                        // A null model has no error message to read; dereferencing it here
                        // would replace the intended NoDataError with a NullPointerException.
                        return Flowable.error(new NetError(
                                model == null ? null : model.getErrorMsg(),
                                NetError.NoDataError));
                    }
                    if (model.isAuthError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.AuthError));
                    }
                    if (model.isBizError()) {
                        return Flowable.error(new NetError(model.getErrorMsg(), NetError.BusinessError));
                    }
                    return Flowable.just(model);
                }
            });
        }
    };
}
/**
 * Repeatedly applies {@code fn}, starting from {@code initial}, and emits the right-hand
 * results.
 *
 * <p>The pipeline starts with a single {@code Either.left(initial)}. Each loop iteration
 * appends a {@code flatMap} stage: a left value is fed to {@code fn}, a right value is
 * re-emitted unchanged. After the loop, only right values are kept and unwrapped.
 *
 * <p>NOTE(review): {@code newValue[0]} is only written inside the {@code flatMap} lambdas.
 * The loop can therefore exit only if those lambdas have already run when the flag is
 * checked. Confirm that this holds for the Flowable in use, otherwise the loop does not
 * terminate.
 *
 * @param initial seed value for the first application of {@code fn}
 * @param fn      step function; a left result requests another step, a right result is final
 * @return a Flowable of the right-hand values
 */
public static <T,R> Flowable<R> tailRec(T initial, Function<? super T, ? extends Flowable<? extends Either<T, R>>> fn) {
// Seed the pipeline with the initial value wrapped as "not finished yet" (left).
Flowable<Either<T, R>> next = Flowable.just(Either.left(initial));
// Single-element array so the lambdas below can mutate the flag.
boolean newValue[] = {true};
for(;;){
// Left: run another step via fn and mark that a new value was produced.
next = next.flatMap(e -> e.fold(s -> {
newValue[0]=true;
return fn.apply(s); },
// Right: pass the finished value through and mark that nothing new was produced.
p -> {
newValue[0]=false;
return Flowable.just(e);
}));
// Stop adding stages once a stage reported no new left value.
if(!newValue[0])
break;
}
// Drop any remaining left values and unwrap the right ones.
return next.filter(Either::isRight).map(e->e.orElse(null));
}
/**
 * Looks up the instances registered for a service.
 *
 * <p>When {@code serviceId} is {@code null}, the AWS service id from the Route53 client
 * discovery configuration is used instead. The lookup is started through the discovery client's
 * asynchronous {@code listInstancesAsync} call and the result is converted by
 * {@code convertInstancesResulttoServiceInstances}.
 *
 * @param serviceId the service id, or {@code null} to use the configured default
 * @return a publisher of the list of service instances
 */
@Override
public Publisher<List<ServiceInstance>> getInstances(String serviceId) {
    final String effectiveServiceId = serviceId != null
            ? serviceId
            : getRoute53ClientDiscoveryConfiguration().getAwsServiceId(); // fall back to the config file

    final ListInstancesRequest request = new ListInstancesRequest().withServiceId(effectiveServiceId);
    final Future<ListInstancesResult> pendingResult = getDiscoveryClient().listInstancesAsync(request);

    return Flowable.fromFuture(pendingResult)
            .flatMap(this::convertInstancesResulttoServiceInstances);
}
/**
 * Lists the service IDs of the configured namespace.
 *
 * <p>The request is filtered on {@code NAMESPACE_ID} using the namespace id from the Route53
 * client discovery configuration, started through the discovery client's asynchronous
 * {@code listServicesAsync} call, and converted by {@code convertServiceIds}.
 *
 * @return a publisher of the list of service IDs as strings
 */
@Override
public Publisher<List<String>> getServiceIds() {
    final String namespaceId = getRoute53ClientDiscoveryConfiguration().getNamespaceId();
    final ServiceFilter byNamespace = new ServiceFilter()
            .withName("NAMESPACE_ID")
            .withValues(namespaceId);

    final ListServicesRequest request = new ListServicesRequest().withFilters(byNamespace);
    final Future<ListServicesResult> pendingResult = getDiscoveryClient().listServicesAsync(request);

    return Flowable.fromFuture(pendingResult).flatMap(this::convertServiceIds);
}
/**
 * Converts transaction-details responses into transfer details.
 *
 * <p>A successful response is mapped through {@code transferDetailsFromTransaction}; an
 * unsuccessful one fails the stream with an {@code Exception} carrying the response's error.
 *
 * @param response stream of transaction-details responses
 * @return stream of transfer details, or an error for the first unsuccessful response
 */
private Flowable<TransferDetails> mapTransactionDetailsResponseToTransfer(Flowable<TransactionDetailsResponse> response) {
    return response.flatMap(detailsResponse -> {
        final Transaction transaction = detailsResponse.getTransaction();
        if (!detailsResponse.isSuccess()) {
            return Flowable.error(new Exception(detailsResponse.getError()));
        }
        return Flowable.just(transferDetailsFromTransaction(transaction));
    });
}
/**
 * Decides, for every upstream error, whether to retry after a delay or to give up.
 *
 * <p>The retry counter is incremented on each error. While it stays below
 * {@code _maxRetries}, a timer of {@code _retryCount * _retryDelayMillis} milliseconds is
 * returned; otherwise the original error is propagated.
 *
 * @param inputObservable stream of errors raised by the source being retried
 * @return a publisher whose emissions trigger a retry and whose error ends the sequence
 */
@Override
public Publisher<?> apply(Flowable<? extends Throwable> inputObservable) {
    // The result must be derived from inputObservable; building an unrelated
    // chain here would break the sequence.
    return inputObservable.flatMap(
            new Function<Throwable, Publisher<?>>() {
                @Override
                public Publisher<?> apply(Throwable throwable) {
                    if (++_retryCount >= _maxRetries) {
                        Timber.d("Argh! i give up");
                        // Retries exhausted: signal an error so the chain is terminated.
                        // Only onNext causes a re-subscription (onError and onComplete end it).
                        return Flowable.error(throwable);
                    }

                    // Emitting onNext from the returned publisher re-subscribes to the source.
                    final long delayMillis = _retryCount * _retryDelayMillis;
                    Timber.d("Retrying in %d ms", delayMillis);
                    _log(String.format("Retrying in %d ms", delayMillis));
                    return Flowable.timer(delayMillis, TimeUnit.MILLISECONDS);
                }
            });
}
/**
 * Unwraps API responses into their payload.
 *
 * <p>A response fails the stream with an {@code ApiException} (built from its code and
 * message) when its code differs from {@code ErrorHelper.SUCCESS} or when it carries no data;
 * otherwise its data is emitted.
 *
 * @param upstream stream of wrapped responses
 * @param <T>      payload type
 * @return a publisher of payloads
 */
public static <T> Publisher<T> handleResponse(Flowable<Response<T>> upstream) {
    return upstream.flatMap((Function<Response<T>, Publisher<T>>) wrapped -> {
        final boolean rejected =
                wrapped.getCode() != ErrorHelper.SUCCESS || wrapped.getData() == null;
        return rejected
                ? Flowable.error(new ApiException(wrapped.getCode(), wrapped.getMessage()))
                : Flowable.just(wrapped.getData());
    });
}
/**
 * Creates a Flowable that runs {@code sql} once per parameter group and maps the result rows.
 *
 * <p>The statement is prepared through {@code Util.prepare} when the Flowable is subscribed
 * (via {@code Flowable.using}) and released through
 * {@code Util.closePreparedStatementAndConnection}. Parameter groups are flat-mapped with
 * error delaying enabled and a maximum concurrency of 1.
 *
 * @param con             connection used to prepare the statement
 * @param sql             query text
 * @param parameterGroups one list of parameters per execution of the query
 * @param fetchSize       fetch size passed to statement preparation and execution
 * @param mapper          converts the result set into a {@code T}
 * @param eagerDispose    forwarded to {@code Flowable.using} as its eager flag
 * @param queryTimeoutSec query timeout in seconds passed to statement preparation and execution
 * @param <T>             element type produced by {@code mapper}
 * @return a Flowable of mapped results
 */
static <T> Flowable<T> create(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, int fetchSize,
        Function<? super ResultSet, T> mapper, boolean eagerDispose, int queryTimeoutSec) {
    log.debug("Select.create called with con={}", con);

    final Callable<NamedPreparedStatement> statementFactory =
            () -> Util.prepare(con, fetchSize, sql, queryTimeoutSec);

    final Function<NamedPreparedStatement, Flowable<T>> rowsForStatement = prepared ->
            parameterGroups.flatMap(
                    group -> create(prepared.ps, group, mapper, prepared.names, sql, fetchSize,
                            queryTimeoutSec),
                    true,
                    1);

    final Consumer<NamedPreparedStatement> release = Util::closePreparedStatementAndConnection;

    return Flowable.using(statementFactory, rowsForStatement, release, eagerDispose);
}
/**
 * Merges a publisher of Flowables into one Flowable whose elements are each wrapped in a
 * {@code Single}.
 *
 * <p>The inner Flowables are folded together with {@code Flowable.merge}, starting from an
 * empty Flowable; each element is wrapped with {@code Single.just} before being merged.
 *
 * @param fts publisher of the Flowables to combine
 * @param <T> element type
 * @return a Flowable of single-wrapped elements from all inner Flowables
 */
public static <T> Flowable<Single<T>> sequence(final Publisher<? extends Flowable<T>> fts) {
    final io.reactivex.functions.BiFunction<Flowable<Single<T>>, Flowable<T>, Flowable<Single<T>>> mergeWrapped =
            (merged, current) -> Flowable.merge(merged, current.map(Single::just));

    final Single<Flowable<Single<T>>> folded =
            Flowable.fromPublisher(fts).reduce(Flowable.empty(), mergeWrapped);

    // The typed local is kept on purpose: it supplies the target type for Flowable::just.
    final Flowable<Flowable<Single<T>>> nested = folded.flatMapPublisher(Flowable::just);
    return nested.flatMap(inner -> inner);
}
/**
 * Builds a Flowable that sends every value of {@code value} as a Kafka producer record.
 *
 * <p>{@code value} is converted to a Flowable; each element becomes a record built by
 * {@code buildProducerRecord} and is handed to {@code kafkaProducer.send}. What is emitted per
 * record depends on the method's return type (its first type variable when the return type is
 * an {@code Iterable}):
 * <ul>
 *   <li>assignable from {@code RecordMetadata}: the record metadata;</li>
 *   <li>the sent element is an instance of it: the element itself;</li>
 *   <li>otherwise: the metadata converted by {@code conversionService}, if conversion
 *       succeeds; nothing is emitted if it does not.</li>
 * </ul>
 * A send failure is wrapped with {@code wrapException} and signalled as an error.
 *
 * @param context       invocation context, used for trace logging and exception wrapping
 * @param topic         target topic
 * @param kafkaProducer producer used to send the records
 * @param kafkaHeaders  headers attached to each record
 * @param returnType    declared return type of the client method
 * @param key           record key
 * @param value         value, or publisher of values, to send
 * @param timestamp     record timestamp
 * @param maxBlock      optional timeout applied to the resulting Flowable; {@code null} for none
 * @return the Flowable performing the sends
 */
private Flowable<Object> buildSendFlowable(
        MethodInvocationContext<Object, Object> context,
        String topic,
        Producer kafkaProducer,
        List<Header> kafkaHeaders,
        Argument<?> returnType,
        Object key,
        Object value,
        Long timestamp,
        Duration maxBlock) {
    Flowable<?> valueFlowable = Publishers.convertPublisher(value, Flowable.class);

    // For Iterable return types the element type decides what is emitted.
    final Class<?> declaredType = returnType.getType();
    final Class<?> emittedType = Iterable.class.isAssignableFrom(declaredType)
            ? returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType()
            : declaredType;

    Flowable<Object> sendFlowable = valueFlowable.flatMap(payload -> {
        ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, payload, timestamp);
        if (LOG.isTraceEnabled()) {
            LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
        }

        //noinspection unchecked
        return Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                emitter.onError(wrapException(context, exception));
                return;
            }

            if (RecordMetadata.class.isAssignableFrom(emittedType)) {
                emitter.onNext(metadata);
            } else if (emittedType.isInstance(payload)) {
                emitter.onNext(payload);
            } else {
                Optional converted = conversionService.convert(metadata, emittedType);
                if (converted.isPresent()) {
                    emitter.onNext(converted.get());
                }
            }
            emitter.onComplete();
        }), BackpressureStrategy.BUFFER);
    });

    return maxBlock == null
            ? sendFlowable
            : sendFlowable.timeout(maxBlock.toMillis(), TimeUnit.MILLISECONDS);
}
/**
 * Maps every error to a 10-second timer running on the given Vert.x instance's scheduler.
 *
 * @param vertx Vert.x instance whose scheduler runs the timer
 * @param errs  stream of errors
 * @return a Flowable emitting once, 10 seconds later, for each error
 */
private Flowable<Long> retryLater(Vertx vertx, Flowable<Throwable> errs) {
    return errs.flatMap(
            ignoredError -> Flowable.timer(10, TimeUnit.SECONDS, RxHelper.scheduler(vertx)));
}
/**
 * Saves every book of the stream by delegating to the single-book {@code save} overload.
 *
 * @param books books to save
 * @return a Flowable of the results of the individual saves
 */
public Flowable<Book> save(Flowable<Book> books) {
    return books.flatMap(
            singleBook -> save(singleBook).toFlowable());
}
/**
 * Decides, for every failed attempt, whether the request is retried after a delay.
 *
 * <p>The retry counter is incremented on each error. While it does not exceed
 * {@code maxRetries} and a request is present:
 * <ul>
 *   <li>a request with a non-blank URL is retried after {@code retryDelayMillis}; before the
 *       retry, a proxy from the pool is applied if one is available and passes the check, and
 *       the request's before-request hook is run;</li>
 *   <li>a request with a blank URL is not retried and the error is propagated.</li>
 * </ul>
 * Otherwise the request's on-error hook is run (when there is a request) and the error is
 * propagated.
 *
 * @param attempts stream of errors raised by the attempts
 * @return a publisher whose emission triggers a retry and whose error ends the sequence
 */
@Override
public Publisher apply(Flowable<Throwable> attempts) throws Exception {
    return attempts.flatMap(new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable throwable) throws Exception {
            if (++retryCount <= maxRetries && request != null) {
                String url = request.getUrl();
                if (Preconditions.isNotBlank(url)) {
                    log.info("url:" + url + " get error, it will try after " + retryDelayMillis
                            + " millisecond, retry count " + retryCount);
                    log.info(request.toString());
                } else {
                    log.info("get error, it will try after " + retryDelayMillis
                            + " millisecond, retry count " + retryCount);
                    // Without a URL there is nothing to retry, despite the log wording.
                    return Flowable.error(throwable);
                }
                return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS)
                        .map(new Function<Long, Long>() {
                            @Override
                            public Long apply(Long aLong) throws Exception {
                                Request.BeforeRequest beforeRequest = request.getBeforeRequest();
                                Proxy proxy = ProxyPool.getProxy();
                                // If a usable proxy exists, switch to it for the retry.
                                if (proxy != null && SpiderUtils.checkProxy(proxy)) {
                                    request.proxy(proxy);
                                }
                                if (beforeRequest != null) {
                                    beforeRequest.process(request);
                                }
                                return aLong;
                            }
                        });
            }

            // The condition above admits a null request, so guard before dereferencing it;
            // otherwise the original error would be replaced by a NullPointerException.
            if (request != null) {
                Request.OnErrorRequest onErrorRequest = request.getOnErrorRequest();
                if (onErrorRequest != null) {
                    onErrorRequest.process(request);
                }
            }
            // Max retries hit (or no request). Just pass the error along.
            return Flowable.error(throwable);
        }
    });
}
/**
 * Performs a filtered for-comprehension over a Flowable with two nested generating functions,
 * i.e. a three-level nested iteration.
 *
 * <p>For every element {@code a} of {@code value1}, {@code value2} supplies a publisher of
 * {@code b} values; for every {@code (a, b)} pair, {@code value3} supplies a publisher of
 * {@code c} values. Each {@code (a, b, c)} combination accepted by {@code filterFunction} is
 * passed to {@code yieldingFunction} and the result is emitted. Levels are combined with
 * {@code flatMap}.
 *
 * <pre>
 * {@code
 * forEach3(Flowable.range(1, 10),
 *          a -> Flowable.range(a, 3),
 *          (a, b) -> Flowable.just(a + b),
 *          (a, b, c) -> a + b + c < 10,
 *          (a, b, c) -> a + b + c);
 * }
 * </pre>
 *
 * @param value1 top level Flowable
 * @param value2 generates the second-level publisher from a top-level element
 * @param value3 generates the third-level publisher from a top- and second-level element
 * @param filterFunction predicate over a combination; only combinations it accepts are yielded
 * @param yieldingFunction generates a result per accepted combination
 * @param <T2> not used by any parameter; kept so the signature stays unchanged
 * @return Flowable with one element per accepted combination
 */
public static <T1, T2, R1, R2, R> Flowable<R> forEach3(Flowable<? extends T1> value1,
        Function<? super T1, ? extends Publisher<R1>> value2,
        BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
        Function3<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
        Function3<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
    return value1.flatMap(first ->
            Flowable.fromPublisher(value2.apply(first)).flatMap(second ->
                    Flowable.fromPublisher(value3.apply(first, second))
                            .filter(third -> filterFunction.apply(first, second, third))
                            .map(third -> yieldingFunction.apply(first, second, third))));
}
/**
 * Performs a for-comprehension over a Flowable with two nested generating functions,
 * i.e. a three-level nested iteration.
 *
 * <p>For every element {@code a} of {@code value1}, {@code value2} supplies a publisher of
 * {@code b} values; for every {@code (a, b)} pair, {@code value3} supplies a publisher of
 * {@code c} values. Each {@code (a, b, c)} combination is passed to {@code yieldingFunction}
 * and the result is emitted. Levels are combined with {@code flatMap}.
 *
 * <pre>
 * {@code
 * forEach3(Flowable.range(1, 10),
 *          a -> Flowable.range(a, 3),
 *          (a, b) -> Flowable.just(a + b),
 *          (a, b, c) -> a + b + c);
 * }
 * </pre>
 *
 * @param value1 top level Flowable
 * @param value2 generates the second-level publisher from a top-level element
 * @param value3 generates the third-level publisher from a top- and second-level element
 * @param yieldingFunction generates a result per combination
 * @param <T2> not used by any parameter; kept so the signature stays unchanged
 * @return Flowable with an element per combination of nested publishers generated by the yielding function
 */
public static <T1, T2, R1, R2, R> Flowable<R> forEach3(Flowable<? extends T1> value1,
        Function<? super T1, ? extends Publisher<R1>> value2,
        BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
        Function3<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
    return value1.flatMap(first ->
            Flowable.fromPublisher(value2.apply(first)).flatMap(second ->
                    Flowable.fromPublisher(value3.apply(first, second))
                            .map(third -> yieldingFunction.apply(first, second, third))));
}
/**
 * Performs a for-comprehension over a Flowable with one additional generating function,
 * i.e. a two-level nested iteration.
 *
 * <p>For every element {@code a} of {@code value1}, {@code value2} supplies a Flowable of
 * {@code b} values; each {@code (a, b)} pair is passed to {@code yieldingFunction} and the
 * result is emitted. Levels are combined with {@code flatMap}.
 *
 * <pre>
 * {@code
 * forEach(Flowable.range(1, 10), i -> Flowable.range(i, 10), Tuple::tuple)
 *     .subscribe(System.out::println);
 *
 * // e.g.
 * // (1, 1)
 * // (1, 2)
 * // (1, 3)
 * // (1, 4)
 * // ...
 * }
 * </pre>
 *
 * @param value1 top level Flowable
 * @param value2 generates the nested Flowable from a top-level element
 * @param yieldingFunction generates a result per combination
 * @return Flowable with one element per combination
 */
public static <T, R1, R> Flowable<R> forEach(Flowable<? extends T> value1, Function<? super T, Flowable<R1>> value2,
        BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {
    return value1.flatMap(outer ->
            Flowable.fromPublisher(value2.apply(outer))
                    .map(inner -> yieldingFunction.apply(outer, inner)));
}
/**
 * Performs a filtered for-comprehension over a Flowable with one additional generating
 * function, i.e. a two-level nested iteration.
 *
 * <p>For every element {@code a} of {@code value1}, {@code value2} supplies a publisher of
 * {@code b} values; each {@code (a, b)} pair accepted by {@code filterFunction} is passed to
 * {@code yieldingFunction} and the result is emitted. Levels are combined with {@code flatMap}.
 *
 * <pre>
 * {@code
 * forEach(Flowable.range(1, 10), i -> Flowable.range(i, 10), (a, b) -> a > 2 && b < 10, Tuple::tuple)
 *     .subscribe(System.out::println);
 *
 * // e.g.
 * // (3, 3)
 * // (3, 4)
 * // (3, 5)
 * // (3, 6)
 * // (3, 7)
 * // (3, 8)
 * // (3, 9)
 * // ...
 * }
 * </pre>
 *
 * @param value1 top level Flowable
 * @param value2 generates the nested publisher from a top-level element
 * @param filterFunction predicate over a combination; only combinations it accepts are yielded
 * @param yieldingFunction generates a result per accepted combination
 * @return Flowable with one element per accepted combination
 */
public static <T, R1, R> Flowable<R> forEach(Flowable<? extends T> value1,
        Function<? super T, ? extends Publisher<R1>> value2,
        BiFunction<? super T, ? super R1, Boolean> filterFunction,
        BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {
    return value1.flatMap(outer ->
            Flowable.fromPublisher(value2.apply(outer))
                    .filter(inner -> filterFunction.apply(outer, inner))
                    .map(inner -> yieldingFunction.apply(outer, inner)));
}