org.apache.hadoop.mapred.FileOutputFormat#setOutputPath ( )源码实例Demo

下面列出了org.apache.hadoop.mapred.FileOutputFormat#setOutputPath ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: RDFS   文件: TeraValidate.java
public int run(String[] args) throws Exception {
  JobConf job = (JobConf) getConf();
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split 
  job.setLong("mapred.min.split.size", Long.MAX_VALUE);
  job.setInputFormat(TeraInputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
源代码2 项目: anthelion   文件: LinkDbMerger.java
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
  Path newLinkDb =
    new Path("linkdb-merge-" + 
             Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  JobConf job = new NutchJob(config);
  job.setJobName("linkdb merge " + linkDb);

  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(LinkDbFilter.class);
  job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
  job.setReducerClass(LinkDbMerger.class);

  FileOutputFormat.setOutputPath(job, newLinkDb);
  job.setOutputFormat(MapFileOutputFormat.class);
  job.setBoolean("mapred.output.compress", true);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Inlinks.class);

  // https://issues.apache.org/jira/browse/NUTCH-1069
  job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  return job;
}
 
源代码3 项目: hadoop-book   文件: InvertedIndex.java
public static void main(String[] args) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(InvertedIndex.class);

    conf.setJobName("InvertedIndex");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    conf.setMapperClass(InvertedIndexMapper.class);
    conf.setReducerClass(InvertedIndexReducer.class);

    client.setConf(conf);

    try {
        JobClient.runJob(conf);
    } catch (Exception e) {
        e.printStackTrace(System.out);
    }
}
 
源代码4 项目: big-c   文件: SliveTest.java
/**
 * Sets up a job conf for the given job using the given config object. Ensures
 * that the correct input format is set, the mapper and and reducer class and
 * the input and output keys and value classes along with any other job
 * configuration.
 * 
 * @param config
 * @return JobConf representing the job to be ran
 * @throws IOException
 */
private JobConf getJob(ConfigExtractor config) throws IOException {
  JobConf job = new JobConf(config.getConfig(), SliveTest.class);
  job.setInputFormat(DummyInputFormat.class);
  FileOutputFormat.setOutputPath(job, config.getOutputPath());
  job.setMapperClass(SliveMapper.class);
  job.setPartitionerClass(SlivePartitioner.class);
  job.setReducerClass(SliveReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setOutputFormat(TextOutputFormat.class);
  TextOutputFormat.setCompressOutput(job, false);
  job.setNumReduceTasks(config.getReducerAmount());
  job.setNumMapTasks(config.getMapAmount());
  return job;
}
 
源代码5 项目: hadoop-book   文件: TeraGen.java
/**
 * @param args the cli arguments
 */
public int run(String[] args) throws IOException {
  JobConf job = (JobConf) getConf();
  setNumberOfRows(job, Long.parseLong(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraGen");
  job.setJarByClass(TeraGen.class);
  job.setMapperClass(SortGenMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(RangeInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
源代码6 项目: hadoop-gpu   文件: TestDatamerge.java
public void testEmptyJoin() throws Exception {
  JobConf job = new JobConf();
  Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
  job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
      Fake_IF.class, src));
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));

  job.setMapperClass(IdentityMapper.class);
  job.setReducerClass(IdentityReducer.class);
  job.setOutputKeyClass(IncomparableKey.class);
  job.setOutputValueClass(NullWritable.class);

  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
 
源代码7 项目: attic-apex-malhar   文件: LineIndexer.java
/**
 * The actual main() method for our program; this is the
 * "driver" for the MapReduce job.
 */
public static void main(String[] args)
{
  JobClient client = new JobClient();
  JobConf conf = new JobConf(LineIndexer.class);

  conf.setJobName("LineIndexer");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.addInputPath(conf, new Path("input"));
  FileOutputFormat.setOutputPath(conf, new Path("output"));

  conf.setMapperClass(LineIndexMapper.class);
  conf.setReducerClass(LineIndexReducer.class);

  client.setConf(conf);

  try {
    JobClient.runJob(conf);
  } catch (Exception e) {
    e.printStackTrace();
  }
}
 
源代码8 项目: big-c   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码9 项目: anthelion   文件: LinkRank.java
/**
 * Runs the link analysis job. The link analysis job applies the link rank
 * formula to create a score per url and stores that score in the NodeDb.
 * 
 * Typically the link analysis job is run a number of times to allow the link
 * rank scores to converge.
 * 
 * @param nodeDb The node database from which we are getting previous link
 * rank scores.
 * @param inverted The inverted inlinks
 * @param output The link analysis output.
 * @param iteration The current iteration number.
 * @param numIterations The total number of link analysis iterations
 * 
 * @throws IOException If an error occurs during link analysis.
 */
private void runAnalysis(Path nodeDb, Path inverted, Path output,
  int iteration, int numIterations, float rankOne)
  throws IOException {

  JobConf analyzer = new NutchJob(getConf());
  analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
  analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
    + " of " + numIterations);
  FileInputFormat.addInputPath(analyzer, nodeDb);
  FileInputFormat.addInputPath(analyzer, inverted);
  FileOutputFormat.setOutputPath(analyzer, output);
  analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
  analyzer.setMapOutputKeyClass(Text.class);
  analyzer.setMapOutputValueClass(ObjectWritable.class);
  analyzer.setInputFormat(SequenceFileInputFormat.class);
  analyzer.setMapperClass(Analyzer.class);
  analyzer.setReducerClass(Analyzer.class);
  analyzer.setOutputKeyClass(Text.class);
  analyzer.setOutputValueClass(Node.class);
  analyzer.setOutputFormat(MapFileOutputFormat.class);
  analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  LOG.info("Starting analysis job");
  try {
    JobClient.runJob(analyzer);
  }
  catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished analysis job.");
}
 
源代码10 项目: hadoop   文件: TestMROldApiJobs.java
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
 
源代码11 项目: hadoop   文件: TestMRAppWithCombiner.java
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  JobConf conf = new JobConf(mrCluster.getConfig());
  int numMaps = 5;
  int numReds = 2;
  Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-in");
  Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
      "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(in, out, numMaps);
  conf.setJobName("test-job-with-combiner");
  conf.setMapperClass(IdentityMapper.class);
  conf.setCombinerClass(MyCombinerToCheckReporter.class);
  //conf.setJarByClass(MyCombinerToCheckReporter.class);
  conf.setReducerClass(IdentityReducer.class);
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, in);
  FileOutputFormat.setOutputPath(conf, out);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  
  runJob(conf);
}
 
源代码12 项目: gemfirexd-oss   文件: BusyLegs.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();

    JobConf conf = new JobConf(getConf());
    conf.setJobName("Busy Leg Count");

    Path outputPath = new Path(args[0]);
    String hdfsHomeDir = args[1];
    String tableName = args[2];

    outputPath.getFileSystem(conf).delete(outputPath, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    // Configure Mapper
    conf.setInputFormat(RowInputFormat.class);
    conf.setMapperClass(SampleMapper.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);

    // Configure Reducer
    conf.setReducerClass(SampleReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    FileOutputFormat.setOutputPath(conf, outputPath);

    JobClient.runJob(conf);
    return 0;
  }
 
源代码13 项目: nutch-htmlunit   文件: LinkRank.java
/**
 * Runs the counter job. The counter job determines the number of links in the
 * webgraph. This is used during analysis.
 * 
 * @param fs The job file system.
 * @param webGraphDb The web graph database to use.
 * 
 * @return The number of nodes in the web graph.
 * @throws IOException If an error occurs while running the counter job.
 */
private int runCounter(FileSystem fs, Path webGraphDb)
  throws IOException {

  // configure the counter job
  Path numLinksPath = new Path(webGraphDb, NUM_NODES);
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  JobConf counter = new NutchJob(getConf());
  counter.setJobName("LinkRank Counter");
  FileInputFormat.addInputPath(counter, nodeDb);
  FileOutputFormat.setOutputPath(counter, numLinksPath);
  counter.setInputFormat(SequenceFileInputFormat.class);
  counter.setMapperClass(Counter.class);
  counter.setCombinerClass(Counter.class);
  counter.setReducerClass(Counter.class);
  counter.setMapOutputKeyClass(Text.class);
  counter.setMapOutputValueClass(LongWritable.class);
  counter.setOutputKeyClass(Text.class);
  counter.setOutputValueClass(LongWritable.class);
  counter.setNumReduceTasks(1);
  counter.setOutputFormat(TextOutputFormat.class);
  counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  // run the counter job, outputs to a single reduce task and file
  LOG.info("Starting link counter job");
  try {
    JobClient.runJob(counter);
  }
  catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished link counter job");

  // read the first (and only) line from the file which should be the
  // number of links in the web graph
  LOG.info("Reading numlinks temp file");
  FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
  BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
  String numLinksLine = buffer.readLine();
  readLinks.close();
  
  // check if there are links to process, if none, webgraph might be empty
  if (numLinksLine == null || numLinksLine.length() == 0) {
    fs.delete(numLinksPath, true);
    throw new IOException("No links to process, is the webgraph empty?");
  }
  
  // delete temp file and convert and return the number of links as an int
  LOG.info("Deleting numlinks temp file");
  fs.delete(numLinksPath, true);
  String numLinks = numLinksLine.split("\\s+")[1];
  return Integer.parseInt(numLinks);
}
 
源代码14 项目: nutch-htmlunit   文件: CrawlDbReader.java
public void processTopNJob(String crawlDb, long topN, float min, String output, Configuration config) throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
      LOG.info("CrawlDb db: " + crawlDb);
    }

    Path outFolder = new Path(output);
    Path tempDir =
      new Path(config.get("mapred.temp.dir", ".") +
               "/readdb-topN-temp-"+
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf job = new NutchJob(config);
    job.setJobName("topN prepare " + crawlDb);
    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(CrawlDbTopNMapper.class);
    job.setReducerClass(IdentityReducer.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    // XXX hmmm, no setFloat() in the API ... :(
    job.setLong("db.reader.topn.min", Math.round(1000000.0 * min));
    JobClient.runJob(job);

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: collecting topN scores.");
    }
    job = new NutchJob(config);
    job.setJobName("topN collect " + crawlDb);
    job.setLong("db.reader.topn", topN);

    FileInputFormat.addInputPath(job, tempDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(CrawlDbTopNReducer.class);

    FileOutputFormat.setOutputPath(job, outFolder);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(1); // create a single file.

    JobClient.runJob(job);
    FileSystem fs = FileSystem.get(config);
    fs.delete(tempDir, true);
    if (LOG.isInfoEnabled()) { LOG.info("CrawlDb topN: done"); }

  }
 
源代码15 项目: gemfirexd-oss   文件: VerifyHdfsDataUsingMR.java
public int run(String[] args) throws Exception {

    // [email protected] -- why do we need this?
    GfxdDataSerializable.initTypes();

    JobConf conf = new JobConf(getConf());
    conf.setJobName("hdfsMapReduce");

    String hdfsHomeDir = args[0];
    String url         = args[1];
    String tableName   = args[2];

    System.out.println("VerifyHdfsData.run() invoked with " 
                       + " hdfsHomeDir = " + hdfsHomeDir 
                       + " url = " + url
                       + " tableName = " + tableName);

    // Job-specific params
    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
    
    conf.setInputFormat(RowInputFormat.class);
    conf.setMapperClass(HdfsDataMapper.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(MyRow.class);
    
    conf.setReducerClass(HdfsDataReducer.class);
    conf.set(RowOutputFormat.OUTPUT_TABLE, "TRADE.HDFS_CUSTOMERS");
    //conf.set(GfxdOutputFormat.OUTPUT_SCHEMA, "APP");
    conf.set(RowOutputFormat.OUTPUT_URL, url);
    conf.setOutputFormat(RowOutputFormat.class);
    conf.setOutputKeyClass(Key.class);
    conf.setOutputValueClass(DataObject.class);

    StringBuffer aStr = new StringBuffer();
    aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
    aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
    aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
    aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
    System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());

    // not planning to use this, but I get an NPE without it
    FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
    
    JobClient.runJob(conf);
    return 0;
  }
 
源代码16 项目: RDFS   文件: RandomWriter.java
/**
 * This is the main routine for launching a distributed random write job.
 * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
 * The reduce doesn't do anything.
 * 
 * @throws IOException 
 */
public int run(String[] args) throws Exception {    
  if (args.length == 0) {
    System.out.println("Usage: writer <out-dir>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  
  Path outDir = new Path(args[0]);
  JobConf job = new JobConf(getConf());
  
  job.setJarByClass(RandomWriter.class);
  job.setJobName("random-writer");
  FileOutputFormat.setOutputPath(job, outDir);
  
  job.setOutputKeyClass(BytesWritable.class);
  job.setOutputValueClass(BytesWritable.class);
  
  job.setInputFormat(RandomInputFormat.class);
  job.setMapperClass(Map.class);        
  job.setReducerClass(IdentityReducer.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  
  JobClient client = new JobClient(job);
  ClusterStatus cluster = client.getClusterStatus();
  int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
  long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                           1*1024*1024*1024);
  if (numBytesToWritePerMap == 0) {
    System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
    return -2;
  }
  long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
       numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
  int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  if (numMaps == 0 && totalBytesToWrite > 0) {
    numMaps = 1;
    job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
  }
  
  job.setNumMapTasks(numMaps);
  System.out.println("Running " + numMaps + " maps.");
  
  // reducer NONE
  job.setNumReduceTasks(0);
  
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " + 
                     (endTime.getTime() - startTime.getTime()) /1000 + 
                     " seconds.");
  
  return 0;
}
 
源代码17 项目: RDFS   文件: TestKeyFieldBasedComparator.java
public void configure(String keySpec, int expect) throws Exception {
  Path testdir = new Path("build/test/test.mapred.spill");
  Path inDir = new Path(testdir, "in");
  Path outDir = new Path(testdir, "out");
  FileSystem fs = getFileSystem();
  fs.delete(testdir, true);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);

  conf.setNumMapTasks(1);
  conf.setNumReduceTasks(2);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  conf.setKeyFieldComparatorOptions(keySpec);
  conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
  conf.set("map.output.key.field.separator", " ");
  conf.setMapperClass(InverseMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  if (!fs.mkdirs(testdir)) {
    throw new IOException("Mkdirs failed to create " + testdir.toString());
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }
  // set up input data in 2 files 
  Path inFile = new Path(inDir, "part0");
  FileOutputStream fos = new FileOutputStream(inFile.toString());
  fos.write((line1 + "\n").getBytes());
  fos.write((line2 + "\n").getBytes());
  fos.close();
  JobClient jc = new JobClient(conf);
  RunningJob r_job = jc.submitJob(conf);
  while (!r_job.isComplete()) {
    Thread.sleep(1000);
  }
  
  if (!r_job.isSuccessful()) {
    fail("Oops! The job broke due to an unexpected error");
  }
  Path[] outputFiles = FileUtil.stat2Paths(
      getFileSystem().listStatus(outDir,
      new Utils.OutputFileUtils.OutputFilesFilter()));
  if (outputFiles.length > 0) {
    InputStream is = getFileSystem().open(outputFiles[0]);
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    String line = reader.readLine();
    //make sure we get what we expect as the first line, and also
    //that we have two lines (both the lines must end up in the same
    //reducer since the partitioner takes the same key spec for all
    //lines
    if (expect == 1) {
      assertTrue(line.startsWith(line1));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line2));
    }
    line = reader.readLine();
    if (expect == 1) {
      assertTrue(line.startsWith(line2));
    } else if (expect == 2) {
      assertTrue(line.startsWith(line1));
    }
    reader.close();
  }
}
 
源代码18 项目: RDFS   文件: HadoopArchives.java
/**archive the given source paths into
 * the dest
 * @param parentPath the parent path of all the source paths
 * @param srcPaths the src paths to be archived
 * @param dest the dest dir that will contain the archive
 */
void archive(Path parentPath, List<Path> srcPaths, 
    String archiveName, Path dest) throws IOException {
  checkPaths(conf, srcPaths);
  int numFiles = 0;
  long totalSize = 0;
  FileSystem fs = parentPath.getFileSystem(conf);
  this.blockSize = conf.getLong(HAR_BLOCKSIZE_LABEL, blockSize);
  this.partSize = conf.getLong(HAR_PARTSIZE_LABEL, partSize);
  conf.setLong(HAR_BLOCKSIZE_LABEL, blockSize);
  conf.setLong(HAR_PARTSIZE_LABEL, partSize);
  conf.set(DST_HAR_LABEL, archiveName);
  conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
  Path outputPath = new Path(dest, archiveName);
  FileOutputFormat.setOutputPath(conf, outputPath);
  FileSystem outFs = outputPath.getFileSystem(conf);
  if (outFs.exists(outputPath) || outFs.isFile(dest)) {
    throw new IOException("Invalid Output: " + outputPath);
  }
  conf.set(DST_DIR_LABEL, outputPath.toString());
  final String randomId = DistCp.getRandomId();
  Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
                        NAME + "_" + randomId);
  conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  //get a tmp directory for input splits
  FileSystem jobfs = jobDirectory.getFileSystem(conf);
  jobfs.mkdirs(jobDirectory);
  Path srcFiles = new Path(jobDirectory, "_har_src_files");
  conf.set(SRC_LIST_LABEL, srcFiles.toString());
  SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
      srcFiles, LongWritable.class, HarEntry.class, 
      SequenceFile.CompressionType.NONE);
  // get the list of files 
  // create single list of files and dirs
  try {
    // write the top level dirs in first 
    writeTopLevelDirs(srcWriter, srcPaths, parentPath);
    srcWriter.sync();
    // these are the input paths passed 
    // from the command line
    // we do a recursive ls on these paths 
    // and then write them to the input file 
    // one at a time
    for (Path src: srcPaths) {
      ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
      recursivels(fs, src, allFiles);
      for (FileStatus stat: allFiles) {
        String toWrite = "";
        long len = stat.isDir()? 0:stat.getLen();
        String path = relPathToRoot(stat.getPath(), parentPath).toString();
        String[] children = null;
        if (stat.isDir()) {
          //get the children 
          FileStatus[] list = fs.listStatus(stat.getPath());
          children = new String[list.length];
          for (int i = 0; i < list.length; i++) {
            children[i] = list[i].getPath().getName();
          }
        }
        srcWriter.append(new LongWritable(len), new HarEntry(path, children));
        srcWriter.sync();
        numFiles++;
        totalSize += len;
      }
    }
  } finally {
    srcWriter.close();
  }
  //increase the replication of src files
  jobfs.setReplication(srcFiles, (short) 10);
  conf.setInt(SRC_COUNT_LABEL, numFiles);
  conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  int numMaps = (int)(totalSize/partSize);
  //run atleast one map.
  conf.setNumMapTasks(numMaps == 0? 1:numMaps);
  conf.setNumReduceTasks(1);
  conf.setInputFormat(HArchiveInputFormat.class);
  conf.setOutputFormat(NullOutputFormat.class);
  conf.setMapperClass(HArchivesMapper.class);
  conf.setReducerClass(HArchivesReducer.class);
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.set("hadoop.job.history.user.location", "none");
  FileInputFormat.addInputPath(conf, jobDirectory);
  //make sure no speculative execution is done
  conf.setSpeculativeExecution(false);
  JobClient.runJob(conf);
  //delete the tmp job directory
  try {
    jobfs.delete(jobDirectory, true);
  } catch(IOException ie) {
    LOG.info("Unable to clean tmp directory " + jobDirectory);
  }
}
 
源代码19 项目: RDFS   文件: PiEstimator.java
/**
 * Run a map/reduce job for estimating Pi.
 *
 * @return the estimated value of Pi
 */
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
    ) throws IOException {
  //setup job conf
  jobConf.setJobName(PiEstimator.class.getSimpleName());

  jobConf.setInputFormat(SequenceFileInputFormat.class);

  jobConf.setOutputKeyClass(BooleanWritable.class);
  jobConf.setOutputValueClass(LongWritable.class);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);

  jobConf.setMapperClass(PiMapper.class);
  jobConf.setNumMapTasks(numMaps);

  jobConf.setReducerClass(PiReducer.class);
  jobConf.setNumReduceTasks(1);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  jobConf.setSpeculativeExecution(false);

  //setup input/output directories
  final Path inDir = new Path(TMP_DIR, "in");
  final Path outDir = new Path(TMP_DIR, "out");
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);

  final FileSystem fs = FileSystem.get(jobConf);
  if (fs.exists(TMP_DIR)) {
    throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
        + " already exists.  Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }

  try {
    //generate an input file for each map task
    for(int i=0; i < numMaps; ++i) {
      final Path file = new Path(inDir, "part"+i);
      final LongWritable offset = new LongWritable(i * numPoints);
      final LongWritable size = new LongWritable(numPoints);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, jobConf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #"+i);
    }

    //start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    JobClient.runJob(jobConf);
    final double duration = (System.currentTimeMillis() - startTime)/1000.0;
    System.out.println("Job Finished in " + duration + " seconds");

    //read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }

    //compute estimated value
    return BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(BigDecimal.valueOf(numMaps))
        .divide(BigDecimal.valueOf(numPoints));
  } finally {
    fs.delete(TMP_DIR, true);
  }
}
 
源代码20 项目: anthelion   文件: NodeDumper.java
/**
 * Runs the process to dump the top urls out to a text file.
 *
 * @param webGraphDb The WebGraph from which to pull values.
 *
 * @param topN
 * @param output
 *
 * @throws IOException If an error occurs while dumping the top values.
 */
public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output, boolean asEff, NameType nameType, AggrType aggrType, boolean asSequenceFile)
  throws Exception {

  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("NodeDumper: starting at " + sdf.format(start));
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  Configuration conf = getConf();

  JobConf dumper = new NutchJob(conf);
  dumper.setJobName("NodeDumper: " + webGraphDb);
  FileInputFormat.addInputPath(dumper, nodeDb);
  dumper.setInputFormat(SequenceFileInputFormat.class);

  if (nameType == null) {
    dumper.setMapperClass(Sorter.class);
    dumper.setReducerClass(Sorter.class);
    dumper.setMapOutputKeyClass(FloatWritable.class);
    dumper.setMapOutputValueClass(Text.class);
  } else {
    dumper.setMapperClass(Dumper.class);
    dumper.setReducerClass(Dumper.class);
    dumper.setMapOutputKeyClass(Text.class);
    dumper.setMapOutputValueClass(FloatWritable.class);
  }

  dumper.setOutputKeyClass(Text.class);
  dumper.setOutputValueClass(FloatWritable.class);
  FileOutputFormat.setOutputPath(dumper, output);

  if (asSequenceFile) {
    dumper.setOutputFormat(SequenceFileOutputFormat.class);
  } else {
    dumper.setOutputFormat(TextOutputFormat.class);
  }

  dumper.setNumReduceTasks(1);
  dumper.setBoolean("inlinks", type == DumpType.INLINKS);
  dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
  dumper.setBoolean("scores", type == DumpType.SCORES);

  dumper.setBoolean("host", nameType == NameType.HOST);
  dumper.setBoolean("domain", nameType == NameType.DOMAIN);
  dumper.setBoolean("sum", aggrType == AggrType.SUM);
  dumper.setBoolean("max", aggrType == AggrType.MAX);

  dumper.setLong("topn", topN);

  // Set equals-sign as separator for Solr's ExternalFileField
  if (asEff) {
    dumper.set("mapred.textoutputformat.separator", "=");
  }

  try {
    LOG.info("NodeDumper: running");
    JobClient.runJob(dumper);
  }
  catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  long end = System.currentTimeMillis();
  LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}