org.apache.hadoop.mapreduce.Job#setCombinerClass() Code Examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#setCombinerClass(), drawn from open-source projects on GitHub.
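
Before diving into the examples, one note on the contract: the class passed to setCombinerClass() runs on map-side (and merge-side) intermediate data, may be invoked zero or more times per key, and must consume and produce the map output key/value types. A minimal sketch follows; TokenizerMapper and IntSumReducer stand for the usual word-count mapper and summing reducer (as in Example 11 below) and are not taken from any single project here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombinerSketch {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration(), "combiner sketch");
		job.setJarByClass(CombinerSketch.class);
		job.setMapperClass(TokenizerMapper.class); // emits <Text, IntWritable>
		// The combiner must read and write the map output types, here
		// <Text, IntWritable>, and may run any number of times; summing is
		// commutative and associative, so reusing the reducer is safe.
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}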

Example 1  Project: IntroToHadoopAndMR__Udacity_Course  File: P1Q2.java
public final static void main(final String[] args) throws Exception {
	final Configuration conf = new Configuration();

	final Job job = new Job(conf, "P1Q2");
	job.setJarByClass(P1Q2.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(DoubleWritable.class);

	job.setMapperClass(P1Q2Map.class);
	job.setCombinerClass(P1Q2Reduce.class);
	job.setReducerClass(P1Q2Reduce.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.addInputPath(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	job.waitForCompletion(true);
}
 
Example 2  Project: MapReduce-Demo  File: Step3.java
public static boolean run(Configuration config, Map<String, String> paths)  throws IOException, ClassNotFoundException, InterruptedException {
	String jobName = "step3";
	Job job = Job.getInstance(config, jobName);
	job.setJarByClass(Step3.class);
	job.setJar("export\\ItemCF.jar"); // sets the job jar explicitly, overriding the jar located via setJarByClass above
	job.setMapperClass(Step3_Mapper.class);
	job.setReducerClass(Step3_Reducer.class);
	job.setCombinerClass(Step3_Reducer.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);

	Path inPath = new Path(paths.get("Step3Input"));
	Path outpath = new Path(paths.get("Step3Output"));
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outpath);		
	FileSystem fs = FileSystem.get(config);
	if (fs.exists(outpath)) {
		fs.delete(outpath, true);
	}
	
	return job.waitForCompletion(true);		
}
 
Example 3  Project: geowave  File: KMeansDistortionJobRunner.java
@Override
public void configure(final Job job) throws Exception {

  job.setMapperClass(KMeansDistortionMapReduce.KMeansDistortionMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(CountofDoubleWritable.class);
  job.setReducerClass(KMeansDistortionMapReduce.KMeansDistortionReduce.class);
  job.setCombinerClass(KMeansDistortionMapReduce.KMeansDistorationCombiner.class);
  job.setOutputKeyClass(GeoWaveOutputKey.class);
  job.setOutputValueClass(DistortionEntry.class);
  job.setOutputFormatClass(GeoWaveOutputFormat.class);
  // extends wait time to 15 minutes (default: 600 seconds)
  final long milliSeconds = 1000L * 60L * 15L;
  final Configuration conf = job.getConfiguration();
  conf.setLong("mapred.task.timeout", milliSeconds);
  ((ParameterEnum<Integer>) JumpParameters.Jump.COUNT_OF_CENTROIDS).getHelper().setValue(
      conf,
      KMeansDistortionMapReduce.class,
      Integer.valueOf(k));

  // Required since the Mapper uses the input format parameters to lookup
  // the adapter
  GeoWaveInputFormat.setStoreOptions(conf, dataStoreOptions);

  GeoWaveOutputFormat.addDataAdapter(conf, new DistortionDataAdapter());
}
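
A side note on this example: "mapred.task.timeout" is the pre-Hadoop-2 property name. It still works through Hadoop's deprecation mapping, but on current releases the equivalent call would be (a sketch, assuming Hadoop 2.x or later):

conf.setLong("mapreduce.task.timeout", milliSeconds); // same 15-minute timeout, non-deprecated property name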
 
Example 4  Project: big-c  File: UserNamePermission.java
public static void main(String [] args) throws Exception
{
  Path outDir = new Path("output");
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "user name check"); 
	
	
  job.setJarByClass(UserNamePermission.class);
  job.setMapperClass(UserNamePermission.UserNameMapper.class);
  job.setCombinerClass(UserNamePermission.UserNameReducer.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setReducerClass(UserNamePermission.UserNameReducer.class);
  job.setNumReduceTasks(1);
    
  job.setInputFormatClass(TextInputFormat.class);
  TextInputFormat.addInputPath(job, new Path("input"));
  FileOutputFormat.setOutputPath(job, outDir);
    
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 5  Project: bigdata-tutorial  File: MapperInputSplitInfo.java
public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: MapperInputSplitInfo <in> <out>");
		System.exit(2);
	}
	Job job = Job.getInstance(conf, MapperInputSplitInfo.class.getSimpleName());
	job.setJarByClass(MapperInputSplitInfo.class);
	job.setMapperClass(MyMapper.class);
	job.setCombinerClass(MyReducer.class);
	job.setReducerClass(MyReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 6  Project: MLHadoop  File: MF_Driver.java
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
	Configuration conf=new Configuration();
	Job job = new Job(conf);
	job.setJarByClass(MF_Driver.class);
	job.setJobName("Mutual Friend Calculator");
	FileInputFormat.setInputPaths(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));
	job.setMapperClass(MF_Mapper.class);
	job.setCombinerClass(MF_Reducer.class);
	job.setReducerClass(MF_Reducer.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	boolean success = job.waitForCompletion(true);
	System.exit(success ? 0 : 1);
}
 
Example 7  Project: IntroToHadoopAndMR__Udacity_Course  File: P2Q2.java
public final static void main(final String[] args) throws Exception {
	final Configuration conf = new Configuration();

	final Job job = new Job(conf, "P2Q2");
	job.setJarByClass(P2Q2.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);

	job.setMapperClass(P2Q2Map.class);
	job.setCombinerClass(P2Q2Reduce.class);
	job.setReducerClass(P2Q2Reduce.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.addInputPath(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	job.waitForCompletion(true);
}
 
Example 8  Project: hadoop  File: TestJobCounters.java
public static Job createJob() throws IOException {
  final Configuration conf = new Configuration();
  final Job baseJob = Job.getInstance(conf);
  baseJob.setOutputKeyClass(Text.class);
  baseJob.setOutputValueClass(IntWritable.class);
  baseJob.setMapperClass(NewMapTokenizer.class);
  baseJob.setCombinerClass(NewSummer.class);
  baseJob.setReducerClass(NewSummer.class);
  baseJob.setNumReduceTasks(1);
  baseJob.getConfiguration().setInt(JobContext.IO_SORT_MB, 1);
  baseJob.getConfiguration().set(JobContext.MAP_SORT_SPILL_PERCENT, "0.50");
  baseJob.getConfiguration().setInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMinInputSplitSize(
      baseJob, Long.MAX_VALUE);
  return baseJob;
}
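
This example is a test fixture, but the three knobs it sets show when a combiner actually runs: the combiner is applied as map output spills to disk, so shrinking the sort buffer (JobContext.IO_SORT_MB), lowering the spill threshold (JobContext.MAP_SORT_SPILL_PERCENT), and requiring at least three spill files before the merge-time combine (JobContext.MAP_COMBINE_MIN_SPILLS) force it to run repeatedly. The constants resolve to plain property names, so the equivalent direct settings would be (a sketch, assuming Hadoop 2.x property names):

conf.setInt("mapreduce.task.io.sort.mb", 1);
conf.set("mapreduce.map.sort.spill.percent", "0.50");
conf.setInt("mapreduce.map.combine.minspills", 3);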
 
Example 9  Project: hadoop  File: UserNamePermission.java
public static void main(String [] args) throws Exception
{
  Path outDir = new Path("output");
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "user name check"); 
	
	
  job.setJarByClass(UserNamePermission.class);
  job.setMapperClass(UserNamePermission.UserNameMapper.class);
  job.setCombinerClass(UserNamePermission.UserNameReducer.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setReducerClass(UserNamePermission.UserNameReducer.class);
  job.setNumReduceTasks(1);
    
  job.setInputFormatClass(TextInputFormat.class);
  TextInputFormat.addInputPath(job, new Path("input"));
  FileOutputFormat.setOutputPath(job, outDir);
    
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 10  Project: hadoop-map-reduce-patterns  File: MinMaxCount.java
@Override
public int run(String[] arg0) throws Exception {
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: MinMaxCount <in> <out>");
		System.exit(2);
	}
	Job job = new Job(conf, "StackOverflow Comment MinMaxCount");
	job.setJarByClass(MinMaxCount.class);
	job.setMapperClass(MinMaxCountMapper.class);
	job.setCombinerClass(MinMaxCountReducer.class);
	job.setReducerClass(MinMaxCountReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(MinMaxCountTuple.class);
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	boolean success = job.waitForCompletion(true);

	return success ? 0 : 1;
}
 
Example 11  Project: hadoop-gpu  File: WordCount.java
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 12  Project: hadoop-map-reduce-patterns  File: DistinctUser.java
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser parser = new GenericOptionsParser(conf, args);
	String[] otherArgs = parser.getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: DistinctUser <in> <out>");
		ToolRunner.printGenericCommandUsage(System.err);
		System.exit(2);
	}
	Job job = new Job(conf, "Distinct User");
	job.setJarByClass(DistinctUser.class);
	job.setMapperClass(DistinctUserMapper.class);
	job.setReducerClass(DistinctUserReducer.class);
	job.setCombinerClass(DistinctUserReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(NullWritable.class);
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	boolean success = job.waitForCompletion(true);

	return success ? 0 : 1;
}
 
Example 13  Project: geowave  File: KMeansJobRunner.java
@Override
public void configure(final Job job) throws Exception {
  job.setMapperClass(KMeansMapReduce.KMeansMapper.class);
  job.setMapOutputKeyClass(GroupIDText.class);
  job.setMapOutputValueClass(BytesWritable.class);
  job.setReducerClass(KMeansMapReduce.KMeansReduce.class);
  job.setCombinerClass(KMeansMapReduce.KMeansCombiner.class);
  job.setReduceSpeculativeExecution(false);
  job.setOutputKeyClass(GeoWaveOutputKey.class);
  job.setOutputValueClass(SimpleFeature.class);
}
 
Example 14  Project: bigdata-tutorial  File: MrjobRemoteCommitDemo.java
public static void main(String[] args) throws Exception {

		System.setProperty("HADOOP_USER_NAME", "hadoop");
		Configuration conf = new Configuration();

		// settings for submitting this MR job to a remote cluster
		conf.set("fs.defaultFS", "hdfs://192.168.1.191:9000");
		conf.set("hadoop.job.user", "hadoop");
		conf.set("mapreduce.framework.name", "yarn");
		conf.set("yarn.resourcemanager.hostname", "192.168.1.191");
		conf.set("mapred.jar", "out/artifacts/hadoop2-demo.jar");
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: MrjobRemoteCommitDemo <in> <out>");
			System.exit(2);
		}
		Job job = Job.getInstance(conf, MrjobRemoteCommitDemo.class.getName());
		job.setJarByClass(MrjobRemoteCommitDemo.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
 
Example 15  Project: gemfirexd-oss  File: BusyLegs.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();
    Configuration conf = getConf();

    Path outputPath = new Path(args[0]);
    String hdfsHomeDir = args[1];
    String tableName = args[2];

    outputPath.getFileSystem(conf).delete(outputPath, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    Job job = Job.getInstance(conf, "Busy Leg Count");

    job.setInputFormatClass(RowInputFormat.class);

    // configure mapper and reducer
    job.setMapperClass(SampleMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // configure output
    TextOutputFormat.setOutputPath(job, outputPath);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
 
Example 16  Project: stratio-cassandra  File: WordCountCounters.java
public int run(String[] args) throws Exception
{
    String inputMapperType = "native";
    if (args != null && args[0].startsWith(INPUT_MAPPER_VAR))
    {
        String[] arg0 = args[0].split("=");
        if (arg0 != null && arg0.length == 2)
            inputMapperType = arg0[1];
    }
    Job job = new Job(getConf(), "wordcountcounters");

    job.setCombinerClass(ReducerToFilesystem.class);
    job.setReducerClass(ReducerToFilesystem.class);
    job.setJarByClass(WordCountCounters.class); 

    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);

    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
    if ("native".equals(inputMapperType))
    {
        job.setMapperClass(SumNativeMapper.class);
        job.setInputFormatClass(CqlInputFormat.class);
        CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering");
    }
    else
    {
        job.setMapperClass(SumMapper.class);
        job.setInputFormatClass(CqlInputFormat.class);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    }

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
    job.waitForCompletion(true);
    return 0;
}
 
Example 17  File: BlockMatrixMultiplicationStep3.java (project name missing from the original listing; file inferred from the setJarByClass call)
public static void run() throws IOException, ClassNotFoundException,
		InterruptedException {

	String inputPath = ItemBasedCFDriver.path.get("step9InputPath");
	String outputPath = ItemBasedCFDriver.path.get("step9OutputPath");

	Configuration conf = new Configuration();
	conf.set("mapred.textoutputformat.separator", ",");

	HDFS hdfs = new HDFS(conf);
	hdfs.rmr(outputPath);

	Job job = Job.getInstance(conf);

	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(DoubleWritable.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(DoubleWritable.class);

	job.setMapperClass(BlockMatrixStep3_Mapper.class);
	job.setReducerClass(BlockMatrixStep3_Reducer.class);
	job.setCombinerClass(BlockMatrixStep3_Reducer.class);
	job.setNumReduceTasks(ItemBasedCFDriver.ReducerNumber);

	job.setJarByClass(BlockMatrixMultiplicationStep3.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.setInputPaths(job, new Path(inputPath));
	FileOutputFormat.setOutputPath(job, new Path(outputPath));

	job.waitForCompletion(true);
}
 
Example 18  Project: gemfirexd-oss  File: TopBusyAirport.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();
    Configuration conf = getConf();

    Path outputPath = new Path(args[0]);
    Path intermediateOutputPath = new Path(args[0] + "_int");
    String hdfsHomeDir = args[1];
    String tableName = args[2];

    outputPath.getFileSystem(conf).delete(outputPath, true);
    intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    Job job = Job.getInstance(conf, "Busy Airport Count");

    job.setInputFormatClass(RowInputFormat.class);

    // configure mapper and reducer
    job.setMapperClass(SampleMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // Only have one reduce task so that all of the results from mapping are
    // processed in one place.
    job.setNumReduceTasks(1);

    // configure output
    TextOutputFormat.setOutputPath(job, intermediateOutputPath);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    int rc = job.waitForCompletion(true) ? 0 : 1;
    if (rc == 0) {
      Job topJob = Job.getInstance(getConf(), "Top Busy Airport");

      // We want the task to run on a single VM
      topJob.setNumReduceTasks(1);

      // Set the inputs
      topJob.setInputFormatClass(TextInputFormat.class);
      TextInputFormat.addInputPath(topJob, intermediateOutputPath);

      // Set the mapper and reducer
      topJob.setMapperClass(TopBusyAirportMapper.class);
      topJob.setReducerClass(TopBusyAirportReducer.class);

      // Set the outputs
      TextOutputFormat.setOutputPath(topJob, outputPath);
      topJob.setOutputFormatClass(TextOutputFormat.class);
      topJob.setOutputKeyClass(Text.class);
      topJob.setOutputValueClass(IntWritable.class);

      topJob.setMapOutputKeyClass(Text.class);
      topJob.setMapOutputValueClass(StringIntPair.class);

      rc = topJob.waitForCompletion(true) ? 0 : 1;
    }
    return rc;
  }
 
Example 19  Project: jumbune  File: DataProfilingJobExecutor.java
/**
 * @param args
 */
public static void main(String[] args) throws IOException, InterruptedException , ClassNotFoundException{
	
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	LOGGER.debug("Data Profiling job values respectively ["+otherArgs[0]+"], "+
			 otherArgs[1]);
	StringBuilder sb = new StringBuilder();
	
	int dynamicArgs = 0;		
	dynamicArgs = ((otherArgs.length)-1);
	
	for (int i = dynamicArgs; i < otherArgs.length; i++) {
		LOGGER.debug("other arguments" + otherArgs[i]);
		sb.append(otherArgs[i]);
	}
	
	String outputPath = DataProfilingConstants.OUTPUT_DIR_PATH + new Date().getTime();
	String inputPath = otherArgs[0];
	String dpBeanString = sb.toString();
	LOGGER.debug("Received dpBean value [" + dpBeanString+"]");
	Gson gson = new Gson();
	Type type = new TypeToken<DataProfilingBean>() {
	}.getType();
	
	DataProfilingBean dataProfilingBean = gson.fromJson(dpBeanString, type);
	String recordSeparator = dataProfilingBean.getRecordSeparator();
	conf.set(DataProfilingConstants.DATA_PROFILING_BEAN, dpBeanString);
	conf.set(DataProfilingConstants.RECORD_SEPARATOR, recordSeparator);
	
	conf.set(DataProfilingConstants.TEXTINPUTFORMAT_RECORD_DELIMITER, recordSeparator);
	
	Job job = new Job(conf,DataProfilingConstants.JOB_NAME);
	
	job.setJarByClass(DataProfilingJobExecutor.class);
	job.setMapperClass(DataProfilingMapper.class);
	
	job.setCombinerClass(DataProfilingReducer.class);
	job.setReducerClass(DataProfilingReducer.class);
	
	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(SequenceFileOutputFormat.class);
	
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);
	
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	
	Path[] inputPaths = FileUtil.getAllNestedFilePath(job, inputPath);
	
	TextInputFormat.setInputPaths(job, inputPaths);
	SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
	
	job.waitForCompletion(true);
	LOGGER.debug("Job completed , now going to read the result from hdfs");
	Set<CriteriaBasedDataProfiling> criteriaBasedDataProfilings = readJobOutputFromHdfs(conf,outputPath,dataProfilingBean);
	final Gson dpReportGson = new GsonBuilder().disableHtmlEscaping().create();

	final String jsonString = dpReportGson.toJson(criteriaBasedDataProfilings);
	LOGGER.info(DataProfilingConstants.DATA_PROFILING_REPORT + jsonString);
}
 
Example 20  File: BasicJobChaining.java (project name missing from the original listing; file inferred from the setJarByClass call)
public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

	if (otherArgs.length != 3) {
		System.err.println("Usage: JobChainingDriver <posts> <users> <out>");
		System.exit(2);
	}

	Path postInput = new Path(otherArgs[0]);
	Path userInput = new Path(otherArgs[1]);
	Path outputDirIntermediate = new Path(otherArgs[2] + "_int");
	Path outputDir = new Path(otherArgs[2]);

	// Setup first job to counter user posts
	Job countingJob = new Job(conf, "JobChaining-Counting");
	countingJob.setJarByClass(BasicJobChaining.class);

	// Set our mapper and reducer, we can use the API's long sum reducer for
	// a combiner!
	countingJob.setMapperClass(UserIdCountMapper.class);
	countingJob.setCombinerClass(LongSumReducer.class);
	countingJob.setReducerClass(UserIdSumReducer.class);

	countingJob.setOutputKeyClass(Text.class);
	countingJob.setOutputValueClass(LongWritable.class);

	countingJob.setInputFormatClass(TextInputFormat.class);

	TextInputFormat.addInputPath(countingJob, postInput);

	countingJob.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);

	// Execute job and grab exit code
	int code = countingJob.waitForCompletion(true) ? 0 : 1;

	if (code == 0) {
		// Calculate the average posts per user by getting counter values
		double numRecords = (double) countingJob.getCounters()
				.findCounter(AVERAGE_CALC_GROUP, UserIdCountMapper.RECORDS_COUNTER_NAME)
				.getValue();
		double numUsers = (double) countingJob.getCounters()
				.findCounter(AVERAGE_CALC_GROUP, UserIdSumReducer.USERS_COUNTER_NAME)
				.getValue();

		double averagePostsPerUser = numRecords / numUsers;

		// Setup binning job
		Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
		binningJob.setJarByClass(BasicJobChaining.class);

		// Set mapper and the average posts per user
		binningJob.setMapperClass(UserIdBinningMapper.class);
		UserIdBinningMapper.setAveragePostsPerUser(binningJob, averagePostsPerUser);

		binningJob.setNumReduceTasks(0);

		binningJob.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.addInputPath(binningJob, outputDirIntermediate);

		// Add two named outputs for below/above average
		MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
				TextOutputFormat.class, Text.class, Text.class);

		MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
				TextOutputFormat.class, Text.class, Text.class);
		MultipleOutputs.setCountersEnabled(binningJob, true);

		TextOutputFormat.setOutputPath(binningJob, outputDir);

		// Add the user files to the DistributedCache
		FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
		for (FileStatus status : userFiles) {
			DistributedCache.addCacheFile(status.getPath().toUri(),
					binningJob.getConfiguration());
		}

		// Execute job and grab exit code
		code = binningJob.waitForCompletion(true) ? 0 : 1;
	}

	// Clean up the intermediate output
	FileSystem.get(conf).delete(outputDirIntermediate, true);

	System.exit(code);
}
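
A closing caveat: most of the examples above pass the reducer class straight to setCombinerClass(). That shortcut is only correct when the reduce function is commutative and associative and its input and output types match the map output types, because the framework may apply the combiner zero, one, or several times. A counterexample sketch (an illustrative class, not taken from any project above): an averaging reducer must not double as a combiner.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// An averaging reducer: NOT safe to reuse as a combiner.
// With values {1, 2} and {3} arriving from two map spills, a combiner pass
// yields averages 1.5 and 3.0, and the reducer then outputs 2.25 instead of
// the true mean 2.0. Ship (sum, count) pairs through a dedicated combiner instead.
public class AverageReducer
		extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
	private final DoubleWritable result = new DoubleWritable();

	@Override
	protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
			throws IOException, InterruptedException {
		double sum = 0;
		long count = 0;
		for (DoubleWritable v : values) {
			sum += v.get();
			count++;
		}
		result.set(sum / count);
		context.write(key, result);
	}
}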