下面列出了 org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer API 类的实例代码及用法示例;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Runs a single-stage chain job (one TokenCounterMapper feeding one
 * IntSumReducer) over a tiny inline input and checks the word counts.
 */
public void testNoChain() throws Exception {
  Path inputDir = new Path(localPathRoot, "testing/chain/input");
  Path outputDir = new Path(localPathRoot, "testing/chain/output");
  String inputData = "a\nb\na\n";
  String expected = "a\t2\nb\t1\n";

  Configuration jobConf = createJobConf();
  Job chainJob = MapReduceTestUtil.createJob(jobConf, inputDir, outputDir, 1, 1, inputData);
  chainJob.setJobName("chain");

  // Chain with exactly one mapper and one reducer; the trailing null is
  // the per-stage Configuration (none needed here).
  ChainMapper.addMapper(chainJob, TokenCounterMapper.class, Object.class,
      Text.class, Text.class, IntWritable.class, null);
  ChainReducer.setReducer(chainJob, IntSumReducer.class, Text.class,
      IntWritable.class, Text.class, IntWritable.class, null);

  chainJob.waitForCompletion(true);
  assertTrue("Job failed", chainJob.isSuccessful());
  assertEquals("Outputs doesn't match", expected,
      MapReduceTestUtil.readOutput(outputDir, jobConf));
}
/**
 * Verifies a minimal chain: TokenCounterMapper followed by IntSumReducer,
 * asserting the aggregated word counts on a three-line input.
 */
public void testNoChain() throws Exception {
  Path in = new Path(localPathRoot, "testing/chain/input");
  Path out = new Path(localPathRoot, "testing/chain/output");
  String data = "a\nb\na\n";
  String want = "a\t2\nb\t1\n";

  Configuration cfg = createJobConf();
  Job j = MapReduceTestUtil.createJob(cfg, in, out, 1, 1, data);
  j.setJobName("chain");

  // One mapper stage, one reducer stage; null = no stage-local config.
  ChainMapper.addMapper(j, TokenCounterMapper.class, Object.class,
      Text.class, Text.class, IntWritable.class, null);
  ChainReducer.setReducer(j, IntSumReducer.class, Text.class,
      IntWritable.class, Text.class, IntWritable.class, null);

  j.waitForCompletion(true);
  assertTrue("Job failed", j.isSuccessful());
  assertEquals("Outputs doesn't match", want,
      MapReduceTestUtil.readOutput(out, cfg));
}
/**
 * Configures and submits the MultiFileWordCount job.
 *
 * @param args args[0] = comma-separated input paths, args[1] = output path
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if job setup or submission fails
 */
public int run(String[] args) throws Exception {
  if (args.length < 2) {
    printUsage();
    return 2;
  }

  Job wordCountJob = Job.getInstance(getConf());
  wordCountJob.setJobName("MultiFileWordCount");
  wordCountJob.setJarByClass(MultiFileWordCount.class);

  // Input side: the custom combined-file input format plus its mapper.
  wordCountJob.setInputFormatClass(MyInputFormat.class);
  wordCountJob.setMapperClass(MapClass.class);

  // Combine and reduce with the stock IntSumReducer; output is word -> count.
  wordCountJob.setCombinerClass(IntSumReducer.class);
  wordCountJob.setReducerClass(IntSumReducer.class);
  wordCountJob.setOutputKeyClass(Text.class);
  wordCountJob.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPaths(wordCountJob, args[0]);
  FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));

  return wordCountJob.waitForCompletion(true) ? 0 : 1;
}
/**
 * Driver for the "Busy Airport Count" job: reads rows of the given
 * GemFireXD table via RowInputFormat, maps them with SampleMapper and
 * sums per-key counts with IntSumReducer into a text output directory.
 *
 * @param args args[0] = output path, args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Remove any previous output so reruns do not fail on an existing dir.
  outputPath.getFileSystem(conf).delete(outputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the other drivers in this file do the same).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, outputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Driver for the "Busy Leg Count" job: reads rows of the given
 * GemFireXD table via RowInputFormat, maps them with SampleMapper and
 * sums per-key counts with IntSumReducer into a text output directory.
 *
 * @param args args[0] = output path, args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Remove any previous output so reruns do not fail on an existing dir.
  outputPath.getFileSystem(conf).delete(outputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Leg Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the other drivers in this file do the same).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, outputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Sets up and runs the MultiFileWordCount job.
 *
 * @param args args[0] = comma-separated input paths, args[1] = output path
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if job setup or submission fails
 */
public int run(String[] args) throws Exception {
  if (args.length < 2) {
    printUsage();
    return 2;
  }

  Job countJob = Job.getInstance(getConf());
  countJob.setJobName("MultiFileWordCount");
  countJob.setJarByClass(MultiFileWordCount.class);

  // Custom combined-file input format and its mapper.
  countJob.setInputFormatClass(MyInputFormat.class);
  countJob.setMapperClass(MapClass.class);

  // Sum counts with IntSumReducer, used both as combiner and reducer;
  // final output records are Text keys with IntWritable counts.
  countJob.setCombinerClass(IntSumReducer.class);
  countJob.setReducerClass(IntSumReducer.class);
  countJob.setOutputKeyClass(Text.class);
  countJob.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPaths(countJob, args[0]);
  FileOutputFormat.setOutputPath(countJob, new Path(args[1]));

  return countJob.waitForCompletion(true) ? 0 : 1;
}
/**
 * Driver for the "Busy Airport Count" job: reads rows of the given
 * GemFireXD table via RowInputFormat, maps them with SampleMapper and
 * sums per-key counts with IntSumReducer into a text output directory.
 *
 * @param args args[0] = output path, args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Remove any previous output so reruns do not fail on an existing dir.
  outputPath.getFileSystem(conf).delete(outputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the other drivers in this file do the same).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, outputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Driver for the "Busy Leg Count" job: reads rows of the given
 * GemFireXD table via RowInputFormat, maps them with SampleMapper and
 * sums per-key counts with IntSumReducer into a text output directory.
 *
 * @param args args[0] = output path, args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if the job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Remove any previous output so reruns do not fail on an existing dir.
  outputPath.getFileSystem(conf).delete(outputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Leg Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the other drivers in this file do the same).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, outputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Two-stage driver. Stage 1 ("Busy Airport Count") sums counts per key
 * from the GemFireXD table into an intermediate text directory
 * (args[0] + "_int"). Stage 2 ("Top Busy Airport") reads that output,
 * reduces to the busiest airport and writes it to the APP.BUSY_AIRPORT
 * table through RowOutputFormat.
 *
 * @param args args[0] = final output path (also the base of the
 *             intermediate path), args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if either job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Clear both output locations so reruns do not fail on existing dirs.
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  job.setJarByClass(TopBusyAirportGemfirexd.class);
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, intermediateOutputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  int rc = job.waitForCompletion(true) ? 0 : 1;
  if (rc == 0) {
    Configuration topConf = getConf();
    String gemfirexdUrl =
        topConf.get("gemfirexd.url", "jdbc:gemfirexd://localhost:1527");
    topConf.set(RowOutputFormat.OUTPUT_URL, gemfirexdUrl);
    topConf.set(RowOutputFormat.OUTPUT_TABLE, "APP.BUSY_AIRPORT");
    Configuration.dumpConfiguration(topConf, new PrintWriter(System.out));

    Job topJob = Job.getInstance(topConf, "Top Busy Airport");
    // Ship the jar for stage 2 as well; the original set it only on
    // stage 1, leaving this job's classes unresolvable on a cluster.
    topJob.setJarByClass(TopBusyAirportGemfirexd.class);
    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);

    // Set the inputs
    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutputPath);

    // Set the mapper and reducer
    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);
    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);

    // Set the outputs
    TextOutputFormat.setOutputPath(topJob, outputPath);
    topJob.setOutputFormatClass(RowOutputFormat.class);
    topJob.setOutputKeyClass(Key.class);
    topJob.setOutputValueClass(BusyAirportModel.class);

    rc = topJob.waitForCompletion(true) ? 0 : 1;
  }
  return rc;
}
/**
 * Two-stage driver. Stage 1 ("Busy Airport Count") sums counts per key
 * from the GemFireXD table into an intermediate text directory
 * (args[0] + "_int"). Stage 2 ("Top Busy Airport") reads that output
 * and writes the busiest airport as text to args[0].
 *
 * @param args args[0] = final output path (also the base of the
 *             intermediate path), args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if either job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Clear both output locations so reruns do not fail on existing dirs.
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the original set no job jar at all).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // Only have one reduce task so that all of the results from mapping are
  // processed in one place.
  job.setNumReduceTasks(1);

  // configure output
  TextOutputFormat.setOutputPath(job, intermediateOutputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  int rc = job.waitForCompletion(true) ? 0 : 1;
  if (rc == 0) {
    Job topJob = Job.getInstance(getConf(), "Top Busy Airport");
    // Stage 2 needs the jar shipped too.
    topJob.setJarByClass(getClass());
    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);

    // Set the inputs
    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutputPath);

    // Set the mapper and reducer
    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);
    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);

    // Set the outputs
    TextOutputFormat.setOutputPath(topJob, outputPath);
    topJob.setOutputFormatClass(TextOutputFormat.class);
    topJob.setOutputKeyClass(Text.class);
    topJob.setOutputValueClass(IntWritable.class);

    rc = topJob.waitForCompletion(true) ? 0 : 1;
  }
  return rc;
}
/**
 * Two-stage driver. Stage 1 ("Busy Airport Count") sums counts per key
 * from the GemFireXD table into an intermediate text directory
 * (args[0] + "_int"). Stage 2 ("Top Busy Airport") reads that output,
 * reduces to the busiest airport and writes it to the APP.BUSY_AIRPORT
 * table through RowOutputFormat.
 *
 * @param args args[0] = final output path (also the base of the
 *             intermediate path), args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if either job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Clear both output locations so reruns do not fail on existing dirs.
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  job.setJarByClass(TopBusyAirportGemfirexd.class);
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // configure output
  TextOutputFormat.setOutputPath(job, intermediateOutputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  int rc = job.waitForCompletion(true) ? 0 : 1;
  if (rc == 0) {
    Configuration topConf = getConf();
    String gemfirexdUrl =
        topConf.get("gemfirexd.url", "jdbc:gemfirexd://localhost:1527");
    topConf.set(RowOutputFormat.OUTPUT_URL, gemfirexdUrl);
    topConf.set(RowOutputFormat.OUTPUT_TABLE, "APP.BUSY_AIRPORT");
    Configuration.dumpConfiguration(topConf, new PrintWriter(System.out));

    Job topJob = Job.getInstance(topConf, "Top Busy Airport");
    // Ship the jar for stage 2 as well; the original set it only on
    // stage 1, leaving this job's classes unresolvable on a cluster.
    topJob.setJarByClass(TopBusyAirportGemfirexd.class);
    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);

    // Set the inputs
    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutputPath);

    // Set the mapper and reducer
    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);
    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);

    // Set the outputs
    TextOutputFormat.setOutputPath(topJob, outputPath);
    topJob.setOutputFormatClass(RowOutputFormat.class);
    topJob.setOutputKeyClass(Key.class);
    topJob.setOutputValueClass(BusyAirportModel.class);

    rc = topJob.waitForCompletion(true) ? 0 : 1;
  }
  return rc;
}
/**
 * Two-stage driver. Stage 1 ("Busy Airport Count") sums counts per key
 * from the GemFireXD table into an intermediate text directory
 * (args[0] + "_int"). Stage 2 ("Top Busy Airport") reads that output
 * and writes the busiest airport as text to args[0].
 *
 * @param args args[0] = final output path (also the base of the
 *             intermediate path), args[1] = GemFireXD HDFS home
 *             directory, args[2] = input table name
 * @return 0 on success, 1 if either job fails, 2 on missing arguments
 * @throws Exception if filesystem access or job submission fails
 */
public int run(String[] args) throws Exception {
  // Fail fast with a usage hint instead of an
  // ArrayIndexOutOfBoundsException when arguments are missing.
  if (args.length < 3) {
    System.err.println("Usage: <outputPath> <hdfsHomeDir> <tableName>");
    return 2;
  }
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();

  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];

  // Clear both output locations so reruns do not fail on existing dirs.
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);

  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

  Job job = Job.getInstance(conf, "Busy Airport Count");
  // Ship the jar containing this driver so the cluster can resolve the
  // mapper/reducer classes (the original set no job jar at all).
  job.setJarByClass(getClass());
  job.setInputFormatClass(RowInputFormat.class);

  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);

  // Only have one reduce task so that all of the results from mapping are
  // processed in one place.
  job.setNumReduceTasks(1);

  // configure output
  TextOutputFormat.setOutputPath(job, intermediateOutputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  int rc = job.waitForCompletion(true) ? 0 : 1;
  if (rc == 0) {
    Job topJob = Job.getInstance(getConf(), "Top Busy Airport");
    // Stage 2 needs the jar shipped too.
    topJob.setJarByClass(getClass());
    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);

    // Set the inputs
    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutputPath);

    // Set the mapper and reducer
    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);
    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);

    // Set the outputs
    TextOutputFormat.setOutputPath(topJob, outputPath);
    topJob.setOutputFormatClass(TextOutputFormat.class);
    topJob.setOutputKeyClass(Text.class);
    topJob.setOutputValueClass(IntWritable.class);

    rc = topJob.waitForCompletion(true) ? 0 : 1;
  }
  return rc;
}