下面列出了 java.nio.channels.AsynchronousFileChannel#open() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Test AsynchronousFileChannel.open.
 *
 * <p>Creates a new file through the wrapped directory and asserts that the
 * {@code InfoStreamListener} saw a message. It then asserts that re-opening the
 * same file with {@code CREATE_NEW} fails with an {@link IOException}, and that
 * opening a missing file fails with a {@link NoSuchFileException}.
 *
 * @throws IOException if creating the directory or opening/closing the channel fails
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 */
public void testAsyncFileChannel() throws IOException, InterruptedException {
    InfoStreamListener stream = new InfoStreamListener("newAsynchronousFileChannel");
    Path dir = wrap(createTempDir(), stream);
    ExecutorService executorService = Executors.newFixedThreadPool(1,
        new NamedThreadFactory("async-io"));
    try {
        Set<StandardOpenOption> opts = Set
            .of(StandardOpenOption.CREATE_NEW, StandardOpenOption.READ,
                StandardOpenOption.WRITE);
        // try-with-resources: the channel is closed even when the assertion fails,
        // so a failing check does not also leave an open file handle behind.
        try (AsynchronousFileChannel channel = AsynchronousFileChannel
            .open(dir.resolve("foobar"), opts, executorService)) {
            assertTrue(stream.sawMessage());
        }
        // The file now exists, so CREATE_NEW must be rejected.
        expectThrows(IOException.class, () -> AsynchronousFileChannel.open(dir.resolve("foobar"),
            opts, executorService));
        expectThrows(NoSuchFileException.class,
            () -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
    } finally {
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Writes a two-buffer flux ("foo", "bar") to an asynchronous file channel, requests a
 * single element and then cancels, and asserts that only "foo" reached the file.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar);

    // try-with-resources: the channel is closed even if verification or the
    // content assertion throws.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult, 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", result);
    }

    // Re-subscribe to the source so every buffer it emits is released.
    flux.subscribe(DataBufferUtils::release);
}
/**
 * Writes a flux that emits "foo" and "bar" and then fails with a
 * {@link RuntimeException}. Asserts that both buffers are passed through, that the
 * error is propagated, and that the file contains "foobar".
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux =
            Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

    // try-with-resources: the channel is closed even if verification or the
    // content assertion throws.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult)
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError(RuntimeException.class)
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", result);
    }
}
/**
 * Writes a two-buffer flux ("foo", "bar") to an asynchronous file channel, requests a
 * single element and then cancels, and asserts that only "foo" reached the file.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar);

    // try-with-resources: the channel is closed even if verification or the
    // content assertion throws.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult, 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", result);
    }

    // Re-subscribe to the source so every buffer it emits is released.
    flux.subscribe(DataBufferUtils::release);
}
/**
 * Test leaks via AsynchronousFileChannel.open.
 *
 * <p>Writes a one-byte file, opens an asynchronous channel on it and, while that
 * channel is still open, expects closing the directory's file system to throw an
 * exception whose message contains "file handle leaks". The channel is closed
 * afterwards.
 */
public void testLeakAsyncFileChannel() throws IOException, InterruptedException {
    Path dir = wrap(createTempDir());
    Path target = dir.resolve("stillopen");

    // Seed the file with a single byte so it exists before the channel is opened.
    OutputStream seed = Files.newOutputStream(target);
    seed.write(5);
    seed.close();

    ExecutorService pool = Executors.newFixedThreadPool(1,
        new NamedThreadFactory("async-io"));
    try {
        AsynchronousFileChannel leak =
            AsynchronousFileChannel.open(target, Collections.emptySet(), pool);
        Exception e = expectThrows(Exception.class, () -> dir.getFileSystem().close());
        assertTrue(e.getMessage().contains("file handle leaks"));
        leak.close();
    } finally {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Reads up to the first 1024 bytes of {@code file} through an asynchronous channel
 * and returns them as a trimmed string.
 *
 * <p>The bytes are decoded with the platform default charset. The whole backing
 * array is decoded, so unused trailing zero bytes are removed by {@code trim()}
 * together with any leading or trailing whitespace in the content.
 *
 * @param file the file to read
 * @return the trimmed content of at most the first 1024 bytes
 * @throws ExecutionException if the asynchronous read fails
 * @throws InterruptedException if interrupted while waiting for the read
 * @throws java.io.UncheckedIOException if the file cannot be opened or closed
 */
private String readContent(Path file) throws ExecutionException, InterruptedException {
    // try-with-resources guarantees the channel is released on every path.
    try (AsynchronousFileChannel fileChannel =
             AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Block until the read completes; the byte count is not needed because
        // the untouched remainder of the array is trimmed away below.
        fileChannel.read(buffer, 0).get();
        return new String(buffer.array()).trim();
    } catch (IOException e) {
        // Fail with the real cause instead of logging it and then hitting a
        // NullPointerException on a channel that was never opened.
        throw new java.io.UncheckedIOException(e);
    }
}
/**
 * Opens the log if it is not already open and records the channel's current size
 * as the position.
 *
 * <p>Missing parent directories of the log path are created first. The file is
 * opened for writing and created if absent; when {@code truncateExisting} is set
 * it is also truncated.
 *
 * @throws IOException if the log cannot be opened
 */
public void open() throws IOException {
    if (channel == null) {
        // If the path has a directory component, create the full directory tree.
        final Path parent = path.getParent();
        if (parent != null) {
            parent.toFile().mkdirs();
        }

        final StandardOpenOption[] options = truncateExisting
            ? new StandardOpenOption[] {
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING}
            : new StandardOpenOption[] {
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE};
        this.channel = AsynchronousFileChannel.open(path, options);
    }
    // Runs on every call, including when the channel was already open.
    position.set(channel.size());
}
/***************************************
 * Opens an asynchronous file channel for the given file name. The access mode
 * and any extra options are combined through {@code CollectionUtil.join} and
 * passed to {@code AsynchronousFileChannel.open}.
 *
 * @param  sFileName     The name of the file to open
 * @param  rMode         The open option for the file access mode (e.g.
 *                       READ, WRITE)
 * @param  rExtraOptions Optional extra file open options
 *
 * @return The opened file channel
 *
 * @throws CoroutineException If opening the channel fails with an IOException
 */
protected static AsynchronousFileChannel openFileChannel(
    String        sFileName,
    OpenOption    rMode,
    OpenOption... rExtraOptions)
{
    final File aFile = new File(sFileName);

    try
    {
        return AsynchronousFileChannel.open(
            aFile.toPath(),
            CollectionUtil.join(rExtraOptions, rMode));
    }
    catch (IOException eIO)
    {
        // Rethrow unchecked, keeping the I/O failure as the cause.
        throw new CoroutineException(eIO);
    }
}
/**
 * Starts an asynchronous write of {@code input} at offset 0 of the given file,
 * creating the file if needed, and then waits three seconds.
 *
 * <p>The channel, buffer and path are handed to the {@code WriteHandler} through
 * an {@code Attachment}. The channel is not closed here.
 * NOTE(review): presumably the handler closes it on completion - confirm.
 *
 * @param filePath path of the file to write
 * @param input text to write, encoded with the platform default charset
 * @throws IOException if the file cannot be opened
 */
public void writeFile(String filePath, String input) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, WRITE, CREATE);
    WriteHandler handler = new WriteHandler();
    ByteBuffer dataBuffer = ByteBuffer.wrap(input.getBytes());

    Attachment attach = new Attachment();
    attach.asyncChannel = afc;
    attach.buffer = dataBuffer;
    attach.path = path;

    afc.write(dataBuffer, 0, attach, handler);

    System.out.println("Sleeping for 3 seconds...");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Starts an asynchronous save of the config unless a write is already in progress.
 *
 * <p>When no write is running, the config is serialized, encoded with
 * {@code charset} and appended at the channel's current size; completion and
 * failure are reported to {@code writeCompletedHandler}. When a write is already
 * running, nothing is written now, and {@code mustWriteAgain} is set only if
 * {@code saveLaterIfWriting} is true.
 *
 * @param saveLaterIfWriting whether to request another save when one is in progress
 */
private void save(boolean saveLaterIfWriting) {
    // Atomically flips the flag from false to true; fails if a write is in progress.
    if (!currentlyWriting.compareAndSet(false, true)) {
        if (saveLaterIfWriting) {
            // A write is in progress: ask for another one later.
            mustWriteAgain.set(true);
        }
        return;
    }

    // Serialize the config into characters, then encode them to bytes.
    Charray serialized = new Charray(512);
    writer.write(config, serialized.asOutput());
    ByteBuffer encoded = charset.encode(CharBuffer.wrap(serialized));

    // Write the bytes to nioPath asynchronously.
    synchronized (channelGuard) {
        try {
            channel = AsynchronousFileChannel.open(nioPath, openOptions);
            channel.write(encoded, channel.size(), null, writeCompletedHandler);
        } catch (IOException e) {
            writeCompletedHandler.failed(e, null);
        }
    }
}
/**
 * Starts an asynchronous write of {@code input} at offset 0 of the given file,
 * creating the file if needed, and then waits three seconds.
 *
 * <p>The channel, buffer and path are handed to the {@code WriteHandler} through
 * an {@code Attachment}. The channel is not closed here.
 * NOTE(review): presumably the handler closes it on completion - confirm.
 *
 * @param filePath path of the file to write
 * @param input text to write, encoded with the platform default charset
 * @throws IOException if the file cannot be opened
 */
private void writeFile(String filePath, String input) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, WRITE, CREATE);
    WriteHandler handler = new WriteHandler();
    ByteBuffer dataBuffer = ByteBuffer.wrap(input.getBytes());

    Attachment attach = new Attachment();
    attach.asyncChannel = afc;
    attach.buffer = dataBuffer;
    attach.path = path;

    afc.write(dataBuffer, 0, attach, handler);

    System.out.println("Sleeping for 3 seconds...");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Starts an asynchronous read of the whole file into a buffer sized to the file,
 * and then waits five seconds.
 *
 * <p>The channel, buffer and path are handed to the {@code ReadHandler} through
 * an {@code Attachment}. The channel is not closed here.
 * NOTE(review): presumably the handler closes it on completion - confirm.
 *
 * @param filePath path of the file to read
 * @throws IOException if the file cannot be opened or its size cannot be determined
 * @throws ArithmeticException if the file is too large for a single {@code int}-sized buffer
 */
private void readFile(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
    ReadHandler handler = new ReadHandler();

    // A plain (int) cast would wrap for files larger than Integer.MAX_VALUE bytes,
    // giving a negative or silently truncated capacity; fail explicitly instead.
    int fileSize = Math.toIntExact(afc.size());
    ByteBuffer dataBuffer = ByteBuffer.allocate(fileSize);

    Attachment attach = new Attachment();
    attach.asyncChannel = afc;
    attach.buffer = dataBuffer;
    attach.path = path;

    afc.read(dataBuffer, 0, attach, handler);

    System.out.println("Sleeping for 5 seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Returns the cached read-only asynchronous channel, opening a new one when none
 * exists yet or the cached one is no longer open.
 *
 * <p>The cached channel is checked once without locking and again while
 * synchronized on this instance. Each time a channel is opened,
 * {@code asyncChannelOpenCount} is incremented.
 *
 * @return an asynchronous channel on {@code path} opened for reading
 * @throws RuntimeException wrapping the {@link IOException} if opening fails
 */
public AsynchronousFileChannel getAsyncChannel() {
    if (asyncChannel != null && asyncChannel.isOpen()) {
        return asyncChannel;
    }

    synchronized (this) {
        // Check again while synchronized.
        if (asyncChannel == null || !asyncChannel.isOpen()) {
            try {
                asyncChannel = AsynchronousFileChannel.open(path,
                        StandardOpenOption.READ);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            asyncChannelOpenCount++;
        }
        return asyncChannel;
    }
}
/**
 * Writes four buffers ("foo", "bar", "baz", "qux") to an asynchronous file channel
 * and hands the resulting flux to {@code verifyWrittenData}.
 */
@Test
public void writeAsynchronousFileChannel() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    DataBuffer baz = stringBuffer("baz");
    DataBuffer qux = stringBuffer("qux");
    Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);

    // try-with-resources: the channel is closed even if verification throws.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        verifyWrittenData(writeResult);
    }
}
/**
 * Reads {@code DataBufferUtilsTests.txt} through an asynchronous channel in
 * 3-byte buffers, writes the buffers to a temporary file through another
 * asynchronous channel, and asserts that both files have the same content.
 *
 * <p>Failures raised inside the subscriber callbacks are captured and rethrown
 * on the test thread after the latch is released. The latch is released and the
 * destination channel closed on both the completion and the error path, so a
 * failure can neither hang the test nor go unnoticed.
 */
@Test
public void readAndWriteAsynchronousFileChannel() throws Exception {
    Path source = Paths.get(
            DataBufferUtilsTests.class.getResource("DataBufferUtilsTests.txt").toURI());
    Flux<DataBuffer> sourceFlux = DataBufferUtils.readAsynchronousFileChannel(
            () -> AsynchronousFileChannel.open(source, StandardOpenOption.READ),
            this.bufferFactory, 3);

    Path destination = Files.createTempFile("DataBufferUtilsTests", null);
    AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(destination, StandardOpenOption.WRITE);

    // Carries the first failure from the callback thread back to the test thread.
    java.util.concurrent.atomic.AtomicReference<Throwable> failure =
            new java.util.concurrent.atomic.AtomicReference<>();
    CountDownLatch latch = new CountDownLatch(1);

    DataBufferUtils.write(sourceFlux, channel)
            .subscribe(DataBufferUtils::release,
                    throwable -> {
                        failure.set(throwable);
                        DataBufferUtils.closeChannel(channel);
                        latch.countDown();
                    },
                    () -> {
                        try {
                            String expected = String.join("", Files.readAllLines(source));
                            String result = String.join("", Files.readAllLines(destination));
                            assertEquals(expected, result);
                        }
                        catch (Throwable ex) {
                            // Covers both IOException and a failed assertion.
                            failure.set(ex);
                        }
                        finally {
                            DataBufferUtils.closeChannel(channel);
                            // Always release the waiting test thread.
                            latch.countDown();
                        }
                    });

    latch.await();

    Throwable cause = failure.get();
    if (cause instanceof Error) {
        // Includes AssertionError from the content comparison.
        throw (Error) cause;
    }
    if (cause != null) {
        throw new AssertionError(cause.getMessage(), cause);
    }
}
/**
 * Creates an I/O implementation for the specified {@code file}.
 *
 * <p>Opens an asynchronous channel on the file, obtains its file descriptor via
 * {@code getFileDescriptor} and looks up the file system block size for that
 * descriptor.
 *
 * @param file Random access file.
 * @param holder Thread-local holder of channel operation futures, stored as is.
 * @param modes Open modes.
 * @throws IOException If the channel cannot be opened.
 */
public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption... modes) throws IOException {
    this.ch = AsynchronousFileChannel.open(file.toPath(), modes);

    // The block size lookup is keyed by the descriptor of the channel just opened.
    this.fd = getFileDescriptor(this.ch);
    this.fsBlockSize = FileSystemUtils.getFileSystemBlockSize(this.fd);

    this.holder = holder;
}
/**
 * Creates a new file at {@code p} and opens it for asynchronous writing.
 * Because {@code CREATE_NEW} is used, opening fails if the file already exists.
 *
 * @param p path of the file to create
 * @return the opened channel
 * @throws UncheckedIOException wrapping the {@link IOException} if opening fails
 */
private AsynchronousFileChannel openChannel(Path p) {
    final AsynchronousFileChannel created;
    try {
        created = AsynchronousFileChannel.open(
                p, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
    return created;
}
/**
 * Creates a volume backed by an asynchronous channel on {@code file}.
 *
 * <p>In read-only mode the file is opened for reading only. Otherwise it is
 * opened for reading and writing and created if it does not exist.
 *
 * @param file the backing file
 * @param readOnly whether to open the file for reading only
 * @throws IOError wrapping the {@link IOException} if the channel cannot be opened
 */
public AsyncFileChannelVol(File file, boolean readOnly){
    this.readOnly = readOnly;
    this.file = file;
    try {
        if (readOnly) {
            this.channel = AsynchronousFileChannel.open(
                    file.toPath(), StandardOpenOption.READ);
        } else {
            this.channel = AsynchronousFileChannel.open(
                    file.toPath(),
                    StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        }
    } catch (IOException openFailure) {
        throw new IOError(openFailure);
    }
}
/**
 * Creates a new file at {@code path} and opens it for asynchronous writing.
 * Because {@code CREATE_NEW} is used, opening fails if the file already exists.
 *
 * @param path path of the file to create
 * @return the opened channel
 * @throws IOException if the file already exists or cannot be opened
 */
private AsynchronousFileChannel createChannel(Path path) throws IOException {
    final AsynchronousFileChannel created = AsynchronousFileChannel.open(
            path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
    return created;
}
/**
 * Demonstrates Future-based asynchronous file I/O: writes a fixed text to
 * {@code data/test-write2.txt} from offset 0, then reads up to 1024 bytes back
 * and prints them.
 *
 * <p>The file is opened with {@code WRITE} only, so it must already exist;
 * otherwise the resulting {@link IOException} is printed and the method returns.
 *
 * @param args unused
 */
public static void main(String[] args) {
    try {
        Path path = Paths.get("data/test-write2.txt");

        // try-with-resources closes each channel even if an operation fails.
        try (AsynchronousFileChannel fileChannelW =
                 AsynchronousFileChannel.open(path, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("test data\r using Java SE NIO \r async with Future".getBytes());
            buffer.flip();

            // The buffer must not be touched while a write is in flight, so block on
            // the Future before reusing it. A single write may be partial, hence the loop.
            long positionW = 0;
            while (buffer.hasRemaining()) {
                positionW += fileChannelW.write(buffer, positionW).get();
            }
        }
        System.out.println("Write done");

        try (AsynchronousFileChannel fileChannelR =
                 AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            long positionR = 0;
            // Wait for completion instead of spinning on isDone().
            fileChannelR.read(buffer, positionR).get();

            buffer.flip();
            byte[] data = new byte[buffer.limit()];
            buffer.get(data);
            System.out.println(new String(data));
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    } catch (java.util.concurrent.ExecutionException ee) {
        // The asynchronous operation itself failed.
        ee.printStackTrace();
    } catch (InterruptedException ie) {
        // Restore the interrupt status before returning.
        Thread.currentThread().interrupt();
    }
}