下面列出了java.util.stream.StreamSupport#stream ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void processDeletion(ResourceChange change) {
String path = change.getPath();
LOG.debug("Process resource deletion at path {}", path);
Resource parent = resolver.getResource(BINDINGS_PARENT_PATH);
Iterator<Resource> resourceIterator = parent.listChildren();
Stream<Resource> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(resourceIterator, Spliterator.ORDERED),
false);
targetStream.filter(res -> {
ValueMap properties = res.getValueMap();
LOG.debug("Checking the binding at {}", res.getPath());
String cqConf = properties.get(Constants.PN_CONF, "");
if (StringUtils.isEmpty(cqConf)) {
return false;
}
return path.equals(cqConf + "/" + Constants.COMMERCE_BUCKET_PATH);
}).findFirst().ifPresent(res -> {
LOG.debug("Found a binding at {} that uses {}, we'll delete it", res.getPath(), path);
deleteJcrNode(res);
});
}
/**
* @hide
*/
public static Stream<SliceItem> stream(SliceItem slice) {
Queue<SliceItem> items = new LinkedList();
items.add(slice);
Iterator<SliceItem> iterator = new Iterator<SliceItem>() {
@Override
public boolean hasNext() {
return items.size() != 0;
}
@Override
public SliceItem next() {
SliceItem item = items.poll();
if (compareTypes(item, SliceItem.FORMAT_SLICE)
|| compareTypes(item, SliceItem.FORMAT_ACTION)) {
items.addAll(item.getSlice().getItems());
}
return item;
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
}
/**
* @return a {@link Stream} that lazily binds to this {@link Spliterator} when evaluated.
*/
public Stream<T> stream() {
return StreamSupport.stream(
() -> this,
Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL,
false
);
}
@Override
public <A extends Annotation> Stream<MergedAnnotation<A>> stream(Class<A> annotationType) {
if (this.annotationFilter == FILTER_ALL) {
return Stream.empty();
}
return StreamSupport.stream(spliterator(annotationType), false);
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
@Override
public Stream<VideoMetadata> streamVideoMetadata(VideoQuery query) {
Preconditions.checkNotNull(query.getPlaceId(), "Must specify placeid");
UUID start = start(query);
UUID end = end(query);
logger.debug("Querying recordings by: types: {} deleted: {} tags: {} cameras: {} in range [{} - {}] limit [{}]", query.getRecordingType(), query.isListDeleted(), query.getTags(), query.getCameras(), start, end, query.getLimit());
Predicate<VideoMetadata> predicate = queryPredicate(query);
Iterator<VideoRecordingSize> recordingIds = queryPlan(query, start, end);
Iterator<VideoMetadata> result = Iterators.transform(recordingIds, (r) -> fetchVideoMetadata(r, predicate));
Spliterator<VideoMetadata> stream = Spliterators.spliteratorUnknownSize(result, Spliterator.IMMUTABLE | Spliterator.NONNULL);
return StreamSupport.stream(stream, false);
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
@NonNull
@Override
public <T> Stream<T> entityStream(@NonNull ResultSet resultSet, @Nullable String prefix, @NonNull Class<T> rootEntity) {
ArgumentUtils.requireNonNull("resultSet", resultSet);
ArgumentUtils.requireNonNull("rootEntity", rootEntity);
TypeMapper<ResultSet, T> mapper = new SqlResultEntityTypeMapper<>(prefix, getEntity(rootEntity), columnNameResultSetReader, jsonCodec);
Iterable<T> iterable = () -> new Iterator<T>() {
boolean nextCalled = false;
@Override
public boolean hasNext() {
try {
if (!nextCalled) {
nextCalled = true;
return resultSet.next();
} else {
return nextCalled;
}
} catch (SQLException e) {
throw new DataAccessException("Error retrieving next JDBC result: " + e.getMessage(), e);
}
}
@Override
public T next() {
nextCalled = false;
return mapper.map(resultSet, rootEntity);
}
};
return StreamSupport.stream(iterable.spliterator(), false);
}
@Override
public Stream<Place> streamAll() {
Context timer = streamAllTimer.time();
Iterator<Row> rows = session.execute(streamAll.bind()).iterator();
Iterator<Place> result = TimingIterator.time(
Iterators.transform(rows, (row) -> buildEntity(row)),
timer
);
Spliterator<Place> stream = Spliterators.spliteratorUnknownSize(result, Spliterator.IMMUTABLE | Spliterator.NONNULL);
return StreamSupport.stream(stream, false);
}
public static <T> Stream<T> enumerationAsStream(final Enumeration<T> e) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<T>() {
public T next() {
return e.nextElement();
}
public boolean hasNext() {
return e.hasMoreElements();
}
}, Spliterator.ORDERED), false);
}
@Override
public Stream<Person> streamAll() {
try(Context ctxt = streamAllTimer.time()) {
Iterator<Row> rows = session.execute(new BoundStatement(findAllPeople)).iterator();
Iterator<Person> result = Iterators.transform(rows, (row) -> buildEntity(row));
Spliterator<Person> stream = Spliterators.spliteratorUnknownSize(result, Spliterator.IMMUTABLE | Spliterator.NONNULL);
return StreamSupport.stream(stream, false);
}
}
/**
* Returns a {@code Stream}, the elements of which are lines read from
* this {@code BufferedReader}. The {@link Stream} is lazily populated,
* i.e., read only occurs during the
* <a href="../util/stream/package-summary.html#StreamOps">terminal
* stream operation</a>.
*
* <p> The reader must not be operated on during the execution of the
* terminal stream operation. Otherwise, the result of the terminal stream
* operation is undefined.
*
* <p> After execution of the terminal stream operation there are no
* guarantees that the reader will be at a specific position from which to
* read the next character or line.
*
* <p> If an {@link IOException} is thrown when accessing the underlying
* {@code BufferedReader}, it is wrapped in an {@link
* UncheckedIOException} which will be thrown from the {@code Stream}
* method that caused the read to take place. This method will return a
* Stream if invoked on a BufferedReader that is closed. Any operation on
* that stream that requires reading from the BufferedReader after it is
* closed, will cause an UncheckedIOException to be thrown.
*
* @return a {@code Stream<String>} providing the lines of text
* described by this {@code BufferedReader}
*
* @since 1.8
*/
public Stream<String> lines() {
Iterator<String> iter = new Iterator<String>() {
String nextLine = null;
@Override
public boolean hasNext() {
if (nextLine != null) {
return true;
} else {
try {
nextLine = readLine();
return (nextLine != null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
@Override
public String next() {
if (nextLine != null || hasNext()) {
String line = nextLine;
nextLine = null;
return line;
} else {
throw new NoSuchElementException();
}
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}
@Override
public List<CandidateSnapshot> findCandidates(String group) {
Stream<CandidateSnapshot> candidates = StreamSupport.stream(Iterables.concat(
configDao.findCurrentSnapshotsInGroup(group), referenceDao.findCurrentSnapshotsInGroup(group)).spliterator(), false);
return candidates.collect(Collectors.toList());
}
private Stream<CSVRecord> readCsvFile(Reader reader, String... header) throws IOException
{
return StreamSupport.stream(csvFormat.withHeader(header).parse(reader).spliterator(), false);
}
@Override
public <A extends Annotation> Stream<MergedAnnotation<A>> stream(Class<A> annotationType) {
return StreamSupport.stream(spliterator(annotationType), false);
}
@Override
public Stream<MergedAnnotation<Annotation>> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<T> stream() {
return StreamSupport.stream(spliterator(), false);
}
public static <T> Stream<T> toStream(Iterator<T> it) {
return StreamSupport.stream(toIterable(it).spliterator(), false);
}
/**
* Returns a sequential {@code Stream} with this collection as its source.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a sequential {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a sequential {@code Stream} over the elements in this collection
* @since 1.8
*/
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
/**
* Returns a sequential {@code Stream} with this collection as its source.
*
* <p>This method should be overridden when the {@link #spliterator()}
* method cannot return a spliterator that is {@code IMMUTABLE},
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
* for details.)
*
* @implSpec
* The default implementation creates a sequential {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a sequential {@code Stream} over the elements in this collection
* @since 1.8
*/
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}