The following is example code for org.apache.hadoop.mapreduce.lib.input.NLineInputFormat#addInputPath(). You can also follow the link to view the source on GitHub, or leave a comment on the right.
/**
 * Configures and runs the local de-duplication MapReduce job.
 * <p>
 * Input is split with {@link NLineInputFormat} so each mapper receives at most
 * {@code LINES} lines; the reducer collects document IDs to be deleted.
 *
 * @param args {@code args[0]} = input path, {@code args[1]} = output path
 * @return 0 on successful job completion, 1 otherwise
 * @throws IllegalArgumentException if fewer than two arguments are supplied
 * @throws Exception on job configuration or execution failure
 */
@Override
public int run(String[] args)
        throws Exception
{
    // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
        throw new IllegalArgumentException(
                "Usage: <inputPath> <outputPath> (got " + args.length + " argument(s))");
    }

    Job job = Job.getInstance(getConf());
    job.setJarByClass(Phase3Step4LocalDeDuplication.class);
    job.setJobName(Phase3Step4LocalDeDuplication.class.getName());

    // paths
    String inputPath = args[0];
    // text files of ids to be deleted
    String outputPath = args[1];

    // input: reading max N lines for each mapper
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(job, new Path(inputPath));
    job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", LINES);

    // mapper
    job.setMapperClass(LocalGreedyDeDuplicationMapper.class);
    // LazyOutputFormat avoids creating empty part files for mappers with no output.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    // reducer
    job.setReducerClass(IDCollectorReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Builds the map-only test-data-generation job.
 * <p>
 * Each input line is handed to its own mapper ({@code linespermap = 1} with
 * {@link NLineInputFormat}); there is no reduce phase, and map output is
 * gzip-compressed.
 *
 * @return the fully configured, not-yet-submitted {@link Job}
 * @throws IOException if the job instance cannot be created
 */
private Job prepareJob() throws IOException {
    // Basic configuration: one input line per mapper.
    configuration.setInt("mapreduce.input.lineinputformat.linespermap", 1);
    configuration.set("reportingHost", this.reportingHost);

    // Enable gzip compression of (map) output. Both the current
    // ("mapreduce.*") and the deprecated ("mapred.*") property names are set
    // deliberately, so the settings take effect on old and new Hadoop
    // versions alike.
    configuration.setBoolean("mapreduce.map.output.compress", true);
    configuration.setBoolean("mapred.compress.map.output", true);
    configuration.setBoolean("mapred.output.compress", true);
    configuration.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    configuration.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

    Job ret = org.apache.hadoop.mapreduce.Job.getInstance(configuration);
    ret.setJarByClass(HDFSDistributor.class);
    ret.setJobName("PATH Test Data Generation");

    // Mapper
    ret.setMapperClass(DataGeneratorMapper.class);

    // Map-only job: no reduce phase.
    ret.setNumReduceTasks(0);

    // Input
    ret.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(ret, mapperInputFilePath);

    // Output
    FileOutputFormat.setOutputPath(ret, mapperOutputFilePath);

    return ret;
}