The following are example usages of org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getInputPaths(), collected from open source projects on GitHub.
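Before the project examples, here is a minimal, self-contained sketch of the call itself (the class name, job name, and paths below are made up for illustration): setInputPaths() registers input directories on the Job, and getInputPaths() reads them back from the job's configuration, which is what the snippets below do inside drivers and InputFormats.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathsDemo { // hypothetical class for illustration only
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "input-paths-demo"); // made-up job name

    // Register one or more input directories on the job (paths are examples).
    FileInputFormat.setInputPaths(job, new Path("/data/in1"), new Path("/data/in2"));

    // Read them back; this is the call illustrated by the examples below.
    Path[] inputPaths = FileInputFormat.getInputPaths(job);
    for (Path p : inputPaths) {
      System.out.println("input path: " + p);
    }
  }
}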
/**
 * Submit this job to mapred. The state becomes RUNNING if submission
 * is successful, FAILED otherwise.
 */
protected synchronized void submit() {
  try {
    Configuration conf = job.getConfiguration();
    if (conf.getBoolean(CREATE_DIR, false)) {
      FileSystem fs = FileSystem.get(conf);
      Path inputPaths[] = FileInputFormat.getInputPaths(job);
      for (int i = 0; i < inputPaths.length; i++) {
        if (!fs.exists(inputPaths[i])) {
          try {
            fs.mkdirs(inputPaths[i]);
          } catch (IOException e) {
          }
        }
      }
    }
    job.submit();
    this.state = State.RUNNING;
  } catch (Exception ioe) {
    LOG.info(getJobName()+" got an error while submitting ",ioe);
    this.state = State.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem fs)
    throws IOException {
  Path[] sourceDirs = FileInputFormat.getInputPaths(job);
  if (sourceDirs.length == 0) {
    throw new IllegalStateException("There should be at least one directory specified for the MR job");
  }
  List<FileStatus> files = new ArrayList<FileStatus>();
  for (Path sourceDir : sourceDirs) {
    files.addAll(FileListUtils.listFilesRecursively(fs, sourceDir));
  }
  Collections.sort(files, new MRCompactorAvroKeyDedupJobRunner.LastModifiedDescComparator());
  TypeDescription resultSchema;
  for (FileStatus status : files) {
    resultSchema = getTypeDescriptionFromFile(job.getConfiguration(), status.getPath());
    if (resultSchema != null) {
      return resultSchema;
    }
  }
  throw new IllegalStateException(String
      .format("There's no file carrying orc file schema in the list of directories: %s",
          Arrays.toString(sourceDirs)));
}
public static Schema getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
  Path[] sourceDirs = FileInputFormat.getInputPaths(job);
  List<FileStatus> files = new ArrayList<FileStatus>();
  for (Path sourceDir : sourceDirs) {
    files.addAll(Arrays.asList(fs.listStatus(sourceDir)));
  }
  Collections.sort(files, new LastModifiedDescComparator());
  for (FileStatus file : files) {
    Schema schema = getNewestSchemaFromSource(file.getPath(), fs);
    if (schema != null) {
      return schema;
    }
  }
  return null;
}
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  Path[] dirs = FileInputFormat.getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }
  List<FileStatus> files = new ArrayList<FileStatus>();
  for (int i = 0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration());
    FileStatus[] matches = fs.globStatus(p, hiddenFileFilter);
    if (matches == null) {
      throw new IOException("Input path does not exist: " + p);
    } else if (matches.length == 0) {
      throw new IOException("Input Pattern " + p + " matches 0 files");
    } else {
      for (FileStatus globStat : matches) {
        files.add(globStat);
      }
    }
  }
  return MapRedUtil.getAllFileRecursively(files, job.getConfiguration());
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
  // only gather base statistics for FileInputFormats
  if (!(mapreduceInputFormat instanceof FileInputFormat)) {
    return null;
  }
  JobContext jobContext = new JobContextImpl(configuration, null);
  final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
      (FileBaseStatistics) cachedStats : null;
  try {
    final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
    return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
  } catch (IOException ioex) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Could not determine statistics due to an io error: "
          + ioex.getMessage());
    }
  } catch (Throwable t) {
    if (LOG.isErrorEnabled()) {
      LOG.error("Unexpected problem while getting the file statistics: "
          + t.getMessage(), t);
    }
  }
  // no statistics available
  return null;
}
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException {
  Path[] paths = FileInputFormat.getInputPaths(context);
  return FluentIterable.from(BaseInputFormat.getSplits(context.getConfiguration(), paths))
      .transform(_fromSplit)
      .toList();
}
@Override
public List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException {
  Path[] inputPaths = FileInputFormat.getInputPaths(context);
  if (inputPaths == null || inputPaths.length == 0) {
    throw new IOException("No input found!");
  }
  List<String> allPaths = Lists.newArrayList();
  for (Path path : inputPaths) {
    // path is a single work unit / multi work unit
    FileSystem fs = path.getFileSystem(context.getConfiguration());
    FileStatus[] inputs = fs.listStatus(path);
    if (inputs == null) {
      throw new IOException(String.format("Path %s does not exist.", path));
    }
    log.info(String.format("Found %d input files at %s: %s", inputs.length, path, Arrays.toString(inputs)));
    for (FileStatus input : inputs) {
      allPaths.add(input.getPath().toString());
    }
  }
  int maxMappers = getMaxMapper(context.getConfiguration());
  int numTasksPerMapper =
      allPaths.size() % maxMappers == 0 ? allPaths.size() / maxMappers : allPaths.size() / maxMappers + 1;
  List<InputSplit> splits = Lists.newArrayList();
  Iterator<String> pathsIt = allPaths.iterator();
  while (pathsIt.hasNext()) {
    Iterator<String> limitedIterator = Iterators.limit(pathsIt, numTasksPerMapper);
    splits.add(new GobblinSplit(Lists.newArrayList(limitedIterator)));
  }
  return splits;
}
public int run(String [] argv) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return -1;
  }
  Configuration conf = job.getConfiguration();
  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormatClass(NullOutputFormat.class);
  }
  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(conf);
    Path tmpDir = new Path("/tmp");
    Random r = new Random();
    Path indirInputFile = new Path(tmpDir,
        Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
    conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(
        tmpDir.getFileSystem(conf), conf, indirInputFile,
        LongWritable.class, Text.class,
        SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(conf);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()),
                  new Text(stat.getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  int ret = job.waitForCompletion(true) ? 0 : 1;
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " +
      (endTime.getTime() - startTime.getTime()) / 1000 +
      " seconds.");
  return ret;
}
@Override
public int run(String[] strings) throws Exception {
  Configuration conf = getConf();
  String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
  logger.info("input dir:" + dir);
  Path inputPath = new Path(StringUtils.unEscapeString(dir));
  Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
  String indexPath = conf.get(LindenJobConfig.INDEX_PATH);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);
  }
  if (fs.exists(new Path(indexPath))) {
    fs.delete(new Path(indexPath), true);
  }
  int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
  Shard[] shards = createShards(indexPath, numShards);
  Shard.setIndexShards(conf, shards);
  // empty trash;
  (new Trash(conf)).expunge();
  Job job = Job.getInstance(conf, "linden-hadoop-indexing");
  job.setJarByClass(LindenJob.class);
  job.setMapperClass(LindenMapper.class);
  job.setCombinerClass(LindenCombiner.class);
  job.setReducerClass(LindenReducer.class);
  job.setMapOutputKeyClass(Shard.class);
  job.setMapOutputValueClass(IntermediateForm.class);
  job.setOutputKeyClass(Shard.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(IndexUpdateOutputFormat.class);
  job.setReduceSpeculativeExecution(false);
  job.setNumReduceTasks(numShards);
  String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
  if (lindenSchemaFile == null) {
    throw new IOException("no schema file is found");
  }
  logger.info("Adding schema file: " + lindenSchemaFile);
  job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
  String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
  if (lindenPropertiesFile == null) {
    throw new IOException("no linden properties file is found");
  }
  logger.info("Adding linden properties file: " + lindenPropertiesFile);
  job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));
  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  Path[] inputs = FileInputFormat.getInputPaths(job);
  StringBuilder buffer = new StringBuilder(inputs[0].toString());
  for (int i = 1; i < inputs.length; i++) {
    buffer.append(",");
    buffer.append(inputs[i].toString());
  }
  logger.info("mapreduce.input.dir = " + buffer.toString());
  logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
  logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
  logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
  logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
  logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
  logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));
  job.waitForCompletion(true);
  if (!job.isSuccessful()) {
    throw new RuntimeException("Job failed");
  }
  return 0;
}
/**
 * This method is called by the FileInputFormat to find the input paths for
 * which splits should be calculated.<br/>
 * If applyDateRanges == true: Then the HiveRCDateSplitter is used to apply
 * filtering on the input files.<br/>
 * Else the default FileInputFormat listStatus method is used.
 *
 * @param ctx
 *          JobContext
 * @param loaderClass
 *          this is chosen to be a subclass of LoadFunc to maintain some
 *          consistency.
 */
public List<FileStatus> listStatus(JobContext ctx,
    Class<? extends LoadFunc> loaderClass, String signature)
    throws IOException {
  Properties properties = UDFContext.getUDFContext().getUDFProperties(
      loaderClass, new String[] { signature });
  String partitionExpression = properties
      .getProperty(PARITITION_FILTER_EXPRESSION);
  ExpressionFactory expressionFactory = null;
  if (partitionExpression != null) {
    expressionFactory = ExpressionFactory.newInstance();
  }
  String partitionColumnStr = properties
      .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
  String[] partitionKeys = (partitionColumnStr == null) ? null
      : partitionColumnStr.split(",");
  Path[] inputPaths = FileInputFormat.getInputPaths(ctx);
  List<FileStatus> splitPaths = null;
  if (partitionKeys != null) {
    splitPaths = new ArrayList<FileStatus>();
    for (Path inputPath : inputPaths) {
      // for each input path work recursively through each partition
      // level to find the rc files
      FileSystem fs = inputPath.getFileSystem(ctx.getConfiguration());
      if (fs.getFileStatus(inputPath).isDir()) {
        // assure that we are at the root of the partition tree.
        FileStatus fileStatusArr[] = fs.listStatus(inputPath);
        if (fileStatusArr != null) {
          for (FileStatus childFileStatus : fileStatusArr) {
            getPartitionedFiles(expressionFactory,
                partitionExpression, fs, childFileStatus,
                0, partitionKeys, splitPaths);
          }
        }
      } else {
        splitPaths.add(fs.getFileStatus(inputPath));
      }
    }
    if (splitPaths.size() < 1) {
      LOG.error("Not split paths where found, please check that the filter logic for the partition keys does not filter out everything ");
    }
  }
  return splitPaths;
}
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
  String ks = jobConf.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME);
  String cf = jobConf.get(AbstractColumnSerDe.CASSANDRA_CF_NAME);
  int slicePredicateSize = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE,
      AbstractColumnSerDe.DEFAULT_SLICE_PREDICATE_SIZE);
  int sliceRangeSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE,
      AbstractColumnSerDe.DEFAULT_RANGE_BATCH_SIZE);
  int splitSize = jobConf.getInt(
      AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE,
      AbstractColumnSerDe.DEFAULT_SPLIT_SIZE);
  String cassandraColumnMapping = jobConf.get(AbstractColumnSerDe.CASSANDRA_COL_MAPPING);
  int rpcPort = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_PORT, 9160);
  String host = jobConf.get(AbstractColumnSerDe.CASSANDRA_HOST);
  String partitioner = jobConf.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER);
  if (cassandraColumnMapping == null) {
    throw new IOException("cassandra.columns.mapping required for Cassandra Table.");
  }
  SliceRange range = new SliceRange();
  range.setStart(new byte[0]);
  range.setFinish(new byte[0]);
  range.setReversed(false);
  range.setCount(slicePredicateSize);
  SlicePredicate predicate = new SlicePredicate();
  predicate.setSlice_range(range);
  ConfigHelper.setInputRpcPort(jobConf, "" + rpcPort);
  ConfigHelper.setInputInitialAddress(jobConf, host);
  ConfigHelper.setInputPartitioner(jobConf, partitioner);
  ConfigHelper.setInputSlicePredicate(jobConf, predicate);
  ConfigHelper.setInputColumnFamily(jobConf, ks, cf);
  ConfigHelper.setRangeBatchSize(jobConf, sliceRangeSize);
  ConfigHelper.setInputSplitSize(jobConf, splitSize);
  Job job = new Job(jobConf);
  JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
  Path[] tablePaths = FileInputFormat.getInputPaths(jobContext);
  List<org.apache.hadoop.mapreduce.InputSplit> splits = getSplits(jobContext);
  InputSplit[] results = new InputSplit[splits.size()];
  for (int i = 0; i < splits.size(); ++i) {
    HiveCassandraStandardSplit csplit = new HiveCassandraStandardSplit(
        (ColumnFamilySplit) splits.get(i), cassandraColumnMapping, tablePaths[0]);
    csplit.setKeyspace(ks);
    csplit.setColumnFamily(cf);
    csplit.setRangeBatchSize(sliceRangeSize);
    csplit.setSplitSize(splitSize);
    csplit.setHost(host);
    csplit.setPort(rpcPort);
    csplit.setSlicePredicateSize(slicePredicateSize);
    csplit.setPartitioner(partitioner);
    csplit.setColumnMapping(cassandraColumnMapping);
    results[i] = csplit;
  }
  return results;
}