下面列出了org.apache.hadoop.io.compress.CompressionInputStream#read ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testSnappyCompressionSimple() throws IOException
{
if (checkNativeSnappy()) {
return;
}
File snappyFile = new File(testMeta.getDir(), "snappyTestFile.snappy");
BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(snappyFile));
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
FilterStreamCodec.SnappyFilterStream filterStream = new FilterStreamCodec.SnappyFilterStream(
codec.createOutputStream(os));
int ONE_MB = 1024 * 1024;
String testStr = "TestSnap-16bytes";
for (int i = 0; i < ONE_MB; i++) { // write 16 MBs
filterStream.write(testStr.getBytes());
}
filterStream.flush();
filterStream.close();
CompressionInputStream is = codec.createInputStream(new FileInputStream(snappyFile));
byte[] recovered = new byte[testStr.length()];
int bytesRead = is.read(recovered);
is.close();
assertEquals(testStr, new String(recovered));
}
@Override
public SplitCompressionInputStream createInputStream(InputStream seekableIn, Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException {
if (!(seekableIn instanceof Seekable)) {
throw new IOException("seekableIn must be an instance of " +
Seekable.class.getName());
}
if (!BlockCompressedInputStream.isValidFile(new BufferedInputStream(seekableIn))) {
// data is regular gzip, not BGZF
((Seekable)seekableIn).seek(0);
final CompressionInputStream compressionInputStream = createInputStream(seekableIn,
decompressor);
return new SplitCompressionInputStream(compressionInputStream, start, end) {
@Override
public int read(byte[] b, int off, int len) throws IOException {
return compressionInputStream.read(b, off, len);
}
@Override
public void resetState() throws IOException {
compressionInputStream.resetState();
}
@Override
public int read() throws IOException {
return compressionInputStream.read();
}
};
}
BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
((Seekable)seekableIn).seek(adjustedStart);
return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
}
@Test
public void TestSnappyStream() throws IOException {
SnappyCodec codec = new SnappyCodec();
codec.setConf(new Configuration());
int blockSize = 1024;
int inputSize = blockSize * 1024;
byte[] input = new byte[inputSize];
for (int i = 0; i < inputSize; ++i) {
input[i] = (byte)i;
}
ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
CompressionOutputStream compressor = codec.createOutputStream(compressedStream);
int bytesCompressed = 0;
while (bytesCompressed < inputSize) {
int len = Math.min(inputSize - bytesCompressed, blockSize);
compressor.write(input, bytesCompressed, len);
bytesCompressed += len;
}
compressor.finish();
byte[] rawCompressed = Snappy.compress(input);
byte[] codecCompressed = compressedStream.toByteArray();
// Validate that the result from the codec is the same as if we compressed the
// buffer directly.
assertArrayEquals(rawCompressed, codecCompressed);
ByteArrayInputStream inputStream = new ByteArrayInputStream(codecCompressed);
CompressionInputStream decompressor = codec.createInputStream(inputStream);
byte[] codecDecompressed = new byte[inputSize];
int bytesDecompressed = 0;
int numBytes;
while ((numBytes = decompressor.read(codecDecompressed, bytesDecompressed, blockSize)) != 0) {
bytesDecompressed += numBytes;
if (bytesDecompressed == inputSize) break;
}
byte[] rawDecompressed = Snappy.uncompress(rawCompressed);
assertArrayEquals(input, rawDecompressed);
assertArrayEquals(input, codecDecompressed);
}