下面列出了怎么用java.nio.channels.AsynchronousFileChannel的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Read an {@code AsynchronousFileChannel}, obtained from the given supplier,
 * into a {@code Flux} of {@code DataBuffer}s, beginning at the given file
 * position. Closes the channel when the Flux is terminated.
 * <p>The cleanup callback handed to {@code Flux.using} is intentionally empty:
 * the channel is not closed from there, so that a read callback that is still
 * in flight can finish and release its {@code DataBuffer} first.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param bufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a Flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(
        Callable<AsynchronousFileChannel> channelSupplier, long position,
        DataBufferFactory bufferFactory, int bufferSize) {

    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

    Flux<DataBuffer> buffers = Flux.using(
            channelSupplier,
            channel -> Flux.create(sink -> {
                ReadCompletionHandler readHandler = new ReadCompletionHandler(
                        channel, sink, position, bufferFactory, bufferSize);
                // Register for disposal before the first read is issued.
                sink.onDispose(readHandler::dispose);
                // The first buffer is allocated per subscription; it travels with the
                // read as the attachment so the handler receives it back on completion.
                DataBuffer firstBuffer = bufferFactory.allocateBuffer(bufferSize);
                channel.read(firstBuffer.asByteBuffer(0, bufferSize), position, firstBuffer, readHandler);
            }),
            channel -> {
                // Do not close channel from here, rather wait for the current read callback
                // and then complete after releasing the DataBuffer.
            });

    return buffers.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
/**
 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
 * starting at the given position.
 * <p>If the resource is a file, it is opened as an
 * {@code AsynchronousFileChannel} and turned into a {@code Flux} via
 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}.
 * Otherwise, or if resolving the file fails with an {@code IOException},
 * this falls back on {@link #readByteChannel(Callable, DataBufferFactory, int)}
 * and skips bytes up to the requested position.
 * Closes the channel when the flux is terminated.
 * @param resource the resource to read from
 * @param position the position to start reading from
 * @param bufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a Flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> read(
        Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) {

    try {
        if (resource.isFile()) {
            File resourceFile = resource.getFile();
            Callable<AsynchronousFileChannel> fileChannelSupplier =
                    () -> AsynchronousFileChannel.open(resourceFile.toPath(), StandardOpenOption.READ);
            return readAsynchronousFileChannel(fileChannelSupplier, position, bufferFactory, bufferSize);
        }
    }
    catch (IOException ignore) {
        // fallback to resource.readableChannel(), below
    }

    Flux<DataBuffer> channelBuffers =
            readByteChannel(resource::readableChannel, bufferFactory, bufferSize);
    if (position == 0) {
        return channelBuffers;
    }
    // The byte channel is read from its start, so drop data up to the requested position.
    return skipUntilByteCount(channelBuffers, position);
}
/**
 * Reads up to the first 1024 bytes of the given file through an
 * {@code AsynchronousFileChannel} and returns them as a trimmed string
 * (decoded with the platform default charset, as before).
 *
 * <p>The channel is always closed, and only the bytes actually read are decoded.
 * A failure to open or close the file is reported as an
 * {@code UncheckedIOException} instead of being printed and then followed by a
 * {@code NullPointerException} on the unopened channel.
 *
 * @param file the file to read
 * @return the trimmed content, or an empty string if the file is empty
 * @throws ExecutionException if the asynchronous read fails
 * @throws InterruptedException if interrupted while waiting for the read
 */
private String readContent(Path file) throws ExecutionException, InterruptedException {
    try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Block until the read completes; the result is the byte count, or -1 at end of file.
        final int bytesRead = fileChannel.read(buffer, 0).get();
        if (bytesRead <= 0) {
            return "";
        }
        return new String(buffer.array(), 0, bytesRead).trim();
    } catch (IOException e) {
        throw new java.io.UncheckedIOException(e);
    }
}
/**
 * Writes "hello world" to the file "temp" through the {@code Future}-based
 * write API and checks that reading the file back yields the same text.
 */
@Test
public void givenPathAndContent_whenWritesToFileWithFuture_thenCorrect() throws IOException, ExecutionException, InterruptedException {
    final String fileName = "temp";
    final Path path = Paths.get(fileName);
    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    final long position = 0;
    buffer.put("hello world".getBytes());
    buffer.flip();
    // TRUNCATE_EXISTING keeps stale, longer content from an earlier run out of the result.
    try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
            path, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
        final Future<Integer> operation = fileChannel.write(buffer, position);
        // The buffer must not be touched until the write has completed.
        operation.get();
    }
    buffer.clear();
    final String content = readContent(path);
    assertEquals("hello world", content);
}
/**
 * Writes a source that emits "foo", "bar" and then an error, and checks that
 * both buffers are passed through and written before the error is published.
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux =
            Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

    // try-with-resources: the channel is closed even when verification or the assertion fails.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult)
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError(RuntimeException.class)
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", result);
    }
}
/**
 * Requests a single buffer from the write result, cancels, and checks that
 * only "foo" reached the file. The source buffers are released afterwards.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar);

    // try-with-resources: the channel is closed even when verification or the assertion fails.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult, 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", result);
    }

    flux.subscribe(DataBufferUtils::release);
}
/**
 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the
 * channel when the flux is terminated.
 * <p>The buffer for the first read is allocated when the flux is subscribed to,
 * not when this method is called. Each subscription therefore gets its own buffer,
 * and nothing is allocated (or leaked) if the returned flux is never subscribed.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
        long position, DataBufferFactory dataBufferFactory, int bufferSize) {

    Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
    Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");
    Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

    Flux<DataBuffer> result = Flux.using(channelSupplier,
            channel -> Flux.create(sink -> {
                AsynchronousFileChannelReadCompletionHandler completionHandler =
                        new AsynchronousFileChannelReadCompletionHandler(channel,
                                sink, position, dataBufferFactory, bufferSize);
                // Allocate per subscription: a buffer created outside this lambda would be
                // shared by every subscriber and leaked when nobody subscribes.
                DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
                ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
                channel.read(byteBuffer, position, dataBuffer, completionHandler);
                sink.onDispose(completionHandler::dispose);
            }),
            DataBufferUtils::closeChannel);

    return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
/**
 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
 * starting at the given position.
 * <p>A file-based resource is read through an {@code AsynchronousFileChannel} via
 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}.
 * Any other resource, or one whose file cannot be resolved because of an
 * {@code IOException}, is read through
 * {@link #readByteChannel(Callable, DataBufferFactory, int)}, skipping bytes
 * up to the requested position.
 * Closes the channel when the flux is terminated.
 * @param resource the resource to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> read(
        Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

    try {
        if (resource.isFile()) {
            File source = resource.getFile();
            return readAsynchronousFileChannel(
                    () -> AsynchronousFileChannel.open(source.toPath(), StandardOpenOption.READ),
                    position,
                    dataBufferFactory,
                    bufferSize);
        }
    }
    catch (IOException ignore) {
        // fallback to resource.readableChannel(), below
    }

    Flux<DataBuffer> fromByteChannel =
            readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
    if (position != 0) {
        // The byte channel starts at offset 0, so discard everything before the position.
        return skipUntilByteCount(fromByteChannel, position);
    }
    return fromByteChannel;
}
/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given
 * {@code AsynchronousFileChannel}, beginning at the given file position.
 * <p>The channel is <strong>not</strong> closed when the flux terminates, and the
 * data buffers in the source are <strong>not</strong>
 * {@linkplain #release(DataBuffer) released}. If releasing is required, subscribe
 * to the returned {@code Flux} with a {@link #releaseConsumer()}.
 * <p>Writing does not start until the returned {@code Flux} is subscribed to.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
        Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {

    Assert.notNull(source, "'source' must not be null");
    Assert.notNull(channel, "'channel' must not be null");
    Assert.isTrue(position >= 0, "'position' must be >= 0");

    Flux<DataBuffer> upstream = Flux.from(source);
    return Flux.create(sink -> {
        // A fresh handler per subscription: it is registered for disposal with the
        // sink and is also the subscriber of the source.
        AsynchronousFileChannelWriteCompletionHandler writeHandler =
                new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
        sink.onDispose(writeHandler);
        upstream.subscribe(writeHandler);
    });
}
/**
 * Returns the asynchronous channel for {@code path}, opening a new read-only
 * channel when none exists yet or the current one is no longer open.
 *
 * @return an open channel (as observed when it was checked)
 * @throws RuntimeException wrapping the {@code IOException} if the channel cannot be opened
 */
public AsynchronousFileChannel getAsyncChannel() {
    // Unsynchronized fast path: reuse the current channel while it is open.
    if (asyncChannel != null && asyncChannel.isOpen()) {
        return asyncChannel;
    }
    synchronized (this) {
        // Check again under the lock before (re)opening.
        if (asyncChannel == null || !asyncChannel.isOpen()) {
            try {
                asyncChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // Counts successful (re)opens only.
            asyncChannelOpenCount++;
        }
        return asyncChannel;
    }
}
/**
 * Writes a source that emits "foo", "bar" and then an error, and checks that
 * both buffers are passed through and written before the error is published.
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux =
            Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

    // try-with-resources: the channel is closed even when verification or the assertion fails.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult)
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError(RuntimeException.class)
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", result);
    }
}
/**
 * Requests a single buffer from the write result, cancels, and checks that
 * only "foo" reached the file. The source buffers are released afterwards.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar);

    // try-with-resources: the channel is closed even when verification or the assertion fails.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult, 1)
                .consumeNextWith(stringConsumer("foo"))
                .thenCancel()
                .verify();

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foo", result);
    }

    flux.subscribe(DataBufferUtils::release);
}
/**
 * Opens the log file for writing if it is not open yet, creating missing parent
 * directories and the file itself, and optionally truncating existing content.
 * On every call the current channel size is stored in {@code position}.
 *
 * @throws IOException if the log cannot be opened
 */
public void open() throws IOException {
    if (channel == null) {
        // if path has a directory, create full directory tree
        final Path directory = path.getParent();
        if (directory != null) {
            directory.toFile().mkdirs();
        }
        // Both modes write and create; truncation is the only difference.
        final StandardOpenOption[] options = truncateExisting
                ? new StandardOpenOption[] {
                        StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING}
                : new StandardOpenOption[] {
                        StandardOpenOption.WRITE, StandardOpenOption.CREATE};
        this.channel = AsynchronousFileChannel.open(path, options);
    }
    position.set(channel.size());
}
/***************************************
 * Helper that opens an asynchronous file channel for a file name and a set of
 * open options. An {@link IOException} raised while opening is rethrown as a
 * {@link CoroutineException}.
 *
 * @param  sFileName     The file name
 * @param  rMode         The open option for the file access mode (e.g.
 *                       READ, WRITE)
 * @param  rExtraOptions Optional extra file open options
 *
 * @return The file channel
 */
protected static AsynchronousFileChannel openFileChannel(
    String        sFileName,
    OpenOption    rMode,
    OpenOption... rExtraOptions)
{
    AsynchronousFileChannel rChannel;

    try
    {
        File rFile = new File(sFileName);

        // the access mode is combined with the extra options into one argument
        rChannel =
            AsynchronousFileChannel.open(
                rFile.toPath(),
                CollectionUtil.join(rExtraOptions, rMode));
    }
    catch (IOException e)
    {
        throw new CoroutineException(e);
    }

    return rChannel;
}
/***************************************
 * {@inheritDoc}
 *
 * <p>Runs the blocking variant of the operation on the channel obtained for
 * the continuation and returns the given buffer. Any exception is rethrown
 * wrapped in a {@link CoroutineException}.</p>
 */
@Override
protected ByteBuffer execute(ByteBuffer rData, Continuation<?> rContinuation)
{
    try
    {
        performBlockingOperation(getFileChannel(rContinuation), rData);

        return rData;
    }
    catch (Exception e)
    {
        throw new CoroutineException(e);
    }
}
/***************************************
 * Returns the channel to be used by this step. The currently executing
 * coroutine of the continuation is checked first for an existing
 * {@link #FILE_CHANNEL} relation. If there is none, or its channel is closed,
 * a new {@link AsynchronousFileChannel} is obtained and stored in the
 * coroutine relation. Storing the channel in the coroutine allows coroutines
 * to be structured so that multiple subroutines communicate on different
 * channels.
 *
 * @param  rContinuation The continuation to query for an existing channel
 *
 * @return The file channel
 *
 * @throws IOException If opening the channel fails
 */
protected AsynchronousFileChannel getFileChannel(
    Continuation<?> rContinuation) throws IOException
{
    Coroutine<?, ?>         rCoroutine = rContinuation.getCurrentCoroutine();
    AsynchronousFileChannel rExisting  = rCoroutine.get(FILE_CHANNEL);

    if (rExisting != null && rExisting.isOpen())
    {
        return rExisting;
    }

    // no usable channel yet: obtain one and register it on the coroutine
    AsynchronousFileChannel rOpened = fGetFileChannel.apply(rContinuation);

    rCoroutine.set(FILE_CHANNEL, rOpened).annotate(MANAGED);

    return rOpened;
}
/***************************************
 * Obtains the {@link AsynchronousFileChannel} for the suspension's
 * continuation and starts the channel operation asynchronously. Any exception
 * raised while doing so fails the suspension.
 *
 * @param rData       The byte buffer of the data to be processed
 * @param rSuspension The coroutine suspension to be resumed when the
 *                    operation is complete
 */
private void transferAsync(
    ByteBuffer             rData,
    Suspension<ByteBuffer> rSuspension)
{
    try
    {
        Continuation<?>         rContinuation = rSuspension.continuation();
        AsynchronousFileChannel rFileChannel  = getFileChannel(rContinuation);

        // the callback receives this step's operation again as its handler
        performAsyncOperation(
            FIRST_OPERATION,
            rFileChannel,
            rData,
            new ChannelCallback<>(
                rFileChannel,
                rSuspension,
                this::performAsyncOperation));
    }
    catch (Exception e)
    {
        rSuspension.fail(e);
    }
}
/**
* Schedule a read on the channel. If the operation was previously
* scheduled and is done (normal completion), then return immediately. If
* the operation was previously scheduled and was cancelled, then a
* CancellationException is thrown. If the operation was previously scheduled
* and failed, then the future is cleared and the operation is
* rescheduled. This is done in order to allow us to complete a high
* level read on the channel when the backing channel may have been
* closed by an interrupt in another thread due to Java IO channel
* semantics (e.g., driven by query termination during reads).
* <p>
* NOTE(review): {@code isDone()} is defined outside this block; the
* re-scheduling path presumably relies on it reporting {@code false} once
* {@code m_fut} has been cleared -- confirm.
*
* @param channel The channel to issue the read against.
*
* @throws IllegalArgumentException propagated from the channel read
* @throws NonReadableChannelException propagated from the channel read
* @throws CancellationException if the previously scheduled read was cancelled
* @throws InterruptedException if interrupted while obtaining the outcome of the previous read
*/
private void read(final AsynchronousFileChannel channel)
throws IllegalArgumentException, NonReadableChannelException, CancellationException, InterruptedException {
if (isDone()) { // Check for re-scheduling of the read().
try {
/*
* Note: It is either unlikely or impossible to have an
* InterruptedException thrown out here since we know that
* the Future isDone().
*/
// Inspect the outcome of the earlier read: success falls through, cancellation propagates.
m_fut.get(); // throws CancellationException, ExecutionException, InterruptedException.
} catch (ExecutionException ex) {
/*
* This read() had failed. We clear the future so that the block
* below can re-issue the read.
*/
m_fut = null;
}
}
// Issue the read on first use, or again after the failed future was cleared above.
if(!isDone()) {
// ensure buffer is ready
// NOTE(review): ByteBuffer#reset() requires a mark to have been set elsewhere -- confirm.
m_buffer.reset();
m_fut = channel.read(m_buffer, m_addr); // throws IllegalArgumentException, NonReadableChannelException
}
}
/**
 * Saves the configuration asynchronously unless a write is already in progress.
 *
 * @param saveLaterIfWriting if {@code true} and a write is currently in progress,
 *                           flags that another write must be started later
 */
private void save(boolean saveLaterIfWriting) {
    // Atomically claim the "writing" flag; failure means a write is already running.
    if (!currentlyWriting.compareAndSet(false, true)) {
        if (saveLaterIfWriting) {
            // there is a writing in progress: start one later
            mustWriteAgain.set(true);
        }
        return;
    }

    // No write in progress: serialize the config and encode it to bytes.
    Charray builder = new Charray(512);
    writer.write(config, builder.asOutput());
    ByteBuffer buffer = charset.encode(CharBuffer.wrap(builder));

    // Write the bytes to nioPath asynchronously, at the channel's current size.
    synchronized (channelGuard) {
        try {
            channel = AsynchronousFileChannel.open(nioPath, openOptions);
            channel.write(buffer, channel.size(), null, writeCompletedHandler);
        } catch (IOException e) {
            writeCompletedHandler.failed(e, null);
        }
    }
}
/**
 * Starts an asynchronous read of the whole file into a single buffer and then
 * sleeps for five seconds so the completion handler has time to run.
 *
 * <p>The channel, buffer and path are handed to the handler through the
 * {@code Attachment}. NOTE(review): the handler is presumably responsible for
 * closing the channel once the read completes -- confirm in {@code ReadHandler}.
 * If setup fails before the read has been started, the channel is closed here.
 *
 * @param filePath path of the file to read
 * @throws IOException if the file cannot be opened or sized, or is larger than
 *                     a single {@code ByteBuffer} can hold
 */
public void readFile(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
    boolean readStarted = false;
    try {
        long fileSize = afc.size();
        // A plain (int) cast would silently truncate or go negative for files over 2 GiB.
        if (fileSize > Integer.MAX_VALUE) {
            throw new IOException("File is too large to read into a single buffer: "
                    + path + " (" + fileSize + " bytes)");
        }
        ReadHandler handler = new ReadHandler();
        ByteBuffer dataBuffer = ByteBuffer.allocate((int) fileSize);
        Attachment attach = new Attachment();
        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;
        afc.read(dataBuffer, 0, attach, handler);
        readStarted = true;
    } finally {
        // Once the read is in flight the channel belongs to the handler.
        if (!readStarted) {
            afc.close();
        }
    }
    System.out.println("Sleeping for 5 seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Starts an asynchronous write of the given text at position 0 of the file,
 * creating the file if needed, and then sleeps for three seconds so the
 * completion handler has time to run.
 *
 * <p>The channel, buffer and path are handed to the handler through the
 * {@code Attachment}. NOTE(review): the handler is presumably responsible for
 * closing the channel once the write completes -- confirm in {@code WriteHandler}.
 *
 * @param filePath path of the file to write
 * @param input    text to write (encoded with the platform default charset)
 * @throws IOException if the file cannot be opened
 */
private void writeFile(String filePath, String input) throws IOException {
    Path path = Paths.get(filePath);
    // Build everything that can fail before opening, so a failure cannot leak the channel.
    ByteBuffer dataBuffer = ByteBuffer.wrap(input.getBytes());
    WriteHandler handler = new WriteHandler();
    Attachment attach = new Attachment();
    attach.buffer = dataBuffer;
    attach.path = path;
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, WRITE, CREATE);
    attach.asyncChannel = afc;
    afc.write(dataBuffer, 0, attach, handler);
    System.out.println("Sleeping for 3 seconds...");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Starts an asynchronous read of the whole file into a single buffer and then
 * sleeps for five seconds so the completion handler has time to run.
 *
 * <p>The channel, buffer and path are handed to the handler through the
 * {@code Attachment}. NOTE(review): the handler is presumably responsible for
 * closing the channel once the read completes -- confirm in {@code ReadHandler}.
 * If setup fails before the read has been started, the channel is closed here.
 *
 * @param filePath path of the file to read
 * @throws IOException if the file cannot be opened or sized, or is larger than
 *                     a single {@code ByteBuffer} can hold
 */
private void readFile(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
    boolean readStarted = false;
    try {
        long fileSize = afc.size();
        // A plain (int) cast would silently truncate or go negative for files over 2 GiB.
        if (fileSize > Integer.MAX_VALUE) {
            throw new IOException("File is too large to read into a single buffer: "
                    + path + " (" + fileSize + " bytes)");
        }
        ReadHandler handler = new ReadHandler();
        ByteBuffer dataBuffer = ByteBuffer.allocate((int) fileSize);
        Attachment attach = new Attachment();
        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;
        afc.read(dataBuffer, 0, attach, handler);
        readStarted = true;
    } finally {
        // Once the read is in flight the channel belongs to the handler.
        if (!readStarted) {
            afc.close();
        }
    }
    System.out.println("Sleeping for 5 seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Opens an input channel for {@code path} and hands the subscriber a
 * subscription backed by it. If an {@code IOException} occurs, the subscriber
 * receives a no-op subscription followed by {@code onError}.
 */
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
    try {
        AsynchronousFileChannel inputChannel = openInputChannel(this.path);
        Subscription fileSubscription = new FileSubscription(inputChannel, s, chunkSizeInBytes);

        // Hold the subscription's monitor while signalling onSubscribe: the
        // subscriber could call request() from within onSubscribe, which would
        // potentially trigger onNext before onSubscribe is finished.
        synchronized (fileSubscription) {
            s.onSubscribe(fileSubscription);
        }
    } catch (IOException e) {
        // subscribe() must return normally, so the failure is signalled through
        // onError() once onSubscribe() has been signalled.
        s.onSubscribe(new NoopSubscription(s));
        s.onError(e);
    }
}
/**
 * Test leaks via AsynchronousFileChannel.open: closing the wrapped file system
 * while an async channel is still open must fail with a "file handle leaks" message.
 */
public void testLeakAsyncFileChannel() throws IOException, InterruptedException {
    Path dir = wrap(createTempDir());
    Path stillOpen = dir.resolve("stillopen");

    // Create the one-byte file that the leaked channel will be opened on.
    OutputStream out = Files.newOutputStream(stillOpen);
    out.write(5);
    out.close();

    ExecutorService asyncIoPool =
            Executors.newFixedThreadPool(1, new NamedThreadFactory("async-io"));
    try {
        AsynchronousFileChannel leak =
                AsynchronousFileChannel.open(stillOpen, Collections.emptySet(), asyncIoPool);
        Exception e = expectThrows(Exception.class, () -> dir.getFileSystem().close());
        assertTrue(e.getMessage().contains("file handle leaks"));
        leak.close();
    } finally {
        asyncIoPool.shutdown();
        asyncIoPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Test AsynchronousFileChannel.open: opening a new file must be reported to the
 * info stream, re-opening it with {@code CREATE_NEW} must fail, and opening a
 * missing file must throw {@code NoSuchFileException}.
 */
public void testAsyncFileChannel() throws IOException, InterruptedException {
    InfoStreamListener stream = new InfoStreamListener("newAsynchronousFileChannel");
    Path dir = wrap(createTempDir(), stream);
    Path foobar = dir.resolve("foobar");

    ExecutorService asyncIoPool =
            Executors.newFixedThreadPool(1, new NamedThreadFactory("async-io"));
    try {
        Set<StandardOpenOption> opts = Set.of(
                StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);

        AsynchronousFileChannel channel = AsynchronousFileChannel.open(foobar, opts, asyncIoPool);
        assertTrue(stream.sawMessage());
        channel.close();

        // The file exists now, so CREATE_NEW has to fail.
        expectThrows(IOException.class,
                () -> AsynchronousFileChannel.open(foobar, opts, asyncIoPool));

        expectThrows(NoSuchFileException.class,
                () -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
    } finally {
        asyncIoPool.shutdown();
        asyncIoPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
/**
 * Creates a read completion handler.
 * @param channel the channel being read from
 * @param sink the sink this handler is associated with
 * @param position the initial file position, held in an {@code AtomicLong}
 * @param dataBufferFactory the factory for data buffers
 * @param bufferSize the size to use for data buffers
 */
public ReadCompletionHandler(AsynchronousFileChannel channel,
        FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

    this.dataBufferFactory = dataBufferFactory;
    this.bufferSize = bufferSize;
    this.position = new AtomicLong(position);
    this.sink = sink;
    this.channel = channel;
}
/**
 * Creates a write completion handler.
 * @param sink the sink this handler is associated with
 * @param channel the channel being written to
 * @param position the initial file position, held in an {@code AtomicLong}
 */
public WriteCompletionHandler(
        FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {

    this.position = new AtomicLong(position);
    this.channel = channel;
    this.sink = sink;
}
/**
 * Reads the test resource from position 9 in buffers of size 3 and expects a
 * single "qux" buffer followed by completion within five seconds.
 */
@Test
public void readAsynchronousFileChannelPosition() throws Exception {
    URI resourceUri = this.resource.getURI();

    Flux<DataBuffer> buffers = DataBufferUtils.readAsynchronousFileChannel(
            () -> AsynchronousFileChannel.open(Paths.get(resourceUri), StandardOpenOption.READ),
            9,
            this.bufferFactory,
            3);

    StepVerifier.create(buffers)
            .consumeNextWith(stringConsumer("qux"))
            .expectComplete()
            .verify(Duration.ofSeconds(5));
}
/**
 * Reads the test resource in buffers of size 3, consumes the first buffer
 * ("foo") and then cancels the subscription.
 */
@Test
public void readAsynchronousFileChannelCancel() throws Exception {
    URI resourceUri = this.resource.getURI();

    Flux<DataBuffer> buffers = DataBufferUtils.readAsynchronousFileChannel(
            () -> AsynchronousFileChannel.open(Paths.get(resourceUri), StandardOpenOption.READ),
            this.bufferFactory,
            3);

    StepVerifier.create(buffers)
            .consumeNextWith(stringConsumer("foo"))
            .thenCancel()
            .verify();
}
/**
 * Writes four buffers ("foo", "bar", "baz", "qux") to the temp file through an
 * {@code AsynchronousFileChannel} and verifies the written data.
 */
@Test
public void writeAsynchronousFileChannel() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    DataBuffer baz = stringBuffer("baz");
    DataBuffer qux = stringBuffer("qux");
    Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);

    // try-with-resources: the channel is closed even when verification fails.
    try (AsynchronousFileChannel channel =
            AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        verifyWrittenData(writeResult);
    }
}