类java.nio.channels.AsynchronousFileChannel源码实例Demo

下面列出了怎么用java.nio.channels.AsynchronousFileChannel的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: spring-analysis-note   文件: DataBufferUtils.java
/**
 * Obtain an {@code AsynchronousFileChannel} from the given supplier, and
 * read it into a {@code Flux} of {@code DataBuffer}s, starting at the given
 * position.
 * <p>A fresh buffer is allocated and the first read is started for every
 * subscription. The {@code Flux.using} cleanup callback is intentionally a
 * no-op: closing the channel is left to the read completion handler
 * (presumably via its {@code dispose} hook / completion path, which is not
 * part of this method).
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from; must be {@code >= 0}
 * @param bufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers; must be {@code > 0}
 * @return a Flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(
		Callable<AsynchronousFileChannel> channelSupplier, long position,
		DataBufferFactory bufferFactory, int bufferSize) {

	Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
	// The message now names the actual parameter ('bufferFactory').
	Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
	Assert.isTrue(position >= 0, "'position' must be >= 0");
	Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

	Flux<DataBuffer> flux = Flux.using(channelSupplier,
			channel -> Flux.create(sink -> {
				ReadCompletionHandler handler =
						new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
				// Register the dispose hook before the first read is issued.
				sink.onDispose(handler::dispose);
				DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize);
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
				// The DataBuffer travels as the attachment so the handler can emit or release it.
				channel.read(byteBuffer, position, dataBuffer, handler);
			}),
			channel -> {
				// Do not close channel from here, rather wait for the current read callback
				// and then complete after releasing the DataBuffer.
			});

	// Buffers dropped by downstream operators are released rather than leaked.
	return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
 
源代码2 项目: spring-analysis-note   文件: DataBufferUtils.java
/**
 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
 * starting at the given position.
 * <p>When the resource reports itself as a file, it is opened as an
 * {@code AsynchronousFileChannel} and read via
 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}.
 * Otherwise, or when resolving the file raises an {@code IOException}, the
 * resource's readable channel is read via {@code readByteChannel} and the
 * leading {@code position} bytes are skipped.
 * @param resource the resource to read from
 * @param position the position to start reading from
 * @param bufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a Flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> read(
		Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) {

	try {
		if (resource.isFile()) {
			File source = resource.getFile();
			// The channel is opened lazily, when the supplier is invoked.
			return readAsynchronousFileChannel(
					() -> AsynchronousFileChannel.open(source.toPath(), StandardOpenOption.READ),
					position, bufferFactory, bufferSize);
		}
	}
	catch (IOException ignored) {
		// fallback to resource.readableChannel(), below
	}

	Flux<DataBuffer> buffers = readByteChannel(resource::readableChannel, bufferFactory, bufferSize);
	if (position == 0) {
		return buffers;
	}
	return skipUntilByteCount(buffers, position);
}
 
/**
 * Reads at most the first 1024 bytes of {@code file} through an
 * {@code AsynchronousFileChannel}, blocking until the read has finished,
 * and returns them as a trimmed string decoded with the platform default
 * charset.
 *
 * @param file the file to read
 * @return the trimmed content, or an empty string if the file is empty
 * @throws java.io.UncheckedIOException if the file cannot be opened or closed
 * @throws ExecutionException if the asynchronous read fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
private String readContent(Path file) throws ExecutionException, InterruptedException {
    // try-with-resources closes the channel on every path; the previous version never closed it
    // and dereferenced a null channel after merely printing an open failure.
    try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Wait for the read; the result is the number of bytes read, or -1 at end of file.
        final int bytesRead = fileChannel.read(buffer, 0).get();
        if (bytesRead <= 0) {
            return "";
        }

        // Decode only the bytes actually read instead of the whole zero-padded array.
        return new String(buffer.array(), 0, bytesRead).trim();
    } catch (IOException e) {
        throw new java.io.UncheckedIOException("Unable to read " + file, e);
    }
}
 
/**
 * Writes "hello world" at position 0 of the file "temp" through the
 * Future-based write API and verifies the content by reading it back.
 */
@Test
public void givenPathAndContent_whenWritesToFileWithFuture_thenCorrect() throws IOException, ExecutionException, InterruptedException {
    final String fileName = "temp";
    final Path path = Paths.get(fileName);

    final ByteBuffer buffer = ByteBuffer.allocate(1024);
    final long position = 0;

    buffer.put("hello world".getBytes());
    buffer.flip();

    // The channel is closed even if the write fails.
    try (AsynchronousFileChannel fileChannel =
            AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
        final Future<Integer> operation = fileChannel.write(buffer, position);

        // The buffer must not be touched until the pending write has completed.
        operation.get();
    }
    buffer.clear();

    final String content = readContent(path);
    assertEquals("hello world", content);
}
 
/**
 * Writes two buffers followed by an error signal and verifies that both
 * buffers are emitted, the error is propagated, and "foobar" ends up in
 * the temp file.
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
	DataBuffer foo = stringBuffer("foo");
	DataBuffer bar = stringBuffer("bar");
	Flux<DataBuffer> flux =
			Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

	// try-with-resources: the channel is closed even when verification or the assertion fails.
	try (AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {

		Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
		StepVerifier.create(writeResult)
				.consumeNextWith(stringConsumer("foo"))
				.consumeNextWith(stringConsumer("bar"))
				.expectError(RuntimeException.class)
				.verify();

		String result = String.join("", Files.readAllLines(tempFile));

		assertEquals("foobar", result);
	}
}
 
/**
 * Requests a single buffer from the write flux, cancels, and verifies that
 * only "foo" was written to the temp file. The source buffers are released
 * afterwards by subscribing to the source flux again.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
	DataBuffer foo = stringBuffer("foo");
	DataBuffer bar = stringBuffer("bar");
	Flux<DataBuffer> flux = Flux.just(foo, bar);

	// try-with-resources: the channel is closed even when verification or the assertion fails.
	try (AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {

		Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
		// Initial demand of 1, so only the first buffer is requested before cancelling.
		StepVerifier.create(writeResult, 1)
				.consumeNextWith(stringConsumer("foo"))
				.thenCancel()
				.verify();

		String result = String.join("", Files.readAllLines(tempFile));

		assertEquals("foo", result);
	}

	flux.subscribe(DataBufferUtils::release);
}
 
源代码7 项目: java-technology-stack   文件: DataBufferUtils.java
/**
 * Obtain an {@code AsynchronousFileChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s, starting at the given position. The channel is
 * handed to {@code closeChannel} when the flux terminates.
 * <p>The initial buffer is allocated per subscription, inside the sink callback, so
 * that no buffer is allocated (and leaked) when the returned flux is never subscribed
 * to, and so that multiple subscriptions do not share one buffer.
 * @param channelSupplier the supplier for the channel to read from
 * @param position the position to start reading from; must be {@code >= 0}
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers; must be {@code > 0}
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
		long position, DataBufferFactory dataBufferFactory, int bufferSize) {

	Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
	Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
	Assert.isTrue(position >= 0, "'position' must be >= 0");
	Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

	Flux<DataBuffer> result = Flux.using(channelSupplier,
			channel -> Flux.create(sink -> {
				AsynchronousFileChannelReadCompletionHandler completionHandler =
						new AsynchronousFileChannelReadCompletionHandler(channel,
								sink, position, dataBufferFactory, bufferSize);
				// Register the dispose hook before the first read is issued.
				sink.onDispose(completionHandler::dispose);
				// Allocate lazily: one buffer per subscription instead of one per assembly.
				DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
				channel.read(byteBuffer, position, dataBuffer, completionHandler);
			}),
			DataBufferUtils::closeChannel);

	// Buffers dropped by downstream operators are released rather than leaked.
	return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
 
源代码8 项目: java-technology-stack   文件: DataBufferUtils.java
/**
 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
 * starting at the given position.
 * <p>When the resource reports itself as a file, it is opened as an
 * {@code AsynchronousFileChannel} and read via
 * {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}.
 * Otherwise, or when resolving the file raises an {@code IOException}, the
 * resource's readable channel is read via {@code readByteChannel} and the
 * leading {@code position} bytes are skipped.
 * @param resource the resource to read from
 * @param position the position to start reading from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> read(
		Resource resource, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

	try {
		if (resource.isFile()) {
			File source = resource.getFile();
			// The channel is opened lazily, when the supplier is invoked.
			return readAsynchronousFileChannel(
					() -> AsynchronousFileChannel.open(source.toPath(), StandardOpenOption.READ),
					position, dataBufferFactory, bufferSize);
		}
	}
	catch (IOException ignored) {
		// fallback to resource.readableChannel(), below
	}

	Flux<DataBuffer> buffers = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
	if (position == 0) {
		return buffers;
	}
	return skipUntilByteCount(buffers, position);
}
 
源代码9 项目: java-technology-stack   文件: DataBufferUtils.java
/**
 * Write the given stream of {@link DataBuffer DataBuffers} to the given
 * {@code AsynchronousFileChannel}, beginning at {@code position}.
 * <p>This method does <strong>not</strong> close the channel when the flux
 * terminates and does <strong>not</strong> {@linkplain #release(DataBuffer) release}
 * the source buffers. If releasing is required, subscribe to the returned
 * {@code Flux} with a {@link #releaseConsumer()}.
 * <p>Nothing is written until the returned {@code Flux} is subscribed to: the
 * source is only subscribed from within the sink callback.
 * @param source the stream of data buffers to be written
 * @param channel the channel to write to
 * @param position the file position at which the write is to begin; must be non-negative
 * @return a flux containing the same buffers as in {@code source}, that starts the writing
 * process when subscribed to, and that publishes any writing errors and the completion signal
 */
public static Flux<DataBuffer> write(
		Publisher<DataBuffer> source, AsynchronousFileChannel channel, long position) {

	Assert.notNull(source, "'source' must not be null");
	Assert.notNull(channel, "'channel' must not be null");
	Assert.isTrue(position >= 0, "'position' must be >= 0");

	Flux<DataBuffer> upstream = Flux.from(source);
	return Flux.create(sink -> {
		AsynchronousFileChannelWriteCompletionHandler writeHandler =
				new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
		// The handler is both the sink's dispose hook and the subscriber of the source.
		sink.onDispose(writeHandler);
		upstream.subscribe(writeHandler);
	});
}
 
源代码10 项目: database   文件: TestFileChannelUtility.java
/**
 * Returns the cached read-only asynchronous channel for {@code path},
 * (re)opening it when there is none yet or the cached one is closed.
 * The open state is checked once without the lock and once more while
 * holding this object's monitor; each (re)open increments
 * {@code asyncChannelOpenCount}.
 *
 * @return an asynchronous channel that was open when it was checked
 * @throws RuntimeException wrapping the {@code IOException} if opening fails
 */
public AsynchronousFileChannel getAsyncChannel() {
	// Unsynchronized check first.
	// NOTE(review): safe publication depends on how the field is declared - confirm.
	if (asyncChannel != null && asyncChannel.isOpen()) {
		return asyncChannel;
	}

	synchronized (this) {
		// check again while synchronized
		if (asyncChannel != null && asyncChannel.isOpen()) {
			return asyncChannel;
		}

		try {
			asyncChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		asyncChannelOpenCount++;
		return asyncChannel;
	}
}
 
/**
 * Writes two buffers followed by an error signal and verifies that both
 * buffers are emitted, the error is propagated, and "foobar" ends up in
 * the temp file.
 */
@Test
public void writeAsynchronousFileChannelErrorInFlux() throws Exception {
	DataBuffer foo = stringBuffer("foo");
	DataBuffer bar = stringBuffer("bar");
	Flux<DataBuffer> flux =
			Flux.just(foo, bar).concatWith(Mono.error(new RuntimeException()));

	// try-with-resources: the channel is closed even when verification or the assertion fails.
	try (AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {

		Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
		StepVerifier.create(writeResult)
				.consumeNextWith(stringConsumer("foo"))
				.consumeNextWith(stringConsumer("bar"))
				.expectError(RuntimeException.class)
				.verify();

		String result = String.join("", Files.readAllLines(tempFile));

		assertEquals("foobar", result);
	}
}
 
/**
 * Requests a single buffer from the write flux, cancels, and verifies that
 * only "foo" was written to the temp file. The source buffers are released
 * afterwards by subscribing to the source flux again.
 */
@Test
public void writeAsynchronousFileChannelCanceled() throws Exception {
	DataBuffer foo = stringBuffer("foo");
	DataBuffer bar = stringBuffer("bar");
	Flux<DataBuffer> flux = Flux.just(foo, bar);

	// try-with-resources: the channel is closed even when verification or the assertion fails.
	try (AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {

		Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
		// Initial demand of 1, so only the first buffer is requested before cancelling.
		StepVerifier.create(writeResult, 1)
				.consumeNextWith(stringConsumer("foo"))
				.thenCancel()
				.verify();

		String result = String.join("", Files.readAllLines(tempFile));

		assertEquals("foo", result);
	}

	flux.subscribe(DataBufferUtils::release);
}
 
源代码13 项目: conga   文件: MessageLogWriter.java
/**
 * Opens the log for writing if it is not open yet, creating the parent
 * directory tree and the file as needed, and then sets the write position
 * to the channel's current size. With {@code truncateExisting} set, an
 * existing file is truncated on open.
 *
 * @throws IOException if the log cannot be opened or its size cannot be read
 */
public void open() throws IOException {
  if (channel == null) {
    // Create the full directory tree when the path has a parent component.
    final Path directory = path.getParent();
    if (directory != null) {
      directory.toFile().mkdirs();
    }
    this.channel = truncateExisting
        ? AsynchronousFileChannel.open(path, StandardOpenOption.WRITE,
            StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
        : AsynchronousFileChannel.open(path, StandardOpenOption.WRITE,
            StandardOpenOption.CREATE);
  }
  position.set(channel.size());
}
 
源代码14 项目: coroutines   文件: AsynchronousFileStep.java
/***************************************
 * Helper that opens an asynchronous file channel for a file name. The
 * access mode and the extra options are combined into a single option list
 * through {@code CollectionUtil.join} before the channel is opened.
 *
 * @param  sFileName     The name of the file to open
 * @param  rMode         The open option for the file access mode (e.g.
 *                       READ, WRITE)
 * @param  rExtraOptions Optional extra file open options
 *
 * @return The opened file channel
 *
 * @throws CoroutineException If opening the channel fails with an
 *                            IOException
 */
protected static AsynchronousFileChannel openFileChannel(
	String		  sFileName,
	OpenOption    rMode,
	OpenOption... rExtraOptions)
{
	File aFile = new File(sFileName);

	try
	{
		return AsynchronousFileChannel.open(
			aFile.toPath(),
			CollectionUtil.join(rExtraOptions, rMode));
	}
	catch (IOException eOpenFailure)
	{
		// convert the checked I/O failure into the coroutine runtime exception
		throw new CoroutineException(eOpenFailure);
	}
}
 
源代码15 项目: coroutines   文件: AsynchronousFileStep.java
/***************************************
 * {@inheritDoc}
 *
 * <p>Runs the blocking variant of the channel operation on the file channel
 * of the continuation and returns the given buffer. Any exception is
 * rethrown wrapped in a {@code CoroutineException}.</p>
 */
@Override
protected ByteBuffer execute(
	ByteBuffer		rData,
	Continuation<?> rContinuation)
{
	try
	{
		performBlockingOperation(getFileChannel(rContinuation), rData);

		return rData;
	}
	catch (Exception eFailure)
	{
		throw new CoroutineException(eFailure);
	}
}
 
源代码16 项目: coroutines   文件: AsynchronousFileStep.java
/***************************************
 * Returns the channel to be used by this step. This first checks the
 * currently executing coroutine of the continuation parameter for an
 * existing {@link #FILE_CHANNEL} relation. If that doesn't exist or the
 * channel is closed, a new {@link AsynchronousFileChannel} is obtained
 * from the channel factory function of this step, stored in the coroutine
 * relation, and annotated with {@code MANAGED}. Using the coroutine to
 * store the channel allows coroutines to be structured so that multiple
 * subroutines perform communication on different channels.
 *
 * @param  rContinuation The continuation to query for an existing channel
 *
 * @return The file channel
 *
 * @throws IOException If opening the channel fails
 */
protected AsynchronousFileChannel getFileChannel(
	Continuation<?> rContinuation) throws IOException
{
	Coroutine<?, ?>			rCoroutine   = rContinuation.getCurrentCoroutine();
	AsynchronousFileChannel rFileChannel = rCoroutine.get(FILE_CHANNEL);

	// reuse the channel of the coroutine as long as it is still open
	if (rFileChannel != null && rFileChannel.isOpen())
	{
		return rFileChannel;
	}

	rFileChannel = fGetFileChannel.apply(rContinuation);
	rCoroutine.set(FILE_CHANNEL, rFileChannel).annotate(MANAGED);

	return rFileChannel;
}
 
源代码17 项目: coroutines   文件: AsynchronousFileStep.java
/***************************************
 * Obtains the {@link AsynchronousFileChannel} for the continuation of the
 * given suspension (through {@code getFileChannel}) and then starts the
 * channel operation asynchronously. Any exception thrown while obtaining
 * the channel or starting the operation is not propagated to the caller
 * but reported by failing the suspension.
 *
 * @param rData       The byte buffer of the data to be processed
 * @param rSuspension The coroutine suspension to be resumed when the
 *                    operation is complete
 */
private void transferAsync(
	ByteBuffer			   rData,
	Suspension<ByteBuffer> rSuspension)
{
	try
	{
		AsynchronousFileChannel rChannel =
			getFileChannel(rSuspension.continuation());

		// The callback receives the channel, the suspension, and a reference
		// to performAsyncOperation - presumably so that it can issue
		// follow-up operations and resume the suspension when done (TODO
		// confirm against ChannelCallback).
		performAsyncOperation(
			FIRST_OPERATION,
			rChannel,
			rData,
			new ChannelCallback<>(
				rChannel,
				rSuspension,
				this::performAsyncOperation));
	}
	catch (Exception e)
	{
		// signal the failure through the suspension instead of throwing
		rSuspension.fail(e);
	}
}
 
源代码18 项目: database   文件: FileChannelUtility.java
/**
    * Schedule a read on the channel. If the operation was previously
    * scheduled and is done (normal completion), then return immediately. If
    * the operation was previously scheduled and was cancelled, then a
    * CancellationException is thrown. If the operation was previously
    * scheduled and failed, then the future is cleared and the operation is
    * rescheduled. This is done in order to allow us to complete a high
    * level read on the channel when the backing channel may have been
    * closed by an interrupt in another thread due to Java IO channel
    * semantics (e.g., driven by query termination during reads).
    * 
    * @param channel The channel.
    * 
    * @throws IllegalArgumentException
    * @throws NonReadableChannelException
    * @throws CancellationException
    * @throws InterruptedException
    */
   private void read(final AsynchronousFileChannel channel)
           throws IllegalArgumentException, NonReadableChannelException, CancellationException, InterruptedException {
       // A finished future means this read was scheduled before: inspect its outcome.
       if (isDone()) {
           try {
               /*
                * Note: An InterruptedException is unlikely or impossible here
                * because the Future is already done. This call may still throw
                * CancellationException, which is propagated to the caller.
                */
               m_fut.get();
           } catch (ExecutionException failedRead) {
               /*
                * The previous read() failed. Clear the future so that the
                * read is scheduled again below.
                */
               m_fut = null;
           }
       }

       // Still done after the check above: the earlier read completed normally.
       if (isDone()) {
           return;
       }

       // ensure buffer is ready
       m_buffer.reset();
       // may throw IllegalArgumentException, NonReadableChannelException
       m_fut = channel.read(m_buffer, m_addr);
   }
 
源代码19 项目: night-config   文件: WriteAsyncFileConfig.java
/**
 * Serializes the config and starts an asynchronous write of it, unless a
 * write is already in progress.
 *
 * @param saveLaterIfWriting if {@code true} and a write is currently in
 *                           progress, flags that another write must happen
 *                           afterwards; if {@code false} the call is a
 *                           no-op in that situation
 */
private void save(boolean saveLaterIfWriting) {
	// Atomically flips the flag from false to true; fails if a write is in progress.
	if (!currentlyWriting.compareAndSet(false, true)) {
		if (saveLaterIfWriting) {// there is a writing in progress: start one later
			mustWriteAgain.set(true);
		}
		return;
	}

	// No write in progress: encode the config data into a ByteBuffer.
	Charray serialized = new Charray(512);
	writer.write(config, serialized.asOutput());
	ByteBuffer encoded = charset.encode(CharBuffer.wrap(serialized));

	// Write the ByteBuffer to nioPath asynchronously, starting at the channel's current size.
	synchronized (channelGuard) {
		try {
			channel = AsynchronousFileChannel.open(nioPath, openOptions);
			channel.write(encoded, channel.size(), null, writeCompletedHandler);
		} catch (IOException openOrSizeFailure) {
			// Report the failure through the same handler used for asynchronous failures.
			writeCompletedHandler.failed(openOrSizeFailure, null);
		}
	}
}
 
源代码20 项目: javase   文件: ProgMainNio.java
/**
 * Starts an asynchronous read of the whole file into a single buffer and
 * then sleeps for five seconds to give the read time to complete. The
 * channel, buffer and path are handed to the {@code ReadHandler} through
 * an {@code Attachment}; closing the channel after a successfully started
 * read is presumably the handler's job (not visible here).
 *
 * @param filePath the path of the file to read
 * @throws IOException if the file cannot be opened, its size cannot be
 *                     determined, or it is larger than
 *                     {@code Integer.MAX_VALUE} bytes
 */
public void readFile(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
    try {
        ReadHandler handler = new ReadHandler();

        // A plain (int) cast would silently overflow for files larger than 2 GiB.
        long fileSize = afc.size();
        if (fileSize > Integer.MAX_VALUE) {
            throw new IOException("File is too large to read into a single buffer: " + path);
        }
        ByteBuffer dataBuffer = ByteBuffer.allocate((int) fileSize);

        Attachment attach = new Attachment();
        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;

        afc.read(dataBuffer, 0, attach, handler);
    } catch (IOException | RuntimeException e) {
        // The read never started, so no handler will close the channel: close it here.
        try {
            afc.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }

    System.out.println("Sleeping for 5  seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
 
源代码21 项目: javase   文件: ProgMainNio.java
/**
 * Starts an asynchronous write of {@code input} (encoded with the platform
 * default charset) at position 0 of the file, creating the file if needed,
 * and then sleeps for three seconds to give the write time to complete.
 * The channel, buffer and path are handed to the {@code WriteHandler}
 * through an {@code Attachment}; closing the channel after a successfully
 * started write is presumably the handler's job (not visible here).
 *
 * @param filePath the path of the file to write
 * @param input    the text to write
 * @throws IOException if the file cannot be opened
 */
private void writeFile(String filePath, String input) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, WRITE, CREATE);
    try {
        WriteHandler handler = new WriteHandler();
        ByteBuffer dataBuffer = ByteBuffer.wrap(input.getBytes());
        Attachment attach = new Attachment();

        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;

        afc.write(dataBuffer, 0, attach, handler);
    } catch (RuntimeException e) {
        // The write never started, so no handler will close the channel: close it here.
        try {
            afc.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }

    System.out.println("Sleeping for 3 seconds...");
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
 
源代码22 项目: javase   文件: ProgMainNio.java
/**
 * Starts an asynchronous read of the whole file into a single buffer and
 * then sleeps for five seconds to give the read time to complete. The
 * channel, buffer and path are handed to the {@code ReadHandler} through
 * an {@code Attachment}; closing the channel after a successfully started
 * read is presumably the handler's job (not visible here).
 *
 * @param filePath the path of the file to read
 * @throws IOException if the file cannot be opened, its size cannot be
 *                     determined, or it is larger than
 *                     {@code Integer.MAX_VALUE} bytes
 */
private void readFile(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, READ);
    try {
        ReadHandler handler = new ReadHandler();

        // A plain (int) cast would silently overflow for files larger than 2 GiB.
        long fileSize = afc.size();
        if (fileSize > Integer.MAX_VALUE) {
            throw new IOException("File is too large to read into a single buffer: " + path);
        }
        ByteBuffer dataBuffer = ByteBuffer.allocate((int) fileSize);

        Attachment attach = new Attachment();
        attach.asyncChannel = afc;
        attach.buffer = dataBuffer;
        attach.path = path;

        afc.read(dataBuffer, 0, attach, handler);
    } catch (IOException | RuntimeException e) {
        // The read never started, so no handler will close the channel: close it here.
        try {
            afc.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }

    System.out.println("Sleeping for 5  seconds...");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
    }
}
 
源代码23 项目: aws-sdk-java-v2   文件: FileAsyncRequestBody.java
/**
 * Opens an input channel for {@code this.path} and hands the subscriber a
 * {@code FileSubscription} built from that channel, the subscriber and
 * {@code chunkSizeInBytes}. If opening the channel fails with an
 * {@code IOException}, the subscriber instead receives a
 * {@code NoopSubscription} followed by {@code onError} with that exception,
 * so this method does not throw the I/O failure to its caller.
 *
 * @param s the subscriber to signal
 */
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
    try {
        AsynchronousFileChannel channel = openInputChannel(this.path);

        // We need to synchronize here because the subscriber could call
        // request() from within onSubscribe which would potentially
        // trigger onNext before onSubscribe is finished.
        // NOTE(review): this relies on FileSubscription taking the same lock
        // before emitting - confirm in FileSubscription.
        Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
        synchronized (subscription) {
            s.onSubscribe(subscription);
        }
    } catch (IOException e) {
        // subscribe() must return normally, so we need to signal the
        // failure to open via onError() once onSubscribe() is signaled.
        s.onSubscribe(new NoopSubscription(s));
        s.onError(e);
    }
}
 
源代码24 项目: lucene-solr   文件: TestLeakFS.java
/**
 * Verifies that closing the wrapped file system fails with a
 * "file handle leaks" message while a channel obtained through
 * AsynchronousFileChannel.open is still open.
 */
public void testLeakAsyncFileChannel() throws IOException, InterruptedException {
  Path dir = wrap(createTempDir());
  Path stillOpen = dir.resolve("stillopen");

  // Create a one-byte file that the channel can be opened on.
  OutputStream out = Files.newOutputStream(stillOpen);
  out.write(5);
  out.close();

  ExecutorService asyncIo = Executors.newFixedThreadPool(1,
      new NamedThreadFactory("async-io"));
  try {
    AsynchronousFileChannel leaked = AsynchronousFileChannel.open(stillOpen,
        Collections.emptySet(), asyncIo);
    // Closing the file system while the channel is open must be rejected.
    Exception closeFailure = expectThrows(Exception.class, () -> dir.getFileSystem().close());
    assertTrue(closeFailure.getMessage().contains("file handle leaks"));
    leaked.close();
  } finally {
    asyncIo.shutdown();
    asyncIo.awaitTermination(5, TimeUnit.SECONDS);
  }
}
 
源代码25 项目: lucene-solr   文件: TestVerboseFS.java
/**
 * Verifies that AsynchronousFileChannel.open on the wrapped file system
 * produces the expected info-stream message, that opening the same file
 * again with CREATE_NEW fails with an IOException, and that opening a
 * missing file fails with NoSuchFileException.
 */
public void testAsyncFileChannel() throws IOException, InterruptedException {
  InfoStreamListener listener = new InfoStreamListener("newAsynchronousFileChannel");
  Path dir = wrap(createTempDir(), listener);

  ExecutorService asyncIo = Executors.newFixedThreadPool(1,
      new NamedThreadFactory("async-io"));
  try {
    Set<StandardOpenOption> createOptions = Set.of(
        StandardOpenOption.CREATE_NEW,
        StandardOpenOption.READ,
        StandardOpenOption.WRITE);

    // First open creates the file and must be reported on the info stream.
    AsynchronousFileChannel created =
        AsynchronousFileChannel.open(dir.resolve("foobar"), createOptions, asyncIo);
    assertTrue(listener.sawMessage());
    created.close();

    // The file exists now, so the same CREATE_NEW options must fail.
    expectThrows(IOException.class,
        () -> AsynchronousFileChannel.open(dir.resolve("foobar"), createOptions, asyncIo));

    expectThrows(NoSuchFileException.class,
        () -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
  } finally {
    asyncIo.shutdown();
    asyncIo.awaitTermination(5, TimeUnit.SECONDS);
  }
}
 
源代码26 项目: spring-analysis-note   文件: DataBufferUtils.java
/**
 * Creates a read completion handler.
 * @param channel the channel that is read from
 * @param sink the sink stored for later signalling
 * @param position the initial read position, kept in an {@code AtomicLong}
 * @param dataBufferFactory the factory stored for creating data buffers
 * @param bufferSize the buffer size stored for later reads
 */
public ReadCompletionHandler(AsynchronousFileChannel channel,
		FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

	this.position = new AtomicLong(position);
	this.bufferSize = bufferSize;
	this.dataBufferFactory = dataBufferFactory;
	this.sink = sink;
	this.channel = channel;
}
 
源代码27 项目: spring-analysis-note   文件: DataBufferUtils.java
/**
 * Creates a write completion handler.
 * @param sink the sink stored for later signalling
 * @param channel the channel that is written to
 * @param position the initial write position, kept in an {@code AtomicLong}
 */
public WriteCompletionHandler(
		FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {

	this.position = new AtomicLong(position);
	this.channel = channel;
	this.sink = sink;
}
 
/**
 * Reads the test resource from position 9 with a buffer size of 3 and
 * expects exactly one buffer, "qux", followed by completion within five
 * seconds.
 */
@Test
public void readAsynchronousFileChannelPosition() throws Exception {
	URI location = this.resource.getURI();
	Flux<DataBuffer> buffers = DataBufferUtils.readAsynchronousFileChannel(
			() -> AsynchronousFileChannel.open(Paths.get(location), StandardOpenOption.READ),
			9, this.bufferFactory, 3);

	StepVerifier.FirstStep<DataBuffer> verifier = StepVerifier.create(buffers);
	verifier.consumeNextWith(stringConsumer("qux"))
			.expectComplete()
			.verify(Duration.ofSeconds(5));
}
 
/**
 * Reads the test resource with a buffer size of 3, consumes the first
 * buffer ("foo"), and then cancels the subscription.
 */
@Test
public void readAsynchronousFileChannelCancel() throws Exception {
	URI location = this.resource.getURI();
	Flux<DataBuffer> buffers = DataBufferUtils.readAsynchronousFileChannel(
			() -> AsynchronousFileChannel.open(Paths.get(location), StandardOpenOption.READ),
			this.bufferFactory, 3);

	StepVerifier.FirstStep<DataBuffer> verifier = StepVerifier.create(buffers);
	verifier.consumeNextWith(stringConsumer("foo"))
			.thenCancel()
			.verify();
}
 
/**
 * Writes four buffers to the temp file through an asynchronous file
 * channel and delegates the checks to {@code verifyWrittenData}.
 */
@Test
public void writeAsynchronousFileChannel() throws Exception {
	DataBuffer foo = stringBuffer("foo");
	DataBuffer bar = stringBuffer("bar");
	DataBuffer baz = stringBuffer("baz");
	DataBuffer qux = stringBuffer("qux");
	Flux<DataBuffer> flux = Flux.just(foo, bar, baz, qux);

	// try-with-resources: the channel is closed even when verification fails.
	try (AsynchronousFileChannel channel =
			AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE)) {

		Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
		verifyWrittenData(writeResult);
	}
}
 
 类所在包
 同包方法