类org.apache.hadoop.mapreduce.lib.partition.InputSampler源码实例Demo

下面列出了如何使用 org.apache.hadoop.mapreduce.lib.partition.InputSampler 的 API 类实例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hiped2   文件: TotalSortMapReduce.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * <p>Parses the command line, configures a job that partitions its map
 * output with {@link TotalOrderPartitioner}, samples the input to write the
 * partition file, and then runs the job and waits for it to finish.
 *
 * @param args the command-line arguments
 * @return the process exit code: the non-zero result of the command-line
 *         parser if parsing failed, {@code 0} if the job succeeded,
 *         {@code 1} if the job failed
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  int numReducers = 2;

  Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
  Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
  Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));


  // RandomSampler(freq, numSamples, maxSplitsSampled) per the Hadoop API.
  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<Text, Text>
          (0.1,
              10000,
              10);

  Configuration conf = super.getConf();

  // NOTE: Job takes its own copy of the configuration, so every setting made
  // from here on must go through job.getConfiguration(), not conf.
  Job job = new Job(conf);
  job.setJarByClass(TotalSortMapReduce.class);

  job.setNumReduceTasks(numReducers);

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);

  // Sample the input and write the partition boundaries to partitionFile.
  InputSampler.writePartitionFile(job, sampler);

  // Register the partition file as a cache file on the job's own
  // configuration; adding it to conf here would be invisible to the job
  // because the job copied conf when it was constructed.
  URI partitionUri = new URI(partitionFile.toString() +
      "#" + "_sortPartitioning");
  DistributedCache.addCacheFile(partitionUri, job.getConfiguration());

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
 
/**
 * Runs a two-stage total-order sort.
 *
 * <p>Stage one is a map-only job that runs {@code LastAccessMapper} over the
 * input and writes its output as a sequence file to a staging directory.
 * Stage two samples that staged output to build a partition file and then
 * sorts the data with {@link TotalOrderPartitioner}. The staging directory
 * and the partition file are removed before returning, whether the jobs
 * succeed, fail, or throw.
 *
 * @param args {@code args[0]} is the input path; {@code args[1]} is the
 *             output path, which is also used as the prefix for the
 *             {@code _staging} directory and the {@code _partitions.lst}
 *             partition file
 * @return {@code 0} if both jobs succeed, {@code 1} if the first job fails,
 *         {@code 2} if the ordering job fails
 * @throws Exception if job setup, sampling, execution or cleanup fails
 */
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	Path inputPath = new Path(args[0]);
	Path partitionFile = new Path(args[1] + "_partitions.lst");
	Path outputStage = new Path(args[1] + "_staging");
	Path outputOrder = new Path(args[1]);
	try {
		// Configure job to prepare for sampling
		Job sampleJob = new Job(conf, "TotalOrderSortingStage");
		sampleJob.setJarByClass(TotalOrderSortingStage.class);
		// Use the mapper implementation with zero reduce tasks
		sampleJob.setMapperClass(LastAccessMapper.class);
		sampleJob.setNumReduceTasks(0);
		sampleJob.setOutputKeyClass(Text.class);
		sampleJob.setOutputValueClass(Text.class);
		TextInputFormat.setInputPaths(sampleJob, inputPath);
		// Set the output format to a sequence file
		sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
		SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
		// Submit the job and stop early if it did not complete successfully
		if (!sampleJob.waitForCompletion(true)) {
			return 1;
		}

		Job orderJob = new Job(conf, "TotalOrderSortingStage");
		orderJob.setJarByClass(TotalOrderSortingStage.class);
		// Here, use the identity mapper to output the key/value pairs in
		// the SequenceFile
		orderJob.setMapperClass(Mapper.class);
		orderJob.setReducerClass(ValuesReducer.class);
		// Set the number of reduce tasks to an appropriate number for the
		// amount of data being sorted
		orderJob.setNumReduceTasks(10);
		// Use Hadoop's TotalOrderPartitioner class
		orderJob.setPartitionerClass(TotalOrderPartitioner.class);
		// Set the partition file
		TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
				partitionFile);
		orderJob.setOutputKeyClass(Text.class);
		orderJob.setOutputValueClass(Text.class);
		// Set the input to the previous job's output
		orderJob.setInputFormatClass(SequenceFileInputFormat.class);
		SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
		// Set the output path to the command line parameter
		TextOutputFormat.setOutputPath(orderJob, outputOrder);
		// Set the separator to an empty string
		orderJob.getConfiguration().set(
				"mapred.textoutputformat.separator", "");
		// Use the InputSampler to go through the output of the previous
		// job, sample it, and create the partition file. The sampler is
		// typed <Text, Text> to match the staged key/value classes.
		InputSampler.writePartitionFile(orderJob,
				new InputSampler.RandomSampler<Text, Text>(.001, 10000));
		// Submit the job
		return orderJob.waitForCompletion(true) ? 0 : 2;
	} finally {
		// Clean up the partition file and the staging directory. Doing this
		// in finally ensures temporary data is not left behind when a job
		// throws instead of returning a failure status.
		FileSystem fs = FileSystem.get(conf);
		fs.delete(partitionFile, false);
		fs.delete(outputStage, true);
	}
}
 
 类方法
 同包方法