Listed below are example usages of the org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter API class, collected from real projects; follow the links to view the full source on GitHub.
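Before the project examples, here is a minimal, self-contained sketch of the commit protocol these snippets build on. The output directory and attempt ID below are hypothetical; the calls themselves (setupJob, setupTask, getWorkPath, needsTaskCommit, commitTask, commitJob) are the standard FileOutputCommitter API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path outDir = new Path("/tmp/committer-demo"); // hypothetical output directory
    TaskAttemptID id = TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0");
    JobContext jobContext = new JobContextImpl(conf, id.getJobID());
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf, id);
    FileOutputCommitter committer = new FileOutputCommitter(outDir, taskContext);
    committer.setupJob(jobContext);           // creates <outDir>/_temporary
    committer.setupTask(taskContext);         // a no-op; attempt dirs are created on demand
    Path workPath = committer.getWorkPath();  // task output belongs under this path
    if (committer.needsTaskCommit(taskContext)) {
      committer.commitTask(taskContext);      // promotes this attempt's output
    }
    committer.commitJob(jobContext);          // finalizes output; writes _SUCCESS by default
  }
}

Everything below is a variation on this flow: obtaining the work path, wiring a committer into a task context, or checking the markers it leaves behind.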
public S3MultipartOutputCommitter(Path outputPath, JobContext context)
    throws IOException {
  super(outputPath, context);
  this.constructorOutputPath = outputPath;
  Configuration conf = context.getConfiguration();
  this.uploadPartSize = conf.getLong(
      S3Committer.UPLOAD_SIZE, S3Committer.DEFAULT_UPLOAD_SIZE);
  // Spark will use a fake app id based on the current minute and job id 0.
  // To avoid collisions, use the YARN application ID for Spark.
  this.uuid = conf.get(S3Committer.UPLOAD_UUID, conf.get(
      S3Committer.SPARK_WRITE_UUID,
      conf.get(S3Committer.SPARK_APP_ID, context.getJobID().toString())));
  if (context instanceof TaskAttemptContext) {
    this.workPath = taskAttemptPath((TaskAttemptContext) context, uuid);
  } else {
    this.workPath = null;
  }
  this.wrappedCommitter = new FileOutputCommitter(
      Paths.getMultipartUploadCommitsDirectory(conf, uuid), context);
}
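The nested conf.get calls above resolve to the first key that is set. An equivalent unrolled form, using the same keys, may be easier to read (shown purely for clarity):

String uuid = conf.get(S3Committer.UPLOAD_UUID);
if (uuid == null) {
  uuid = conf.get(S3Committer.SPARK_WRITE_UUID); // Spark's per-write UUID, when present
}
if (uuid == null) {
  uuid = conf.get(S3Committer.SPARK_APP_ID);     // otherwise the YARN application ID
}
if (uuid == null) {
  uuid = context.getJobID().toString();          // last resort: the MapReduce job ID
}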
public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
    TaskAttemptContext context) throws IOException {
  // Get the path of the temporary output file
  final Path outputPath = FileOutputFormat.getOutputPath(context);
  final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
  final Configuration conf = context.getConfiguration();
  final FileSystem fs = outputDir.getFileSystem(conf);
  int blockSize = conf.getInt(Constants.HFILE_BLOCKSIZE, 16384);
  // Default to snappy.
  Compression.Algorithm compressionAlgorithm = getAlgorithm(
      conf.get(Constants.HFILE_COMPRESSION));
  final StoreFile.Writer writer =
      new StoreFile.WriterBuilder(conf, new CacheConfig(conf), fs, blockSize)
          .withFilePath(hfilePath(outputPath, context.getTaskAttemptID().getTaskID().getId()))
          .withCompression(compressionAlgorithm)
          .build();
  return new HFileRecordWriter(writer);
}
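A note on this example: the committer is constructed directly because only a TaskAttemptContext is in scope. Inside a Mapper or Reducer, where a TaskInputOutputContext is available, the same task work directory can be obtained from the static helper instead; a one-line sketch:

// Equivalent lookup from within a map() or reduce() method:
Path workDir = FileOutputFormat.getWorkOutputPath(context);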
@Override
public void open(String uId) throws Exception {
  this.hash = uId.hashCode();
  Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
  FileOutputFormat.setOutputPath(job, new Path(path));
  // Each Writer is responsible for writing one bundle of elements and is represented by one
  // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
  // handles retrying of failed bundles, each task has one attempt only.
  JobID jobId = job.getJobID();
  TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
  configure(job);
  context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
  FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
  recordWriter = outputFormat.getRecordWriter(context);
  outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
@Override
public void init() throws IOException {
  super.init();
  Configuration taskConf = new Configuration();
  Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
  taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
  ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
  writerContext = new TaskAttemptContextImpl(taskConf,
      new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
          taskAttemptId.getTaskId().getId(), taskAttemptId.getId()));
  HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
  try {
    writer = hFileOutputFormat2.getRecordWriter(writerContext);
    committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
    workingFilePath = committer.getWorkPath();
  } catch (InterruptedException e) {
    throw new IOException(e.getMessage(), e);
  }
  LOG.info("Created hbase file writer: " + workingFilePath);
}
/**
 * Queries the file system for the URIs of all files in the base output directory that are not
 * directories and whose name isn't {@link FileOutputCommitter#SUCCEEDED_FILE_NAME}.
 *
 * @return a list of all URIs in the form of strings.
 * @throws IOException if unable to query for the files in the base output directory.
 */
protected List<String> getOutputFileURIs() throws IOException {
  // Enumerate over all files in the output path.
  FileStatus[] outputFiles = outputFileSystem.listStatus(outputPath);
  ArrayList<String> sourceUris = new ArrayList<String>(outputFiles.length);
  for (int i = 0; i < outputFiles.length; i++) {
    FileStatus fileStatus = outputFiles[i];
    // Skip the success file and directories as they're not relevant to BigQuery.
    if (!fileStatus.isDir()
        && !fileStatus.getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
      sourceUris.add(fileStatus.getPath().toString());
    }
  }
  return sourceUris;
}
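A related pattern, shown here as a hedged sketch (assuming Java 8+ and the same fields as above): push the name check into a PathFilter at listing time. Note this variant is broader, skipping every underscore- or dot-prefixed name per the usual "hidden file" convention, and directories would still need the explicit check because the filter only sees the path:

PathFilter visibleFiles = path ->
    !path.getName().startsWith("_") && !path.getName().startsWith(".");
FileStatus[] outputFiles = outputFileSystem.listStatus(outputPath, visibleFiles);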
private static void populateMRSettingsToRetain() {
  // FileInputFormat
  mrSettingsToRetain.add(FileInputFormat.INPUT_DIR);
  mrSettingsToRetain.add(FileInputFormat.SPLIT_MAXSIZE);
  mrSettingsToRetain.add(FileInputFormat.SPLIT_MINSIZE);
  mrSettingsToRetain.add(FileInputFormat.PATHFILTER_CLASS);
  mrSettingsToRetain.add(FileInputFormat.NUM_INPUT_FILES);
  mrSettingsToRetain.add(FileInputFormat.INPUT_DIR_RECURSIVE);
  // FileOutputFormat
  mrSettingsToRetain.add(MRConfiguration.OUTPUT_BASENAME);
  mrSettingsToRetain.add(FileOutputFormat.COMPRESS);
  mrSettingsToRetain.add(FileOutputFormat.COMPRESS_CODEC);
  mrSettingsToRetain.add(FileOutputFormat.COMPRESS_TYPE);
  mrSettingsToRetain.add(FileOutputFormat.OUTDIR);
  mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
}
private void setHadoopJobConfigs(Job job, int numInputPaths) {
  job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
  // Turn this on so that class paths specified by the user are always used first.
  job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
  // Turn this off since we don't need an empty _SUCCESS file in the output directory.
  job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
  job.setJarByClass(HadoopSegmentPreprocessingJob.class);
  String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
  if (hadoopTokenFileLocation != null) {
    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
  }
  // Mapper configs.
  job.setMapperClass(SegmentPreprocessingMapper.class);
  job.setMapOutputKeyClass(AvroKey.class);
  job.setMapOutputValueClass(AvroValue.class);
  job.getConfiguration().setInt(JobContext.NUM_MAPS, numInputPaths);
  // Reducer configs.
  job.setReducerClass(SegmentPreprocessingReducer.class);
  job.setOutputKeyClass(AvroKey.class);
  job.setOutputValueClass(NullWritable.class);
}
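Since this job sets SUCCESSFUL_JOB_OUTPUT_DIR_MARKER to false, downstream consumers cannot rely on the _SUCCESS file. When the marker is left at its default (enabled), a consumer can test for a completed job by name; a minimal sketch, assuming fs and outDir are already in scope:

// FileOutputCommitter.SUCCEEDED_FILE_NAME is the "_SUCCESS" marker written by commitJob.
Path successMarker = new Path(outDir, FileOutputCommitter.SUCCEEDED_FILE_NAME);
boolean jobCommitted = fs.exists(successMarker);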
@Test(timeout = 5000)
public void testNewAPI_TextOutputFormat() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, TextOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(true, output.isMapperOutput);
  assertEquals(true, output.useNewApi);
  assertEquals(TextOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertNotNull(output.newApiTaskAttemptContext);
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
@Test(timeout = 5000)
public void testOldAPI_TextOutputFormat() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf,
          org.apache.hadoop.mapred.TextOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(false, output.isMapperOutput);
  assertEquals(false, output.useNewApi);
  assertEquals(org.apache.hadoop.mapred.TextOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertNotNull(output.oldApiTaskAttemptContext);
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
@Test(timeout = 5000)
public void testNewAPI_SequenceFileOutputFormat() throws Exception {
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, SequenceFileOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(true, output.useNewApi);
  assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
@Test(timeout = 5000)
public void testOldAPI_SequenceFileOutputFormat() throws Exception {
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf,
          org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(false, output.useNewApi);
  assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
@Test(timeout = 5000)
public void testNewAPI_WorkOutputPathOutputFormat() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, NewAPI_WorkOutputPathReadingOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(true, output.isMapperOutput);
  assertEquals(true, output.useNewApi);
  assertEquals(NewAPI_WorkOutputPathReadingOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertNotNull(output.newApiTaskAttemptContext);
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
@Test(timeout = 5000)
public void testOldAPI_WorkOutputPathOutputFormat() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, OldAPI_WorkOutputPathReadingOutputFormat.class,
          tmpDir.getPath())
      .build();
  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(false, output.isMapperOutput);
  assertEquals(false, output.useNewApi);
  assertEquals(OldAPI_WorkOutputPathReadingOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertNotNull(output.oldApiTaskAttemptContext);
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
private static void cleanup() throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = outDir.getFileSystem(conf);
  fs.delete(outDir, true);
  // now determine if we have YARN-3027 and YARN-3079 patches applied
  // based on whether the FILEOUTPUTCOMMITTER_ALGORITHM_VERSION static class
  // member exists
  try {
    FileOutputCommitter.class.getDeclaredField("FILEOUTPUTCOMMITTER_ALGORITHM_VERSION");
    patched = true;
  } catch (NoSuchFieldException nsf) {
    patched = false;
  }
}
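The constant probed above selects between the two commit algorithms: version 1 renames task output into a job attempt directory at task commit and merges everything into the final directory at job commit, while version 2 renames task files directly into the final output directory at task commit, making job commit much cheaper at the cost of exposing partial output earlier. On Hadoop versions that define the constant, it can be set explicitly:

// Choose the commit algorithm (assumes a Hadoop version that defines the constant).
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1); // or 2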
private void testCommitterInternal(int version) throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);
  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);
  // write output
  TextOutputFormat theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeOutput(theRecordWriter, tContext);
  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);
  // validate output
  validateContent(outDir);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
private void testMapFileOutputCommitterInternal(int version) throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);
  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);
  // write output
  MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeMapFileOutput(theRecordWriter, tContext);
  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);
  // validate output
  validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
private void testSafety(int commitVersion) throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
  conf.setInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, commitVersion);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new SafeFileOutputCommitter(outDir, tContext);
  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);
  // write output
  TextOutputFormat theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeOutput(theRecordWriter, tContext);
  // Commit the job prior to committing the task (leaving files in the temporary
  // dir); this should fail.
  try {
    committer.commitJob(jContext);
    Assert.fail("Expected commit job to fail");
  } catch (Exception e) {
    committer.commitTask(tContext);
    committer.commitJob(jContext);
  }
  validateContent(outDir);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
private void ingestTestData(Configuration conf, TestFileLoader loader) throws IOException, InterruptedException {
  log.debug("------------- ingestTestData -------------");
  File tmpDir = new File(System.getProperty("java.io.tmpdir"));
  Path tmpPath = new Path(tmpDir.toURI());
  Path seqFile = new Path(tmpPath, UUID.randomUUID().toString());
  TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.MAP, 0, 0);
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
  try (final RawLocalFileSystem rfs = createSequenceFile(conf, seqFile, loader)) {
    InputSplit split = new FileSplit(seqFile, 0, rfs.pathToFile(seqFile).length(), null);
    EventSequenceFileRecordReader<LongWritable> rr = new EventSequenceFileRecordReader<>();
    rr.initialize(split, context);
    Path ocPath = new Path(tmpPath, "oc");
    OutputCommitter oc = new FileOutputCommitter(ocPath, context);
    rfs.deleteOnExit(ocPath);
    StandaloneStatusReporter sr = new StandaloneStatusReporter();
    EventMapper<LongWritable,RawRecordContainer,Text,Mutation> mapper = new EventMapper<>();
    MapContext<LongWritable,RawRecordContainer,Text,Mutation> mapContext = new MapContextImpl<>(conf, id, rr, this.recordWriter, oc, sr, split);
    Mapper<LongWritable,RawRecordContainer,Text,Mutation>.Context con = new WrappedMapper<LongWritable,RawRecordContainer,Text,Mutation>()
        .getMapContext(mapContext);
    mapper.run(con);
    mapper.cleanup(con);
  } finally {
    this.recordWriter.close(context);
  }
}
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
  Path taskOutputPath;
  OutputCommitter committer = getOutputCommitter(conf);
  if (committer instanceof FileOutputCommitter) {
    taskOutputPath = ((FileOutputCommitter) committer).getWorkPath();
  } else {
    Path outputPath = getOutputPath(conf);
    if (outputPath == null) {
      throw new IOException("Undefined job output path.");
    }
    taskOutputPath = outputPath;
  }
  return taskOutputPath;
}
public Path getDefaultWorkFile(TaskAttemptContext context, String extension)
    throws IOException {
  FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
  String outputName = context.getConfiguration().get(
      "com.b5m.admm.iteration.output.name");
  if (null == outputName) {
    return new Path(committer.getWorkPath(), "Z");
  }
  return new Path(FileOutputFormat.getOutputPath(context), outputName);
}
@Override
public Path getDefaultWorkFile(TaskAttemptContext context,
    String extension) throws IOException {
  FileOutputCommitter committer =
      (FileOutputCommitter) super.getOutputCommitter(context);
  return new Path(committer.getWorkPath(),
      getUniqueFile(context, "part", extension));
}
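For context, here is a self-contained sketch of a custom FileOutputFormat that uses getDefaultWorkFile in exactly this way; the class name and the plain-text record handling are hypothetical:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PlainTextOutputFormat extends FileOutputFormat<NullWritable, Text> {
  @Override
  public RecordWriter<NullWritable, Text> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    // getDefaultWorkFile resolves to <workPath>/part-[mr]-NNNNN<extension>,
    // so the committer can later promote the file into the final output dir.
    Path file = getDefaultWorkFile(context, ".txt");
    FSDataOutputStream out =
        file.getFileSystem(context.getConfiguration()).create(file, false);
    return new RecordWriter<NullWritable, Text>() {
      @Override
      public void write(NullWritable key, Text value) throws IOException {
        out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        out.write('\n');
      }
      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();
      }
    };
  }
}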
@Test (timeout = 5000)
public void testOldAPI_MR() throws Exception {
  String outputPath = TEST_DIR.getAbsolutePath();
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
  org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath));
  // the output is attached to the reducer
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
  UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
  OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
      .setUserPayload(vertexPayload);
  DataSinkDescriptor sink = DataSinkDescriptor.create(od,
      OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
  OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
  MROutputLegacy output = new MROutputLegacy(outputContext, 2);
  output.initialize();
  assertEquals(false, output.useNewApi);
  assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
@Test (timeout = 5000)
public void testNewAPI_MR() throws Exception {
  String outputPath = TEST_DIR.getAbsolutePath();
  Job job = Job.getInstance();
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
  job.getConfiguration().setBoolean("mapred.reducer.new-api", true);
  // the output is attached to the reducer
  job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
  UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration());
  OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
      .setUserPayload(vertexPayload);
  DataSinkDescriptor sink = DataSinkDescriptor.create(od,
      OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
  OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
  MROutputLegacy output = new MROutputLegacy(outputContext, 2);
  output.initialize();
  assertEquals(true, output.useNewApi);
  assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
@Test (timeout = 5000)
public void testOldAPI_MapperOnly() throws Exception {
  String outputPath = TEST_DIR.getAbsolutePath();
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setOutputFormat(org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
  org.apache.hadoop.mapred.SequenceFileOutputFormat.setOutputPath(conf, new Path(outputPath));
  // the output is attached to the mapper
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
  UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
  OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
      .setUserPayload(vertexPayload);
  DataSinkDescriptor sink = DataSinkDescriptor.create(od,
      OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
  OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
  MROutputLegacy output = new MROutputLegacy(outputContext, 2);
  output.initialize();
  assertEquals(false, output.useNewApi);
  assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
@Test (timeout = 5000)
public void testNewAPI_MapperOnly() throws Exception {
  String outputPath = TEST_DIR.getAbsolutePath();
  Job job = Job.getInstance();
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
  job.getConfiguration().setBoolean("mapred.mapper.new-api", true);
  // the output is attached to the mapper
  job.getConfiguration().setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
  UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(job.getConfiguration());
  OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
      .setUserPayload(vertexPayload);
  DataSinkDescriptor sink = DataSinkDescriptor.create(od,
      OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null);
  OutputContext outputContext = createMockOutputContext(sink.getOutputDescriptor().getUserPayload());
  MROutputLegacy output = new MROutputLegacy(outputContext, 2);
  output.initialize();
  assertEquals(true, output.useNewApi);
  assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
  return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
}
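This override mirrors what FileOutputFormat.getOutputCommitter does by default, with one difference: the stock implementation caches the committer so repeated calls return the same instance. A sketch of that caching pattern:

private FileOutputCommitter committer = null;

@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException {
  if (committer == null) {
    Path output = FileOutputFormat.getOutputPath(context);
    committer = new FileOutputCommitter(output, context);
  }
  return committer;
}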