The following lists example code and usage patterns for the org.apache.hadoop.io.compress.GzipCodec API class; you can also click the links to view the source code on GitHub.
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
boolean ignoreSeparatorOnNull = job.getBoolean("mapred.textoutputformat.ignore.separator", false);
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
splitSize = job.getLong(MR_REDUCE_MAX_FILE_PER_FILE, SPLIT_SIZE);
jobConf = job;
fileName = name;
jobProgress = progress;
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
// create the named codec
codec = ReflectionUtils.newInstance(codecClass, job);
FSDataOutputStream fileOut = createFile();
return new MultiSplitRecordWriter<K, V>(new NewDataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator, ignoreSeparatorOnNull);
}
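To make the codec plumbing above easier to try in isolation, here is a minimal, self-contained sketch (not from the project above; class name and output path are illustrative): the codec is instantiated through ReflectionUtils so it receives the Configuration, and its createOutputStream() wraps the raw file stream, just as in the record writer.
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class GzipOutputSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // ReflectionUtils injects the Configuration into the codec, exactly as
    // ReflectionUtils.newInstance(codecClass, job) does in the snippet above.
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    FileSystem fs = FileSystem.getLocal(conf);
    // "/tmp/demo" is a hypothetical base name; getDefaultExtension() yields ".gz".
    Path out = new Path("/tmp/demo" + codec.getDefaultExtension());
    FSDataOutputStream raw = fs.create(out, true);
    OutputStream compressed = codec.createOutputStream(raw);
    compressed.write("hello gzip\n".getBytes("UTF-8"));
    compressed.close(); // closing the codec stream also writes the gzip trailer
  }
}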
@Test
/**
* Create an IFile.Writer using GzipCodec since this code does not
* have a compressor when run via the tests (i.e. no native libraries).
*/
public void testIFileWriterWithCodec() throws Exception {
Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
IFile.Writer<Text, Text> writer =
new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
codec, null);
writer.close();
}
@Test
/** Same as above but create a reader. */
public void testIFileReaderWithCodec() throws Exception {
Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
FSDataOutputStream out = rfs.create(path);
IFile.Writer<Text, Text> writer =
new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
codec, null);
writer.close();
FSDataInputStream in = rfs.open(path);
IFile.Reader<Text, Text> reader =
new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
codec, null);
// test the checksum; read it before closing the reader
byte[] ab = new byte[100];
int bytesRead = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
assertEquals(bytesRead, reader.checksumIn.getChecksum().length);
reader.close();
}
/**
* Returns an {@link OutputStream} for a file that might need
* compression.
*/
static OutputStream getPossiblyCompressedOutputStream(Path file,
Configuration conf)
throws IOException {
FileSystem fs = file.getFileSystem(conf);
JobConf jConf = new JobConf(conf);
if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
// get the codec class
Class<? extends CompressionCodec> codecClass =
org.apache.hadoop.mapred.FileOutputFormat
.getOutputCompressorClass(jConf,
GzipCodec.class);
// get the codec implementation
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
// add the appropriate extension
file = file.suffix(codec.getDefaultExtension());
if (isCompressionEmulationEnabled(conf)) {
FSDataOutputStream fileOut = fs.create(file, false);
return new DataOutputStream(codec.createOutputStream(fileOut));
}
}
return fs.create(file, false);
}
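The compression branch above only fires when output compression is enabled in the job. A hedged setup sketch (the class name is illustrative) showing how a job would be configured so that getCompressOutput() returns true and GzipCodec is selected:
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class CompressOutputSetup {
  public static JobConf configure() {
    JobConf job = new JobConf();
    // Without this, getCompressOutput(jConf) above is false and the method
    // returns a plain uncompressed stream.
    FileOutputFormat.setCompressOutput(job, true);
    // Select gzip explicitly (it is also the default fallback passed to
    // getOutputCompressorClass in the snippet above).
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    return job;
  }
}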
@Test
public void readBitcoinRawBlockInputFormatGzipCompressed() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "version4comp.blk.gz";
String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for compressed block");
RecordReader<BytesWritable, BytesWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull(reader, "Format returned null RecordReader");
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue(reader.next(key, block), "Input Split for block version contains at least one block");
assertEquals(998039, block.getLength(), "Compressed block must have a size of 998,039 bytes");
BytesWritable emptyKey = new BytesWritable();
BytesWritable emptyBlock = new BytesWritable();
assertFalse(reader.next(emptyKey, emptyBlock), "No further blocks in compressed block");
reader.close();
}
@Test
public void readBitcoinTransactionInputFormatGzipCompressed() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "version4comp.blk.gz";
String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
BitcoinTransactionFileInputFormat format = new BitcoinTransactionFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for compressed block");
RecordReader<BytesWritable, BitcoinTransaction> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull(reader, "Format returned null RecordReader");
BytesWritable key = new BytesWritable();
BitcoinTransaction transaction = new BitcoinTransaction();
int transactCount = 0;
while (reader.next(key, transaction)) {
transactCount++;
}
assertEquals(936, transactCount, "Compressed block must contain exactly 936 transactions");
reader.close();
}
@Test
public void readBitcoinRawBlockInputFormatGzipCompressed() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
Job job = Job.getInstance(conf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, conf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "version4comp.blk.gz";
String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
BitcoinRawBlockFileInputFormat format = new BitcoinRawBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals(1, splits.size(), "Only one split generated for compressed block");
RecordReader<BytesWritable, BytesWritable> reader = format.createRecordReader(splits.get(0), context);
assertNotNull(reader, "Format returned null RecordReader");
reader.initialize(splits.get(0), context);
BytesWritable key = new BytesWritable();
BytesWritable block = new BytesWritable();
assertTrue(reader.nextKeyValue(), "Input Split for block version contains at least one block");
block = reader.getCurrentValue();
assertEquals(998039, block.getLength(), "Compressed block must have a size of 998,039 bytes");
assertFalse(reader.nextKeyValue(), "No further blocks in compressed block");
reader.close();
}
@Test
public void readBitcoinTransactionInputFormatGzipCompressed() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
Job job = Job.getInstance(conf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, conf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "version4comp.blk.gz";
String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
BitcoinTransactionFileInputFormat format = new BitcoinTransactionFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals(1, splits.size(), "Only one split generated for compressed block");
RecordReader<BytesWritable, BitcoinTransaction> reader = format.createRecordReader(splits.get(0), context);
assertNotNull(reader, "Format returned null RecordReader");
reader.initialize(splits.get(0), context);
int transactCount = 0;
while (reader.nextKeyValue()) {
transactCount++;
}
assertEquals(936, transactCount, "Compressed block must contain exactly 936 transactions");
reader.close();
}
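All four tests above expect exactly one split because gzip is not a splittable compression format. A minimal sketch of that check (class name is illustrative): GzipCodec does not implement SplittableCompressionCodec, so an input format must hand the whole .gz file to a single record reader.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
  public static boolean isSplittable(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    if (codec == null) {
      return true; // uncompressed input can be split freely
    }
    // GzipCodec is not a SplittableCompressionCodec, so this is false for .gz
    return codec instanceof SplittableCompressionCodec;
  }
}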
@Override
public RecordWriter<LongWritable, Document> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
job, GzipCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path path = getDefaultWorkFile(job, extension);
FileSystem fs = path.getFileSystem(conf);
FSDataOutputStream out = fs.create(path, false);
if (!isCompressed) {
return new JSONFileOutputRecordWriter(out);
} else {
return new JSONFileOutputRecordWriter(new DataOutputStream(
codec.createOutputStream(out)));
}
}
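For completeness, a hedged sketch (illustrative class name and output path) of the job setup that drives the isCompressed branch above in the new mapreduce API:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JsonJobSetup {
  public static Job configure() throws Exception {
    Job job = Job.getInstance(new Configuration());
    FileOutputFormat.setOutputPath(job, new Path("/tmp/json-out")); // illustrative
    // With these two lines, getCompressOutput(job) above returns true and
    // getOutputCompressorClass(job, GzipCodec.class) resolves to GzipCodec.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    return job;
  }
}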
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
* @param out The stream on top of which the writer is to be constructed.
* @param keyClass The 'key' type.
* @param valClass The 'value' type.
* @param compress Compress data?
* @param blockCompress Compress blocks?
* @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
private static Writer
createWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, boolean compress, boolean blockCompress,
CompressionCodec codec, Metadata metadata)
throws IOException {
if (codec != null && (codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded(conf)) {
throw new IllegalArgumentException("SequenceFile doesn't work with " +
"GzipCodec without native-hadoop code!");
}
Writer writer = null;
if (!compress) {
writer = new Writer(conf, out, keyClass, valClass, metadata);
} else if (compress && !blockCompress) {
writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
} else {
writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
}
return writer;
}
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
* @param conf The configuration.
* @param out The stream on top of which the writer is to be constructed.
* @param keyClass The 'key' type.
* @param valClass The 'value' type.
* @param compressionType The compression type.
* @param codec The compression codec.
* @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, CompressionType compressionType,
CompressionCodec codec, Metadata metadata)
throws IOException {
if ((codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded(conf)) {
throw new IllegalArgumentException("SequenceFile doesn't work with " +
"GzipCodec without native-hadoop code!");
}
Writer writer = null;
if (compressionType == CompressionType.NONE) {
writer = new Writer(conf, out, keyClass, valClass, metadata);
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
} else if (compressionType == CompressionType.BLOCK){
writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
}
return writer;
}
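A short usage sketch of the public factory above (class name and path are illustrative). Note the guard: with neither native hadoop nor native zlib loaded, passing a GzipCodec throws IllegalArgumentException, so callers often fall back to DefaultCodec in that case.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqFileGzipSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/demo.seq"), true);
    SequenceFile.Writer writer = null;
    try {
      // Throws IllegalArgumentException when native zlib is unavailable,
      // per the GzipCodec guard in the factory above.
      writer = SequenceFile.createWriter(conf, out, Text.class, Text.class,
          SequenceFile.CompressionType.BLOCK,
          ReflectionUtils.newInstance(GzipCodec.class, conf),
          new SequenceFile.Metadata());
      writer.append(new Text("key"), new Text("value"));
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}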
/**
* For running a few tests of methods herein.
*
* @param args the arguments to use for the timer test
* @throws IOException if building the cell blocks fails
*/
public static void main(String[] args) throws IOException {
int count = 1024;
int size = 10240;
for (String arg : args) {
if (arg.startsWith(COUNT)) {
count = Integer.parseInt(arg.replace(COUNT, ""));
} else if (arg.startsWith(SIZE)) {
size = Integer.parseInt(arg.replace(SIZE, ""));
} else {
usage(1);
}
}
CellBlockBuilder builder = new CellBlockBuilder(HBaseConfiguration.create());
timerTests(builder, count, size, new KeyValueCodec(), null);
timerTests(builder, count, size, new KeyValueCodec(), new DefaultCodec());
timerTests(builder, count, size, new KeyValueCodec(), new GzipCodec());
}
@Test
public void testGzCompressedInput() throws IOException
{
// write gzip-compressed data
GzipCodec codec = new GzipCodec();
PrintWriter qseqOut = new PrintWriter(new BufferedOutputStream(codec.createOutputStream(new FileOutputStream(tempGz))));
qseqOut.write(twoQseq);
qseqOut.close();
// now try to read it
split = new FileSplit(new Path(tempGz.toURI().toString()), 0, twoQseq.length(), null);
QseqRecordReader reader = new QseqRecordReader(conf, split);
boolean retval = reader.next(key, fragment);
assertTrue(retval);
assertEquals("ERR020229:10880:1:1:1373:2042:1", key.toString());
assertEquals("TTGGATGATAGGGATTATTTGACTCGAATATTGGAAATAGCTGTTTATATTTTTTAAAAATGGTCTGTAACTGGTGACAGGACGCTTCGAT", fragment.getSequence().toString());
retval = reader.next(key, fragment);
assertTrue(retval);
assertEquals("ERR020229:10883:1:1:1796:2044:2", key.toString());
assertEquals("TGAGCAGATGTGCTAAAGCTGCTTCTCCCCTAGGATCATTTGTACCTACCAGACTCAGGGAAAGGGGTGAGAATTGGGCCGTGGGGCAAGG", fragment.getSequence().toString());
}
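A standalone round-trip sketch of the same write-then-read pattern (class name and file path are illustrative). Unlike the test above, it hands the codec a Configuration via ReflectionUtils, which GzipCodec consults when choosing between native zlib and the built-in java.util.zip streams.
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class GzipRoundTrip {
  public static void main(String[] args) throws Exception {
    GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());
    // write a small gzip-compressed file
    OutputStream out = codec.createOutputStream(new FileOutputStream("/tmp/demo.gz"));
    out.write("line one\nline two\n".getBytes("UTF-8"));
    out.close();
    // read it back through the same codec
    BufferedReader in = new BufferedReader(new InputStreamReader(
        codec.createInputStream(new FileInputStream("/tmp/demo.gz")), "UTF-8"));
    System.out.println(in.readLine()); // prints "line one"
    in.close();
  }
}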
@Test
public void testBuildDDL() throws Exception {
Schema schema = new Schema();
schema.addColumn("name", TajoDataTypes.Type.BLOB);
schema.addColumn("addr", TajoDataTypes.Type.TEXT);
TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
meta.putOption(CatalogConstants.CSVFILE_DELIMITER, CatalogConstants.CSVFILE_DELIMITER_DEFAULT);
meta.putOption(CatalogConstants.COMPRESSION_CODEC, GzipCodec.class.getName());
TableDesc desc = new TableDesc("table1", schema, meta, new Path("/table1"));
Schema expressionSchema = new Schema();
expressionSchema.addColumn("key", TajoDataTypes.Type.INT4);
expressionSchema.addColumn("key2", TajoDataTypes.Type.TEXT);
PartitionMethodDesc partitionMethod = new PartitionMethodDesc(
"table1",
CatalogProtos.PartitionType.COLUMN,
"key,key2",
expressionSchema);
desc.setPartitionMethod(partitionMethod);
assertEquals(FileUtil.readTextFile(new File("src/test/resources/results/testBuildDDL.result")),
DDLBuilder.buildDDL(desc));
}
private void createCompressedWriter() throws Exception {
PowerMockito.mockStatic(FileUtil.class);
PowerMockito.mockStatic(ReflectionUtil.class);
FileWriter writer = Mockito.mock(FileWriter.class);
Mockito.when(
ReflectionUtil.createFileWriter(
Mockito.any(String.class),
Mockito.any(LogFilePath.class),
Mockito.any(CompressionCodec.class),
Mockito.any(SecorConfig.class)
))
.thenReturn(writer);
Mockito.when(writer.getLength()).thenReturn(123L);
FileWriter createdWriter = mRegistry.getOrCreateWriter(
mLogFilePathGz, new GzipCodec());
assertTrue(createdWriter == writer);
}
public void testSequenceFileReader() throws Exception {
setupSequenceFileReaderConfig();
mockSequenceFileWriter(false);
ReflectionUtil.createFileReader(mConfig.getFileReaderWriterFactory(), mLogFilePath, null, mConfig);
// Verify that the method has been called exactly once (the default).
// PowerMockito.verifyStatic(FileSystem.class);
// FileSystem.get(Mockito.any(URI.class), Mockito.any(Configuration.class));
mockSequenceFileWriter(true);
ReflectionUtil.createFileWriter(mConfig.getFileReaderWriterFactory(), mLogFilePathGz, new GzipCodec(),
mConfig);
// Verify that the method has been called exactly once (the default).
// PowerMockito.verifyStatic(FileSystem.class);
// FileSystem.get(Mockito.any(URI.class), Mockito.any(Configuration.class));
}
/**
* Construct the preferred type of SequenceFile Writer.
* @param fs The configured filesystem.
* @param conf The configuration.
* @param name The name of the file.
* @param keyClass The 'key' type.
* @param valClass The 'value' type.
* @param bufferSize buffer size for the underlying output stream.
* @param replication replication factor for the file.
* @param blockSize block size for the file.
* @param compressionType The compression type.
* @param codec The compression codec.
* @param progress The Progressable object to track progress.
* @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, int bufferSize,
short replication, long blockSize,
CompressionType compressionType, CompressionCodec codec,
Progressable progress, Metadata metadata) throws IOException {
if ((codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded(conf)) {
throw new IllegalArgumentException("SequenceFile doesn't work with " +
"GzipCodec without native-hadoop code!");
}
Writer writer = null;
if (compressionType == CompressionType.NONE) {
writer = new Writer(fs, conf, name, keyClass, valClass,
bufferSize, replication, blockSize,
progress, metadata);
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
bufferSize, replication, blockSize,
codec, progress, metadata);
} else if (compressionType == CompressionType.BLOCK){
writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
bufferSize, replication, blockSize,
codec, progress, metadata);
}
return writer;
}
public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, Path path)
{
if (inputFormat instanceof TextInputFormat) {
return getCompressionCodec((TextInputFormat) inputFormat, path)
.map(codec -> (codec instanceof GzipCodec) || (codec instanceof BZip2Codec))
.orElse(false); // TODO (https://github.com/prestosql/presto/issues/2475) fix S3 Select when file not compressed
}
return false;
}
protected CompressionType getCompressionType(Path path)
{
CompressionCodec codec = compressionCodecFactory.getCodec(path);
if (codec == null) {
return CompressionType.NONE;
}
if (codec instanceof GzipCodec) {
return CompressionType.GZIP;
}
if (codec instanceof BZip2Codec) {
return CompressionType.BZIP2;
}
throw new PrestoException(NOT_SUPPORTED, "Compression extension not supported for S3 Select: " + path);
}
@Override
public String toString() {
switch (this) {
case NONE: return "NONE";
case DEFAULT: return DefaultCodec.class.getName();
case BZIP: return BZip2Codec.class.getName();
case GZIP: return GzipCodec.class.getName();
case LZ4: return Lz4Codec.class.getName();
case SNAPPY: return SnappyCodec.class.getName();
case AUTOMATIC: return "Automatically Detected";
}
return null;
}
public HdfsSink2(Hdfs2SinkConfig config)
throws ClassNotFoundException
{
this.batchSize = config.getBatchBufferSize();
this.writerDir = config.getWriteDir();
switch (config.getZipType().trim().toLowerCase()) {
case "lzo":
codecClass = (Class<? extends CompressionCodec>) Class.forName("com.hadoop.compression.lzo.LzopCodec");
break;
case "lz4":
codecClass = Lz4Codec.class;
break;
case "snappy":
codecClass = SnappyCodec.class;
break;
case "gzip":
codecClass = GzipCodec.class;
break;
case "bzip2":
codecClass = BZip2Codec.class;
break;
case "default":
codecClass = DefaultCodec.class;
break;
default:
codecClass = NoneCodec.class;
}
}
public void testNonSplittingGzipFile() throws IOException {
SplittingOutputStream os = new SplittingOutputStream(getConf(),
getWritePath(), "nonsplit-", 0, new GzipCodec());
SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
try {
w.allowSplit();
w.write("This is a string!");
w.newLine();
w.write("This is another string!");
w.allowSplit();
} finally {
w.close();
}
// Ensure we made exactly one file.
Path writePath = new Path(getWritePath(), "nonsplit-00000.gz");
Path badPath = new Path(getWritePath(), "nonsplit-00001.gz");
verifyFileExists(writePath);
verifyFileDoesNotExist(badPath); // Ensure we didn't make a second file.
// Now ensure all the data got there.
String [] expectedLines = {
"This is a string!",
"This is another string!",
};
verifyFileContents(
new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
"nonsplit-00000.gz"))), expectedLines);
}
public void testSplittingGzipFile() throws IOException {
SplittingOutputStream os = new SplittingOutputStream(getConf(),
getWritePath(), "splitz-", 3, new GzipCodec());
SplittableBufferedWriter w = new SplittableBufferedWriter(os, true);
try {
w.write("This is a string!");
w.newLine();
w.write("This is another string!");
} finally {
w.close();
}
// Ensure we made exactly two files.
Path writePath = new Path(getWritePath(), "splitz-00000.gz");
Path writePath2 = new Path(getWritePath(), "splitz-00001.gz");
Path badPath = new Path(getWritePath(), "splitz-00002.gz");
verifyFileExists(writePath);
verifyFileExists(writePath2);
verifyFileDoesNotExist(badPath); // Ensure we didn't make three files.
// Now ensure all the data got there.
String [] expectedLines0 = {
"This is a string!",
};
verifyFileContents(
new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
"splitz-00000.gz"))), expectedLines0);
String [] expectedLines1 = {
"This is another string!",
};
verifyFileContents(
new GZIPInputStream(new FileInputStream(new File(getWriteDir(),
"splitz-00001.gz"))), expectedLines1);
}
public void testGetCodec() throws IOException {
verifyCodec(GzipCodec.class, "gzip");
verifyCodec(GzipCodec.class, "Gzip");
verifyCodec(GzipCodec.class, "GZIP");
verifyCodec(GzipCodec.class, "gzipcodec");
verifyCodec(GzipCodec.class, "GzipCodec");
verifyCodec(GzipCodec.class, "GZIPCODEC");
verifyCodec(GzipCodec.class, "org.apache.hadoop.io.compress.GzipCodec");
}
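The lookups verified above can also be done through Hadoop's CompressionCodecFactory, which resolves short names as well as class names; a hedged sketch (class name is illustrative):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecByName {
  // Resolves "gzip", "GzipCodec", or the fully qualified class name,
  // mirroring the aliases exercised by testGetCodec() above.
  public static Class<? extends CompressionCodec> resolve(String name) {
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    CompressionCodec codec = factory.getCodecByName(name);
    return codec == null ? null : codec.getClass();
  }
}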
/**
* Test using the gzip codec for reading
*/
@Test(timeout=10000)
public void testGzip() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"this is a test\nof gzip\n");
FileInputFormat.setInputPaths(job, workDir);
CombineTextInputFormat format = new CombineTextInputFormat();
InputSplit[] splits = format.getSplits(job, 100);
assertEquals("compressed splits == 1", 1, splits.length);
List<Text> results = readSplit(format, splits[0], job);
assertEquals("splits[0] length", 8, results.size());
final String[] firstList =
{"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
final String[] secondList = {"this is a test", "of gzip"};
String first = results.get(0).toString();
if (first.equals(firstList[0])) {
testResults(results, firstList, secondList);
} else if (first.equals(secondList[0])) {
testResults(results, secondList, firstList);
} else {
fail("unexpected first token!");
}
}
/**
* Test using the gzip codec for reading
*/
@Test(timeout=10000)
public void testGzip() throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, conf);
localFs.delete(workDir, true);
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"this is a test\nof gzip\n");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, workDir);
CombineTextInputFormat format = new CombineTextInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals("compressed splits == 1", 1, splits.size());
List<Text> results = readSplit(format, splits.get(0), job);
assertEquals("splits[0] length", 8, results.size());
final String[] firstList =
{"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
final String[] secondList = {"this is a test", "of gzip"};
String first = results.get(0).toString();
if (first.equals(firstList[0])) {
testResults(results, firstList, secondList);
} else if (first.equals(secondList[0])) {
testResults(results, secondList, firstList);
} else {
fail("unexpected first token!");
}
}
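Both gzip tests above rely on a writeFile(...) helper that is not shown. A plausible minimal version, modeled on the helper in Hadoop's input-format tests (treat it as an assumption about the original):
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;

public class WriteFileHelper {
  static void writeFile(FileSystem fs, Path name, CompressionCodec codec,
      String contents) throws IOException {
    // wrap the raw stream with the codec when one is supplied
    OutputStream stm = (codec == null)
        ? fs.create(name)
        : codec.createOutputStream(fs.create(name));
    stm.write(contents.getBytes("UTF-8"));
    stm.close();
  }
}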
@Test
public void testCompressedFile() throws Exception {
String file = testFile.getCanonicalPath();
HDFSCompressedDataStream stream = new HDFSCompressedDataStream();
context.put("hdfs.useRawLocalFileSystem", "true");
stream.configure(context);
stream.open(file, new GzipCodec(), CompressionType.RECORD);
stream.append(event);
stream.sync();
Assert.assertTrue(testFile.length() > 0);
}
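Finally, a reader-side sketch for files like the one produced above: CompressionCodecFactory picks the codec from the file extension, and the codec decompresses the stream. The path and class name are illustrative, and the sketch assumes text content.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class InspectCompressed {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path p = new Path("/tmp/events.gz"); // illustrative
    // the factory maps the ".gz" extension to GzipCodec
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(p);
    FileSystem fs = p.getFileSystem(conf);
    BufferedReader r = new BufferedReader(new InputStreamReader(
        codec.createInputStream(fs.open(p)), "UTF-8"));
    for (String line; (line = r.readLine()) != null; ) {
      System.out.println(line);
    }
    r.close();
  }
}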