下面列出了org.apache.hadoop.fs.Seekable#org.apache.hadoop.fs.PositionedReadable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Positioned read: reads up to {@code length} bytes from the wrapped stream
 * starting at {@code position} and decrypts the bytes that were read in place.
 *
 * <p>NOTE(review): the original comment states this method is thread-safe;
 * that depends on {@code decrypt(...)} and the wrapped stream - confirm.
 *
 * @param position file position passed to the wrapped stream and to decrypt
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the wrapped stream's positioned read
 * @throws IOException if {@code checkStream()}, the wrapped read or the
 *         decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Check the capability explicitly instead of catching ClassCastException:
  // a CCE raised inside the wrapped read or decrypt must not be reported as
  // "positioned read not supported".
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned read.");
  }
  final int n = ((PositionedReadable) in).read(position, buffer, offset,
      length);
  if (n > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, n);
  }
  return n;
}
/**
 * Positioned read fully: fills {@code length} bytes of {@code buffer} from the
 * wrapped stream starting at {@code position}, then decrypts them in place.
 *
 * <p>NOTE(review): the original comment states this method is thread-safe;
 * that depends on {@code decrypt(...)} and the wrapped stream - confirm.
 *
 * @param position file position passed to the wrapped stream and to decrypt
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length number of bytes to read
 * @throws IOException if {@code checkStream()}, the wrapped readFully or the
 *         decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // Check the capability explicitly instead of catching ClassCastException:
  // a CCE raised inside the wrapped readFully or decrypt must not be reported
  // as "positioned readFully not supported".
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned readFully.");
  }
  ((PositionedReadable) in).readFully(position, buffer, offset, length);
  if (length > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, length);
  }
}
/**
 * The mocked stream returns 42 from {@code read(byte[], int, int)}. The test
 * checks that {@code seek(7)} was invoked before that read and that the
 * response reports 42 bytes both in its protobuf body and in its first data
 * body.
 */
@Test
public void testOnMessageSuccessful() throws IOException {
  final InputStream stream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(42).when(stream).read(any(byte[].class), anyInt(), anyInt());

  final FSDataInputStream wrapped = new FSDataInputStream(stream);
  final Response response = getResponse(7L, 4096, wrapped);

  // The seek must have happened before the read.
  final InOrder order = Mockito.inOrder(stream);
  order.verify((Seekable) stream).seek(7);
  order.verify(stream).read(any(byte[].class), anyInt(), anyInt());

  final DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(42, body.getRead());
  assertEquals(42, response.dBodies[0].readableBytes());
}
/**
 * The mocked stream returns -1 from {@code read(byte[], int, int)}. The test
 * checks that {@code seek(7)} was invoked before that read, that the response
 * reports -1 as the read count, and that it carries no data bodies.
 */
@Test
public void testOnMessageEOF() throws IOException {
  final InputStream stream = mock(InputStream.class,
      withSettings().extraInterfaces(Seekable.class, PositionedReadable.class));
  doReturn(-1).when(stream).read(any(byte[].class), anyInt(), anyInt());

  final FSDataInputStream wrapped = new FSDataInputStream(stream);
  final Response response = getResponse(7L, 4096, wrapped);

  // The seek must have happened before the read.
  final InOrder order = Mockito.inOrder(stream);
  order.verify((Seekable) stream).seek(7);
  order.verify(stream).read(any(byte[].class), anyInt(), anyInt());

  final DFS.GetFileDataResponse body = (DFS.GetFileDataResponse) response.pBody;
  assertEquals(-1, body.getRead());
  assertEquals(0, response.dBodies.length);
}
/**
 * Positioned read: delegates to the wrapped stream's positioned read and
 * decrypts, in place, the bytes that were actually read.
 *
 * <p>NOTE(review): the original comment states this method is thread-safe;
 * that depends on {@code decrypt(...)} and the wrapped stream - confirm.
 *
 * @param position file position to read from
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the wrapped stream's positioned read
 * @throws IOException if {@code checkStream()}, the wrapped read or the
 *         decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // An explicit type check replaces the former catch of ClassCastException,
  // which also swallowed CCEs thrown from inside the delegated read/decrypt
  // and misreported them as an unsupported operation.
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned read.");
  }
  final PositionedReadable positioned = (PositionedReadable) in;
  final int n = positioned.read(position, buffer, offset, length);
  if (n > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, n);
  }
  return n;
}
/**
 * Positioned read fully: delegates to the wrapped stream's positioned
 * readFully and then decrypts the {@code length} bytes in place.
 *
 * <p>NOTE(review): the original comment states this method is thread-safe;
 * that depends on {@code decrypt(...)} and the wrapped stream - confirm.
 *
 * @param position file position to read from
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length number of bytes to read
 * @throws IOException if {@code checkStream()}, the wrapped readFully or the
 *         decryption fails
 * @throws UnsupportedOperationException if the wrapped stream is not a
 *         {@link PositionedReadable}
 */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
  checkStream();
  // An explicit type check replaces the former catch of ClassCastException,
  // which also swallowed CCEs thrown from inside the delegated
  // readFully/decrypt and misreported them as an unsupported operation.
  if (!(in instanceof PositionedReadable)) {
    throw new UnsupportedOperationException("This stream does not support " +
        "positioned readFully.");
  }
  final PositionedReadable positioned = (PositionedReadable) in;
  positioned.readFully(position, buffer, offset, length);
  if (length > 0) {
    // This operation does not change the current offset of the file
    decrypt(position, buffer, offset, length);
  }
}
/**
 * Returns the input stream, opening it on the first call.
 *
 * <p>The first call opens {@code path} via {@code fs.open(path, bufSize)}
 * while holding {@code mux}. If that attempt throws an {@link IOException}
 * (or returns {@code null}, which is turned into one), the exception is
 * stored in {@code err} and rethrown by every later call.
 *
 * @return the opened stream
 * @throws IOException if opening failed now or on an earlier call
 */
private PositionedReadable in() throws IOException {
  synchronized (mux) {
    if (!opened) {
      // Mark as attempted up front so a failed open is never retried.
      opened = true;

      try {
        in = fs.open(path, bufSize);

        if (in == null) {
          throw new IOException("Failed to open input stream (file system returned null): " + path);
        }
      }
      catch (IOException e) {
        err = e;

        throw err;
      }
    }
    else if (err != null) {
      // A previous open attempt failed; report the same error again.
      throw err;
    }

    return in;
  }
}
/**
 * Positioned read that goes through the throttle and updates the byte counter.
 * Requires {@code rawStream} to implement {@link PositionedReadable}.
 *
 * @param position position passed to the underlying positioned read
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying positioned read
 * @throws IOException if throttling or the underlying read fails
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
  final boolean positioned = rawStream instanceof PositionedReadable;
  if (!positioned) {
    throw new UnsupportedOperationException("positioned read is not supported by the internal stream");
  }

  throttle();

  final int count = ((PositionedReadable) rawStream).read(position, buffer, offset, length);
  if (count == -1) {
    // -1 is returned unchanged and is not added to bytesRead.
    return count;
  }
  bytesRead += count;
  return count;
}
/**
 * Reads from {@code rawStream} at an explicit position, after calling
 * {@code throttle()}, and adds the number of bytes read to {@code bytesRead}.
 * {@code rawStream} must be a {@link PositionedReadable}.
 *
 * @param position position passed to the underlying positioned read
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying positioned read
 * @throws IOException if throttling or the underlying read fails
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  if (rawStream instanceof PositionedReadable) {
    throttle();
    final PositionedReadable source = (PositionedReadable) rawStream;
    final int got = source.read(position, buffer, offset, length);
    // A result of -1 is not counted.
    if (got != -1) {
      bytesRead += got;
    }
    return got;
  }
  throw new UnsupportedOperationException(
      "positioned read is not supported by the internal stream");
}
/**
 * Returns the current position in the stream.
 *
 * <p>When {@code in} is both {@link Seekable} and {@link PositionedReadable}
 * the position is taken from {@code Seekable.getPos()}; otherwise it is
 * computed as {@code maxAvailableData - in.available()}.
 *
 * @return current position in the stream as a long
 * @throws IOException if the underlying stream fails to report its state
 */
@Override
public long getPos() throws IOException {
  final boolean seekableSource =
      (in instanceof Seekable) && (in instanceof PositionedReadable);
  if (seekableSource) {
    return ((Seekable) this.in).getPos();
  }
  // This fallback does not work for a file whose size cannot fit in an int,
  // because such a size cannot be returned by the available method.
  return (this.maxAvailableData - this.in.available());
}
/**
 * Repeats positioned reads on {@code in} until {@code len} bytes have been
 * collected or a read returns -1.
 *
 * @param in stream to read from; it is cast to {@link PositionedReadable}
 * @param pos position of the first byte to read
 * @param b destination buffer
 * @param off start offset in {@code b}
 * @param len number of bytes wanted
 * @return the total number of bytes read
 * @throws IOException if a positioned read fails
 */
private int readAll(InputStream in, long pos, byte[] b, int off, int len)
    throws IOException {
  int total = 0;
  while (total < len) {
    // Each round continues where the previous one stopped.
    final int n = ((PositionedReadable) in).read(pos + total, b, off + total,
        len - total);
    if (n == -1) {
      break;
    }
    total += n;
  }
  return total;
}
/**
 * Checks positioned {@code readFully} on {@code in}.
 *
 * <p>First reads {@code dataLen - pos} bytes starting at {@code pos} and
 * compares them with the same range of {@code data}. Then requests
 * {@code dataLen} bytes from {@code pos} and requires an {@link IOException}.
 *
 * <p>NOTE(review): the second step presumably relies on {@code pos > 0}, so
 * that the request runs past the end of the data - confirm against callers.
 *
 * @param in stream under test; it is cast to {@link PositionedReadable}
 * @param pos position to start reading from
 * @throws Exception if the first read fails or an assertion does not hold
 */
private void readFullyCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen - pos];
  ((PositionedReadable) in).readFully(pos, result);

  byte[] expectedData = new byte[dataLen - pos];
  System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
  // Expected value first, actual second, so a failure message is labelled
  // correctly.
  Assert.assertArrayEquals(expectedData, result);

  result = new byte[dataLen]; // Exceeds maximum length
  try {
    ((PositionedReadable) in).readFully(pos, result);
    Assert.fail("Read fully exceeds maximum length should fail.");
  } catch (IOException expected) {
    // Expected: the read asks for more bytes than the check allows, and the
    // IOException is the outcome this check requires.
  }
}
/**
 * Invokes {@code method} reflectively on a wrapper around a mocked stream
 * whose every call throws {@code fsError}, and checks the exception that
 * surfaces.
 *
 * <p>If the invocation throws, the target exception must be an
 * {@link IOException}; when the {@code ByteBufferPositionedReadable} class
 * could not be loaded, an {@link UnsupportedOperationException} is accepted as
 * well. Any {@link IOException} must be the very instance passed to
 * {@code newFSError}.
 */
@Test
public void test() throws Exception {
  final Class<?> byteBufferPositionedReadableClass =
      getClass("org.apache.hadoop.fs.ByteBufferPositionedReadable");
  assumeNonMaprProfile();
  final IOException ioException = new IOException("test io exception");
  final FSError fsError = newFSError(ioException);

  // AutoCloseable stands in when the optional interface is unavailable.
  final boolean hasByteBufferPositionedReadable = byteBufferPositionedReadableClass != null;
  final Class<?> optionalInterface = hasByteBufferPositionedReadable
      ? byteBufferPositionedReadableClass
      : AutoCloseable.class;

  // Every call on the mock fails with the prepared FSError.
  final Answer<Object> alwaysFail = new Answer<Object>() {
    @Override
    public Object answer(InvocationOnMock invocation) throws Throwable {
      throw fsError;
    }
  };
  final InputStream failingStream = mock(InputStream.class,
      withSettings()
          .extraInterfaces(Seekable.class, optionalInterface, PositionedReadable.class,
              ByteBufferReadable.class)
          .defaultAnswer(alwaysFail));

  final FSDataInputStream fdis = new FSDataInputStream(failingStream);
  final FSInputStream fdisw = FSDataInputStreamWrapper.of(fdis);
  final Object[] params = getDummyArguments(method);

  try {
    method.invoke(fdisw, params);
  } catch (InvocationTargetException e) {
    final Throwable target = e.getTargetException();
    if (hasByteBufferPositionedReadable) {
      assertThat(target, is(instanceOf(IOException.class)));
    } else {
      assertThat(target, anyOf(is(instanceOf(IOException.class)), is(instanceOf(UnsupportedOperationException.class))));
    }
    if (target instanceof IOException) {
      assertThat((IOException) target, is(sameInstance(ioException)));
    }
  }
}
/**
 * Throttled positioned read. {@code rawStream} must implement
 * {@link PositionedReadable}; bytes that were read are added to
 * {@code bytesRead}.
 *
 * @param position position passed to the underlying positioned read
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying positioned read
 * @throws IOException if throttling or the underlying read fails
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  if (!(rawStream instanceof PositionedReadable)) {
    final String message =
        "positioned read is not supported by the internal stream";
    throw new UnsupportedOperationException(message);
  }
  final PositionedReadable positioned = (PositionedReadable) rawStream;

  throttle();
  final int n = positioned.read(position, buffer, offset, length);
  final boolean endOfStream = (n == -1);
  if (!endOfStream) {
    bytesRead += n;
  }
  return n;
}
/**
 * Reports the current position in the stream.
 *
 * <p>If {@code in} lacks either {@link Seekable} or
 * {@link PositionedReadable}, the position is derived as
 * {@code maxAvailableData - in.available()}; otherwise it is taken from
 * {@code Seekable.getPos()}.
 *
 * @return current position in the stream as a long
 * @throws IOException if the underlying stream fails to report its state
 */
@Override
public long getPos() throws IOException {
  if ((in instanceof Seekable) && (in instanceof PositionedReadable)) {
    final Seekable seekable = (Seekable) this.in;
    return seekable.getPos();
  }

  // This way of computing the position does not work for a file whose size
  // cannot fit in an int, since the available method cannot return it.
  return (this.maxAvailableData - this.in.available());
}
/**
 * Keeps issuing positioned reads until {@code len} bytes have been gathered
 * or a read returns -1.
 *
 * @param in stream to read from; it is cast to {@link PositionedReadable}
 * @param pos position of the first byte to read
 * @param b destination buffer
 * @param off start offset in {@code b}
 * @param len number of bytes wanted
 * @return the total number of bytes read
 * @throws IOException if a positioned read fails
 */
private int readAll(InputStream in, long pos, byte[] b, int off, int len)
    throws IOException {
  int done = 0;
  for (;;) {
    if (done >= len) {
      return done;
    }
    // Resume right after the bytes collected so far.
    final int step = ((PositionedReadable) in).read(pos + done, b, off + done,
        len - done);
    if (step == -1) {
      return done;
    }
    done += step;
  }
}
/**
 * Verifies positioned {@code readFully} on {@code in}.
 *
 * <p>Step one reads {@code dataLen - pos} bytes from {@code pos} and compares
 * them with {@code data[pos .. dataLen)}. Step two asks for {@code dataLen}
 * bytes from {@code pos} and requires that an {@link IOException} is thrown.
 *
 * <p>NOTE(review): step two presumably assumes {@code pos > 0}, so that the
 * request extends beyond the available data - confirm against callers.
 *
 * @param in stream under test; it is cast to {@link PositionedReadable}
 * @param pos position to start reading from
 * @throws Exception if the first read fails or an assertion does not hold
 */
private void readFullyCheck(InputStream in, int pos) throws Exception {
  byte[] result = new byte[dataLen - pos];
  ((PositionedReadable) in).readFully(pos, result);

  byte[] expectedData = new byte[dataLen - pos];
  System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
  // assertArrayEquals takes (expected, actual); the original order was
  // reversed, which mislabels the values in a failure message.
  Assert.assertArrayEquals(expectedData, result);

  result = new byte[dataLen]; // Exceeds maximum length
  try {
    ((PositionedReadable) in).readFully(pos, result);
    Assert.fail("Read fully exceeds maximum length should fail.");
  } catch (IOException expected) {
    // Expected: the oversized request must fail, so there is nothing to
    // handle here.
  }
}
/**
 * Reads bytes starting from the given position. {@code rawStream} must be an
 * instance of {@link PositionedReadable}. {@code throttle()} is called before
 * the read, and the number of bytes read is added to {@code bytesRead}.
 *
 * @param position position passed to the underlying positioned read
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the number of bytes read, as returned by the underlying read
 * @throws IOException if throttling or the underlying read fails
 * @throws UnsupportedOperationException if {@code rawStream} is not a
 *         {@link PositionedReadable}
 */
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  final boolean supported = rawStream instanceof PositionedReadable;
  if (!supported) {
    throw new UnsupportedOperationException(
        "positioned read is not supported by the internal stream");
  }

  throttle();

  final int result =
      ((PositionedReadable) rawStream).read(position, buffer, offset, length);
  // Only an actual byte count is accumulated; -1 is skipped.
  bytesRead += (result != -1) ? result : 0;
  return result;
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size.
* @param in The input stream
* @param bufferSize Size of the read buffer
* @throws IOException
*/
public IndexedStorageLineReader(InputStream in, int bufferSize) {
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
throw new IllegalArgumentException(
"In is not an instance of Seekable or PositionedReadable");
}
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
}
/**
 * Positioned read that calls {@code injectFault()} first and then delegates
 * to {@code in}, cast to {@link PositionedReadable}.
 *
 * @param position position passed to the underlying positioned read
 * @param buffer destination buffer
 * @param offset start offset in {@code buffer}
 * @param length maximum number of bytes to read
 * @return the value returned by the underlying positioned read
 * @throws IOException if {@code injectFault()} or the underlying read fails
 */
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  injectFault();
  final PositionedReadable delegate = (PositionedReadable) in;
  final int count = delegate.read(position, buffer, offset, length);
  return count;
}
/**
* Create a compression input stream that reads
* the decompressed bytes from the given stream.
*
* @param in The input stream to be compressed.
* @throws IOException
*/
protected CompressionInputStream(InputStream in) throws IOException {
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
this.maxAvailableData = in.available();
}
this.in = in;
}
/**
 * Builds a compression input stream on top of the given stream, from which
 * the decompressed bytes are read.
 *
 * <p>If {@code in} is both {@link Seekable} and {@link PositionedReadable}
 * nothing else is recorded; otherwise {@code in.available()} is captured into
 * {@code maxAvailableData}.
 *
 * @param in the underlying input stream
 * @throws IOException if {@code in.available()} fails
 */
protected CompressionInputStream(InputStream in) throws IOException {
  if ((in instanceof Seekable) && (in instanceof PositionedReadable)) {
    // Nothing to record for this kind of stream.
  } else {
    this.maxAvailableData = in.available();
  }
  this.in = in;
}