java.util.stream.Stream#onClose ( )源码实例Demo

下面列出了java.util.stream.Stream#onClose ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void should_correctly_call_the_onClose_callbacks_of_the_underlying_streams() {
    // Given
    AtomicBoolean b = new AtomicBoolean(false);
    Stream<Map.Entry<Integer, String>> entries =
            Stream.of(
                    new AbstractMap.SimpleEntry<>(1, "1"),
                    new AbstractMap.SimpleEntry<>(2, "2")
            );
    Stream<Map.Entry<Integer, String>> stream = entries.onClose(() -> b.set(true));

    // When
    StreamsUtils.accumulateEntries(stream, String::concat).close();

    // Then
    assertThat(b.get()).isEqualTo(true);
}
 
源代码2 项目: vertexium   文件: StreamUtils.java
@SafeVarargs
private static <T> Stream<T> withCloseHandler(Stream<T> stream, Iterator<T>... iterators) {
    return stream.onClose(() -> {
        for (Iterator<T> iterator : iterators) {
            if (iterator instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) iterator).close();
                } catch (Exception ex) {
                    throw new VertexiumException(
                        String.format("exception occurred when closing %s", iterator.getClass().getName()),
                        ex
                    );
                }
            }
        }
    });
}
 
源代码3 项目: glove   文件: GloveTextReader.java
@Override
public Stream<StringVectorPair> stream(Path input) throws IOException {

  final Stream<String> lines = Files.lines(input);
  int[] expectedSize = new int[] { -1 };
  Stream<StringVectorPair> pairs = lines.map((line) -> process(line)).map(
      (pair) -> {
        Preconditions.checkNotNull(pair.value, "word was null");
        if (expectedSize[0] == -1) {
          expectedSize[0] = pair.vector.getDimension();
        } else {
          Preconditions.checkArgument(
              expectedSize[0] == pair.vector.getDimension(),
              "found inconsistency. Expected size " + expectedSize[0]
                  + " but found " + pair.vector.getDimension());
        }
        return pair;
      });

  pairs.onClose(() -> lines.close());

  return pairs;
}
 
源代码4 项目: Flink-CEPplus   文件: RocksDBKeyedStateBackend.java
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
	RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
	if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
		return Stream.empty();
	}

	RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
		(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;

	final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
	final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
	boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
	final byte[] nameSpaceBytes;
	try {
		RocksDBKeySerializationUtils.writeNameSpace(
			namespace,
			namespaceSerializer,
			namespaceOutputView,
			ambiguousKeyPossible);
		nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
	} catch (IOException ex) {
		throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
	}

	RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
	iterator.seekToFirst();

	final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
		ambiguousKeyPossible, nameSpaceBytes);

	Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
	return targetStream.onClose(iteratorWrapper::close);
}
 
源代码5 项目: flink   文件: RocksDBKeyedStateBackend.java
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
	RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
	if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
		return Stream.empty();
	}

	RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
		(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;

	final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
	final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
	boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
	final byte[] nameSpaceBytes;
	try {
		RocksDBKeySerializationUtils.writeNameSpace(
			namespace,
			namespaceSerializer,
			namespaceOutputView,
			ambiguousKeyPossible);
		nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
	} catch (IOException ex) {
		throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
	}

	RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
	iterator.seekToFirst();

	final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
		ambiguousKeyPossible, nameSpaceBytes);

	Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
	return targetStream.onClose(iteratorWrapper::close);
}
 
源代码6 项目: mug   文件: MoreStreams.java
static <F, T> Stream<T> mapBySpliterator(
    Stream<F> stream, int characteristics,
    Function<? super Spliterator<F>, ? extends Spliterator<T>> mapper) {
  requireNonNull(mapper);
  Stream<T> mapped = StreamSupport.stream(
      () -> mapper.apply(stream.spliterator()), characteristics, stream.isParallel());
  mapped.onClose(stream::close);
  return mapped;
}
 
源代码7 项目: mug   文件: MoreStreamsParameterizedTest.java
@Test public void close() {
  Stream<?> stream = kind.natural(0);
  AtomicBoolean closed = new AtomicBoolean();
  stream.onClose(() -> closed.set(true));
  try (Stream<?> diced = MoreStreams.dice(stream, 1)) {}
  assertThat(closed.get()).isTrue();
}
 
源代码8 项目: huntbugs   文件: TestAbandonedStream.java
@AssertNoWarning("AbandonedStream")
@AssertWarning("StreamMethodMayNotReturnItself")
public Stream<String> testClose(List<String> list) {
    Stream<String> stream = list.stream();
    stream.onClose(() -> System.out.println("Closed!"));
    return stream;
}
 
源代码9 项目: caffeine   文件: BinaryTraceReader.java
@Override
@SuppressWarnings("PMD.CloseResource")
public Stream<AccessEvent> events() throws IOException {
  DataInputStream input = new DataInputStream(new BufferedInputStream(readFile()));
  Stream<AccessEvent> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
      new TraceIterator(input), Spliterator.ORDERED), /* parallel */ false);
  return stream.onClose(() -> Closeables.closeQuietly(input));
}
 
源代码10 项目: flink   文件: RocksDBKeyedStateBackend.java
@SuppressWarnings("unchecked")
@Override
public <N> Stream<K> getKeys(String state, N namespace) {
	RocksDbKvStateInfo columnInfo = kvStateInformation.get(state);
	if (columnInfo == null || !(columnInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo)) {
		return Stream.empty();
	}

	RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
		(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo;

	final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
	final DataOutputSerializer namespaceOutputView = new DataOutputSerializer(8);
	boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(), namespaceSerializer);
	final byte[] nameSpaceBytes;
	try {
		RocksDBKeySerializationUtils.writeNameSpace(
			namespace,
			namespaceSerializer,
			namespaceOutputView,
			ambiguousKeyPossible);
		nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
	} catch (IOException ex) {
		throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
	}

	RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
	iterator.seekToFirst();

	final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
		ambiguousKeyPossible, nameSpaceBytes);

	Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false);
	return targetStream.onClose(iteratorWrapper::close);
}
 
源代码11 项目: data-prep   文件: StreamModuleTest.java
@Test
public void shouldHandleCloseStreamException() throws Exception {
    // Given
    Stream<String> stringStream = Stream.of();
    stringStream = stringStream.onClose(() -> {
        throw new RuntimeException("On purpose thrown close exception.");
    });

    // When
    final StringWriter writer = new StringWriter();
    mapper.writeValue(writer, stringStream);

    // Then
    assertThat(writer.toString(), sameJSONAs("[]"));
}
 
源代码12 项目: doma   文件: SelectCommand.java
protected void close(Supplier<RESULT> supplier, Runnable closeHandler) {
  if (supplier != null && query.isResultStream() && query.getFetchType() == FetchType.LAZY) {
    RESULT result = supplier.get();
    if (result instanceof Stream) {
      @SuppressWarnings("resource")
      Stream<?> stream = (Stream<?>) result;
      stream.onClose(closeHandler);
    } else {
      closeHandler.run();
    }
  } else {
    closeHandler.run();
  }
}
 
源代码13 项目: Bytecoder   文件: Scanner.java
/**
 * Returns a stream of match results from this scanner. The stream
 * contains the same results in the same order that would be returned by
 * calling {@code findWithinHorizon(pattern, 0)} and then {@link #match}
 * successively as long as {@link #findWithinHorizon findWithinHorizon()}
 * finds matches.
 *
 * <p>The resulting stream is sequential and ordered. All stream elements are
 * non-null.
 *
 * <p>Scanning starts upon initiation of the terminal stream operation, using the
 * current state of this scanner. Subsequent calls to any methods on this scanner
 * other than {@link #close} and {@link #ioException} may return undefined results
 * or may cause undefined effects on the returned stream. The returned stream's source
 * {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
 * {@link java.util.ConcurrentModificationException} if any such calls are detected
 * during stream pipeline execution.
 *
 * <p>After stream pipeline execution completes, this scanner is left in an indeterminate
 * state and cannot be reused.
 *
 * <p>If this scanner contains a resource that must be released, this scanner
 * should be closed, either by calling its {@link #close} method, or by
 * closing the returned stream. Closing the stream will close the underlying scanner.
 * {@code IllegalStateException} is thrown if the scanner has been closed when this
 * method is called, or if this scanner is closed during stream pipeline execution.
 *
 * <p>As with the {@link #findWithinHorizon findWithinHorizon()} methods, this method
 * might block waiting for additional input, and it might buffer an unbounded amount of
 * input searching for a match.
 *
 * @apiNote
 * For example, the following code will read a file and return a list
 * of all sequences of characters consisting of seven or more Latin capital
 * letters:
 *
 * <pre>{@code
 * try (Scanner sc = new Scanner(Path.of("input.txt"))) {
 *     Pattern pat = Pattern.compile("[A-Z]{7,}");
 *     List<String> capWords = sc.findAll(pat)
 *                               .map(MatchResult::group)
 *                               .collect(Collectors.toList());
 * }
 * }</pre>
 *
 * @param pattern the pattern to be matched
 * @return a sequential stream of match results
 * @throws NullPointerException if pattern is null
 * @throws IllegalStateException if this scanner is closed
 * @since 9
 */
public Stream<MatchResult> findAll(Pattern pattern) {
    Objects.requireNonNull(pattern);
    ensureOpen();
    Stream<MatchResult> stream = StreamSupport.stream(new FindSpliterator(pattern), false);
    return stream.onClose(this::close);
}
 
源代码14 项目: openjdk-jdk9   文件: Scanner.java
/**
 * Returns a stream of match results from this scanner. The stream
 * contains the same results in the same order that would be returned by
 * calling {@code findWithinHorizon(pattern, 0)} and then {@link #match}
 * successively as long as {@link #findWithinHorizon findWithinHorizon()}
 * finds matches.
 *
 * <p>The resulting stream is sequential and ordered. All stream elements are
 * non-null.
 *
 * <p>Scanning starts upon initiation of the terminal stream operation, using the
 * current state of this scanner. Subsequent calls to any methods on this scanner
 * other than {@link #close} and {@link #ioException} may return undefined results
 * or may cause undefined effects on the returned stream. The returned stream's source
 * {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
 * {@link java.util.ConcurrentModificationException} if any such calls are detected
 * during stream pipeline execution.
 *
 * <p>After stream pipeline execution completes, this scanner is left in an indeterminate
 * state and cannot be reused.
 *
 * <p>If this scanner contains a resource that must be released, this scanner
 * should be closed, either by calling its {@link #close} method, or by
 * closing the returned stream. Closing the stream will close the underlying scanner.
 * {@code IllegalStateException} is thrown if the scanner has been closed when this
 * method is called, or if this scanner is closed during stream pipeline execution.
 *
 * <p>As with the {@link #findWithinHorizon findWithinHorizon()} methods, this method
 * might block waiting for additional input, and it might buffer an unbounded amount of
 * input searching for a match.
 *
 * @apiNote
 * For example, the following code will read a file and return a list
 * of all sequences of characters consisting of seven or more Latin capital
 * letters:
 *
 * <pre>{@code
 * try (Scanner sc = new Scanner(Paths.get("input.txt"))) {
 *     Pattern pat = Pattern.compile("[A-Z]{7,}");
 *     List<String> capWords = sc.findAll(pat)
 *                               .map(MatchResult::group)
 *                               .collect(Collectors.toList());
 * }
 * }</pre>
 *
 * @param pattern the pattern to be matched
 * @return a sequential stream of match results
 * @throws NullPointerException if pattern is null
 * @throws IllegalStateException if this scanner is closed
 * @since 9
 */
public Stream<MatchResult> findAll(Pattern pattern) {
    Objects.requireNonNull(pattern);
    ensureOpen();
    Stream<MatchResult> stream = StreamSupport.stream(new FindSpliterator(pattern), false);
    return stream.onClose(this::close);
}
 
源代码15 项目: tinkerpop3   文件: Streams.java
/**
 * <p>
 * Obtain a Java 8 stream from an iterator.  If the iterator happens to
 * implement AutoCloseable (e.g. {@link CloseableIterator}), the stream's 
 * onClose behavior will close the iterator.  Thus it is important to
 * always close the returned stream, or use within a try-with-resources:
 * </p>
 * <pre>
 * {@code
 * try (Stream<Object> s = Streams.of(it)) {
 *     // do something with s
 * } // auto-close
 * }
 * </pre>
 *
 */
public static final <T> Stream<T> of(final Iterator<T> it) {
    final Stream<T> s = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    if (it instanceof AutoCloseable) {
        final AutoCloseable closeable = (AutoCloseable) it;
        s.onClose(() -> Code.wrapThrow(() -> closeable.close()));
    }
    return s;
}
 
源代码16 项目: Bytecoder   文件: Scanner.java
/**
 * Returns a stream of delimiter-separated tokens from this scanner. The
 * stream contains the same tokens that would be returned, starting from
 * this scanner's current state, by calling the {@link #next} method
 * repeatedly until the {@link #hasNext} method returns false.
 *
 * <p>The resulting stream is sequential and ordered. All stream elements are
 * non-null.
 *
 * <p>Scanning starts upon initiation of the terminal stream operation, using the
 * current state of this scanner. Subsequent calls to any methods on this scanner
 * other than {@link #close} and {@link #ioException} may return undefined results
 * or may cause undefined effects on the returned stream. The returned stream's source
 * {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
 * {@link java.util.ConcurrentModificationException} if any such calls are detected
 * during stream pipeline execution.
 *
 * <p>After stream pipeline execution completes, this scanner is left in an indeterminate
 * state and cannot be reused.
 *
 * <p>If this scanner contains a resource that must be released, this scanner
 * should be closed, either by calling its {@link #close} method, or by
 * closing the returned stream. Closing the stream will close the underlying scanner.
 * {@code IllegalStateException} is thrown if the scanner has been closed when this
 * method is called, or if this scanner is closed during stream pipeline execution.
 *
 * <p>This method might block waiting for more input.
 *
 * @apiNote
 * For example, the following code will create a list of
 * comma-delimited tokens from a string:
 *
 * <pre>{@code
 * List<String> result = new Scanner("abc,def,,ghi")
 *     .useDelimiter(",")
 *     .tokens()
 *     .collect(Collectors.toList());
 * }</pre>
 *
 * <p>The resulting list would contain {@code "abc"}, {@code "def"},
 * the empty string, and {@code "ghi"}.
 *
 * @return a sequential stream of token strings
 * @throws IllegalStateException if this scanner is closed
 * @since 9
 */
public Stream<String> tokens() {
    ensureOpen();
    Stream<String> stream = StreamSupport.stream(new TokenSpliterator(), false);
    return stream.onClose(this::close);
}
 
源代码17 项目: openjdk-jdk9   文件: Scanner.java
/**
 * Returns a stream of delimiter-separated tokens from this scanner. The
 * stream contains the same tokens that would be returned, starting from
 * this scanner's current state, by calling the {@link #next} method
 * repeatedly until the {@link #hasNext} method returns false.
 *
 * <p>The resulting stream is sequential and ordered. All stream elements are
 * non-null.
 *
 * <p>Scanning starts upon initiation of the terminal stream operation, using the
 * current state of this scanner. Subsequent calls to any methods on this scanner
 * other than {@link #close} and {@link #ioException} may return undefined results
 * or may cause undefined effects on the returned stream. The returned stream's source
 * {@code Spliterator} is <em>fail-fast</em> and will, on a best-effort basis, throw a
 * {@link java.util.ConcurrentModificationException} if any such calls are detected
 * during stream pipeline execution.
 *
 * <p>After stream pipeline execution completes, this scanner is left in an indeterminate
 * state and cannot be reused.
 *
 * <p>If this scanner contains a resource that must be released, this scanner
 * should be closed, either by calling its {@link #close} method, or by
 * closing the returned stream. Closing the stream will close the underlying scanner.
 * {@code IllegalStateException} is thrown if the scanner has been closed when this
 * method is called, or if this scanner is closed during stream pipeline execution.
 *
 * <p>This method might block waiting for more input.
 *
 * @apiNote
 * For example, the following code will create a list of
 * comma-delimited tokens from a string:
 *
 * <pre>{@code
 * List<String> result = new Scanner("abc,def,,ghi")
 *     .useDelimiter(",")
 *     .tokens()
 *     .collect(Collectors.toList());
 * }</pre>
 *
 * <p>The resulting list would contain {@code "abc"}, {@code "def"},
 * the empty string, and {@code "ghi"}.
 *
 * @return a sequential stream of token strings
 * @throws IllegalStateException if this scanner is closed
 * @since 9
 */
public Stream<String> tokens() {
    ensureOpen();
    Stream<String> stream = StreamSupport.stream(new TokenSpliterator(), false);
    return stream.onClose(this::close);
}