下面列出了怎么用org.apache.hadoop.mapreduce.lib.partition.InputSampler的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
int numReducers = 2;
Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<Text, Text>
(0.1,
10000,
10);
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(TotalSortMapReduce.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
InputSampler.writePartitionFile(job, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Path inputPath = new Path(args[0]);
Path partitionFile = new Path(args[1] + "_partitions.lst");
Path outputStage = new Path(args[1] + "_staging");
Path outputOrder = new Path(args[1]);
// Configure job to prepare for sampling
Job sampleJob = new Job(conf, "TotalOrderSortingStage");
sampleJob.setJarByClass(TotalOrderSortingStage.class);
// Use the mapper implementation with zero reduce tasks
sampleJob.setMapperClass(LastAccessMapper.class);
sampleJob.setNumReduceTasks(0);
sampleJob.setOutputKeyClass(Text.class);
sampleJob.setOutputValueClass(Text.class);
TextInputFormat.setInputPaths(sampleJob, inputPath);
// Set the output format to a sequence file
sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
// Submit the job and get completion code.
int code = sampleJob.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
Job orderJob = new Job(conf, "TotalOrderSortingStage");
orderJob.setJarByClass(TotalOrderSortingStage.class);
// Here, use the identity mapper to output the key/value pairs in
// the SequenceFile
orderJob.setMapperClass(Mapper.class);
orderJob.setReducerClass(ValuesReducer.class);
// Set the number of reduce tasks to an appropriate number for the
// amount of data being sorted
orderJob.setNumReduceTasks(10);
// Use Hadoop's TotalOrderPartitioner class
orderJob.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file
TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
partitionFile);
orderJob.setOutputKeyClass(Text.class);
orderJob.setOutputValueClass(Text.class);
// Set the input to the previous job's output
orderJob.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
// Set the separator to an empty string
orderJob.getConfiguration().set(
"mapred.textoutputformat.separator", "");
// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
InputSampler.writePartitionFile(orderJob,
new InputSampler.RandomSampler(.001, 10000));
// Submit the job
code = orderJob.waitForCompletion(true) ? 0 : 2;
}
// Clean up the partition file and the staging directory
FileSystem.get(new Configuration()).delete(partitionFile, false);
FileSystem.get(new Configuration()).delete(outputStage, true);
return code;
}