类org.apache.hadoop.fs.PositionedReadable源码实例Demo

下面列出了 org.apache.hadoop.fs.PositionedReadable 类的 API 用法示例代码及写法；也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: hadoop   文件: CryptoInputStream.java
/**
 * Positioned read. Delegates to the wrapped stream's
 * {@link PositionedReadable#read(long, byte[], int, int)} and, when at least
 * one byte was read, hands the bytes read to {@code decrypt}.
 * The original authors document this method as thread-safe.
 *
 * @param position position in the underlying stream to read from
 * @param buffer   destination buffer
 * @param offset   offset in {@code buffer} at which to store the data
 * @param length   maximum number of bytes to read
 * @return the value returned by the wrapped stream's positioned read
 * @throws IOException if the stream check, the read or the decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Test the capability explicitly instead of catching ClassCastException:
  // a catch around the whole body would also swallow a ClassCastException
  // raised inside the delegated read or inside decrypt() and misreport it
  // as "not supported".
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned read.");
  }
  final int n = ((PositionedReadable) in).read(position, buffer, offset,
      length);
  if (n > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, n);
  }
  return n;
}
 
源代码2 项目: hadoop   文件: CryptoInputStream.java
/**
 * Positioned read fully. Delegates to the wrapped stream's
 * {@link PositionedReadable#readFully(long, byte[], int, int)} and then, when
 * {@code length} is positive, hands the requested range to {@code decrypt}.
 * The original authors document this method as thread-safe.
 *
 * @param position position in the underlying stream to read from
 * @param buffer   destination buffer
 * @param offset   offset in {@code buffer} at which to store the data
 * @param length   number of bytes to read
 * @throws IOException if the stream check, the read or the decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Explicit capability check: catching ClassCastException around the whole
  // body would also mask one thrown by the delegate or by decrypt().
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned readFully.");
  }
  ((PositionedReadable) in).readFully(position, buffer, offset, length);
  if (length > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, length);
  }
}
 
源代码3 项目: dremio-oss   文件: TestPDFSProtocol.java
/**
 * Successful read: the mocked stream reports 42 bytes read, and the response
 * must carry that count both in its protobuf body and in its first data body.
 * Also checks that the stream was sought to the requested offset before the
 * read happened.
 */
@Test
public void testOnMessageSuccessful() throws IOException {
  final int reportedBytes = 42;
  final long requestedOffset = 7L;

  // Mock an InputStream that also exposes the Hadoop random-access interfaces.
  InputStream rawStream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(reportedBytes).when(rawStream).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream wrapped = new FSDataInputStream(rawStream);
  Response response = getResponse(requestedOffset, 4096, wrapped);

  // seek must precede read
  InOrder ordered = Mockito.inOrder(rawStream);
  ordered.verify((Seekable) rawStream).seek(requestedOffset);
  ordered.verify(rawStream).read(any(byte[].class), anyInt(), anyInt());

  DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(reportedBytes, body.getRead());
  assertEquals(reportedBytes, response.dBodies[0].readableBytes());
}
 
源代码4 项目: dremio-oss   文件: TestPDFSProtocol.java
/**
 * End-of-file read: the mocked stream returns -1, and the response must
 * report -1 as the read count and carry no data bodies. Also checks that the
 * stream was sought to the requested offset before the read happened.
 */
@Test
public void testOnMessageEOF() throws IOException {
  final int endOfFile = -1;
  final long requestedOffset = 7L;

  // Mock an InputStream that also exposes the Hadoop random-access interfaces.
  InputStream rawStream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(endOfFile).when(rawStream).read(any(byte[].class), anyInt(), anyInt());

  FSDataInputStream wrapped = new FSDataInputStream(rawStream);
  Response response = getResponse(requestedOffset, 4096, wrapped);

  // seek must precede read
  InOrder ordered = Mockito.inOrder(rawStream);
  ordered.verify((Seekable) rawStream).seek(requestedOffset);
  ordered.verify(rawStream).read(any(byte[].class), anyInt(), anyInt());

  DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(endOfFile, body.getRead());
  assertEquals(0, response.dBodies.length);
}
 
源代码5 项目: big-c   文件: CryptoInputStream.java
/**
 * Positioned read. Forwards the request to the wrapped stream, which must
 * implement {@link PositionedReadable}, and passes whatever was read (if
 * anything) to {@code decrypt}. Described by the original authors as
 * thread-safe.
 *
 * @param position position in the underlying stream to read from
 * @param buffer   destination buffer
 * @param offset   offset in {@code buffer} at which to store the data
 * @param length   maximum number of bytes to read
 * @return the value returned by the wrapped stream's positioned read
 * @throws IOException if the stream check, the read or the decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Do not rely on catching ClassCastException for the capability test: the
  // catch would also convert an unrelated ClassCastException coming from the
  // delegate or from decrypt() into a misleading "not supported" error.
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned read.");
  }
  final PositionedReadable positioned = (PositionedReadable) in;
  final int n = positioned.read(position, buffer, offset, length);
  if (n > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, n);
  }
  return n;
}
 
源代码6 项目: big-c   文件: CryptoInputStream.java
/**
 * Positioned read fully. Forwards the request to the wrapped stream, which
 * must implement {@link PositionedReadable}, and passes the requested range
 * to {@code decrypt} when {@code length} is positive. Described by the
 * original authors as thread-safe.
 *
 * @param position position in the underlying stream to read from
 * @param buffer   destination buffer
 * @param offset   offset in {@code buffer} at which to store the data
 * @param length   number of bytes to read
 * @throws IOException if the stream check, the read or the decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Capability test up front; a blanket catch of ClassCastException would
  // hide one raised by the delegate or by decrypt().
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned readFully.");
  }
  final PositionedReadable positioned = (PositionedReadable) in;
  positioned.readFully(position, buffer, offset, length);
  if (length > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, length);
  }
}
 
/**
 * Returns the input stream, opening it on the first call.
 * <p>
 * The open is attempted only once, under the {@code mux} monitor. If that
 * attempt failed with an {@link IOException}, the same exception instance is
 * remembered and rethrown on every later call.
 *
 * @return the opened stream
 * @throws IOException if opening failed (on this call or on the first one)
 */
private PositionedReadable in() throws IOException {
    synchronized (mux) {
        if (!opened) {
            // Mark as attempted before opening so the open is never retried.
            opened = true;

            try {
                in = fs.open(path, bufSize);

                if (in == null)
                    throw new IOException("Failed to open input stream (file system returned null): " + path);
            }
            catch (IOException e) {
                // Remember the failure for subsequent callers.
                err = e;

                throw e;
            }

            return in;
        }

        // Already attempted: replay the recorded failure, if any.
        if (err != null)
            throw err;

        return in;
    }
}
 
源代码8 项目: circus-train   文件: ThrottledInputStream.java
/**
 * Positioned read through the throttle. Requires {@code rawStream} to be a
 * {@link PositionedReadable}; the throttle is applied before delegating, and
 * the byte counter is advanced by the number of bytes read.
 *
 * @return whatever the underlying positioned read returned (-1 leaves the counter unchanged)
 * @throws UnsupportedOperationException if {@code rawStream} is not a {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
  if (rawStream instanceof PositionedReadable) {
    throttle();
    final int count = ((PositionedReadable) rawStream).read(position, buffer, offset, length);
    if (count != -1) {
      bytesRead += count;
    }
    return count;
  }
  throw new UnsupportedOperationException("positioned read is not supported by the internal stream");
}
 
源代码9 项目: hadoop   文件: ThrottledInputStream.java
/**
 * Throttled positioned read. The wrapped {@code rawStream} must implement
 * {@link PositionedReadable}; otherwise the call is rejected before any
 * throttling takes place.
 *
 * @return the result of the delegated read; any value other than -1 is
 *         added to {@code bytesRead}
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  final boolean supported = rawStream instanceof PositionedReadable;
  if (!supported) {
    throw new UnsupportedOperationException(
        "positioned read is not supported by the internal stream");
  }
  throttle();
  final int result = ((PositionedReadable) rawStream).read(position, buffer,
      offset, length);
  if (result == -1) {
    return result;
  }
  bytesRead += result;
  return result;
}
 
源代码10 项目: hadoop   文件: CompressionInputStream.java
/**
 * Reports the current position in the stream.
 * <p>
 * When the wrapped stream is both {@link Seekable} and
 * {@link PositionedReadable}, its own {@code getPos()} is used. Otherwise the
 * position is derived from {@code maxAvailableData} minus what
 * {@code available()} still reports.
 *
 * @return Current position in stream as a long
 */
@Override
public long getPos() throws IOException {
  final boolean randomAccess =
      (in instanceof Seekable) && (in instanceof PositionedReadable);
  if (randomAccess) {
    return ((Seekable) this.in).getPos();
  }
  // Fallback: available() returns an int, so this calculation cannot be
  // right for a file whose size does not fit in an int.
  return (this.maxAvailableData - this.in.available());
}
 
源代码11 项目: hadoop   文件: CryptoStreamsTestBase.java
/**
 * Repeatedly issues positioned reads until {@code len} bytes have been
 * collected or a read returns -1.
 *
 * @param in  stream to read from; cast to {@link PositionedReadable} on each read
 * @param pos position in the stream of the first byte to read
 * @param b   destination buffer
 * @param off offset in {@code b} of the first byte to store
 * @param len number of bytes wanted
 * @return total number of bytes read
 */
private int readAll(InputStream in, long pos, byte[] b, int off, int len) 
    throws IOException {
  int done = 0;
  while (done < len) {
    // Continue where the previous read stopped, in both stream and buffer.
    final int got = ((PositionedReadable) in).read(pos + done, b, off + done,
        len - done);
    if (got == -1) {
      break;
    }
    done += got;
  }
  return done;
}
 
源代码12 项目: hadoop   文件: CryptoStreamsTestBase.java
/**
 * Checks positioned {@code readFully} on the given stream:
 * first that reading {@code dataLen - pos} bytes from {@code pos} yields the
 * matching slice of {@code data}, then that asking for {@code dataLen} bytes
 * from the same position fails with an {@link IOException}.
 *
 * NOTE(review): the second check presumably relies on {@code pos > 0}, so
 * that the request really extends past the end of the data; confirm against
 * the callers.
 *
 * @param in  stream under test; must implement {@link PositionedReadable}
 * @param pos position to start reading from
 */
private void readFullyCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen - pos];
  ((PositionedReadable) in).readFully(pos, result);
  
  byte[] expectedData = new byte[dataLen - pos];
  System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
  // JUnit's contract is (expecteds, actuals); the original order produced a
  // misleading failure message.
  Assert.assertArrayEquals(expectedData, result);
  
  result = new byte[dataLen]; // Exceeds maximum length 
  try {
    ((PositionedReadable) in).readFully(pos, result);
    Assert.fail("Read fully exceeds maximum length should fail.");
  } catch (IOException expected) {
    // Expected: the request runs past the end of the data.
  }
}
 
源代码13 项目: dremio-oss   文件: TestFSDataInputStreamWrapper.java
/**
 * Invokes {@code method} on a wrapper around a stream whose every call throws
 * an {@code FSError} built from a known {@link IOException}, and checks what
 * surfaces from the reflective call: an {@link IOException} (or, when
 * {@code ByteBufferPositionedReadable} is not on the classpath, possibly an
 * {@link UnsupportedOperationException}). Any {@link IOException} that
 * surfaces must be the very instance the error was built from.
 */
@Test
public void test() throws Exception {
  final Class<?> bbPositionedReadable = getClass("org.apache.hadoop.fs.ByteBufferPositionedReadable");

  assumeNonMaprProfile();
  final IOException ioException = new IOException("test io exception");
  final FSError fsError = newFSError(ioException);

  // Fall back to a harmless interface when the optional Hadoop interface is absent.
  final Class<?> optionalInterface = bbPositionedReadable == null ? AutoCloseable.class : bbPositionedReadable;
  // Every un-stubbed call on the mock fails with the FSError.
  final Answer<Object> alwaysFail = new Answer<Object>() {
    @Override
    public Object answer(InvocationOnMock invocation) throws Throwable {
      throw fsError;
    }
  };
  final InputStream failingStream = mock(InputStream.class, withSettings()
      .extraInterfaces(Seekable.class, optionalInterface, PositionedReadable.class, ByteBufferReadable.class)
      .defaultAnswer(alwaysFail));
  FSDataInputStream fdis = new FSDataInputStream(failingStream);

  FSInputStream fdisw = FSDataInputStreamWrapper.of(fdis);
  Object[] params = getDummyArguments(method);
  try {
    method.invoke(fdisw, params);
  } catch(InvocationTargetException e) {
    final Throwable thrown = e.getTargetException();
    if (bbPositionedReadable != null) {
      assertThat(thrown, is(instanceOf(IOException.class)));
    } else {
      assertThat(thrown, anyOf(is(instanceOf(IOException.class)), is(instanceOf(UnsupportedOperationException.class))));
    }
    if (thrown instanceof IOException) {
      assertThat((IOException) thrown, is(sameInstance(ioException)));
    }
  }
}
 
源代码14 项目: big-c   文件: ThrottledInputStream.java
/**
 * Reads from the given position via the wrapped stream, applying the
 * throttle first. Only works when {@code rawStream} implements
 * {@link PositionedReadable}.
 *
 * @return the delegated read's return value; it is added to
 *         {@code bytesRead} unless it is -1
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  if (rawStream instanceof PositionedReadable) {
    throttle();
    final int n = ((PositionedReadable) rawStream).read(position, buffer,
        offset, length);
    bytesRead += (n != -1) ? n : 0;
    return n;
  }
  throw new UnsupportedOperationException(
      "positioned read is not supported by the internal stream");
}
 
源代码15 项目: big-c   文件: CompressionInputStream.java
/**
 * Returns the current position in the stream.
 * <p>
 * A wrapped stream that is both {@link Seekable} and
 * {@link PositionedReadable} is asked directly; for any other stream the
 * position is computed as {@code maxAvailableData - in.available()}.
 *
 * @return Current position in stream as a long
 */
@Override
public long getPos() throws IOException {
  if (in instanceof Seekable && in instanceof PositionedReadable) {
    return ((Seekable) this.in).getPos();
  }
  // available() is int-valued, so this estimate breaks down for files
  // whose size does not fit in an int.
  return (this.maxAvailableData - this.in.available());
}
 
源代码16 项目: big-c   文件: CryptoStreamsTestBase.java
/**
 * Keeps issuing positioned reads until either {@code len} bytes have been
 * read in total or the stream signals end of data by returning -1.
 *
 * @param in  stream to read from; cast to {@link PositionedReadable} on each read
 * @param pos position in the stream of the first byte to read
 * @param b   destination buffer
 * @param off offset in {@code b} of the first byte to store
 * @param len number of bytes wanted
 * @return total number of bytes read
 */
private int readAll(InputStream in, long pos, byte[] b, int off, int len) 
    throws IOException {
  int total = 0;
  for (int n = 0; n != -1; ) {
    total += n;
    if (total >= len) {
      return total;
    }
    // Resume right after the bytes already collected.
    n = ((PositionedReadable) in).read(pos + total, b, off + total,
        len - total);
  }
  return total;
}
 
源代码17 项目: big-c   文件: CryptoStreamsTestBase.java
/**
 * Exercises positioned {@code readFully} in two steps: a read of
 * {@code dataLen - pos} bytes from {@code pos} must reproduce the
 * corresponding part of {@code data}, and a read of {@code dataLen} bytes
 * from the same position must be rejected with an {@link IOException}.
 *
 * NOTE(review): the rejection case presumably needs {@code pos > 0} for the
 * request to overrun the data; verify against the callers.
 *
 * @param in  stream under test; must implement {@link PositionedReadable}
 * @param pos position to start reading from
 */
private void readFullyCheck(InputStream in, int pos) throws Exception {
  final int remaining = dataLen - pos;
  byte[] result = new byte[remaining];
  ((PositionedReadable) in).readFully(pos, result);
  
  byte[] expectedData = new byte[remaining];
  System.arraycopy(data, pos, expectedData, 0, remaining);
  // Expected value goes first, per JUnit's (expecteds, actuals) signature.
  Assert.assertArrayEquals(expectedData, result);
  
  result = new byte[dataLen]; // Exceeds maximum length 
  try {
    ((PositionedReadable) in).readFully(pos, result);
    Assert.fail("Read fully exceeds maximum length should fail.");
  } catch (IOException expected) {
    // Expected: reading past the end of the data must fail.
  }
}
 
源代码18 项目: hbase   文件: ThrottledInputStream.java
/**
 * Read bytes starting from the specified position, after applying the
 * throttle. This requires rawStream to be an instance of
 * {@link PositionedReadable}.
 * @param position position in the underlying stream to read from
 * @param buffer destination buffer
 * @param offset offset in {@code buffer} at which to store the data
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying positioned read; it is added
 *         to {@code bytesRead} unless it is -1
 * @throws UnsupportedOperationException if rawStream is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  if (!(rawStream instanceof PositionedReadable)) {
    throw new UnsupportedOperationException(
      "positioned read is not supported by the internal stream");
  }
  throttle();
  final int n = ((PositionedReadable) rawStream).read(position, buffer,
    offset, length);
  if (n == -1) {
    return -1;
  }
  bytesRead += n;
  return n;
}
 
源代码19 项目: spork   文件: IndexedStorage.java
/**
 * Create a line reader that reads from the given stream using the
 * given buffer-size.
 * @param in The input stream
 * @param bufferSize Size of the read buffer
 * @throws IOException
 */
public IndexedStorageLineReader(InputStream in, int bufferSize) {
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
          throw new IllegalArgumentException(
              "In is not an instance of Seekable or PositionedReadable");
    }

    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
}
 
源代码20 项目: hbase   文件: TestFSErrorsExposed.java
/**
 * Positioned read that first calls {@code injectFault()} and then forwards
 * the request unchanged to the wrapped stream.
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
  throws IOException {
  injectFault();
  final PositionedReadable delegate = (PositionedReadable) in;
  return delegate.read(position, buffer, offset, length);
}
 
源代码21 项目: hadoop   文件: CompressionInputStream.java
/**
 * Create a compression input stream that reads
 * the decompressed bytes from the given stream.
 * 
 * @param in The input stream to be compressed.
 * @throws IOException
 */
protected CompressionInputStream(InputStream in) throws IOException {
  if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
      this.maxAvailableData = in.available();
  }
  this.in = in;
}
 
源代码22 项目: big-c   文件: CompressionInputStream.java
/**
 * Create a compression input stream that reads
 * the decompressed bytes from the given stream.
 * <p>
 * For a stream that is not both {@link Seekable} and
 * {@link PositionedReadable}, {@code maxAvailableData} is initialised from
 * the stream's {@code available()} value at construction time.
 * 
 * @param in The input stream holding the data to be decompressed.
 * @throws IOException if {@code in.available()} fails
 */
protected CompressionInputStream(InputStream in) throws IOException {
  if (in instanceof Seekable && in instanceof PositionedReadable) {
    this.in = in;
    return;
  }
  this.maxAvailableData = in.available();
  this.in = in;
}
 
 类所在包
 同包方法