org.apache.hadoop.fs.FSDataInputStream#getWrappedStream() 源码实例 Demo

下面列出了 org.apache.hadoop.fs.FSDataInputStream#getWrappedStream() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。

源代码1 项目: hadoop   文件: AppendTestUtil.java
/**
 * Verifies that the file at {@code p} has the expected length and that byte
 * {@code k} of its content equals {@code (byte) k}.
 *
 * <p>When the opened stream wraps a {@code DFSInputStream} the length is taken
 * from that stream; otherwise it is taken from the file status. After
 * {@code length} bytes have been compared, the next read must report EOF.
 * The stream is closed on every exit path, including failed assertions.
 *
 * @param fs file system holding the file
 * @param p path of the file to verify
 * @param length expected file length in bytes
 * @throws IOException if a file system call fails; the message carries the
 *     path, the expected length and the progress marker {@code i}
 */
public static void check(FileSystem fs, Path p, long length) throws IOException {
  // Progress marker reported on failure: -1 before the content scan, the
  // current offset during it, and -(int) length once every byte has matched.
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    // try-with-resources guarantees the stream is released even when an
    // assertion or a read throws part-way through the verification.
    try (FSDataInputStream in = fs.open(p)) {
      if (in.getWrappedStream() instanceof DFSInputStream) {
        long len = ((DFSInputStream) in.getWrappedStream()).getFileLength();
        assertEquals(length, len);
      } else {
        assertEquals(length, status.getLen());
      }

      for (i++; i < length; i++) {
        assertEquals((byte) i, (byte) in.read());
      }
      i = -(int) length;
      assertEquals(-1, in.read()); // EOF
    }
  } catch (IOException ioe) {
    // Failures from close() land here as well, so they get the same context.
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
 
源代码2 项目: big-c   文件: AppendTestUtil.java
/**
 * Checks the length and content of the file at {@code p}.
 *
 * <p>The expected length is compared against
 * {@code DFSInputStream#getFileLength()} when the opened stream wraps a
 * {@code DFSInputStream}, and against the file status length otherwise. The
 * content is then read byte by byte and each byte at offset {@code k} must
 * equal {@code (byte) k}; a final read must return -1. The input stream is
 * always closed, whether the checks pass or fail.
 *
 * @param fs file system to read from
 * @param p file to check
 * @param length expected number of bytes in the file
 * @throws IOException wrapping any I/O failure, with the path, expected
 *     length and progress marker {@code i} in the message
 */
public static void check(FileSystem fs, Path p, long length) throws IOException {
  // i doubles as a progress marker for the error message: -1 until the scan
  // starts, the offset being compared during the scan, -(int) length after it.
  int i = -1;
  try {
    final FileStatus status = fs.getFileStatus(p);
    // The resource block closes the stream on assertion errors and read
    // failures too, not only after a fully successful check.
    try (FSDataInputStream in = fs.open(p)) {
      if (in.getWrappedStream() instanceof DFSInputStream) {
        long len = ((DFSInputStream) in.getWrappedStream()).getFileLength();
        assertEquals(length, len);
      } else {
        assertEquals(length, status.getLen());
      }

      for (i++; i < length; i++) {
        assertEquals((byte) i, (byte) in.read());
      }
      i = -(int) length;
      assertEquals(-1, in.read()); // EOF
    }
  } catch (IOException ioe) {
    // An IOException raised while closing is wrapped with the same context.
    throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
  }
}
 
源代码3 项目: hudi   文件: HoodieLogFileReader.java
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
    boolean readBlockLazily, boolean reverseReader) throws IOException {
  FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
  if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
    this.inputStream = new FSDataInputStream(
        new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
  } else {
    // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
    // need to wrap in another BufferedFSInputStream the make bufferSize work?
    this.inputStream = fsDataInputStream;
  }

  this.logFile = logFile;
  this.readerSchema = readerSchema;
  this.readBlockLazily = readBlockLazily;
  this.reverseReader = reverseReader;
  if (this.reverseReader) {
    this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
  }
  addShutDownHook();
}
 
源代码4 项目: indexr   文件: IOUtil.java
/**
 * Reads from {@code reader} until {@code buffer} has no remaining space.
 *
 * <p>A non-negative {@code offset} is seeked to first; a negative one leaves
 * the stream where it is. If the wrapped stream is {@code ByteBufferReadable}
 * the buffer is filled through {@code reader.read(ByteBuffer)}; otherwise the
 * bytes go through a temporary array.
 *
 * @param reader stream to read from
 * @param offset absolute position to seek to, or a negative value to skip the seek
 * @param buffer destination; filled from its current position to its limit
 * @throws IOException if the stream ends early or a read fails
 */
public static void readFully(FSDataInputStream reader, long offset, ByteBuffer buffer) throws IOException {
    if (offset >= 0) {
        reader.seek(offset);
    }

    final boolean byteBufferReadable = reader.getWrappedStream() instanceof ByteBufferReadable;
    if (byteBufferReadable) {
        while (buffer.hasRemaining()) {
            final int before = buffer.position();
            final int count = reader.read(buffer);
            if (count < 0) {
                throw new IOException("End of stream");
            }
            // Pin the position explicitly to the number of bytes reported as read.
            buffer.position(before + count);
        }
    } else {
        logger.trace("Using read bytes method");
        final byte[] staging = new byte[buffer.remaining()];
        reader.readFully(staging);
        buffer.put(staging);
    }
    Preconditions.checkState(!buffer.hasRemaining());
}
 
源代码5 项目: hadoop   文件: TestPread.java
/**
 * Positional-reads {@code length} bytes from {@code stm} into {@code buffer}
 * and, when the stream wraps a {@code DFSInputStream}, checks that its
 * total-bytes-read statistic grew accordingly.
 *
 * <p>With {@code isHedgedRead} set the statistic must grow by at least
 * {@code length}; otherwise it must grow by exactly {@code length}.
 *
 * @param stm stream to read from
 * @param position file position of the first byte to read
 * @param buffer destination array
 * @param offset index in {@code buffer} of the first byte written
 * @param length number of bytes to read
 * @throws IOException if a positional read fails
 */
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
                     int offset, int length) throws IOException {
  final boolean wrapsDfs = stm.getWrappedStream() instanceof DFSInputStream;
  final DFSInputStream dfsStream = wrapsDfs ? (DFSInputStream) stm.getWrappedStream() : null;
  final long bytesReadBefore = wrapsDfs ? dfsStream.getReadStatistics().getTotalBytesRead() : 0;

  // Each pread may return fewer bytes than requested, so loop until done.
  for (int done = 0; done < length; ) {
    final int chunk = stm.read(position + done, buffer, offset + done, length - done);
    assertTrue("Error in pread", chunk > 0);
    done += chunk;
  }

  if (!wrapsDfs) {
    return;
  }
  final long delta = dfsStream.getReadStatistics().getTotalBytesRead() - bytesReadBefore;
  if (isHedgedRead) {
    assertTrue("Expected read statistic to be incremented", length <= delta);
  } else {
    assertEquals("Expected read statistic to be incremented", length, delta);
  }
}
 
源代码6 项目: dremio-oss   文件: FSDataInputStreamWrapper.java
/**
 * Adapts {@code in} to an {@code FSInputStream}.
 *
 * @param in stream to adapt
 * @return an {@code FSDataInputStreamWrapper} when the stream wrapped by {@code in}
 *     is {@code ByteBufferReadable}, otherwise a {@code ByteArrayFSInputStream}
 * @throws IOException if constructing the adapter fails
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  return in.getWrappedStream() instanceof ByteBufferReadable
      ? new FSDataInputStreamWrapper(in)
      : new ByteArrayFSInputStream(in);
}
 
/**
 * Reports read statistics of the underlying stream, when they can be obtained,
 * and then closes this stream.
 *
 * <p>The statistics object is fetched reflectively from the first match among:
 * the underlying stream itself, its wrapped stream (both checked with
 * {@code isInstanceOfHdfsDataInputStream}), and its wrapped stream checked by
 * class with {@code isInstanceOfDFSInputStream}. Reflection failures are
 * ignored. {@code super.close()} always runs.
 *
 * @throws IOException if closing the stream fails
 */
@Override
public void close() throws IOException {
  try {
    final FSDataInputStream underlying = getUnderlyingStream();
    Object statsOwner = null;
    Method statsGetter = null;
    if (isInstanceOfHdfsDataInputStream(underlying)) {
      statsOwner = underlying;
      statsGetter = HDFS_DATA_INPUT_STREAM_READ_STATISTICS_METHOD;
    } else if (isInstanceOfHdfsDataInputStream(underlying.getWrappedStream())) {
      statsOwner = underlying.getWrappedStream();
      statsGetter = HDFS_DATA_INPUT_STREAM_READ_STATISTICS_METHOD;
    } else if (isInstanceOfDFSInputStream(underlying.getWrappedStream().getClass())) {
      statsOwner = underlying.getWrappedStream();
      statsGetter = DFS_INPUT_STREAM_READ_STATISTICS_METHOD;
    }

    // Nothing to report when no stream matched or the accessor is unavailable.
    if (statsOwner != null && statsGetter != null) {
      try {
        final Object readStatistics = statsGetter.invoke(statsOwner);

        addLongStat(ScanOperator.Metric.TOTAL_BYTES_READ, readStatistics, GET_TOTAL_BYTES_READ_METHOD);
        addLongStat(ScanOperator.Metric.LOCAL_BYTES_READ, readStatistics, GET_TOTAL_LOCAL_BYTES_READ_METHOD);
        addLongStat(ScanOperator.Metric.SHORT_CIRCUIT_BYTES_READ, readStatistics, GET_TOTAL_SHORT_CIRCUIT_BYTES_READ_METHOD);
      } catch (IllegalAccessException | InvocationTargetException e) {
        // suppress and continue with other streams
      }
    }
  } finally {
    super.close();
  }
}
 
源代码8 项目: dremio-oss   文件: FSDataInputStreamWrapper.java
/**
 * Chooses the {@code FSInputStream} adapter for {@code in}.
 *
 * @param in stream to wrap
 * @return a {@code ByteArrayFSInputStream} unless the stream wrapped by {@code in}
 *     is {@code ByteBufferReadable}, in which case an
 *     {@code FSDataInputStreamWrapper} is returned
 * @throws IOException if the chosen adapter cannot be created
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean byteBufferReadable = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!byteBufferReadable) {
    return new ByteArrayFSInputStream(in);
  }

  return new FSDataInputStreamWrapper(in);
}
 
源代码9 项目: dremio-oss   文件: FSDataInputStreamWrapper.java
/**
 * Wraps {@code in} as an {@code FSInputStream}, picking the implementation from
 * the capabilities of the stream it wraps.
 *
 * @param in stream to wrap
 * @return {@code FSDataInputStreamWrapper} for a {@code ByteBufferReadable}
 *     wrapped stream, {@code ByteArrayFSInputStream} for anything else
 * @throws IOException if the wrapper's constructor fails
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final FSInputStream adapted;
  if (in.getWrappedStream() instanceof ByteBufferReadable) {
    adapted = new FSDataInputStreamWrapper(in);
  } else {
    adapted = new ByteArrayFSInputStream(in);
  }
  return adapted;
}
 
源代码10 项目: big-c   文件: TestPread.java
/**
 * Reads {@code length} bytes with positional reads and validates the read
 * statistics of a wrapped {@code DFSInputStream}, if there is one.
 *
 * <p>The total-bytes-read counter is sampled before and after the reads. Its
 * increase must equal {@code length}, or be at least {@code length} when
 * {@code isHedgedRead} is set.
 *
 * @param stm stream under test
 * @param position starting position in the file
 * @param buffer array receiving the data
 * @param offset first index of {@code buffer} to fill
 * @param length number of bytes to read
 * @throws IOException if reading fails
 */
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
                     int offset, int length) throws IOException {
  DFSInputStream statsSource = null;
  long baseline = 0;
  if (stm.getWrappedStream() instanceof DFSInputStream) {
    statsSource = (DFSInputStream) (stm.getWrappedStream());
    baseline = statsSource.getReadStatistics().getTotalBytesRead();
  }

  // Keep issuing positional reads until the requested range is covered.
  int copied = 0;
  do {
    if (copied >= length) {
      break;
    }
    final int got = stm.read(position + copied, buffer, offset + copied, length - copied);
    assertTrue("Error in pread", got > 0);
    copied += got;
  } while (true);

  if (statsSource == null) {
    return;
  }
  final long growth = statsSource.getReadStatistics().getTotalBytesRead() - baseline;
  if (!isHedgedRead) {
    assertEquals("Expected read statistic to be incremented", length, growth);
  } else {
    assertTrue("Expected read statistic to be incremented", length <= growth);
  }
}
 
源代码11 项目: hudi   文件: InLineFsDataInputStream.java
/**
 * Creates a stream over the stream wrapped by {@code outerStream} and positions
 * {@code outerStream} at {@code startOffset}.
 *
 * @param startOffset offset in {@code outerStream} that is seeked to on construction
 * @param outerStream stream whose wrapped stream is handed to the superclass
 * @param length stored as-is; presumably the size of the inlined region — confirm against usage
 * @throws IOException if seeking {@code outerStream} fails
 */
public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
  // The superclass wraps the same underlying stream that outerStream wraps.
  super(outerStream.getWrappedStream());
  this.startOffset = startOffset;
  this.outerStream = outerStream;
  this.length = length;
  // Move the outer stream to the configured start offset.
  outerStream.seek(startOffset);
}
 
源代码12 项目: indexr   文件: DFSByteBufferReader.java
/**
 * Opens {@code path} on {@code fileSystem} and wraps the stream in a
 * {@code DFSByteBufferReader}.
 *
 * <p>When {@code HDFS_READ_HACK_ENABLE} is set and the file system
 * configuration has {@code dfs.client.read.shortcircuit} enabled, checksum
 * verification is switched off on a wrapped {@code DFSInputStream}. The
 * configuration value is looked up once and cached in
 * {@code IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE}.
 *
 * <p>If anything fails after the stream has been opened, the stream is closed
 * before the failure is rethrown, so no file handle is left behind.
 *
 * @param fileSystem file system to open the file on
 * @param path file to open
 * @param size passed through to the reader
 * @param blockCount passed through to the reader
 * @param readBase passed through to the reader
 * @return a reader backed by the opened stream
 * @throws IOException if the file cannot be opened or the reader cannot be created
 */
public static ByteBufferReader open(org.apache.hadoop.fs.FileSystem fileSystem,
                                    org.apache.hadoop.fs.Path path,
                                    long size,
                                    int blockCount,
                                    long readBase) throws IOException {
    FSDataInputStream stream = fileSystem.open(path);
    try {
        if (HDFS_READ_HACK_ENABLE) {
            if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE == null) {
                IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE = Boolean.parseBoolean(fileSystem.getConf().get("dfs.client.read.shortcircuit", "false"));
            }
            if (IS_SHORT_CIRCUIT_LOCAL_READ_ENABLE) {
                InputStream is = stream.getWrappedStream();
                if (is instanceof DFSInputStream) {
                    // Turn off checksum verification if short circuit local read is enabled.
                    MemoryUtil.setDFSInputStream_verifyChecksum(is, false);
                    logger.debug("disable read check sum for: {}", path);
                }
            }
        }

        // The stream is intentionally passed twice, as in the reader's constructor signature.
        return new DFSByteBufferReader(
                path.toString(),
                stream,
                size,
                readBase,
                stream,
                blockCount);
    } catch (IOException | RuntimeException e) {
        // Ownership was never handed to a reader, so release the stream here.
        try {
            stream.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
 
源代码13 项目: tajo   文件: FSDataInputChannel.java
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
  if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
    this.isDirectRead = true;
  } else {
    /* LocalFileSystem, S3 does not support ByteBufferReadable */
    this.channel = Channels.newChannel(inputStream);
  }
  this.inputStream = inputStream;
  this.size = inputStream.getPos() + inputStream.available();
}
 
源代码14 项目: hbase   文件: FSDataInputStreamWrapper.java
/**
 * Releases the sockets and file descriptors held by the stream, provided the
 * stream implements org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE: call it
 * only after every client reading blocks through this stream has finished.
 * If the stream is unbuffered while clients still hold it, the next read
 * request makes the Datanode open a new socket, without the client knowing, and
 * serve the request from it. Note: when that socket stays idle for a while the
 * DataNode closes it and it moves to CLOSE_WAIT; the next request on this stream
 * then closes the current socket and opens a new one to serve the request.
 */
@SuppressWarnings({ "rawtypes" })
public void unbuffer() {
  final FSDataInputStream current = this.getStream(this.shouldUseHBaseChecksum());
  if (current == null) {
    return;
  }

  final InputStream wrapped = current.getWrappedStream();
  // CanUnbuffer was introduced by HDFS-7694 and is only present in Hadoop 2.6.4+
  // and 2.7.1+, so the capability is probed on the stream object before the
  // unbuffer api is called.
  final Class<? extends InputStream> wrappedType = wrapped.getClass();
  if (this.instanceOfCanUnbuffer == null) {
    // Probe once and remember the outcome for later calls.
    final boolean supported = wrapped instanceof CanUnbuffer;
    if (supported) {
      this.unbuffer = (CanUnbuffer) wrapped;
    }
    this.instanceOfCanUnbuffer = supported;
  }

  if (!this.instanceOfCanUnbuffer) {
    if (isLogTraceEnabled) {
      LOG.trace("Failed to find 'unbuffer' method in class " + wrappedType);
    }
    return;
  }

  try {
    this.unbuffer.unbuffer();
  } catch (UnsupportedOperationException e) {
    if (isLogTraceEnabled) {
      LOG.trace("Failed to invoke 'unbuffer' method in class " + wrappedType
          + " . So there may be the stream does not support unbuffering.", e);
    }
  }
}
 
源代码15 项目: hbase   文件: BlockIOUtils.java
/**
 * Tells whether the innermost stream wrapped by {@code is} supports
 * {@code ByteBufferReadable}.
 *
 * <p>Nested {@code FSDataInputStream} layers are peeled off one by one until a
 * stream of another type is reached.
 *
 * @param is stream to inspect
 * @return {@code true} if the innermost wrapped stream is {@code ByteBufferReadable}
 */
public static boolean isByteBufferReadable(FSDataInputStream is) {
  InputStream innermost = is.getWrappedStream();
  while (innermost instanceof FSDataInputStream) {
    innermost = ((FSDataInputStream) innermost).getWrappedStream();
  }
  return innermost instanceof ByteBufferReadable;
}