下面列出了怎么用java.nio.channels.WritableByteChannel的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Sends queued entries on the given channel until the queue is empty or an
 * entry cannot be sent completely.
 *
 * <p>When the queue is found empty, the channel is removed from
 * {@code queues} and {@code registered}, the selection key is cancelled and
 * the method returns. When {@code sendAllBuffers} returns {@code false} the
 * method returns immediately and the entry stays at the head of the queue.
 *
 * @param q   queue of pending entries for the channel
 * @param c   channel the entries are written to
 * @param key selection key associated with the channel
 */
private void sendAllEntries(BlockingQueue<NIOSendEntry> q, WritableByteChannel c, SelectionKey key) {
    for (;;) {
        final NIOSendEntry head;
        // Inspect the head of the queue while holding the lock on queues.
        synchronized (queues) {
            head = q.peek();
            if (head == null) {
                // Nothing left to send: drop all bookkeeping for this channel.
                queues.remove(c);
                key.cancel();
                registered.remove(c);
                return;
            }
        }
        if (!sendAllBuffers(head, c, key)) {
            // Entry not fully sent; leave it queued and stop.
            return;
        }
        notifySender(head);
        q.remove();
    }
}
/**
 * Writes at most one pending buffer to {@code target} per call: the header
 * buffer while it has bytes remaining, otherwise the first message buffer
 * that still has bytes remaining.
 *
 * @param target   channel to write to
 * @param position not used by this implementation
 * @return the running total kept in {@code transferred} after the write, or
 *         {@code 0} when no buffer has bytes remaining
 * @throws IOException if the write to {@code target} fails
 */
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
    if (this.byteBufferHeader.hasRemaining()) {
        transferred += target.write(this.byteBufferHeader);
        return transferred;
    }

    List<ByteBuffer> pendingBodies = this.queryMessageResult.getMessageBufferList();
    for (ByteBuffer body : pendingBodies) {
        if (!body.hasRemaining()) {
            continue;
        }
        transferred += target.write(body);
        return transferred;
    }

    return 0;
}
/**
 * Writes one framed record: a header (the data length as a long followed by
 * the value of {@code hashLong(data.length)}), the data bytes, and a footer
 * holding the value of {@code hashBytes(data)}.
 *
 * <p>NOTE(review): the hash helpers presumably compute masked CRC32 values;
 * confirm against their definitions.
 *
 * @param outChannel channel the record is written to
 * @param data       record payload
 * @throws IOException if writing to the channel fails
 */
public void write(WritableByteChannel outChannel, byte[] data) throws IOException {
    final int lengthChecksum = hashLong(data.length);
    final int dataChecksum = hashBytes(data);

    // Header: length followed by the checksum of the length.
    header.clear();
    header.putLong(data.length);
    header.putInt(lengthChecksum);
    header.rewind();
    writeFully(outChannel, header);

    // Payload.
    writeFully(outChannel, ByteBuffer.wrap(data));

    // Footer: checksum of the payload.
    footer.clear();
    footer.putInt(dataChecksum);
    footer.rewind();
    writeFully(outChannel, footer);
}
/**
 * Copies all available bytes from one channel to the other.
 *
 * <p>When the target is a {@link FileChannel} and the source size is known
 * (greater than zero), the data is moved with
 * {@link FileChannel#transferFrom} in chunks of at most
 * {@code Defaults.MAX_MAPPED_FILE_TRANSFER_SIZE} bytes, starting at target
 * position 0. Otherwise the two-argument {@code copy} is used.
 *
 * @param in the source channel
 * @param out the target channel
 * @param sourceSize the size of the source or 0 when unknown
 *
 * @throws IOException if an I/O error occurs, or if fewer than
 *         {@code sourceSize} bytes could be transferred
 */
public static void copy(ReadableByteChannel in, WritableByteChannel out, long sourceSize) throws IOException {
    if (!(out instanceof FileChannel) || (sourceSize <= 0)) {
        copy(in, out);
        return;
    }

    FileChannel fileTarget = (FileChannel) out;
    long pos = 0;
    while (pos < sourceSize) {
        long chunk = Math.min(Defaults.MAX_MAPPED_FILE_TRANSFER_SIZE, sourceSize - pos); // CL-2313
        long moved = fileTarget.transferFrom(in, pos, chunk);
        if (moved == 0) {
            // Source yielded nothing more; the size check below reports the shortfall.
            break;
        }
        pos += moved;
    }

    if (pos != sourceSize) {
        throw new IOException(String.format("Failed to copy the whole content: expected %d, transferred %d bytes", sourceSize, pos));
    }
}
/**
 * Helper to generate files for testing.
 *
 * <p>The lines are joined with the system line separator and encoded as
 * UTF-8, so the bytes written do not depend on the platform default charset.
 *
 * @param filePath The path to the file to write.
 * @param lines The lines to write.
 * @param compression The compression type of the file.
 * @return The file written.
 * @throws IOException If an error occurs while creating or writing the file.
 */
public static ResourceId writeToFile(
    String filePath, List<String> lines, Compression compression) throws IOException {
    String fileContents = String.join(System.lineSeparator(), lines);
    ResourceId resourceId = FileSystems.matchNewResource(filePath, false);
    String mimeType =
        compression == Compression.UNCOMPRESSED ? MimeTypes.TEXT : MimeTypes.BINARY;

    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
    byte[] contentBytes = fileContents.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // Write the file contents to the channel and close.
    try (ReadableByteChannel readChannel =
        Channels.newChannel(new ByteArrayInputStream(contentBytes))) {
        try (WritableByteChannel writeChannel =
            compression.writeCompressed(FileSystems.create(resourceId, mimeType))) {
            ByteStreams.copy(readChannel, writeChannel);
        }
    }
    return resourceId;
}
/**
 * Writes at most one pending buffer to {@code target} per call. The header
 * buffer is written while it has bytes remaining; after that, the first
 * message buffer with bytes remaining is written.
 *
 * @param target   channel to write to
 * @param position not used by this implementation
 * @return the running total kept in {@code transfered} after the write, or
 *         {@code 0} when no buffer has bytes remaining
 * @throws IOException if the write to {@code target} fails
 */
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
    final boolean headerPending = this.byteBufferHeader.hasRemaining();
    if (headerPending) {
        transfered += target.write(this.byteBufferHeader);
        return transfered;
    }

    List<ByteBuffer> bodyBuffers = this.queryMessageResult.getMessageBufferList();
    for (ByteBuffer candidate : bodyBuffers) {
        if (candidate.hasRemaining()) {
            transfered += target.write(candidate);
            return transfered;
        }
    }
    // Every buffer is already drained.
    return 0;
}
/**
 * Verifies that buffers emitted before an error in the source flux are still
 * written to the channel: "foo" and "bar" are emitted, then the error is
 * signalled, and the file ends up containing "foobar".
 *
 * <p>The channel is opened in try-with-resources so it is closed even when
 * verification or the assertion fails.
 */
@Test
public void writeWritableByteChannelErrorInFlux() throws Exception {
    DataBuffer foo = stringBuffer("foo");
    DataBuffer bar = stringBuffer("bar");
    Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));

    try (WritableByteChannel channel = Files.newByteChannel(tempFile, StandardOpenOption.WRITE)) {
        Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
        StepVerifier.create(writeResult)
                .consumeNextWith(stringConsumer("foo"))
                .consumeNextWith(stringConsumer("bar"))
                .expectError()
                .verify(Duration.ofSeconds(5));

        String result = String.join("", Files.readAllLines(tempFile));
        assertEquals("foobar", result);
    }
}
/**
 * Drains buffers from {@code filledBuffers} into {@code out}.
 *
 * <p>Each buffer is polled with a 50 ms timeout and written completely; the
 * buffer is then handed back to {@code bufferPool} together with the number
 * of bytes written from it. Draining stops when a poll times out, when the
 * total written exceeds {@code fileSizeTrigger}, or after the first buffer
 * if {@code flowFilePerDatagram} is set. An interrupt while polling ends the
 * drain silently.
 *
 * @param out stream receiving the buffered data; closed via the wrapping channel
 * @throws IOException if writing to or closing the stream fails
 */
@Override
public void process(final OutputStream out) throws IOException {
    long totalBytes = 0L;
    try (WritableByteChannel sink = Channels.newChannel(new BufferedOutputStream(out))) {
        ByteBuffer current;
        while ((current = filledBuffers.poll(50, TimeUnit.MILLISECONDS)) != null) {
            int writtenFromCurrent = 0;
            try {
                while (current.hasRemaining()) {
                    writtenFromCurrent += sink.write(current);
                }
                totalBytes += writtenFromCurrent;
                if (totalBytes > fileSizeTrigger || flowFilePerDatagram) {
                    break; // this is enough data
                }
            } finally {
                // Always give the buffer back, even if the write failed part-way.
                bufferPool.returnBuffer(current, writtenFromCurrent);
            }
        }
    } catch (final InterruptedException ie) {
        // irrelevant
    }
}
/**
 * Writes the next portion of the current send entry to the channel.
 *
 * <p>One call performs at most one {@code readSet.writeTo} for the entry at
 * {@code currentWriteIndex}, continuing from where the previous call
 * stopped. When the entry has been written completely the index advances to
 * the next entry.
 *
 * @param channel channel to write to
 * @return the number of bytes written by this call, or {@code 0} when the
 *         send is already complete
 * @throws IOException if the underlying write fails
 */
@Override
public long writeTo(WritableByteChannel channel) throws IOException {
    if (isSendComplete()) {
        return 0;
    }

    final long offset = sendInfoList.get(currentWriteIndex).relativeOffset() + sizeWrittenFromCurrentIndex;
    final long maxSize = sendInfoList.get(currentWriteIndex).sizetoSend() - sizeWrittenFromCurrentIndex;
    final long written = readSet.writeTo(currentWriteIndex, channel, offset, maxSize);
    logger.trace("writeindex {} relativeOffset {} maxSize {} written {}", currentWriteIndex,
        offset, maxSize, written);

    sizeWritten += written;
    sizeWrittenFromCurrentIndex += written;
    logger.trace("size written in this loop : {} size written till now : {}", written, sizeWritten);

    // Move on to the next entry once this one has been sent in full.
    if (sizeWrittenFromCurrentIndex == sendInfoList.get(currentWriteIndex).sizetoSend()) {
        currentWriteIndex++;
        sizeWrittenFromCurrentIndex = 0;
    }
    return written;
}
/**
 * Writes {@code lengthBytes} bytes starting at {@code offsetBytes} to the
 * channel by first copying them into a temporary heap array.
 *
 * <p>The temporary array holds between 8 and 4096 bytes: the capacity
 * divided by 1024, rounded down to a multiple of 8, clamped to that range.
 *
 * @param offsetBytes offset of the first byte to write
 * @param lengthBytes number of bytes to write
 * @param out         channel receiving the bytes
 * @throws IOException if writing to the channel fails
 */
private void writeToWithExtraCopy(long offsetBytes, long lengthBytes,
    final WritableByteChannel out) throws IOException {
    // Keep the bufLen a multiple of 8, to maybe allow getByteArray() to go a faster path.
    final long capacityBased = (getCapacity() / 1024) & ~7L;
    final int bufLen = Ints.checkedCast(Math.max(8, Math.min(capacityBased, 4096)));
    final byte[] scratch = new byte[bufLen];
    final ByteBuffer scratchView = ByteBuffer.wrap(scratch);

    long position = offsetBytes;
    long remaining = lengthBytes;
    while (remaining > 0) {
        final int chunk = (int) Math.min(scratch.length, remaining);
        getByteArray(position, scratch, 0, chunk);
        // Expose exactly the freshly copied bytes to the writer.
        scratchView.clear();
        scratchView.limit(chunk);
        writeFully(scratchView, out);
        position += chunk;
        remaining -= chunk;
    }
}
/**
 * Serializes this box (size as a 32-bit unsigned value, four-character type
 * code, then the converted {@code content}) and writes it to the channel.
 *
 * <p>{@link WritableByteChannel#write} may write fewer bytes than requested,
 * so the write is repeated until the whole box has been written.
 *
 * @param writableByteChannel channel receiving the serialized box
 * @throws IOException if writing to the channel fails
 */
public void getBox(WritableByteChannel writableByteChannel) throws IOException {
    ByteBuffer header = ByteBuffer.allocate(l2i(getSize()));
    IsoTypeWriter.writeUInt32(header, getSize());
    header.put(IsoFile.fourCCtoBytes(getType()));
    header.put(Utf8.convert(content));
    // Cast to Buffer so the call resolves to Buffer.rewind() on every JDK.
    ((Buffer) header).rewind();
    while (header.hasRemaining()) {
        writableByteChannel.write(header);
    }
}
/**
 * Round-trip check: encodes {@code TEST_ENTRY} with length and CRC, pushes
 * the bytes through a pipe, decodes them from the other end and expects the
 * decoded message to match the original.
 */
@Test
public void decodesProtostuffMessagesItEncodes() throws IOException {
    // The reading end must be connected before anything is written to the pipe.
    final InputStream pipeSource = new PipedInputStream(pipedOutputStream);
    final WritableByteChannel pipeSink = Channels.newChannel(pipedOutputStream);

    final List<ByteBuffer> encoded = encodeWithLengthAndCrc(SCHEMA, TEST_ENTRY);
    writeAllToChannel(encoded, pipeSink);

    final OLogEntryHeader roundTripped = decodeAndCheckCrc(pipeSource, SCHEMA);
    assertThat(roundTripped, is(theSameMessageAs(TEST_ENTRY)));
}
/**
 * Transfers up to {@code count} bytes of this channel, starting at
 * {@code position}, to {@code target}.
 *
 * <p>After validating both channels and the arguments, the request is
 * clamped to {@code Integer.MAX_VALUE} and to the bytes available past
 * {@code position}. Three strategies are then tried in order: a direct
 * transfer, a mapped transfer to a trusted channel, and finally the generic
 * path. A strategy that returns a negative value is treated as "not
 * applicable" and the next one is tried.
 *
 * @param position starting position in this channel; must be non-negative
 * @param count    maximum number of bytes to transfer; must be non-negative
 * @param target   channel receiving the bytes
 * @return the number of bytes transferred, {@code 0} when {@code position}
 *         is past the end of this channel
 * @throws IOException if the target is closed or another I/O error occurs
 */
public long transferTo(long position, long count,
                       WritableByteChannel target)
    throws IOException
{
    ensureOpen();
    if (!target.isOpen()) {
        throw new ClosedChannelException();
    }
    if (!readable) {
        throw new NonReadableChannelException();
    }
    final boolean targetIsReadOnlyFile =
        target instanceof FileChannelImpl && !((FileChannelImpl) target).writable;
    if (targetIsReadOnlyFile) {
        throw new NonWritableChannelException();
    }
    if ((position < 0) || (count < 0)) {
        throw new IllegalArgumentException();
    }

    final long sz = size();
    if (position > sz) {
        return 0;
    }

    // Clamp the request to an int and to what is actually available.
    final long available = sz - position;
    final int icount = (int) Math.min(Math.min(count, Integer.MAX_VALUE), available);

    // Attempt a direct transfer, if the kernel supports it
    long n = transferToDirectly(position, icount, target);
    if (n >= 0) {
        return n;
    }

    // Attempt a mapped transfer, but only to trusted channel types
    n = transferToTrustedChannel(position, icount, target);
    if (n >= 0) {
        return n;
    }

    // Slow path for untrusted targets
    return transferToArbitraryChannel(position, icount, target);
}
/**
 * Performs the native direct transfer of {@code icount} bytes, starting at
 * {@code position}, from this channel's descriptor to {@code targetFD}.
 *
 * <p>The native call is repeated while it reports
 * {@code IOStatus.INTERRUPTED} and this channel is still open. When the
 * native call reports that the transfer is unsupported, the matching
 * "supported" flag is cleared and the same status is returned to the caller.
 *
 * @param position starting position in this channel
 * @param icount   number of bytes to transfer
 * @param target   destination channel; only its concrete type is inspected here
 * @param targetFD file descriptor handed to the native call
 * @return the normalized result of the native call,
 *         {@code IOStatus.UNSUPPORTED_CASE} or {@code IOStatus.UNSUPPORTED}
 *         when the transfer is not supported, or {@code -1} if this channel
 *         is no longer open
 * @throws IOException if the native transfer fails
 */
private long transferToDirectlyInternal(long position, int icount,
WritableByteChannel target,
FileDescriptor targetFD)
throws IOException
{
// When the native dispatcher requires the position lock, the caller must already hold it.
assert !nd.transferToDirectlyNeedsPositionLock() ||
Thread.holdsLock(positionLock);
long n = -1;
int ti = -1;
try {
// NOTE(review): begin()/end(...) presumably bracket an interruptible I/O operation - confirm.
begin();
// ti keeps the value returned by threads.add(); it is handed back to threads.remove() below.
ti = threads.add();
if (!isOpen())
return -1;
// Retry the native call while it reports INTERRUPTED and the channel is still open.
do {
n = transferTo0(fd, position, icount, targetFD);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
// Record which kind of target was rejected.
if (target instanceof SinkChannelImpl)
pipeSupported = false;
if (target instanceof FileChannelImpl)
fileSupported = false;
return IOStatus.UNSUPPORTED_CASE;
}
if (n == IOStatus.UNSUPPORTED) {
// Don't bother trying again
transferSupported = false;
return IOStatus.UNSUPPORTED;
}
return IOStatus.normalize(n);
} finally {
// Runs on every exit path, including the early returns above.
threads.remove(ti);
end (n > -1);
}
}
/**
 * Creates and opens an object for writing.
 *
 * <p>The path must not look like a directory. Depending on {@code options},
 * the call also rejects a path that conflicts with an existing directory and
 * creates missing parent directories. When the options carry a known
 * generation id, the object is created against that generation.
 *
 * @param path Object full path of the form gs://bucket/object-path.
 * @param options creation options controlling the checks described above
 * @return A channel for writing to the given object.
 * @throws IOException if the path names a directory, conflicts with an
 *         existing directory, or the underlying create fails
 */
public WritableByteChannel create(URI path, CreateFileOptions options) throws IOException {
    logger.atFine().log("create(path: %s, options: %s)", path, options);
    Preconditions.checkNotNull(path, "path could not be null");

    StorageResourceId resourceId =
        StorageResourceId.fromUriPath(path, /* allowEmptyObjectName=*/ true);
    if (resourceId.isDirectory()) {
        String message =
            String.format("Cannot create a file whose name looks like a directory: '%s'", resourceId);
        throw new IOException(message);
    }

    // Check if a directory of that name exists.
    if (options.checkNoDirectoryConflict()) {
        boolean directoryExists =
            getFileInfoInternal(
                    resourceId.toDirectoryId(), gcs.getOptions().isInferImplicitDirectoriesEnabled())
                .exists();
        if (directoryExists) {
            throw new FileAlreadyExistsException("A directory with that name exists: " + path);
        }
    }

    // Ensure that parent directories exist.
    if (options.ensureParentDirectoriesExist()) {
        URI parentPath = UriPaths.getParentPath(path);
        if (parentPath != null) {
            mkdirs(parentPath);
        }
    }

    // Pin the write to a specific generation when the caller supplied one.
    final long generationId = options.getExistingGenerationId();
    StorageResourceId target = resourceId;
    if (generationId != StorageResourceId.UNKNOWN_GENERATION_ID) {
        target =
            new StorageResourceId(
                resourceId.getBucketName(), resourceId.getObjectName(), generationId);
    }
    return gcs.create(target, objectOptionsFromFileOptions(options));
}
/**
 * Performs the native direct transfer of {@code icount} bytes, starting at
 * {@code position}, from this channel's descriptor to {@code targetFD}.
 *
 * <p>The native call is repeated while it reports
 * {@code IOStatus.INTERRUPTED} and this channel is still open. When the
 * native call reports that the transfer is unsupported, the matching
 * "supported" flag is cleared and the same status is returned to the caller.
 *
 * @param position starting position in this channel
 * @param icount   number of bytes to transfer
 * @param target   destination channel; only its concrete type is inspected here
 * @param targetFD file descriptor handed to the native call
 * @return the normalized result of the native call,
 *         {@code IOStatus.UNSUPPORTED_CASE} or {@code IOStatus.UNSUPPORTED}
 *         when the transfer is not supported, or {@code -1} if this channel
 *         is no longer open
 * @throws IOException if the native transfer fails
 */
private long transferToDirectlyInternal(long position, int icount,
WritableByteChannel target,
FileDescriptor targetFD)
throws IOException
{
// When the native dispatcher requires the position lock, the caller must already hold it.
assert !nd.transferToDirectlyNeedsPositionLock() ||
Thread.holdsLock(positionLock);
long n = -1;
int ti = -1;
try {
// NOTE(review): begin()/end(...) presumably bracket an interruptible I/O operation - confirm.
begin();
// ti keeps the value returned by threads.add(); it is handed back to threads.remove() below.
ti = threads.add();
if (!isOpen())
return -1;
// Retry the native call while it reports INTERRUPTED and the channel is still open.
do {
n = transferTo0(fd, position, icount, targetFD);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
// Record which kind of target was rejected.
if (target instanceof SinkChannelImpl)
pipeSupported = false;
if (target instanceof FileChannelImpl)
fileSupported = false;
return IOStatus.UNSUPPORTED_CASE;
}
if (n == IOStatus.UNSUPPORTED) {
// Don't bother trying again
transferSupported = false;
return IOStatus.UNSUPPORTED;
}
return IOStatus.normalize(n);
} finally {
// Runs on every exit path, including the early returns above.
threads.remove(ti);
end (n > -1);
}
}
/**
 * Reads the given section and writes it to a channel.
 *
 * <p>When a backing file is present and this object has no cached data, the
 * read is delegated to the file; otherwise the section is read here and
 * copied to the channel.
 *
 * @param section the section to read
 * @param wbc     channel receiving the data
 * @return the value returned by the copy or by the delegated read
 * @throws IOException on I/O error
 * @throws InvalidRangeException if the section is invalid
 * @deprecated do not use
 */
@Deprecated
public long readToByteChannel(Section section, WritableByteChannel wbc) throws IOException, InvalidRangeException {
    final boolean delegateToFile = (ncfile != null) && !hasCachedData();
    if (delegateToFile) {
        return ncfile.readToByteChannel(this, section, wbc);
    }
    return IospHelper.copyToByteChannel(read(section), wbc);
}
/**
 * Writes a window of the buffer at {@code index} to the channel.
 *
 * <p>The window starts at {@code relativeOffset} and ends at the smaller of
 * the buffer's current limit and {@code relativeOffset + maxSize}. After the
 * write the buffer is cleared, which resets its position and limit.
 *
 * @param index          index of the buffer to write from
 * @param channel        channel to write to
 * @param relativeOffset offset within the buffer at which to start
 * @param maxSize        maximum number of bytes to write
 * @return the number of bytes written by the single channel write
 * @throws IOException if the channel write fails
 */
@Override
public long writeTo(int index, WritableByteChannel channel, long relativeOffset, long maxSize) throws IOException {
    final ByteBuffer source = buffers.get(index);
    source.position((int) relativeOffset);
    final long windowEnd = Math.min(source.limit(), relativeOffset + maxSize);
    source.limit((int) windowEnd);
    final int written = channel.write(source);
    source.clear();
    return written;
}
/**
 * Write to cloud storage using the FileSystems API. See https://stackoverflow.com/a/50050583.
 *
 * <p>The bytes are exposed as a readable channel and copied into a channel
 * created for {@code path} with a binary MIME type. All resources are closed
 * when the copy finishes or fails.
 *
 * @param path destination resource path
 * @param data bytes to store
 */
private void writeToStorage(String path, byte[] data) throws Exception {
    final ResourceId destination = FileSystems.matchNewResource(path, false);
    try (ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
         ReadableByteChannel source = Channels.newChannel(bytesIn);
         WritableByteChannel sink = FileSystems.create(destination, MimeTypes.BINARY)) {
        ByteStreams.copy(source, sink);
    }
}
/**
 * Creates the object through the superclass after evicting any cached entry
 * for it.
 *
 * @param resourceId identifies the object to create
 * @return the channel returned by the superclass
 * @throws IOException if the superclass create fails
 */
@Override
public WritableByteChannel create(StorageResourceId resourceId) throws IOException {
    // If the item exists in cache upon creation, remove it from cache so that later getItemInfo
    // will pull the most updated item info.
    final boolean cached = cache.getItem(resourceId) != null;
    if (cached) {
        cache.removeItem(resourceId);
    }
    return super.create(resourceId);
}
/**
 * Writes a length-prefixed frame: a 4-byte length header followed by the
 * body bytes.
 *
 * <p>The shared {@code writeHeader} buffer is reused for every frame, so the
 * whole operation runs while holding {@code writeDataLock}.
 * {@link WritableByteChannel#write} may write fewer bytes than requested;
 * both parts are therefore written in a loop until fully drained, otherwise
 * a short write would leave a truncated frame on the channel.
 *
 * @param channel channel receiving the frame
 * @param body    frame payload
 * @throws IOException if writing to the channel fails
 */
private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {
    synchronized (writeDataLock) {
        writeHeader.clear();
        writeHeader.putInt(body.length);
        writeHeader.flip();
        while (writeHeader.hasRemaining()) {
            channel.write(writeHeader);
        }

        ByteBuffer payload = ByteBuffer.wrap(body);
        while (payload.hasRemaining()) {
            channel.write(payload);
        }
    }
}
/**
 * Writes a NetcdfFile to a new file on disk.
 *
 * <p>The header is written through a data stream layered on the file's
 * channel and flushed; the data is then written directly to the channel.
 *
 * @param fileIn      the file to write
 * @param fileOutName name of the output file
 * @throws IOException on I/O error
 * @throws InvalidRangeException range error
 */
public static void writeFromFile(NetcdfFile fileIn, String fileOutName) throws IOException, InvalidRangeException {
    try (FileOutputStream fileOut = new FileOutputStream(fileOutName);
         WritableByteChannel fileChannel = fileOut.getChannel();
         DataOutputStream headerOut = new DataOutputStream(Channels.newOutputStream(fileChannel))) {
        N3channelWriter writer = new N3channelWriter(fileIn);

        // Record count is the unlimited dimension's length, or zero when there is none.
        int numrec = 0;
        if (fileIn.getUnlimitedDimension() != null) {
            numrec = fileIn.getUnlimitedDimension().getLength();
        }

        writer.writeHeader(headerOut, numrec);
        // Push the header out before writing data straight to the channel.
        headerOut.flush();
        writer.writeDataAll(fileChannel);
    }
}
/**
 * Write ncfile to a WritableByteChannel.
 *
 * <p>The header goes through a buffered data stream wrapped around the
 * channel and is flushed before the data is written directly to the channel.
 * The wrapping stream is closed when the method exits.
 *
 * @param ncfile the file to write
 * @param wbc write to this WritableByteChannel.
 *        If its a Socket, must have been opened through a call to java.nio.channels.SocketChannel.open()
 * @throws IOException on IO error
 * @throws InvalidRangeException range error
 */
public static void writeToChannel(NetcdfFile ncfile, WritableByteChannel wbc)
    throws IOException, InvalidRangeException {
    // buffering seems to improve by 5%
    try (DataOutputStream headerOut =
        new DataOutputStream(new BufferedOutputStream(Channels.newOutputStream(wbc), 8000))) {
        N3channelWriter writer = new N3channelWriter(ncfile);

        int numrec = 0;
        if (ncfile.getUnlimitedDimension() != null) {
            numrec = ncfile.getUnlimitedDimension().getLength();
        }

        writer.writeHeader(headerOut, numrec);
        // Flush so the header reaches the channel before the data does.
        headerOut.flush();
        writer.writeDataAll(wbc);
    }
}
/**
 * Copies everything from {@code input} to {@code output} and closes both.
 *
 * <p>Both channels are managed by try-with-resources, so the destination is
 * still closed when closing the source fails, and a failure while closing
 * does not hide an exception thrown by the copy (it is attached as a
 * suppressed exception instead).
 *
 * @param input  stream to read from; closed on return
 * @param output stream to write to; closed on return
 * @throws IOException if copying or closing fails
 */
public static void copyAndClose(InputStream input, OutputStream output) throws IOException {
    // Resources are closed in reverse declaration order: source first, then destination.
    try (WritableByteChannel destChannel = Channels.newChannel(output);
         ReadableByteChannel srcChannel = Channels.newChannel(input)) {
        copy(srcChannel, destChannel);
    }
}
/**
 * Creates a framed output stream over the given channel.
 *
 * <p>After the superclass is initialised, writing of the frame size is
 * enabled and the write position is advanced by four bytes, leaving room at
 * the start of the buffer for the frame length.
 *
 * @param channel    channel the stream writes to
 * @param bufferSize size passed to the superclass constructor
 * @throws IOException if the superclass constructor fails
 */
public ChannelBufferUnsafeFramedOutputStream(WritableByteChannel channel,
    int bufferSize) throws IOException {
    super(channel, bufferSize);
    this.doWriteFrameSize = true;
    // reserve the leading bytes of the buffer for the frame length
    this.addrPosition += 4;
}
/**
 * Write Succinct data structures to a DataOutputStream.
 *
 * <p>Written in order: six int fields (original size, the three sampling
 * rates, sample bit width, alphabet size), the alphabet entries, the
 * contents of {@code sa} and {@code isa} as longs, the column offsets as
 * ints, and finally each column as its limit followed by its bytes. Each
 * column buffer is rewound after it has been written.
 *
 * @param os Output stream to write data to.
 * @throws IOException if writing to the stream fails
 */
public void writeToStream(DataOutputStream os) throws IOException {
    // Column bytes are written through a channel view of the same stream.
    final WritableByteChannel columnSink = Channels.newChannel(os);

    os.writeInt(getOriginalSize());
    os.writeInt(getSamplingRateSA());
    os.writeInt(getSamplingRateISA());
    os.writeInt(getSamplingRateNPA());
    os.writeInt(getSampleBitWidth());
    os.writeInt(getAlphabetSize());

    for (int a = 0; a < getAlphabetSize(); a++) {
        os.writeInt(alphabet[a]);
    }

    final int saCount = sa.limit();
    for (int s = 0; s < saCount; s++) {
        os.writeLong(sa.get(s));
    }

    final int isaCount = isa.limit();
    for (int s = 0; s < isaCount; s++) {
        os.writeLong(isa.get(s));
    }

    final int offsetCount = columnoffsets.limit();
    for (int c = 0; c < offsetCount; c++) {
        os.writeInt(columnoffsets.get(c));
    }

    for (ByteBuffer column : columns) {
        os.writeInt(column.limit());
        columnSink.write(column.order(ByteOrder.BIG_ENDIAN));
        // Restore the position so the column can be read again later.
        column.rewind();
    }
}
/**
 * Serialize.
 *
 * The current bitmap is not modified.
 *
 * <p>Array-backed buffers are written in a single call. Other buffers
 * (direct, mapped, read-only) are copied through a bounded temporary array
 * using a duplicate of the buffer, so the buffer's own position is left
 * untouched. The output is never closed by this method and may be any
 * {@link DataOutput} implementation, not only an {@code OutputStream}.
 *
 * @param out the DataOutput stream
 * @throws IOException Signals that an I/O exception has occurred.
 */
@Override
public void serialize(DataOutput out) throws IOException {
    if (buffer.hasArray()) {
        out.write(buffer.array(), buffer.arrayOffset(), buffer.limit());
        return;
    }

    ByteBuffer tmp = buffer.duplicate();
    tmp.position(0);
    // Copy in chunks so large buffers do not need an equally large heap array.
    byte[] chunk = new byte[Math.min(tmp.remaining(), 8192)];
    while (tmp.hasRemaining()) {
        int n = Math.min(chunk.length, tmp.remaining());
        tmp.get(chunk, 0, n);
        out.write(chunk, 0, n);
    }
}
/**
 * Creates a builder that writes to the given channel.
 *
 * <p>Both arguments are expected to be non-null; this is checked only when
 * assertions are enabled. The seed array is stored as passed, without
 * copying.
 *
 * @param out          channel to write to
 * @param checksumSeed seed bytes kept for later use
 */
public Builder(WritableByteChannel out, byte[] checksumSeed)
{
    assert out != null;
    assert checksumSeed != null;

    _checksumSeed = checksumSeed;
    _out = out;
}
/**
 * Exercises a 14-byte DatagramBuffer after 10 bytes have already been
 * written and consumed: a 5-byte and a 3-byte datagram are then stored, and
 * two successive {@code writeTo} calls are expected to emit exactly those
 * datagrams, one per call and in order.
 */
@Test
public void testCircular() throws IOException {
    ByteBuffer fiveBytes = createDatagram(5);
    ByteBuffer threeBytes = createDatagram(3);
    DatagramBuffer datagramBuffer = new DatagramBuffer(14);
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    WritableByteChannel capturedChannel = Channels.newChannel(captured);

    // write and consume 10 bytes
    datagramBuffer.readFrom(createDatagram(10));
    WritableByteChannel discard = Channels.newChannel(new ByteArrayOutputStream()); // forget
    datagramBuffer.writeTo(discard);

    // DatagramBuffer is expected to store the whole datagram (even if it exceeds its "capacity")
    datagramBuffer.readFrom(fiveBytes);
    datagramBuffer.readFrom(threeBytes);

    // The first write yields the 5-byte datagram only.
    datagramBuffer.writeTo(capturedChannel);
    Assert.assertArrayEquals(fiveBytes.array(), captured.toByteArray());

    // The second write yields the 3-byte datagram.
    captured.reset();
    datagramBuffer.writeTo(capturedChannel);
    Assert.assertArrayEquals(threeBytes.array(), captured.toByteArray());
}
/**
 * Writes this request to the channel.
 *
 * <p>On the first call the request is serialized into {@code bufferToSend}:
 * the header, the number of replica metadata entries, each entry, and the
 * maximum total size of entries. Every call then performs at most one
 * channel write of whatever is still pending.
 *
 * @param channel channel to write to
 * @return the number of bytes written by this call, or {@code 0} when
 *         nothing is pending
 * @throws IOException if the channel write fails
 */
@Override
public long writeTo(WritableByteChannel channel) throws IOException {
    if (bufferToSend == null) {
        // Serialize lazily, once.
        bufferToSend = ByteBuffer.allocate((int) sizeInBytes());
        writeHeader();
        bufferToSend.putInt(replicaMetadataRequestInfoList.size());
        for (ReplicaMetadataRequestInfo info : replicaMetadataRequestInfoList) {
            info.writeTo(bufferToSend);
        }
        bufferToSend.putLong(maxTotalSizeOfEntriesInBytes);
        bufferToSend.flip();
    }

    if (bufferToSend.remaining() > 0) {
        return channel.write(bufferToSend);
    }
    return 0;
}