org.apache.hadoop.mapred.JobConf#setOutputValueClass() Source Code Examples

Listed below are example usages of org.apache.hadoop.mapred.JobConf#setOutputValueClass(), taken from open-source projects; follow each project link to view the full source on GitHub.
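
Before the project examples, here is a minimal, self-contained sketch of where setOutputValueClass() fits in an old-API (org.apache.hadoop.mapred) job definition. The class name PassThroughJob and the identity mapper/reducer wiring are illustrative assumptions, not taken from any of the projects below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class PassThroughJob {
  public static void main(String[] args) throws Exception {
    // Hypothetical driver class; args[0] = input dir, args[1] = output dir.
    JobConf job = new JobConf(PassThroughJob.class);
    job.setJobName("pass-through");

    // TextInputFormat produces <LongWritable offset, Text line> pairs,
    // which the identity mapper and reducer forward unchanged.
    job.setInputFormat(TextInputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(IdentityReducer.class);

    // Declare the key/value types written by the reducer. These settings
    // also apply to the map output unless setMapOutputKeyClass /
    // setMapOutputValueClass override them.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobClient.runJob(job);
  }
}

Note that setOutputValueClass() only declares the reduce-side output value type; when the map output value type differs, it must be set separately via setMapOutputValueClass(), as several of the examples below do.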

Example 1  Project: big-c   File: NNBench.java
/**
 * Run the test
 * 
 * @throws IOException on error
 */
public static void runTests() throws IOException {
  config.setLong("io.bytes.per.checksum", bytesPerChecksum);
  
  JobConf job = new JobConf(config, NNBench.class);

  job.setJobName("NNBench-" + operation);
  FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  
  // Explicitly set number of max map attempts to 1.
  job.setMaxMapAttempts(1);
  
  // Explicitly turn off speculative execution
  job.setSpeculativeExecution(false);

  job.setMapperClass(NNBenchMapper.class);
  job.setReducerClass(NNBenchReducer.class);

  FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks((int) numberOfReduces);
  JobClient.runJob(job);
}
 
Example 2  Project: hadoop   File: TestDatamerge.java
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerBase> c) throws Exception {
  final int srcs = 4;
  Configuration conf = new Configuration();
  JobConf job = new JobConf(conf, c);
  Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
  Path[] src = writeSimpleSrc(base, conf, srcs);
  job.set("mapreduce.join.expr", CompositeInputFormat.compose(jointype,
      SequenceFileInputFormat.class, src));
  job.setInt("testdatamerge.sources", srcs);
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));

  job.setMapperClass(c);
  job.setReducerClass(c);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);
  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
 
Example 3  Project: hadoop-gpu   File: TestFileSystem.java
public static void readTest(FileSystem fs, boolean fastCheck)
  throws Exception {

  fs.delete(READ_DIR, true);

  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);


  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
 
Example 4  Project: RDFS   File: TestFileSystem.java
public static void readTest(FileSystem fs, boolean fastCheck)
  throws Exception {

  fs.delete(READ_DIR, true);

  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);


  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
 
Example 5  Project: tez   File: TestMROutput.java
@Test(timeout = 5000)
public void testOldAPI_SequenceFileOutputFormat() throws Exception {
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf,
          org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
          tmpDir.getPath())
      .build();

  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(false, output.useNewApi);
  assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass());
  assertNull(output.newOutputFormat);
  assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.newApiTaskAttemptContext);
  assertNotNull(output.oldRecordWriter);
  assertNull(output.newRecordWriter);
  assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
}
 
Example 6  Project: RDFS   File: DFSGeneralTest.java
private void updateJobConf(JobConf conf, Path inputPath, Path outputPath) {
  // set specific job config
  conf.setLong(NUMBER_OF_MAPS_KEY, nmaps);
  conf.setLong(NUMBER_OF_THREADS_KEY, nthreads);
  conf.setInt(BUFFER_SIZE_KEY, buffersize);
  conf.setLong(WRITER_DATARATE_KEY, datarate);
  conf.setLong("mapred.task.timeout", Long.MAX_VALUE);
  conf.set(OUTPUT_DIR_KEY, output);
  
  // set the output and input for the map reduce
  FileInputFormat.setInputPaths(conf, inputPath);
  FileOutputFormat.setOutputPath(conf, outputPath);

  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  conf.setNumReduceTasks(1);
  conf.setSpeculativeExecution(false);
}
 
Example 7  Project: RDFS   File: TestFileSystem.java
public static void writeTest(FileSystem fs, boolean fastCheck)
  throws Exception {

  fs.delete(DATA_DIR, true);
  fs.delete(WRITE_DIR, true);
  
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);

  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(WriteMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, WRITE_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
 
Example 8  Project: tez   File: TestMROutput.java
@Test(timeout = 5000)
public void testNewAPI_SequenceFileOutputFormat() throws Exception {
  JobConf conf = new JobConf();
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, SequenceFileOutputFormat.class,
          tmpDir.getPath())
      .build();

  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();
  assertEquals(true, output.useNewApi);
  assertEquals(SequenceFileOutputFormat.class, output.newOutputFormat.getClass());
  assertNull(output.oldOutputFormat);
  assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass());
  assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass());
  assertNull(output.oldApiTaskAttemptContext);
  assertNotNull(output.newRecordWriter);
  assertNull(output.oldRecordWriter);
  assertEquals(FileOutputCommitter.class, output.committer.getClass());
}
 
Example 9  Project: hadoop-book   File: MultiFileWordCount.java
public int run(String[] args) throws Exception {

        if (args.length < 2) {
            printUsage();
            return 1;
        }

        JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
        job.setJobName("MultiFileWordCount");

        //set the InputFormat of the job to our InputFormat
        job.setInputFormat(MyInputFormat.class);

        // the keys are words (strings)
        job.setOutputKeyClass(Text.class);
        // the values are counts (ints)
        job.setOutputValueClass(LongWritable.class);

        //use the defined mapper
        job.setMapperClass(MapClass.class);
        //use the WordCount Reducer
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        JobClient.runJob(job);

        return 0;
    }
 
Example 10  Project: RDFS   File: UtilsForTests.java
static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                         int numReds) throws IOException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }    

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);

  return job;
}
 
Example 11  Project: RDFS   File: TeraSort.java
public int run(String[] args) throws Exception {
  LOG.info("starting");
  JobConf job = (JobConf) getConf();
  Path inputDir = new Path(args[0]);
  inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
  Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
  URI partitionUri = new URI(partitionFile.toString() +
                             "#" + TeraInputFormat.PARTITION_FILENAME);
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(TeraInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TeraInputFormat.writePartitionFile(job, partitionFile);
  DistributedCache.addCacheFile(partitionUri, job);
  DistributedCache.createSymlink(job);
  job.setInt("dfs.replication", 1);
  TeraOutputFormat.setFinalSync(job, true);
  long startTime = System.currentTimeMillis();
  JobClient.runJob(job);
  long endTime = System.currentTimeMillis();
  System.out.println((float)(endTime-startTime)/1000);
  LOG.info("done");
  return 0;
}
 
Example 12  Project: nutch-htmlunit   File: CrawlDbReader.java
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status, Integer retry) throws IOException {
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: starting");
    LOG.info("CrawlDb db: " + crawlDb);
  }

  Path outFolder = new Path(output);

  JobConf job = new NutchJob(config);
  job.setJobName("dump " + crawlDb);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  FileOutputFormat.setOutputPath(job, outFolder);

  if (format.equals("csv")) {
    job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
  }
  else if (format.equals("crawldb")) {
    job.setOutputFormat(MapFileOutputFormat.class);
  } else {
    job.setOutputFormat(TextOutputFormat.class);
  }

  if (status != null) job.set("status", status);
  if (regex != null) job.set("regex", regex);
  if (retry != null) job.setInt("retry", retry);
  
  job.setMapperClass(CrawlDbDumpMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);

  JobClient.runJob(job);
  if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}
 
Example 13  File: CompositeUserJoin.java
@Override
public int run(String[] args) throws Exception {
	if (args.length != 4) {
		printUsage();
	}
	Path userPath = new Path(args[0]);
	Path commentPath = new Path(args[1]);
	Path outputDir = new Path(args[2]);
	String joinType = args[3];
	JobConf conf = new JobConf("CompositeJoin");
	conf.setJarByClass(CompositeUserJoin.class);
	conf.setMapperClass(CompositeMapper.class);
	conf.setNumReduceTasks(0);
	// Set the input format class to a CompositeInputFormat class.
	// The CompositeInputFormat will parse all of our input files and output
	// records to our mapper.
	conf.setInputFormat(CompositeInputFormat.class);
	// The composite input format join expression will set how the records
	// are going to be read in, and in what input format.
	conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
			KeyValueTextInputFormat.class, userPath, commentPath));
	TextOutputFormat.setOutputPath(conf, outputDir);
	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(Text.class);
	RunningJob job = JobClient.runJob(conf);
	while (!job.isComplete()) {
		Thread.sleep(1000);
	}
	return job.isSuccessful() ? 0 : 1;
}
 
Example 14  Project: hadoop   File: DistCh.java
private static JobConf createJobConf(Configuration conf) {
  JobConf jobconf = new JobConf(conf, DistCh.class);
  jobconf.setJobName(NAME);
  jobconf.setMapSpeculativeExecution(false);

  jobconf.setInputFormat(ChangeInputFormat.class);
  jobconf.setOutputKeyClass(Text.class);
  jobconf.setOutputValueClass(Text.class);

  jobconf.setMapperClass(ChangeFilesMapper.class);
  jobconf.setNumReduceTasks(0);
  return jobconf;
}
 
Example 15  Project: stratosphere   File: HadoopDataSink.java
@SuppressWarnings("deprecation")
public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, StratosphereTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
	super(new HadoopOutputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),input, name);
	Preconditions.checkNotNull(hadoopFormat);
	Preconditions.checkNotNull(jobConf);
	this.name = name;
	this.jobConf = jobConf;
	jobConf.setOutputKeyClass(keyClass);
	jobConf.setOutputValueClass(valueClass);
}
 
Example 16  Project: wikireverse   File: SegmentCombiner.java
public int run(String[] args) throws Exception {
	// Get current configuration.
	Configuration conf = getConf();

	// Parse command line arguments.
	String inputPaths = args[0];
	String outputPath = args[1];

	JobConf job = new JobConf(conf);

	// Set input path.
	if (inputPaths.length() > 0) {
		List<String> segmentPaths = Lists.newArrayList(Splitter.on(",")
				.split(inputPaths));

		for (String segmentPath : segmentPaths) {
			LOG.info("Adding input path " + segmentPath);
			FileInputFormat.addInputPath(job, new Path(segmentPath));
		}
	} else {
		System.err.println("No input path found.");
		return 1;
	}

	// Set output path.
	if (outputPath.length() > 0) {
		LOG.info("Setting output path to " + outputPath);
		SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
		// Compress output to boost performance.
		SequenceFileOutputFormat.setCompressOutput(job, true);
		SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
	} else {
		System.err.println("No output path found.");
		return 1;
	}

	// Load other classes from same jar as this class.
	job.setJarByClass(SegmentCombiner.class);

	// Input is Hadoop sequence file format.
	job.setInputFormat(SequenceFileInputFormat.class);

	// Output is Hadoop sequence file format.
	job.setOutputFormat(SequenceFileOutputFormat.class);

	// Set the output data types.
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(LinkArrayWritable.class);

	// Use custom mapper class.
	job.setMapperClass(SegmentCombinerMapper.class);

	// Use custom reducer class.
	job.setReducerClass(LinkArrayReducer.class);

	if (JobClient.runJob(job).isSuccessful())
		return 0;
	else
		return 1;
}
 
Example 17  Project: gemfirexd-oss   File: TradesHdfsDataVerifier.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();

    JobConf conf = new JobConf(getConf());
    conf.setJobName("TradesHdfsDataVerifier");

    String hdfsHomeDir = args[0];
    String url         = args[1];
    String tableName   = args[2];

    System.out.println("TradesHdfsDataVerifier.run() invoked with " 
                       + " hdfsHomeDir = " + hdfsHomeDir 
                       + " url = " + url
                       + " tableName = " + tableName);

    // Job-specific params
    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
    
    conf.setInputFormat(RowInputFormat.class);
    conf.setMapperClass(HdfsDataMapper.class);
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(TradesRow.class);
    
    conf.setReducerClass(HdfsDataReducer.class);
    conf.set(RowOutputFormat.OUTPUT_TABLE, tableName + "_HDFS");
    //conf.set(GfxdOutputFormat.OUTPUT_SCHEMA, "APP");
    conf.set(RowOutputFormat.OUTPUT_URL, url);
    conf.setOutputFormat(RowOutputFormat.class);
    conf.setOutputKeyClass(Key.class);
    conf.setOutputValueClass(TradeOutputObject.class);

    StringBuffer aStr = new StringBuffer();
    aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
    aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
    aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
    aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
    System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());

    
    FileOutputFormat.setOutputPath(conf, new Path("" + System.currentTimeMillis()));
    
    JobClient.runJob(conf);
    return 0;
  }
 
Example 18  Project: hadoop   File: LoadGeneratorMR.java
/**
 * Based on args we submit the LoadGenerator as MR job.
 * Number of MapTasks is numMapTasks
 * @return exitCode for job submission
 */
private int submitAsMapReduce() {
  
  System.out.println("Running as a MapReduce job with " + 
      numMapTasks + " mapTasks;  Output to file " + mrOutDir);


  Configuration conf = new Configuration(getConf());
  
  // First set all the args of LoadGenerator as Conf vars to pass to MR tasks

  conf.set(LG_ROOT , root.toString());
  conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
  conf.setInt(LG_NUMOFTHREADS, numOfThreads);
  conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
  conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
  conf.setLong(LG_SEED, seed); //No idea what this is
  conf.setInt(LG_NUMMAPTASKS, numMapTasks);
  if (scriptFile == null && durations[0] <=0) {
    System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
    System.exit(-1);
  }
  conf.setLong(LG_ELAPSEDTIME, durations[0]);
  conf.setLong(LG_STARTTIME, startTime); 
  if (scriptFile != null) {
    conf.set(LG_SCRIPTFILE , scriptFile);
  }
  conf.set(LG_FLAGFILE, flagFile.toString());
  
  // Now set the necessary conf variables that apply to run MR itself.
  JobConf jobConf = new JobConf(conf, LoadGenerator.class);
  jobConf.setJobName("NNLoadGeneratorViaMR");
  jobConf.setNumMapTasks(numMapTasks);
  jobConf.setNumReduceTasks(1); // 1 reducer to collect the results

  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(IntWritable.class);

  jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
  jobConf.setReducerClass(ReducerThatCollectsLGdata.class);

  jobConf.setInputFormat(DummyInputFormat.class);
  jobConf.setOutputFormat(TextOutputFormat.class);
  
  // Explicitly set number of max map attempts to 1.
  jobConf.setMaxMapAttempts(1);
  // Explicitly turn off speculative execution
  jobConf.setSpeculativeExecution(false);

  // This mapReduce job has no input but has output
  FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));

  try {
    JobClient.runJob(jobConf);
  } catch (IOException e) {
    System.err.println("Failed to run job: " + e.getMessage());
    return -1;
  }
  return 0;
  
}
 
Example 19  Project: RDFS   File: PiEstimator.java
/**
 * Run a map/reduce job for estimating Pi.
 *
 * @return the estimated value of Pi
 */
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
    ) throws IOException {
  //setup job conf
  jobConf.setJobName(PiEstimator.class.getSimpleName());

  jobConf.setInputFormat(SequenceFileInputFormat.class);

  jobConf.setOutputKeyClass(BooleanWritable.class);
  jobConf.setOutputValueClass(LongWritable.class);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);

  jobConf.setMapperClass(PiMapper.class);
  jobConf.setNumMapTasks(numMaps);

  jobConf.setReducerClass(PiReducer.class);
  jobConf.setNumReduceTasks(1);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  jobConf.setSpeculativeExecution(false);

  //setup input/output directories
  final Path inDir = new Path(TMP_DIR, "in");
  final Path outDir = new Path(TMP_DIR, "out");
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);

  final FileSystem fs = FileSystem.get(jobConf);
  if (fs.exists(TMP_DIR)) {
    throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
        + " already exists.  Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }

  try {
    //generate an input file for each map task
    for(int i=0; i < numMaps; ++i) {
      final Path file = new Path(inDir, "part"+i);
      final LongWritable offset = new LongWritable(i * numPoints);
      final LongWritable size = new LongWritable(numPoints);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, jobConf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #"+i);
    }

    //start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    JobClient.runJob(jobConf);
    final double duration = (System.currentTimeMillis() - startTime)/1000.0;
    System.out.println("Job Finished in " + duration + " seconds");

    //read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }

    //compute estimated value
    return BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(BigDecimal.valueOf(numMaps))
        .divide(BigDecimal.valueOf(numPoints));
  } finally {
    fs.delete(TMP_DIR, true);
  }
}
 
Example 20  Project: nutch-htmlunit   File: LinkRank.java
/**
 * Runs the counter job. The counter job determines the number of links in the
 * webgraph. This is used during analysis.
 * 
 * @param fs The job file system.
 * @param webGraphDb The web graph database to use.
 * 
 * @return The number of nodes in the web graph.
 * @throws IOException If an error occurs while running the counter job.
 */
private int runCounter(FileSystem fs, Path webGraphDb)
  throws IOException {

  // configure the counter job
  Path numLinksPath = new Path(webGraphDb, NUM_NODES);
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  JobConf counter = new NutchJob(getConf());
  counter.setJobName("LinkRank Counter");
  FileInputFormat.addInputPath(counter, nodeDb);
  FileOutputFormat.setOutputPath(counter, numLinksPath);
  counter.setInputFormat(SequenceFileInputFormat.class);
  counter.setMapperClass(Counter.class);
  counter.setCombinerClass(Counter.class);
  counter.setReducerClass(Counter.class);
  counter.setMapOutputKeyClass(Text.class);
  counter.setMapOutputValueClass(LongWritable.class);
  counter.setOutputKeyClass(Text.class);
  counter.setOutputValueClass(LongWritable.class);
  counter.setNumReduceTasks(1);
  counter.setOutputFormat(TextOutputFormat.class);
  counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  // run the counter job, outputs to a single reduce task and file
  LOG.info("Starting link counter job");
  try {
    JobClient.runJob(counter);
  }
  catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished link counter job");

  // read the first (and only) line from the file which should be the
  // number of links in the web graph
  LOG.info("Reading numlinks temp file");
  FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
  BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
  String numLinksLine = buffer.readLine();
  readLinks.close();
  
  // check if there are links to process, if none, webgraph might be empty
  if (numLinksLine == null || numLinksLine.length() == 0) {
    fs.delete(numLinksPath, true);
    throw new IOException("No links to process, is the webgraph empty?");
  }
  
  // delete temp file and convert and return the number of links as an int
  LOG.info("Deleting numlinks temp file");
  fs.delete(numLinksPath, true);
  String numLinks = numLinksLine.split("\\s+")[1];
  return Integer.parseInt(numLinks);
}