下面列出了 java.nio.channels.AsynchronousFileChannel#close() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Writes a flux that emits two buffers and then fails to an
 * {@link AsynchronousFileChannel}, and checks that both buffers are emitted by
 * the write result, that the error is propagated, and that the temp file
 * contains {@code "foobar"} afterwards.
 *
 * @throws Exception if the channel cannot be opened or closed, or the file cannot be read
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux =
            Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

    // try-with-resources closes the channel even when a verification below fails,
    // so a failing test no longer leaks the open file handle.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult)
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError(RuntimeException.class)
                .verify();

        // The file is read while the channel is still open, as in the original test.
        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", result);
    }
}
/**
 * Writes a two-buffer flux to an {@link AsynchronousFileChannel}, requests a
 * single element, cancels after the first buffer has been consumed, and checks
 * that the temp file contains only {@code "foo"}.
 *
 * @throws Exception if the channel cannot be opened or closed, or the file cannot be read
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar);

    // try-with-resources closes the channel even when a verification below fails,
    // so a failing test no longer leaks the open file handle.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        // Initial demand of 1: only the first buffer is requested before cancelling.
        StepVerifier.create(writeResult, 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", result);
    }

    // Runs after the channel is closed, matching the original statement order:
    // re-subscribe to the source and pass every buffer to DataBufferUtils.release.
    flux.subscribe(DataBufferUtils::release);
}
/**
 * Error-path test for writing to an {@link AsynchronousFileChannel}: the
 * source emits "foo" and "bar" and then signals a {@link RuntimeException}.
 * The write result must emit both buffers and then the error, and the temp
 * file must read back as {@code "foobar"}.
 *
 * @throws Exception if opening/closing the channel or reading the file fails
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> source =
            Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

    // The channel is managed by try-with-resources so that an assertion failure
    // inside the block cannot skip the close() call.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        StepVerifier.create(DataBufferUtils.write(source, channel))
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError(RuntimeException.class)
                .verify();

        String written = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", written);
    }
}
/**
 * Cancellation test for writing to an {@link AsynchronousFileChannel}: with an
 * initial demand of one, the subscriber consumes "foo" and then cancels. The
 * temp file must read back as {@code "foo"} only.
 *
 * @throws Exception if opening/closing the channel or reading the file fails
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> source = Flux.just(foo, bar);

    // The channel is managed by try-with-resources so that an assertion failure
    // inside the block cannot skip the close() call.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        StepVerifier.create(DataBufferUtils.write(source, channel), 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String written = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", written);
    }

    // Kept after the channel close, as in the original: subscribe to the source
    // again and hand each buffer to DataBufferUtils.release.
    source.subscribe(DataBufferUtils::release);
}
/**
 * Test leaks via AsynchronousFileChannel.open.
 *
 * <p>Creates a one-byte file in a wrapped temp directory, opens it as an
 * {@link AsynchronousFileChannel}, and expects closing the directory's file
 * system while the channel is still open to throw an exception whose message
 * contains {@code "file handle leaks"}.
 *
 * @throws IOException if creating the file or opening/closing the channel fails
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 */
public void testLeakAsyncFileChannel() throws IOException, InterruptedException {
    Path dir = wrap(createTempDir());
    // try-with-resources: the stream is closed even if write() throws.
    try (OutputStream file = Files.newOutputStream(dir.resolve("stillopen"))) {
        file.write(5);
    }

    ExecutorService executorService = Executors.newFixedThreadPool(1,
            new NamedThreadFactory("async-io"));
    // The channel is a resource of this try, so it is closed before the finally
    // block shuts the executor down, and also when an assertion below fails.
    try (AsynchronousFileChannel leak = AsynchronousFileChannel.open(dir.resolve("stillopen"),
            Collections.emptySet(), executorService)) {
        Exception e = expectThrows(Exception.class, () -> dir.getFileSystem().close());
        assertTrue(e.getMessage().contains("file handle leaks"));
    } finally {
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Test AsynchronousFileChannel.open.
 *
 * <p>Opens a new file through a directory wrapped with an
 * {@code InfoStreamListener} for {@code "newAsynchronousFileChannel"} and
 * checks that the listener saw a message. It then checks that opening the same
 * path again with {@code CREATE_NEW} throws {@link IOException}, and that
 * opening a path named {@code "doesNotExist.rip"} throws
 * {@link NoSuchFileException}.
 *
 * @throws IOException if opening or closing the channel fails unexpectedly
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 */
public void testAsyncFileChannel() throws IOException, InterruptedException {
    InfoStreamListener stream = new InfoStreamListener("newAsynchronousFileChannel");
    Path dir = wrap(createTempDir(), stream);

    ExecutorService executorService = Executors.newFixedThreadPool(1,
            new NamedThreadFactory("async-io"));
    try {
        Set<StandardOpenOption> opts = Set
                .of(StandardOpenOption.CREATE_NEW, StandardOpenOption.READ,
                        StandardOpenOption.WRITE);
        // try-with-resources: the channel is closed even if the assertion fails,
        // and is closed before the re-open attempts below, as in the original.
        try (AsynchronousFileChannel channel = AsynchronousFileChannel
                .open(dir.resolve("foobar"), opts, executorService)) {
            assertTrue(stream.sawMessage());
        }

        // The same options (including CREATE_NEW) on the same path must now fail.
        expectThrows(IOException.class, () -> AsynchronousFileChannel.open(dir.resolve("foobar"),
                opts, executorService));

        expectThrows(NoSuchFileException.class,
                () -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
    } finally {
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Writes four buffers ("foo", "bar", "baz", "qux") to an
 * {@link AsynchronousFileChannel} opened on the temp file and passes the write
 * result to {@code verifyWrittenData}.
 *
 * @throws Exception if the channel cannot be opened or closed, or verification fails
 */
@Test
public void writeAsynchronousFileChannel() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    DataBuffer baz = stringBuffer("baz");
    DataBuffer qux = stringBuffer("qux");
    Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);

    // try-with-resources closes the channel even when verification fails,
    // so a failing test no longer leaks the open file handle.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        verifyWrittenData(writeResult);
    }
}
/**
 * Happy-path test for writing to an {@link AsynchronousFileChannel}: a flux of
 * the four buffers "foo", "bar", "baz" and "qux" is written to a channel
 * opened on the temp file, and the result is checked by
 * {@code verifyWrittenData}.
 *
 * @throws Exception if opening/closing the channel or the verification fails
 */
@Test
public void writeAsynchronousFileChannel() throws Exception {
    Flux<DataBuffer> source = Flux.just(
            stringBuffer("foo"),
            stringBuffer("bar"),
            stringBuffer("baz"),
            stringBuffer("qux"));

    // The channel is managed by try-with-resources so that a verification
    // failure inside the block cannot skip the close() call.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        verifyWrittenData(DataBufferUtils.write(source, channel));
    }
}