下面列出了 org.apache.hadoop.fs.Seekable#org.apache.hadoop.fs.ByteBufferReadable 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a decrypting stream on top of {@code in}.
 *
 * <p>The codec and the requested buffer size are validated through
 * {@code CryptoStreamUtils}. The key and IV are cloned, so later changes to the
 * caller's arrays do not affect this instance. Two direct buffers of the
 * validated size are allocated, a decryptor is obtained from
 * {@code getDecryptor()}, and {@code resetStreamOffset(streamOffset)} is called
 * last.
 *
 * @param in the underlying stream, handed to the superclass constructor
 * @param codec the codec, checked by {@code CryptoStreamUtils.checkCodec}
 * @param bufferSize requested buffer size; the value actually stored is whatever
 *        {@code CryptoStreamUtils.checkBufferSize} returns
 * @param key key material (copied)
 * @param iv initialization vector (copied twice: once as the initial IV, once as
 *        the working IV)
 * @param streamOffset offset stored in the instance and passed to
 *        {@code resetStreamOffset}
 * @throws IOException declared by the signature; which of the calls below raises
 *         it is not visible from this constructor
 */
public CryptoInputStream(InputStream in, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException {
super(in);
CryptoStreamUtils.checkCodec(codec);
this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
this.codec = codec;
// Defensive copies: this instance never shares the caller's key/IV arrays.
this.key = key.clone();
// initIV and iv start as two separate copies of the same bytes.
this.initIV = iv.clone();
this.iv = iv.clone();
this.streamOffset = streamOffset;
// Remember which optional read interfaces the wrapped stream implements.
isByteBufferReadable = in instanceof ByteBufferReadable;
isReadableByteChannel = in instanceof ReadableByteChannel;
// Both working buffers are direct buffers of the validated size.
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
// NOTE(review): getDecryptor() and resetStreamOffset() run after the fields and
// buffers above are assigned and presumably rely on them - keep this order
// unless their bodies say otherwise.
decryptor = getDecryptor();
resetStreamOffset(streamOffset);
}
/**
 * Builds a decrypting stream on top of {@code in}.
 *
 * <p>The codec and the requested buffer size are validated through
 * {@code CryptoStreamUtils}. The key and IV are cloned, so later changes to the
 * caller's arrays do not affect this instance. Two direct buffers of the
 * validated size are allocated, a decryptor is obtained from
 * {@code getDecryptor()}, and {@code resetStreamOffset(streamOffset)} is called
 * last.
 *
 * @param in the underlying stream, handed to the superclass constructor
 * @param codec the codec, checked by {@code CryptoStreamUtils.checkCodec}
 * @param bufferSize requested buffer size; the value actually stored is whatever
 *        {@code CryptoStreamUtils.checkBufferSize} returns
 * @param key key material (copied)
 * @param iv initialization vector (copied twice: once as the initial IV, once as
 *        the working IV)
 * @param streamOffset offset stored in the instance and passed to
 *        {@code resetStreamOffset}
 * @throws IOException declared by the signature; which of the calls below raises
 *         it is not visible from this constructor
 */
public CryptoInputStream(InputStream in, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException {
super(in);
CryptoStreamUtils.checkCodec(codec);
this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
this.codec = codec;
// Defensive copies: this instance never shares the caller's key/IV arrays.
this.key = key.clone();
// initIV and iv start as two separate copies of the same bytes.
this.initIV = iv.clone();
this.iv = iv.clone();
this.streamOffset = streamOffset;
// Remember which optional read interfaces the wrapped stream implements.
isByteBufferReadable = in instanceof ByteBufferReadable;
isReadableByteChannel = in instanceof ReadableByteChannel;
// Both working buffers are direct buffers of the validated size.
inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
// NOTE(review): getDecryptor() and resetStreamOffset() run after the fields and
// buffers above are assigned and presumably rely on them - keep this order
// unless their bodies say otherwise.
decryptor = getDecryptor();
resetStreamOffset(streamOffset);
}
/**
 * Fills {@code buffer} completely from {@code reader}.
 *
 * <p>When {@code offset} is non-negative the reader is first positioned there;
 * a negative offset means "read from the current position". If the stream
 * wrapped by {@code reader} implements {@link ByteBufferReadable}, data is read
 * straight into the buffer; otherwise it is read into a temporary byte array
 * and copied over.
 *
 * @param reader stream to read from
 * @param offset absolute position to seek to before reading, or a negative
 *        value to skip the seek
 * @param buffer destination; on return it has no remaining capacity
 * @throws IOException if the stream ends before the buffer is full, or on any
 *         underlying read/seek failure
 */
public static void readFully(FSDataInputStream reader, long offset, ByteBuffer buffer) throws IOException {
  if (offset >= 0) {
    reader.seek(offset);
  }

  final boolean directRead = reader.getWrappedStream() instanceof ByteBufferReadable;
  if (directRead) {
    while (buffer.hasRemaining()) {
      final int before = buffer.position();
      final int count = reader.read(buffer);
      if (count < 0) {
        throw new IOException("End of stream");
      }
      // Set the position explicitly from the reported byte count.
      buffer.position(before + count);
    }
  } else {
    logger.trace("Using read bytes method");
    final byte[] scratch = new byte[buffer.remaining()];
    reader.readFully(scratch);
    buffer.put(scratch);
  }

  Preconditions.checkState(!buffer.hasRemaining());
}
/**
 * Creates a text input over a seekable stream, limited to the byte range
 * {@code [startPos, endPos]}.
 *
 * <p>The line separator and the normalized newline byte are taken from
 * {@code settings}. The stream must implement {@link Seekable}. A compressed
 * stream may only be read from position 0, and for such a stream a positive
 * {@code endPos} is replaced by {@link Long#MAX_VALUE}, because the split
 * length refers to the compressed size and would otherwise end the read early.
 *
 * @param settings parsing settings supplying the newline configuration
 * @param input the stream to read; must be {@link Seekable}
 * @param readBuffer buffer used for reads; its memory address and an NIO view
 *        of its full capacity are cached
 * @param startPos first position of the range to read
 * @param endPos last position of the range to read
 */
public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
  this.lineSeparator = settings.getNewLineDelimiter();
  final byte normalizedNewLine = settings.getNormalizedNewLine();

  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  final boolean compressed = input instanceof CompressionInputStream;
  Preconditions.checkArgument(!(compressed && startPos != 0), "Cannot use split on compressed stream.");

  // Splits are not allowed on compressed data: the split length is the compressed
  // size, so honouring it would normally stop the read prematurely.
  final long effectiveEnd = (compressed && endPos > 0) ? Long.MAX_VALUE : endPos;

  this.input = input;
  this.seekable = (Seekable) input;
  this.settings = settings;

  final FSDataInputStream fsInput = input instanceof FSDataInputStream ? (FSDataInputStream) input : null;
  this.inputFS = fsInput;
  this.bufferReadable = fsInput != null && fsInput.getWrappedStream() instanceof ByteBufferReadable;

  this.startPos = startPos;
  this.endPos = effectiveEnd;
  this.normalizedLineSeparator = normalizedNewLine;

  this.buffer = readBuffer;
  this.bStart = readBuffer.memoryAddress();
  this.bStartMinus1 = this.bStart - 1;
  this.underlyingBuffer = readBuffer.nioBuffer(0, readBuffer.capacity());
}
/**
 * Performs one {@link ByteBufferReadable} read into {@code buf} starting at
 * {@code bufPos} and verifies the result.
 *
 * <p>Checks that the buffer position advanced by exactly the number of bytes
 * reported, and that the bytes written at {@code bufPos} equal the first
 * {@code n} bytes of the reference {@code data} array.
 *
 * @param in stream under test; must implement {@link ByteBufferReadable}
 * @param buf destination buffer
 * @param bufPos buffer position at which the read starts
 * @throws Exception on any read failure
 */
private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
    int bufPos) throws Exception {
  buf.position(bufPos);
  int n = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(bufPos + n, buf.position());

  // Pull back exactly the bytes that the read just wrote.
  byte[] readData = new byte[n];
  buf.rewind();
  buf.position(bufPos);
  buf.get(readData);

  byte[] expectedData = new byte[n];
  System.arraycopy(data, 0, expectedData, 0, n);
  // JUnit's contract is (expected, actual); the previous order produced
  // failure messages with the two roles reversed.
  Assert.assertArrayEquals(expectedData, readData);
}
/**
 * Picks the {@code FSInputStream} adapter suited to {@code in}.
 *
 * @param in the Hadoop stream to adapt
 * @return a {@code FSDataInputStreamWrapper} when the stream wrapped by
 *         {@code in} implements {@link ByteBufferReadable}, otherwise a
 *         {@code ByteArrayFSInputStream}
 * @throws IOException declared by the signature
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean supportsByteBufferReads = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!supportsByteBufferReads) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Invokes {@code method} on a wrapper whose underlying stream throws an
 * {@code FSError} from every call, and checks how that error surfaces.
 *
 * <p>If the invocation throws, the target exception must be an
 * {@link IOException} (or, when {@code ByteBufferPositionedReadable} is not
 * available, possibly an {@link UnsupportedOperationException}). Any
 * {@link IOException} must be the very instance the {@code FSError} was built
 * from. An invocation that does not throw is accepted.
 */
@Test
public void test() throws Exception {
  final Class<?> positionedByteBufferApi = getClass("org.apache.hadoop.fs.ByteBufferPositionedReadable");
  assumeNonMaprProfile();
  final IOException ioException = new IOException("test io exception");
  final FSError fsError = newFSError(ioException);

  // When the optional interface is missing, substitute a harmless one so the
  // extra-interface list keeps the same shape.
  final Class<?> optionalInterface = positionedByteBufferApi == null ? AutoCloseable.class : positionedByteBufferApi;
  final Answer<Object> alwaysThrow = new Answer<Object>() {
    @Override
    public Object answer(InvocationOnMock invocation) throws Throwable {
      throw fsError;
    }
  };
  final InputStream failingStream = mock(InputStream.class,
      withSettings()
          .extraInterfaces(Seekable.class, optionalInterface, PositionedReadable.class, ByteBufferReadable.class)
          .defaultAnswer(alwaysThrow));
  final FSDataInputStream fdis = new FSDataInputStream(failingStream);

  final FSInputStream wrapper = FSDataInputStreamWrapper.of(fdis);
  final Object[] arguments = getDummyArguments(method);
  try {
    method.invoke(wrapper, arguments);
  } catch (InvocationTargetException e) {
    final Throwable cause = e.getTargetException();
    if (positionedByteBufferApi != null) {
      assertThat(cause, is(instanceOf(IOException.class)));
    } else {
      assertThat(cause, anyOf(is(instanceOf(IOException.class)), is(instanceOf(UnsupportedOperationException.class))));
    }
    if (cause instanceof IOException) {
      assertThat((IOException) cause, is(sameInstance(ioException)));
    }
  }
}
/**
 * Picks the {@code FSInputStream} adapter suited to {@code in}.
 *
 * @param in the Hadoop stream to adapt
 * @return a {@code FSDataInputStreamWrapper} when the stream wrapped by
 *         {@code in} implements {@link ByteBufferReadable}, otherwise a
 *         {@code ByteArrayFSInputStream}
 * @throws IOException declared by the signature
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean supportsByteBufferReads = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!supportsByteBufferReads) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Picks the {@code FSInputStream} adapter suited to {@code in}.
 *
 * @param in the Hadoop stream to adapt
 * @return a {@code FSDataInputStreamWrapper} when the stream wrapped by
 *         {@code in} implements {@link ByteBufferReadable}, otherwise a
 *         {@code ByteArrayFSInputStream}
 * @throws IOException declared by the signature
 */
public static FSInputStream of(FSDataInputStream in) throws IOException {
  final boolean supportsByteBufferReads = in.getWrappedStream() instanceof ByteBufferReadable;
  if (!supportsByteBufferReads) {
    return new ByteArrayFSInputStream(in);
  }
  return new FSDataInputStreamWrapper(in);
}
/**
 * Performs one {@link ByteBufferReadable} read into {@code buf} starting at
 * {@code bufPos} and verifies the result.
 *
 * <p>Checks that the buffer position advanced by exactly the number of bytes
 * reported, and that the bytes written at {@code bufPos} equal the first
 * {@code n} bytes of the reference {@code data} array.
 *
 * @param in stream under test; must implement {@link ByteBufferReadable}
 * @param buf destination buffer
 * @param bufPos buffer position at which the read starts
 * @throws Exception on any read failure
 */
private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
    int bufPos) throws Exception {
  buf.position(bufPos);
  int n = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(bufPos + n, buf.position());

  // Pull back exactly the bytes that the read just wrote.
  byte[] readData = new byte[n];
  buf.rewind();
  buf.position(bufPos);
  buf.get(readData);

  byte[] expectedData = new byte[n];
  System.arraycopy(data, 0, expectedData, 0, n);
  // JUnit's contract is (expected, actual); the previous order produced
  // failure messages with the two roles reversed.
  Assert.assertArrayEquals(expectedData, readData);
}
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
this.isDirectRead = true;
} else {
/* LocalFileSystem, S3 does not support ByteBufferReadable */
this.channel = Channels.newChannel(inputStream);
}
this.inputStream = inputStream;
this.size = inputStream.getPos() + inputStream.available();
}
public ByteBufInputChannel(InputStream inputStream) {
if (inputStream instanceof ByteBufferReadable) {
this.byteBufferReadable = (ByteBufferReadable) inputStream;
} else {
this.channel = Channels.newChannel(inputStream);
}
this.inputStream = inputStream;
}
/**
 * Tells whether the innermost stream behind {@code is} supports ByteBuffer
 * reads.
 *
 * <p>Nested {@code FSDataInputStream} layers are peeled off one by one until a
 * stream of another type is reached; that stream is then tested.
 *
 * @param is the outermost stream
 * @return {@code true} if the innermost wrapped stream implements
 *         {@link ByteBufferReadable}
 */
public static boolean isByteBufferReadable(FSDataInputStream is) {
  InputStream innermost = is.getWrappedStream();
  while (innermost instanceof FSDataInputStream) {
    innermost = ((FSDataInputStream) innermost).getWrappedStream();
  }
  return innermost instanceof ByteBufferReadable;
}
/**
 * Verifies that the wrapped stream {@code in} can be read into a ByteBuffer.
 *
 * @throws UnsupportedOperationException if {@code in} does not implement
 *         {@link ByteBufferReadable}
 */
protected void checkStreamSupportsByteBuffer() throws UnsupportedOperationException {
  if (in instanceof ByteBufferReadable) {
    return;
  }
  throw new UnsupportedOperationException("The input stream is not ByteBuffer readable.");
}
/**
 * Exercises a mix of stream operations on one input stream - sequential read,
 * seek, skip, positioned read and ByteBuffer read - and checks after each step
 * that the data matches the reference {@code data} array and that the reported
 * position is where it is expected to be.
 *
 * @throws Exception on any stream failure
 */
@Test(timeout=120000)
public void testCombinedOp() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  final int len1 = dataLen / 8;
  final int len2 = dataLen / 10;

  InputStream in = getInputStream(defaultBufferSize);
  // Read len1 bytes from the start.
  byte[] readData = new byte[len1];
  readAll(in, readData, 0, len1);
  byte[] expectedData = new byte[len1];
  System.arraycopy(data, 0, expectedData, 0, len1);
  Assert.assertArrayEquals(expectedData, readData);

  long pos = ((Seekable) in).getPos();
  Assert.assertEquals(len1, pos);

  // Seek forward len2
  ((Seekable) in).seek(pos + len2);
  // Skip forward len2
  long n = in.skip(len2);
  Assert.assertEquals(len2, n);

  // Positioned read at 1/4 dataLen; the position check below expects it to
  // leave the stream position untouched.
  positionedReadCheck(in , dataLen / 4);

  // Pos should be len1 + len2 + len2
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(len1 + len2 + len2, pos);

  // ByteBuffer read of up to len1 bytes; nRead is what was actually returned.
  ByteBuffer buf = ByteBuffer.allocate(len1);
  int nRead = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(nRead, buf.position());
  readData = new byte[nRead];
  buf.rewind();
  buf.get(readData);
  expectedData = new byte[nRead];
  System.arraycopy(data, (int)pos, expectedData, 0, nRead);
  Assert.assertArrayEquals(expectedData, readData);

  long lastPos = pos;
  // Pos should be lastPos + nRead
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + nRead, pos);

  // Positioned read at 1/3 dataLen
  positionedReadCheck(in , dataLen / 3);

  // Read forward exactly len1 bytes
  readData = new byte[len1];
  readAll(in, readData, 0, len1);
  expectedData = new byte[len1];
  System.arraycopy(data, (int)pos, expectedData, 0, len1);
  Assert.assertArrayEquals(expectedData, readData);

  lastPos = pos;
  // Pos should be lastPos + len1
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + len1, pos);

  // Second ByteBuffer read of up to len1 bytes
  buf = ByteBuffer.allocate(len1);
  nRead = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(nRead, buf.position());
  readData = new byte[nRead];
  buf.rewind();
  buf.get(readData);
  expectedData = new byte[nRead];
  System.arraycopy(data, (int)pos, expectedData, 0, nRead);
  Assert.assertArrayEquals(expectedData, readData);

  lastPos = pos;
  // Pos should be lastPos + nRead
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + nRead, pos);

  // ByteBuffer read after EOF must report -1
  ((Seekable) in).seek(dataLen);
  buf.clear();
  n = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(-1, n);

  in.close();
}
/**
 * Exercises a mix of stream operations on one input stream - sequential read,
 * seek, skip, positioned read and ByteBuffer read - and checks after each step
 * that the data matches the reference {@code data} array and that the reported
 * position is where it is expected to be.
 *
 * @throws Exception on any stream failure
 */
@Test(timeout=120000)
public void testCombinedOp() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  final int len1 = dataLen / 8;
  final int len2 = dataLen / 10;

  InputStream in = getInputStream(defaultBufferSize);
  // Read len1 bytes from the start.
  byte[] readData = new byte[len1];
  readAll(in, readData, 0, len1);
  byte[] expectedData = new byte[len1];
  System.arraycopy(data, 0, expectedData, 0, len1);
  Assert.assertArrayEquals(expectedData, readData);

  long pos = ((Seekable) in).getPos();
  Assert.assertEquals(len1, pos);

  // Seek forward len2
  ((Seekable) in).seek(pos + len2);
  // Skip forward len2
  long n = in.skip(len2);
  Assert.assertEquals(len2, n);

  // Positioned read at 1/4 dataLen; the position check below expects it to
  // leave the stream position untouched.
  positionedReadCheck(in , dataLen / 4);

  // Pos should be len1 + len2 + len2
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(len1 + len2 + len2, pos);

  // ByteBuffer read of up to len1 bytes; nRead is what was actually returned.
  ByteBuffer buf = ByteBuffer.allocate(len1);
  int nRead = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(nRead, buf.position());
  readData = new byte[nRead];
  buf.rewind();
  buf.get(readData);
  expectedData = new byte[nRead];
  System.arraycopy(data, (int)pos, expectedData, 0, nRead);
  Assert.assertArrayEquals(expectedData, readData);

  long lastPos = pos;
  // Pos should be lastPos + nRead
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + nRead, pos);

  // Positioned read at 1/3 dataLen
  positionedReadCheck(in , dataLen / 3);

  // Read forward exactly len1 bytes
  readData = new byte[len1];
  readAll(in, readData, 0, len1);
  expectedData = new byte[len1];
  System.arraycopy(data, (int)pos, expectedData, 0, len1);
  Assert.assertArrayEquals(expectedData, readData);

  lastPos = pos;
  // Pos should be lastPos + len1
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + len1, pos);

  // Second ByteBuffer read of up to len1 bytes
  buf = ByteBuffer.allocate(len1);
  nRead = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(nRead, buf.position());
  readData = new byte[nRead];
  buf.rewind();
  buf.get(readData);
  expectedData = new byte[nRead];
  System.arraycopy(data, (int)pos, expectedData, 0, nRead);
  Assert.assertArrayEquals(expectedData, readData);

  lastPos = pos;
  // Pos should be lastPos + nRead
  pos = ((Seekable) in).getPos();
  Assert.assertEquals(lastPos + nRead, pos);

  // ByteBuffer read after EOF must report -1
  ((Seekable) in).seek(dataLen);
  buf.clear();
  n = ((ByteBufferReadable) in).read(buf);
  Assert.assertEquals(-1, n);

  in.close();
}