Listed below are example usages of org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setMaxInputSplitSize() taken from open-source projects; the original source files for each snippet can be viewed on GitHub.
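For orientation before the project examples: FileInputFormat computes each split size as roughly max(minSplitSize, min(maxSplitSize, blockSize)), so setMaxInputSplitSize() caps how large a split may grow while setMinInputSplitSize() sets the floor. The minimal sketch below shows just these two calls in isolation; the class name SplitSizeSketch, the job name, and the input path are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitSizeSketch {
    public static Job newJob(Configuration conf) throws Exception {
        // Hypothetical job name and input path, used only to make the sketch self-contained.
        Job job = Job.getInstance(conf, "split-size-sketch");
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/tmp/split-size-sketch/input"));

        // Splits will be at least 16 MB and at most 64 MB; FileInputFormat picks
        // max(minSize, min(maxSize, blockSize)) per block of each input file.
        FileInputFormat.setMinInputSplitSize(job, 16L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        return job;
    }
}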
/**
 * Configures the Hadoop MapReduce job.
 *
 * @return Instance of the Hadoop MapReduce job.
 * @throws IOException If failed.
 */
@SuppressWarnings("deprecation")
private Job createConfigBasedHadoopJob() throws IOException {
    Job jobCfg = new Job();

    Configuration cfg = jobCfg.getConfiguration();

    // Use explicit configuration of the distributed file system, if provided.
    cfg.addResource(U.resolveIgniteUrl(DFS_CFG));

    jobCfg.setJobName("HadoopPopularWordExample");
    jobCfg.setJarByClass(HadoopPopularWords.class);
    jobCfg.setInputFormatClass(TextInputFormat.class);
    jobCfg.setOutputKeyClass(Text.class);
    jobCfg.setOutputValueClass(IntWritable.class);

    jobCfg.setMapperClass(TokenizingMapper.class);
    jobCfg.setReducerClass(TopNWordsReducer.class);

    FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
    FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);

    // The local job tracker allows only one task per wave, but the text input format
    // overrides that with a value calculated from the input split size settings.
    if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
        // Split the job into tasks using a 32 MB minimum split size.
        FileInputFormat.setMinInputSplitSize(jobCfg, 32L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
    }

    return jobCfg;
}
protected List<InputSplit> computeSplits(long desiredBundleSizeBytes)
    throws IOException, IllegalAccessException, InstantiationException {
    Job job = jobInstance();

    // Pin both the minimum and maximum split size to the desired bundle size
    // so that each computed split comes out close to desiredBundleSizeBytes.
    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);

    return createFormat(job).getSplits(job);
}
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    if (args.length == 0) {
        System.out.println("Usage: pentomino <output> [-depth #] [-height #] [-width #]");
        ToolRunner.printGenericCommandUsage(System.out);
        return 2;
    }
    // check for passed parameters, otherwise use defaults
    int width = conf.getInt(Pentomino.WIDTH, PENT_WIDTH);
    int height = conf.getInt(Pentomino.HEIGHT, PENT_HEIGHT);
    int depth = conf.getInt(Pentomino.DEPTH, PENT_DEPTH);
    for (int i = 0; i < args.length; i++) {
        if (args[i].equalsIgnoreCase("-depth")) {
            depth = Integer.parseInt(args[++i].trim());
        } else if (args[i].equalsIgnoreCase("-height")) {
            height = Integer.parseInt(args[++i].trim());
        } else if (args[i].equalsIgnoreCase("-width")) {
            width = Integer.parseInt(args[++i].trim());
        }
    }
    // now set the values within conf for M/R tasks to read, this
    // will ensure values are set preventing MAPREDUCE-4678
    conf.setInt(Pentomino.WIDTH, width);
    conf.setInt(Pentomino.HEIGHT, height);
    conf.setInt(Pentomino.DEPTH, depth);

    Class<? extends Pentomino> pentClass = conf.getClass(Pentomino.CLASS,
        OneSidedPentomino.class, Pentomino.class);
    int numMaps = conf.getInt(MRJobConfig.NUM_MAPS, DEFAULT_MAPS);
    Path output = new Path(args[0]);
    Path input = new Path(output + "_input");
    FileSystem fileSys = FileSystem.get(conf);
    try {
        Job job = Job.getInstance(conf);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        job.setJarByClass(PentMap.class);

        job.setJobName("dancingElephant");
        Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
        pent.initialize(width, height);
        long inputSize = createInputDirectory(fileSys, input, pent, depth);

        // for forcing the number of maps
        FileInputFormat.setMaxInputSplitSize(job, (inputSize / numMaps));

        // the keys are the prefix strings
        job.setOutputKeyClass(Text.class);
        // the values are puzzle solutions
        job.setOutputValueClass(Text.class);

        job.setMapperClass(PentMap.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(1);

        return (job.waitForCompletion(true) ? 0 : 1);
    } finally {
        fileSys.delete(input, true);
    }
}
/**
 * Sets up the actual job.
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException, ClassNotFoundException {
    // Support non-XML-supported characters
    // by re-encoding the passed separator as a Base64 string.
    String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
    if (actualSeparator != null) {
        conf.set(SEPARATOR_CONF_KEY,
            new String(Base64.encodeBytes(actualSeparator.getBytes())));
    }

    // See if a non-default Mapper was set.
    String mapperClassName = conf.get(MAPPER_CONF_KEY);
    Class mapperClass =
        mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;

    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(mapperClass);
    FileInputFormat.setInputPaths(job, inputDir);

    String inputCodec = conf.get(INPUT_LZO_KEY);
    if (inputCodec == null) {
        // Cap each split at 64 MB.
        FileInputFormat.setMaxInputSplitSize(job, 67108864L);
        job.setInputFormatClass(TextInputFormat.class);
    } else {
        if (inputCodec.equalsIgnoreCase("lzo")) {
            job.setInputFormatClass(LzoTextInputFormat.class);
        } else {
            usage("unsupported compression codec!");
            System.exit(-1);
        }
    }

    job.setMapperClass(mapperClass);

    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
        HTable table = new HTable(conf, tableName);
        job.setReducerClass(PutSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        HFileOutputFormat.configureIncrementalLoad(job, table);
    } else {
        // No reducers. Just write straight to the table. Call initTableReducerJob
        // to set up the TableOutputFormat.
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);
    }

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.base.Function.class /* Guava used by TsvParser */);
    return job;
}