下面列出了 org.apache.hadoop.io.compress.CompressionOutputStream 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Builds the LZO write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if {@code isSupported()} is false, or if the codec
 *     fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to conf before the codec creates its stream.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compressed write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if the codec fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to the codec's own configuration before it creates its stream.
  codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the LZO write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if {@code isSupported()} is false, or if the codec
 *     fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to conf before the codec creates its stream.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compressed write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if the codec fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to the codec's own configuration before it creates its stream.
  codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compressed write pipeline on top of {@code downStream}.
 *
 * The plain compression stream comes from
 * {@code createPlainCompressionStream}; it is wrapped in a
 * {@code FinishOnFlushCompressionStream} and then in a
 * {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the compressed output
 * @param compressor compressor passed through to the plain compression stream
 * @param downStreamBufferSize buffer size placed beneath the compression
 *     stream; a non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if the plain compression stream cannot be created
 */
public OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor, int downStreamBufferSize)
    throws IOException {
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  final CompressionOutputStream compressed =
      createPlainCompressionStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Compresses {@code bytes} with the configured codec.
 *
 * When {@code codec} is {@code null} the input is returned unchanged.
 * Otherwise the shared {@code compressedOutBuffer} is reset, the compressor
 * (if any) is reset, and the input is written through a fresh compression
 * stream into the buffer.
 *
 * The stream is managed with try-with-resources so it is closed even when
 * writing or finishing fails; previously a failure skipped {@code close()}.
 *
 * @param bytes the data to compress
 * @return {@code bytes} itself when no codec is set, otherwise a
 *     {@code BytesInput} built from {@code compressedOutBuffer}
 * @throws IOException if creating, writing, finishing or closing the
 *     compression stream fails
 */
@Override
public BytesInput compress(BytesInput bytes) throws IOException {
  if (codec == null) {
    return bytes;
  }
  compressedOutBuffer.reset();
  if (compressor != null) {
    // null compressor for non-native gzip
    compressor.reset();
  }
  try (CompressionOutputStream cos =
      codec.createOutputStream(compressedOutBuffer, compressor)) {
    bytes.writeAllTo(cos);
    cos.finish();
  }
  // Read the buffer only after the stream has been closed.
  return BytesInput.from(compressedOutBuffer);
}
/**
 * Builds the LZO write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if {@code isSupported()} is false, or if the codec
 *     fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to conf before the codec creates its stream.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the LZMA write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if {@code isSupported()} is false, or if the codec
 *     fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZMA codec cannot be loaded. " +
            "You may want to check LD_LIBRARY_PATH.");
  }
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to conf before the codec creates its stream.
  conf.setInt("io.compression.codec.lzma.buffersize", 64 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compressed write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if the codec fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to the codec's own configuration before it creates its stream.
  codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the LZO write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if {@code isSupported()} is false, or if the codec
 *     fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to conf before the codec creates its stream.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compressed write pipeline on top of {@code downStream}.
 *
 * Layers, innermost first: {@code downStream}, an optional
 * {@link BufferedOutputStream} of {@code downStreamBufferSize} bytes, the
 * codec's compression stream, a {@code FinishOnFlushCompressionStream}, and an
 * outer {@link BufferedOutputStream} of {@code DATA_OBUF_SIZE} bytes.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @param downStreamBufferSize buffer size placed beneath the codec; a
 *     non-positive value leaves {@code downStream} unbuffered
 * @return the outermost buffered stream
 * @throws IOException if the codec fails to create its stream
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream sink = downStreamBufferSize > 0
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;
  // Written to the codec's own configuration before it creates its stream.
  codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
  final CompressionOutputStream compressed =
      codec.createOutputStream(sink, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Resets the compressor and the reusable output buffer, then returns a new
 * {@code CompressedSliceOutput} backed by a fresh compression stream.
 *
 * The final constructor argument is a no-op callback.
 *
 * @return a new {@code CompressedSliceOutput} over {@code compressedOutput}
 * @throws UncheckedIOException wrapping any {@link IOException} raised inside
 *     the try block
 */
@Override
public CompressedSliceOutput get()
{
    try {
        compressor.reset();
        compressedOutput.reset();
        final CompressionOutputStream freshStream =
                codec.createOutputStream(compressedOutput, compressor);
        return new CompressedSliceOutput(
                freshStream,
                compressedOutput,
                this,
                () -> {});
    }
    catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Resets the compressor and the reusable output buffer, then returns a new
 * {@code CompressedSliceOutput} backed by a fresh compression stream.
 *
 * The final constructor argument is a callback that hands {@code compressor}
 * back to {@code CodecPool}; when that callback runs is decided by
 * {@code CompressedSliceOutput} (not visible here).
 *
 * @return a new {@code CompressedSliceOutput} over {@code bufferedOutput}
 * @throws UncheckedIOException wrapping any {@link IOException} raised inside
 *     the try block
 */
@Override
public CompressedSliceOutput get()
{
    try {
        compressor.reset();
        bufferedOutput.reset();
        final CompressionOutputStream freshStream =
                codec.createOutputStream(bufferedOutput, compressor);
        return new CompressedSliceOutput(
                freshStream,
                bufferedOutput,
                this,
                () -> CodecPool.returnCompressor(compressor));
    }
    catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Returns a Mockito spy around the superclass's compression stream.
 *
 * Calls to {@code write(byte[], int, int)} on the spy are answered by
 * {@code GenericTestUtils.SleepAnswer(5)}; presumably this slows each write
 * down for testing (confirm against {@code SleepAnswer}).
 *
 * @param out the stream the superclass codec writes to
 * @return the spied compression stream
 * @throws IOException if the superclass fails to create the stream
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  final CompressionOutputStream slowStream =
      Mockito.spy(super.createOutputStream(out));
  Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5))
      .when(slowStream)
      .write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
  return slowStream;
}
/**
 * Flushes the wrapped compression stream by calling {@code finish()},
 * {@code flush()} and {@code resetState()} on it, in that order.
 *
 * The wrapped {@code out} is cast to {@code CompressionOutputStream}.
 *
 * @throws IOException if any of the three calls fails
 */
@Override
public void flush() throws IOException {
  final CompressionOutputStream compressionStream = (CompressionOutputStream) out;
  compressionStream.finish();
  compressionStream.flush();
  compressionStream.resetState();
}
/**
 * Returns a Mockito spy around the superclass's compression stream.
 *
 * Calls to {@code write(byte[], int, int)} on the spy are answered by
 * {@code GenericTestUtils.SleepAnswer(5)}; presumably this slows each write
 * down for testing (confirm against {@code SleepAnswer}).
 *
 * @param out the stream the superclass codec writes to
 * @return the spied compression stream
 * @throws IOException if the superclass fails to create the stream
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  final CompressionOutputStream slowStream =
      Mockito.spy(super.createOutputStream(out));
  Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5))
      .when(slowStream)
      .write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
  return slowStream;
}
/**
 * Flushes the wrapped compression stream by calling {@code finish()},
 * {@code flush()} and {@code resetState()} on it, in that order.
 *
 * The wrapped {@code out} is cast to {@code CompressionOutputStream}.
 *
 * @throws IOException if any of the three calls fails
 */
@Override
public void flush() throws IOException {
  final CompressionOutputStream compressionStream = (CompressionOutputStream) out;
  compressionStream.finish();
  compressionStream.flush();
  compressionStream.resetState();
}
/**
 * Flushes the wrapped compression stream by calling {@code finish()},
 * {@code flush()} and {@code resetState()} on it, in that order.
 *
 * The wrapped {@code out} is cast to {@code CompressionOutputStream}.
 *
 * @throws IOException if any of the three calls fails
 */
@Override
public void flush() throws IOException {
  final CompressionOutputStream compressionStream = (CompressionOutputStream) out;
  compressionStream.finish();
  compressionStream.flush();
  compressionStream.resetState();
}
/**
 * Creates the codec's compression stream directly over {@code downStream},
 * with no additional buffering streams wrapped around it.
 *
 * Before creating the stream, {@code "io.file.buffer.size"} is set to 32 KiB
 * on the codec's own configuration.
 *
 * @param downStream stream that receives the codec's output
 * @param compressor compressor handed to the codec
 * @return the codec's compression stream
 * @throws IOException if the codec fails to create its stream
 */
public CompressionOutputStream createPlainCompressionStream(
    OutputStream downStream, Compressor compressor) throws IOException {
  final CompressionCodec plainCodec = getCodec(conf);
  final Configurable configurableCodec = (Configurable) plainCodec;
  configurableCodec.getConf().setInt("io.file.buffer.size", 32 * 1024);
  return plainCodec.createOutputStream(downStream, compressor);
}
/**
 * Creates a compression stream over {@code out}.
 *
 * When {@code ZlibFactory.isNativeZlibLoaded(getConf())} is true, stream
 * creation is delegated to the superclass; otherwise a
 * {@code ReusableGzipOutputStream} is returned.
 *
 * @param out stream that receives the compressed output
 * @return the compression stream
 * @throws IOException if stream creation fails
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  final boolean nativeZlibAvailable = ZlibFactory.isNativeZlibLoaded(getConf());
  if (!nativeZlibAvailable) {
    return new ReusableGzipOutputStream(out);
  }
  return super.createOutputStream(out);
}
/**
 * Compresses {@code bytes} with the given Zstandard codec into a new
 * in-memory buffer.
 *
 * The compression stream is managed with try-with-resources so it is closed
 * even when {@code writeAllTo} throws; previously a write failure skipped
 * {@code close()}.
 *
 * @param codec codec used to create the compression stream (passed a
 *     {@code null} compressor)
 * @param bytes data to compress; its size is used as the buffer's initial
 *     capacity
 * @return a {@code BytesInput} built from the buffer holding the compressed
 *     bytes
 * @throws IOException if creating, writing to or closing the stream fails
 */
private BytesInput compress(ZstandardCodec codec, BytesInput bytes) throws IOException {
  ByteArrayOutputStream compressedOutBuffer = new ByteArrayOutputStream((int) bytes.size());
  try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, null)) {
    bytes.writeAllTo(cos);
  }
  // Read the buffer only after the stream has been closed.
  return BytesInput.from(compressedOutBuffer);
}
/**
 * Installs the mocks used by the delimited-text file writer tests.
 *
 * Stubs the static {@code FileSystem.get(URI, Configuration)} to return a
 * mock file system, stubs {@code new GzipCodec()} to return a mock codec, and
 * wires {@code open}/{@code create} on the chosen path and the codec's
 * one-argument stream factories to return mock streams.
 *
 * @param isCompressed selects {@code PATH_GZ} when true, {@code PATH} otherwise
 * @throws Exception if any of the stubbing calls fails
 */
private void mockDelimitedTextFileWriter(boolean isCompressed) throws Exception {
  // Static FileSystem.get(...) returns the mock file system.
  PowerMockito.mockStatic(FileSystem.class);
  final FileSystem mockFs = Mockito.mock(FileSystem.class);
  Mockito.when(FileSystem.get(Mockito.any(URI.class), Mockito.any(Configuration.class)))
      .thenReturn(mockFs);

  final Path targetPath = isCompressed ? new Path(PATH_GZ) : new Path(PATH);

  // Any no-arg GzipCodec construction yields the mock codec.
  final GzipCodec mockCodec = PowerMockito.mock(GzipCodec.class);
  PowerMockito.whenNew(GzipCodec.class).withNoArguments().thenReturn(mockCodec);

  // Raw file streams handed out by the mock file system for the target path.
  final FSDataInputStream rawIn = Mockito.mock(FSDataInputStream.class);
  final FSDataOutputStream rawOut = Mockito.mock(FSDataOutputStream.class);
  Mockito.when(mockFs.open(targetPath)).thenReturn(rawIn);
  Mockito.when(mockFs.create(targetPath)).thenReturn(rawOut);

  // Codec streams returned for any underlying stream.
  final CompressionInputStream codecIn = Mockito.mock(CompressionInputStream.class);
  final CompressionOutputStream codecOut = Mockito.mock(CompressionOutputStream.class);
  Mockito.when(mockCodec.createInputStream(Mockito.any(InputStream.class)))
      .thenReturn(codecIn);
  Mockito.when(mockCodec.createOutputStream(Mockito.any(OutputStream.class)))
      .thenReturn(codecOut);
}
/**
 * Writes gzip-compressed random test data to {@code file}.
 *
 * Each record is one line of {@code columnCount} values from
 * {@link Math#random()}, joined by {@code colSeparator} and ended with
 * {@code "\n"}.
 *
 * All three streams are managed with try-with-resources. Previously the
 * {@link FileOutputStream} leaked if the codec failed to create its stream,
 * and {@code out.close()} was skipped whenever {@code writer.close()} threw.
 *
 * @param file destination file
 * @param recordCounts number of rows to write
 * @param columnCount number of columns per row
 * @param colSeparator text written between adjacent columns
 * @throws IOException if opening, writing or closing any stream fails
 */
@Override
public void writeTestData(File file, int recordCounts, int columnCount,
    String colSeparator) throws IOException {
  GzipCodec gzipCodec = new GzipCodec();
  // Resources close in reverse order: writer, then codec stream, then file.
  try (FileOutputStream fileOut = new FileOutputStream(file);
      CompressionOutputStream out = gzipCodec.createOutputStream(fileOut);
      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out))) {
    // write random test data
    for (int r = 0; r < recordCounts; r++) {
      // foreach row write n columns
      for (int c = 0; c < columnCount; c++) {
        if (c != 0) {
          writer.append(colSeparator);
        }
        writer.append(String.valueOf(Math.random()));
      }
      writer.append("\n");
    }
  }
}
/**
 * Creates a compression stream that writes Snappy-framed output to
 * {@code out}.
 *
 * The block size is read from {@code COMPRESSION_BLOCK_SIZE_CONF}, falling
 * back to {@code SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE}. The
 * {@code compressor} argument is not used by this implementation.
 *
 * @param out stream that receives the framed output
 * @param compressor unused
 * @return an {@code OStreamDelegatingCompressorStream} wrapping the framed
 *     stream
 * @throws IOException if the framed stream cannot be created
 */
@Override
public CompressionOutputStream createOutputStream(final OutputStream out, final Compressor compressor)
    throws IOException {
  LOG.info("Creating compressor stream");
  final int blockSize = conf.getInt(
      COMPRESSION_BLOCK_SIZE_CONF, SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE);
  final SnappyFramedOutputStream framedStream = new SnappyFramedOutputStream(
      out, blockSize, SnappyFramedOutputStream.DEFAULT_MIN_COMPRESSION_RATIO);
  return new OStreamDelegatingCompressorStream(framedStream);
}
/**
 * Creates a {@code SimpleSeekableFormatOutputStream} over {@code out} and
 * passes it this codec's configuration.
 *
 * @param out stream that receives the output
 * @return the configured seekable-format stream
 * @throws IOException if the stream cannot be created
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  final SimpleSeekableFormatOutputStream seekableStream =
      new SimpleSeekableFormatOutputStream(out);
  seekableStream.setConf(getConf());
  return seekableStream;
}
/**
 * Flushes the wrapped compression stream by calling {@code finish()},
 * {@code flush()} and {@code resetState()} on it, in that order.
 *
 * The wrapped {@code out} is cast to {@code CompressionOutputStream}.
 *
 * @throws IOException if any of the three calls fails
 */
@Override
public void flush() throws IOException {
  final CompressionOutputStream compressionStream = (CompressionOutputStream) out;
  compressionStream.finish();
  compressionStream.flush();
  compressionStream.resetState();
}
/**
 * Flushes the wrapped compression stream by calling {@code finish()},
 * {@code flush()} and {@code resetState()} on it, in that order.
 *
 * The wrapped {@code out} is cast to {@code CompressionOutputStream}.
 *
 * @throws IOException if any of the three calls fails
 */
@Override
public void flush() throws IOException {
  final CompressionOutputStream compressionStream = (CompressionOutputStream) out;
  compressionStream.finish();
  compressionStream.flush();
  compressionStream.resetState();
}
/**
 * Stub implementation: creates no stream and always returns {@code null}.
 *
 * @param out ignored
 * @return always {@code null}
 * @throws IOException declared by the signature; this body never throws it
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return null;
}
/**
 * Stub implementation: creates no stream and always returns {@code null}.
 *
 * @param out ignored
 * @param compressor ignored
 * @return always {@code null}
 * @throws IOException declared by the signature; this body never throws it
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
return null;
}
/**
 * Delegates to the single-argument {@code createOutputStream(out)} overload.
 * The supplied {@code compressor} is not used.
 *
 * @param out stream that receives the compressed output
 * @param compressor ignored
 * @return whatever {@code createOutputStream(out)} returns
 * @throws IOException if the delegated call fails
 */
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor)
throws IOException
{
return this.createOutputStream(out);
}