The following are example usages of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getCompressOutput(), collected from open-source projects.
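getCompressOutput(JobContext) reports whether compressed job output was requested, i.e. whether mapreduce.output.fileoutputformat.compress is true. As a point of reference, here is a minimal driver-side sketch (class name and job name are hypothetical) of the settings that the task-side examples below read back:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressOutputSetup { // hypothetical driver class
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "avro-output-example"); // hypothetical job name
    // Sets mapreduce.output.fileoutputformat.compress=true, the flag that
    // FileOutputFormat.getCompressOutput(context) reads back on the task side.
    FileOutputFormat.setCompressOutput(job, true);
    // The Avro-based writers in the examples below choose their codec from
    // avro.output.codec (AvroJob.OUTPUT_CODEC), not from the Hadoop codec class.
    job.getConfiguration().set("avro.output.codec", "deflate");
  }
}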
static <T> void configureDataFileWriter(DataFileWriter<T> writer,
    TaskAttemptContext context) throws UnsupportedEncodingException {
  if (FileOutputFormat.getCompressOutput(context)) {
    int level = context.getConfiguration()
        .getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
    String codecName = context.getConfiguration()
        .get(org.apache.avro.mapred.AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
    CodecFactory factory =
        codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level)
            : CodecFactory.fromString(codecName);
    writer.setCodec(factory);
  }
  writer.setSyncInterval(context.getConfiguration()
      .getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
  // copy metadata from job
  for (Map.Entry<String, String> e : context.getConfiguration()) {
    if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.TEXT_PREFIX)) {
      writer.setMeta(e.getKey()
          .substring(org.apache.avro.mapred.AvroJob.TEXT_PREFIX.length()),
          e.getValue());
    }
    if (e.getKey().startsWith(org.apache.avro.mapred.AvroJob.BINARY_PREFIX)) {
      writer.setMeta(e.getKey()
          .substring(org.apache.avro.mapred.AvroJob.BINARY_PREFIX.length()),
          URLDecoder.decode(e.getValue(), "ISO-8859-1").getBytes("ISO-8859-1"));
    }
  }
}
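This first helper (it matches the writer configuration in Avro's MapReduce output format) uses getCompressOutput to gate codec selection: DEFLATE_CODEC honors the configured level via CodecFactory.deflateCodec(level), while any other codec name is resolved through CodecFactory.fromString. The sync interval and the job metadata entries carrying AvroJob.TEXT_PREFIX or AvroJob.BINARY_PREFIX are applied whether or not compression is enabled.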
@Override
public RecordWriter<NullWritable, Object> getRecordWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (schema == null)
    throw new IOException("Must provide a schema");
  Configuration conf = context.getConfiguration();
  DataFileWriter<Object> writer =
      new DataFileWriter<Object>(new PigAvroDatumWriter(schema));
  if (FileOutputFormat.getCompressOutput(context)) {
    int level = conf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
    String codecName = conf.get(OUTPUT_CODEC, DEFLATE_CODEC);
    CodecFactory factory = codecName.equals(DEFLATE_CODEC)
        ? CodecFactory.deflateCodec(level)
        : CodecFactory.fromString(codecName);
    writer.setCodec(factory);
  }
  // Take the max because core-default.xml sets io.file.buffer.size to 4K
  writer.setSyncInterval(conf.getInt(SYNC_INTERVAL_KEY, Math.max(
      conf.getInt("io.file.buffer.size", DEFAULT_SYNC_INTERVAL), DEFAULT_SYNC_INTERVAL)));
  Path path = getDefaultWorkFile(context, EXT);
  writer.create(schema, path.getFileSystem(conf).create(path));
  return new PigAvroRecordWriter(writer);
}
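This Pig Avro variant follows the same codec-selection pattern, but note the sync-interval default: it takes the larger of io.file.buffer.size and DEFAULT_SYNC_INTERVAL, so the 4K buffer size shipped in core-default.xml cannot shrink the sync interval below the writer's default.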
/**
 * Checks to make sure the configuration is valid, the output path doesn't already exist, and that
 * a connection to BigQuery can be established.
 */
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
  Configuration conf = job.getConfiguration();

  // Validate the output configuration.
  BigQueryOutputConfiguration.validateConfiguration(conf);

  // Get the output path.
  Path outputPath = BigQueryOutputConfiguration.getGcsOutputPath(conf);
  logger.atInfo().log("Using output path '%s'.", outputPath);

  // Error if the output path already exists.
  FileSystem outputFileSystem = outputPath.getFileSystem(conf);
  if (outputFileSystem.exists(outputPath)) {
    throw new IOException("The output path '" + outputPath + "' already exists.");
  }

  // Error if compression is set as there's mixed support in BigQuery.
  if (FileOutputFormat.getCompressOutput(job)) {
    throw new IOException("Compression isn't supported for this OutputFormat.");
  }

  // Error if unable to create a BigQuery helper.
  try {
    new BigQueryFactory().getBigQueryHelper(conf);
  } catch (GeneralSecurityException gse) {
    throw new IOException("Failed to create BigQuery client", gse);
  }

  // Let delegate process its checks.
  getDelegate(conf).checkOutputSpecs(job);
}
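This example, apparently from the Hadoop BigQuery connector, takes the opposite stance: checkOutputSpecs fails fast when getCompressOutput returns true, since compressed loads have mixed support on the BigQuery side, after validating the configuration and the GCS output path and before confirming that a BigQuery client can be built and delegating the remaining checks.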
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  if (!context.nextKey()
      || context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
    throw new IOException("Missing reduce spec");
  }
  long outBytes = 0L;
  long outRecords = 0L;
  long inRecords = 0L;
  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
  for (GridmixRecord ignored : context.getValues()) {
    final GridmixKey spec = context.getCurrentKey();
    inRecords += spec.getReduceInputRecords();
    outBytes += spec.getReduceOutputBytes();
    outRecords += spec.getReduceOutputRecords();
    if (spec.getReduceResourceUsageMetrics() != null) {
      metrics = spec.getReduceResourceUsageMetrics();
    }
  }
  if (0 == outRecords && inRecords > 0) {
    LOG.info("Spec output bytes w/o records. Using input record count");
    outRecords = inRecords;
  }
  // Enable compression emulation for the gridmix reduce output records.
  Configuration conf = context.getConfiguration();
  if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
      && FileOutputFormat.getCompressOutput(context)) {
    float compressionRatio =
        CompressionEmulationUtil
            .getJobOutputCompressionEmulationRatio(conf);
    LOG.info("GridMix is configured to use a compression ratio of "
        + compressionRatio + " for the reduce output data.");
    val.setCompressibility(true, compressionRatio);
    // Scale up the uncompressed output size so that the data size after
    // compression matches the spec.
    outBytes /= compressionRatio;
  }
  factory =
      new AvgRecordFactory(outBytes, outRecords,
          context.getConfiguration(), 5 * 1024);
  ratio = outRecords / (1.0 * inRecords);
  acc = 0.0;
  matcher = new ResourceUsageMatcherRunner(context, metrics);
  // start the status reporter thread
  reporter = new StatusReporter(context, matcher);
  reporter.start();
}
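Finally, the flag can simply be surfaced through an accessor that consults a context field held by the enclosing class: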
@Override
public boolean isHadoopCompressionSet() {
  return FileOutputFormat.getCompressOutput(context);
}
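If nothing was configured, getCompressOutput returns false, so accessors like the one above degrade gracefully. In current Hadoop versions the method is a typed read of the mapreduce.output.fileoutputformat.compress boolean, which the following standalone sketch (hypothetical class name) demonstrates:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GetCompressOutputDefault { // hypothetical class name
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // With nothing configured, compressed output defaults to off.
    System.out.println(FileOutputFormat.getCompressOutput(job)); // false
    // getCompressOutput is a typed read of this configuration key.
    job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);
    System.out.println(FileOutputFormat.getCompressOutput(job)); // true
  }
}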