The following code examples show how org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter() is used in practice.
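Before the project-specific examples, here is a minimal, self-contained sketch of the committer lifecycle around OutputFormat#getOutputCommitter(): obtain a committer from the format, set up the job and task, write through a RecordWriter, then commit the task and the job. The output path, job-tracker identifier, and key/value types below are illustrative assumptions, not taken from the examples that follow.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // Assumed local output directory; any writable path works.
    FileOutputFormat.setOutputPath(job, new Path("/tmp/committer-lifecycle-sketch"));

    // A synthetic task attempt id ("jt", job 1, map task 0, attempt 0).
    TaskAttemptID attemptId = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
    TaskAttemptContext taskContext =
        new TaskAttemptContextImpl(job.getConfiguration(), attemptId);

    OutputFormat<LongWritable, Text> outputFormat = new TextOutputFormat<>();
    OutputCommitter committer = outputFormat.getOutputCommitter(taskContext);

    // Job setup, task setup, write, then the two-phase commit.
    committer.setupJob(new JobContextImpl(job.getConfiguration(), attemptId.getJobID()));
    committer.setupTask(taskContext);
    RecordWriter<LongWritable, Text> writer = outputFormat.getRecordWriter(taskContext);
    try {
      writer.write(new LongWritable(1), new Text("hello"));
    } finally {
      writer.close(taskContext);
    }
    if (committer.needsTaskCommit(taskContext)) {
      committer.commitTask(taskContext);
    }
    committer.commitJob(new JobContextImpl(job.getConfiguration(), attemptId.getJobID()));
  }
}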
private org.apache.hadoop.mapreduce.OutputCommitter
    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  LOG.info("OutputCommitter set in config "
      + conf.get("mapred.output.committer.class"));
  if (newApiCommitter) {
    // New MapReduce API: ask the configured OutputFormat for its committer.
    org.apache.hadoop.mapreduce.TaskID taskId =
        new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
    org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
        new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new TaskAttemptContextImpl(conf, taskAttemptID);
    OutputFormat outputFormat =
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
    committer = outputFormat.getOutputCommitter(taskContext);
  } else {
    // Old MapReduce API: instantiate the committer class named in the configuration.
    committer = ReflectionUtils.newInstance(conf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), conf);
  }
  LOG.info("OutputCommitter is " + committer.getClass().getName());
  return committer;
}
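In the old-API branch above, the committer class is resolved from the mapred.output.committer.class key, defaulting to FileOutputCommitter. Below is a short, hedged sketch of wiring a custom old-API committer through that key; NoOpCommitter is a hypothetical class written only for this illustration and is not part of Hadoop.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

// Hypothetical committer that commits nothing; it exists only to show the old-API wiring.
public class NoOpCommitter extends OutputCommitter {
  @Override public void setupJob(JobContext jobContext) throws IOException { }
  @Override public void setupTask(TaskAttemptContext taskContext) throws IOException { }
  @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    return false;
  }
  @Override public void commitTask(TaskAttemptContext taskContext) throws IOException { }
  @Override public void abortTask(TaskAttemptContext taskContext) throws IOException { }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The old-API branch in the example above resolves the committer from this key.
    conf.setClass("mapred.output.committer.class", NoOpCommitter.class, OutputCommitter.class);
    System.out.println(conf.get("mapred.output.committer.class"));
  }
}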
private static OutputCommitter initOutputCommitter(
    OutputFormat<?, ?> outputFormatObj,
    Configuration conf,
    TaskAttemptContext taskAttemptContext)
    throws IllegalStateException {
  OutputCommitter outputCommitter;
  try {
    outputCommitter = outputFormatObj.getOutputCommitter(taskAttemptContext);
    if (outputCommitter != null) {
      outputCommitter.setupJob(new JobContextImpl(conf, taskAttemptContext.getJobID()));
    }
  } catch (Exception e) {
    throw new IllegalStateException("Unable to create OutputCommitter object: ", e);
  }
  return outputCommitter;
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
    throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      TypeConverter.fromYarn(attempt.getID()));
  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat
      .getRecordWriter(tContext);
  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key2, val2);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val2);
    theRecordWriter.write(nullWritable, val1);
    theRecordWriter.write(key1, nullWritable);
    theRecordWriter.write(key2, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key1, val1);
  } finally {
    theRecordWriter.close(tContext);
  }
  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
    throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      TypeConverter.fromYarn(attempt.getID()));
  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat
      .getRecordWriter(tContext);
  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(tContext);
  }
  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}
/**
 * Commits the whole write job.
 *
 * @param config hadoop config
 */
private void cleanupJob(Configuration config) {
  externalSynchronization.releaseJobIdLock(config);
  JobID jobID = HadoopFormats.getJobId(config);
  TaskAttemptContext cleanupTaskContext = HadoopFormats.createCleanupTaskContext(config, jobID);
  OutputFormat<?, ?> outputFormat = HadoopFormats.createOutputFormatFromConfig(config);
  try {
    OutputCommitter outputCommitter = outputFormat.getOutputCommitter(cleanupTaskContext);
    outputCommitter.commitJob(cleanupTaskContext);
  } catch (Exception e) {
    throw new RuntimeException("Unable to commit job.", e);
  }
}
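cleanupJob above covers the success path by calling commitJob. The failure path would normally go through OutputCommitter#abortJob instead; a minimal hedged sketch follows, where the output directory and task attempt id are assumptions made only to keep the snippet runnable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class AbortJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed output directory so FileOutputCommitter has somewhere to work.
    conf.set(FileOutputFormat.OUTDIR, "/tmp/abort-job-sketch");
    // Synthetic attempt id; a real caller would build this from its own job metadata.
    TaskAttemptContext context = new TaskAttemptContextImpl(conf,
        new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0));

    OutputFormat<?, ?> outputFormat = new TextOutputFormat<>();
    OutputCommitter committer = outputFormat.getOutputCommitter(context);
    committer.setupJob(context);
    // Failure path: discard pending output rather than committing it.
    committer.abortJob(context, JobStatus.State.FAILED);
  }
}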
public void testBinary() throws IOException, InterruptedException {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf);

  Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
      "outseq");
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);

  FileOutputFormat.setOutputPath(job, outdir);

  SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
      IntWritable.class);
  SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
      DoubleWritable.class);

  SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
  SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
      CompressionType.BLOCK);

  BytesWritable bkey = new BytesWritable();
  BytesWritable bval = new BytesWritable();

  TaskAttemptContext context =
      MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
  OutputFormat<BytesWritable, BytesWritable> outputFormat =
      new SequenceFileAsBinaryOutputFormat();
  OutputCommitter committer = outputFormat.getOutputCommitter(context);
  committer.setupJob(job);
  RecordWriter<BytesWritable, BytesWritable> writer = outputFormat.
      getRecordWriter(context);

  IntWritable iwritable = new IntWritable();
  DoubleWritable dwritable = new DoubleWritable();
  DataOutputBuffer outbuf = new DataOutputBuffer();
  LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
  try {
    for (int i = 0; i < RECORDS; ++i) {
      iwritable = new IntWritable(r.nextInt());
      iwritable.write(outbuf);
      bkey.set(outbuf.getData(), 0, outbuf.getLength());
      outbuf.reset();
      dwritable = new DoubleWritable(r.nextDouble());
      dwritable.write(outbuf);
      bval.set(outbuf.getData(), 0, outbuf.getLength());
      outbuf.reset();
      writer.write(bkey, bval);
    }
  } finally {
    writer.close(context);
  }
  committer.commitTask(context);
  committer.commitJob(job);

  InputFormat<IntWritable, DoubleWritable> iformat =
      new SequenceFileInputFormat<IntWritable, DoubleWritable>();
  int count = 0;
  r.setSeed(seed);
  SequenceFileInputFormat.setInputPaths(job, outdir);
  LOG.info("Reading data by SequenceFileInputFormat");
  for (InputSplit split : iformat.getSplits(job)) {
    RecordReader<IntWritable, DoubleWritable> reader =
        iformat.createRecordReader(split, context);
    MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable>
        mcontext = new MapContextImpl<IntWritable, DoubleWritable,
            BytesWritable, BytesWritable>(job.getConfiguration(),
            context.getTaskAttemptID(), reader, null, null,
            MapReduceTestUtil.createDummyReporter(),
            split);
    reader.initialize(split, mcontext);
    try {
      int sourceInt;
      double sourceDouble;
      while (reader.nextKeyValue()) {
        sourceInt = r.nextInt();
        sourceDouble = r.nextDouble();
        iwritable = reader.getCurrentKey();
        dwritable = reader.getCurrentValue();
        assertEquals(
            "Keys don't match: " + "*" + iwritable.get() + ":" +
                sourceInt + "*",
            sourceInt, iwritable.get());
        assertTrue(
            "Vals don't match: " + "*" + dwritable.get() + ":" +
                sourceDouble + "*",
            Double.compare(dwritable.get(), sourceDouble) == 0);
        ++count;
      }
    } finally {
      reader.close();
    }
  }
  assertEquals("Some records not found", RECORDS, count);
}
public static void write(String out) throws IOException, ParserException,
    InterruptedException, ExecException {
  {
    StringBuilder schemaString = new StringBuilder("a0: chararray");
    for (int i = 1; i < COLUMN_COUNT; i++) {
      schemaString.append(", a" + i + ": chararray");
    }
    String location = out;
    String schema = schemaString.toString();
    StoreFuncInterface storer = new ParquetStorer();
    Job job = new Job(conf);
    storer.setStoreFuncUDFContextSignature("sig");
    String absPath = storer.relToAbsPathForStoreLocation(location,
        new Path(new File(".").getAbsoluteFile().toURI()));
    storer.setStoreLocation(absPath, job);
    storer.checkSchema(new ResourceSchema(Utils.getSchemaFromString(schema)));
    @SuppressWarnings("unchecked") // that's how the base class is defined
    OutputFormat<Void, Tuple> outputFormat = storer.getOutputFormat();
    // it's ContextUtil.getConfiguration(job) and not just conf !
    JobContext jobContext = ContextUtil.newJobContext(
        ContextUtil.getConfiguration(job), new JobID("jt", jobid++));
    outputFormat.checkOutputSpecs(jobContext);
    if (schema != null) {
      ResourceSchema resourceSchema = new ResourceSchema(Utils.getSchemaFromString(schema));
      storer.checkSchema(resourceSchema);
      if (storer instanceof StoreMetadata) {
        ((StoreMetadata) storer).storeSchema(resourceSchema, absPath, job);
      }
    }
    TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(
        ContextUtil.getConfiguration(job), new TaskAttemptID("jt", jobid, true, 1, 0));
    RecordWriter<Void, Tuple> recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
    storer.prepareToWrite(recordWriter);
    for (int i = 0; i < ROW_COUNT; i++) {
      Tuple tuple = TupleFactory.getInstance().newTuple(COLUMN_COUNT);
      for (int j = 0; j < COLUMN_COUNT; j++) {
        tuple.set(j, "a" + i + "_" + j);
      }
      storer.putNext(tuple);
    }
    recordWriter.close(taskAttemptContext);
    OutputCommitter outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext);
    outputCommitter.commitTask(taskAttemptContext);
    outputCommitter.commitJob(jobContext);
  }
}
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.OutputCommitter
    getOutputCommitter(OutputCommitterContext context) {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  newApiCommitter = false;
  if (jobConf.getBoolean("mapred.reducer.new-api", false)
      || jobConf.getBoolean("mapred.mapper.new-api", false)) {
    newApiCommitter = true;
    LOG.info("Using mapred newApiCommitter.");
  }
  LOG.info("OutputCommitter set in config for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + jobConf.get("mapred.output.committer.class"));
  if (newApiCommitter) {
    TaskAttemptID taskAttemptID = new TaskAttemptID(
        Long.toString(context.getApplicationId().getClusterTimestamp()),
        context.getApplicationId().getId(),
        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
            TaskType.MAP : TaskType.REDUCE)),
        0, context.getDAGAttemptNumber());
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
        taskAttemptID);
    try {
      OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
          .getOutputFormatClass(), jobConf);
      committer = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new TezUncheckedException(e);
    }
  } else {
    committer = ReflectionUtils.newInstance(jobConf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
  }
  LOG.info("OutputCommitter for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + committer.getClass().getName());
  return committer;
}
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.OutputCommitter
    getOutputCommitter(OutputCommitterContext context) {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  newApiCommitter = false;
  if (jobConf.getBoolean("mapred.reducer.new-api", false)
      || jobConf.getBoolean("mapred.mapper.new-api", false)) {
    newApiCommitter = true;
  }
  LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName()
      + " using " + (newApiCommitter ? "new" : "old") + " mapred API");
  if (newApiCommitter) {
    TaskAttemptID taskAttemptID = new TaskAttemptID(
        Long.toString(context.getApplicationId().getClusterTimestamp()),
        context.getApplicationId().getId(),
        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
            TaskType.MAP : TaskType.REDUCE)),
        0, context.getDAGAttemptNumber());
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
        taskAttemptID);
    try {
      OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
          .getOutputFormatClass(), jobConf);
      committer = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new TezUncheckedException(e);
    }
  } else {
    committer = ReflectionUtils.newInstance(jobConf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
  }
  LOG.info("OutputCommitter for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + committer.getClass().getName());
  return committer;
}