The following examples show how to use the org.apache.hadoop.io.compress.CompressionInputStream API class in real code, or you can follow the links to view the full sources on GitHub.
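Before the project snippets, here is a minimal, self-contained round trip through a codec. This is an illustrative sketch rather than code from any of the projects below; it assumes the stock GzipCodec and the Hadoop common libraries on the classpath.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressionRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

    // Compress a small payload into memory.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream cos = codec.createOutputStream(compressed);
    cos.write("hello, codec".getBytes(StandardCharsets.UTF_8));
    cos.close();

    // Decompress it back through a CompressionInputStream.
    CompressionInputStream cis =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream plain = new ByteArrayOutputStream();
    IOUtils.copyBytes(cis, plain, 4096, true); // true: closes both streams
    System.out.println(plain.toString("UTF-8")); // prints: hello, codec
  }
}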
/** Read a compressed buffer */
private synchronized void readBuffer(DataInputBuffer buffer,
    CompressionInputStream filter) throws IOException {
  // Read data into a temporary buffer
  DataOutputBuffer dataBuffer = new DataOutputBuffer();
  try {
    int dataBufferLength = WritableUtils.readVInt(in);
    dataBuffer.write(in, dataBufferLength);
    // Set up 'buffer' connected to the input-stream
    buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
  } finally {
    dataBuffer.close();
  }
  // Reset the codec
  filter.resetState();
}
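This helper (it appears in SequenceFile-style readers) copies one length-prefixed compressed block into buffer. The final filter.resetState() call matters: it discards any state the codec buffered from the previous block, so the stream can safely be repositioned before the next block is read.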
public static BufferedReader getBufferedReader(File file, MapredContext context)
    throws IOException {
  URI fileuri = file.toURI();
  Path path = new Path(fileuri);

  Configuration conf = context.getJobConf();
  CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
  CompressionCodec codec = ccf.getCodec(path);

  if (codec == null) {
    return new BufferedReader(new FileReader(file));
  } else {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    FileInputStream fis = new FileInputStream(file);
    CompressionInputStream cis = codec.createInputStream(fis, decompressor);
    BufferedReader br = new BufferedReaderExt(new InputStreamReader(cis), decompressor);
    return br;
  }
}
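BufferedReaderExt is a project-specific class whose source is not shown here. A plausible minimal version (an assumption, inferred from how CodecPool is used above) simply returns the pooled Decompressor when the reader is closed:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

// Hypothetical sketch of BufferedReaderExt: a BufferedReader that gives the
// borrowed Decompressor back to the CodecPool on close().
final class BufferedReaderExt extends BufferedReader {
  private Decompressor decompressor;

  BufferedReaderExt(Reader in, Decompressor decompressor) {
    super(in);
    this.decompressor = decompressor;
  }

  @Override
  public void close() throws IOException {
    try {
      super.close(); // also closes the wrapped CompressionInputStream
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        decompressor = null;
      }
    }
  }
}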
private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
    int osInitialSize) throws IOException {
  // GZIPCodec fails w/ NPE if no configuration.
  if (compressor instanceof Configurable) {
    ((Configurable) compressor).setConf(this.conf);
  }
  Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
  CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
  ByteBufferOutputStream bbos;
  try {
    // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
    // TODO: Reuse buffers.
    bbos = new ByteBufferOutputStream(osInitialSize);
    IOUtils.copy(cis, bbos);
    bbos.close();
    return bbos.getByteBuffer();
  } finally {
    CodecPool.returnDecompressor(poolDecompressor);
  }
}
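Note the resource handling here: the pooled Decompressor is always returned in the finally block, but the CompressionInputStream is deliberately left open, since closing it would also close the caller-owned cellBlockStream.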
@Test
public void testCompressAndDecompressConsistent() throws Exception {
  final String testString = "Test String";
  final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  final OutputStreamWriter writer = new OutputStreamWriter(subject.createOutputStream(baos));
  writer.write(testString);
  writer.flush();
  writer.close();

  final CompressionInputStream inputStream =
      subject.createInputStream(new ByteArrayInputStream(baos.toByteArray()));
  final StringWriter contentsTester = new StringWriter();
  IOUtils.copy(inputStream, contentsTester);
  inputStream.close();
  contentsTester.flush();
  contentsTester.close();

  Assert.assertEquals(testString, contentsTester.toString());
}
@Test
public void testInternalErrorTranslation() throws Exception {
  String codecErrorMsg = "codec failure";
  CompressionInputStream mockCodecStream = mock(CompressionInputStream.class);
  when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
      .thenThrow(new InternalError(codecErrorMsg));
  Decompressor mockDecoder = mock(Decompressor.class);
  CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
  when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
  when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
      .thenReturn(mockCodecStream);

  byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1 };
  try {
    ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
        1024, 128, mockCodec, false, 0, mock(Logger.class), null);
    Assert.fail("shuffle was supposed to throw!");
  } catch (IOException e) {
    Assert.assertTrue(e.getCause() instanceof InternalError);
    Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
  }
}
/**
 * Creates a new instance with the mandatory characters for handling newlines
 * transparently. The line separator (the sequence of characters that
 * represents a newline, as defined in {@link Format#getLineSeparator()}) and
 * the normalized newline character (as defined in
 * {@link Format#getNormalizedNewline()}), which replaces any lineSeparator
 * sequence found in the input, are both taken from the supplied settings.
 */
public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
  this.lineSeparator = settings.getNewLineDelimiter();
  byte normalizedLineSeparator = settings.getNormalizedNewLine();
  Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
  boolean isCompressed = input instanceof CompressionInputStream;
  Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");

  // Splits aren't allowed with compressed data. The split length will be the
  // compressed size, which means we'll normally end prematurely.
  if (isCompressed && endPos > 0) {
    endPos = Long.MAX_VALUE;
  }

  this.input = input;
  this.seekable = (Seekable) input;
  this.settings = settings;

  if (input instanceof FSDataInputStream) {
    this.inputFS = (FSDataInputStream) input;
    this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
  } else {
    this.inputFS = null;
    this.bufferReadable = false;
  }

  this.startPos = startPos;
  this.endPos = endPos;
  this.normalizedLineSeparator = normalizedLineSeparator;
  this.buffer = readBuffer;
  this.bStart = buffer.memoryAddress();
  this.bStartMinus1 = bStart - 1;
  this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
}
@Override
public void decompress(Slice compressed, Slice uncompressed)
    throws RcFileCorruptionException
{
  checkState(!destroyed, "Codec has been destroyed");
  decompressor.reset();
  try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput(), decompressor)) {
    uncompressed.setBytes(0, decompressorStream, uncompressed.length());
  }
  catch (IndexOutOfBoundsException | IOException e) {
    throw new RcFileCorruptionException(e, "Compressed stream is truncated");
  }
}
@Override
public void decompress(Slice compressed, Slice uncompressed)
    throws RcFileCorruptionException
{
  try (CompressionInputStream decompressorStream = codec.createInputStream(compressed.getInput())) {
    uncompressed.setBytes(0, decompressorStream, uncompressed.length());
  }
  catch (IndexOutOfBoundsException | IOException e) {
    throw new RcFileCorruptionException(e, "Compressed stream is truncated");
  }
}
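These two decompress variants differ only in decompressor management: the first reuses a long-lived Decompressor and must reset() it before each block, while the second lets the codec create and own a decompressor per stream. Reuse avoids per-block allocation at the cost of managing the decompressor's lifecycle yourself.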
@Override
public synchronized InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor,
    int downStreamBufferSize) throws IOException {
  // Set the internal buffer size to read from down stream.
  if (downStreamBufferSize > 0) {
    codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
  }
  CompressionInputStream cis =
      codec.createInputStream(downStream, decompressor);
  BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
  return bis2;
}
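Setting io.file.buffer.size on the codec's Configuration is how these TFile-style wrappers size the codec's internal read buffer. Note that this mutates configuration shared by every stream created from the same codec instance, which is presumably why the method is synchronized.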
@Test
public void testSnappyCompressionSimple() throws IOException
{
  if (checkNativeSnappy()) {
    return;
  }

  File snappyFile = new File(testMeta.getDir(), "snappyTestFile.snappy");

  BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(snappyFile));
  Configuration conf = new Configuration();
  CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(SnappyCodec.class, conf);
  FilterStreamCodec.SnappyFilterStream filterStream = new FilterStreamCodec.SnappyFilterStream(
      codec.createOutputStream(os));

  int ONE_MB = 1024 * 1024;
  String testStr = "TestSnap-16bytes";
  for (int i = 0; i < ONE_MB; i++) { // writes 16 MB total (1M iterations x 16 bytes)
    filterStream.write(testStr.getBytes());
  }
  filterStream.flush();
  filterStream.close();

  CompressionInputStream is = codec.createInputStream(new FileInputStream(snappyFile));

  byte[] recovered = new byte[testStr.length()];
  int bytesRead = is.read(recovered);
  is.close();

  assertEquals(testStr, new String(recovered));
}
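One caveat with the read at the end: InputStream.read(byte[]) may return fewer bytes than requested, so the assertion can fail spuriously on a short read. A sturdier version would fill the buffer completely, for example with Hadoop's own helper:

  org.apache.hadoop.io.IOUtils.readFully(is, recovered, 0, recovered.length);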
public void initialize(Configuration job, long splitStart,
    long splitLength, Path file) throws IOException {
  start = splitStart;
  end = start + splitLength;
  LOG.info("Start of the split: " + start + " - End of split: " + end);
  LOG.debug("VLR initialize started: start pos: " + start + " end pos: " + end);

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);

  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
    filePosition = (Seekable) cIn;
    inputStream = cIn;
    LOG.info("Compressed input; cannot compute number of records in the split");
  } else {
    fileIn.seek(start);
    filePosition = fileIn;
    inputStream = fileIn;
    numBytesRemainingInSplit = splitLength;
    LOG.info("Variable length input; cannot compute number of records in the split");
  }
  this.pos = start;
}
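The (Seekable) cast works because CompressionInputStream itself implements Seekable, so the record reader can track its position the same way whether or not the input is compressed.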
public InputStream createDecompressionStream(
    InputStream downStream, Decompressor decompressor,
    int downStreamBufferSize) throws IOException {
  CompressionCodec codec = getCodec(conf);
  // Set the internal buffer size to read from down stream.
  if (downStreamBufferSize > 0) {
    ((Configurable) codec).getConf().setInt("io.file.buffer.size",
        downStreamBufferSize);
  }
  CompressionInputStream cis =
      codec.createInputStream(downStream, decompressor);
  BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
  return bis2;
}
@Override
public SplitCompressionInputStream createInputStream(InputStream seekableIn,
    Decompressor decompressor, long start, long end, READ_MODE readMode) throws IOException {
  if (!(seekableIn instanceof Seekable)) {
    throw new IOException("seekableIn must be an instance of " +
        Seekable.class.getName());
  }

  if (!BlockCompressedInputStream.isValidFile(new BufferedInputStream(seekableIn))) {
    // data is regular gzip, not BGZF
    ((Seekable) seekableIn).seek(0);
    final CompressionInputStream compressionInputStream =
        createInputStream(seekableIn, decompressor);
    return new SplitCompressionInputStream(compressionInputStream, start, end) {
      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        return compressionInputStream.read(b, off, len);
      }

      @Override
      public void resetState() throws IOException {
        compressionInputStream.resetState();
      }

      @Override
      public int read() throws IOException {
        return compressionInputStream.read();
      }
    };
  }

  BGZFSplitGuesser splitGuesser = new BGZFSplitGuesser(seekableIn);
  long adjustedStart = splitGuesser.guessNextBGZFBlockStart(start, end);
  ((Seekable) seekableIn).seek(adjustedStart);
  return new BGZFSplitCompressionInputStream(seekableIn, adjustedStart, end);
}
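This is the splittable-codec pattern (a BGZF codec, by the look of it): if the file turns out to be plain gzip, the whole file is treated as one stream through the anonymous pass-through wrapper; if it is BGZF, the codec probes for the next block boundary at or after the requested start offset and begins decompression there, which is what makes parallel splits over compressed data possible.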
private void mockDelimitedTextFileWriter(boolean isCompressed) throws Exception {
  PowerMockito.mockStatic(FileSystem.class);
  FileSystem fs = Mockito.mock(FileSystem.class);
  Mockito.when(
      FileSystem.get(Mockito.any(URI.class),
          Mockito.any(Configuration.class))).thenReturn(fs);

  Path fsPath = (!isCompressed) ? new Path(PATH) : new Path(PATH_GZ);
  GzipCodec codec = PowerMockito.mock(GzipCodec.class);
  PowerMockito.whenNew(GzipCodec.class).withNoArguments().thenReturn(codec);

  FSDataInputStream fileInputStream = Mockito.mock(FSDataInputStream.class);
  FSDataOutputStream fileOutputStream = Mockito.mock(FSDataOutputStream.class);
  Mockito.when(fs.open(fsPath)).thenReturn(fileInputStream);
  Mockito.when(fs.create(fsPath)).thenReturn(fileOutputStream);

  CompressionInputStream inputStream = Mockito.mock(CompressionInputStream.class);
  CompressionOutputStream outputStream = Mockito.mock(CompressionOutputStream.class);
  Mockito.when(codec.createInputStream(Mockito.any(InputStream.class)))
      .thenReturn(inputStream);
  Mockito.when(codec.createOutputStream(Mockito.any(OutputStream.class)))
      .thenReturn(outputStream);
}
public static MrGeoRaster toMrGeoRaster(RasterWritable writable,
    CompressionCodec codec, Decompressor decompressor) throws IOException
{
  decompressor.reset();
  ByteArrayInputStream bis = new ByteArrayInputStream(writable.bytes, 0, writable.getSize());
  CompressionInputStream gis = codec.createInputStream(bis, decompressor);
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  IOUtils.copyBytes(gis, baos, 1024 * 1024 * 2, true);
  return toMrGeoRaster(new RasterWritable(baos.toByteArray()));
}
/**
 * {@inheritDoc}
 */
@Override
public CompressionInputStream createInputStream(final InputStream in, final Decompressor decompressor)
    throws IOException {
  LOG.debug("Creating decompressor stream");
  return new IStreamDelegatingDecompressorStream(new SnappyFramedInputStream(in));
}
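Note that the Decompressor argument is ignored here: the framed Snappy format manages its own decompression state inside SnappyFramedInputStream, so there is nothing for the codec to pool. IStreamDelegatingDecompressorStream is presumably a thin project-specific adapter that exposes the framed stream through the CompressionInputStream interface.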
@Override
public CompressionInputStream createInputStream(InputStream in)
    throws IOException {
  return null;
}

@Override
public CompressionInputStream createInputStream(InputStream in,
    Decompressor decompressor) throws IOException {
  return null;
}
@Override
public CompressionInputStream createInputStream(InputStream in)
    throws IOException
{
  return null;
}

@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor)
    throws IOException
{
  return null;
}
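Stubs like these, presumably from test doubles or from codecs that are only used for their compressor or file extension, simply return null. Any caller that obtains a codec dynamically should therefore be prepared for createInputStream to be unsupported.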
public void initialize(Configuration job, long splitStart, long splitLength,
    Path file) throws IOException {
  start = splitStart;
  end = start + splitLength;
  long partialRecordLength = start % recordLength;
  long numBytesToSkip = 0;
  if (partialRecordLength != 0) {
    numBytesToSkip = recordLength - partialRecordLength;
  }

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);

  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
    filePosition = cIn;
    inputStream = cIn;
    numRecordsRemainingInSplit = Long.MAX_VALUE;
    LOG.info("Compressed input; cannot compute number of records in the split");
  } else {
    fileIn.seek(start);
    filePosition = fileIn;
    inputStream = fileIn;
    long splitSize = end - start - numBytesToSkip;
    numRecordsRemainingInSplit = (splitSize + recordLength - 1) / recordLength;
    if (numRecordsRemainingInSplit < 0) {
      numRecordsRemainingInSplit = 0;
    }
    LOG.info("Expecting " + numRecordsRemainingInSplit
        + " records each with a length of " + recordLength
        + " bytes in the split with an effective size of "
        + splitSize + " bytes");
  }
  if (numBytesToSkip != 0) {
    start += inputStream.skip(numBytesToSkip);
  }
  this.pos = start;
}
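The alignment arithmetic at the top deserves a concrete example: with recordLength = 100 and splitStart = 250, partialRecordLength is 50 and numBytesToSkip is 50, so this reader skips forward to offset 300, the first record boundary inside its split (the reader owning the previous split finishes the record spanning offsets 200-299). The ceiling division (splitSize + recordLength - 1) / recordLength then counts how many whole records start within the remaining bytes.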