下面列出了 io.reactivex.Single#map() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Searches playlists for every given category and merges all results into one list.
 *
 * <p>One search is issued per category (matching by name, limited to 100 results). A search
 * that fails contributes an empty result instead of failing the whole request. The merged list
 * is passed through {@code filter.filterPlaylists} before it is emitted.
 *
 * @param categories category names to search for
 * @return a {@link Single} emitting the filtered, merged playlists; it signals an
 *     {@link IllegalArgumentException} when {@code categories} is {@code null}
 */
@Override
public Single<List<PlaylistEntity>> getPlaylistsBy(List<String> categories) {
    if (categories == null) {
        return Single.error(new IllegalArgumentException("categories are null"));
    }
    // Create the accumulator lazily, once per subscription. With Single.just(list) the very same
    // list instance is handed to every subscriber and mutated below, so subscribing again
    // (e.g. through retry) would append all results a second time.
    Single<List<PlaylistEntity>> merged = Single.<List<PlaylistEntity>>fromCallable(LinkedList::new);
    for (String category : categories) {
        merged = Single.zip(
                merged,
                service.searchPlaylists(PlaylistEntity
                        .Filter.start()
                        .byName(category)
                        .limit(100)
                        .createOptions())
                        // A failing search must not break the chain: fall back to "no results".
                        .onErrorResumeNext(Single.just(new ArrayList<>())),
                (accumulated, found) -> {
                    if (found != null) {
                        accumulated.addAll(found);
                    }
                    return accumulated;
                });
    }
    return merged.map(filter::filterPlaylists);
}
/**
 * Searches tracks for every given category and merges all results into one list.
 *
 * <p>One search is issued per category (matching by tags). A search that fails contributes an
 * empty result instead of failing the whole request. The merged list is passed through
 * {@code filter.filterTracks} before it is emitted.
 *
 * @param categories category tags to search for
 * @return a {@link Single} emitting the filtered, merged tracks; it signals an
 *     {@link IllegalArgumentException} when {@code categories} is {@code null}
 */
@Override
public Single<List<TrackEntity>> getTracksBy(List<String> categories) {
    if (categories == null) {
        return Single.error(new IllegalArgumentException("categories are null"));
    }
    // Create the accumulator lazily, once per subscription. With Single.just(list) the very same
    // list instance is handed to every subscriber and mutated below, so subscribing again
    // (e.g. through retry) would append all results a second time.
    Single<List<TrackEntity>> merged = Single.<List<TrackEntity>>fromCallable(LinkedList::new);
    for (String category : categories) {
        merged = Single.zip(
                merged,
                service.searchTracks(TrackEntity
                        .Filter.start()
                        .byTags(category)
                        .createOptions())
                        // A failing search must not break the chain: fall back to "no results".
                        .onErrorResumeNext(Single.just(new ArrayList<>())),
                (accumulated, found) -> {
                    if (found != null) {
                        accumulated.addAll(found);
                    }
                    return accumulated;
                });
    }
    return merged.map(filter::filterTracks);
}
/**
 * Transforms the upstream item into a {@code Buffer} via {@code unwrap} and then decodes that
 * buffer into a {@code T}.
 *
 * <p>When {@code mapper} is set, the buffer bytes are parsed with it, using {@code mappedType}
 * if that is non-null and {@code mappedTypeRef} otherwise. Without a mapper the decoding is
 * delegated to {@code getT}. Any {@link Exception} raised while decoding is emitted as an
 * error of the resulting single.
 *
 * @param upstream the single to transform
 * @return a single emitting the decoded object
 */
@Override
public SingleSource<T> apply(@NonNull Single<B> upstream) {
    Single<Buffer> buffers = upstream.map(unwrap::apply);
    Single<T> decoded = buffers.flatMap(payload -> {
        try {
            final T value;
            if (mapper == null) {
                value = getT(payload, mappedType, mappedTypeRef);
            } else {
                JsonParser parser = mapper.getFactory().createParser(payload.getBytes());
                if (nonNull(mappedType)) {
                    value = mapper.readValue(parser, mappedType);
                } else {
                    value = mapper.readValue(parser, mappedTypeRef);
                }
            }
            // Kept inside the try block so that a failure here is also routed to Single.error.
            return Single.just(value);
        } catch (Exception failure) {
            return Single.error(failure);
        }
    });
    return decoded;
}
/**
 * Receives a product, prints a line naming the product and its brand to standard output, and
 * emits the product's quantity.
 *
 * <p>NOTE(review): the {@code @Topic} / {@code @SendTo} / {@code @KafkaKey} annotations
 * presumably bind this method to Kafka topics and the record key - confirm against the
 * messaging framework in use.
 *
 * @param brand the value injected for the {@code @KafkaKey} parameter
 * @param productSingle single emitting the received product
 * @return a single emitting the quantity of the received product
 */
@Topic("awesome-products") // <1>
@SendTo("product-quantities") // <2>
public Single<Integer> receiveProduct(
        @KafkaKey String brand,
        Single<Product> productSingle) {
    return productSingle.map(received -> {
        String announcement = "Got Product - " + received.getName() + " by " + brand;
        System.out.println(announcement);
        return received.getQuantity(); // <3>
    });
}
/**
 * Requests a distributed lock by calling the {@code "distributedLock"} unit of the
 * {@code "cache"} group.
 *
 * <p>The key actually locked is {@code "LOCK_" + key}. An expire time or time-out below 1 is
 * replaced by 3 seconds, and a warning is logged when that happens.
 *
 * @param cacheConfigBean cache configuration forwarded to the unit
 * @param key key to lock
 * @param value value forwarded to the unit together with the lock key
 * @param expireTimeInSecond expiration time of the lock key, in seconds
 * @param timeOutInSecond time-out for acquiring the lock, in seconds
 * @return a single emitting the acquired {@link DistributedLock}; it fails with a
 *     {@code TimeOutException} when the response code equals {@code Group.CODE_TIME_OUT} and
 *     with a {@link RuntimeException} for any other unsuccessful response
 */
public static Single<DistributedLock> lock(CacheConfigBean cacheConfigBean, String key, Object value, int expireTimeInSecond, int timeOutInSecond) {
    final long applyTime = System.currentTimeMillis();
    final int _expireTimeInSecond = expireTimeInSecond < 1 ? 3 : expireTimeInSecond;
    final int _timeOutInSecond = timeOutInSecond < 1 ? 3 : timeOutInSecond;
    if (expireTimeInSecond != _expireTimeInSecond) {
        LOG.warn(String.format("key: %s, 原 expireTime: %s < 1, 校正为现 expireTime: %s", key, expireTimeInSecond, _expireTimeInSecond));
    }
    if (timeOutInSecond != _timeOutInSecond) {
        LOG.warn(String.format("key: %s, 原 timeOutInSecond: %s < 1, 校正为现 timeOutInSecond: %s", key, timeOutInSecond, _timeOutInSecond));
    }
    final String lockKey = "LOCK_" + key;
    Single<UnitResponse> single = SingleRxXian.call("cache", "distributedLock", new JSONObject() {{
        put("cacheConfig", cacheConfigBean);
        put("key", lockKey);
        put("value", value);
        put("expireTimeInSecond", _expireTimeInSecond);
        put("timeOutInSecond", _timeOutInSecond);
    }});
    return single.map(unitResponseObject -> {
        // Sample the clock when the response is delivered. Sampling right after the Single was
        // assembled would only measure how long it took to build the call, not to get the lock.
        final long receiveTime = System.currentTimeMillis();
        final long elapsedMillis = receiveTime - applyTime;
        if (unitResponseObject.succeeded()) {
            int autoIncrement = AUTO_INCREMENT.incrementAndGet();
            if (!EnvUtil.getEnv().equals(EnvUtil.PRODUCTION)) {
                LOG.info(String.format("锁编号: %s, key: %s, lockKey: %s, value: %s, 分布式加锁, 成功, 耗时: %s 毫秒", autoIncrement, key, lockKey, value, elapsedMillis));
            }
            return new DistributedLock(autoIncrement, key, lockKey, value);
        }
        // Constant-first comparison: a response without a code is reported as a generic failure
        // instead of surfacing as a NullPointerException.
        if (Group.CODE_TIME_OUT.equals(unitResponseObject.getCode())) {
            throw new TimeOutException(String.format("分布式加锁, 超时, key: %s, lockKey: %s, 耗时: %s 毫秒", key, lockKey, elapsedMillis));
        }
        throw new RuntimeException(String.format("分布式加锁, 异常, key: %s, lockKey: %s, 耗时: %s 毫秒", key, lockKey, elapsedMillis));
    });
}
/**
 * Releases this lock by calling the {@code "distributedUnLock"} unit of the {@code "cache"}
 * group with this lock's {@code lockKey} and {@code value}.
 *
 * <p>Outside the production environment the outcome and the elapsed time are logged at info
 * level. An unsuccessful response is logged at error level.
 *
 * @param cacheConfigBean cache configuration forwarded to the unit
 * @return a single emitting {@code false} immediately when {@code lockKey} is {@code null};
 *     otherwise {@code true} if the unit response succeeded and {@code false} if it did not
 */
public Single<Boolean> unlock(CacheConfigBean cacheConfigBean) {
    final long applyTime = System.currentTimeMillis();
    if (lockKey == null) {
        return Single.just(false);
    }
    Single<UnitResponse> unlockCall = SingleRxXian.call("cache", "distributedUnLock", new JSONObject() {{
        put("cacheConfig", cacheConfigBean);
        put("key", lockKey);
        put("value", value);
    }});
    return unlockCall.map(response -> {
        boolean verbose = !EnvUtil.getEnv().equals(EnvUtil.PRODUCTION);
        if (verbose) {
            final long receiveTime = System.currentTimeMillis();
            final String result;
            if (response.succeeded()) {
                result = "成功";
            } else {
                result = "失败";
            }
            LOG.info(String.format("锁编号: %s, key: %s, lockKey: %s, value: %s, 分布式解锁, %s, 影响数量: %s, 耗时: %s 毫秒", autoIncrement, key, lockKey, value, result, response.getData(), (receiveTime - applyTime)));
        }
        if (response.succeeded()) {
            return true;
        }
        LOG.error(response);
        return false;
    });
}
/**
 * Builds a {@link Step} from a single whose emitted data is always present.
 *
 * <p>The emitted {@link Data} is wrapped with {@code Optional.of} before it is handed to the
 * step.
 *
 * @param stepDataSingle - a single that returns a result for this step.
 * @param <T> type of return value (if any) for this step.
 * @param <A> type of {@link ActionableItem} this step returns when finished
 * @return a new {@link Step}.
 */
public static <T, A extends ActionableItem> Step<T, A> from(Single<Data<T, A>> stepDataSingle) {
    final Function<Data<T, A>, Optional<Data<T, A>>> wrapInOptional =
            new Function<Data<T, A>, Optional<Data<T, A>>>() {
                @Override
                public Optional<Data<T, A>> apply(Data<T, A> stepData) throws Exception {
                    return Optional.of(stepData);
                }
            };
    final Single<Optional<Data<T, A>>> optionalData = stepDataSingle.map(wrapInOptional);
    return new Step<>(optionalData);
}
/**
 * Adapts a single emitting a JSON {@link Reader} to the requested reactive container type.
 *
 * <ul>
 *   <li>{@code Completable}: the reader is passed to {@code consume} and only completion is
 *       reported.</li>
 *   <li>{@code Single}: emits the reader itself when {@code type} is {@code Reader}, its
 *       content via {@code readAsString} when {@code type} is {@code String}, and otherwise
 *       the result of {@code gson.fromJson(reader, type)}.</li>
 *   <li>{@code Observable}: emits the elements produced by a {@code ParseArrayIterator} over
 *       the reader.</li>
 * </ul>
 *
 * @param req single emitting the reader to decode
 * @param container the reactive type the caller expects back
 * @param type the element type to decode to
 * @return the adapted reactive object, cast to {@code T}
 * @throws IllegalArgumentException when {@code container} is none of the supported types
 */
@SuppressWarnings("unchecked")
@Override public <T> T fromJson(Single<Reader> req, Class<? super T> container, Class<?> type) {
    if (Completable.class.equals(container)) {
        return (T) req.doOnSuccess(this::consume).toCompletable();
    }
    if (Single.class.equals(container)) {
        return (T) req.map(json -> {
            if (Reader.class.equals(type)) {
                return json;
            } else if (String.class.equals(type)) {
                return readAsString(json);
            } else {
                return gson.fromJson(json, type);
            }
        });
    }
    if (Observable.class.equals(container)) {
        return (T) req.toObservable()
                .flatMapIterable(json -> () -> new ParseArrayIterator<>(json, type));
    }
    throw new IllegalArgumentException("unsupported type " + container);
}
/**
 * Collects every element of the underlying {@code flowable} with the given collector and
 * returns a sequence built from the single finished result.
 *
 * <p>The collector's supplier and accumulator drive {@code flowable.collect}; its finisher is
 * then applied to the accumulated container.
 *
 * @param collector the collector describing how to accumulate and finish
 * @param <R> result type produced by the collector's finisher
 * @param <A> intermediate accumulation type of the collector
 * @return a sequence created from the finished result
 */
@Override
public <R, A> ReactiveSeq<R> collectAll(Collector<? super T, A, R> collector) {
    Single<A> accumulated = flowable.collect(
            () -> collector.supplier().get(),
            (container, element) -> collector.accumulator().accept(container, element));
    Single<R> finished = accumulated.map(Functions.rxFunction(collector.finisher()));
    return flux(finished.toFlowable());
}
/**
 * Handles a one-to-one exchange by passing the request message through {@code maybeExplode}
 * and emitting whatever that returns.
 *
 * @param request single emitting the request message
 * @return a single emitting the result of {@code maybeExplode} for the request
 */
@Override
public Single<Message> oneToOne(Single<Message> request) {
    return request.map(message -> maybeExplode(message));
}