
下面列出了org.apache.hadoop.fs.PositionedReadable#org.apache.hadoop.fs.Seekable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件:
/** Seek to a position. */
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  try {
     * If data of target pos in the underlying stream has already been read 
     * and decrypted in outBuffer, we just need to re-position outBuffer.
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
    } else {
      ((Seekable) in).seek(pos);
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " +
源代码2 项目: hadoop   文件:
/** Test get position. */
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());
  int n2 = readAll(in, result, n1, dataLen - n1);
  Assert.assertEquals(n1 + n2, ((Seekable) in).getPos());
源代码3 项目: dremio-oss   文件:
public void testOnMessageSuccessful() throws IOException {
  InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(42).when(mis).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream fdis = new FSDataInputStream(mis);
  Response response = getResponse(7L, 4096, fdis);

  InOrder inOrder = Mockito.inOrder(mis);

  inOrder.verify((Seekable) mis).seek(7);
  inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt());

  assertEquals(42, ((DFS.GetFileDataResponse) response.pBody).getRead());
  assertEquals(42, response.dBodies[0].readableBytes());
源代码4 项目: dremio-oss   文件:
public void testOnMessageEOF() throws IOException {
  InputStream mis = mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(-1).when(mis).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream fdis = new FSDataInputStream(mis);
  Response response = getResponse(7L, 4096, fdis);

  InOrder inOrder = Mockito.inOrder(mis);

  inOrder.verify((Seekable) mis).seek(7);
  inOrder.verify(mis).read(any(byte[].class), anyInt(), anyInt());

  assertEquals(-1, ((DFS.GetFileDataResponse) response.pBody).getRead());
  assertEquals(0, response.dBodies.length);
源代码5 项目: big-c   文件:
/** Seek to a position. */
public void seek(long pos) throws IOException {
  Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
  try {
     * If data of target pos in the underlying stream has already been read 
     * and decrypted in outBuffer, we just need to re-position outBuffer.
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
    } else {
      ((Seekable) in).seek(pos);
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " +
源代码6 项目: big-c   文件:
/** Test get position. */
public void testGetPos() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());
  int n2 = readAll(in, result, n1, dataLen - n1);
  Assert.assertEquals(n1 + n2, ((Seekable) in).getPos());
源代码7 项目: Hive-XML-SerDe   文件:
public boolean next(LongWritable key, Text value) throws IOException {
    if (this.pos < this.end) {
        if (readUntilMatch(this.startTag, false)) {
            this.recordStartPos = this.pos - this.startTag.length;
            try {
                if (readUntilMatch(this.endTag, true)) {
                    value.set(this.buffer.getData(), 0, this.buffer.getLength());
                    return true;
            } finally {
                if (this.fsin instanceof Seekable) {
                    if (this.pos != ((Seekable) this.fsin).getPos()) {
                        throw new RuntimeException("bytes consumed error!");
    return false;
源代码8 项目: Bats   文件:
 * Creates a new instance with the mandatory characters for handling newlines
 * transparently. lineSeparator the sequence of characters that represent a
 * newline, as defined in {@link Format#getLineSeparator()}
 * normalizedLineSeparator the normalized newline character (as defined in
 * {@link Format#getNormalizedNewline()}) that is used to replace any
 * lineSeparator sequence found in the input.
public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
  this.lineSeparator = settings.getNewLineDelimiter();
  byte normalizedLineSeparator = settings.getNormalizedNewLine();
  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  boolean isCompressed = input instanceof CompressionInputStream;
  Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");

  // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
  if (isCompressed && endPos > 0) {
    endPos = Long.MAX_VALUE;

  this.input = input;
  this.seekable = (Seekable) input;
  this.settings = settings;

  if (input instanceof FSDataInputStream) {
    this.inputFS = (FSDataInputStream) input;
    this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
  } else {
    this.inputFS = null;
    this.bufferReadable = false;

  this.startPos = startPos;
  this.endPos = endPos;

  this.normalizedLineSeparator = normalizedLineSeparator;

  this.buffer = readBuffer;
  this.bStart = buffer.memoryAddress();
  this.bStartMinus1 = bStart -1;
  this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
源代码9 项目: hadoop   文件:
 * If input stream is {@link org.apache.hadoop.fs.Seekable}, return it's
 * current position, otherwise return 0;
public static long getInputStreamOffset(InputStream in) throws IOException {
  if (in instanceof Seekable) {
    return ((Seekable) in).getPos();
  return 0;
源代码10 项目: hadoop   文件:
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    return result;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " +
源代码11 项目: hadoop   文件:
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += buffer.remaining(); // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
    return buffer;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.");
源代码12 项目: hadoop   文件:
 * This method returns the current position in the stream.
 * @return Current position in stream as a long
public long getPos() throws IOException {
  if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
    //This way of getting the current position will not work for file
    //size which can be fit in an int and hence can not be returned by
    //available method.
    return (this.maxAvailableData -;
    return ((Seekable);

源代码13 项目: hadoop   文件:
private void seekCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seek(pos);
  int n = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n + pos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, pos, expectedData, 0, n);
  Assert.assertArrayEquals(readData, expectedData);
源代码14 项目: hadoop   文件:
/** Test skip. */
public void testSkip() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());
  long skipped = in.skip(dataLen / 3);
  int n2 = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n1 + skipped + n2);
  byte[] readData = new byte[n2];
  System.arraycopy(result, 0, readData, 0, n2);
  byte[] expectedData = new byte[n2];
  System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
  Assert.assertArrayEquals(readData, expectedData);
  try {
    skipped = in.skip(-3);"Skip Negative length should fail.");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains("Negative skip length", e);
  // Skip after EOF
  skipped = in.skip(3);
  Assert.assertEquals(skipped, 0);
源代码15 项目: hadoop   文件:
private void seekToNewSourceCheck(InputStream in, int targetPos) 
    throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seekToNewSource(targetPos);
  int n = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n + targetPos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, targetPos, expectedData, 0, n);
  Assert.assertArrayEquals(readData, expectedData);
源代码16 项目: dremio-oss   文件:
public void test() throws Exception {
  Class<?> byteBufferPositionedReadableClass = getClass("org.apache.hadoop.fs.ByteBufferPositionedReadable");

  final IOException ioException = new IOException("test io exception");
  final FSError fsError = newFSError(ioException);
  FSDataInputStream fdis = new FSDataInputStream(mock(InputStream.class, withSettings().extraInterfaces(Seekable.class, byteBufferPositionedReadableClass == null ? AutoCloseable.class : byteBufferPositionedReadableClass, PositionedReadable.class, ByteBufferReadable.class).defaultAnswer(new Answer<Object>() {
    public Object answer(InvocationOnMock invocation) throws Throwable {
      throw fsError;

  FSInputStream fdisw = FSDataInputStreamWrapper.of(fdis);
  Object[] params = getDummyArguments(method);
  try {
    method.invoke(fdisw, params);
  } catch(InvocationTargetException e) {
    if (byteBufferPositionedReadableClass == null) {
      assertThat(e.getTargetException(), anyOf(is(instanceOf(IOException.class)), is(instanceOf(UnsupportedOperationException.class))));
    } else {
      assertThat(e.getTargetException(), is(instanceOf(IOException.class)));
    if (e.getTargetException() instanceof IOException) {
      assertThat((IOException) e.getTargetException(), is(sameInstance(ioException)));
源代码17 项目: big-c   文件:
 * If input stream is {@link org.apache.hadoop.fs.Seekable}, return it's
 * current position, otherwise return 0;
public static long getInputStreamOffset(InputStream in) throws IOException {
  if (in instanceof Seekable) {
    return ((Seekable) in).getPos();
  return 0;
源代码18 项目: big-c   文件:
public boolean seekToNewSource(long targetPos) throws IOException {
  Preconditions.checkArgument(targetPos >= 0, 
      "Cannot seek to negative offset.");
  try {
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    return result;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " +
源代码19 项目: big-c   文件:
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += buffer.remaining(); // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
    return buffer;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.");
源代码20 项目: big-c   文件:
 * This method returns the current position in the stream.
 * @return Current position in stream as a long
public long getPos() throws IOException {
  if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
    //This way of getting the current position will not work for file
    //size which can be fit in an int and hence can not be returned by
    //available method.
    return (this.maxAvailableData -;
    return ((Seekable);

源代码21 项目: big-c   文件:
private void seekCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seek(pos);
  int n = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n + pos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, pos, expectedData, 0, n);
  Assert.assertArrayEquals(readData, expectedData);
源代码22 项目: big-c   文件:
/** Test skip. */
public void testSkip() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  // Default buffer size
  InputStream in = getInputStream(defaultBufferSize);
  byte[] result = new byte[dataLen];
  int n1 = readAll(in, result, 0, dataLen / 3);
  Assert.assertEquals(n1, ((Seekable) in).getPos());
  long skipped = in.skip(dataLen / 3);
  int n2 = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n1 + skipped + n2);
  byte[] readData = new byte[n2];
  System.arraycopy(result, 0, readData, 0, n2);
  byte[] expectedData = new byte[n2];
  System.arraycopy(data, dataLen - n2, expectedData, 0, n2);
  Assert.assertArrayEquals(readData, expectedData);
  try {
    skipped = in.skip(-3);"Skip Negative length should fail.");
  } catch (IllegalArgumentException e) {
    GenericTestUtils.assertExceptionContains("Negative skip length", e);
  // Skip after EOF
  skipped = in.skip(3);
  Assert.assertEquals(skipped, 0);
源代码23 项目: big-c   文件:
private void seekToNewSourceCheck(InputStream in, int targetPos) 
    throws Exception {
  byte[] result = new byte[dataLen];
  ((Seekable) in).seekToNewSource(targetPos);
  int n = readAll(in, result, 0, dataLen);
  Assert.assertEquals(dataLen, n + targetPos);
  byte[] readData = new byte[n];
  System.arraycopy(result, 0, readData, 0, n);
  byte[] expectedData = new byte[n];
  System.arraycopy(data, targetPos, expectedData, 0, n);
  Assert.assertArrayEquals(readData, expectedData);
源代码24 项目: Cobol-to-Hive   文件:
public void initialize(Configuration job, long splitStart,
		long splitLength, Path file) throws IOException {

	start = splitStart;
	end = start + splitLength;"Start of the split:" + start + "-End of split:" + end);
	LOG.debug("VLR initialize started: start pos:" + start + "endpos:"
			+ end);

	// open the file and seek to the start of the split
	final FileSystem fs = file.getFileSystem(job);
	fileIn =;

	CompressionCodec codec = new CompressionCodecFactory(job)
	if (null != codec) {
		isCompressedInput = true;
		decompressor = CodecPool.getDecompressor(codec);
		CompressionInputStream cIn = codec.createInputStream(fileIn,
		filePosition = (Seekable) cIn;
		inputStream = cIn;"Compressed input; cannot compute number of records in the split");
	} else {;
		filePosition = fileIn;
		inputStream = fileIn;
		numBytesRemainingInSplit = splitLength;"Variable length input; cannot compute number of records in the split");

	this.pos = start;
源代码25 项目: Hadoop-BAM   文件:
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
    Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException {
  BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
  long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
  return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
源代码26 项目: Hadoop-BAM   文件:
public SplitCompressionInputStream createInputStream(InputStream seekableIn, Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException {
  if (!(seekableIn instanceof Seekable)) {
    throw new IOException("seekableIn must be an instance of " +
  if (!BlockCompressedInputStream.isValidFile(new BufferedInputStream(seekableIn))) {
    // data is regular gzip, not BGZF
    final CompressionInputStream compressionInputStream = createInputStream(seekableIn,
    return new SplitCompressionInputStream(compressionInputStream, start, end) {
      public int read(byte[] b, int off, int len) throws IOException {
        return, off, len);
      public void resetState() throws IOException {
      public int read() throws IOException {
  BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
  long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
  return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
源代码27 项目: Hadoop-BAM   文件:
public BGZFSplitGuesser(InputStream is) {
	inFile = is;
	seekableInFile = (Seekable) is;

	buf = ByteBuffer.allocate(8);
源代码28 项目: spork   文件:
 * Create a line reader that reads from the given stream using the
 * given buffer-size.
 * @param in The input stream
 * @param bufferSize Size of the read buffer
 * @throws IOException
public IndexedStorageLineReader(InputStream in, int bufferSize) {
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
          throw new IllegalArgumentException(
              "In is not an instance of Seekable or PositionedReadable");
    } = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
源代码29 项目: spork   文件:
 * If given offset is within the buffer, adjust the buffer position to read from
 * otherwise seek to the given offset from start of the file.
 * @param offset
 * @throws IOException
public void seek(long offset) throws IOException {
    if ((offset >= bufferOffset) && (offset < (bufferOffset + bufferLength)))
        bufferPosn = (int) (offset - bufferOffset);
    else {
        bufferPosn = bufferLength;
源代码30 项目: RDFS   文件:
public InterleavedInputStream(InputStream in,
    int metaDataBlockSize, int dataBlockSize,
    MetaDataConsumer metaDataConsumer) { = in;
  this.seekableIn = (in instanceof Seekable) ? (Seekable)in : null;
  this.metaDataBlockSize = metaDataBlockSize;
  this.dataBlockSize = dataBlockSize;
  this.metaDataConsumer = metaDataConsumer;
  // Signal that we need to read metadata block first.
  eofReached = false;