org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setMaxInputSplitSize() 源码实例 Demo

下面列出了 org.apache.hadoop.mapreduce.lib.input.FileInputFormat#setMaxInputSplitSize() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: ignite   文件: HadoopPopularWords.java
/**
 * Builds the Hadoop MapReduce job for the popular-words example.
 * <p>
 * The job reads text input from {@code BOOKS_DFS_DIR}, tokenizes it with
 * {@code TokenizingMapper}, reduces with {@code TopNWordsReducer} and writes
 * {@code Text}/{@code IntWritable} pairs to {@code RESULT_DFS_DIR}.
 *
 * @return The configured job instance.
 * @throws IOException If the job could not be created or configured.
 */
@SuppressWarnings("deprecation")
private Job createConfigBasedHadoopJob() throws IOException {
    final Job job = new Job();
    final Configuration conf = job.getConfiguration();

    // Pick up the explicit distributed file system configuration, if provided.
    conf.addResource(U.resolveIgniteUrl(DFS_CFG));

    // Job identity.
    job.setJobName("HadoopPopularWordExample");
    job.setJarByClass(HadoopPopularWords.class);

    // Processing pipeline: text lines in, (word, count) pairs out.
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(TokenizingMapper.class);
    job.setReducerClass(TopNWordsReducer.class);

    // Where to read from and write to.
    FileInputFormat.setInputPaths(job, BOOKS_DFS_DIR);
    FileOutputFormat.setOutputPath(job, RESULT_DFS_DIR);

    final String tracker = conf.get("mapred.job.tracker", "local");

    if (!"local".equals(tracker)) {
        // Nothing else to tune when a non-local job tracker is configured.
        return job;
    }

    // The local job tracker allows only one task per wave, but the text input
    // format replaces that with a value calculated from the input split size
    // options, so set them explicitly: 32MB lower bound, no upper bound.
    final long minSplitBytes = 32L * 1024 * 1024;

    FileInputFormat.setMinInputSplitSize(job, minSplitBytes);
    FileInputFormat.setMaxInputSplitSize(job, Long.MAX_VALUE);

    return job;
}
 
源代码2 项目: components   文件: ConfigurableHDFSFileSource.java
/**
 * Computes the input splits for this source, asking the input format for
 * splits of the requested size.
 * <p>
 * Both the minimum and the maximum split size of the job are set to
 * {@code desiredBundleSizeBytes} before the format is queried.
 *
 * @param desiredBundleSizeBytes Requested split size, in bytes.
 * @return The splits reported by the input format created for the job.
 * @throws IOException If the job or its splits could not be obtained.
 * @throws IllegalAccessException Declared by this method; presumably raised when the input format is instantiated.
 * @throws InstantiationException Declared by this method; presumably raised when the input format is instantiated.
 */
protected List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException, IllegalAccessException,
        InstantiationException {
    final Job splitJob = jobInstance();

    // Pin the lower and upper split-size bounds to the same value.
    FileInputFormat.setMinInputSplitSize(splitJob, desiredBundleSizeBytes);
    FileInputFormat.setMaxInputSplitSize(splitJob, desiredBundleSizeBytes);

    return createFormat(splitJob).getSplits(splitJob);
}
 
源代码3 项目: hadoop   文件: DistributedPentomino.java
/**
 * Configures and runs the distributed pentomino solver job.
 * <p>
 * The first argument is the output path; the job input is generated into a
 * sibling {@code <output>_input} directory, which is removed again when the
 * method exits. The board dimensions are taken from the configuration and may
 * be overridden with {@code -depth}, {@code -height} and {@code -width}.
 *
 * @param args Command line: {@code <output> [-depth #] [-height #] [-width #]}.
 * @return {@code 0} if the job completed successfully, {@code 1} if it failed,
 *         {@code 2} if the command line was unusable (no arguments, or an
 *         option given without its value).
 * @throws Exception If job setup or execution fails, including a
 *         {@link NumberFormatException} for a non-numeric option value.
 */
public int run(String[] args) throws Exception {
  final String usage = "Usage: pentomino <output> [-depth #] [-height #] [-width #]";
  Configuration conf = getConf();
  if (args.length == 0) {
    System.out.println(usage);
    ToolRunner.printGenericCommandUsage(System.out);
    return 2;
  }
  // check for passed parameters, otherwise use defaults
  int width = conf.getInt(Pentomino.WIDTH, PENT_WIDTH);
  int height = conf.getInt(Pentomino.HEIGHT, PENT_HEIGHT);
  int depth = conf.getInt(Pentomino.DEPTH, PENT_DEPTH);
  for (int i = 0; i < args.length; i++) {
    String option = args[i];
    boolean isDepth = option.equalsIgnoreCase("-depth");
    boolean isHeight = option.equalsIgnoreCase("-height");
    boolean isWidth = option.equalsIgnoreCase("-width");
    if (!isDepth && !isHeight && !isWidth) {
      continue;
    }
    // An option as the very last argument has no value to consume; report
    // it as a usage error instead of running off the end of the array.
    if (i + 1 >= args.length) {
      System.err.println("Missing value for option " + option);
      System.out.println(usage);
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }
    int value = Integer.parseInt(args[++i].trim());
    if (isDepth) {
      depth = value;
    } else if (isHeight) {
      height = value;
    } else {
      width = value;
    }
  }
  // now set the values within conf for M/R tasks to read, this
  // will ensure values are set preventing MAPREDUCE-4678
  conf.setInt(Pentomino.WIDTH, width);
  conf.setInt(Pentomino.HEIGHT, height);
  conf.setInt(Pentomino.DEPTH, depth);
  Class<? extends Pentomino> pentClass = conf.getClass(Pentomino.CLASS,
    OneSidedPentomino.class, Pentomino.class);
  // A configured map count of zero would make the division below throw, and
  // a negative one would yield a negative split size; clamp to at least one.
  int numMaps = Math.max(1, conf.getInt(MRJobConfig.NUM_MAPS, DEFAULT_MAPS));
  Path output = new Path(args[0]);
  Path input = new Path(output + "_input");
  FileSystem fileSys = FileSystem.get(conf);
  try {
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setJarByClass(PentMap.class);

    job.setJobName("dancingElephant");
    Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
    pent.initialize(width, height);
    long inputSize = createInputDirectory(fileSys, input, pent, depth);
    // for forcing the number of maps
    FileInputFormat.setMaxInputSplitSize(job, (inputSize / numMaps));

    // the keys are the prefix strings
    job.setOutputKeyClass(Text.class);
    // the values are puzzle solutions
    job.setOutputValueClass(Text.class);

    job.setMapperClass(PentMap.class);
    job.setReducerClass(Reducer.class);

    job.setNumReduceTasks(1);

    return (job.waitForCompletion(true) ? 0 : 1);
  } finally {
    // Always remove the generated input directory, whether or not the job ran.
    fileSys.delete(input, true);
  }
}
 
源代码4 项目: big-c   文件: DistributedPentomino.java
/**
 * Configures and runs the distributed pentomino solver job.
 * <p>
 * The first argument is the output path; the job input is generated into a
 * sibling {@code <output>_input} directory, which is removed again when the
 * method exits. The board dimensions are taken from the configuration and may
 * be overridden with {@code -depth}, {@code -height} and {@code -width}.
 *
 * @param args Command line: {@code <output> [-depth #] [-height #] [-width #]}.
 * @return {@code 0} if the job completed successfully, {@code 1} if it failed,
 *         {@code 2} if the command line was unusable (no arguments, or an
 *         option given without its value).
 * @throws Exception If job setup or execution fails, including a
 *         {@link NumberFormatException} for a non-numeric option value.
 */
public int run(String[] args) throws Exception {
  final String usage = "Usage: pentomino <output> [-depth #] [-height #] [-width #]";
  Configuration conf = getConf();
  if (args.length == 0) {
    System.out.println(usage);
    ToolRunner.printGenericCommandUsage(System.out);
    return 2;
  }
  // check for passed parameters, otherwise use defaults
  int width = conf.getInt(Pentomino.WIDTH, PENT_WIDTH);
  int height = conf.getInt(Pentomino.HEIGHT, PENT_HEIGHT);
  int depth = conf.getInt(Pentomino.DEPTH, PENT_DEPTH);
  for (int i = 0; i < args.length; i++) {
    String option = args[i];
    boolean isDepth = option.equalsIgnoreCase("-depth");
    boolean isHeight = option.equalsIgnoreCase("-height");
    boolean isWidth = option.equalsIgnoreCase("-width");
    if (!isDepth && !isHeight && !isWidth) {
      continue;
    }
    // An option as the very last argument has no value to consume; report
    // it as a usage error instead of running off the end of the array.
    if (i + 1 >= args.length) {
      System.err.println("Missing value for option " + option);
      System.out.println(usage);
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }
    int value = Integer.parseInt(args[++i].trim());
    if (isDepth) {
      depth = value;
    } else if (isHeight) {
      height = value;
    } else {
      width = value;
    }
  }
  // now set the values within conf for M/R tasks to read, this
  // will ensure values are set preventing MAPREDUCE-4678
  conf.setInt(Pentomino.WIDTH, width);
  conf.setInt(Pentomino.HEIGHT, height);
  conf.setInt(Pentomino.DEPTH, depth);
  Class<? extends Pentomino> pentClass = conf.getClass(Pentomino.CLASS,
    OneSidedPentomino.class, Pentomino.class);
  // A configured map count of zero would make the division below throw, and
  // a negative one would yield a negative split size; clamp to at least one.
  int numMaps = Math.max(1, conf.getInt(MRJobConfig.NUM_MAPS, DEFAULT_MAPS));
  Path output = new Path(args[0]);
  Path input = new Path(output + "_input");
  FileSystem fileSys = FileSystem.get(conf);
  try {
    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
    job.setJarByClass(PentMap.class);

    job.setJobName("dancingElephant");
    Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
    pent.initialize(width, height);
    long inputSize = createInputDirectory(fileSys, input, pent, depth);
    // for forcing the number of maps
    FileInputFormat.setMaxInputSplitSize(job, (inputSize / numMaps));

    // the keys are the prefix strings
    job.setOutputKeyClass(Text.class);
    // the values are puzzle solutions
    job.setOutputValueClass(Text.class);

    job.setMapperClass(PentMap.class);
    job.setReducerClass(Reducer.class);

    job.setNumReduceTasks(1);

    return (job.waitForCompletion(true) ? 0 : 1);
  } finally {
    // Always remove the generated input directory, whether or not the job ran.
    fileSys.delete(input, true);
  }
}
 
源代码5 项目: learning-hadoop   文件: ImportTsv.java
/**
 * Sets up the actual job.
 * <p>
 * {@code args[0]} is used as the table name and {@code args[1]} as the input
 * directory. When {@code BULK_OUTPUT_CONF_KEY} is set in the configuration the
 * job is configured to write HFiles to that path; otherwise it writes straight
 * to the table with no reducers.
 * 
 * @param conf
 *            The current configuration. Note that the separator entry is
 *            rewritten in place with its Base64-encoded form.
 * @param args
 *            The command line parameters.
 * @return The newly created job.
 * @throws IOException
 *             When setting up the job fails.
 * @throws ClassNotFoundException
 *             When the mapper class named in the configuration cannot be
 *             loaded.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
		throws IOException, ClassNotFoundException {

	// Support non-XML supported characters
	// by re-encoding the passed separator as a Base64 string.
	// NOTE(review): getBytes() uses the JVM default charset; left unchanged
	// because the decoding side is not visible here - confirm both agree.
	String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
	if (actualSeparator != null) {
		conf.set(SEPARATOR_CONF_KEY,
				new String(Base64.encodeBytes(actualSeparator.getBytes())));
	}

	// See if a non-default Mapper was set
	String mapperClassName = conf.get(MAPPER_CONF_KEY);
	Class mapperClass = mapperClassName != null ? Class
			.forName(mapperClassName) : DEFAULT_MAPPER;

	String tableName = args[0];
	Path inputDir = new Path(args[1]);
	Job job = new Job(conf, NAME + "_" + tableName);
	job.setJarByClass(mapperClass);
	FileInputFormat.setInputPaths(job, inputDir);

	String inputCodec = conf.get(INPUT_LZO_KEY);
	if (inputCodec == null) {
		// Plain text input: cap the split size at 64 MB.
		FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
		job.setInputFormatClass(TextInputFormat.class);
	} else if (inputCodec.equalsIgnoreCase("lzo")) {
		job.setInputFormatClass(LzoTextInputFormat.class);
	} else {
		usage("not supported compression codec!");
		System.exit(-1);
	}

	job.setMapperClass(mapperClass);

	String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
	if (hfileOutPath != null) {
		HTable table = new HTable(conf, tableName);
		try {
			job.setReducerClass(PutSortReducer.class);
			Path outputDir = new Path(hfileOutPath);
			FileOutputFormat.setOutputPath(job, outputDir);
			job.setMapOutputKeyClass(ImmutableBytesWritable.class);
			job.setMapOutputValueClass(Put.class);
			HFileOutputFormat.configureIncrementalLoad(job, table);
		} finally {
			// The table handle is only needed while configuring the job;
			// release it so the connection resources are not leaked.
			// NOTE(review): assumes configureIncrementalLoad does not keep
			// using the table after it returns - confirm.
			table.close();
		}
	} else {
		// No reducers. Just write straight to table. Call
		// initTableReducerJob
		// to set up the TableOutputFormat.
		TableMapReduceUtil.initTableReducerJob(tableName, null, job);
		job.setNumReduceTasks(0);
	}

	TableMapReduceUtil.addDependencyJars(job);
	// Guava is used by TsvParser, so ship its jar with the job as well.
	TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
			com.google.common.base.Function.class);
	return job;
}