Listed below are example usages of java.util.stream.Stream#collect(), drawn from open-source projects on GitHub.
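As a baseline, here is a minimal, self-contained sketch of collect() with one of the JDK's predefined Collectors; it is illustrative only and not taken from any of the projects below:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectBasics {
    public static void main(String[] args) {
        // Accumulate the stream's elements into a List via a predefined Collector.
        List<String> words = Stream.of("a", "b", "c").collect(Collectors.toList());
        System.out.println(words); // [a, b, c]
    }
}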
/**
* Reflectively build <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html">elastic mapping</a>
* for a class. Currently works only for immutable interfaces and is
* very limited in its functionality.
*/
static Mapping of(Class<?> clazz) {
Objects.requireNonNull(clazz, "clazz");
Preconditions.checkArgument(clazz.isInterface(), "Expected %s to be an interface", clazz);
Map<String, String> map = new LinkedHashMap<>();
Stream<Method> methods = Arrays.stream(clazz.getDeclaredMethods())
.filter(m -> m.getParameterCount() == 0)
.filter(m -> m.getReturnType() != void.class) // void methods report void.class (Void.TYPE), not the boxed Void.class
.filter(m -> Modifier.isPublic(m.getModifiers()))
.filter(m -> !Modifier.isStatic(m.getModifiers()))
.filter(m -> m.getDeclaringClass() != Object.class);
for (Method method: methods.collect(Collectors.toSet())) {
Class<?> returnType = method.getReturnType();
// skip arrays and iterables (we don't handle them yet)
if (returnType.isArray() || Iterable.class.isAssignableFrom(returnType)) {
continue;
}
Type type = method.getGenericReturnType();
map.put(method.getName(), elasticType(type));
}
return Mapping.ofElastic(map);
}
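To see what the filter chain above keeps and drops, here is a self-contained sketch that runs the same accessor scan against a hypothetical interface (Address and its methods are assumptions made up for illustration):

import java.lang.reflect.Modifier;
import java.util.Arrays;

public class AccessorScanSketch {
    // Hypothetical immutable interface to exercise the same filters as Mapping.of.
    interface Address {
        String city();
        int zipCode();
        static Address empty() { return null; } // dropped: static
    }

    public static void main(String[] args) {
        Arrays.stream(Address.class.getDeclaredMethods())
                .filter(m -> m.getParameterCount() == 0)
                .filter(m -> m.getReturnType() != void.class)
                .filter(m -> Modifier.isPublic(m.getModifiers()))
                .filter(m -> !Modifier.isStatic(m.getModifiers()))
                .forEach(m -> System.out.println(m.getName() + " -> " + m.getGenericReturnType()));
        // prints (in no guaranteed order): city -> class java.lang.String, zipCode -> int
    }
}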
public static <I extends Inventory, S extends Slot<I, ?>> Map<S, ItemStack> chooseSlots(I inv, Stream<? extends S> slots, ItemStack stack) {
final Map<S, ItemStack> map = new HashMap<>();
final ItemStack remaining = stack.clone();
final List<? extends S> slotList = slots.collect(Collectors.toList());
Stream.<S>concat(similar(inv, slotList.stream(), remaining), empty(inv, slotList.stream())).forEach(slot -> {
if(!ItemUtils.isNothing(remaining)) {
final int transferAmount = slot.maxTransferrableIn(remaining, inv);
if(transferAmount > 0) {
final ItemStack transferStack = remaining.clone();
remaining.setAmount(remaining.getAmount() - transferAmount);
transferStack.setAmount(transferAmount);
map.put(slot, transferStack);
}
}
});
return map;
}
@Test
public void parallelMinMax() {
final Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();
final MinMax<Integer> minMax = stream.collect(
MinMax::of,
MinMax::accept,
MinMax::combine
);
assertEquals(Integer.valueOf(99), minMax.max());
assertEquals(Integer.valueOf(0), minMax.min());
assertEquals(100, minMax.count());
}
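The three-argument collect(supplier, accumulator, combiner) above needs a mutable MinMax container, which is not shown; here is a minimal sketch of what such a class might look like (the class body is an assumption, only the method names mirror the references used in the test):

import java.util.stream.IntStream;

final class MinMaxSketch {
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    private long count;

    static MinMaxSketch of() { return new MinMaxSketch(); }  // supplier

    void accept(int value) {                                  // accumulator
        min = Math.min(min, value);
        max = Math.max(max, value);
        count++;
    }

    void combine(MinMaxSketch other) {                        // combiner: merge parallel partial results
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
        count += other.count;
    }

    int min() { return min; }
    int max() { return max; }
    long count() { return count; }

    public static void main(String[] args) {
        MinMaxSketch mm = IntStream.range(0, 100).boxed().parallel()
                .collect(MinMaxSketch::of, MinMaxSketch::accept, MinMaxSketch::combine);
        System.out.println(mm.min() + ".." + mm.max() + " (" + mm.count() + ")"); // 0..99 (100)
    }
}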
@Test
public void testGetIssueDiscussionsByStream() throws Exception {
Stream<Discussion> stream = new DiscussionsApi(gitLabApi).getIssueDiscussionsStream(1, 1);
assertNotNull(stream);
List<Discussion> discussions = stream.collect(Collectors.toList());
assertTrue(compareJson(discussions, "issue-discussions.json"));
}
@Test
public void testThatGetsImportDependenciesFromClass() {
final Stream<String> methodDependencies = dependenciesOf(methodOf(MyGenericInterfaceWithMethods.class, "getA"));
final Set<String> dependencySet = methodDependencies.collect(toSet());
assertEquals(3, dependencySet.size());
assertThat(dependencySet, hasItems(
"java.io.IOException",
"java.util.Optional",
"java.lang.RuntimeException"
));
}
@Test
public void testStatsLogging()
throws JsonSyntaxException, UnsupportedEncodingException, IOException, OperationException {
DummyOperationFactory mutatorFactory = new DummyOperationFactory();
OperationProcessor processor = new OperationProcessor(mutatorFactory);
/*
* Mock the Stat object
*/
Stat runtimeStat = mock(Stat.class);
Stat successStat = mock(Stat.class);
Stat errorStat = mock(Stat.class);
processor.setRuntimeStat(runtimeStat);
processor.setSuccessCountStat(successStat);
processor.setErrorCountStat(errorStat);
InternalEvent ievent = new InternalEvent("foo", null, 1);
ievent.setEventObj(new DummyDeserializerHelper.DummyStringEvent("test"));
Stream<InternalEvent> stream = processor.perform(Stream.of(ievent));
List<InternalEvent> output = stream.collect(Collectors.toList());
/*
* Verify start, stop, increment success count, and never increment error count.
*/
verify(runtimeStat, times(1)).start();
verify(runtimeStat, times(1)).stop();
verify(successStat, times(1)).increment();
verify(errorStat, never()).increment();
/*
* Verify contents of output stream
*/
assertEquals(1, output.size());
}
protected void checkRecords(Map<String, Collection<ExtractedRecord>> recordMap) {
Collection<ExtractedRecord> records = recordMap.get(annotatorClass.getSimpleName());
Stream<ExtractedRecord> recordStream =
records.stream()
.filter(p -> p.getKind().equals(Kind.NAMED) && p.getName().equals("record1"));
List<ExtractedRecord> collect = recordStream.collect(Collectors.toList());
ExtractedRecord record1 = collect.get(0);
assertEquals(Kind.NAMED, record1.getKind());
assertEquals(2, record1.getFields().size());
assertEquals("The quick brown", findFieldValue("record1Field1", record1.getFields()));
assertEquals("fox jumped over", findFieldValue("record1Field2", record1.getFields()));
ExtractedRecord record2 =
records.stream()
.filter(p -> p.getKind().equals(Kind.NAMED) && p.getName().equals("record2"))
.collect(Collectors.toList())
.get(0);
assertEquals(Kind.NAMED, record2.getKind());
assertEquals(2, record2.getFields().size());
assertEquals("The quick brown", findFieldValue("record2Field1", record2.getFields()));
assertEquals("cat jumped over", findFieldValue("record2Field2", record2.getFields()));
ExtractedRecord defaultRecord =
records.stream()
.filter(p -> p.getKind().equals(Kind.DEFAULT))
.collect(Collectors.toList())
.get(0);
assertEquals(null, defaultRecord.getName());
assertEquals(2, defaultRecord.getFields().size());
assertEquals("The quick brown", findFieldValue("noRecordField1", defaultRecord.getFields()));
assertEquals("rat jumped over", findFieldValue("noRecordField2", defaultRecord.getFields()));
}
@Test
public void givenStringStream_whenCollectOnMinByProjection_shouldReturnOptionalMinValue() {
Stream<String> stringStream = Stream.of("abc", "bb", "ccc", "1");
Optional<String> min = stringStream.collect(minBy(String::length));
assertThat(min.get(), is("1"));
}
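minBy here takes a projection, so it is presumably a helper from the example's own codebase rather than the JDK; the standard-library equivalent builds a Comparator from the projection and uses Collectors.minBy:

import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MinByDemo {
    public static void main(String[] args) {
        // JDK Collectors.minBy wants a Comparator, built here from the length projection.
        Optional<String> shortest = Stream.of("abc", "bb", "ccc", "1")
                .collect(Collectors.minBy(Comparator.comparing(String::length)));
        System.out.println(shortest.get()); // 1
    }
}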
@Test public void
skip_while_inclusive_skips_items_while_condition_is_met_but_includes_first_condition_breaker() {
Stream<Integer> ints = Stream.of(1,2,3,4,5,6,7,8,9,10);
Stream<Integer> skipped = StreamUtils.skipWhileInclusive(ints, i -> i < 4);
List<Integer> collected = skipped.collect(Collectors.toList());
assertThat(collected, contains(5, 6, 7, 8, 9, 10));
}
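StreamUtils.skipWhileInclusive is a library helper, not a JDK method; below is a minimal, sequential-only sketch of the behavior the test expects (the stateful-filter approach is an assumption about one possible implementation):

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipWhileInclusiveSketch {
    // Drops elements while the predicate holds, plus the first element that breaks it.
    // The stateful filter makes this correct only for sequential streams.
    static <T> Stream<T> skipWhileInclusive(Stream<T> in, Predicate<? super T> p) {
        boolean[] skipping = {true};
        return in.filter(t -> {
            if (skipping[0]) {
                if (p.test(t)) {
                    return false;          // still inside the skipped prefix
                }
                skipping[0] = false;
                return false;              // drop the first condition breaker too
            }
            return true;
        });
    }

    public static void main(String[] args) {
        List<Integer> out = skipWhileInclusive(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), i -> i < 4)
                .collect(Collectors.toList());
        System.out.println(out); // [5, 6, 7, 8, 9, 10]
    }
}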
private void checkMalformedInputException(Stream<String> s) {
try {
List<String> lines = s.collect(Collectors.toList());
fail("UncheckedIOException expected");
} catch (UncheckedIOException ex) {
IOException cause = ex.getCause();
assertTrue(cause instanceof MalformedInputException,
"MalformedInputException expected");
}
}
/**
* @return the underlying collections of locked pages, one per stripe
*/
private List<Collection<FullPageId>> getAllLockedPages() {
Object tracker = delayedReplacementTracker();
Object[] stripes = U.field(tracker, "stripes");
Stream<Collection<FullPageId>> locked = Arrays.stream(stripes).map(stripe ->
(Collection<FullPageId>)U.field(stripe, "locked"));
return locked.collect(Collectors.toList());
}
/**
* For each row mapping returned by {@link #resultSet}, convert the vertex/edge corresponding
* to each {@link GraphTraversal#select}'ed alias into its object representation.
*/
private Selections resultSet(GraphTraversalSource g, Selections selections) {
Stream<Map<String, Object>> resultStream = (Stream<Map<String, Object>>) resultSet(g);
List<Map<String, Object>> resultRows = resultStream.collect(Collectors.toList());
for (Map<String, Object> resultRow : resultRows) {
Selections.Selection selection = Selections.Selection.of();
selections.add(selection);
for (Map.Entry<String, Object> entry : resultRow.entrySet()) {
String alias = entry.getKey();
selection.put(alias, Parser.as(entry.getValue(), selections.as(alias)));
}
}
return selections;
}
@Test
public void testGetProfileTypesForTriggerChannel2() {
Stream<ProfileTypeDTO> result = ressource.getProfileTypes(null, otherTriggerChannelTypeUID.toString(), null);
List<ProfileTypeDTO> list = result.collect(Collectors.toList());
// should be only the second trigger profile because the first one is restricted to another channel type UID
assertThat(list.size(), is(1));
ProfileTypeDTO pt = list.get(0);
assertThat(pt.kind, is("TRIGGER"));
assertThat(pt.label, is("profile4"));
assertThat(pt.uid, is(triggerProfileTypeUID2.toString()));
}
/**
* Returns the list of the platform MXBean proxies for
* forwarding the method calls of the {@code mxbeanInterface}
* through the given {@code MBeanServerConnection}.
* The returned list may contain zero, one, or more instances.
* The number of instances in the returned list is defined
* in the specification of the given management interface.
* The order is undefined and there is no guarantee that
* the list returned is in the same order as previous invocations.
*
* @param connection the {@code MBeanServerConnection} to forward to.
* @param mxbeanInterface a management interface for a platform
* MXBean
* @param <T> an {@code mxbeanInterface} type parameter
*
* @return the list of platform MXBean proxies for
* forwarding the method calls of the {@code mxbeanInterface}
* through the given {@code MBeanServerConnection}.
*
* @throws IllegalArgumentException if {@code mxbeanInterface}
* is not a platform management interface.
*
* @throws java.io.IOException if a communication problem
* occurred when accessing the {@code MBeanServerConnection}.
*
* @see #newPlatformMXBeanProxy
* @since 1.7
*/
public static <T extends PlatformManagedObject>
List<T> getPlatformMXBeans(MBeanServerConnection connection,
Class<T> mxbeanInterface)
throws java.io.IOException
{
// First validate the specified interface by finding at least one
// PlatformComponent whose MXBean implements this interface.
// An interface can be implemented by different MBeans, provided by
// different platform components.
PlatformComponent<?> pc = PlatformMBeanFinder.findFirst(mxbeanInterface);
if (pc == null) {
throw new IllegalArgumentException(mxbeanInterface.getName()
+ " is not a platform management interface");
}
// Collect all names, eliminate duplicates.
Stream<String> names = Stream.empty();
for (PlatformComponent<?> p : platformComponents()) {
names = Stream.concat(names, getProxyNames(p, connection, mxbeanInterface));
}
Set<String> objectNames = names.collect(Collectors.toSet());
if (objectNames.isEmpty()) return Collections.emptyList();
// Map names on proxies.
List<T> proxies = new ArrayList<>();
for (String name : objectNames) {
proxies.add(newPlatformMXBeanProxy(connection, name, mxbeanInterface));
}
return proxies;
}
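A usage sketch for the API documented above, using the in-process overload so it runs without a remote connection; the MBeanServerConnection overload behaves the same but forwards each call over the connection:

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class PlatformMXBeansDemo {
    public static void main(String[] args) {
        // One proxy per garbage collector registered with the platform MBean server.
        List<GarbageCollectorMXBean> gcs =
                ManagementFactory.getPlatformMXBeans(GarbageCollectorMXBean.class);
        for (GarbageCollectorMXBean gc : gcs) {
            System.out.println(gc.getName() + ": " + gc.getCollectionCount() + " collections");
        }
    }
}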
public Optional<Artist> biggestGroup(Stream<Artist> artists) {
Function<Artist, Long> getCount = artist -> artist.getMembers().count();
return artists.collect(maxBy(comparing(getCount)));
}
public ImmutableWeightedRandomChooser(Stream<T> elements, Function<T, N> scale) {
this(elements.collect(Collectors.mappingTo(scale)));
}
private void checkLines(Stream<String> s, List<String> expected) {
List<String> lines = s.collect(Collectors.toList());
assertTrue(lines.size() == expected.size(), "Unexpected number of lines");
assertTrue(lines.equals(expected), "Unexpected content");
}
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testUpsertPartitioner(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg)) {
/**
* Write 1 (only inserts, written as parquet file)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001");
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
Map<String, Long> parquetFileIdToSize =
dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue(dataFilesList.size() > 0,
"Should list the parquet files we wrote in the delta commit");
/**
* Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> newRecords = dataGen.generateUpdates(newCommitTime, records);
newRecords.addAll(dataGen.generateInserts(newCommitTime, 20));
statuses = client.upsert(jsc.parallelize(newRecords), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("002", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 002");
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
Map<String, Long> parquetFileIdToNewSize =
newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles,
basePath);
// Wrote 20 insert records in each of the 2 batches, so 40 in total
assertEquals(40, recordsRead.size(), "Must contain 40 records");
}
}
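The Collectors.toMap calls in the test above index base files by file id; here is the same keyed-collection pattern in a self-contained form (plain strings stand in for HoodieBaseFile):

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapDemo {
    public static void main(String[] args) {
        // Key and value are both derived from each element; duplicate keys
        // would make toMap throw an IllegalStateException.
        Map<String, Integer> sizeByName = Stream.of("alpha", "bb", "ccc")
                .collect(Collectors.toMap(name -> name, String::length));
        System.out.println(sizeByName); // {alpha=5, bb=2, ccc=3} (order unspecified)
    }
}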
@Test
public void collectEmptyStream() {
Stream<JsonValue> jsonValueStream = Stream.empty();
JsonArray a = jsonValueStream.collect(JsonUtils.asArray());
Assert.assertEquals(0, a.length());
}
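JsonUtils.asArray() is a custom Collector from the example's library; here is a minimal sketch of how such a collector can be assembled with Collector.of, producing a JSON-ish string instead of a JsonArray (the StringJoiner-based shape is an assumption):

import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CustomCollectorSketch {
    public static void main(String[] args) {
        Collector<CharSequence, StringJoiner, String> asArray = Collector.of(
                () -> new StringJoiner(",", "[", "]"),   // supplier: empty array builder
                StringJoiner::add,                       // accumulator: append one element
                StringJoiner::merge,                     // combiner: merge parallel parts
                StringJoiner::toString);                 // finisher: final representation
        System.out.println(Stream.<CharSequence>empty().collect(asArray)); // []
        System.out.println(Stream.of("1", "2").collect(asArray));          // [1,2]
    }
}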
/**
* Accumulates the elements of the stream into a new List.
*
* @param self the stream
* @param <T> the type of element
* @return a new {@code java.util.List} instance
*
* @since 2.5.0
*/
public static <T> List<T> toList(final Stream<T> self) {
return self.collect(Collectors.toList());
}