org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getInputPaths() code examples

Listed below are example usages of org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getInputPaths(), collected from open source projects on GitHub. getInputPaths() returns the Path[] of input paths configured on a job, i.e. whatever has been registered through FileInputFormat.setInputPaths()/addInputPath() (the mapreduce.input.fileinputformat.inputdir property).
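
Before the project examples, here is a minimal sketch of the round trip most of the snippets below rely on: paths registered with setInputPaths()/addInputPath() come back as a Path[] from getInputPaths(). The class name and paths are placeholders, not taken from any of the listed projects.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class GetInputPathsDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "get-input-paths-demo");

    // Register input paths on the job; this writes the
    // mapreduce.input.fileinputformat.inputdir property.
    FileInputFormat.setInputPaths(job, new Path("/data/input-a"), new Path("/data/input-b"));
    FileInputFormat.addInputPath(job, new Path("/data/input-c"));

    // getInputPaths() reads the same property back as a Path[].
    for (Path p : FileInputFormat.getInputPaths(job)) {
      System.out.println("configured input path: " + p);
    }
  }
}

Since getInputPaths() accepts any JobContext, the examples below pass it a Job, a JobContext, or a JobContextImpl interchangeably.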

Example 1  Project: hadoop   File: ControlledJob.java
/**
 * Submit this job to mapred. The state becomes RUNNING if submission 
 * is successful, FAILED otherwise.  
 */
protected synchronized void submit() {
  try {
    Configuration conf = job.getConfiguration();
    if (conf.getBoolean(CREATE_DIR, false)) {
      FileSystem fs = FileSystem.get(conf);
      Path inputPaths[] = FileInputFormat.getInputPaths(job);
      for (int i = 0; i < inputPaths.length; i++) {
        if (!fs.exists(inputPaths[i])) {
          try {
            fs.mkdirs(inputPaths[i]);
          } catch (IOException e) {
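            // note: a failure to create the missing input directory is silently ignored here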

          }
        }
      }
    }
    job.submit();
    this.state = State.RUNNING;
  } catch (Exception ioe) {
    LOG.info(getJobName()+" got an error while submitting ",ioe);
    this.state = State.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
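
ControlledJob.submit() above is protected and is normally driven by a JobControl thread, which submits each job once its dependencies have succeeded. A rough sketch of that typical usage follows; JobControlSketch, jobA, and jobB are placeholders, and both jobs are assumed to be fully configured MapReduce jobs.

import java.util.Arrays;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlSketch {
  // jobA and jobB are assumed to be fully configured MapReduce jobs.
  public static void runChained(Job jobA, Job jobB) throws Exception {
    ControlledJob first = new ControlledJob(jobA, null);
    ControlledJob second = new ControlledJob(jobB, Arrays.asList(first));

    JobControl control = new JobControl("demo-group");
    control.addJob(first);
    control.addJob(second);

    // JobControl calls submit() on each ControlledJob once its dependencies succeed.
    new Thread(control).start();
    while (!control.allFinished()) {
      Thread.sleep(1000);
    }
    control.stop();
  }
}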
 
Example 2  Project: big-c   File: ControlledJob.java
/**
 * Submit this job to mapred. The state becomes RUNNING if submission 
 * is successful, FAILED otherwise.  
 */
protected synchronized void submit() {
  try {
    Configuration conf = job.getConfiguration();
    if (conf.getBoolean(CREATE_DIR, false)) {
      FileSystem fs = FileSystem.get(conf);
      Path inputPaths[] = FileInputFormat.getInputPaths(job);
      for (int i = 0; i < inputPaths.length; i++) {
        if (!fs.exists(inputPaths[i])) {
          try {
            fs.mkdirs(inputPaths[i]);
          } catch (IOException e) {
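            // note: a failure to create the missing input directory is silently ignored here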

          }
        }
      }
    }
    job.submit();
    this.state = State.RUNNING;
  } catch (Exception ioe) {
    LOG.info(getJobName()+" got an error while submitting ",ioe);
    this.state = State.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
 
Example 3  Project: incubator-gobblin   File: OrcUtils.java
public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem fs)
    throws IOException {
  Path[] sourceDirs = FileInputFormat.getInputPaths(job);
  if (sourceDirs.length == 0) {
    throw new IllegalStateException("There should be at least one directory specified for the MR job");
  }

  List<FileStatus> files = new ArrayList<FileStatus>();

  for (Path sourceDir : sourceDirs) {
    files.addAll(FileListUtils.listFilesRecursively(fs, sourceDir));
  }
  Collections.sort(files, new MRCompactorAvroKeyDedupJobRunner.LastModifiedDescComparator());

  TypeDescription resultSchema;
  for (FileStatus status : files) {
    resultSchema = getTypeDescriptionFromFile(job.getConfiguration(), status.getPath());
    if (resultSchema != null) {
      return resultSchema;
    }
  }

  throw new IllegalStateException(String
      .format("There's no file carrying orc file schema in the list of directories: %s",
          Arrays.toString(sourceDirs)));
}
 
Example 4
public static Schema getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
  Path[] sourceDirs = FileInputFormat.getInputPaths(job);

  List<FileStatus> files = new ArrayList<FileStatus>();

  for (Path sourceDir : sourceDirs) {
    files.addAll(Arrays.asList(fs.listStatus(sourceDir)));
  }

  Collections.sort(files, new LastModifiedDescComparator());

  for (FileStatus file : files) {
    Schema schema = getNewestSchemaFromSource(file.getPath(), fs);
    if (schema != null) {
      return schema;
    }
  }
  return null;
}
 
Example 5  Project: spork   File: PigSequenceFileInputFormat.java
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {        
    Path[] dirs = FileInputFormat.getInputPaths(job);
    if (dirs.length == 0) {
        throw new IOException("No input paths specified in job");
    }
    List<FileStatus> files = new ArrayList<FileStatus>();
    for (int i=0; i<dirs.length; ++i) {
        Path p = dirs[i];
        FileSystem fs = p.getFileSystem(job.getConfiguration()); 
        FileStatus[] matches = fs.globStatus(p, hiddenFileFilter);
        if (matches == null) {
            throw new IOException("Input path does not exist: " + p);
        } else if (matches.length == 0) {
            throw new IOException("Input Pattern " + p + " matches 0 files");
        } else {
            for (FileStatus globStat: matches) {
                files.add(globStat);
            }
        }
    }
    return MapRedUtil.getAllFileRecursively(files, job.getConfiguration());        
}
 
Example 6  Project: Flink-CEPplus   File: HadoopInputFormatBase.java
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
	// only gather base statistics for FileInputFormats
	if (!(mapreduceInputFormat instanceof FileInputFormat)) {
		return null;
	}

	JobContext jobContext = new JobContextImpl(configuration, null);

	final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
			(FileBaseStatistics) cachedStats : null;

	try {
		final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
		return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
	} catch (IOException ioex) {
		if (LOG.isWarnEnabled()) {
			LOG.warn("Could not determine statistics due to an io error: "
					+ ioex.getMessage());
		}
	} catch (Throwable t) {
		if (LOG.isErrorEnabled()) {
			LOG.error("Unexpected problem while getting the file statistics: "
					+ t.getMessage(), t);
		}
	}

	// no statistics available
	return null;
}
 
Example 7  Project: flink   File: HadoopInputFormatBase.java
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
	// only gather base statistics for FileInputFormats
	if (!(mapreduceInputFormat instanceof FileInputFormat)) {
		return null;
	}

	JobContext jobContext = new JobContextImpl(configuration, null);

	final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
			(FileBaseStatistics) cachedStats : null;

	try {
		final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
		return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
	} catch (IOException ioex) {
		if (LOG.isWarnEnabled()) {
			LOG.warn("Could not determine statistics due to an io error: "
					+ ioex.getMessage());
		}
	} catch (Throwable t) {
		if (LOG.isErrorEnabled()) {
			LOG.error("Unexpected problem while getting the file statistics: "
					+ t.getMessage(), t);
		}
	}

	// no statistics available
	return null;
}
 
Example 8  Project: emodb   File: EmoInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context)
        throws IOException {
    Path[] paths = FileInputFormat.getInputPaths(context);

    return FluentIterable.from(BaseInputFormat.getSplits(context.getConfiguration(), paths))
            .transform(_fromSplit)
            .toList();
}
 
Example 9  Project: flink   File: HadoopInputFormatBase.java
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
	// only gather base statistics for FileInputFormats
	if (!(mapreduceInputFormat instanceof FileInputFormat)) {
		return null;
	}

	JobContext jobContext = new JobContextImpl(configuration, null);

	final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
			(FileBaseStatistics) cachedStats : null;

	try {
		final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
		return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
	} catch (IOException ioex) {
		if (LOG.isWarnEnabled()) {
			LOG.warn("Could not determine statistics due to an io error: "
					+ ioex.getMessage());
		}
	} catch (Throwable t) {
		if (LOG.isErrorEnabled()) {
			LOG.error("Unexpected problem while getting the file statistics: "
					+ t.getMessage(), t);
		}
	}

	// no statistics available
	return null;
}
 
Example 10
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException {

  Path[] inputPaths = FileInputFormat.getInputPaths(context);
  if (inputPaths == null || inputPaths.length == 0) {
    throw new IOException("No input found!");
  }

  List<String> allPaths = Lists.newArrayList();

  for (Path path : inputPaths) {
    // path is a single work unit / multi work unit
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    FileStatus[] inputs = fs.listStatus(path);

    if (inputs == null) {
      throw new IOException(String.format("Path %s does not exist.", path));
    }
    log.info(String.format("Found %d input files at %s: %s", inputs.length, path, Arrays.toString(inputs)));
    for (FileStatus input : inputs) {
      allPaths.add(input.getPath().toString());
    }
  }

  int maxMappers = getMaxMapper(context.getConfiguration());
  int numTasksPerMapper =
      allPaths.size() % maxMappers == 0 ? allPaths.size() / maxMappers : allPaths.size() / maxMappers + 1;

  List<InputSplit> splits = Lists.newArrayList();
  Iterator<String> pathsIt = allPaths.iterator();
  while (pathsIt.hasNext()) {
    Iterator<String> limitedIterator = Iterators.limit(pathsIt, numTasksPerMapper);
    splits.add(new GobblinSplit(Lists.newArrayList(limitedIterator)));
  }

  return splits;
}
 
Example 11  Project: hadoop   File: GenericMRLoadGenerator.java
public int run(String [] argv) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }

  Configuration conf = job.getConfiguration();
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormatClass(NullOutputFormat.class);
  }

  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(conf);  
    Path tmpDir = new Path("/tmp");
    Random r = new Random();
    Path indirInputFile = new Path(tmpDir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        tmpDir.getFileSystem(conf), conf, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(conf);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
                     (endTime.getTime() - startTime.getTime()) /1000 +
                     " seconds.");

  return ret;
}
 
Example 12  Project: linden   File: LindenJob.java
@Override
public int run(String[] strings) throws Exception {
  Configuration conf = getConf();
  String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
  logger.info("input dir:" + dir);
  Path inputPath = new Path(StringUtils.unEscapeString(dir));
  Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
  String indexPath = conf.get(LindenJobConfig.INDEX_PATH);

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }
  if (fs.exists(new Path(indexPath))) {
    fs.delete(new Path(indexPath), true);
  }

  int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
  Shard[] shards = createShards(indexPath, numShards);

  Shard.setIndexShards(conf, shards);

  //empty trash;
  (new Trash(conf)).expunge();

  Job job = Job.getInstance(conf, "linden-hadoop-indexing");
  job.setJarByClass(LindenJob.class);
  job.setMapperClass(LindenMapper.class);
  job.setCombinerClass(LindenCombiner.class);
  job.setReducerClass(LindenReducer.class);
  job.setMapOutputKeyClass(Shard.class);
  job.setMapOutputValueClass(IntermediateForm.class);
  job.setOutputKeyClass(Shard.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(IndexUpdateOutputFormat.class);
  job.setReduceSpeculativeExecution(false);
  job.setNumReduceTasks(numShards);

  String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
  if (lindenSchemaFile == null) {
    throw new IOException("no schema file is found");
  }
  logger.info("Adding schema file: " + lindenSchemaFile);
  job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
  String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
  if (lindenPropertiesFile == null) {
    throw new IOException("no linden properties file is found");
  }
  logger.info("Adding linden properties file: " + lindenPropertiesFile);
  job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  Path[] inputs = FileInputFormat.getInputPaths(job);
  StringBuilder buffer = new StringBuilder(inputs[0].toString());
  for (int i = 1; i < inputs.length; i++) {
    buffer.append(",");
    buffer.append(inputs[i].toString());
  }
  logger.info("mapreduce.input.dir = " + buffer.toString());
  logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
  logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
  logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
  logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
  logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
  logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));

  job.waitForCompletion(true);
  if (!job.isSuccessful()) {
    throw new RuntimeException("Job failed");
  }
  return 0;
}
 
Example 13  Project: big-c   File: GenericMRLoadGenerator.java
public int run(String [] argv) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }

  Configuration conf = job.getConfiguration();
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormatClass(NullOutputFormat.class);
  }

  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(conf);  
    Path tmpDir = new Path("/tmp");
    Random r = new Random();
    Path indirInputFile = new Path(tmpDir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        tmpDir.getFileSystem(conf), conf, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(conf);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
                     (endTime.getTime() - startTime.getTime()) /1000 +
                     " seconds.");

  return ret;
}
 
Example 14  Project: spork   File: PathPartitionHelper.java
/**
 * This method is called by the FileInputFormat to find the input paths for
 * which splits should be calculated.<br/>
 * If applyDateRanges == true: Then the HiveRCDateSplitter is used to apply
 * filtering on the input files.<br/>
 * Else the default FileInputFormat listStatus method is used.
 *
 * @param ctx
 *            JobContext
 * @param loaderClass
 *            this is chosen to be a subclass of LoadFunc to maintain some
 *            consistency.
 */
public List<FileStatus> listStatus(JobContext ctx,
        Class<? extends LoadFunc> loaderClass, String signature)
        throws IOException {

    Properties properties = UDFContext.getUDFContext().getUDFProperties(
            loaderClass, new String[] { signature });

    String partitionExpression = properties
            .getProperty(PARITITION_FILTER_EXPRESSION);

    ExpressionFactory expressionFactory = null;

    if (partitionExpression != null) {
        expressionFactory = ExpressionFactory.newInstance();
    }

    String partitionColumnStr = properties
            .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
    String[] partitionKeys = (partitionColumnStr == null) ? null
            : partitionColumnStr.split(",");

    Path[] inputPaths = FileInputFormat.getInputPaths(ctx);

    List<FileStatus> splitPaths = null;

    if (partitionKeys != null) {

        splitPaths = new ArrayList<FileStatus>();

        for (Path inputPath : inputPaths) {
            // for each input path work recursively through each partition
            // level to find the rc files

            FileSystem fs = inputPath.getFileSystem(ctx.getConfiguration());

            if (fs.getFileStatus(inputPath).isDir()) {
                // assure that we are at the root of the partition tree.
                FileStatus fileStatusArr[] = fs.listStatus(inputPath);

                if (fileStatusArr != null) {
                    for (FileStatus childFileStatus : fileStatusArr) {
                        getPartitionedFiles(expressionFactory,
                                partitionExpression, fs, childFileStatus,
                                0, partitionKeys, splitPaths);
                    }
                }

            } else {
                splitPaths.add(fs.getFileStatus(inputPath));
            }

        }

        if (splitPaths.size() < 1) {
            LOG.error("Not split paths where found, please check that the filter logic for the partition keys does not filter out everything ");
        }

    }

    return splitPaths;
}
 
Example 15
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
  String ks = jobConf.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME);
  String cf = jobConf.get(AbstractColumnSerDe.CASSANDRA_CF_NAME);
  int slicePredicateSize = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE,
      AbstractColumnSerDe.DEFAULT_SLICE_PREDICATE_SIZE);
  int sliceRangeSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE,
      AbstractColumnSerDe.DEFAULT_RANGE_BATCH_SIZE);
  int splitSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE,
      AbstractColumnSerDe.DEFAULT_SPLIT_SIZE);
  String cassandraColumnMapping = jobConf.get(AbstractColumnSerDe.CASSANDRA_COL_MAPPING);
  int rpcPort = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_PORT, 9160);
  String host = jobConf.get(AbstractColumnSerDe.CASSANDRA_HOST);
  String partitioner = jobConf.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER);

  if (cassandraColumnMapping == null) {
    throw new IOException("cassandra.columns.mapping required for Cassandra Table.");
  }

  SliceRange range = new SliceRange();
  range.setStart(new byte[0]);
  range.setFinish(new byte[0]);
  range.setReversed(false);
  range.setCount(slicePredicateSize);
  SlicePredicate predicate = new SlicePredicate();
  predicate.setSlice_range(range);

  ConfigHelper.setInputRpcPort(jobConf, "" + rpcPort);
  ConfigHelper.setInputInitialAddress(jobConf, host);
  ConfigHelper.setInputPartitioner(jobConf, partitioner);
  ConfigHelper.setInputSlicePredicate(jobConf, predicate);
  ConfigHelper.setInputColumnFamily(jobConf, ks, cf);
  ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize);
  ConfigHelper.setInputSplitSize(jobConf, splitSize);

  Job job = new Job(jobConf);
  JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());

  Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
  List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
  InputSplit[] results = new InputSplit[splits.size()];

  for (int i = 0; i < splits.size(); ++i) {
    HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit(
        (ColumnFamilySplit) splits.get(i), cassandraColumnMapping, tablePaths[0]);
    csplit.setKeyspace(ks);
    csplit.setColumnFamily(cf);
    csplit.setRangeBatchSize(sliceRangeSize);
    csplit.setSplitSize(splitSize);
    csplit.setHost(host);
    csplit.setPort(rpcPort);
    csplit.setSlicePredicateSize(slicePredicateSize);
    csplit.setPartitioner(partitioner);
    csplit.setColumnMapping(cassandraColumnMapping);
    results[i] = csplit;
  }
  return results;
}