The following examples show how the org.apache.hadoop.fs.ByteBufferUtil API class is used in practice; the full source of each snippet is available on GitHub.
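The snippets appear to come from Hadoop's FSDataInputStream/DFSInputStream and from the ORC reader, where ByteBufferUtil.fallbackRead backs the enhanced byte-buffer read API. For context, here is a minimal caller-side sketch of that API, assuming Hadoop 2.x or later; the path and read size are placeholders, not values from the snippets below.

import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ByteBufferReadExample {
  public static void main(String[] args) throws Exception {
    // Placeholder path; point this at a real file in your cluster.
    Path path = new Path("/tmp/example.dat");
    ElasticByteBufferPool pool = new ElasticByteBufferPool();

    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      // read(ByteBufferPool, ...) tries the enhanced (zero-copy) path first;
      // when the wrapped stream cannot provide it, the implementations below
      // fall back to ByteBufferUtil.fallbackRead(...).
      ByteBuffer buf = in.read(pool, 1 << 20, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      if (buf != null) {
        try {
          System.out.println("read " + buf.remaining() + " bytes");
        } finally {
          // Buffers handed out by read(ByteBufferPool, ...) are returned
          // through releaseBuffer(...), which uses the buffer-to-pool map
          // (extendedReadBuffers) maintained in the snippets below.
          in.releaseBuffer(buf);
        }
      }
    }
  }
}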
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  try {
    // If the wrapped stream supports enhanced (zero-copy) byte-buffer reads,
    // delegate to it directly.
    return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
        maxLength, opts);
  }
  catch (ClassCastException e) {
    // Otherwise fall back to an ordinary read into a pooled buffer, and record
    // which pool the buffer came from so releaseBuffer() can return it later.
    ByteBuffer buffer = ByteBufferUtil.
        fallbackRead(this, bufferPool, maxLength);
    if (buffer != null) {
      extendedReadBuffers.put(buffer, bufferPool);
    }
    return buffer;
  }
}
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  } else if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null;
    }
    /*
     * If we don't have a blockReader, or the one we have has no more bytes
     * left to read, we call seekToBlockSource to get a new blockReader and
     * recalculate blockEnd. Note that we assume we're not at EOF here
     * (we check this above).
     */
    if ((!seekToBlockSource(pos)) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  ByteBuffer buffer = null;
  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    // Try the memory-mapped (zero-copy) path first.
    buffer = tryReadZeroCopy(maxLength, opts);
  }
  if (buffer != null) {
    return buffer;
  }
  // Zero-copy was unavailable or produced nothing; read through the pool instead.
  buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (buffer != null) {
    getExtendedReadBuffers().put(buffer, bufferPool);
  }
  return buffer;
}
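The two methods above attempt the zero-copy path first and only then call ByteBufferUtil.fallbackRead, recording the buffer-to-pool mapping so releaseBuffer can hand the buffer back later. ByteBufferUtil can also be called directly on any InputStream, although it is marked as a private Hadoop API; a minimal sketch follows, assuming an ElasticByteBufferPool and an illustrative 64 KB chunk size.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class FallbackReadExample {
  // Drain an arbitrary InputStream through the same fallback path used above.
  // fallbackRead(...) returns null at end of stream; buffers it hands out come
  // from the pool and should be put back once their contents are consumed.
  static long drain(InputStream in) throws IOException {
    ByteBufferPool pool = new ElasticByteBufferPool();
    long total = 0;
    ByteBuffer buf;
    while ((buf = ByteBufferUtil.fallbackRead(in, pool, 64 * 1024)) != null) {
      int n = buf.remaining();
      pool.putBuffer(buf);   // return the buffer so the pool can reuse it
      if (n == 0) {
        break;               // defensive: stop if a read produced no bytes
      }
      total += n;
    }
    return total;
  }
}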
/**
 * Read the list of ranges from the file.
 * @param fs FileSystem object to get block locations of the file
 * @param file the file to read
 * @param path the path of the file, used to compute locality of the ranges
 * @param zcr the zero-copy reader shim, or null if zero-copy reads are unavailable
 * @param pool the buffer pool used when falling back to direct-memory reads
 * @param useDirectMemory whether to read into pooled buffers instead of on-heap arrays
 * @param base the base of the stripe
 * @param range the disk ranges within the stripe to read
 * @return the bytes read for each disk range, which is the same length as
 *    ranges
 * @throws IOException
 */
private DiskRangeList readDiskRanges(FileSystem fs, FSDataInputStream file,
    Path path, HadoopShims.ZeroCopyReaderShim zcr, ByteBufferAllocatorPool pool,
    boolean useDirectMemory, long base, DiskRangeList range) throws IOException {
  if (range == null) {
    return null;
  }
  computeLocality(fs, path, range);
  DiskRangeList prev = range.prev;
  if (prev == null) {
    prev = new DiskRangeList.MutateHelper(range);
  }
  while (range != null) {
    if (range.hasData()) {
      range = range.next;
      continue;
    }
    int len = (int) (range.getEnd() - range.getOffset());
    long off = range.getOffset();
    if (useDirectMemory) {
      file.seek(base + off);
      boolean hasReplaced = false;
      while (len > 0) {
        ByteBuffer partial;
        if (zcr != null) {
          partial = zcr.readBuffer(len, false);
        } else {
          // in the zero copy read path, when memory mapped file does not exist,
          // hadoop client uses following call to read using direct memory
          partial = ByteBufferUtil.fallbackRead(file, pool, len);
        }
        BufferChunk bc = new BufferChunk(partial, off);
        if (!hasReplaced) {
          range.replaceSelfWith(bc);
          hasReplaced = true;
        } else {
          range.insertAfter(bc);
        }
        range = bc;
        int read = partial.remaining();
        len -= read;
        off += read;
      }
    } else {
      // On-heap path: read the whole range into a byte[] and wrap it.
      byte[] buffer = new byte[len];
      file.readFully((base + off), buffer, 0, buffer.length);
      ByteBuffer bb = ByteBuffer.wrap(buffer);
      range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
    }
    range = range.next;
  }
  return prev.next;
}
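In the direct-memory branch, neither zcr.readBuffer(len, false) nor ByteBufferUtil.fallbackRead(file, pool, len) is guaranteed to return the full len bytes in one call, so the inner while (len > 0) loop keeps reading and splices each partial buffer into the range list as its own BufferChunk, advancing off by partial.remaining() each time. The on-heap branch can instead rely on a single readFully into a byte[], which only returns once the entire range has been read.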
/**
 * Read the list of ranges from the file.
 * @param fs FileSystem object to get block locations of the file
 * @param file the file to read
 * @param path the path of the file, used to compute locality of the ranges
 * @param zcr the zero-copy reader shim, or null if zero-copy reads are unavailable
 * @param pool the buffer pool used when falling back to direct-memory reads
 * @param useDirectMemory whether to read into pooled buffers instead of on-heap arrays
 * @param base the base of the stripe
 * @param range the disk ranges within the stripe to read
 * @return the bytes read for each disk range, which is the same length as
 *    ranges
 * @throws IOException
 */
private DiskRangeList readDiskRanges(FileSystem fs, FSDataInputStream file,
    Path path, HadoopShims.ZeroCopyReaderShim zcr, ByteBufferAllocatorPool pool,
    boolean useDirectMemory, long base, DiskRangeList range) throws IOException {
  if (range == null) {
    return null;
  }
  computeLocality(fs, path, range);
  DiskRangeList prev = range.prev;
  if (prev == null) {
    prev = new DiskRangeList.MutateHelper(range);
  }
  while (range != null) {
    if (range.hasData()) {
      range = range.next;
      continue;
    }
    int len = (int) (range.getEnd() - range.getOffset());
    long off = range.getOffset();
    if (useDirectMemory) {
      file.seek(base + off);
      boolean hasReplaced = false;
      while (len > 0) {
        ByteBuffer partial;
        if (zcr != null) {
          partial = zcr.readBuffer(len, false);
        } else {
          // in the zero copy read path, when memory mapped file does not exist,
          // hadoop client uses following call to read using direct memory
          partial = ByteBufferUtil.fallbackRead(file, pool, len);
        }
        // Track the buffer so it can be released when the reader closes.
        buffersToRelease.add(partial);
        BufferChunk bc = new BufferChunk(partial, off);
        if (!hasReplaced) {
          range.replaceSelfWith(bc);
          hasReplaced = true;
        } else {
          range.insertAfter(bc);
        }
        range = bc;
        int read = partial.remaining();
        len -= read;
        off += read;
      }
    } else {
      // On-heap path: read the whole range into a byte[] and wrap it.
      byte[] buffer = new byte[len];
      file.readFully((base + off), buffer, 0, buffer.length);
      ByteBuffer bb = ByteBuffer.wrap(buffer);
      range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
    }
    range = range.next;
  }
  return prev.next;
}
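This second readDiskRanges variant differs from the first only in the buffersToRelease.add(partial) call: every buffer obtained during the direct-memory path is remembered so it can be handed back when the reader is done with it. A minimal sketch of that cleanup, assuming buffersToRelease is a List of ByteBuffer and pool implements org.apache.hadoop.io.ByteBufferPool; the helper name is hypothetical.

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.io.ByteBufferPool;

final class PooledBufferTracker {
  // Hypothetical helper mirroring the buffersToRelease bookkeeping above:
  // hand every tracked pool buffer back so the pool can reuse the memory.
  // (Buffers obtained from the zero-copy reader shim would instead go back
  // through the shim's own release call, which this sketch does not cover.)
  static void releaseAll(List<ByteBuffer> buffersToRelease, ByteBufferPool pool) {
    for (ByteBuffer buffer : buffersToRelease) {
      pool.putBuffer(buffer);
    }
    buffersToRelease.clear();
  }
}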