io.reactivex.Single#map() 源码实例 Demo

下面列出了 io.reactivex.Single#map() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: Melophile   文件: MusicRemoteSource.java
/**
 * Searches playlists for every given category and merges all results into one list.
 *
 * <p>One search is issued per category (matched by name, limited to 100 results). A search
 * that fails contributes an empty list instead of failing the whole request. The merged
 * list is passed through {@code filter.filterPlaylists} before it is emitted.
 *
 * @param categories category names to search for
 * @return a {@link Single} emitting the merged and filtered playlists, or signalling an
 *         {@link IllegalArgumentException} when {@code categories} is {@code null}
 */
@Override
public Single<List<PlaylistEntity>> getPlaylistsBy(List<String> categories) {
  if (categories == null) {
    return Single.error(new IllegalArgumentException("categories are null"));
  }
  // Build the accumulator lazily, once per subscription. Single.just(list) would hand the
  // very same mutable list to every subscriber, so a re-subscription (retry, repeat, a
  // second observer) would append to the previous results and emit duplicates.
  Single<List<PlaylistEntity>> merged = Single.<List<PlaylistEntity>>fromCallable(ArrayList::new);
  for (String category : categories) {
    merged = Single.zip(merged, service.searchPlaylists(PlaylistEntity
            .Filter.start()
            .byName(category)
            .limit(100)
            .createOptions())
            .onErrorResumeNext(Single.just(new ArrayList<>())), (accumulated, found) -> {
      // RxJava 2 never emits null items, so no null check is needed here.
      accumulated.addAll(found);
      return accumulated;
    });
  }
  return merged.map(filter::filterPlaylists);
}
 
源代码2 项目: Melophile   文件: MusicRemoteSource.java
/**
 * Searches tracks for every given category and merges all results into one list.
 *
 * <p>One search is issued per category (matched by tag). A search that fails contributes
 * an empty list instead of failing the whole request. The merged list is passed through
 * {@code filter.filterTracks} before it is emitted.
 *
 * @param categories category tags to search for
 * @return a {@link Single} emitting the merged and filtered tracks, or signalling an
 *         {@link IllegalArgumentException} when {@code categories} is {@code null}
 */
@Override
public Single<List<TrackEntity>> getTracksBy(List<String> categories) {
  if (categories == null) {
    return Single.error(new IllegalArgumentException("categories are null"));
  }
  // Build the accumulator lazily, once per subscription. Single.just(list) would hand the
  // very same mutable list to every subscriber, so a re-subscription (retry, repeat, a
  // second observer) would append to the previous results and emit duplicates.
  Single<List<TrackEntity>> merged = Single.<List<TrackEntity>>fromCallable(ArrayList::new);
  for (String category : categories) {
    merged = Single.zip(merged, service.searchTracks(TrackEntity
            .Filter.start()
            .byTags(category)
            .createOptions())
            .onErrorResumeNext(Single.just(new ArrayList<>())), (accumulated, found) -> {
      // RxJava 2 never emits null items, so no null check is needed here.
      accumulated.addAll(found);
      return accumulated;
    });
  }
  return merged.map(filter::filterTracks);
}
 
源代码3 项目: vertx-rx   文件: SingleUnmarshaller.java
/**
 * Converts each upstream item to a {@code Buffer} via {@code unwrap} and decodes that
 * buffer into a {@code T}.
 *
 * <p>When {@code mapper} is set, the buffer bytes are parsed with it, targeting
 * {@code mappedType} if that is non-null and {@code mappedTypeRef} otherwise. Without a
 * mapper, decoding is delegated to {@code getT}. Any exception raised while decoding is
 * delivered downstream as an error signal.
 *
 * @param upstream the source of wrapped buffers
 * @return a source emitting the decoded value
 */
@Override
public SingleSource<T> apply(@NonNull Single<B> upstream) {
  Single<Buffer> buffers = upstream.map(unwrap::apply);
  return buffers.flatMap(raw -> {
    try {
      final T decoded;
      if (mapper == null) {
        decoded = getT(raw, mappedType, mappedTypeRef);
      } else {
        JsonParser parser = mapper.getFactory().createParser(raw.getBytes());
        if (nonNull(mappedType)) {
          decoded = mapper.readValue(parser, mappedType);
        } else {
          decoded = mapper.readValue(parser, mappedTypeRef);
        }
      }
      return Single.just(decoded);
    } catch (Exception failure) {
      return Single.error(failure);
    }
  });
}
 
源代码4 项目: micronaut-kafka   文件: ProductListener.java
/**
 * Prints each received product together with its brand key and emits the product's
 * quantity.
 *
 * @param brand         the Kafka record key
 * @param productSingle the received product
 * @return a {@link Single} emitting the product quantity
 */
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
public Single<Integer> receiveProduct(
        @KafkaKey String brand,
        Single<Product> productSingle) {

    return productSingle.map(received -> {
        String announcement = "Got Product - " + received.getName() + " by " + brand;
        System.out.println(announcement);
        return received.getQuantity(); // <3>
    });
}
 
源代码5 项目: xian   文件: DistributedLock.java
/**
 * Requests a distributed lock through the "cache" group's "distributedLock" unit.
 *
 * <p>The key actually locked is {@code "LOCK_" + key}. An {@code expireTimeInSecond} or
 * {@code timeOutInSecond} below 1 is replaced by 3, and a warning is logged.
 *
 * <p>The returned {@link Single} emits the lock on success. It signals a
 * {@code TimeOutException} when the response code equals {@code Group.CODE_TIME_OUT}, and
 * a {@link RuntimeException} for any other unsuccessful response.
 *
 * @param cacheConfigBean    cache configuration forwarded to the unit
 * @param key                business key to lock
 * @param value              value stored with the lock
 * @param expireTimeInSecond expiry time of the key, in seconds
 * @param timeOutInSecond    timeout for acquiring the lock, in seconds
 * @return a {@link Single} emitting the acquired {@code DistributedLock}
 */
public static Single<DistributedLock> lock(CacheConfigBean cacheConfigBean, String key, Object value, int expireTimeInSecond, int timeOutInSecond) {
    final long applyTime = System.currentTimeMillis();

    final int _expireTimeInSecond = expireTimeInSecond < 1 ? 3 : expireTimeInSecond;
    final int _timeOutInSecond = timeOutInSecond < 1 ? 3 : timeOutInSecond;

    if (expireTimeInSecond != _expireTimeInSecond) {
        LOG.warn(String.format("key: %s, 原 expireTime: %s < 1, 校正为现 expireTime: %s", key, expireTimeInSecond, _expireTimeInSecond));
    }
    if (timeOutInSecond != _timeOutInSecond) {
        LOG.warn(String.format("key: %s, 原 timeOutInSecond: %s < 1, 校正为现 timeOutInSecond: %s", key, timeOutInSecond, _timeOutInSecond));
    }

    final String lockKey = "LOCK_" + key;

    Single<UnitResponse> single = SingleRxXian.call("cache", "distributedLock", new JSONObject() {{
        put("cacheConfig", cacheConfigBean);
        put("key", lockKey);
        put("value", value);
        put("expireTimeInSecond", _expireTimeInSecond);
        put("timeOutInSecond", _timeOutInSecond);
    }});

    return single.map(unitResponseObject -> {
        // Sample the receive time when the response is delivered. Sampling it right after
        // the Single is assembled would make the reported elapsed time exclude the wait
        // for the response.
        final long receiveTime = System.currentTimeMillis();

        if (unitResponseObject.succeeded()) {
            int autoIncrement = AUTO_INCREMENT.incrementAndGet();

            if (!EnvUtil.getEnv().equals(EnvUtil.PRODUCTION)) {
                LOG.info(String.format("锁编号: %s, key: %s, lockKey: %s, value: %s, 分布式加锁, 成功, 耗时: %s 毫秒", autoIncrement, key, lockKey, value, (receiveTime - applyTime)));
            }

            return new DistributedLock(autoIncrement, key, lockKey, value);
        } else if (unitResponseObject.getCode().equals(Group.CODE_TIME_OUT)) {
            throw new TimeOutException(String.format("分布式加锁, 超时, key: %s, lockKey: %s, 耗时: %s 毫秒", key, lockKey, (receiveTime - applyTime)));
        } else {
            throw new RuntimeException(String.format("分布式加锁, 异常, key: %s, lockKey: %s, 耗时: %s 毫秒", key, lockKey, (receiveTime - applyTime)));
        }
    });
}
 
源代码6 项目: xian   文件: DistributedLock.java
/**
 * Releases this lock through the "cache" group's "distributedUnLock" unit.
 *
 * <p>Emits {@code false} without calling the unit when {@code lockKey} is {@code null}.
 * Otherwise emits {@code true} when the unit response succeeded and {@code false} when it
 * did not; a failed response is also logged as an error. Outside the production
 * environment the outcome and elapsed time are logged at info level.
 *
 * @param cacheConfigBean cache configuration forwarded to the unit
 * @return a {@link Single} emitting whether the unlock succeeded
 */
public Single<Boolean> unlock(CacheConfigBean cacheConfigBean) {
    final long applyTime = System.currentTimeMillis();

    if (lockKey == null) {
        return Single.just(false);
    }

    Single<UnitResponse> pending = SingleRxXian.call("cache", "distributedUnLock", new JSONObject() {{
        put("cacheConfig", cacheConfigBean);
        put("key", lockKey);
        put("value", value);
    }});

    return pending.map(response -> {
        if (!EnvUtil.getEnv().equals(EnvUtil.PRODUCTION)) {
            final long receiveTime = System.currentTimeMillis();
            final String result;
            if (response.succeeded()) {
                result = "成功";
            } else {
                result = "失败";
            }
            LOG.info(String.format("锁编号: %s, key: %s, lockKey: %s, value: %s, 分布式解锁, %s, 影响数量: %s, 耗时: %s 毫秒", autoIncrement, key, lockKey, value, result, response.getData(), (receiveTime - applyTime)));
        }

        if (response.succeeded()) {
            return true;
        }

        LOG.error(response);
        return false;
    });
}
 
源代码7 项目: RIBs   文件: Step.java
/**
 * Builds a step from a single that always produces step data.
 *
 * <p>The emitted data is wrapped with {@code Optional.of} before it is handed to the
 * {@link Step} constructor.
 *
 * @param stepDataSingle - a single that returns a result for this step.
 * @param <T> type of return value (if any) for this step.
 * @param <A> type of {@link ActionableItem} this step returns when finished
 * @return a new {@link Step}.
 */
public static <T, A extends ActionableItem> Step<T, A> from(Single<Data<T, A>> stepDataSingle) {
  final Function<Data<T, A>, Optional<Data<T, A>>> wrapInOptional =
      new Function<Data<T, A>, Optional<Data<T, A>>>() {
        @Override
        public Optional<Data<T, A>> apply(Data<T, A> stepData) throws Exception {
          return Optional.of(stepData);
        }
      };
  final Single<Optional<Data<T, A>>> optionalData = stepDataSingle.map(wrapInOptional);
  return new Step<>(optionalData);
}
 
源代码8 项目: autorest   文件: JreResourceBuilder.java
/**
 * Adapts a response reader to the reactive container type requested by the caller.
 *
 * <ul>
 *   <li>{@code Completable}: the reader is passed to {@code consume} and only completion
 *       is reported.</li>
 *   <li>{@code Single}: emits the reader itself, its content as a string, or the value
 *       parsed by {@code gson}, depending on {@code type}.</li>
 *   <li>{@code Observable}: emits the elements produced by a {@code ParseArrayIterator}
 *       over the reader.</li>
 * </ul>
 *
 * @param req       the pending response body
 * @param container the reactive wrapper class to produce
 * @param type      the element type to decode
 * @return the adapted reactive value, cast to {@code T}
 * @throws IllegalArgumentException if {@code container} is none of the supported classes
 */
@SuppressWarnings("unchecked")
@Override public <T> T fromJson(Single<Reader> req, Class<? super T> container, Class<?> type) {
    if (Completable.class.equals(container)) {
        return (T) req.doOnSuccess(this::consume).toCompletable();
    }
    if (Single.class.equals(container)) {
        return (T) req.map(body -> {
            if (Reader.class.equals(type)) {
                return body;
            }
            if (String.class.equals(type)) {
                return readAsString(body);
            }
            return gson.fromJson(body, type);
        });
    }
    if (Observable.class.equals(container)) {
        return (T) req.toObservable()
                .flatMapIterable(source -> () -> new ParseArrayIterator<>(source, type));
    }
    throw new IllegalArgumentException("unsupported type " + container);
}
 
源代码9 项目: cyclops   文件: FlowableReactiveSeqImpl.java
/**
 * Reduces the whole underlying flowable with the given collector and exposes the single
 * finished result as a sequence.
 *
 * @param collector supplies the container, the accumulation step and the finisher
 * @param <R>       type of the finished result
 * @param <A>       type of the intermediate accumulation container
 * @return a sequence built from the finished collector result
 */
@Override
public <R, A> ReactiveSeq<R> collectAll(Collector<? super T, A, R> collector) {
    final Single<A> accumulated = flowable.collect(
            () -> collector.supplier().get(),
            (container, element) -> collector.accumulator().accept(container, element));
    final Single<R> finished = accumulated.map(Functions.rxFunction(collector.finisher()));
    return flux(finished.toFlowable());
}
 
源代码10 项目: reactive-grpc   文件: FusedTckService.java
/**
 * Handles a unary call by passing the request message through {@code maybeExplode}.
 *
 * @param request the incoming message
 * @return a {@link Single} emitting the result of {@code maybeExplode}
 */
@Override
public Single<Message> oneToOne(Single<Message> request) {
    return request.map(message -> maybeExplode(message));
}