org.apache.hadoop.mapred.JobConf#setJarByClass() Code Examples

Listed below are example usages of org.apache.hadoop.mapred.JobConf#setJarByClass(), collected from open-source projects on GitHub.
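In the old mapred API, setJarByClass(Class) tells Hadoop which jar to submit with the job: the framework locates the jar that contains the given class and registers it as the job jar, so the mapper, reducer, and any other classes packaged with it can be loaded on the task nodes. Before reading the project examples, here is a minimal, self-contained sketch of the typical usage pattern; it is not taken from any of the projects below, and the class name SetJarByClassDemo plus the identity mapper/reducer are illustrative choices only.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SetJarByClassDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setJobName("set-jar-by-class-demo");

    // Ship the jar that contains this class as the job jar.
    job.setJarByClass(SetJarByClassDemo.class);

    // Pass-through mapper and reducer from the old mapred API.
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(IdentityReducer.class);

    // TextInputFormat yields LongWritable offsets and Text lines,
    // which the identity mapper/reducer forward unchanged.
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobClient.runJob(job);
  }
}

Typical invocation would be something like: hadoop jar demo.jar SetJarByClassDemo <input-dir> <output-dir>, where demo.jar is whatever jar the class was packaged into.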

Example 1 - Project: hadoop-book, File: TeraGen.java
/**
 * @param args the cli arguments
 */
public int run(String[] args) throws IOException {
  JobConf job = (JobConf) getConf();
  setNumberOfRows(job, Long.parseLong(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraGen");
  job.setJarByClass(TeraGen.class);
  job.setMapperClass(SortGenMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(RangeInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
Example 2 - Project: tracing-framework, File: ReadExistingDataJob.java
public void configure(JobConf job) {
    // Set the mapper and reducers
    job.setMapperClass(ReadDataJob.TestMapper.class);

    // Make sure this jar is included
    job.setJarByClass(ReadDataJob.TestMapper.class);

    // Specify the input and output data formats
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);

    // Turn off speculative execution
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    // Add the job input path
    FileInputFormat.addInputPath(job, new Path(this.input_path));
}
 
Example 3 - Project: tracing-framework, File: ReadDataJob.java
public void configure(JobConf job) {
    // Set the mapper and reducers
    job.setMapperClass(TestMapper.class);
    // job.setReducerClass(TestReducer.class);

    // Set the output types of the mapper and reducer
    // job.setMapOutputKeyClass(IntWritable.class);
    // job.setMapOutputValueClass(NullWritable.class);
    // job.setOutputKeyClass(NullWritable.class);
    // job.setOutputValueClass(NullWritable.class);

    // Make sure this jar is included
    job.setJarByClass(TestMapper.class);

    // Specify the input and output data formats
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(NullOutputFormat.class);

    // Turn off speculative execution
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    // Add the job input path
    FileInputFormat.addInputPath(job, new Path(this.input_filename));
}
 
Example 4 - Project: hiped2, File: Main.java
public static void main(String... args) throws Exception {

    JobConf job = new JobConf();
    job.setJarByClass(Main.class);

    String input = args[0];
    Path output = new Path(args[1]);

    output.getFileSystem(job).delete(output, true);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TextTaggedMapOutput.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);

    JobClient.runJob(job);
  }
 
Example 5 - Project: hadoop-book, File: TeraValidate.java
public int run(String[] args) throws Exception {
  JobConf job = (JobConf) getConf();
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split 
  job.setLong("mapred.min.split.size", Long.MAX_VALUE);
  job.setInputFormat(TeraInputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
Example 6 - Project: RDFS, File: TeraGen.java
/**
 * @param args the cli arguments
 */
public int run(String[] args) throws IOException {
  JobConf job = (JobConf) getConf();
  setNumberOfRows(job, Long.parseLong(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraGen");
  job.setJarByClass(TeraGen.class);
  job.setMapperClass(SortGenMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(RangeInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
Example 7 - Project: RDFS, File: TeraValidate.java
public int run(String[] args) throws Exception {
  JobConf job = (JobConf) getConf();
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split 
  job.setLong("mapred.min.split.size", Long.MAX_VALUE);
  job.setInputFormat(TeraInputFormat.class);
  JobClient.runJob(job);
  return 0;
}
 
Example 8 - Project: RDFS, File: DataFsck.java
private JobConf createJobConf() {
  JobConf jobConf = new JobConf(getConf());
  String jobName = NAME + " " + dateForm.format(new Date(System.currentTimeMillis()));
  jobConf.setJobName(jobName);
  jobConf.setMapSpeculativeExecution(false);

  jobConf.setJarByClass(DataFsck.class);
  jobConf.setInputFormat(DataFsckInputFormat.class);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(Text.class);

  jobConf.setMapperClass(DataFsckMapper.class);
  jobConf.setNumReduceTasks(0);
  return jobConf;
}
 
Example 9 - Project: big-c, File: TestMROldApiJobs.java
public static void runJobFail(JobConf conf, Path inDir, Path outDir)
       throws IOException, InterruptedException {
  conf.setJobName("test-job-fail");
  conf.setMapperClass(FailMapper.class);
  conf.setJarByClass(FailMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  conf.setMaxMapAttempts(1);
  
  boolean success = runJob(conf, inDir, outDir, 1, 0);
  Assert.assertFalse("Job expected to fail succeeded", success);
}
 
Example 10
@Override
public int run(String[] args) throws Exception {
	if (args.length != 4) {
		printUsage();
	}
	Path userPath = new Path(args[0]);
	Path commentPath = new Path(args[1]);
	Path outputDir = new Path(args[2]);
	String joinType = args[3];
	JobConf conf = new JobConf("CompositeJoin");
	conf.setJarByClass(CompositeUserJoin.class);
	conf.setMapperClass(CompositeMapper.class);
	conf.setNumReduceTasks(0);
	// Set the input format class to a CompositeInputFormat class.
	// The CompositeInputFormat will parse all of our input files and output
	// records to our mapper.
	conf.setInputFormat(CompositeInputFormat.class);
	// The composite input format join expression will set how the records
	// are going to be read in, and in what input format.
	conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType,
			KeyValueTextInputFormat.class, userPath, commentPath));
	TextOutputFormat.setOutputPath(conf, outputDir);
	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(Text.class);
	RunningJob job = JobClient.runJob(conf);
	while (!job.isComplete()) {
		Thread.sleep(1000);
	}
	return job.isSuccessful() ? 0 : 1;
}
 
Example 11 - Project: mr4c, File: MRv1TestBinding.java
public synchronized JobConf createTestMRJobConf() throws IOException {
	if ( m_mrCluster==null ) {
		startMRCluster();
	}
	JobConf job = m_mrCluster.createJobConf();
	job.setJarByClass(AlgoRunner.class);
	return job;
}
 
Example 12 - Project: mr4c, File: YarnTestBinding.java
public JobConf createTestMRJobConf() throws IOException {
	if ( m_mrCluster==null ) {
		startMrCluster();
	}
	JobConf job = new JobConf(m_mrCluster.getConfig());
	job.setJarByClass(AlgoRunner.class);
	return job;
}
 
Example 13 - Project: RDFS, File: TeraSort.java
public int run(String[] args) throws Exception {
  LOG.info("starting");
  JobConf job = (JobConf) getConf();
  Path inputDir = new Path(args[0]);
  inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
  Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
  URI partitionUri = new URI(partitionFile.toString() +
                             "#" + TeraInputFormat.PARTITION_FILENAME);
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraSort");
  job.setJarByClass(TeraSort.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormat(TeraInputFormat.class);
  job.setOutputFormat(TeraOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TeraInputFormat.writePartitionFile(job, partitionFile);
  DistributedCache.addCacheFile(partitionUri, job);
  DistributedCache.createSymlink(job);
  job.setInt("dfs.replication", 1);
  TeraOutputFormat.setFinalSync(job, true);
  long startTime = System.currentTimeMillis();
  JobClient.runJob(job);
  long endTime = System.currentTimeMillis();
  System.out.println((float)(endTime-startTime)/1000);
  LOG.info("done");
  return 0;
}
 
Example 14 - Project: wikireverse, File: SegmentCombiner.java
public int run(String[] args) throws Exception {
	// Get current configuration.
	Configuration conf = getConf();

	// Parse command line arguments.
	String inputPaths = args[0];
	String outputPath = args[1];

	JobConf job = new JobConf(conf);

	// Set input path.
	if (inputPaths.length() > 0) {
		List<String> segmentPaths = Lists.newArrayList(Splitter.on(",")
				.split(inputPaths));

		for (String segmentPath : segmentPaths) {
			LOG.info("Adding input path " + segmentPath);
			FileInputFormat.addInputPath(job, new Path(segmentPath));
		}
	} else {
		System.err.println("No input path found.");
		return 1;
	}

	// Set output path.
	if (outputPath.length() > 0) {
		LOG.info("Setting output path to " + outputPath);
		SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
		// Compress output to boost performance.
		SequenceFileOutputFormat.setCompressOutput(job, true);
		SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
	} else {
		System.err.println("No output path found.");
		return 1;
	}

	// Load other classes from same jar as this class.
	job.setJarByClass(SegmentCombiner.class);

	// Input is Hadoop sequence file format.
	job.setInputFormat(SequenceFileInputFormat.class);

	// Output is Hadoop sequence file format.
	job.setOutputFormat(SequenceFileOutputFormat.class);

	// Set the output data types.
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(LinkArrayWritable.class);

	// Use custom mapper class.
	job.setMapperClass(SegmentCombinerMapper.class);

	// Use custom reducer class.
	job.setReducerClass(LinkArrayReducer.class);

	if (JobClient.runJob(job).isSuccessful())
		return 0;
	else
		return 1;
}
 
Example 15 - Project: wikireverse, File: OutputToText.java
public int run(String[] args) throws Exception {
	// Get current configuration.
	Configuration conf = getConf();

	// Parse command line arguments.
	String inputPaths = args[0];
	String outputPath = args[1];

	JobConf job = new JobConf(conf);

	// Set input path.
	if (inputPaths.length() > 0) {
		List<String> segmentPaths = Lists.newArrayList(Splitter.on(",")
				.split(inputPaths));

		for (String segmentPath : segmentPaths) {
			LOG.info("Adding input path " + segmentPath);
			FileInputFormat.addInputPath(job, new Path(segmentPath));
		}
	} else {
		System.err.println("No input path found.");
		return 1;
	}

	// Set output path.
	if (outputPath.length() > 0) {
		LOG.info("Setting output path to " + outputPath);
		TextOutputFormat.setOutputPath(job, new Path(outputPath));
		// Compress output to boost performance.
		TextOutputFormat.setCompressOutput(job, true);
		TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
	} else {
		System.err.println("No output path found.");
		return 1;
	}

	// Load other classes from same jar as this class.
	job.setJarByClass(OutputToText.class);

	// Input is Hadoop sequence file format.
	job.setInputFormat(SequenceFileInputFormat.class);

	// Output is text format for import into database later.
	job.setOutputFormat(TextOutputFormat.class);

	// Set the output data types.
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	// Use custom mapper class.
	job.setMapperClass(OutputToTextMapper.class);

	// Use standard reducer class.
	job.setReducerClass(IdentityReducer.class);

	if (JobClient.runJob(job).isSuccessful())
		return 0;
	else
		return 1;
}
 
Example 16 - Project: hbase, File: TestTableSnapshotInputFormat.java
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
    int numSplitsPerRegion,int expectedNumSplits, boolean shutdownCluster) throws Exception {

  //create the table and snapshot
  createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

  if (shutdownCluster) {
    util.shutdownMiniHBaseCluster();
  }

  try {
    // create the job
    JobConf jobConf = new JobConf(util.getConfiguration());

    jobConf.setJarByClass(util.getClass());
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
      TestTableSnapshotInputFormat.class);

    if(numSplitsPerRegion > 1) {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
              TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
              NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
              numSplitsPerRegion);
    } else {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
              TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
              NullWritable.class, jobConf, true, tableDir);
    }

    jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
    jobConf.setNumReduceTasks(1);
    jobConf.setOutputFormat(NullOutputFormat.class);

    RunningJob job = JobClient.runJob(jobConf);
    Assert.assertTrue(job.isSuccessful());
  } finally {
    if (!shutdownCluster) {
      util.getAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
    }
  }
}
 
Example 17 - Project: hadoop-gpu, File: RandomWriter.java
/**
 * This is the main routine for launching a distributed random write job.
 * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
 * The reduce doesn't do anything.
 * 
 * @throws IOException 
 */
public int run(String[] args) throws Exception {    
  if (args.length == 0) {
    System.out.println("Usage: writer <out-dir>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  
  Path outDir = new Path(args[0]);
  JobConf job = new JobConf(getConf());
  
  job.setJarByClass(RandomWriter.class);
  job.setJobName("random-writer");
  FileOutputFormat.setOutputPath(job, outDir);
  
  job.setOutputKeyClass(BytesWritable.class);
  job.setOutputValueClass(BytesWritable.class);
  
  job.setInputFormat(RandomInputFormat.class);
  job.setMapperClass(Map.class);        
  job.setReducerClass(IdentityReducer.class);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  
  JobClient client = new JobClient(job);
  ClusterStatus cluster = client.getClusterStatus();
  int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
  long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                           1*1024*1024*1024);
  if (numBytesToWritePerMap == 0) {
    System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
    return -2;
  }
  long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
       numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
  int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  if (numMaps == 0 && totalBytesToWrite > 0) {
    numMaps = 1;
    job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
  }
  
  job.setNumMapTasks(numMaps);
  System.out.println("Running " + numMaps + " maps.");
  
  // reducer NONE
  job.setNumReduceTasks(0);
  
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " + 
                     (endTime.getTime() - startTime.getTime()) /1000 + 
                     " seconds.");
  
  return 0;
}
 
Example 18 - Project: hadoop-gpu, File: RandomTextWriter.java
/**
 * This is the main routine for launching a distributed random write job.
 * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
 * The reduce doesn't do anything.
 * 
 * @throws IOException 
 */
public int run(String[] args) throws Exception {    
  if (args.length == 0) {
    return printUsage();    
  }
  
  JobConf job = new JobConf(getConf());
  
  job.setJarByClass(RandomTextWriter.class);
  job.setJobName("random-text-writer");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  
  job.setInputFormat(RandomWriter.RandomInputFormat.class);
  job.setMapperClass(Map.class);        
  
  JobClient client = new JobClient(job);
  ClusterStatus cluster = client.getClusterStatus();
  int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
  long numBytesToWritePerMap = job.getLong("test.randomtextwrite.bytes_per_map",
                                           1*1024*1024*1024);
  if (numBytesToWritePerMap == 0) {
    System.err.println("Cannot have test.randomtextwrite.bytes_per_map set to 0");
    return -2;
  }
  long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes", 
       numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
  int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  if (numMaps == 0 && totalBytesToWrite > 0) {
    numMaps = 1;
    job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
  }
  
  Class<? extends OutputFormat> outputFormatClass = 
    SequenceFileOutputFormat.class;
  List<String> otherArgs = new ArrayList<String>();
  for(int i=0; i < args.length; ++i) {
    try {
      if ("-outFormat".equals(args[i])) {
        outputFormatClass = 
          Class.forName(args[++i]).asSubclass(OutputFormat.class);
      } else {
        otherArgs.add(args[i]);
      }
    } catch (ArrayIndexOutOfBoundsException except) {
      System.out.println("ERROR: Required parameter missing from " +
          args[i-1]);
      return printUsage(); // exits
    }
  }

  job.setOutputFormat(outputFormatClass);
  FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
  
  job.setNumMapTasks(numMaps);
  System.out.println("Running " + numMaps + " maps.");
  
  // reducer NONE
  job.setNumReduceTasks(0);
  
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  JobClient.runJob(job);
  Date endTime = new Date();
  System.out.println("Job ended: " + endTime);
  System.out.println("The job took " + 
                     (endTime.getTime() - startTime.getTime()) /1000 + 
                     " seconds.");
  
  return 0;
}
 
Example 19 - Project: hadoop-book, File: RandomWriter.java
/**
 * This is the main routine for launching a distributed random write job. It
 * runs 10 maps/node and each node writes 1 gig of data to a DFS file. The
 * reduce doesn't do anything.
 *
 * @throws IOException
 */
public int run(String[] args) throws Exception {
    if (args.length == 0) {
        System.out.println("Usage: writer <out-dir>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);

    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    job.setInputFormat(RandomInputFormat.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
            1 * 1024 * 1024 * 1024);
    if (numBytesToWritePerMap == 0) {
        System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
        return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
            numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
        numMaps = 1;
        job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }

    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took "
            + (endTime.getTime() - startTime.getTime()) / 1000
            + " seconds.");

    return 0;
}
 
Example 20 - Project: RDFS, File: GenericMRLoadJobCreator.java
public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
    boolean outputCompressed) throws Exception {

  JobConf job = new JobConf();
  job.setJarByClass(GenericMRLoadGenerator.class);
  job.setMapperClass(SampleMapper.class);
  job.setReducerClass(SampleReducer.class);
  if (!parseArgs(argv, job)) {
    return null;
  }

  if (null == FileOutputFormat.getOutputPath(job)) {
    // No output dir? No writes
    job.setOutputFormat(NullOutputFormat.class);
  }

  if (0 == FileInputFormat.getInputPaths(job).length) {
    // No input dir? Generate random data
    System.err.println("No input path; ignoring InputFormat");
    confRandom(job);
  } else if (null != job.getClass("mapred.indirect.input.format", null)) {
    // specified IndirectInputFormat? Build src list
    JobClient jClient = new JobClient(job);
    Path sysdir = jClient.getSystemDir();
    Random r = new Random();
    Path indirInputFile = new Path(sysdir, Integer.toString(r
        .nextInt(Integer.MAX_VALUE), 36)
        + "_files");
    job.set("mapred.indirect.input.file", indirInputFile.toString());
    SequenceFile.Writer writer = SequenceFile.createWriter(sysdir
        .getFileSystem(job), job, indirInputFile, LongWritable.class,
        Text.class, SequenceFile.CompressionType.NONE);
    try {
      for (Path p : FileInputFormat.getInputPaths(job)) {
        FileSystem fs = p.getFileSystem(job);
        Stack<Path> pathstack = new Stack<Path>();
        pathstack.push(p);
        while (!pathstack.empty()) {
          for (FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDir()) {
              if (!stat.getPath().getName().startsWith("_")) {
                pathstack.push(stat.getPath());
              }
            } else {
              writer.sync();
              writer.append(new LongWritable(stat.getLen()), new Text(stat
                  .getPath().toUri().toString()));
            }
          }
        }
      }
    } finally {
      writer.close();
    }
  }

  job.setCompressMapOutput(mapoutputCompressed);
  job.setBoolean("mapred.output.compress", outputCompressed);
  return job;

}