下面列出了怎么用org.apache.hadoop.io.ByteBufferPool的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
try {
return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
maxLength, opts);
}
catch (ClassCastException e) {
ByteBuffer buffer = ByteBufferUtil.
fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool);
}
return buffer;
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
if (bufferPool == null) {
throw new IOException("Please specify buffer pool.");
}
ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
int pos = buffer.position();
int n = read(buffer);
if (n >= 0) {
buffer.position(pos);
return buffer;
}
return null;
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
try {
return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
maxLength, opts);
}
catch (ClassCastException e) {
ByteBuffer buffer = ByteBufferUtil.
fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool);
}
return buffer;
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
if (bufferPool == null) {
throw new IOException("Please specify buffer pool.");
}
ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
int pos = buffer.position();
int n = read(buffer);
if (n >= 0) {
buffer.position(pos);
return buffer;
}
return null;
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
operatorStats.startWait();
try {
return underlyingIs.read(bufferPool, maxLength, opts);
} finally {
operatorStats.stopWait();
}
}
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
if (maxLength == 0) {
return EMPTY_BUFFER;
} else if (maxLength < 0) {
throw new IllegalArgumentException("can't read a negative " +
"number of bytes.");
}
if ((blockReader == null) || (blockEnd == -1)) {
if (pos >= getFileLength()) {
return null;
}
/*
* If we don't have a blockReader, or the one we have has no more bytes
* left to read, we call seekToBlockSource to get a new blockReader and
* recalculate blockEnd. Note that we assume we're not at EOF here
* (we check this above).
*/
if ((!seekToBlockSource(pos)) || (blockReader == null)) {
throw new IOException("failed to allocate new BlockReader " +
"at position " + pos);
}
}
ByteBuffer buffer = null;
if (dfsClient.getConf().shortCircuitMmapEnabled) {
buffer = tryReadZeroCopy(maxLength, opts);
}
if (buffer != null) {
return buffer;
}
buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
getExtendedReadBuffers().put(buffer, bufferPool);
}
return buffer;
}
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
if (buffer == EMPTY_BUFFER) return;
Object val = getExtendedReadBuffers().remove(buffer);
if (val == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream, " + buffer);
}
if (val instanceof ClientMmap) {
IOUtils.closeQuietly((ClientMmap)val);
} else if (val instanceof ByteBufferPool) {
((ByteBufferPool)val).putBuffer(buffer);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
try {
((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
}
catch (ClassCastException e) {
ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
if (bufferPool == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream.");
}
bufferPool.putBuffer(buffer);
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
checkStream();
try {
if (outBuffer.remaining() > 0) {
// Have some decrypted data unread, need to reset.
((Seekable) in).seek(getPos());
resetStreamOffset(getPos());
}
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
read(bufferPool, maxLength, opts);
if (buffer != null) {
final int n = buffer.remaining();
if (n > 0) {
streamOffset += buffer.remaining(); // Read n bytes
final int pos = buffer.position();
decrypt(buffer, n, pos);
}
}
return buffer;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
}
}
private ByteBufferPool getBufferPool() {
return new ByteBufferPool() {
@Override
public ByteBuffer getBuffer(boolean direct, int length) {
return ByteBuffer.allocateDirect(length);
}
@Override
public void putBuffer(ByteBuffer buffer) {
}
};
}
private void testReadHelper(final int readSize, final int readLimit) throws Exception {
ByteArrayInputStream inputStream = new LimitedByteArrayInputStream(testData, readLimit);
ByteBufferPool adapterPool = new TestByteBufferPool();
int currOffset = 0;
while (currOffset < TEST_DATA_SIZE) {
ByteBuffer byteBuffer = ByteBufferUtil.fallbackRead(inputStream, adapterPool, readSize);
final int length = byteBuffer.remaining();
for (int i = 0; i < length; i++) {
assertEquals(testData[currOffset + i], byteBuffer.get());
}
adapterPool.putBuffer(byteBuffer);
currOffset += length;
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
try {
return underlyingIs.read(bufferPool, maxLength, opts);
} catch(FSError e) {
throw HadoopFileSystemWrapper.propagateFSError(e);
}
}
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
if (maxLength == 0) {
return EMPTY_BUFFER;
} else if (maxLength < 0) {
throw new IllegalArgumentException("can't read a negative " +
"number of bytes.");
}
if ((blockReader == null) || (blockEnd == -1)) {
if (pos >= getFileLength()) {
return null;
}
/*
* If we don't have a blockReader, or the one we have has no more bytes
* left to read, we call seekToBlockSource to get a new blockReader and
* recalculate blockEnd. Note that we assume we're not at EOF here
* (we check this above).
*/
if ((!seekToBlockSource(pos)) || (blockReader == null)) {
throw new IOException("failed to allocate new BlockReader " +
"at position " + pos);
}
}
ByteBuffer buffer = null;
if (dfsClient.getConf().shortCircuitMmapEnabled) {
buffer = tryReadZeroCopy(maxLength, opts);
}
if (buffer != null) {
return buffer;
}
buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
getExtendedReadBuffers().put(buffer, bufferPool);
}
return buffer;
}
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
if (buffer == EMPTY_BUFFER) return;
Object val = getExtendedReadBuffers().remove(buffer);
if (val == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream, " + buffer);
}
if (val instanceof ClientMmap) {
IOUtils.closeQuietly((ClientMmap)val);
} else if (val instanceof ByteBufferPool) {
((ByteBufferPool)val).putBuffer(buffer);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
try {
((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
}
catch (ClassCastException e) {
ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
if (bufferPool == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream.");
}
bufferPool.putBuffer(buffer);
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts) throws IOException,
UnsupportedOperationException {
checkStream();
try {
if (outBuffer.remaining() > 0) {
// Have some decrypted data unread, need to reset.
((Seekable) in).seek(getPos());
resetStreamOffset(getPos());
}
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
read(bufferPool, maxLength, opts);
if (buffer != null) {
final int n = buffer.remaining();
if (n > 0) {
streamOffset += buffer.remaining(); // Read n bytes
final int pos = buffer.position();
decrypt(buffer, n, pos);
}
}
return buffer;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
}
}
private ByteBufferPool getBufferPool() {
return new ByteBufferPool() {
@Override
public ByteBuffer getBuffer(boolean direct, int length) {
return ByteBuffer.allocateDirect(length);
}
@Override
public void putBuffer(ByteBuffer buffer) {
}
};
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
if (maxLength > this.length) {
throw new IOException("Attempting to read max length beyond inline content");
}
return outerStream.read(bufferPool, maxLength, opts);
}
final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
throws IOException, UnsupportedOperationException {
return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
try (WaitRecorder recorder = OperatorStats.getWaitRecorder(operatorStats)) {
return super.read(bufferPool, maxLength, opts);
}
}
@Override
public ByteBufferPool getBufferPool() {
return pool;
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
try (WaitRecorder recorder = OperatorStats.getWaitRecorder(operatorStats)) {
return super.read(bufferPool, maxLength, opts);
}
}
final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
throws IOException, UnsupportedOperationException {
return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
}
@Override
public ByteBuffer read(ByteBufferPool paramByteBufferPool,
int paramInt, EnumSet<ReadOption> paramEnumSet)
throws IOException, UnsupportedOperationException {
return null;
}
/**
* Get a ByteBuffer containing file data.
*
* This ByteBuffer may come from the stream itself, via a call like mmap,
* or it may come from the ByteBufferFactory which is passed in as an
* argument.
*
* @param factory
* If this is non-null, it will be used to create a fallback
* ByteBuffer when the stream itself cannot create one.
* @param maxLength
* The maximum length of buffer to return. We may return a buffer
* which is shorter than this.
* @param opts
* Options to use when reading.
*
* @return
* We will always return an empty buffer if maxLength was 0,
* whether or not we are at EOF.
* If maxLength > 0, we will return null if the stream has
* reached EOF.
* Otherwise, we will return a ByteBuffer containing at least one
* byte. You must free this ByteBuffer when you are done with it
* by calling releaseBuffer on it. The buffer will continue to be
* readable until it is released in this manner. However, the
* input stream's close method may warn about unclosed buffers.
* @throws
* IOException: if there was an error reading.
* UnsupportedOperationException: if factory was null, and we
* needed an external byte buffer. UnsupportedOperationException
* will never be thrown unless the factory argument is null.
*/
public ByteBuffer read(ByteBufferPool factory, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException;
/**
* Get a ByteBuffer containing file data.
*
* This ByteBuffer may come from the stream itself, via a call like mmap,
* or it may come from the ByteBufferFactory which is passed in as an
* argument.
*
* @param factory
* If this is non-null, it will be used to create a fallback
* ByteBuffer when the stream itself cannot create one.
* @param maxLength
* The maximum length of buffer to return. We may return a buffer
* which is shorter than this.
* @param opts
* Options to use when reading.
*
* @return
* We will always return an empty buffer if maxLength was 0,
* whether or not we are at EOF.
* If maxLength > 0, we will return null if the stream has
* reached EOF.
* Otherwise, we will return a ByteBuffer containing at least one
* byte. You must free this ByteBuffer when you are done with it
* by calling releaseBuffer on it. The buffer will continue to be
* readable until it is released in this manner. However, the
* input stream's close method may warn about unclosed buffers.
* @throws
* IOException: if there was an error reading.
* UnsupportedOperationException: if factory was null, and we
* needed an external byte buffer. UnsupportedOperationException
* will never be thrown unless the factory argument is null.
*/
public ByteBuffer read(ByteBufferPool factory, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException;