下面列出了org.apache.hadoop.mapreduce.Job#setCombinerClass ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P1Q2");
job.setJarByClass(P1Q2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(P1Q2Map.class);
job.setCombinerClass(P1Q2Reduce.class);
job.setReducerClass(P1Q2Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static boolean run(Configuration config, Map<String, String> paths) throws IOException, ClassNotFoundException, InterruptedException {
String jobName = "step3";
Job job = Job.getInstance(config, jobName);
job.setJarByClass(Step3.class);
job.setJar("export\\ItemCF.jar");
job.setMapperClass(Step3_Mapper.class);
job.setReducerClass(Step3_Reducer.class);
job.setCombinerClass(Step3_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
Path inPath = new Path(paths.get("Step3Input"));
Path outpath = new Path(paths.get("Step3Output"));
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outpath);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
return job.waitForCompletion(true);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(KMeansDistortionMapReduce.KMeansDistortionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CountofDoubleWritable.class);
job.setReducerClass(KMeansDistortionMapReduce.KMeansDistortionReduce.class);
job.setCombinerClass(KMeansDistortionMapReduce.KMeansDistorationCombiner.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(DistortionEntry.class);
job.setOutputFormatClass(GeoWaveOutputFormat.class);
// extends wait time to 15 minutes (default: 600 seconds)
final long milliSeconds = 1000L * 60L * 15L;
final Configuration conf = job.getConfiguration();
conf.setLong("mapred.task.timeout", milliSeconds);
((ParameterEnum<Integer>) JumpParameters.Jump.COUNT_OF_CENTROIDS).getHelper().setValue(
conf,
KMeansDistortionMapReduce.class,
Integer.valueOf(k));
// Required since the Mapper uses the input format parameters to lookup
// the adapter
GeoWaveInputFormat.setStoreOptions(conf, dataStoreOptions);
GeoWaveOutputFormat.addDataAdapter(conf, new DistortionDataAdapter());
}
public static void main(String [] args) throws Exception
{
Path outDir = new Path("output");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "user name check");
job.setJarByClass(UserNamePermission.class);
job.setMapperClass(UserNamePermission.UserNameMapper.class);
job.setCombinerClass(UserNamePermission.UserNameReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(UserNamePermission.UserNameReducer.class);
job.setNumReduceTasks(1);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, outDir);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MapperInputSplitInfo <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, MapperInputSplitInfo.class.getSimpleName());
job.setJarByClass(MapperInputSplitInfo.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf=new Configuration();
Job job = new Job(conf);
job.setJarByClass(MF_Driver.class);
job.setJobName("Mutual Friend Calculator");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MF_Mapper.class);
job.setCombinerClass(MF_Reducer.class);
job.setReducerClass(MF_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P2Q2");
job.setJarByClass(P2Q2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(P2Q2Map.class);
job.setCombinerClass(P2Q2Reduce.class);
job.setReducerClass(P2Q2Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static Job createJob() throws IOException {
final Configuration conf = new Configuration();
final Job baseJob = Job.getInstance(conf);
baseJob.setOutputKeyClass(Text.class);
baseJob.setOutputValueClass(IntWritable.class);
baseJob.setMapperClass(NewMapTokenizer.class);
baseJob.setCombinerClass(NewSummer.class);
baseJob.setReducerClass(NewSummer.class);
baseJob.setNumReduceTasks(1);
baseJob.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
baseJob.getConfiguration().set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
baseJob.getConfiguration().setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMinInputSplitSize(
baseJob, Long.MAX_VALUE);
return baseJob;
}
public static void main(String [] args) throws Exception
{
Path outDir = new Path("output");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "user name check");
job.setJarByClass(UserNamePermission.class);
job.setMapperClass(UserNamePermission.UserNameMapper.class);
job.setCombinerClass(UserNamePermission.UserNameReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(UserNamePermission.UserNameReducer.class);
job.setNumReduceTasks(1);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, outDir);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MinMaxCount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "StackOverflow Comment MinMaxCount");
job.setJarByClass(MinMaxCount.class);
job.setMapperClass(MinMaxCountMapper.class);
job.setCombinerClass(MinMaxCountReducer.class);
job.setReducerClass(MinMaxCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MinMaxCountTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: DistinctUser <in> <out>");
ToolRunner.printGenericCommandUsage(System.err);
System.exit(2);
}
Job job = new Job(conf, "Distinct User");
job.setJarByClass(DistinctUser.class);
job.setMapperClass(DistinctUserMapper.class);
job.setReducerClass(DistinctUserReducer.class);
job.setCombinerClass(DistinctUserReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(KMeansMapReduce.KMeansMapper.class);
job.setMapOutputKeyClass(GroupIDText.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setReducerClass(KMeansMapReduce.KMeansReduce.class);
job.setCombinerClass(KMeansMapReduce.KMeansCombiner.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(SimpleFeature.class);
}
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
//远程发布mr时配置
conf.set("fs.defaultFS", "hdfs://192.168.1.191:9000");
conf.set("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "192.168.1.191");
conf.set("mapred.jar", "out/artifacts/hadoop2-demo.jar");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MrjobRemoteCommitDemo <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, MrjobRemoteCommitDemo.class.getName());
job.setJarByClass(MrjobRemoteCommitDemo.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Leg Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// configure output
TextOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception
{
String inputMapperType = "native";
if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
{
String[] arg0 = args[0].split("=");
if (arg0 != null && arg0.length == 2)
inputMapperType = arg0[1];
}
Job job = new Job(getConf(), "wordcountcounters");
job.setCombinerClass(ReducerToFilesystem.class);
job.setReducerClass(ReducerToFilesystem.class);
job.setJarByClass(WordCountCounters.class);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
if ("native".equals(inputMapperType))
{
job.setMapperClass(SumNativeMapper.class);
job.setInputFormatClass(CqlInputFormat.class);
CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
}
else
{
job.setMapperClass(SumMapper.class);
job.setInputFormatClass(CqlInputFormat.class);
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
job.waitForCompletion(true);
return 0;
}
public static void run() throws IOException, ClassNotFoundException,
InterruptedException {
String inputPath = ItemBasedCFDriver.path.get("step9InputPath");
String outputPath = ItemBasedCFDriver.path.get("step9OutputPath");
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", ",");
HDFS hdfs = new HDFS(conf);
hdfs.rmr(outputPath);
Job job = Job.getInstance(conf);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(BlockMatrixStep3_Mapper.class);
job.setReducerClass(BlockMatrixStep3_Reducer.class);
job.setCombinerClass(BlockMatrixStep3_Reducer.class);
job.setNumReduceTasks(ItemBasedCFDriver.ReducerNumber);
job.setJarByClass(BlockMatrixMultiplicationStep3.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
Path intermediateOutputPath = new Path(args[0] + "_int");
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Airport Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// Only have one reduce task so that all of the results from mapping are
// processed in one place.
job.setNumReduceTasks(1);
// configure output
TextOutputFormat.setOutputPath(job, intermediateOutputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
int rc = job.waitForCompletion(true) ? 0 : 1;
if (rc == 0) {
Job topJob = Job.getInstance(getConf(), "Top Busy Airport");
// We want the task to run on a single VM
topJob.setNumReduceTasks(1);
// Set the inputs
topJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(topJob, intermediateOutputPath);
// Set the mapper and reducer
topJob.setMapperClass(TopBusyAirportMapper.class);
topJob.setReducerClass(TopBusyAirportReducer.class);
// Set the outputs
TextOutputFormat.setOutputPath(topJob, outputPath);
topJob.setOutputFormatClass(TextOutputFormat.class);
topJob.setOutputKeyClass(Text.class);
topJob.setOutputValueClass(IntWritable.class);
topJob.setMapOutputKeyClass(Text.class);
topJob.setMapOutputValueClass(StringIntPair.class);
rc = topJob.waitForCompletion(true) ? 0 : 1;
}
return rc;
}
/**
* @param args
*/
public static void main(String[] args) throws IOException, InterruptedException , ClassNotFoundException{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
LOGGER.debug("Data Profiling job values respectively ["+otherArgs[0]+"], "+
otherArgs[1]);
StringBuilder sb = new StringBuilder();
int dynamicArgs = 0;
dynamicArgs = ((otherArgs.length)-1);
for (int i = dynamicArgs; i < otherArgs.length; i++) {
LOGGER.debug("other arguments" + otherArgs[i]);
sb.append(otherArgs[i]);
}
String outputPath = DataProfilingConstants.OUTPUT_DIR_PATH + new Date().getTime();
String inputPath = otherArgs[0];
String dpBeanString = sb.toString();
LOGGER.debug("Received dpBean value [" + dpBeanString+"]");
Gson gson = new Gson();
Type type = new TypeToken<DataProfilingBean>() {
}.getType();
DataProfilingBean dataProfilingBean = gson.fromJson(dpBeanString, type);
String recordSeparator = dataProfilingBean.getRecordSeparator();
conf.set(DataProfilingConstants.DATA_PROFILING_BEAN, dpBeanString);
conf.set(DataProfilingConstants.RECORD_SEPARATOR, recordSeparator);
conf.set(DataProfilingConstants.TEXTINPUTFORMAT_RECORD_DELIMITER, recordSeparator);
Job job = new Job(conf,DataProfilingConstants.JOB_NAME);
job.setJarByClass(DataProfilingJobExecutor.class);
job.setMapperClass(DataProfilingMapper.class);
job.setCombinerClass(DataProfilingReducer.class);
job.setReducerClass(DataProfilingReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path[] inputPaths = FileUtil.getAllNestedFilePath(job, inputPath);
TextInputFormat.setInputPaths(job, inputPaths);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
LOGGER.debug("Job completed , now going to read the result from hdfs");
Set<CriteriaBasedDataProfiling> criteriaBasedDataProfilings = readJobOutputFromHdfs(conf,outputPath,dataProfilingBean);
final Gson dpReportGson = new GsonBuilder().disableHtmlEscaping().create();
final String jsonString = dpReportGson.toJson(criteriaBasedDataProfilings);
LOGGER.info(DataProfilingConstants.DATA_PROFILING_REPORT + jsonString);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: JobChainingDriver <posts> <users> <out>");
System.exit(2);
}
Path postInput = new Path(otherArgs[0]);
Path userInput = new Path(otherArgs[1]);
Path outputDirIntermediate = new Path(otherArgs[2] + "_int");
Path outputDir = new Path(otherArgs[2]);
// Setup first job to counter user posts
Job countingJob = new Job(conf, "JobChaining-Counting");
countingJob.setJarByClass(BasicJobChaining.class);
// Set our mapper and reducer, we can use the API's long sum reducer for
// a combiner!
countingJob.setMapperClass(UserIdCountMapper.class);
countingJob.setCombinerClass(LongSumReducer.class);
countingJob.setReducerClass(UserIdSumReducer.class);
countingJob.setOutputKeyClass(Text.class);
countingJob.setOutputValueClass(LongWritable.class);
countingJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(countingJob, postInput);
countingJob.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);
// Execute job and grab exit code
int code = countingJob.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
// Calculate the average posts per user by getting counter values
double numRecords = (double) countingJob.getCounters()
.findCounter(AVERAGE_CALC_GROUP, UserIdCountMapper.RECORDS_COUNTER_NAME)
.getValue();
double numUsers = (double) countingJob.getCounters()
.findCounter(AVERAGE_CALC_GROUP, UserIdSumReducer.USERS_COUNTER_NAME)
.getValue();
double averagePostsPerUser = numRecords / numUsers;
// Setup binning job
Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
binningJob.setJarByClass(BasicJobChaining.class);
// Set mapper and the average posts per user
binningJob.setMapperClass(UserIdBinningMapper.class);
UserIdBinningMapper.setAveragePostsPerUser(binningJob, averagePostsPerUser);
binningJob.setNumReduceTasks(0);
binningJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(binningJob, outputDirIntermediate);
// Add two named outputs for below/above average
MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.setCountersEnabled(binningJob, true);
TextOutputFormat.setOutputPath(binningJob, outputDir);
// Add the user files to the DistributedCache
FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
for (FileStatus status : userFiles) {
DistributedCache.addCacheFile(status.getPath().toUri(),
binningJob.getConfiguration());
}
// Execute job and grab exit code
code = binningJob.waitForCompletion(true) ? 0 : 1;
}
// Clean up the intermediate output
FileSystem.get(conf).delete(outputDirIntermediate, true);
System.exit(code);
}