org.apache.hadoop.mapreduce.Job#setNumReduceTasks() 源码实例 Demo

下面列出了 org.apache.hadoop.mapreduce.Job#setNumReduceTasks() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: big-c   文件: TeraChecksum.java
/**
 * Configures and runs the "TeraSum" checksum job.
 *
 * @param args exactly two entries: the input path followed by the output path
 * @return 0 if the job succeeds, 1 if it fails, 2 if the argument count is wrong
 * @throws Exception propagated from job creation or execution
 */
public int run(String[] args) throws Exception {
  Job checksumJob = Job.getInstance(getConf());
  if (args.length != 2) {
    usage();
    return 2;
  }

  // Identity of the job.
  checksumJob.setJobName("TeraSum");
  checksumJob.setJarByClass(TeraChecksum.class);

  // Input side.
  checksumJob.setInputFormatClass(TeraInputFormat.class);
  TeraInputFormat.setInputPaths(checksumJob, new Path(args[0]));

  // Processing classes; a single reducer is forced.
  checksumJob.setMapperClass(ChecksumMapper.class);
  checksumJob.setReducerClass(ChecksumReducer.class);
  checksumJob.setNumReduceTasks(1);

  // Output side.
  checksumJob.setOutputKeyClass(NullWritable.class);
  checksumJob.setOutputValueClass(Unsigned16.class);
  FileOutputFormat.setOutputPath(checksumJob, new Path(args[1]));

  boolean succeeded = checksumJob.waitForCompletion(true);
  return succeeded ? 0 : 1;
}
 
源代码2 项目: RDFS   文件: TestMiniCoronaFederatedJT.java
/**
 * Builds a MiniCoronaCluster with one task tracker, sets
 * "mapred.coronajobtracker.forceremote" to true, and asserts that a
 * map-only job (zero reducers) completes successfully.
 */
public void testOneRemoteJT() throws Exception {
  LOG.info("Starting testOneRemoteJT");

  final String[] rackNames = "/rack-1".split(",");
  final String[] trackerHosts = "tracker-1".split(",");
  corona = new MiniCoronaCluster.Builder()
      .numTaskTrackers(1)
      .racks(rackNames)
      .hosts(trackerHosts)
      .build();

  Configuration jobConf = corona.createJobConf();
  jobConf.set("mapred.job.tracker", "corona");
  jobConf.set("mapred.job.tracker.class", CoronaJobTracker.class.getName());
  jobConf.set("test.locations", "tracker-1");
  jobConf.setBoolean("mapred.coronajobtracker.forceremote", true);

  Job remoteJob = new Job(jobConf);
  remoteJob.setInputFormatClass(TstJob.TestInputFormat.class);
  remoteJob.setMapperClass(TstJob.TestMapper.class);
  remoteJob.setOutputFormatClass(NullOutputFormat.class);
  remoteJob.setNumReduceTasks(0);

  // Sort-buffer settings are applied to the job's own configuration.
  remoteJob.getConfiguration().set("io.sort.record.pct", "0.50");
  remoteJob.getConfiguration().set("io.sort.mb", "25");

  assertTrue("Job did not succeed", remoteJob.waitForCompletion(true));
}
 
源代码3 项目: hbase   文件: TestTableInputFormat.java
/**
 * Runs a map-only job with the given input format and the ExampleVerifier mapper,
 * then checks the row/family/value counters the job published.
 *
 * @param clazz input format class under test
 */
void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
  final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
  job.setMapperClass(ExampleVerifier.class);
  job.setInputFormatClass(clazz);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  assertTrue("job failed!", job.waitForCompletion(true));

  // Counter groups are named "<test class name>:<kind>".
  final String groupBase = TestTableInputFormat.class.getName();
  final String rowGroup = groupBase + ":row";
  final String familyGroup = groupBase + ":family";
  final String valueGroup = groupBase + ":value";

  long matchedRows = job.getCounters().findCounter(rowGroup, "aaa").getValue();
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, matchedRows);

  long excludedRows = job.getCounters().findCounter(rowGroup, "bbb").getValue();
  assertEquals("Saw any instances of the filtered out row.", 0, excludedRows);

  long columnACount = job.getCounters().findCounter(familyGroup, "columnA").getValue();
  assertEquals("Saw the wrong number of instances of columnA.", 1, columnACount);

  long columnBCount = job.getCounters().findCounter(familyGroup, "columnB").getValue();
  assertEquals("Saw the wrong number of instances of columnB.", 1, columnBCount);

  long matchedValues = job.getCounters().findCounter(valueGroup, "value aaa").getValue();
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2, matchedValues);

  long excludedValues = job.getCounters().findCounter(valueGroup, "value bbb").getValue();
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0, excludedValues);
}
 
源代码4 项目: titan1withtp3.1   文件: CassandraScanJobIT.java
/**
 * Creates a map-only job named "testPartitionedVertexScan" that reads through
 * CassandraInputFormat and discards its output (NullOutputFormat, NullWritable
 * keys and values). No mapper class is set here.
 *
 * @param c configuration the job is created from
 * @return the configured, not yet submitted job
 * @throws IOException if the job cannot be created
 */
private Job getVertexJobWithDefaultMapper(org.apache.hadoop.conf.Configuration c) throws IOException {
    final Job scanJob = Job.getInstance(c);

    scanJob.setJobName("testPartitionedVertexScan");
    scanJob.setJarByClass(HadoopScanMapper.class);

    // Input/output formats; there is no reduce phase.
    scanJob.setInputFormatClass(CassandraInputFormat.class);
    scanJob.setOutputFormatClass(NullOutputFormat.class);
    scanJob.setNumReduceTasks(0);

    // Map output and final output both use NullWritable for key and value.
    scanJob.setMapOutputKeyClass(NullWritable.class);
    scanJob.setMapOutputValueClass(NullWritable.class);
    scanJob.setOutputKeyClass(NullWritable.class);
    scanJob.setOutputValueClass(NullWritable.class);

    return scanJob;
}
 
源代码5 项目: BigData-In-Practice   文件: LeftJoin.java
/**
 * Configures and runs the "leftjoin" job.
 *
 * <p>Reads the following keys from the configuration (after generic options
 * have been parsed from {@code args}): {@code input_dir} (input paths),
 * {@code output_dir} (output path) and {@code reduce_num} (reducer count,
 * default 1).
 *
 * @param args command-line arguments, parsed with GenericOptionsParser
 * @return 0 if the job succeeds, 1 otherwise
 * @throws Exception propagated from job setup or execution
 */
@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    GenericOptionsParser optionparser = new GenericOptionsParser(conf, args);
    conf = optionparser.getConfiguration();

    Job job = Job.getInstance(conf, "leftjoin");
    job.setJarByClass(LeftJoin.class);
    FileInputFormat.addInputPaths(job, conf.get("input_dir"));
    Path out = new Path(conf.get("output_dir"));
    FileOutputFormat.setOutputPath(job, out);
    job.setNumReduceTasks(conf.getInt("reduce_num", 1));

    job.setMapperClass(LeftJoinMapper.class);
    job.setReducerClass(LeftJoinReduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Job.getInstance() works on its own copy of the Configuration, so a value
    // set on `conf` after the job was created never reaches the job. The
    // separator therefore has to be set on the job's configuration.
    job.getConfiguration().set("mapred.textoutputformat.separator", ",");

    return (job.waitForCompletion(true) ? 0 : 1);
}
 
源代码6 项目: hbase   文件: IntegrationTestLoadAndVerify.java
/**
 * Runs the verification job over the given table and asserts that it
 * completes and that its ROWS_WRITTEN counter is zero.
 *
 * @param conf configuration used to create the job and to look up tuning values
 * @param tableDescriptor descriptor of the table to scan
 * @throws Exception propagated from job setup or execution
 */
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
  Path verifyOutput = getTestDir(TEST_NAME, "verify-output");
  LOG.info("Verify output dir: " + verifyOutput);

  Job verifyJob = Job.getInstance(conf);
  verifyJob.setJarByClass(this.getClass());
  verifyJob.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
  setJobScannerConf(verifyJob);

  // Map side: full-table scan feeding VerifyMapper.
  String scannedTable = tableDescriptor.getTableName().getNameAsString();
  TableMapReduceUtil.initTableMapperJob(
      scannedTable, new Scan(), VerifyMapper.class,
      BytesWritable.class, BytesWritable.class, verifyJob);
  TableMapReduceUtil.addDependencyJarsForClasses(
      verifyJob.getConfiguration(), AbstractHBaseTool.class);
  TableMapReduceUtil.setScannerCaching(
      verifyJob, conf.getInt("verify.scannercaching", SCANNER_CACHING));

  // Reduce side and output location.
  verifyJob.setReducerClass(VerifyReducer.class);
  verifyJob.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
  FileOutputFormat.setOutputPath(verifyJob, verifyOutput);

  boolean completed = verifyJob.waitForCompletion(true);
  assertTrue(completed);

  long rowsWritten = verifyJob.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
  assertEquals(0, rowsWritten);
}
 
源代码7 项目: Hadoop-BAM   文件: TestBAMOutputFormat.java
/**
 * Runs a map-only job that reads {@code inputFile} with BAMInputFormat and
 * writes it to "target/out" with BAMTestNoHeaderOutputFormat.
 *
 * @param inputFile path of the input file; also stored under
 *     READ_HEADER_FROM_FILE for the output format
 * @return the qualified output directory
 * @throws Exception propagated from file system access or job execution
 */
private Path doMapReduce(final String inputFile) throws Exception {
    final FileSystem fileSystem = FileSystem.get(conf);
    final Path inputPath = new Path(inputFile);
    final Path outputPath = fileSystem.makeQualified(new Path("target/out"));
    fileSystem.delete(outputPath, true);

    // Must happen before Job.getInstance(conf): the Job works on its own copy
    // of the Configuration, so a value set on `conf` afterwards would not be
    // visible to this job.
    conf.set(BAMTestNoHeaderOutputFormat.READ_HEADER_FROM_FILE, inputFile);

    final Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, inputPath);

    job.setInputFormatClass(BAMInputFormat.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(SAMRecordWritable.class);

    job.setOutputFormatClass(BAMTestNoHeaderOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(SAMRecordWritable.class);

    job.setNumReduceTasks(0);
    FileOutputFormat.setOutputPath(job, outputPath);

    final boolean success = job.waitForCompletion(true);
    assertTrue(success);

    return outputPath;
}
 
源代码8 项目: hbase   文件: IntegrationTestLoadAndVerify.java
/**
 * Runs the map-only load job for the given table and asserts that it completes.
 *
 * @param conf configuration; the map task count and table name are written
 *     into it before the job is created
 * @param tableDescriptor descriptor of the table to load
 * @return the completed job
 * @throws Exception propagated from job setup or execution
 */
protected Job doLoad(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
  Path loadOutput = getTestDir(TEST_NAME, "load-output");
  LOG.info("Load output dir: " + loadOutput);

  // These values are written to conf before the job is created from it.
  int mapTaskCount = conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT);
  NMapInputFormat.setNumMapTasks(conf, mapTaskCount);
  conf.set(TABLE_NAME_KEY, tableDescriptor.getTableName().getNameAsString());

  Job loadJob = Job.getInstance(conf);
  loadJob.setJobName(TEST_NAME + " Load for " + tableDescriptor.getTableName());
  loadJob.setJarByClass(this.getClass());
  setMapperClass(loadJob);
  loadJob.setInputFormatClass(NMapInputFormat.class);
  loadJob.setNumReduceTasks(0);
  setJobScannerConf(loadJob);
  FileOutputFormat.setOutputPath(loadJob, loadOutput);

  // Dependency jars and credentials.
  TableMapReduceUtil.addDependencyJars(loadJob);
  TableMapReduceUtil.addDependencyJarsForClasses(
      loadJob.getConfiguration(), AbstractHBaseTool.class);
  TableMapReduceUtil.initCredentials(loadJob);

  boolean completed = loadJob.waitForCompletion(true);
  assertTrue(completed);
  return loadJob;
}
 
源代码9 项目: Hadoop-BAM   文件: TestVCFRoundTrip.java
/**
 * Runs a map-only job that reads {@code inputPath} with VCFInputFormat and
 * writes it to "target/out", with or without a header depending on
 * {@code writeHeader}. Output compression is enabled when {@code codecClass}
 * is non-null.
 *
 * @param inputPath VCF input to read
 * @param writeHeader selects the with-header or no-header test output format
 * @return the qualified output directory
 * @throws Exception propagated from file system access or job execution
 */
private Path doMapReduce(final Path inputPath, final boolean writeHeader)
    throws Exception {
    final FileSystem fs = FileSystem.get(conf);
    final Path outDir = fs.makeQualified(new Path("target/out"));
    fs.delete(outDir, true);

    final Job roundTripJob = Job.getInstance(conf);

    // Input side.
    FileInputFormat.setInputPaths(roundTripJob, inputPath);
    roundTripJob.setInputFormatClass(VCFInputFormat.class);
    roundTripJob.setMapOutputKeyClass(LongWritable.class);
    roundTripJob.setMapOutputValueClass(VariantContextWritable.class);

    // Output side.
    if (writeHeader) {
        roundTripJob.setOutputFormatClass(VCFTestWithHeaderOutputFormat.class);
    } else {
        roundTripJob.setOutputFormatClass(VCFTestNoHeaderOutputFormat.class);
    }
    roundTripJob.setOutputKeyClass(LongWritable.class);
    roundTripJob.setOutputValueClass(VariantContextWritable.class);
    FileOutputFormat.setOutputPath(roundTripJob, outDir);
    if (codecClass != null) {
        FileOutputFormat.setOutputCompressorClass(roundTripJob, codecClass);
    }

    roundTripJob.setNumReduceTasks(0);

    assertTrue(roundTripJob.waitForCompletion(true));
    return outDir;
}
 
源代码10 项目: ecosys   文件: Hdfs2Tg.java
/**
 * Entry point: runs the map-only "HDFS to TG" job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output
 *     path; any further arguments are ignored
 * @throws Exception propagated from job setup or execution
 */
public static void main(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
        System.err.println("Usage: Hdfs2Tg <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "HDFS to TG");
    job.setJarByClass(Hdfs2Tg.class);
    job.setMapperClass(LineMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
源代码11 项目: hbase   文件: IntegrationTestBigLinkedList.java
/**
 * Runs the map-only "Random Input Generator" job, writing a sequence file
 * of BytesWritable keys and NullWritable values to {@code tmpOutput}.
 *
 * @param numMappers passed through to setJobConf
 * @param numNodes passed through to setJobConf
 * @param tmpOutput output directory of the job
 * @param width passed through to setJobConf
 * @param wrapMultiplier passed through to setJobConf
 * @param numWalkers passed through to setJobConf
 * @return 0 if the job succeeds, 1 otherwise
 * @throws Exception propagated from job setup or execution
 */
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
    Integer width, Integer wrapMultiplier, Integer numWalkers)
    throws Exception {
  LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
      + ", numNodes=" + numNodes);

  Job generatorJob = Job.getInstance(getConf());
  generatorJob.setJobName("Random Input Generator");
  generatorJob.setJarByClass(getClass());
  generatorJob.setNumReduceTasks(0);

  generatorJob.setInputFormatClass(GeneratorInputFormat.class);
  generatorJob.setOutputKeyClass(BytesWritable.class);
  generatorJob.setOutputValueClass(NullWritable.class);

  setJobConf(generatorJob, numMappers, numNodes, width, wrapMultiplier, numWalkers);

  // The base Mapper class is used as an identity mapper.
  generatorJob.setMapperClass(Mapper.class);

  generatorJob.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(generatorJob, tmpOutput);
  TableMapReduceUtil.addDependencyJarsForClasses(
      generatorJob.getConfiguration(), Random64.class);

  if (jobCompletion(generatorJob)) {
    return 0;
  }
  return 1;
}
 
源代码12 项目: accumulo-examples   文件: TableToFile.java
/**
 * Entry point: runs a map-only job that reads an Accumulo table and writes
 * text output to the configured path, then exits with 0 on success or 1 on
 * failure.
 *
 * <p>{@code opts.columns} is a comma-separated list of {@code family} or
 * {@code family:qualifier} entries; entries with an empty family are skipped.
 *
 * @param args command-line arguments, parsed into {@code Opts}
 * @throws Exception propagated from option parsing, job setup or execution
 */
public static void main(String[] args) throws Exception {
  Opts opts = new Opts();
  opts.parseArgs(TableToFile.class.getName(), args);

  List<IteratorSetting.Column> columnsToFetch = new ArrayList<>();
  for (String spec : opts.columns.split(",")) {
    int colon = spec.indexOf(":");
    String family;
    String qualifier;
    if (colon < 0) {
      // No separator: the whole entry is the family, with no qualifier.
      family = spec;
      qualifier = null;
    } else {
      family = spec.substring(0, colon);
      qualifier = spec.substring(colon + 1);
    }
    if (family.isEmpty()) {
      continue;
    }
    columnsToFetch.add(new IteratorSetting.Column(family, qualifier));
  }

  Job exportJob = Job.getInstance(opts.getHadoopConfig());
  exportJob.setJobName(TableToFile.class.getSimpleName() + "_" + System.currentTimeMillis());
  exportJob.setJarByClass(TableToFile.class);

  // Input: the Accumulo table, optionally restricted to the requested columns.
  exportJob.setInputFormatClass(AccumuloInputFormat.class);
  InputFormatBuilder.InputFormatOptions<Job> inputOpts = AccumuloInputFormat.configure()
      .clientProperties(opts.getClientProperties()).table(opts.tableName);
  if (!columnsToFetch.isEmpty()) {
    inputOpts.fetchColumns(columnsToFetch);
  }
  inputOpts.store(exportJob);

  // Map-only processing.
  exportJob.setMapperClass(TTFMapper.class);
  exportJob.setMapOutputKeyClass(NullWritable.class);
  exportJob.setMapOutputValueClass(Text.class);
  exportJob.setNumReduceTasks(0);

  // Output: plain text files.
  exportJob.setOutputFormatClass(TextOutputFormat.class);
  TextOutputFormat.setOutputPath(exportJob, new Path(opts.output));

  boolean succeeded = exportJob.waitForCompletion(true);
  System.exit(succeeded ? 0 : 1);
}
 
源代码13 项目: datawave   文件: MetricsDailySummaryReducer.java
/**
 * Configures the reduce side of {@code job}: MetricsDailySummaryReducer as
 * reducer and AccumuloOutputFormat writing to {@code outputTable}, with
 * table creation enabled.
 *
 * @param job job to configure
 * @param numDays requested reducer count; capped at 100
 * @param instance Accumulo instance name
 * @param zookeepers ZooKeeper hosts
 * @param userName Accumulo user
 * @param password password for {@code userName}
 * @param outputTable default table the output format writes to
 * @throws AccumuloSecurityException declared by AccumuloOutputFormat.setConnectorInfo
 */
public static void configureJob(Job job, int numDays, String instance, String zookeepers, String userName, String password, String outputTable)
                throws AccumuloSecurityException {
    // Cap the number of reducers at 100, just in case we have a large day
    // range (shouldn't really happen though).
    final int reducerCount = Math.min(numDays, 100);
    job.setNumReduceTasks(reducerCount);
    job.setReducerClass(MetricsDailySummaryReducer.class);
    job.setOutputFormatClass(AccumuloOutputFormat.class);

    ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
                    .withInstance(instance)
                    .withZkHosts(zookeepers);
    AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);

    PasswordToken token = new PasswordToken(password);
    AccumuloOutputFormat.setConnectorInfo(job, userName, token);

    AccumuloOutputFormat.setCreateTables(job, true);
    AccumuloOutputFormat.setDefaultTableName(job, outputTable);
}
 
源代码14 项目: hiped2   文件: FinalJoinJob.java
/**
 * Runs a map-only job using GenericReplicatedJoin over {@code userLogsPath},
 * with the users data registered as distributed-cache files.
 *
 * <p>If {@code usersPath} is a directory, only its children whose names
 * start with "part" are added to the cache; otherwise the path itself is
 * added. Any existing {@code outputPath} is deleted before the job runs.
 *
 * @param conf configuration the cache files are added to and the job is created from
 * @param userLogsPath job input
 * @param usersPath file or directory holding the users data
 * @param outputPath job output directory
 * @throws Exception if setup fails or the job does not complete successfully
 */
public static void runJob(Configuration conf,
                          Path userLogsPath,
                          Path usersPath,
                          Path outputPath)
    throws Exception {

  FileSystem usersFs = usersPath.getFileSystem(conf);
  FileStatus usersStatus = usersFs.getFileStatus(usersPath);

  if (!usersStatus.isDir()) {
    DistributedCache.addCacheFile(usersPath.toUri(), conf);
  } else {
    for (FileStatus child : usersFs.listStatus(usersPath)) {
      if (!child.getPath().getName().startsWith("part")) {
        continue;
      }
      DistributedCache.addCacheFile(child.getPath().toUri(), conf);
    }
  }

  // The job is created only after the cache files were registered on conf.
  Job joinJob = new Job(conf);
  joinJob.setJarByClass(FinalJoinJob.class);
  joinJob.setMapperClass(GenericReplicatedJoin.class);
  joinJob.setInputFormatClass(KeyValueTextInputFormat.class);
  joinJob.setNumReduceTasks(0);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  FileInputFormat.setInputPaths(joinJob, userLogsPath);
  FileOutputFormat.setOutputPath(joinJob, outputPath);

  boolean succeeded = joinJob.waitForCompletion(true);
  if (succeeded) {
    return;
  }
  throw new Exception("Job failed");
}
 
源代码15 项目: RDFS   文件: DistBlockIntegrityMonitor.java
/**
 * Creates and submits a reconstruction job for the given lost files, then
 * updates the file index and, if the job is present there, the job index.
 * Returns without doing anything else when the created input file lists no
 * files.
 *
 * @param jobName name of the job; also used in its input/output directories
 * @param lostFiles files to reconstruct
 * @param priority priority whose config option is applied to the job configuration
 * @param detectTime stored in the job configuration under CORRUPT_FILE_DETECT_TIME
 */
private void startJob(String jobName, Set<String> lostFiles, Priority priority, long detectTime)
throws IOException, InterruptedException, ClassNotFoundException {
  Path inputDir = new Path(JOB_NAME_PREFIX + "/in/" + jobName);
  Path outputDir = new Path(JOB_NAME_PREFIX + "/out/" + jobName);

  List<String> filesInJob = createInputFile(jobName, inputDir, lostFiles);
  if (filesInJob.isEmpty()) {
    return;
  }

  // Work on a private copy of the configuration for this job.
  Configuration jobConf = new Configuration(getConf());
  RaidUtils.parseAndSetOptions(jobConf, priority.configOption);

  Job job = new Job(jobConf, jobName);
  job.getConfiguration().set(CORRUPT_FILE_DETECT_TIME, Long.toString(detectTime));
  configureJob(job, this.RECONSTRUCTOR_CLASS);

  job.setJarByClass(getClass());
  job.setMapperClass(ReconstructionMapper.class);
  job.setNumReduceTasks(0);

  job.setInputFormatClass(ReconstructionInputFormat.class);
  ReconstructionInputFormat.setInputPaths(job, inputDir);

  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  SequenceFileOutputFormat.setOutputPath(job, outputDir);

  submitJob(job, filesInJob, priority);
  List<LostFileInfo> fileInfos = updateFileIndex(jobName, filesInJob, priority);

  // The implementation of submitJob() need not update jobIndex.
  // So check if the job exists in jobIndex before updating jobInfos.
  if (jobIndex.containsKey(job)) {
    jobIndex.put(job, fileInfos);
  }
  numJobsRunning++;
}
 
源代码16 项目: Hadoop-BAM   文件: TestCRAMInputFormat.java
/**
 * Runs a map-only job that reads the CRAM input with CRAMInputFormat, then
 * compares each output line of "part-m-00000" (with the key column stripped)
 * against the SAM string of the corresponding record read directly from the
 * input file.
 *
 * NOTE(review): the comparison loop is driven by the output lines only, so
 * output with fewer lines than the input has records is not detected here.
 */
@Test
public void testMapReduceJob() throws Exception {
  Configuration conf = new Configuration();
  conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, reference);

  FileSystem fileSystem = FileSystem.get(conf);
  Path inputPath = new Path(input);
  Path outputPath = fileSystem.makeQualified(new Path("target/out"));
  fileSystem.delete(outputPath, true);

  Job job = Job.getInstance(conf);
  FileInputFormat.setInputPaths(job, inputPath);
  job.setInputFormatClass(CRAMInputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(SAMRecordWritable.class);
  job.setNumReduceTasks(0);
  FileOutputFormat.setOutputPath(job, outputPath);

  boolean success = job.waitForCompletion(true);
  assertTrue(success);

  // Expected values: every record of the input, read directly.
  // try-with-resources closes the reader, which was previously left open.
  List<String> samStrings = new ArrayList<String>();
  try (SamReader samReader = SamReaderFactory.makeDefault()
      .referenceSequence(new File(URI.create(reference))).open(new File(input))) {
    for (SAMRecord r : samReader) {
      samStrings.add(r.getSAMString().trim());
    }
  }

  // Actual values: the job's single map output file. The reader is closed
  // even when an assertion inside the loop fails.
  File outputFile = new File(new File(outputPath.toUri()), "part-m-00000");
  try (BufferedReader br = new BufferedReader(new FileReader(outputFile))) {
    String line;
    int index = 0;
    while ((line = br.readLine()) != null) {
      String value = line.substring(line.indexOf("\t") + 1); // ignore key
      assertEquals(samStrings.get(index++), value);
    }
  }
}
 
/**
 * Runs the "pir_columnMult" job: ColumnMultMapper and ColumnMultReducer over
 * every entry of {@code outPathInit} whose name starts with FileConst.PIR,
 * writing to {@code outPathColumnMult} (deleted first if it already exists).
 *
 * @param outPathInit directory whose matching entries become the job input
 * @param outPathColumnMult output directory of the job
 * @return whether the job completed successfully
 */
private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException
{
  final String jobName = "pir_columnMult";

  Job columnMultJob = Job.getInstance(conf, jobName);
  columnMultJob.setSpeculativeExecution(false);

  // Set the same job configs as for the first iteration
  columnMultJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
  columnMultJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
  columnMultJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
  columnMultJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));

  columnMultJob.getConfiguration().set("mapreduce.map.speculative", "false");
  columnMultJob.getConfiguration().set("mapreduce.reduce.speculative", "false");
  columnMultJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));

  columnMultJob.setJobName(jobName);
  columnMultJob.setJarByClass(ColumnMultMapper.class);
  columnMultJob.setNumReduceTasks(numReduceTasks);

  // Map side: mapper, input format and map output types
  columnMultJob.setMapperClass(ColumnMultMapper.class);
  columnMultJob.setInputFormatClass(TextInputFormat.class);
  columnMultJob.setMapOutputKeyClass(LongWritable.class);
  columnMultJob.setMapOutputValueClass(Text.class);

  // Input paths: only the entries whose name carries the PIR prefix
  for (FileStatus candidate : fs.listStatus(outPathInit))
  {
    if (!candidate.getPath().getName().startsWith(FileConst.PIR))
    {
      continue;
    }
    logger.info("fstat.getPath() = " + candidate.getPath().toString());
    FileInputFormat.addInputPath(columnMultJob, candidate.getPath());
  }

  // Reduce side: reducer and output types
  columnMultJob.setReducerClass(ColumnMultReducer.class);
  columnMultJob.setOutputKeyClass(LongWritable.class);
  columnMultJob.setOutputValueClass(Text.class);
  columnMultJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");

  // Remove any previous output before pointing the job at it
  if (fs.exists(outPathColumnMult))
  {
    fs.delete(outPathColumnMult, true);
  }
  FileOutputFormat.setOutputPath(columnMultJob, outPathColumnMult);

  MultipleOutputs.addNamedOutput(columnMultJob, FileConst.PIR_COLS, TextOutputFormat.class, LongWritable.class, Text.class);

  // Submit job, wait for completion
  return columnMultJob.waitForCompletion(true);
}
 
源代码18 项目: jumbune   文件: JsonDataValidationExecutor.java
/**
 * Entry point: runs the "JSONDataValidation" job and logs the resulting
 * report as JSON.
 *
 * <p>After generic Hadoop options are removed, five arguments are read, in
 * order: input path, json, null condition, regex, and the data-validation
 * directory (stored under SLAVE_DIR). The job output goes to a timestamped
 * directory under "/tmp/jumbune/".
 *
 * @param args command-line arguments
 * @throws IllegalArgumentException if fewer than five arguments remain after
 *     generic option parsing
 */
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
{
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
    // from the argument accesses below.
    if (otherArgs.length < 5) {
        throw new IllegalArgumentException(
            "Expected 5 arguments (input path, json, null condition, regex, data validation dir) but got "
                + otherArgs.length);
    }

    LOGGER.debug("Arguments[ " + otherArgs.length+"]"+"and values respectively ["+otherArgs[0]+"], "+
        otherArgs[1]+", ["+otherArgs[2]+"]"+", ["+otherArgs[3]+"],"+
        otherArgs[4]);

    String inputpath = otherArgs[0];
    String outputpath = "/tmp/jumbune/dvjsonreport" + System.currentTimeMillis();

    String json = otherArgs[1];
    String nullCondition = otherArgs[2];
    String regex = otherArgs[3];
    String dvDir = otherArgs[4];

    // An empty argument is stored as the empty string, so no special-casing is needed.
    conf.set(JsonDataVaildationConstants.REGEX_ARGUMENT, regex);
    conf.set(JsonDataVaildationConstants.NULL_ARGUMENT, nullCondition);
    conf.set(JsonDataVaildationConstants.SLAVE_DIR, dvDir);
    conf.set(JsonDataVaildationConstants.JSON_ARGUMENT, json);
    FileSystem fs = FileSystem.get(conf);

    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "JSONDataValidation");
    job.setJarByClass(JsonDataValidationExecutor.class);

    job.setInputFormatClass(JsonFileInputFormat.class);

    job.setMapperClass(JsonDataValidationMapper.class);
    job.setPartitionerClass(JsonDataValidationPartitioner.class);
    job.setReducerClass(JsonDataValidationReducer.class);
    job.setNumReduceTasks(5);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FileKeyViolationBean.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TotalReducerViolationBean.class);

    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    Path[] inputPaths = FileUtil.getAllJsonNestedFilePath(job, inputpath);
    Path outputDir = new Path(outputpath);

    FileInputFormat.setInputPaths(job, inputPaths);
    FileOutputFormat.setOutputPath(job, outputDir);

    // Remove any previous output so the job can write to the directory.
    if (fs.exists(outputDir)) {
        fs.delete(outputDir, true);
    }

    // A failed job is reported but the report is still attempted, as before.
    if (!job.waitForCompletion(true)) {
        LOGGER.error("JSONDataValidation job did not complete successfully; the report may be incomplete");
    }

    Map<String, JsonViolationReport> jsonMap = readDataFromHdfs(conf,outputpath);
    final Gson gson= new Gson();
    final String jsonReport = gson.toJson(jsonMap);

    LOGGER.info("Completed DataValidation");
    LOGGER.info(JsonDataVaildationConstants.JSON_DV_REPORT + jsonReport);
}
 
源代码19 项目: RDFS   文件: TestMiniMRLocalFS.java
/**
 * Writes a small input file of integer pairs, runs the SecondarySort job on
 * it with two reducers, and checks the exact contents of both reducer
 * output files.
 *
 * @param conf configuration used for the local file system and the job
 */
private void runSecondarySort(Configuration conf) throws IOException,
                                                      InterruptedException,
                                                      ClassNotFoundException {
  final Path inputDir = new Path(TEST_ROOT_DIR + "/in");
  final Path outputDir = new Path(TEST_ROOT_DIR + "/out");

  // Start from clean input and output directories.
  FileSystem localFs = FileSystem.getLocal(conf);
  localFs.delete(inputDir, true);
  localFs.delete(outputDir, true);
  TestMapReduceLocal.writeFile
           ("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
            "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");

  Job sortJob = new Job(conf, "word count");
  sortJob.setJarByClass(WordCount.class);
  sortJob.setNumReduceTasks(2);
  sortJob.setMapperClass(SecondarySort.MapClass.class);
  sortJob.setReducerClass(SecondarySort.Reduce.class);

  // group and partition by the first int in the pair
  sortJob.setPartitionerClass(FirstPartitioner.class);
  sortJob.setGroupingComparatorClass(FirstGroupingComparator.class);

  // the map output is IntPair, IntWritable
  sortJob.setMapOutputKeyClass(IntPair.class);
  sortJob.setMapOutputValueClass(IntWritable.class);

  // the reduce output is Text, IntWritable
  sortJob.setOutputKeyClass(Text.class);
  sortJob.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPath(sortJob, inputDir);
  FileOutputFormat.setOutputPath(sortJob, outputDir);
  assertTrue(sortJob.waitForCompletion(true));

  // Each group in the expected output is introduced by this separator line.
  final String separator = "------------------------------------------------\n";

  String firstPartition = TestMapReduceLocal.readFile("out/part-r-00000");
  assertEquals(separator +
               "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
               separator +
               "10\t20\n10\t25\n10\t30\n", firstPartition);

  String secondPartition = TestMapReduceLocal.readFile("out/part-r-00001");
  assertEquals(separator +
               "-3\t23\n" +
               separator +
               "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
               separator +
               "5\t10\n", secondPartition);
}
 
源代码20 项目: gemfirexd-oss   文件: TopBusyAirport.java
/**
 * Runs two chained jobs: "Busy Airport Count", which reads the input table
 * and writes to an intermediate directory, followed (only if the first
 * succeeds) by "Top Busy Airport", which reads that directory and writes
 * the final output.
 *
 * @param args {@code args[0]} is the output path (the intermediate path is
 *     the same with "_int" appended), {@code args[1]} the HDFS home
 *     directory, {@code args[2]} the table name
 * @return 0 if both jobs succeed, 1 if either fails
 * @throws Exception propagated from job setup or execution
 */
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();
    Configuration conf = getConf();

    final Path finalOutput = new Path(args[0]);
    final Path intermediateOutput = new Path(args[0] + "_int");
    final String hdfsHomeDir = args[1];
    final String tableName = args[2];

    // Clear any output left over from a previous run.
    finalOutput.getFileSystem(conf).delete(finalOutput, true);
    intermediateOutput.getFileSystem(conf).delete(intermediateOutput, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    // ---- first job: count per airport ----
    Job countJob = Job.getInstance(conf, "Busy Airport Count");
    countJob.setInputFormatClass(RowInputFormat.class);

    countJob.setMapperClass(SampleMapper.class);
    countJob.setCombinerClass(IntSumReducer.class);
    countJob.setReducerClass(IntSumReducer.class);

    // Only have one reduce task so that all of the results from mapping are
    // processed in one place.
    countJob.setNumReduceTasks(1);

    TextOutputFormat.setOutputPath(countJob, intermediateOutput);
    countJob.setOutputFormatClass(TextOutputFormat.class);
    countJob.setOutputKeyClass(Text.class);
    countJob.setOutputValueClass(IntWritable.class);

    if (!countJob.waitForCompletion(true)) {
      return 1;
    }

    // ---- second job: pick the top airport from the intermediate output ----
    Job topJob = Job.getInstance(getConf(), "Top Busy Airport");

    // We want the task to run on a single VM
    topJob.setNumReduceTasks(1);

    topJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(topJob, intermediateOutput);

    topJob.setMapperClass(TopBusyAirportMapper.class);
    topJob.setReducerClass(TopBusyAirportReducer.class);

    TextOutputFormat.setOutputPath(topJob, finalOutput);
    topJob.setOutputFormatClass(TextOutputFormat.class);
    topJob.setOutputKeyClass(Text.class);
    topJob.setOutputValueClass(IntWritable.class);

    topJob.setMapOutputKeyClass(Text.class);
    topJob.setMapOutputValueClass(StringIntPair.class);

    return topJob.waitForCompletion(true) ? 0 : 1;
}