类org.apache.hadoop.mapred.lib.NullOutputFormat源码实例Demo

下面列出了怎么用org.apache.hadoop.mapred.lib.NullOutputFormat的API类实例代码及写法，或者点击链接到github查看源代码。

源代码1 项目: big-c   文件: SleepJob.java
/**
 * Builds the {@link JobConf} for a sleep job.
 *
 * <p>{@code SleepJob} is registered as the mapper, reducer and partitioner,
 * the output format is {@code NullOutputFormat}, and speculative execution
 * is switched off. The sleep parameters are stored under the
 * {@code sleep.job.*} configuration keys.
 *
 * @param numMapper        number of map tasks to request
 * @param numReducer       number of reduce tasks to request
 * @param mapSleepTime     value stored under {@code sleep.job.map.sleep.time}
 * @param mapSleepCount    value stored under {@code sleep.job.map.sleep.count}
 * @param reduceSleepTime  value stored under {@code sleep.job.reduce.sleep.time}
 * @param reduceSleepCount value stored under {@code sleep.job.reduce.sleep.count}
 * @return the populated job configuration
 */
public JobConf setupJobConf(int numMapper, int numReducer,
                              long mapSleepTime, int mapSleepCount,
                              long reduceSleepTime, int reduceSleepCount) {
  final JobConf conf = new JobConf(getConf(), SleepJob.class);

  // Task counts.
  conf.setNumMapTasks(numMapper);
  conf.setNumReduceTasks(numReducer);

  // Map side.
  conf.setMapperClass(SleepJob.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(NullWritable.class);

  // Reduce side and I/O formats.
  conf.setReducerClass(SleepJob.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setInputFormat(SleepInputFormat.class);
  conf.setPartitionerClass(SleepJob.class);

  conf.setSpeculativeExecution(false);
  conf.setJobName("Sleep job");

  // An input path is registered even though its name is just "ignored".
  FileInputFormat.addInputPath(conf, new Path("ignored"));

  // Sleep parameters handed to the tasks through the configuration.
  conf.setLong("sleep.job.map.sleep.time", mapSleepTime);
  conf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  conf.setInt("sleep.job.map.sleep.count", mapSleepCount);
  conf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  return conf;
}
 
源代码2 项目: tracing-framework   文件: ReadExistingDataJob.java
/**
 * Configures the given job to run {@code ReadDataJob.TestMapper} over the
 * text input found at {@code this.input_path}, with {@code NullOutputFormat}
 * as the output format and speculative execution disabled.
 *
 * @param job the job configuration to populate
 */
public void configure(JobConf job) {
    // Mapper class; the same class is used to locate the jar to ship.
    job.setMapperClass(ReadDataJob.TestMapper.class);
    job.setJarByClass(ReadDataJob.TestMapper.class);

    // Text in, NullOutputFormat out.
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);

    // No speculative attempts for either phase.
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    // Where the job reads from.
    final Path inputPath = new Path(this.input_path);
    FileInputFormat.addInputPath(job, inputPath);
}
 
源代码3 项目: tracing-framework   文件: ReadDataJob.java
/**
 * Configures the given job to run {@code TestMapper} over the text input
 * found at {@code this.input_filename}, with {@code NullOutputFormat} as the
 * output format and speculative execution disabled.
 *
 * @param job the job configuration to populate
 */
public void configure(JobConf job) {
    // Mapper class; the same class is used to locate the jar to ship.
    job.setMapperClass(TestMapper.class);
    job.setJarByClass(TestMapper.class);

    // The reducer and the explicit key/value classes are currently disabled:
    // job.setReducerClass(TestReducer.class);
    // job.setMapOutputKeyClass(IntWritable.class);
    // job.setMapOutputValueClass(NullWritable.class);
    // job.setOutputKeyClass(NullWritable.class);
    // job.setOutputValueClass(NullWritable.class);

    // Text in, NullOutputFormat out.
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);

    // No speculative attempts for either phase.
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    // Where the job reads from.
    final Path inputPath = new Path(this.input_filename);
    FileInputFormat.addInputPath(job, inputPath);
}
 
源代码4 项目: anthelion   文件: SolrDeleteDuplicates.java
/**
 * Runs the duplicate-deletion job against the given Solr server and logs the
 * start time, the finish time and the elapsed time.
 *
 * @param solrUrl  Solr server URL, stored under {@code SolrConstants.SERVER_URL}
 * @param noCommit flag stored under the {@code noCommit} configuration key
 * @throws IOException if running the job fails
 */
public void dedup(String solrUrl, boolean noCommit) throws IOException {
  final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  final long start = System.currentTimeMillis();
  LOG.info("SolrDeleteDuplicates: starting at " + timestampFormat.format(start));
  LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl);

  final JobConf conf = new NutchJob(getConf());

  // Parameters passed to the tasks through the configuration.
  conf.set(SolrConstants.SERVER_URL, solrUrl);
  conf.setBoolean("noCommit", noCommit);

  // Records come from Solr; the output format is NullOutputFormat.
  conf.setInputFormat(SolrInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);

  // Identity map, with this class acting as the reducer.
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(SolrRecord.class);
  conf.setMapperClass(IdentityMapper.class);
  conf.setReducerClass(SolrDeleteDuplicates.class);

  JobClient.runJob(conf);

  final long end = System.currentTimeMillis();
  final String finishedAt = timestampFormat.format(end);
  LOG.info("SolrDeleteDuplicates: finished at " + finishedAt + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
 
源代码5 项目: anthelion   文件: SolrClean.java
/**
 * Runs the Solr cleaning job over the current crawl database and logs the
 * start time, the finish time and the elapsed time.
 *
 * @param crawldb  crawl database directory; {@code CrawlDb.CURRENT_NAME}
 *                 beneath it is used as the job input
 * @param solrUrl  Solr server URL, stored under {@code SolrConstants.SERVER_URL}
 * @param noCommit flag stored under the {@code noCommit} configuration key
 * @throws IOException if running the job fails
 */
public void delete(String crawldb, String solrUrl, boolean noCommit) throws IOException {
  final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  final long start = System.currentTimeMillis();
  LOG.info("SolrClean: starting at " + timestampFormat.format(start));

  final JobConf conf = new NutchJob(getConf());

  // Input: the "current" part of the crawl database.
  final Path currentDb = new Path(crawldb, CrawlDb.CURRENT_NAME);
  FileInputFormat.addInputPath(conf, currentDb);

  // Parameters passed to the tasks through the configuration.
  conf.setBoolean("noCommit", noCommit);
  conf.set(SolrConstants.SERVER_URL, solrUrl);

  // Sequence files in, NullOutputFormat out.
  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);

  // DBFilter maps, SolrDeleter reduces.
  conf.setMapOutputKeyClass(ByteWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.setMapperClass(DBFilter.class);
  conf.setReducerClass(SolrDeleter.class);

  JobClient.runJob(conf);

  final long end = System.currentTimeMillis();
  final String finishedAt = timestampFormat.format(end);
  LOG.info("SolrClean: finished at " + finishedAt + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
 
源代码6 项目: hbase   文件: TestTableInputFormat.java
/**
 * Runs a map-only job with the given input format and {@code ExampleVerifier}
 * as the mapper, then checks the job succeeded and that the row, family and
 * value counters hold the expected counts.
 *
 * @param clazz the input format class under test
 * @throws IOException if submitting or running the job fails
 */
void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
  final JobConf job = new JobConf(UTIL.getConfiguration());
  job.setInputFormat(clazz);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  final RunningJob run = JobClient.runJob(job);
  assertTrue("job failed!", run.isSuccessful());

  // Counter groups are namespaced by this test class's name.
  final String rowGroup = TestTableInputFormat.class.getName() + ":row";
  final String familyGroup = TestTableInputFormat.class.getName() + ":family";
  final String valueGroup = TestTableInputFormat.class.getName() + ":value";

  // Rows.
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      run.getCounters().findCounter(rowGroup, "aaa").getCounter());
  assertEquals("Saw any instances of the filtered out row.", 0,
      run.getCounters().findCounter(rowGroup, "bbb").getCounter());

  // Column families.
  assertEquals("Saw the wrong number of instances of columnA.", 1,
      run.getCounters().findCounter(familyGroup, "columnA").getCounter());
  assertEquals("Saw the wrong number of instances of columnB.", 1,
      run.getCounters().findCounter(familyGroup, "columnB").getCounter());

  // Values.
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      run.getCounters().findCounter(valueGroup, "value aaa").getCounter());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      run.getCounters().findCounter(valueGroup, "value bbb").getCounter());
}
 
源代码7 项目: RDFS   文件: TestMiniMRDFSSort.java
/**
 * Runs a map-only job over {@code SORT_INPUT_PATH} whose mapper
 * ({@code ReuseDetector}) is expected to maintain the "jvm"/"use" counter,
 * then checks that counter against the number of map tasks.
 *
 * @param job   the job configuration to populate and run
 * @param reuse when true {@code mapred.job.reuse.jvm.num.tasks} is set to -1
 *              and the counter must exceed the map count; otherwise it is
 *              set to 1 and the counter must equal the map count
 * @throws IOException if running the job fails
 */
private static void runJvmReuseTest(JobConf job,
                                    boolean reuse) throws IOException {
  final int tasksPerJvm;
  if (reuse) {
    tasksPerJvm = -1;
  } else {
    tasksPerJvm = 1;
  }
  job.setInt("mapred.job.reuse.jvm.num.tasks", tasksPerJvm);

  // Map-only job: read the sort input and write through NullOutputFormat.
  FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ReuseDetector.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumMapTasks(24);
  job.setNumReduceTasks(0);

  final RunningJob result = JobClient.runJob(job);
  final long uses = result.getCounters().findCounter("jvm", "use").getValue();
  final int maps = job.getNumMapTasks();

  if (!reuse) {
    assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
    return;
  }
  assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
}
 
源代码8 项目: RDFS   文件: TestMapCollection.java
/**
 * Configures and runs a one-map, one-reduce job with {@code SpillMapper} and
 * {@code SpillReducer}, using the supplied sort-buffer settings.
 *
 * @param name     label written to the log before the job starts
 * @param keylen   value stored under {@code test.keywritable.length}
 * @param vallen   value stored under {@code test.valwritable.length}
 * @param records  value stored under {@code test.spillmap.records}
 * @param ioSortMB value stored under {@code io.sort.mb}
 * @param recPer   value stored under {@code io.sort.record.percent}
 * @param spillPer value stored under {@code io.sort.spill.percent}
 * @param pedantic value stored under {@code test.pedantic.verification}
 * @throws Exception if the job fails
 */
private static void runTest(String name, int keylen, int vallen,
    int records, int ioSortMB, float recPer, float spillPer,
    boolean pedantic) throws Exception {
  final JobConf job = new JobConf(new Configuration(), SpillMapper.class);

  // Sort-buffer tuning under test.
  job.setInt("io.sort.mb", ioSortMB);
  job.set("io.sort.record.percent", Float.toString(recPer));
  job.set("io.sort.spill.percent", Float.toString(spillPer));

  // Parameters read back by the test's own classes.
  job.setInt("test.keywritable.length", keylen);
  job.setInt("test.valwritable.length", vallen);
  job.setInt("test.spillmap.records", records);
  job.setBoolean("test.pedantic.verification", pedantic);

  // Job shape: a single map feeding a single reduce.
  job.setNumMapTasks(1);
  job.setNumReduceTasks(1);
  job.setInputFormat(FakeIF.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(SpillMapper.class);
  job.setReducerClass(SpillReducer.class);
  job.setMapOutputKeyClass(KeyWritable.class);
  job.setMapOutputValueClass(ValWritable.class);

  LOG.info("Running " + name);
  JobClient.runJob(job);
}
 
源代码9 项目: hadoop-book   文件: SleepJob.java
/**
 * Creates the job configuration for a sleep job.
 *
 * <p>{@code SleepJob} serves as mapper, reducer and partitioner;
 * {@code NullOutputFormat} is the output format; speculative execution is
 * turned off; the sleep settings travel in {@code sleep.job.*} keys.
 *
 * @param numMapper        number of map tasks to request
 * @param numReducer       number of reduce tasks to request
 * @param mapSleepTime     stored under {@code sleep.job.map.sleep.time}
 * @param mapSleepCount    stored under {@code sleep.job.map.sleep.count}
 * @param reduceSleepTime  stored under {@code sleep.job.reduce.sleep.time}
 * @param reduceSleepCount stored under {@code sleep.job.reduce.sleep.count}
 * @return the populated job configuration
 */
public JobConf setupJobConf(int numMapper, int numReducer,
        long mapSleepTime, int mapSleepCount,
        long reduceSleepTime, int reduceSleepCount) {
    final JobConf sleepConf = new JobConf(getConf(), SleepJob.class);

    // How many tasks of each kind.
    sleepConf.setNumMapTasks(numMapper);
    sleepConf.setNumReduceTasks(numReducer);

    // Classes for each phase.
    sleepConf.setMapperClass(SleepJob.class);
    sleepConf.setMapOutputKeyClass(IntWritable.class);
    sleepConf.setMapOutputValueClass(NullWritable.class);
    sleepConf.setReducerClass(SleepJob.class);

    // Formats and partitioning.
    sleepConf.setOutputFormat(NullOutputFormat.class);
    sleepConf.setInputFormat(SleepInputFormat.class);
    sleepConf.setPartitionerClass(SleepJob.class);

    sleepConf.setSpeculativeExecution(false);
    sleepConf.setJobName("Sleep job");

    // An input path is registered even though its name is just "ignored".
    final Path placeholderInput = new Path("ignored");
    FileInputFormat.addInputPath(sleepConf, placeholderInput);

    // Sleep durations and counts for both phases.
    sleepConf.setLong("sleep.job.map.sleep.time", mapSleepTime);
    sleepConf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
    sleepConf.setInt("sleep.job.map.sleep.count", mapSleepCount);
    sleepConf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
    return sleepConf;
}
 
源代码10 项目: hadoop-gpu   文件: SleepJob.java
/**
 * Assembles the {@link JobConf} used to launch a sleep job.
 *
 * <p>Mapper, reducer and partitioner are all {@code SleepJob}; the output
 * format is {@code NullOutputFormat}; speculative execution is disabled.
 *
 * @param numMapper        number of map tasks to request
 * @param numReducer       number of reduce tasks to request
 * @param mapSleepTime     stored under {@code sleep.job.map.sleep.time}
 * @param mapSleepCount    stored under {@code sleep.job.map.sleep.count}
 * @param reduceSleepTime  stored under {@code sleep.job.reduce.sleep.time}
 * @param reduceSleepCount stored under {@code sleep.job.reduce.sleep.count}
 * @return the populated job configuration
 */
public JobConf setupJobConf(int numMapper, int numReducer,
                              long mapSleepTime, int mapSleepCount,
                              long reduceSleepTime, int reduceSleepCount) {
  final JobConf jobConf = new JobConf(getConf(), SleepJob.class);

  // Requested parallelism.
  jobConf.setNumMapTasks(numMapper);
  jobConf.setNumReduceTasks(numReducer);

  // SleepJob plays every role in the pipeline.
  jobConf.setMapperClass(SleepJob.class);
  jobConf.setMapOutputKeyClass(IntWritable.class);
  jobConf.setMapOutputValueClass(NullWritable.class);
  jobConf.setReducerClass(SleepJob.class);
  jobConf.setOutputFormat(NullOutputFormat.class);
  jobConf.setInputFormat(SleepInputFormat.class);
  jobConf.setPartitionerClass(SleepJob.class);

  jobConf.setSpeculativeExecution(false);
  jobConf.setJobName("Sleep job");

  // An input path is registered even though its name is just "ignored".
  FileInputFormat.addInputPath(jobConf, new Path("ignored"));

  // Map-side, then reduce-side, sleep settings are written in the
  // same order as before: times first, counts second.
  jobConf.setLong("sleep.job.map.sleep.time", mapSleepTime);
  jobConf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  jobConf.setInt("sleep.job.map.sleep.count", mapSleepCount);
  jobConf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  return jobConf;
}
 
源代码11 项目: hadoop-gpu   文件: TestMiniMRDFSSort.java
/**
 * Runs a map-only job over {@code SORT_INPUT_PATH} with
 * {@code ReuseDetector} as the mapper and verifies the "jvm"/"use" counter
 * against the number of map tasks.
 *
 * @param job   the job configuration to populate and run
 * @param reuse when true {@code mapred.job.reuse.jvm.num.tasks} is set to -1
 *              and the counter must exceed the map count; otherwise it is
 *              set to 1 and the counter must equal the map count
 * @throws IOException if running the job fails
 */
private static void runJvmReuseTest(JobConf job,
                                    boolean reuse) throws IOException {
  int reuseSetting = 1;
  if (reuse) {
    reuseSetting = -1;
  }
  job.setInt("mapred.job.reuse.jvm.num.tasks", reuseSetting);

  // Input side.
  FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
  job.setInputFormat(SequenceFileInputFormat.class);

  // Output side: NullOutputFormat with Text key/value classes.
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(ReuseDetector.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // 24 maps, no reduces.
  job.setNumMapTasks(24);
  job.setNumReduceTasks(0);

  final RunningJob finished = JobClient.runJob(job);
  final long uses = finished.getCounters().findCounter("jvm", "use").getValue();
  final int maps = job.getNumMapTasks();

  if (!reuse) {
    assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
  } else {
    assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
  }
}
 
源代码12 项目: hadoop-gpu   文件: TestMapCollection.java
/**
 * Sets up and runs a single-map, single-reduce job built from
 * {@code SpillMapper} and {@code SpillReducer} with the given sort settings.
 *
 * @param name     label written to the log before the job starts
 * @param keylen   value stored under {@code test.keywritable.length}
 * @param vallen   value stored under {@code test.valwritable.length}
 * @param records  value stored under {@code test.spillmap.records}
 * @param ioSortMB value stored under {@code io.sort.mb}
 * @param recPer   value stored under {@code io.sort.record.percent}
 * @param spillPer value stored under {@code io.sort.spill.percent}
 * @param pedantic value stored under {@code test.pedantic.verification}
 * @throws Exception if the job fails
 */
private static void runTest(String name, int keylen, int vallen,
    int records, int ioSortMB, float recPer, float spillPer,
    boolean pedantic) throws Exception {
  final JobConf spillConf = new JobConf(new Configuration(), SpillMapper.class);

  // Sort settings; the two percentages are stored as strings.
  final String recordPercent = Float.toString(recPer);
  final String spillPercent = Float.toString(spillPer);
  spillConf.setInt("io.sort.mb", ioSortMB);
  spillConf.set("io.sort.record.percent", recordPercent);
  spillConf.set("io.sort.spill.percent", spillPercent);

  // Test-specific settings.
  spillConf.setInt("test.keywritable.length", keylen);
  spillConf.setInt("test.valwritable.length", vallen);
  spillConf.setInt("test.spillmap.records", records);
  spillConf.setBoolean("test.pedantic.verification", pedantic);

  // One mapper, one reducer, NullOutputFormat as output format.
  spillConf.setNumMapTasks(1);
  spillConf.setNumReduceTasks(1);
  spillConf.setInputFormat(FakeIF.class);
  spillConf.setOutputFormat(NullOutputFormat.class);
  spillConf.setMapperClass(SpillMapper.class);
  spillConf.setReducerClass(SpillReducer.class);
  spillConf.setMapOutputKeyClass(KeyWritable.class);
  spillConf.setMapOutputValueClass(ValWritable.class);

  LOG.info("Running " + name);
  JobClient.runJob(spillConf);
}
 
源代码13 项目: RDFS   文件: SleepJob.java
/**
 * Builds the {@link JobConf} for a sleep job, including the speculation,
 * slow-task, counter and locality settings.
 *
 * <p>{@code SleepJob} is registered as the mapper, reducer and partitioner,
 * and the output format is {@code NullOutputFormat}.
 *
 * @param numMapper        number of map tasks to request
 * @param numReducer       number of reduce tasks to request
 * @param mapSleepTime     stored under {@code sleep.job.map.sleep.time}
 * @param mapSleepCount    stored under {@code sleep.job.map.sleep.count}
 * @param reduceSleepTime  stored under {@code sleep.job.reduce.sleep.time}
 * @param reduceSleepCount stored under {@code sleep.job.reduce.sleep.count}
 * @param doSpeculation    passed to {@code setSpeculativeExecution}
 * @param slowMaps         entries stored under {@code SLOW_MAPS}; must not be null
 * @param slowReduces      entries stored under {@code SLOW_REDUCES}; must not be null
 * @param slowRatio        stored under {@code SLOW_RATIO}
 * @param countersPerTask  stored under {@code sleep.job.counters.per.task}
 * @param hosts            entries stored under {@code HOSTS_FOR_LOCALITY};
 *                         must not be null
 * @param hostsPerSplit    stored under {@code HOSTS_PER_SPLIT}
 * @param setup            passed to {@code setJobSetupCleanupNeeded}
 * @return the populated job configuration
 */
public JobConf setupJobConf(int numMapper, int numReducer, 
                              long mapSleepTime, int mapSleepCount, 
                              long reduceSleepTime, int reduceSleepCount,
                              boolean doSpeculation, List<String> slowMaps,
                              List<String> slowReduces, int slowRatio,
                              int countersPerTask, List<String> hosts,
                              int hostsPerSplit, boolean setup) {
  
  JobConf job = new JobConf(getConf(), SleepJob.class);
  job.setNumMapTasks(numMapper);
  job.setNumReduceTasks(numReducer);
  job.setMapperClass(SleepJob.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setReducerClass(SleepJob.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setJobSetupCleanupNeeded(setup);
  job.setInputFormat(SleepInputFormat.class);
  job.setPartitionerClass(SleepJob.class);
  job.setJobName("Sleep job");
  // An input path is registered even though its name is just "ignored".
  FileInputFormat.addInputPath(job, new Path("ignored"));
  job.setLong("sleep.job.map.sleep.time", mapSleepTime);
  job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  job.setInt("sleep.job.map.sleep.count", mapSleepCount);
  job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  job.setSpeculativeExecution(doSpeculation);
  job.setInt(SLOW_RATIO, slowRatio);
  job.setStrings(SLOW_MAPS, slowMaps.toArray(new String[slowMaps.size()]));
  // Fixed: this previously copied slowMaps into SLOW_REDUCES, so the
  // caller's slowReduces list was never stored.
  job.setStrings(SLOW_REDUCES,
      slowReduces.toArray(new String[slowReduces.size()]));
  job.setInt("sleep.job.counters.per.task", countersPerTask);
  job.setStrings(HOSTS_FOR_LOCALITY, hosts.toArray(new String[hosts.size()]));
  job.setInt(HOSTS_PER_SPLIT, hostsPerSplit);
  return job;
}
 
源代码14 项目: RDFS   文件: TestNewCollector.java
/**
 * Configures and runs a job on the mini MR cluster using
 * {@code TestNewCollectorMapper} / {@code TestNewCollectorReducer} with the
 * {@code mapred.map.output.blockcollector} flag enabled.
 *
 * @param name                label written to the log before the job starts
 * @param keyLen              stored under {@code test.key.length}
 * @param valLen              stored under {@code test.value.length}
 * @param bigKeyLen           stored under {@code test.bigkey.length}
 * @param bigValLen           stored under {@code test.bigvalue.length}
 * @param recordsNumPerMapper forwarded to {@code RecordNumStore.setJobConf}
 * @param sortMb              stored under {@code io.sort.mb}
 * @param spillPer            stored under {@code io.sort.spill.percent}
 * @param numMapperTasks      number of map tasks to request
 * @param numReducerTask      number of reduce tasks to request
 * @param reducerRecPercents  forwarded to {@code RecordNumStore.setJobConf}
 * @param numBigRecordsStart  forwarded to {@code RecordNumStore.setJobConf}
 * @param numBigRecordsMiddle forwarded to {@code RecordNumStore.setJobConf}
 * @param numBigRecordsEnd    forwarded to {@code RecordNumStore.setJobConf}
 * @throws Exception if the job fails
 */
private void runTest(String name, int keyLen, int valLen,
    int bigKeyLen, int bigValLen, int recordsNumPerMapper,
    int sortMb, float spillPer, int numMapperTasks,
    int numReducerTask, double[] reducerRecPercents,
    int[] numBigRecordsStart, int[] numBigRecordsMiddle,
    int[] numBigRecordsEnd) throws Exception {
  final JobConf job = mrCluster.createJobConf();

  // Sort-buffer settings.
  job.setInt("io.sort.mb", sortMb);
  job.set("io.sort.spill.percent", Float.toString(spillPer));

  // Record sizes for regular and "big" records.
  job.setInt("test.key.length", keyLen);
  job.setInt("test.value.length", valLen);
  job.setInt("test.bigkey.length", bigKeyLen);
  job.setInt("test.bigvalue.length", bigValLen);

  // Job shape and classes.
  job.setNumMapTasks(numMapperTasks);
  job.setNumReduceTasks(numReducerTask);
  job.setInputFormat(FakeIF.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(TestNewCollectorMapper.class);
  job.setReducerClass(TestNewCollectorReducer.class);
  job.setMapOutputKeyClass(TestNewCollectorKey.class);
  job.setMapOutputValueClass(BytesWritable.class);
  job.setBoolean("mapred.map.output.blockcollector", true);

  // Hand the record-count plan to RecordNumStore, then request its instance
  // for this configuration; the returned instance is not used here.
  RecordNumStore.setJobConf(numReducerTask, numMapperTasks,
      recordsNumPerMapper, reducerRecPercents, numBigRecordsStart,
      numBigRecordsMiddle, numBigRecordsEnd, job);
  RecordNumStore.getInst(job);

  LOG.info("Running " + name);
  JobClient.runJob(job);
}
 
源代码15 项目: RDFS   文件: ControlledMapReduceJob.java
/**
 * Adopts the cluster configuration, initializes this object and builds the
 * {@link JobConf} for a "ControlledJob".
 *
 * <p>{@code ControlledMapReduceJob} is used as mapper, reducer and input
 * format; the output format is {@code NullOutputFormat}; speculative
 * execution is disabled for both phases.
 *
 * @param clusterConf configuration passed to {@code setConf}
 * @param numMapper   number of map tasks to request
 * @param numReducer  number of reduce tasks to request
 * @return the populated job configuration
 * @throws IOException declared by the original signature
 */
private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
    int numMapper, int numReducer)
    throws IOException {
  // These two calls must come first: the job conf is derived from getConf()
  // and signalFileDir is read below.
  setConf(clusterConf);
  initialize();

  final JobConf job = new JobConf(getConf(), ControlledMapReduceJob.class);
  job.setJobName("ControlledJob");
  job.set("signal.dir.path", signalFileDir.toString());

  // Requested parallelism.
  job.setNumMapTasks(numMapper);
  job.setNumReduceTasks(numReducer);

  // Map side.
  job.setMapperClass(ControlledMapReduceJob.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(NullWritable.class);

  // Reduce side.
  job.setReducerClass(ControlledMapReduceJob.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(NullWritable.class);

  // I/O: this class is the input format; the registered path is named "ignored".
  job.setInputFormat(ControlledMapReduceJob.class);
  FileInputFormat.addInputPath(job, new Path("ignored"));
  job.setOutputFormat(NullOutputFormat.class);

  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);

  // A slow-start threshold of 0 lets reduce tasks start running
  // immediately, alongside the maps.
  final String slowStartThreshold = String.valueOf(0);
  job.set("mapred.reduce.slowstart.completed.maps", slowStartThreshold);

  return job;
}
 
源代码16 项目: nutch-htmlunit   文件: CleaningJob.java
/**
 * Runs the cleaning job over the current crawl database and logs the start
 * time, the finish time and the elapsed time.
 *
 * @param crawldb  crawl database directory; {@code CrawlDb.CURRENT_NAME}
 *                 beneath it is used as the job input
 * @param noCommit flag stored under the {@code noCommit} configuration key
 * @throws IOException if running the job fails
 */
public void delete(String crawldb, boolean noCommit) throws IOException {
    final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    final long start = System.currentTimeMillis();
    LOG.info("CleaningJob: starting at " + timestampFormat.format(start));

    final JobConf conf = new NutchJob(getConf());

    // Input: the "current" part of the crawl database.
    final Path currentDb = new Path(crawldb, CrawlDb.CURRENT_NAME);
    FileInputFormat.addInputPath(conf, currentDb);
    conf.setBoolean("noCommit", noCommit);

    // Sequence files in, NullOutputFormat out.
    conf.setInputFormat(SequenceFileInputFormat.class);
    conf.setOutputFormat(NullOutputFormat.class);

    // DBFilter maps, DeleterReducer reduces.
    conf.setMapOutputKeyClass(ByteWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setMapperClass(DBFilter.class);
    conf.setReducerClass(DeleterReducer.class);

    conf.setJobName("CleaningJob");

    // Deletions have to be allowed explicitly.
    conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);

    JobClient.runJob(conf);

    final long end = System.currentTimeMillis();
    final String finishedAt = timestampFormat.format(end);
    LOG.info("CleaningJob: finished at " + finishedAt + ", elapsed: "
            + TimingUtil.elapsedTime(start, end));
}
 
源代码17 项目: hadoop-gpu   文件: ControlledMapReduceJob.java
/**
 * Applies the cluster configuration, initializes this object and returns the
 * {@link JobConf} for a "ControlledJob".
 *
 * <p>{@code ControlledMapReduceJob} acts as mapper, reducer and input
 * format; {@code NullOutputFormat} is the output format; speculative
 * execution is off for maps and reduces.
 *
 * @param clusterConf configuration passed to {@code setConf}
 * @param numMapper   number of map tasks to request
 * @param numReducer  number of reduce tasks to request
 * @return the populated job configuration
 * @throws IOException declared by the original signature
 */
private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
    int numMapper, int numReducer)
    throws IOException {
  // Keep these first: getConf() and signalFileDir are both read below.
  setConf(clusterConf);
  initialize();

  final JobConf controlled = new JobConf(getConf(), ControlledMapReduceJob.class);
  controlled.setJobName("ControlledJob");

  final String signalDir = signalFileDir.toString();
  controlled.set("signal.dir.path", signalDir);

  // Task counts.
  controlled.setNumMapTasks(numMapper);
  controlled.setNumReduceTasks(numReducer);

  // Mapper and its output types.
  controlled.setMapperClass(ControlledMapReduceJob.class);
  controlled.setMapOutputKeyClass(IntWritable.class);
  controlled.setMapOutputValueClass(NullWritable.class);

  // Reducer and the job's output types.
  controlled.setReducerClass(ControlledMapReduceJob.class);
  controlled.setOutputKeyClass(NullWritable.class);
  controlled.setOutputValueClass(NullWritable.class);

  // Input format, an input path named "ignored", and the output format.
  controlled.setInputFormat(ControlledMapReduceJob.class);
  final Path placeholderInput = new Path("ignored");
  FileInputFormat.addInputPath(controlled, placeholderInput);
  controlled.setOutputFormat(NullOutputFormat.class);

  controlled.setMapSpeculativeExecution(false);
  controlled.setReduceSpeculativeExecution(false);

  // Threshold 0: reduce tasks may be started running immediately,
  // together with the maps.
  controlled.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));

  return controlled;
}
 
源代码18 项目: hadoop   文件: GenericMRLoadGenerator.java
/**
 * Parses the command line, prepares the job input and output, runs the job
 * and prints its start time, end time and duration in seconds.
 *
 * <p>Without an output path the job uses {@code NullOutputFormat}. Without
 * input paths {@code confRandom} is applied. If an indirect input format is
 * configured, every file under the input paths is first listed into a
 * sequence file of (length, URI) records.
 *
 * @param argv command-line arguments
 * @return -1 if the arguments could not be parsed, 0 otherwise
 * @throws Exception if preparing the input listing or running the job fails
 */
public int run(String [] argv) throws Exception {
  final JobConf job = new JobConf(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }

  // No output dir? No writes.
  if (FileOutputFormat.getOutputPath(job) == null) {
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (FileInputFormat.getInputPaths(job).length == 0) {
    // No input dir? Generate random data.
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (job.getClass(
     org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
     null) != null) {
    // IndirectInputFormat specified: build the list of source files.
    final JobClient client = new JobClient(job);
    final Path stagingDir =
        new Path(client.getFs().getHomeDirectory(), ".staging");
    final Random random = new Random();
    // Randomly named listing file inside the staging directory.
    final String listingName =
        Integer.toString(random.nextInt(Integer.MAX_VALUE), 36) + "_files";
    final Path listingFile = new Path(stagingDir, listingName);
    job.set(
      org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
      listingFile.toString());
    final SequenceFile.Writer writer = SequenceFile.createWriter(
        stagingDir.getFileSystem(job), job, listingFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path inputRoot : FileInputFormat.getInputPaths(job)) {
        final FileSystem fs = inputRoot.getFileSystem(job);
        // Walk the tree iteratively, one directory per pop.
        final Stack<Path> pending = new Stack<Path>();
        pending.push(inputRoot);
        while (!pending.empty()) {
          for (FileStatus entry : fs.listStatus(pending.pop())) {
            if (!entry.isDirectory()) {
              // Plain file: record its length and URI.
              writer.sync();
              writer.append(new LongWritable(entry.getLen()),
                  new Text(entry.getPath().toUri().toString()));
            } else if (!entry.getPath().getName().startsWith("_")) {
              // Descend into directories unless the name starts with "_".
              pending.push(entry.getPath());
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  final Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  final Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  final long elapsedSeconds =
      (endTime.getTime() - startTime.getTime()) / 1000;
  System.out.println("The job took " + elapsedSeconds + " seconds.");

  return 0;
}
 
源代码19 项目: hadoop   文件: Submitter.java
/**
 * Prepares a job configuration for a pipes job: installs the pipes
 * map runner, partitioner, reducer and input format where the corresponding
 * Java implementation is not in use, defaults the key/value classes to
 * {@code Text}, and places the executable first in the distributed cache
 * file list.
 *
 * @param conf the job configuration to modify in place
 * @throws IOException if the executable string is not a valid URI
 * @throws IllegalArgumentException if no executable is configured
 */
private static void setupPipesJob(JobConf conf) throws IOException {
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Remember the user's partitioner before replacing it with ours.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }

  // Default the map output and job output types to Text.
  final String textClassname = Text.class.getName();
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);

  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders; the user's input format class is saved first.
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass(Submitter.INPUT_FORMAT,
                  conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }

  final String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }

  // The default debug script is added only when the executable is
  // expressed as <path>#<executable>.
  if (exec.contains("#")) {
    // Default gdb commands for map and reduce tasks.
    final String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
    setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
  }

  // Build a cache-file array with slot 0 reserved for the executable and
  // any existing entries shifted up by one.
  final URI[] existing = DistributedCache.getCacheFiles(conf);
  final int existingCount = (existing == null) ? 0 : existing.length;
  final URI[] fileCache = new URI[existingCount + 1];
  if (existing != null) {
    System.arraycopy(existing, 0, fileCache, 1, existingCount);
  }

  try {
    fileCache[0] = new URI(exec);
  } catch (URISyntaxException e) {
    throw new IOException("Problem parsing execable URI " + exec, e);
  }
  DistributedCache.setCacheFiles(fileCache, conf);
}
 
源代码20 项目: big-c   文件: GenericMRLoadGenerator.java
/**
 * Parses the arguments, prepares input and output for the load-generator
 * job, runs it, and reports when it started, when it ended and how many
 * seconds it took.
 *
 * <p>If no output path is set, {@code NullOutputFormat} is used. If no input
 * path is set, {@code confRandom} is applied. If an indirect input format is
 * configured, the files under the input paths are listed into a sequence
 * file of (length, URI) records before the job runs.
 *
 * @param argv command-line arguments
 * @return -1 if the arguments could not be parsed, 0 otherwise
 * @throws Exception if preparing the input listing or running the job fails
 */
public int run(String [] argv) throws Exception {
  final JobConf job = new JobConf(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);

  final boolean argsOk = parseArgs(argv, job);
  if (!argsOk) {
    return -1;
  }

  // No output dir? No writes.
  if (FileOutputFormat.getOutputPath(job) == null) {
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (FileInputFormat.getInputPaths(job).length == 0) {
    // No input dir? Generate random data.
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (job.getClass(
     org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
     null) != null) {
    // IndirectInputFormat specified: build the list of source files.
    final JobClient jobClient = new JobClient(job);
    final Path stagingDir =
        new Path(jobClient.getFs().getHomeDirectory(), ".staging");
    final Random rng = new Random();
    // Randomly named listing file inside the staging directory.
    final Path listing = new Path(stagingDir,
        Integer.toString(rng.nextInt(Integer.MAX_VALUE), 36) + "_files");
    job.set(
      org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
      listing.toString());
    final SequenceFile.Writer out = SequenceFile.createWriter(
        stagingDir.getFileSystem(job), job, listing,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path root : FileInputFormat.getInputPaths(job)) {
        final FileSystem fs = root.getFileSystem(job);
        // Directories still to be listed.
        final Stack<Path> todo = new Stack<Path>();
        todo.push(root);
        while (!todo.empty()) {
          final Path dir = todo.pop();
          for (FileStatus status : fs.listStatus(dir)) {
            if (!status.isDirectory()) {
              // Plain file: record its length and URI.
              out.sync();
              out.append(new LongWritable(status.getLen()),
                  new Text(status.getPath().toUri().toString()));
            } else if (!status.getPath().getName().startsWith("_")) {
              // Descend into directories unless the name starts with "_".
              todo.push(status.getPath());
            }
          }
        }
      }
    } finally {
      out.close();
    }
  }

  final Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  final Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  final long tookSeconds = (endTime.getTime() - startTime.getTime()) / 1000;
  System.out.println("The job took " + tookSeconds + " seconds.");

  return 0;
}
 
源代码21 项目: big-c   文件: Submitter.java
/**
 * Adjusts a job configuration so it can run as a pipes job.
 *
 * <p>Pipes implementations of the map runner, partitioner, reducer and
 * input format are installed when the Java counterparts are not in use;
 * unset key/value classes default to {@code Text}; and the executable is
 * put at index 0 of the distributed cache file list.
 *
 * @param conf the job configuration to modify in place
 * @throws IOException if the executable string is not a valid URI
 * @throws IllegalArgumentException if no executable is configured
 */
private static void setupPipesJob(JobConf conf) throws IOException {
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Save the user's partitioner, then hook in the pipes one.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }

  // Any key/value class that is still unset defaults to Text.
  final String textClassname = Text.class.getName();
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
  setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);

  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders; the user's input format class is saved first.
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass(Submitter.INPUT_FORMAT,
                  conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }

  final String exec = getExecutable(conf);
  if (exec == null) {
    throw new IllegalArgumentException("No application program defined.");
  }

  // Only executables written as <path>#<executable> get the default
  // debug script.
  if (exec.contains("#")) {
    // Default gdb commands for map and reduce tasks.
    final String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
    setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
  }

  // Prepend a slot for the executable to the current cache-file list.
  final URI[] current = DistributedCache.getCacheFiles(conf);
  final URI[] withExec;
  if (current == null) {
    withExec = new URI[1];
  } else {
    withExec = new URI[current.length + 1];
    System.arraycopy(current, 0, withExec, 1, current.length);
  }

  try {
    withExec[0] = new URI(exec);
  } catch (URISyntaxException e) {
    throw new IOException("Problem parsing execable URI " + exec, e);
  }
  DistributedCache.setCacheFiles(withExec, conf);
}
 
/**
 * Returns the name of the output format class.
 *
 * @return the class name of {@code NullOutputFormat}
 */
@Override
public String getOutputFormat() {
	final String outputFormatName = NullOutputFormat.class.getName();
	return outputFormatName;
}
 
/**
 * Reports which output format class is used.
 *
 * @return the class name of {@code NullOutputFormat}
 */
@Override
public String getOutputFormat() {
  final Class<?> outputFormat = NullOutputFormat.class;
  return outputFormat.getName();
}
 
源代码24 项目: hbase   文件: TestTableSnapshotInputFormat.java
/**
 * Creates a table and a snapshot of it, runs a MapReduce job over the
 * snapshot and asserts that the job succeeds.
 *
 * <p>When {@code shutdownCluster} is true the mini HBase cluster is shut
 * down before the job runs; otherwise the snapshot and the table are
 * deleted once the job has finished.
 *
 * @param util               testing utility supplying configuration and admin
 * @param tableName          table to create and snapshot
 * @param snapshotName       name of the snapshot to read
 * @param startRow           forwarded to {@code createTableAndSnapshot}
 * @param endRow             forwarded to {@code createTableAndSnapshot}
 * @param tableDir           forwarded to {@code initTableSnapshotMapJob}
 * @param numRegions         forwarded to {@code createTableAndSnapshot}
 * @param numSplitsPerRegion when greater than 1, a uniform split algorithm
 *                           and this count are also passed to the job setup
 * @param expectedNumSplits  not read by this method
 * @param shutdownCluster    whether to shut the mini HBase cluster down first
 * @throws Exception if setup, the job or cleanup fails
 */
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion,int expectedNumSplits, boolean shutdownCluster) throws Exception {

  // Create the table and its snapshot.
  createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

  if (shutdownCluster) {
    util.shutdownMiniHBaseCluster();
  }

  try {
    // Create the job and ship the jars it depends on.
    final JobConf conf = new JobConf(util.getConfiguration());
    conf.setJarByClass(util.getClass());
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(conf,
      TestTableSnapshotInputFormat.class);

    // Map side: a single split per region needs no split algorithm.
    if (numSplitsPerRegion <= 1) {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
              TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
              NullWritable.class, conf, true, tableDir);
    } else {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
              TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
              NullWritable.class, conf, true, tableDir, new RegionSplitter.UniformSplit(),
              numSplitsPerRegion);
    }

    // Reduce side: one reducer, NullOutputFormat as output format.
    conf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
    conf.setNumReduceTasks(1);
    conf.setOutputFormat(NullOutputFormat.class);

    final RunningJob finished = JobClient.runJob(conf);
    Assert.assertTrue(finished.isSuccessful());
  } finally {
    // Clean up only while the cluster is still running.
    if (!shutdownCluster) {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
}
 
源代码25 项目: RDFS   文件: Submitter.java
/**
 * Applies the pipes-specific settings to a job configuration.
 *
 * <p>When the mapper is not a Java mapper, {@code PipesMapRunner} and {@code PipesPartitioner}
 * are installed and the user's partitioner is recorded via {@code setJavaPartitioner}. When the
 * reducer is not a Java reducer, {@code PipesReducer} is installed and, unless a Java record
 * writer is used, the output format becomes {@code NullOutputFormat}. The four map-output and
 * output key/value class settings default to {@code Text} if unset. Finally the executable is
 * placed at index 0 of the distributed cache file list.
 *
 * @param conf the job configuration, modified in place
 * @throws IOException if the executable string cannot be parsed as a URI
 * @throws IllegalArgumentException if no executable is configured
 */
private static void setupPipesJob(JobConf conf) throws IOException {
  if (!getIsJavaMapper(conf)) {
    conf.setMapRunnerClass(PipesMapRunner.class);
    // Remember the user's partitioner before replacing it with the pipes one.
    setJavaPartitioner(conf, conf.getPartitionerClass());
    conf.setPartitionerClass(PipesPartitioner.class);
  }
  if (!getIsJavaReducer(conf)) {
    conf.setReducerClass(PipesReducer.class);
    if (!getIsJavaRecordWriter(conf)) {
      conf.setOutputFormat(NullOutputFormat.class);
    }
  }

  // Key/value classes default to Text, in this order.
  final String textClassName = Text.class.getName();
  final String[] typeKeys = {
    "mapred.mapoutput.key.class",
    "mapred.mapoutput.value.class",
    "mapred.output.key.class",
    "mapred.output.value.class",
  };
  for (String typeKey : typeKeys) {
    setIfUnset(conf, typeKey, textClassName);
  }

  // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  // from C++ RecordReaders ...
  if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
    conf.setClass("mapred.pipes.user.inputformat",
                  conf.getInputFormat().getClass(), InputFormat.class);
    conf.setInputFormat(PipesNonJavaInputFormat.class);
  }

  final String executable = getExecutable(conf);
  if (null == executable) {
    throw new IllegalArgumentException("No application program defined.");
  }

  // The default debug script is only added when the executable is expressed as
  // <path>#<executable>
  if (executable.indexOf('#') >= 0) {
    DistributedCache.createSymlink(conf);
    // set default gdb commands for map and reduce task
    final String defaultScript = "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script";
    setIfUnset(conf, "mapred.map.task.debug.script", defaultScript);
    setIfUnset(conf, "mapred.reduce.task.debug.script", defaultScript);
  }

  // Slot 0 is reserved for the executable; existing cache files shift up by one.
  final URI[] existingFiles = DistributedCache.getCacheFiles(conf);
  final URI[] cacheFiles;
  if (existingFiles != null) {
    cacheFiles = new URI[existingFiles.length + 1];
    System.arraycopy(existingFiles, 0, cacheFiles, 1, existingFiles.length);
  } else {
    cacheFiles = new URI[1];
  }
  try {
    cacheFiles[0] = new URI(executable);
  } catch (URISyntaxException e) {
    throw (IOException) new IOException("Problem parsing execable URI " + executable)
        .initCause(e);
  }
  DistributedCache.setCacheFiles(cacheFiles, conf);
}
 
源代码26 项目: RDFS   文件: GenericMRLoadJobCreator.java
/**
 * Builds the job configuration for the generic load generator.
 *
 * <p>If no output path is configured, the output format is set to {@code NullOutputFormat}.
 * If no input path is configured, {@code confRandom} sets up the job instead. Otherwise, if
 * {@code mapred.indirect.input.format} is set, the length and URI of every file under the
 * input paths are written to a sequence file in the job system directory, and that file is
 * recorded under {@code mapred.indirect.input.file}. Directories whose name starts with
 * {@code "_"} are not descended into.
 *
 * @param argv command-line arguments handed to {@code parseArgs}
 * @param mapoutputCompressed whether map output is to be compressed
 * @param outputCompressed value stored under {@code mapred.output.compress}
 * @return the configured job, or {@code null} when {@code parseArgs} rejects the arguments
 * @throws Exception if building the configuration or writing the file list fails
 */
public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
    boolean outputCompressed) throws Exception {

  final JobConf job = new JobConf();
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return null;
  }

  // No output dir? No writes
  if (FileOutputFormat.getOutputPath(job) == null) {
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (FileInputFormat.getInputPaths(job).length == 0) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (job.getClass("mapred.indirect.input.format", null) != null) {
    // specified IndirectInputFormat? Build src list
    final JobClient client = new JobClient(job);
    final Path systemDir = client.getSystemDir();
    final Random random = new Random();
    final String listName = Integer.toString(random.nextInt(Integer.MAX_VALUE), 36) + "_files";
    final Path listFile = new Path(systemDir, listName);
    job.set("mapred.indirect.input.file", listFile.toString());
    final SequenceFile.Writer listWriter = SequenceFile.createWriter(
        systemDir.getFileSystem(job), job, listFile,
        LongWritable.class, Text.class, SequenceFile.CompressionType.NONE);
    try {
      for (Path inputRoot : FileInputFormat.getInputPaths(job)) {
        final FileSystem inputFs = inputRoot.getFileSystem(job);
        final Stack<Path> pendingDirs = new Stack<Path>();
        pendingDirs.push(inputRoot);
        while (!pendingDirs.empty()) {
          for (FileStatus entry : inputFs.listStatus(pendingDirs.pop())) {
            if (!entry.isDir()) {
              // Plain file: record its length and URI.
              listWriter.sync();
              listWriter.append(new LongWritable(entry.getLen()),
                  new Text(entry.getPath().toUri().toString()));
              continue;
            }
            // Directory: queue it unless its name starts with an underscore.
            if (!entry.getPath().getName().startsWith("_")) {
              pendingDirs.push(entry.getPath());
            }
          }
        }
      }
    } finally {
      listWriter.close();
    }
  }

  job.setCompressMapOutput(mapoutputCompressed);
  job.setBoolean("mapred.output.compress", outputCompressed);
  return job;

}
 
源代码27 项目: RDFS   文件: HadoopArchives.java
/**
 * Archives the given source paths into the destination directory.
 *
 * <p>One entry for every file and directory found under {@code srcPaths} is first written to
 * a temporary sequence file inside a job directory. That directory is the input of the
 * archiving job and is deleted once the job has returned.
 *
 * @param parentPath the parent path of all the source paths
 * @param srcPaths the src paths to be archived
 * @param archiveName the name of the archive, created as a child of {@code dest}
 * @param dest the dest dir that will contain the archive
 * @throws IOException if the output path already exists, if {@code dest} is a file, or if
 *     listing the sources, writing the file list or running the job fails
 */
void archive(Path parentPath, List<Path> srcPaths, 
    String archiveName, Path dest) throws IOException {
  checkPaths(conf, srcPaths);
  int numFiles = 0;
  long totalSize = 0;
  FileSystem fs = parentPath.getFileSystem(conf);
  // Configured values win over the current field values; the effective values are
  // written back to the configuration.
  this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
  this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
  conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
  conf.setLong(HAR_PARTSIZE_LABEL, partSize);
  conf.set(DST_HAR_LABEL, archiveName);
  conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
  Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output: " + outputPath);
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
                        NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  //get a tmp directory for input splits
  FileSystem jobfs = jobDirectory.getFileSystem(conf);
  jobfs.mkdirs(jobDirectory);
  Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
      srcFiles, LongWritable.class, HarEntry.class, 
      SequenceFile.CompressionType.NONE);
  // get the list of files 
  // create single list of files and dirs
  try {
    // write the top level dirs in first 
    writeTopLevelDirs(srcWriter, srcPaths, parentPath);
    srcWriter.sync();
    // these are the input paths passed 
    // from the command line
    // we do a recursive ls on these paths 
    // and then write them to the input file 
    // one at a time
    for (Path src: srcPaths) {
      ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat: allFiles) {
        // directories are recorded with a length of 0
        long len = stat.isDir()? 0:stat.getLen();
        String path = relPathToRoot(stat.getPath(), parentPath).toString();
        // stays null for plain files
        String[] children = null;
        if (stat.isDir()) {
          //get the children 
          FileStatus[] list = fs.listStatus(stat.getPath());
          children = new String[list.length];
          for (int i = 0; i < list.length; i++) {
            children[i] = list[i].getPath().getName();
          }
        }
        srcWriter.append(new LongWritable(len), new HarEntry(path, children));
        srcWriter.sync();
        numFiles++;
        totalSize += len;
      }
    }
  } finally {
    srcWriter.close();
  }
  //increase the replication of src files
  jobfs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, numFiles);
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  int numMaps = (int)(totalSize/partSize);
  //run atleast one map.
  conf.setNumMapTasks(numMaps == 0? 1:numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  //make sure no speculative execution is done
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  //delete the tmp job directory
  try {
    jobfs.delete(jobDirectory, true);
  } catch(IOException ie) {
    // a failed cleanup is only logged; it does not fail the archive operation
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}
 
源代码28 项目: RDFS   文件: GenericMRLoadGenerator.java
/**
 * Configures and runs the load-generator job, printing its start time, end time and duration.
 *
 * <p>If no output path is configured, the output format is set to {@code NullOutputFormat}.
 * If no input path is configured, {@code confRandom} sets up the job instead. Otherwise, if
 * {@code mapred.indirect.input.format} is set, the length and URI of every file under the
 * input paths are written to a sequence file in the job system directory before the job is
 * submitted.
 *
 * @param argv command-line arguments handed to {@code parseArgs}
 * @return {@code -1} when {@code parseArgs} rejects the arguments, {@code 0} once the job has run
 * @throws Exception if configuring or running the job fails
 */
public int run(String [] argv) throws Exception {
  final JobConf job = new JobConf(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }

  // No output dir? No writes
  if (FileOutputFormat.getOutputPath(job) == null) {
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (FileInputFormat.getInputPaths(job).length == 0) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (job.getClass("mapred.indirect.input.format", null) != null) {
    // specified IndirectInputFormat? Build src list
    final JobClient client = new JobClient(job);
    final Path systemDir = client.getSystemDir();
    final Random random = new Random();
    final Path listFile = new Path(systemDir,
        Integer.toString(random.nextInt(Integer.MAX_VALUE), 36) + "_files");
    job.set("mapred.indirect.input.file", listFile.toString());
    final SequenceFile.Writer listWriter = SequenceFile.createWriter(
        systemDir.getFileSystem(job), job, listFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path inputRoot : FileInputFormat.getInputPaths(job)) {
        final FileSystem inputFs = inputRoot.getFileSystem(job);
        final Stack<Path> toVisit = new Stack<Path>();
        toVisit.push(inputRoot);
        while (!toVisit.empty()) {
          final Path current = toVisit.pop();
          for (FileStatus child : inputFs.listStatus(current)) {
            final boolean isFile = !child.isDir();
            if (isFile) {
              listWriter.sync();
              listWriter.append(new LongWritable(child.getLen()),
                  new Text(child.getPath().toUri().toString()));
            } else if (!child.getPath().getName().startsWith("_")) {
              // Directories whose name starts with an underscore are not descended into.
              toVisit.push(child.getPath());
            }
          }
        }
      }
    } finally {
      listWriter.close();
    }
  }

  final Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  final Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  final long elapsedSeconds = (endTime.getTime() - startTime.getTime()) / 1000;
  System.out.println("The job took " + elapsedSeconds + " seconds.");

  return 0;
}
 
源代码29 项目: hadoop-gpu   文件: GenericMRLoadJobCreator.java
/**
 * Creates the {@code JobConf} for a generic map/reduce load job.
 *
 * <p>The steps are: parse {@code argv} into the job; fall back to {@code NullOutputFormat}
 * when no output path is set; call {@code confRandom} when no input path is set; or, when
 * {@code mapred.indirect.input.format} is configured, walk the input paths and store each
 * file's length and URI in a sequence file whose location is saved under
 * {@code mapred.indirect.input.file}.
 *
 * @param argv arguments passed on to {@code parseArgs}
 * @param mapoutputCompressed passed to {@code JobConf#setCompressMapOutput}
 * @param outputCompressed stored as the boolean {@code mapred.output.compress}
 * @return the job configuration, or {@code null} if {@code parseArgs} returns {@code false}
 * @throws Exception if any configuration or file-system step fails
 */
public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
    boolean outputCompressed) throws Exception {

  final JobConf job = new JobConf();
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  final boolean argsAccepted = parseArgs(argv, job);
  if (!argsAccepted) {
    return null;
  }

  final Path outputDir = FileOutputFormat.getOutputPath(job);
  if (outputDir == null) {
    // No output dir? No writes
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (FileInputFormat.getInputPaths(job).length == 0) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (job.getClass("mapred.indirect.input.format", null) != null) {
    // specified IndirectInputFormat? Build src list
    final JobClient jobClient = new JobClient(job);
    final Path sysDir = jobClient.getSystemDir();
    final Random rng = new Random();
    final String suffix = "_files";
    final Path indirectInput =
        new Path(sysDir, Integer.toString(rng.nextInt(Integer.MAX_VALUE), 36) + suffix);
    job.set("mapred.indirect.input.file", indirectInput.toString());
    final SequenceFile.Writer out = SequenceFile.createWriter(
        sysDir.getFileSystem(job), job, indirectInput,
        LongWritable.class, Text.class, SequenceFile.CompressionType.NONE);
    try {
      for (Path root : FileInputFormat.getInputPaths(job)) {
        final FileSystem rootFs = root.getFileSystem(job);
        final Stack<Path> dirs = new Stack<Path>();
        dirs.push(root);
        while (!dirs.empty()) {
          final FileStatus[] listing = rootFs.listStatus(dirs.pop());
          for (int i = 0; i < listing.length; i++) {
            final FileStatus status = listing[i];
            if (status.isDir()) {
              // Skip directories whose name begins with an underscore.
              final boolean skipped = status.getPath().getName().startsWith("_");
              if (!skipped) {
                dirs.push(status.getPath());
              }
            } else {
              out.sync();
              final LongWritable length = new LongWritable(status.getLen());
              final Text location = new Text(status.getPath().toUri().toString());
              out.append(length, location);
            }
          }
        }
      }
    } finally {
      out.close();
    }
  }

  job.setCompressMapOutput(mapoutputCompressed);
  job.setBoolean("mapred.output.compress", outputCompressed);
  return job;

}
 
源代码30 项目: hadoop-gpu   文件: HadoopArchives.java
/**
 * Archives the given source paths into the destination directory.
 *
 * <p>Every file and directory found under {@code srcPaths} is listed in a temporary sequence
 * file: a directory entry holds its qualified path, the marker {@code " dir "} and its
 * children's names, while a file entry holds its qualified path and the marker
 * {@code " file "}. That list is the input of the archiving job; the job directory holding it
 * is deleted once the job has returned.
 *
 * @param srcPaths the src paths to be archived
 * @param archiveName the name of the archive, created as a child of {@code dest}
 * @param dest the dest dir that will contain the archive
 * @throws IOException if the output path already exists, if {@code dest} is a file, or if
 *     listing the sources, writing the file list or running the job fails
 */
public void archive(List<Path> srcPaths, String archiveName, Path dest)
    throws IOException {
  checkPaths(conf, srcPaths);
  int fileCount = 0;
  long byteCount = 0;
  conf.set(DST_HAR_LABEL, archiveName);
  final Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  final FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output.");
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  final Path systemDir = new JobClient(conf).getSystemDir();
  final Path jobDirectory = new Path(systemDir, NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  // The job directory is a temporary home for the input split list.
  final FileSystem jobFs = jobDirectory.getFileSystem(conf);
  jobFs.mkdirs(jobDirectory);
  final Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  final SequenceFile.Writer listWriter = SequenceFile.createWriter(jobFs, conf,
      srcFiles, LongWritable.class, Text.class,
      SequenceFile.CompressionType.NONE);
  // Build a single list of all files and directories.
  try {
    // The top-level directories go in first.
    writeTopLevelDirs(listWriter, srcPaths);
    listWriter.sync();
    // Each path passed in is listed recursively and its entries are
    // written to the input file one at a time.
    for (Path src : srcPaths) {
      final FileSystem fs = src.getFileSystem(conf);
      final ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat : allFiles) {
        final boolean directory = stat.isDir();
        // Directories are recorded with a length of 0.
        final long len = directory ? 0 : stat.getLen();
        final StringBuilder entry = new StringBuilder();
        entry.append(fs.makeQualified(stat.getPath()));
        if (directory) {
          entry.append(" dir ");
          // Append the name of each child, each followed by a space.
          for (FileStatus child : fs.listStatus(stat.getPath())) {
            entry.append(child.getPath().getName()).append(' ');
          }
        } else {
          entry.append(" file ");
        }
        listWriter.append(new LongWritable(len), new Text(entry.toString()));
        listWriter.sync();
        fileCount++;
        byteCount += len;
      }
    }
  } finally {
    listWriter.close();
  }
  // Raise the replication of the source list file.
  jobFs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, fileCount);
  conf.setLong(TOTAL_SIZE_LABEL, byteCount);
  // Run at least one map.
  int numMaps = (int) (byteCount / partSize);
  if (numMaps == 0) {
    numMaps = 1;
  }
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  // Make sure no speculative execution is done.
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  // Remove the temporary job directory; a failure here is only logged.
  try {
    jobFs.delete(jobDirectory, true);
  } catch (IOException ie) {
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}
 
 类所在包
 类方法
 同包方法