下面列出了java.util.stream.Stream#onClose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void should_correctly_call_the_onClose_callbacks_of_the_underlying_streams() {
// Given
AtomicBoolean b = new AtomicBoolean(false);
Stream<Map.Entry<Integer, String>> entries =
Stream.of(
new AbstractMap.SimpleEntry<>(1, "1"),
new AbstractMap.SimpleEntry<>(2, "2")
);
Stream<Map.Entry<Integer, String>> stream = entries.onClose(() -> b.set(true));
// When
StreamsUtils.accumulateEntries(stream, String::concat).close();
// Then
assertThat(b.get()).isEqualTo(true);
}
@SafeVarargs
private static <T> Stream<T> withCloseHandler(Stream<T> stream, Iterator<T>... iterators) {
return stream.onClose(() -> {
for (Iterator<T> iterator : iterators) {
if (iterator instanceof AutoCloseable) {
try {
((AutoCloseable) iterator).close();
} catch (Exception ex) {
throw new VertexiumException(
String.format("exception occurred when closing %s", iterator.getClass().getName()),
ex
);
}
}
}
});
}
@Override
public Stream<StringVectorPair> stream(Path input) throws IOException {
final Stream<String> lines = Files.lines(input);
int[] expectedSize = new int[] { -1 };
Stream<StringVectorPair> pairs = lines.map((line) -> process(line)).map(
(pair) -> {
Preconditions.checkNotNull(pair.value, "word was null");
if (expectedSize[0] == -1) {
expectedSize[0] = pair.vector.getDimension();
} else {
Preconditions.checkArgument(
expectedSize[0] == pair.vector.getDimension(),
"found inconsistency. Expected size " + expectedSize[0]
+ " but found " + pair.vector.getDimension());
}
return pair;
});
pairs.onClose(() -> lines.close());
return pairs;
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
static <F, T> Stream<T> mapBySpliterator(
Stream<F> stream, int characteristics,
Function<? super Spliterator<F>, ? extends Spliterator<T>> mapper) {
requireNonNull(mapper);
Stream<T> mapped = StreamSupport.stream(
() -> mapper.apply(stream.spliterator()), characteristics, stream.isParallel());
mapped.onClose(stream::close);
return mapped;
}
@Test public void close() {
Stream<?> stream = kind.natural(0);
AtomicBoolean closed = new AtomicBoolean();
stream.onClose(() -> closed.set(true));
try (Stream<?> diced = MoreStreams.dice(stream, 1)) {}
assertThat(closed.get()).isTrue();
}
@AssertNoWarning("AbandonedStream")
@AssertWarning("StreamMethodMayNotReturnItself")
public Stream<String> testClose(List<String> list) {
Stream<String> stream = list.stream();
stream.onClose(() -> System.out.println("Closed!"));
return stream;
}
@Override
@SuppressWarnings("PMD.CloseResource")
public Stream<AccessEvent> events() throws IOException {
DataInputStream input = new DataInputStream(new BufferedInputStream(readFile()));
Stream<AccessEvent> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
new TraceIterator(input), Spliterator.ORDERED), /* parallel */ false);
return stream.onClose(() -> Closeables.closeQuietly(input));
}
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
return Stream.empty();
}
RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;
final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
namespace,
namespaceSerializer,
namespaceOutputView,
ambiguousKeyPossible);
nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
} catch (IOException ex) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
return targetStream.onClose(iteratorWrapper::close);
}
@Test
public void shouldHandleCloseStreamException() throws Exception {
// Given
Stream<String> stringStream = Stream.of();
stringStream = stringStream.onClose(() -> {
throw new RuntimeException("On purpose thrown close exception.");
});
// When
final StringWriter writer = new StringWriter();
mapper.writeValue(writer, stringStream);
// Then
assertThat(writer.toString(), sameJSONAs("[]"));
}
protected void close(Supplier<RESULT> supplier, Runnable closeHandler) {
if (supplier != null && query.isResultStream() && query.getFetchType() == FetchType.LAZY) {
RESULT result = supplier.get();
if (result instanceof Stream) {
@SuppressWarnings("resource")
Stream<?> stream = (Stream<?>) result;
stream.onClose(closeHandler);
} else {
closeHandler.run();
}
} else {
closeHandler.run();
}
}
/**
* Returns a stream of match results from this scanner. The stream
* contains the same results in the same order that would be returned by
* calling {@code findWithinHorizon(pattern, 0)} and then {@link #match}
* successively as long as {@link #findWithinHorizon findWithinHorizon()}
* finds matches.
*
* <p>The resulting stream is sequential and ordered. All stream elements are
* non-null.
*
* <p>Scanning starts upon initiation of the terminal stream operation, using the
* current state of this scanner. Subsequent calls to any methods on this scanner
* other than {@link #close} and {@link #ioException} may return undefined results
* or may cause undefined effects on the returned stream. The returned stream's source
* {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
* {@link java.util.ConcurrentModificationException} if any such calls are detected
* during stream pipeline execution.
*
* <p>After stream pipeline execution completes, this scanner is left in an indeterminate
* state and cannot be reused.
*
* <p>If this scanner contains a resource that must be released, this scanner
* should be closed, either by calling its {@link #close} method, or by
* closing the returned stream. Closing the stream will close the underlying scanner.
* {@code IllegalStateException} is thrown if the scanner has been closed when this
* method is called, or if this scanner is closed during stream pipeline execution.
*
* <p>As with the {@link #findWithinHorizon findWithinHorizon()} methods, this method
* might block waiting for additional input, and it might buffer an unbounded amount of
* input searching for a match.
*
* @apiNote
* For example, the following code will read a file and return a list
* of all sequences of characters consisting of seven or more Latin capital
* letters:
*
* <pre>{@code
* try (Scanner sc = new Scanner(Path.of("input.txt"))) {
* Pattern pat = Pattern.compile("[A-Z]{7,}");
* List<String> capWords = sc.findAll(pat)
* .map(MatchResult::group)
* .collect(Collectors.toList());
* }
* }</pre>
*
* @param pattern the pattern to be matched
* @return a sequential stream of match results
* @throws NullPointerException if pattern is null
* @throws IllegalStateException if this scanner is closed
* @since 9
*/
public Stream<MatchResult> findAll(Pattern pattern) {
Objects.requireNonNull(pattern);
ensureOpen();
Stream<MatchResult> stream = StreamSupport.stream(new FindSpliterator(pattern), false);
return stream.onClose(this::close);
}
/**
* Returns a stream of match results from this scanner. The stream
* contains the same results in the same order that would be returned by
* calling {@code findWithinHorizon(pattern, 0)} and then {@link #match}
* successively as long as {@link #findWithinHorizon findWithinHorizon()}
* finds matches.
*
* <p>The resulting stream is sequential and ordered. All stream elements are
* non-null.
*
* <p>Scanning starts upon initiation of the terminal stream operation, using the
* current state of this scanner. Subsequent calls to any methods on this scanner
* other than {@link #close} and {@link #ioException} may return undefined results
* or may cause undefined effects on the returned stream. The returned stream's source
* {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
* {@link java.util.ConcurrentModificationException} if any such calls are detected
* during stream pipeline execution.
*
* <p>After stream pipeline execution completes, this scanner is left in an indeterminate
* state and cannot be reused.
*
* <p>If this scanner contains a resource that must be released, this scanner
* should be closed, either by calling its {@link #close} method, or by
* closing the returned stream. Closing the stream will close the underlying scanner.
* {@code IllegalStateException} is thrown if the scanner has been closed when this
* method is called, or if this scanner is closed during stream pipeline execution.
*
* <p>As with the {@link #findWithinHorizon findWithinHorizon()} methods, this method
* might block waiting for additional input, and it might buffer an unbounded amount of
* input searching for a match.
*
* @apiNote
* For example, the following code will read a file and return a list
* of all sequences of characters consisting of seven or more Latin capital
* letters:
*
* <pre>{@code
* try (Scanner sc = new Scanner(Paths.get("input.txt"))) {
* Pattern pat = Pattern.compile("[A-Z]{7,}");
* List<String> capWords = sc.findAll(pat)
* .map(MatchResult::group)
* .collect(Collectors.toList());
* }
* }</pre>
*
* @param pattern the pattern to be matched
* @return a sequential stream of match results
* @throws NullPointerException if pattern is null
* @throws IllegalStateException if this scanner is closed
* @since 9
*/
public Stream<MatchResult> findAll(Pattern pattern) {
Objects.requireNonNull(pattern);
ensureOpen();
Stream<MatchResult> stream = StreamSupport.stream(new FindSpliterator(pattern), false);
return stream.onClose(this::close);
}
/**
* <p>
* Obtain a Java 8 stream from an iterator. If the iterator happens to
* implement AutoCloseable (e.g. {@link CloseableIterator}), the stream's
* onClose behavior will close the iterator. Thus it is important to
* always close the returned stream, or use within a try-with-resources:
* </p>
* <pre>
* {@code
* try (Stream<Object> s = Streams.of(it)) {
* // do something with s
* } // auto-close
* }
* </pre>
*
*/
public static final <T> Stream<T> of(final Iterator<T> it) {
final Stream<T> s = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
if (it instanceof AutoCloseable) {
final AutoCloseable closeable = (AutoCloseable) it;
s.onClose(() -> Code.wrapThrow(() -> closeable.close()));
}
return s;
}
/**
* Returns a stream of delimiter-separated tokens from this scanner. The
* stream contains the same tokens that would be returned, starting from
* this scanner's current state, by calling the {@link #next} method
* repeatedly until the {@link #hasNext} method returns false.
*
* <p>The resulting stream is sequential and ordered. All stream elements are
* non-null.
*
* <p>Scanning starts upon initiation of the terminal stream operation, using the
* current state of this scanner. Subsequent calls to any methods on this scanner
* other than {@link #close} and {@link #ioException} may return undefined results
* or may cause undefined effects on the returned stream. The returned stream's source
* {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
* {@link java.util.ConcurrentModificationException} if any such calls are detected
* during stream pipeline execution.
*
* <p>After stream pipeline execution completes, this scanner is left in an indeterminate
* state and cannot be reused.
*
* <p>If this scanner contains a resource that must be released, this scanner
* should be closed, either by calling its {@link #close} method, or by
* closing the returned stream. Closing the stream will close the underlying scanner.
* {@code IllegalStateException} is thrown if the scanner has been closed when this
* method is called, or if this scanner is closed during stream pipeline execution.
*
* <p>This method might block waiting for more input.
*
* @apiNote
* For example, the following code will create a list of
* comma-delimited tokens from a string:
*
* <pre>{@code
* List<String> result = new Scanner("abc,def,,ghi")
* .useDelimiter(",")
* .tokens()
* .collect(Collectors.toList());
* }</pre>
*
* <p>The resulting list would contain {@code "abc"}, {@code "def"},
* the empty string, and {@code "ghi"}.
*
* @return a sequential stream of token strings
* @throws IllegalStateException if this scanner is closed
* @since 9
*/
public Stream<String> tokens() {
ensureOpen();
Stream<String> stream = StreamSupport.stream(new TokenSpliterator(), false);
return stream.onClose(this::close);
}
/**
* Returns a stream of delimiter-separated tokens from this scanner. The
* stream contains the same tokens that would be returned, starting from
* this scanner's current state, by calling the {@link #next} method
* repeatedly until the {@link #hasNext} method returns false.
*
* <p>The resulting stream is sequential and ordered. All stream elements are
* non-null.
*
* <p>Scanning starts upon initiation of the terminal stream operation, using the
* current state of this scanner. Subsequent calls to any methods on this scanner
* other than {@link #close} and {@link #ioException} may return undefined results
* or may cause undefined effects on the returned stream. The returned stream's source
* {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
* {@link java.util.ConcurrentModificationException} if any such calls are detected
* during stream pipeline execution.
*
* <p>After stream pipeline execution completes, this scanner is left in an indeterminate
* state and cannot be reused.
*
* <p>If this scanner contains a resource that must be released, this scanner
* should be closed, either by calling its {@link #close} method, or by
* closing the returned stream. Closing the stream will close the underlying scanner.
* {@code IllegalStateException} is thrown if the scanner has been closed when this
* method is called, or if this scanner is closed during stream pipeline execution.
*
* <p>This method might block waiting for more input.
*
* @apiNote
* For example, the following code will create a list of
* comma-delimited tokens from a string:
*
* <pre>{@code
* List<String> result = new Scanner("abc,def,,ghi")
* .useDelimiter(",")
* .tokens()
* .collect(Collectors.toList());
* }</pre>
*
* <p>The resulting list would contain {@code "abc"}, {@code "def"},
* the empty string, and {@code "ghi"}.
*
* @return a sequential stream of token strings
* @throws IllegalStateException if this scanner is closed
* @since 9
*/
public Stream<String> tokens() {
ensureOpen();
Stream<String> stream = StreamSupport.stream(new TokenSpliterator(), false);
return stream.onClose(this::close);
}