下面列出了java.util.stream.Collectors#mapping ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void executeWithCollector(TestContext testContext) {
initializeNamesKeyspace();
String prefix = "(";
String suffix = ")";
String delimiter = ",";
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString(0),
Collectors.joining(delimiter, prefix, suffix)
);
String insert = "INSERT INTO names.names_by_first_letter (first_letter, name) VALUES (?, ?)";
client.prepare(insert, testContext.asyncAssertSuccess(prepared -> {
BatchStatement batch = BatchStatement.newInstance(BatchType.LOGGED);
for (String name : Stream.of("Paul", "Paulo", "Pavel").collect(Collectors.toSet())) {
batch = batch.add(prepared.bind(name.substring(0, 1), name));
}
client.execute(batch, testContext.asyncAssertSuccess(exec -> {
String query = "select name from names.names_by_first_letter where first_letter = 'P'";
client.execute(query, collector, testContext.asyncAssertSuccess(result -> {
testContext.assertEquals(result, "(Paul,Paulo,Pavel)");
}));
}));
}));
}
public void collector02Example(SqlClient client) {
// Create a collector projecting a row set to a (last_name_1,last_name_2,...)
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
}
public void collector02Example(SqlClient client) {
// Create a collector projecting a row set to a (last_name_1,last_name_2,...)
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
}
public void collector02Example(SqlClient client) {
// Create a collector projecting a row set to a (last_name_1,last_name_2,...)
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
}
public void collector02Example(SqlClient client) {
// Create a collector projecting a row set to a (last_name_1,last_name_2,...)
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
}
@Test
public void testBatchedIteratorConsumption() throws Exception {
List<Object[]> expectedResult = IntStream.range(0, 10)
.mapToObj(i -> new Object[]{i})
.collect(Collectors.toList());
BatchSimulatingIterator<Row> batchSimulatingIterator =
new BatchSimulatingIterator<>(TestingBatchIterators.range(0, 10),
2,
5,
null);
CollectingRowConsumer<?, List<Object[]>> batchConsumer =
new CollectingRowConsumer<>(Collectors.mapping(Row::materialize, Collectors.toList()));
batchConsumer.accept(batchSimulatingIterator, null);
CompletableFuture<List<Object[]>> result = batchConsumer.completionFuture();
List<Object[]> consumedRows = result.get(10, TimeUnit.SECONDS);
assertThat(consumedRows.size(), is(10));
assertThat(consumedRows, Matchers.contains(expectedResult.toArray(new Object[0])));
}
/**
* Returns a {@link BiCollector} that first maps the input pair using {@code mapper} and then collects the
* results using {@code downstream} collector.
*
* @since 3.2
*/
public static <K, V, T, R> BiCollector<K, V, R> mapping(
BiFunction<? super K, ? super V, ? extends T> mapper, Collector<? super T, ?, R> downstream) {
requireNonNull(mapper);
requireNonNull(downstream);
return new BiCollector<K, V, R>() {
@Override public <E> Collector<E, ?, R> splitting(Function<E, K> toKey, Function<E, V> toValue) {
return Collectors.mapping(e -> mapper.apply(toKey.apply(e), toValue.apply(e)), downstream);
}
};
}
/**
* Returns a {@code Collector} that extracts the pairs from the input stream,
* and then collects them into a {@code BiCollection}.
*
* @param leftFunction extracts the first element of each pair
* @param rightFunction extracts the second element of each pair
*/
public static <T, L, R> Collector<T, ?, BiCollection<L, R>> toBiCollection(
Function<? super T, ? extends L> leftFunction,
Function<? super T, ? extends R> rightFunction) {
requireNonNull(leftFunction);
requireNonNull(rightFunction);
Function<T, Map.Entry<L, R>> toEntry = x -> kv(leftFunction.apply(x), rightFunction.apply(x));
Collector<T, ?, ? extends Collection<Map.Entry<L, R>>> entryCollector =
Collectors.mapping(toEntry, Collectors.toList());
return Collectors.collectingAndThen(entryCollector, BiCollection::new);
}
@Test
void testProjection() {
List<PostCommentProjection> postCommentProjections = this.postRepository.findByTitle("Post Title");
final Function<Entry<RootValueDTO, List<PostCommentsDTO>>, PostDTO> mapToPostDTO = entry -> PostDTO.builder()
.title(entry.getKey().title()).content(entry.getKey().content()).comments(entry.getValue()).build();
final Function<PostCommentProjection, RootValueDTO> titleAndContentClassifier = postCommentProjection -> new RootValueDTO(
postCommentProjection.getTitle(), postCommentProjection.getContent());
final Function<PostCommentProjection, PostCommentsDTO> mapToPostComments = postCommentProjection -> PostCommentsDTO
.builder().review(postCommentProjection.getReview()).build();
final Collector<PostCommentProjection, ?, List<PostCommentsDTO>> downStreamCollector = Collectors
.mapping(mapToPostComments, Collectors.toList());
List<PostDTO> postDTOS = postCommentProjections.stream()
.collect(groupingBy(titleAndContentClassifier, downStreamCollector)).entrySet().stream()
.map(mapToPostDTO).collect(toUnmodifiableList());
assertThat(postDTOS).isNotEmpty().hasSize(1);
PostDTO postDTO = postDTOS.get(0);
assertThat(postDTO.getTitle()).isEqualTo("Post Title");
assertThat(postDTO.getContent()).isEqualTo("Post Content");
assertThat(postDTO.getComments()).isNotEmpty().hasSizeGreaterThanOrEqualTo(2);
assertThat(postDTO.getComments()).contains(PostCommentsDTO.builder().review("Review New").build(),
PostCommentsDTO.builder().review("Review Old").build());
}
private CompletableFuture<List<Row>> retrieveRows(ShardRouting activePrimaryRouting,
RoutedCollectPhase collectPhase,
CollectTask collectTask,
ShardCollectorProviderFactory shardCollectorProviderFactory) {
Collector<Row, ?, List<Object[]>> listCollector = Collectors.mapping(Row::materialize, Collectors.toList());
CollectingRowConsumer<?, List<Object[]>> consumer = new CollectingRowConsumer<>(listCollector);
String nodeId = activePrimaryRouting.currentNodeId();
String localNodeId = clusterService.localNode().getId();
if (localNodeId.equalsIgnoreCase(nodeId)) {
var indexShard = indicesService.indexServiceSafe(activePrimaryRouting.index())
.getShard(activePrimaryRouting.shardId().id());
var collectorProvider = shardCollectorProviderFactory.create(indexShard);
BatchIterator<Row> it;
try {
it = collectorProvider.getIterator(collectPhase, consumer.requiresScroll(), collectTask);
} catch (Exception e) {
return Exceptions.rethrowRuntimeException(e);
}
consumer.accept(it, null);
} else {
UUID childJobId = UUID.randomUUID();
RemoteCollector remoteCollector = new RemoteCollector(
childJobId,
collectTask.txnCtx().sessionSettings(),
localNodeId,
nodeId,
transportActionProvider.transportJobInitAction(),
transportActionProvider.transportKillJobsNodeAction(),
searchTp,
tasksService,
collectTask.getRamAccounting(),
consumer,
createRemoteCollectPhase(childJobId, collectPhase, activePrimaryRouting.shardId(), nodeId)
);
remoteCollector.doCollect();
}
return consumer
.completionFuture()
.thenApply(rows -> LazyMapList.of(rows, Buckets.arrayToSharedRow()));
}
@Override
public BatchIterator<Row> apply(BatchIterator<Row> batchIterator) {
Collector<Row, ?, Bucket> collector = Collectors.mapping(
this::getCells,
Collectors.collectingAndThen(Collectors.toList(), this::sortAndCreateBucket));
return CollectingBatchIterator.newInstance(batchIterator, collector);
}
static <T, A, R> Collector<T, ?, R> checkingNulls(Collector<T, A, R> downstream) {
return Collectors.mapping(Objects::requireNonNull, downstream);
}
/**
* Returns a {@link Collector} that lists the file lines numbered by the input stream (1-based).
*/
Collector<Long, ?, String> toLineList() {
return Collectors.mapping(this::listLine, joining("\n"));
}
public TestingRowConsumer() {
consumer = new CollectingRowConsumer<>(Collectors.mapping(Row::materialize, Collectors.toList()));
}
/**
* Returns a collector that collects input elements into a list, which is then arranged by the
* {@code arranger} function before being wrapped as <em>immutable</em> list result.
* List elements are not allowed to be null.
*
* <p>Example usages: <ul>
* <li>{@code stream.collect(toListAndThen(Collections::reverse))} to collect to reverse order.
* <li>{@code stream.collect(toListAndThen(Collections::shuffle))} to collect and shuffle.
* <li>{@code stream.collect(toListAndThen(Collections::sort))} to collect and sort.
* </ul>
*
* @since 4.2
*/
public static <T> Collector<T, ?, List<T>> toListAndThen(Consumer<? super List<T>> arranger) {
requireNonNull(arranger);
Collector<T, ?, List<T>> rejectingNulls =
Collectors.mapping(Objects::requireNonNull, Collectors.toCollection(ArrayList::new));
return Collectors.collectingAndThen(rejectingNulls, list -> {
arranger.accept(list);
return Collections.unmodifiableList(list);
});
}
/**
* Adapts a {@code Collector} accepting elements of type {@code U} to one
* accepting elements of type {@code T} by applying a mapping function to
* each input element before accumulation.
*
* <p>
* Unlike {@link Collectors#mapping(Function, Collector)} this method
* returns a
* <a href="package-summary.html#ShortCircuitReduction">short-circuiting
* collector</a> if the downstream collector is short-circuiting.
*
* @param <T> the type of the input elements
* @param <U> type of elements accepted by downstream collector
* @param <A> intermediate accumulation type of the downstream collector
* @param <R> result type of collector
* @param mapper a function to be applied to the input elements
* @param downstream a collector which will accept mapped values
* @return a collector which applies the mapping function to the input
* elements and provides the mapped results to the downstream
* collector
* @throws NullPointerException if mapper is null, or downstream is null.
* @see Collectors#mapping(Function, Collector)
* @since 0.4.0
*/
public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
Collector<? super U, A, R> downstream) {
Objects.requireNonNull(mapper);
Predicate<A> finished = finished(downstream);
if (finished != null) {
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
return new CancellableCollectorImpl<>(downstream.supplier(), (acc, t) -> {
if (!finished.test(acc))
downstreamAccumulator.accept(acc, mapper.apply(t));
}, downstream.combiner(), downstream.finisher(), finished, downstream.characteristics());
}
return Collectors.mapping(mapper, downstream);
}
/**
* Returns a {@link Map} where elements of this stream with the same key are
* grouped together. The resulting {@code Map} keys are the keys of this
* stream entries and the corresponding values are combined using the
* provided downstream collector.
*
* <p>
* There are no guarantees on the type, mutability, serializability, or
* thread-safety of the {@code Map} object returned. If more control over
* the returned {@code Map} is required, use
* {@link #grouping(Supplier, Collector)}.
*
* <p>
* This is a <a href="package-summary.html#StreamOps">terminal</a>
* operation.
*
* @param <A> the intermediate accumulation type of the downstream collector
* @param <D> the result type of the downstream reduction
* @param downstream a {@code Collector} implementing the downstream
* reduction
* @return a {@code Map} containing the elements of this stream
* @see Collectors#groupingBy(Function, Collector)
*/
public <A, D> Map<K, D> grouping(Collector<? super V, A, D> downstream) {
Function<Entry<K, V>, K> keyMapper = Entry::getKey;
Collector<Entry<K, V>, ?, D> mapping = Collectors.mapping(Entry::getValue, downstream);
if (isParallel() && downstream.characteristics().contains(Characteristics.UNORDERED)) {
return collect(Collectors.groupingByConcurrent(keyMapper, mapping));
}
return collect(Collectors.groupingBy(keyMapper, mapping));
}
/**
* Returns a {@link Map} where elements of this stream with the same key are
* grouped together. The resulting {@code Map} keys are the keys of this
* stream entries and the corresponding values are combined using the
* provided downstream collector. The {@code Map} is created using
* the provided supplier function.
*
* <p>
* This is a <a href="package-summary.html#StreamOps">terminal</a>
* operation.
*
* @param <A> the intermediate accumulation type of the downstream collector
* @param <D> the result type of the downstream reduction
* @param <M> the type of the resulting {@code Map}
* @param mapSupplier a function which returns a new, empty {@code Map} into
* which the results will be inserted
* @param downstream a {@code Collector} implementing the downstream
* reduction
* @return a {@code Map} containing the elements of this stream
* @see Collectors#groupingBy(Function, Supplier, Collector)
*/
@SuppressWarnings("unchecked")
public <A, D, M extends Map<K, D>> M grouping(Supplier<M> mapSupplier, Collector<? super V, A, D> downstream) {
Function<Entry<K, V>, K> keyMapper = Entry::getKey;
Collector<Entry<K, V>, ?, D> mapping = Collectors.mapping(Entry::getValue, downstream);
if (isParallel() && downstream.characteristics().contains(Characteristics.UNORDERED)
&& mapSupplier.get() instanceof ConcurrentMap) {
return (M) collect(Collectors.groupingByConcurrent(keyMapper,
(Supplier<? extends ConcurrentMap<K, D>>) mapSupplier, mapping));
}
return collect(Collectors.groupingBy(keyMapper, mapSupplier, mapping));
}
/**
* Returns a collector which collects input elements to the new {@code List}
* transforming them with the supplied function beforehand.
*
* <p>
* This method behaves like
* {@code Collectors.mapping(mapper, Collectors.toList())}.
*
* <p>
* There are no guarantees on the type, mutability, serializability, or
* thread-safety of the {@code List} returned.
*
* @param <T> the type of the input elements
* @param <U> the resulting type of the mapper function
* @param mapper a function to be applied to the input elements
* @return a collector which applies the mapping function to the input
* elements and collects the mapped results to the {@code List}
* @throws NullPointerException if mapper is null.
* @see #mapping(Function, Collector)
* @since 0.6.0
*/
public static <T, U> Collector<T, ?, List<U>> mapping(Function<? super T, ? extends U> mapper) {
return Collectors.mapping(mapper, Collectors.toList());
}