Listed below are example usages of org.apache.hadoop.mapreduce.Job#setNumReduceTasks(), drawn from open-source projects; each snippet shows the call in the context of a complete job setup.
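Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below; the SetNumReduceTasksSketch class name and the argument-based paths are placeholders) illustrating the two patterns that recur in the snippets that follow: setNumReduceTasks(0) makes the job map-only, so mapper output is written directly with no shuffle or reduce phase, while setNumReduceTasks(1) routes all map output through a single reducer so the job produces one output file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SetNumReduceTasksSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder job: reads text input, passes it through the identity Mapper.
    Job job = Job.getInstance(new Configuration(), "setNumReduceTasks sketch");
    job.setJarByClass(SetNumReduceTasksSketch.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapperClass(Mapper.class); // identity mapper
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    // Map-only job: no shuffle or reduce phase; each mapper writes a part-m-* file directly.
    job.setNumReduceTasks(0);
    // To funnel all map output through a single reducer (one part-r-00000 file) instead:
    // job.setNumReduceTasks(1);

    FileInputFormat.addInputPath(job, new Path(args[0]));   // placeholder input path
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // placeholder output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The examples below show the same call in real job configurations, including cases where the reducer count is read from configuration values or capped at a fixed maximum.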
public int run(String[] args) throws Exception {
  Job job = Job.getInstance(getConf());
  if (args.length != 2) {
    usage();
    return 2;
  }
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraSum");
  job.setJarByClass(TeraChecksum.class);
  job.setMapperClass(ChecksumMapper.class);
  job.setReducerClass(ChecksumReducer.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Unsigned16.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  job.setInputFormatClass(TeraInputFormat.class);
  return job.waitForCompletion(true) ? 0 : 1;
}
public void testOneRemoteJT() throws Exception {
  LOG.info("Starting testOneRemoteJT");
  String[] racks = "/rack-1".split(",");
  String[] trackers = "tracker-1".split(",");
  corona = new MiniCoronaCluster.Builder().numTaskTrackers(1).racks(racks)
      .hosts(trackers).build();
  Configuration conf = corona.createJobConf();
  conf.set("mapred.job.tracker", "corona");
  conf.set("mapred.job.tracker.class", CoronaJobTracker.class.getName());
  String locationsCsv = "tracker-1";
  conf.set("test.locations", locationsCsv);
  conf.setBoolean("mapred.coronajobtracker.forceremote", true);
  Job job = new Job(conf);
  job.setMapperClass(TstJob.TestMapper.class);
  job.setInputFormatClass(TstJob.TestInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  job.getConfiguration().set("io.sort.record.pct", "0.50");
  job.getConfiguration().set("io.sort.mb", "25");
  boolean success = job.waitForCompletion(true);
  assertTrue("Job did not succeed", success);
}
void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
  final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
  job.setInputFormatClass(clazz);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);
  LOG.debug("submitting job.");
  assertTrue("job failed!", job.waitForCompletion(true));
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
  assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
  assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
  assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
}
private Job getVertexJobWithDefaultMapper(org.apache.hadoop.conf.Configuration c) throws IOException {
  Job job = Job.getInstance(c);
  job.setJarByClass(HadoopScanMapper.class);
  job.setJobName("testPartitionedVertexScan");
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(NullWritable.class);
  job.setMapOutputKeyClass(NullWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setNumReduceTasks(0);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setInputFormatClass(CassandraInputFormat.class);
  return job;
}
@Override
public int run(String[] args) throws Exception {
  Configuration conf = getConf();
  GenericOptionsParser optionparser = new GenericOptionsParser(conf, args);
  conf = optionparser.getConfiguration();
  Job job = Job.getInstance(conf, "leftjoin");
  job.setJarByClass(LeftJoin.class);
  FileInputFormat.addInputPaths(job, conf.get("input_dir"));
  Path out = new Path(conf.get("output_dir"));
  FileOutputFormat.setOutputPath(job, out);
  job.setNumReduceTasks(conf.getInt("reduce_num", 1));
  job.setMapperClass(LeftJoinMapper.class);
  job.setReducerClass(LeftJoinReduce.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  conf.set("mapred.textoutputformat.separator", ",");
  return (job.waitForCompletion(true) ? 0 : 1);
}
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
  Path outputDir = getTestDir(TEST_NAME, "verify-output");
  LOG.info("Verify output dir: " + outputDir);
  Job job = Job.getInstance(conf);
  job.setJarByClass(this.getClass());
  job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
  setJobScannerConf(job);
  Scan scan = new Scan();
  TableMapReduceUtil.initTableMapperJob(
      tableDescriptor.getTableName().getNameAsString(), scan, VerifyMapper.class,
      BytesWritable.class, BytesWritable.class, job);
  TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
  int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
  TableMapReduceUtil.setScannerCaching(job, scannerCaching);
  job.setReducerClass(VerifyReducer.class);
  job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
  FileOutputFormat.setOutputPath(job, outputDir);
  assertTrue(job.waitForCompletion(true));
  long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
  assertEquals(0, numOutputRecords);
}
private Path doMapReduce(final String inputFile) throws Exception {
  final FileSystem fileSystem = FileSystem.get(conf);
  final Path inputPath = new Path(inputFile);
  final Path outputPath = fileSystem.makeQualified(new Path("target/out"));
  fileSystem.delete(outputPath, true);
  final Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, inputPath);
  conf.set(BAMTestNoHeaderOutputFormat.READ_HEADER_FROM_FILE, inputFile);
  job.setInputFormatClass(BAMInputFormat.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(SAMRecordWritable.class);
  job.setOutputFormatClass(BAMTestNoHeaderOutputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(SAMRecordWritable.class);
  job.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(job, outputPath);
  final boolean success = job.waitForCompletion(true);
  assertTrue(success);
  return outputPath;
}
protected Job doLoad(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
  Path outputDir = getTestDir(TEST_NAME, "load-output");
  LOG.info("Load output dir: " + outputDir);
  NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
  conf.set(TABLE_NAME_KEY, tableDescriptor.getTableName().getNameAsString());
  Job job = Job.getInstance(conf);
  job.setJobName(TEST_NAME + " Load for " + tableDescriptor.getTableName());
  job.setJarByClass(this.getClass());
  setMapperClass(job);
  job.setInputFormatClass(NMapInputFormat.class);
  job.setNumReduceTasks(0);
  setJobScannerConf(job);
  FileOutputFormat.setOutputPath(job, outputDir);
  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
  TableMapReduceUtil.initCredentials(job);
  assertTrue(job.waitForCompletion(true));
  return job;
}
private Path doMapReduce(final Path inputPath, final boolean writeHeader)
    throws Exception {
  final FileSystem fileSystem = FileSystem.get(conf);
  final Path outputPath = fileSystem.makeQualified(new Path("target/out"));
  fileSystem.delete(outputPath, true);
  final Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, inputPath);
  job.setInputFormatClass(VCFInputFormat.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(VariantContextWritable.class);
  job.setOutputFormatClass(writeHeader ? VCFTestWithHeaderOutputFormat.class :
      VCFTestNoHeaderOutputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(VariantContextWritable.class);
  job.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(job, outputPath);
  if (codecClass != null) {
    FileOutputFormat.setOutputCompressorClass(job, codecClass);
  }
  final boolean success = job.waitForCompletion(true);
  assertTrue(success);
  return outputPath;
}
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "HDFS to TG");
  job.setJarByClass(Hdfs2Tg.class);
  job.setMapperClass(LineMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
    Integer width, Integer wrapMultiplier, Integer numWalkers)
    throws Exception {
  LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
      + ", numNodes=" + numNodes);
  Job job = Job.getInstance(getConf());
  job.setJobName("Random Input Generator");
  job.setNumReduceTasks(0);
  job.setJarByClass(getClass());
  job.setInputFormatClass(GeneratorInputFormat.class);
  job.setOutputKeyClass(BytesWritable.class);
  job.setOutputValueClass(NullWritable.class);
  setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
  job.setMapperClass(Mapper.class); // identity mapper
  FileOutputFormat.setOutputPath(job, tmpOutput);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
  boolean success = jobCompletion(job);
  return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
  Opts opts = new Opts();
  opts.parseArgs(TableToFile.class.getName(), args);
  List<IteratorSetting.Column> columnsToFetch = new ArrayList<>();
  for (String col : opts.columns.split(",")) {
    int idx = col.indexOf(":");
    String cf = idx < 0 ? col : col.substring(0, idx);
    String cq = idx < 0 ? null : col.substring(idx + 1);
    if (!cf.isEmpty())
      columnsToFetch.add(new IteratorSetting.Column(cf, cq));
  }
  Job job = Job.getInstance(opts.getHadoopConfig());
  job.setJobName(TableToFile.class.getSimpleName() + "_" + System.currentTimeMillis());
  job.setJarByClass(TableToFile.class);
  job.setInputFormatClass(AccumuloInputFormat.class);
  InputFormatBuilder.InputFormatOptions<Job> inputOpts = AccumuloInputFormat.configure()
      .clientProperties(opts.getClientProperties()).table(opts.tableName);
  if (!columnsToFetch.isEmpty()) {
    inputOpts.fetchColumns(columnsToFetch);
  }
  inputOpts.store(job);
  job.setMapperClass(TTFMapper.class);
  job.setMapOutputKeyClass(NullWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setNumReduceTasks(0);
  job.setOutputFormatClass(TextOutputFormat.class);
  TextOutputFormat.setOutputPath(job, new Path(opts.output));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void configureJob(Job job, int numDays, String instance, String zookeepers, String userName, String password, String outputTable)
    throws AccumuloSecurityException {
  // Cap the number of reducers at 100, just in case we have a large day range (shouldn't really happen though)
  job.setNumReduceTasks(Math.min(numDays, 100));
  job.setReducerClass(MetricsDailySummaryReducer.class);
  job.setOutputFormatClass(AccumuloOutputFormat.class);
  AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
  AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
  AccumuloOutputFormat.setCreateTables(job, true);
  AccumuloOutputFormat.setDefaultTableName(job, outputTable);
}
public static void runJob(Configuration conf,
    Path userLogsPath,
    Path usersPath,
    Path outputPath)
    throws Exception {
  FileSystem fs = usersPath.getFileSystem(conf);
  FileStatus usersStatus = fs.getFileStatus(usersPath);
  if (usersStatus.isDir()) {
    for (FileStatus f : fs.listStatus(usersPath)) {
      if (f.getPath().getName().startsWith("part")) {
        DistributedCache.addCacheFile(f.getPath().toUri(), conf);
      }
    }
  } else {
    DistributedCache.addCacheFile(usersPath.toUri(), conf);
  }
  Job job = new Job(conf);
  job.setJarByClass(FinalJoinJob.class);
  job.setMapperClass(GenericReplicatedJoin.class);
  job.setNumReduceTasks(0);
  job.setInputFormatClass(KeyValueTextInputFormat.class);
  outputPath.getFileSystem(conf).delete(outputPath, true);
  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  if (!job.waitForCompletion(true)) {
    throw new Exception("Job failed");
  }
}
/**
 * creates and submits a job, updates file index and job index
 */
private void startJob(String jobName, Set<String> lostFiles, Priority priority, long detectTime)
    throws IOException, InterruptedException, ClassNotFoundException {
  Path inDir = new Path(JOB_NAME_PREFIX + "/in/" + jobName);
  Path outDir = new Path(JOB_NAME_PREFIX + "/out/" + jobName);
  List<String> filesInJob = createInputFile(
      jobName, inDir, lostFiles);
  if (filesInJob.isEmpty()) return;
  Configuration jobConf = new Configuration(getConf());
  RaidUtils.parseAndSetOptions(jobConf, priority.configOption);
  Job job = new Job(jobConf, jobName);
  job.getConfiguration().set(CORRUPT_FILE_DETECT_TIME, Long.toString(detectTime));
  configureJob(job, this.RECONSTRUCTOR_CLASS);
  job.setJarByClass(getClass());
  job.setMapperClass(ReconstructionMapper.class);
  job.setNumReduceTasks(0);
  job.setInputFormatClass(ReconstructionInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  ReconstructionInputFormat.setInputPaths(job, inDir);
  SequenceFileOutputFormat.setOutputPath(job, outDir);
  submitJob(job, filesInJob, priority);
  List<LostFileInfo> fileInfos =
      updateFileIndex(jobName, filesInJob, priority);
  // The implementation of submitJob() need not update jobIndex.
  // So check if the job exists in jobIndex before updating jobInfos.
  if (jobIndex.containsKey(job)) {
    jobIndex.put(job, fileInfos);
  }
  numJobsRunning++;
}
@Test
public void testMapReduceJob() throws Exception {
  Configuration conf = new Configuration();
  conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, reference);
  FileSystem fileSystem = FileSystem.get(conf);
  Path inputPath = new Path(input);
  Path outputPath = fileSystem.makeQualified(new Path("target/out"));
  fileSystem.delete(outputPath, true);
  Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, inputPath);
  job.setInputFormatClass(CRAMInputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(SAMRecordWritable.class);
  job.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(job, outputPath);
  boolean success = job.waitForCompletion(true);
  assertTrue(success);
  List<String> samStrings = new ArrayList<String>();
  SamReader samReader = SamReaderFactory.makeDefault()
      .referenceSequence(new File(URI.create(reference))).open(new File(input));
  for (SAMRecord r : samReader) {
    samStrings.add(r.getSAMString().trim());
  }
  File outputFile = new File(new File(outputPath.toUri()), "part-m-00000");
  BufferedReader br = new BufferedReader(new FileReader(outputFile));
  String line;
  int index = 0;
  while ((line = br.readLine()) != null) {
    String value = line.substring(line.indexOf("\t") + 1); // ignore key
    assertEquals(samStrings.get(index++), value);
  }
  br.close();
}
private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException
{
  boolean success;
  Job columnMultJob = Job.getInstance(conf, "pir_columnMult");
  columnMultJob.setSpeculativeExecution(false);
  String columnMultJobName = "pir_columnMult";
  // Set the same job configs as for the first iteration
  columnMultJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
  columnMultJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
  columnMultJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
  columnMultJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));
  columnMultJob.getConfiguration().set("mapreduce.map.speculative", "false");
  columnMultJob.getConfiguration().set("mapreduce.reduce.speculative", "false");
  columnMultJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));
  columnMultJob.setJobName(columnMultJobName);
  columnMultJob.setJarByClass(ColumnMultMapper.class);
  columnMultJob.setNumReduceTasks(numReduceTasks);
  // Set the Mapper, InputFormat, and input path
  columnMultJob.setMapperClass(ColumnMultMapper.class);
  columnMultJob.setInputFormatClass(TextInputFormat.class);
  FileStatus[] status = fs.listStatus(outPathInit);
  for (FileStatus fstat : status)
  {
    if (fstat.getPath().getName().startsWith(FileConst.PIR))
    {
      logger.info("fstat.getPath() = " + fstat.getPath().toString());
      FileInputFormat.addInputPath(columnMultJob, fstat.getPath());
    }
  }
  columnMultJob.setMapOutputKeyClass(LongWritable.class);
  columnMultJob.setMapOutputValueClass(Text.class);
  // Set the reducer and output options
  columnMultJob.setReducerClass(ColumnMultReducer.class);
  columnMultJob.setOutputKeyClass(LongWritable.class);
  columnMultJob.setOutputValueClass(Text.class);
  columnMultJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
  // Delete the output file, if it exists
  if (fs.exists(outPathColumnMult))
  {
    fs.delete(outPathColumnMult, true);
  }
  FileOutputFormat.setOutputPath(columnMultJob, outPathColumnMult);
  MultipleOutputs.addNamedOutput(columnMultJob, FileConst.PIR_COLS, TextOutputFormat.class, LongWritable.class, Text.class);
  // Submit job, wait for completion
  success = columnMultJob.waitForCompletion(true);
  return success;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  StringBuilder sb = new StringBuilder();
  for (int j = 2; j < otherArgs.length; j++) {
    sb.append(otherArgs[j]);
  }
  LOGGER.debug("Arguments[ " + otherArgs.length + "]" + "and values respectively [" + otherArgs[0] + "], "
      + otherArgs[1] + ", [" + otherArgs[2] + "]" + ", [" + otherArgs[3] + "]," + otherArgs[4]);
  String inputpath = otherArgs[0];
  String outputpath = "/tmp/jumbune/dvjsonreport" + new Date().getTime();
  String json = otherArgs[1];
  String nullCondition = otherArgs[2];
  String regex = otherArgs[3];
  String dvDir = otherArgs[4];
  if (regex.isEmpty()) {
    conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, "");
  } else {
    conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, regex);
  }
  if (nullCondition.isEmpty()) {
    conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, "");
  } else {
    conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, nullCondition);
  }
  conf.set(JsonDataVaildationConstants.SLAVE_DIR, dvDir);
  conf.set(JsonDataVaildationConstants.JSON_ARGUMENT, json);
  FileSystem fs = FileSystem.get(conf);
  @SuppressWarnings("deprecation")
  Job job = new Job(conf, "JSONDataValidation");
  job.setJarByClass(JsonDataValidationExecutor.class);
  job.setInputFormatClass(JsonFileInputFormat.class);
  job.setMapperClass(JsonDataValidationMapper.class);
  job.setPartitionerClass(JsonDataValidationPartitioner.class);
  job.setReducerClass(JsonDataValidationReducer.class);
  job.setNumReduceTasks(5);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(FileKeyViolationBean.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(TotalReducerViolationBean.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  Path[] inputPaths = FileUtil.getAllJsonNestedFilePath(job, inputpath);
  FileInputFormat.setInputPaths(job, inputPaths);
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
  if (fs.exists(new Path(outputpath))) {
    fs.delete(new Path(outputpath), true);
  }
  job.waitForCompletion(true);
  Map<String, JsonViolationReport> jsonMap = readDataFromHdfs(conf, outputpath);
  final Gson gson = new Gson();
  final String jsonReport = gson.toJson(jsonMap);
  LOGGER.info("Completed DataValidation");
  LOGGER.info(JsonDataVaildationConstants.JSON_DV_REPORT + jsonReport);
}
private void runSecondarySort(Configuration conf) throws IOException,
    InterruptedException,
    ClassNotFoundException {
  FileSystem localFs = FileSystem.getLocal(conf);
  localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
  localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
  TestMapReduceLocal.writeFile("in/part1",
      "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
      "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setNumReduceTasks(2);
  job.setMapperClass(SecondarySort.MapClass.class);
  job.setReducerClass(SecondarySort.Reduce.class);
  // group and partition by the first int in the pair
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);
  // the map output is IntPair, IntWritable
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);
  // the reduce output is Text, IntWritable
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
  FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
  assertTrue(job.waitForCompletion(true));
  String out = TestMapReduceLocal.readFile("out/part-r-00000");
  assertEquals("------------------------------------------------\n" +
      "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
      "------------------------------------------------\n" +
      "10\t20\n10\t25\n10\t30\n", out);
  out = TestMapReduceLocal.readFile("out/part-r-00001");
  assertEquals("------------------------------------------------\n" +
      "-3\t23\n" +
      "------------------------------------------------\n" +
      "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
      "------------------------------------------------\n" +
      "5\t10\n", out);
}
public int run(String[] args) throws Exception {
  GfxdDataSerializable.initTypes();
  Configuration conf = getConf();
  Path outputPath = new Path(args[0]);
  Path intermediateOutputPath = new Path(args[0] + "_int");
  String hdfsHomeDir = args[1];
  String tableName = args[2];
  outputPath.getFileSystem(conf).delete(outputPath, true);
  intermediateOutputPath.getFileSystem(conf).delete(intermediateOutputPath, true);
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  Job job = Job.getInstance(conf, "Busy Airport Count");
  job.setInputFormatClass(RowInputFormat.class);
  // configure mapper and reducer
  job.setMapperClass(SampleMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  // Only have one reduce task so that all of the results from mapping are
  // processed in one place.
  job.setNumReduceTasks(1);
  // configure output
  TextOutputFormat.setOutputPath(job, intermediateOutputPath);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  int rc = job.waitForCompletion(true) ? 0 : 1;
  if (rc == 0) {
    Job topJob = Job.getInstance(getConf(), "Top Busy Airport");
    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);
    // Set the inputs
    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutputPath);
    // Set the mapper and reducer
    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);
    // Set the outputs
    TextOutputFormat.setOutputPath(topJob, outputPath);
    topJob.setOutputFormatClass(TextOutputFormat.class);
    topJob.setOutputKeyClass(Text.class);
    topJob.setOutputValueClass(IntWritable.class);
    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);
    rc = topJob.waitForCompletion(true) ? 0 : 1;
  }
  return rc;
}