Listed below are example snippets showing how to use the org.apache.hadoop.io.compress.DefaultCodec API class; you can also follow the source links through to GitHub to view the original code.
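Before the project-specific examples, a minimal round trip through the codec itself may help. The sketch below is illustrative (the class name DefaultCodecRoundTrip is ours, not Hadoop's), but setConf, createOutputStream, createInputStream, and IOUtils.copyBytes are the standard Hadoop APIs:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;

public class DefaultCodecRoundTrip {
  public static void main(String[] args) throws Exception {
    // DefaultCodec is Configurable: give it a Configuration before use.
    DefaultCodec codec = new DefaultCodec();
    codec.setConf(new Configuration());

    byte[] original = "hello, DefaultCodec".getBytes("UTF-8");

    // Compress into an in-memory buffer (zlib/DEFLATE under the hood).
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(original);
    out.close();

    // Decompress and verify the round trip.
    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    IOUtils.copyBytes(in, restored, 4096, true); // true: close streams when done

    System.out.println(new String(restored.toByteArray(), "UTF-8"));
  }
}

The remaining examples are drawn from real project sources. The first one creates an IFile.Writer over the raw local file system; note that it assigns a GzipCodec to a DefaultCodec variable, which works because GzipCodec extends DefaultCodec.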
@Test
/**
* Create an IFile.Writer using GzipCodec since this code does not
* have a compressor when run via the tests (ie no native libraries).
*/
public void testIFileWriterWithCodec() throws Exception {
Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
IFile.Writer<Text, Text> writer =
new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
codec, null);
writer.close();
}
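The companion test writes an IFile the same way, then opens an IFile.Reader over it and exercises the checksummed input stream: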
@Test
/** Same as above but create a reader. */
public void testIFileReaderWithCodec() throws Exception {
Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
Path path = new Path(new Path("build/test.ifile"), "data");
DefaultCodec codec = new GzipCodec();
codec.setConf(conf);
FSDataOutputStream out = rfs.create(path);
IFile.Writer<Text, Text> writer =
new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
codec, null);
writer.close();
FSDataInputStream in = rfs.open(path);
IFile.Reader<Text, Text> reader =
new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
codec, null);
reader.close();
// test the checksum
byte[] buffer = new byte[100];
int bytesRead = reader.checksumIn.readWithChecksum(buffer, 0, buffer.length);
assertEquals(bytesRead, reader.checksumIn.getChecksum().length);
}
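This test helper from Apache Tez builds a Configuration for an unordered output and registers DefaultCodec as the intermediate compression codec whenever compression is requested: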
private Configuration createConfiguration(OutputContext outputContext,
Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
boolean shouldCompress, int maxSingleBufferSizeBytes,
Class<? extends Partitioner> partitionerClass) {
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClass.getName());
if (maxSingleBufferSizeBytes >= 0) {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
maxSingleBufferSizeBytes);
}
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, shouldCompress);
if (shouldCompress) {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
DefaultCodec.class.getName());
}
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
reportPartitionStats.getType());
return conf;
}
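Also from Tez: resolving the intermediate input compressor class from configuration, with the caller's DefaultCodec class as the fallback when TEZ_RUNTIME_COMPRESS_CODEC is unset: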
public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
Configuration conf, Class<DefaultCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
String name = conf
.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
if (name != null) {
try {
codecClass = conf.getClassByName(name).asSubclass(
CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name
+ " was not found.", e);
}
}
return codecClass;
}
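A StoreFunc-style setStoreLocation that wires SequenceFile output compression into a job, using DefaultCodec as the default output compressor class: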
@Override
public void setStoreLocation(String location, Job job)
throws IOException {
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
if (compressionType != null && compressionCodecClass != null) {
Class<? extends CompressionCodec> codecClass =
FileOutputFormat.getOutputCompressorClass(job,
DefaultCodec.class);
SequenceFileOutputFormat.
setOutputCompressorClass(job, codecClass);
SequenceFileOutputFormat.setOutputCompressionType(job,
SequenceFile.CompressionType.valueOf(compressionType));
}
FileOutputFormat.setOutputPath(job, new Path(location));
}
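A PowerMockito-based test helper that stubs out file-writer creation and then asks a registry for a writer keyed by a new DefaultCodec instance: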
private FileWriter createWriter() throws Exception {
PowerMockito.mockStatic(FileUtil.class);
PowerMockito.mockStatic(ReflectionUtil.class);
FileWriter writer = Mockito.mock(FileWriter.class);
Mockito.when(
ReflectionUtil.createFileWriter(
Mockito.any(String.class),
Mockito.any(LogFilePath.class),
Mockito.any(CompressionCodec.class),
Mockito.any(SecorConfig.class)
))
.thenReturn(writer);
Mockito.when(writer.getLength()).thenReturn(123L);
FileWriter createdWriter = mRegistry.getOrCreateWriter(
mLogFilePath, new DefaultCodec());
assertTrue(createdWriter == writer);
return writer;
}
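From a Parquet/Hadoop binding: if no explicit Parquet compression is set, the Hadoop output compressor class (defaulting to DefaultCodec) is translated into a Parquet CompressionCodecName: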
private static CompressionCodecName getCodec(JobConf conf) {
CompressionCodecName codec;
if (ParquetOutputFormat.isCompressionSet(conf)) { // explicit parquet config
codec = ParquetOutputFormat.getCompression(conf);
} else if (getCompressOutput(conf)) { // from hadoop config
// find the right codec
Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class);
LOG.info("Compression set through hadoop codec: " + codecClass.getName());
codec = CompressionCodecName.fromCompressionCodec(codecClass);
} else {
codec = CompressionCodecName.UNCOMPRESSED;
}
LOG.info("Compression: " + codec.name());
return codec;
}
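A shuffle-side MapOutputCopier constructor that, when map outputs are compressed, instantiates the configured codec (DefaultCodec by default) and borrows a matching decompressor from the CodecPool: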
public MapOutputCopier(JobConf job, Reporter reporter) {
setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
LOG.debug(getName() + " created");
this.reporter = reporter;
shuffleConnectionTimeout =
job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
shuffleReadTimeout =
job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
decompressor = CodecPool.getDecompressor(codec);
}
setDaemon(true);
}
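An HDFS sink constructor that maps a configured compression name onto a codec class; DefaultCodec backs the "default" setting: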
public HdfsSink2(Hdfs2SinkConfig config)
throws ClassNotFoundException
{
this.batchSize = config.getBatchBufferSize();
this.writerDir = config.getWriteDir();
switch (config.getZipType().trim().toLowerCase()) {
case "lzo":
codecClass = (Class<? extends CompressionCodec>) Class.forName("com.hadoop.compression.lzo.LzopCodec");
break;
case "lz4":
codecClass = Lz4Codec.class;
break;
case "snappy":
codecClass = SnappyCodec.class;
break;
case "gzip":
codecClass = GzipCodec.class;
break;
case "bzip2":
codecClass = BZip2Codec.class;
break;
case "default":
codecClass = DefaultCodec.class;
break;
default:
codecClass = NoneCodec.class;
}
}
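A reduce-side test that drives a value iterator over intermediate data compressed with a DefaultCodec: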
public void testValueIteratorWithCompression() throws Exception {
Path tmpDir = new Path("build/test/test.reduce.task.compression");
Configuration conf = new Configuration();
DefaultCodec codec = new DefaultCodec();
codec.setConf(conf);
for (Pair[] testCase: testCases) {
runValueIterator(tmpDir, testCase, conf, codec);
}
}
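A MapFile-based getRecordWriter that resolves the output compression type and codec (DefaultCodec by default) before constructing the MapFile.Writer: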
public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
// get the path of the temporary output file
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
new MapFile.Writer(job, fs, file.toString(),
job.getOutputKeyClass().asSubclass(WritableComparable.class),
job.getOutputValueClass().asSubclass(Writable.class),
compressionType, codec,
progress);
return new RecordWriter<WritableComparable, Writable>() {
public void write(WritableComparable key, Writable value)
throws IOException {
out.append(key, value);
}
public void close(Reporter reporter) throws IOException { out.close();}
};
}
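A small helper that instantiates the map-output codec, again defaulting to DefaultCodec, only when map-output compression is enabled: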
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
conf.getMapOutputCompressorClass(DefaultCodec.class);
return ReflectionUtils.newInstance(codecClass, conf);
}
return null;
}
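The SequenceFile counterpart of the MapFile writer above: the same codec resolution, followed by a RecordWriter backed by SequenceFile.Writer: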
public RecordWriter<K, V> getRecordWriter(
FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
// get the path of the temporary output file
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
final SequenceFile.Writer out =
SequenceFile.createWriter(fs, job, file,
job.getOutputKeyClass(),
job.getOutputValueClass(),
compressionType,
codec,
progress);
return new RecordWriter<K, V>() {
public void write(K key, V value)
throws IOException {
out.append(key, value);
}
public void close(Reporter reporter) throws IOException { out.close();}
};
}
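An HDFS test that checks hflush/hsync metrics through a SequenceFile writer; the writer is uncompressed (CompressionType.NONE) but is still supplied a DefaultCodec instance: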
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
final FileSystem fs = cluster.getFileSystem();
final Path p = new Path("/testSequenceFileSync/foo");
final int len = 1 << 16;
FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
4096, (short) 1, len, null);
Writer w = SequenceFile.createWriter(new Configuration(),
Writer.stream(out),
Writer.keyClass(RandomDatum.class),
Writer.valueClass(RandomDatum.class),
Writer.compression(CompressionType.NONE, new DefaultCodec()));
w.hflush();
checkSyncMetric(cluster, 0);
w.hsync();
checkSyncMetric(cluster, 1);
int seed = new Random().nextInt();
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
generator.next();
w.append(generator.getKey(), generator.getValue());
w.hsync();
checkSyncMetric(cluster, 2);
w.close();
checkSyncMetric(cluster, 2);
out.close();
checkSyncMetric(cluster, 3);
cluster.shutdown();
}
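A MapFile.Writer convenience constructor that delegates to the full constructor, filling in a new DefaultCodec: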
/** Create the named map using the named key comparator. */
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
SequenceFile.CompressionType compress,
Progressable progress)
throws IOException {
this(conf, fs, dirName, comparator, valClass,
compress, new DefaultCodec(), progress);
}
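A connector helper that looks the configured compression up by name in a CompressionCodecFactory and falls back to a new DefaultCodec when the lookup fails: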
protected CompressionCodec getCompressionCodec() {
if (connectorConfig.getCompression() == null) {
return null;
}
CompressionCodec codec = getCompressionCodecFactory()
.getCodecByName(connectorConfig.getCompression().name());
return (codec != null) ? codec : new DefaultCodec();
}
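A TestNG data provider that parameterizes an HDFS extractor test over output file types and compression classes, DefaultCodec among them: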
@DataProvider(name="test-hdfs-extractor")
public static Object[][] data() {
List<Object[]> parameters = new ArrayList<Object[]>();
for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
parameters.add(new Object[]{outputFileType, compressionClass});
}
}
return parameters.toArray(new Object[0][]);
}
@Override
DefaultCodec getCodec(Configuration conf) {
if (codec == null) {
synchronized (lock) {
if (codec == null) {
codec = buildCodec(conf);
}
}
}
return codec;
}
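The next two methods persist state objects into block-compressed SequenceFiles, passing a new DefaultCodec to SequenceFile.createWriter: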
/**
* See {@link StateStore#put(String, String, T)}.
*
* <p>
* This implementation does not support putting the state object into an existing store, since
* append is not yet supported by Hadoop SequenceFile (HADOOP-7139).
* </p>
*/
@Override
public void put(String storeName, String tableName, T state) throws IOException {
String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);
if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
throw new IOException("Failed to create a state file for table " + tmpTableName);
}
Closer closer = Closer.create();
try {
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
if (this.useTmpFileForPut) {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
renamePath(tmpTablePath, tablePath);
}
}
/**
* See {@link StateStore#putAll(String, String, Collection)}.
*
* <p>
* This implementation does not support putting the state objects into an existing store, since
* append is not yet supported by Hadoop SequenceFile (HADOOP-7139).
* </p>
*/
@Override
public void putAll(String storeName, String tableName, Collection<T> states) throws IOException {
String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);
if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
throw new IOException("Failed to create a state file for table " + tmpTableName);
}
Closer closer = Closer.create();
try {
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
for (T state : states) {
writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
}
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
if (this.useTmpFileForPut) {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
renamePath(tmpTablePath, tablePath);
}
}
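Finally, a MapReduce driver that enables block compression on its SequenceFile output and names DefaultCodec as the output compressor class: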
/**
* Write the sequence file.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SequenceFileProtobufMapReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Stock.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(PbMapper.class);
job.setReducerClass(PbReducer.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
ProtobufSerialization.register(job.getConfiguration());
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}