下面列出了 org.apache.hadoop.fs.FSDataInputStream#getWrappedStream() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that the file at {@code p} has the expected length and that byte {@code i} of the
 * file equals {@code (byte) i} for every index below {@code length}, followed by EOF.
 *
 * <p>The stream is opened in a try-with-resources block so it is closed even when an assertion
 * fails or a read throws; previously it was only closed on the success path.
 *
 * @param fs file system holding the file
 * @param p path of the file to verify
 * @param length expected file length in bytes
 * @throws IOException wrapping any I/O failure, with the path, expected length and the current
 *     index {@code i} added to the message
 */
public static void check(FileSystem fs, Path p, long length) throws IOException {
  // Progress marker reported in the rethrown exception: -1 before any byte is read,
  // the current index while reading, and -length once all content bytes have been checked.
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    try (FSDataInputStream in = fs.open(p)) {
      if (in.getWrappedStream() instanceof DFSInputStream) {
        // For a DFS stream, compare against the length reported by the stream itself.
        long len = ((DFSInputStream) in.getWrappedStream()).getFileLength();
        assertEquals(length, len);
      } else {
        assertEquals(length, status.getLen());
      }

      for (i++; i < length; i++) {
        assertEquals((byte) i, (byte) in.read());
      }

      i = -(int) length;
      assertEquals(-1, in.read()); // EOF
    }
  } catch (IOException ioe) {
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
/**
 * Verifies that the file at {@code p} has the expected length and that byte {@code i} of the
 * file equals {@code (byte) i} for every index below {@code length}, followed by EOF.
 *
 * <p>The stream is opened in a try-with-resources block so it is closed even when an assertion
 * fails or a read throws; previously it was only closed on the success path.
 *
 * @param fs file system holding the file
 * @param p path of the file to verify
 * @param length expected file length in bytes
 * @throws IOException wrapping any I/O failure, with the path, expected length and the current
 *     index {@code i} added to the message
 */
public static void check(FileSystem fs, Path p, long length) throws IOException {
  // Progress marker reported in the rethrown exception: -1 before any byte is read,
  // the current index while reading, and -length once all content bytes have been checked.
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    try (FSDataInputStream in = fs.open(p)) {
      if (in.getWrappedStream() instanceof DFSInputStream) {
        // For a DFS stream, compare against the length reported by the stream itself.
        long len = ((DFSInputStream) in.getWrappedStream()).getFileLength();
        assertEquals(length, len);
      } else {
        assertEquals(length, status.getLen());
      }

      for (i++; i < length; i++) {
        assertEquals((byte) i, (byte) in.read());
      }

      i = -(int) length;
      assertEquals(-1, in.read()); // EOF
    }
  } catch (IOException ioe) {
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
this.inputStream = new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
} else {
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
this.inputStream = fsDataInputStream;
}
this.logFile = logFile;
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
}
addShutDownHook();
}
/**
 * Reads from {@code reader} until {@code buffer} has no remaining space.
 *
 * @param reader stream to read from
 * @param offset position to seek to before reading; a negative value skips the seek
 * @param buffer destination, filled from its current position to its limit
 * @throws IOException if the stream ends before the buffer is full, or on any read failure
 */
public static void readFully(FSDataInputStream reader, long offset, ByteBuffer buffer) throws IOException {
  if (offset >= 0) {
    reader.seek(offset);
  }

  final InputStream wrapped = reader.getWrappedStream();
  if (wrapped instanceof ByteBufferReadable) {
    // Direct path: read straight into the ByteBuffer until it is full.
    while (buffer.hasRemaining()) {
      final int before = buffer.position();
      final int count = reader.read(buffer);
      if (count < 0) {
        throw new IOException("End of stream");
      }
      // Set the position explicitly from the returned byte count.
      buffer.position(before + count);
    }
  } else {
    // Fallback path: read into a temporary byte array and copy it over.
    logger.trace("Using read bytes method");
    final byte[] scratch = new byte[buffer.remaining()];
    reader.readFully(scratch);
    buffer.put(scratch);
  }

  Preconditions.checkState(!buffer.hasRemaining());
}
/**
 * Performs positional reads until {@code length} bytes are in {@code buffer}, then, for a
 * DFS-backed stream, checks that the stream's total-bytes-read statistic grew accordingly.
 *
 * @param stm stream to read from
 * @param position file position of the first byte to read
 * @param buffer destination array
 * @param offset index in {@code buffer} at which to store the first byte
 * @param length number of bytes to read
 * @throws IOException if a positional read fails
 */
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
    int offset, int length) throws IOException {
  // Statistics are only available when the wrapped stream is a DFSInputStream.
  final DFSInputStream dfsStream = stm.getWrappedStream() instanceof DFSInputStream
      ? (DFSInputStream) (stm.getWrappedStream())
      : null;
  final long bytesReadBefore =
      dfsStream == null ? 0 : dfsStream.getReadStatistics().getTotalBytesRead();

  int done = 0;
  while (done < length) {
    final int got = stm.read(position + done, buffer, offset + done, length - done);
    assertTrue("Error in pread", got > 0);
    done += got;
  }

  if (dfsStream == null) {
    return;
  }

  final long delta = dfsStream.getReadStatistics().getTotalBytesRead() - bytesReadBefore;
  if (isHedgedRead) {
    // With hedged reads the statistic only has to grow by at least length.
    assertTrue("Expected read statistic to be incremented", length <= delta);
  } else {
    assertEquals("Expected read statistic to be incremented", length, delta);
  }
}
/**
 * Chooses an {@code FSInputStream} adapter for {@code in} based on its wrapped stream.
 *
 * @param in stream to adapt
 * @return an {@code FSDataInputStreamWrapper} when the wrapped stream is
 *     {@code ByteBufferReadable}, otherwise a {@code ByteArrayFSInputStream}
 * @throws IOException declared for callers; thrown only if an adapter constructor throws it
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean byteBufferReadable = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!byteBufferReadable) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Records HDFS read statistics for the underlying stream, when they can be obtained, and then
 * closes this stream. {@code super.close()} runs in a {@code finally} block, so it is reached
 * whether or not statistics were collected.
 *
 * @throws IOException if {@code super.close()} (or obtaining the underlying stream) fails
 */
@Override
public void close() throws IOException {
  try {
    final FSDataInputStream is = getUnderlyingStream();

    // Pick the object that exposes read statistics and the reflective accessor for it:
    // the stream itself, or its wrapped stream, depending on which type check matches first.
    Object statsSource = null;
    Method statsAccessor = null;
    if (isInstanceOfHdfsDataInputStream(is)) {
      statsSource = is;
      statsAccessor = HDFS_DATA_INPUT_STREAM_READ_STATISTICS_METHOD;
    } else if (isInstanceOfHdfsDataInputStream(is.getWrappedStream())) {
      statsSource = is.getWrappedStream();
      statsAccessor = HDFS_DATA_INPUT_STREAM_READ_STATISTICS_METHOD;
    } else if (isInstanceOfDFSInputStream(is.getWrappedStream().getClass())) {
      statsSource = is.getWrappedStream();
      statsAccessor = DFS_INPUT_STREAM_READ_STATISTICS_METHOD;
    }

    if (statsSource != null && statsAccessor != null) {
      try {
        final Object readStatistics = statsAccessor.invoke(statsSource);
        addLongStat(ScanOperator.Metric.TOTAL_BYTES_READ, readStatistics, GET_TOTAL_BYTES_READ_METHOD);
        addLongStat(ScanOperator.Metric.LOCAL_BYTES_READ, readStatistics, GET_TOTAL_LOCAL_BYTES_READ_METHOD);
        addLongStat(ScanOperator.Metric.SHORT_CIRCUIT_BYTES_READ, readStatistics, GET_TOTAL_SHORT_CIRCUIT_BYTES_READ_METHOD);
      } catch (IllegalAccessException | InvocationTargetException ignored) {
        // Deliberately suppressed: statistics are best-effort; continue with other streams.
      }
    }
  } finally {
    super.close();
  }
}
/**
 * Chooses an {@code FSInputStream} adapter for {@code in} based on its wrapped stream.
 *
 * @param in stream to adapt
 * @return an {@code FSDataInputStreamWrapper} when the wrapped stream is
 *     {@code ByteBufferReadable}, otherwise a {@code ByteArrayFSInputStream}
 * @throws IOException declared for callers; thrown only if an adapter constructor throws it
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean byteBufferReadable = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!byteBufferReadable) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Chooses an {@code FSInputStream} adapter for {@code in} based on its wrapped stream.
 *
 * @param in stream to adapt
 * @return an {@code FSDataInputStreamWrapper} when the wrapped stream is
 *     {@code ByteBufferReadable}, otherwise a {@code ByteArrayFSInputStream}
 * @throws IOException declared for callers; thrown only if an adapter constructor throws it
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean byteBufferReadable = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!byteBufferReadable) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Performs positional reads until {@code length} bytes are in {@code buffer}, then, for a
 * DFS-backed stream, checks that the stream's total-bytes-read statistic grew accordingly.
 *
 * @param stm stream to read from
 * @param position file position of the first byte to read
 * @param buffer destination array
 * @param offset index in {@code buffer} at which to store the first byte
 * @param length number of bytes to read
 * @throws IOException if a positional read fails
 */
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
    int offset, int length) throws IOException {
  // Statistics are only available when the wrapped stream is a DFSInputStream.
  final DFSInputStream dfsStream = stm.getWrappedStream() instanceof DFSInputStream
      ? (DFSInputStream) (stm.getWrappedStream())
      : null;
  final long bytesReadBefore =
      dfsStream == null ? 0 : dfsStream.getReadStatistics().getTotalBytesRead();

  int done = 0;
  while (done < length) {
    final int got = stm.read(position + done, buffer, offset + done, length - done);
    assertTrue("Error in pread", got > 0);
    done += got;
  }

  if (dfsStream == null) {
    return;
  }

  final long delta = dfsStream.getReadStatistics().getTotalBytesRead() - bytesReadBefore;
  if (isHedgedRead) {
    // With hedged reads the statistic only has to grow by at least length.
    assertTrue("Expected read statistic to be incremented", length <= delta);
  } else {
    assertEquals("Expected read statistic to be incremented", length, delta);
  }
}
/**
 * Creates a stream over the wrapped stream of {@code outerStream} and positions
 * {@code outerStream} at {@code startOffset}.
 *
 * @param startOffset offset in the outer stream that is sought to on construction
 * @param outerStream enclosing stream; its wrapped stream is passed to the superclass
 * @param length length value stored on this instance
 * @throws IOException if the seek on {@code outerStream} fails
 */
public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
  super(outerStream.getWrappedStream());
  this.outerStream = outerStream;
  this.startOffset = startOffset;
  this.length = length;
  // Move the outer stream to the configured start offset.
  outerStream.seek(startOffset);
}
/**
 * Opens {@code path} on {@code fileSystem} and wraps the stream in a {@code DFSByteBufferReader}.
 *
 * <p>When {@code HDFS_READ_HACK_ENABLE} is set and the file system configuration has
 * {@code dfs.client.read.shortcircuit} enabled, checksum verification is switched off on a
 * wrapped {@code DFSInputStream}. The configuration value is read once and cached in
 * {@code IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE}.
 *
 * @param fileSystem file system to open the file on
 * @param path file to open
 * @param size passed through to the reader
 * @param blockCount passed through to the reader
 * @param readBase passed through to the reader
 * @return a reader backed by the opened stream
 * @throws IOException if the file cannot be opened
 */
public static ByteBufferReader open(org.apache.hadoop.fs.FileSystem fileSystem,
                                    org.apache.hadoop.fs.Path path,
                                    long size,
                                    int blockCount,
                                    long readBase) throws IOException {
  final FSDataInputStream stream = fileSystem.open(path);

  if (HDFS_READ_HACK_ENABLE) {
    if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE == null) {
      final String shortCircuitSetting = fileSystem.getConf().get("dfs.client.read.shortcircuit", "false");
      IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE = Boolean.parseBoolean(shortCircuitSetting);
    }
    if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE) {
      final InputStream wrapped = stream.getWrappedStream();
      if (wrapped instanceof DFSInputStream) {
        // Turn off checksum verification when short-circuit local read is enabled.
        MemoryUtil.setDFSInputStream_verifyChecksum(wrapped, false);
        logger.debug("disable read check sum for: {}", path);
      }
    }
  }

  // The same stream is handed to the reader for both of its stream arguments.
  final String name = path.toString();
  return new DFSByteBufferReader(name, stream, size, readBase, stream, blockCount);
}
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
this.isDirectRead = true;
} else {
/* LocalFileSystem, S3 does not support ByteBufferReadable */
this.channel = Channels.newChannel(inputStream);
}
this.inputStream = inputStream;
this.size = inputStream.getPos() + inputStream.available();
}
/**
 * Releases the sockets and file descriptors held by the current stream, but only when the
 * wrapped stream implements org.apache.hadoop.fs.CanUnbuffer.
 *
 * <p>NOT THREAD SAFE. Call this only after every client using the stream to read blocks has
 * finished. If the stream is unbuffered while clients still hold it, the next client read
 * causes the DataNode to open a new socket, without the client knowing, to serve the request.
 *
 * <p>Note: if that socket stays idle for some time, the DataNode closes it and it moves into
 * the CLOSE_WAIT state; on the next client request on this stream the current socket is closed
 * and a new one is opened to serve the request.
 */
@SuppressWarnings({ "rawtypes" })
public void unbuffer() {
  final FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
  if (stream == null) {
    return;
  }

  final InputStream wrappedStream = stream.getWrappedStream();
  // CanUnbuffer was introduced by HDFS-7694 and is only present in Hadoop 2.6.4+ and 2.7.1+,
  // so the wrapped stream is probed for the interface before the unbuffer api is called.
  final Class<? extends InputStream> streamClass = wrappedStream.getClass();

  if (this.instanceOfCanUnbuffer == null) {
    // Probe only once; the outcome (and the CanUnbuffer reference) is cached on this object.
    // NOTE(review): the cached reference comes from the first stream seen - confirm that
    // getStream() cannot return a different stream on later calls.
    this.instanceOfCanUnbuffer = false;
    if (wrappedStream instanceof CanUnbuffer) {
      this.unbuffer = (CanUnbuffer) wrappedStream;
      this.instanceOfCanUnbuffer = true;
    }
  }

  if (!this.instanceOfCanUnbuffer) {
    if (isLogTraceEnabled) {
      LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
    }
    return;
  }

  try {
    this.unbuffer.unbuffer();
  } catch (UnsupportedOperationException e) {
    if (isLogTraceEnabled) {
      LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
          + " . So there may be the stream does not support unbuffering.", e);
    }
  }
}
/**
 * Reports whether the innermost stream wrapped by {@code is} implements
 * {@code ByteBufferReadable}, unwrapping any nested {@code FSDataInputStream} layers first.
 *
 * @param is stream to inspect
 * @return {@code true} if the innermost non-{@code FSDataInputStream} stream is
 *     {@code ByteBufferReadable}
 */
public static boolean isByteBufferReadable(FSDataInputStream is) {
  InputStream innermost = is.getWrappedStream();
  // Peel off nested FSDataInputStream wrappers until a different stream type is reached.
  while (innermost instanceof FSDataInputStream) {
    innermost = ((FSDataInputStream) innermost).getWrappedStream();
  }
  return innermost instanceof ByteBufferReadable;
}