org.apache.hadoop.mapreduce.Job#setMapperClass ( )源码实例Demo

下面列出了org.apache.hadoop.mapreduce.Job#setMapperClass ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: marklogic-contentpump   文件: Test20772.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length < 2) {
        System.err.println("Usage: Test20772 outputpath");
        System.exit(2);
    }

    Job job = Job.getInstance(conf);
    job.setJarByClass(Test20772.class);
    
    // Map related configuration
    job.setInputFormatClass(NodeInputFormat.class);
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(NodePath.class);
    job.setMapOutputValueClass(MarkLogicNode.class);
    job.setReducerClass(MyReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    conf.setInt("mapred.reduce.tasks", 0);

    conf = job.getConfiguration();
    conf.addResource(args[0]);
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
源代码2 项目: incubator-tez   文件: TeraValidate.java
public int run(String[] args) throws Exception {
  Job job = Job.getInstance(getConf());
  if (args.length != 2) {
    usage();
    return 1;
  }
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split 
  FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
  job.setInputFormatClass(TeraInputFormat.class);
  return job.waitForCompletion(true) ? 0 : 1;
}
 
源代码3 项目: recsys-offline   文件: Step1.java
public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();

    Job job1 = new Job(conf1, "step1");
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    job1.setNumReduceTasks(1);
    job1.setJarByClass(Step1.class);
    job1.setMapperClass(WikiMapper1.class);
    job1.setMapOutputKeyClass(VarLongWritable.class);
    job1.setMapOutputValueClass(LongAndFloat.class);
    job1.setReducerClass(WiKiReducer1.class);
    job1.setOutputKeyClass(VarLongWritable.class);
    job1.setOutputValueClass(VectorWritable.class);

    FileInputFormat.addInputPath(job1, new Path( INPUT_PATH ) );
    SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH ));
    if (!job1.waitForCompletion(true)) {
        System.exit(1);
    }
}
 
源代码4 项目: RDFS   文件: TestMiniCoronaFederatedJT.java
public void testOneRemoteJT() throws Exception {
  LOG.info("Starting testOneRemoteJT");
  String[] racks = "/rack-1".split(",");
  String[] trackers = "tracker-1".split(",");
  corona = new MiniCoronaCluster.Builder().numTaskTrackers(1).racks(racks)
      .hosts(trackers).build();
  Configuration conf = corona.createJobConf();
  conf.set("mapred.job.tracker", "corona");
  conf.set("mapred.job.tracker.class", CoronaJobTracker.class.getName());
  String locationsCsv = "tracker-1";
  conf.set("test.locations", locationsCsv);
  conf.setBoolean("mapred.coronajobtracker.forceremote", true);
  Job job = new Job(conf);
  job.setMapperClass(TstJob.TestMapper.class);
  job.setInputFormatClass(TstJob.TestInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setNumReduceTasks(0);
  job.getConfiguration().set("io.sort.record.pct", "0.50");
  job.getConfiguration().set("io.sort.mb", "25");
  boolean success = job.waitForCompletion(true);
  assertTrue("Job did not succeed", success);
}
 
源代码5 项目: laser   文件: Compute.java
public static int run(Path model, Configuration baseConf) throws IOException, ClassNotFoundException,
		InterruptedException {
	Configuration conf = new Configuration(baseConf);
	conf.set("com.b5m.laser.msgpack.input.method", "ad_feature");
	conf.set("com.b5m.laser.msgpack.output.method", "precompute_ad_offline_model");
	conf.set("com.b5m.laser.offline.model", model.toString());
	Job job = Job.getInstance(conf);
	job.setJarByClass(Compute.class);
	job.setJobName("per compute stable part from offline model for each user");
	job.setInputFormatClass(MsgpackInputFormat.class);
	job.setOutputFormatClass(MsgpackOutputFormat.class);
	
	job.setOutputKeyClass(Long.class);
	job.setOutputValueClass(Result.class);
	
	job.setMapperClass(Mapper.class);
	job.setNumReduceTasks(0);
	
	boolean succeeded = job.waitForCompletion(true);
	if (!succeeded) {
		throw new IllegalStateException("Job failed!");
	}

	return 0;
}
 
源代码6 项目: MLHadoop   文件: merge_results_driver.java
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
  job.setJarByClass(merge_results_driver.class);

  job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");

  FileOutputFormat.setOutputPath(job, new Path(out_path));

  job.setMapperClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_mapper.class);
  job.setReducerClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_reducer.class);
  job.setMapOutputKeyClass(TextPair.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(TextPair.class);
  job.setOutputValueClass(Text.class);
  job.setPartitionerClass(TextPairPartitioner.class);
     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
     job.setSortComparatorClass(TextPairComparator.class);
     
     boolean success = job.waitForCompletion(true);
  return success;
}
 
源代码7 项目: ecosys   文件: Hdfs2Tg.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "HDFS to TG");
    job.setJarByClass(Hdfs2Tg.class);
    job.setMapperClass(LineMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
源代码8 项目: hiped2   文件: FinalJoinJob.java
public static void runJob(Configuration conf,
                          Path userLogsPath,
                          Path usersPath,
                          Path outputPath)
    throws Exception {

  FileSystem fs = usersPath.getFileSystem(conf);

  FileStatus usersStatus = fs.getFileStatus(usersPath);

  if (usersStatus.isDir()) {
    for (FileStatus f : fs.listStatus(usersPath)) {
      if (f.getPath().getName().startsWith("part")) {
        DistributedCache.addCacheFile(f.getPath().toUri(), conf);
      }
    }
  } else {
    DistributedCache.addCacheFile(usersPath.toUri(), conf);
  }

  Job job = new Job(conf);

  job.setJarByClass(FinalJoinJob.class);
  job.setMapperClass(GenericReplicatedJoin.class);

  job.setNumReduceTasks(0);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (!job.waitForCompletion(true)) {
    throw new Exception("Job failed");
  }
}
 
源代码9 项目: dkpro-c4corpus   文件: PagesByURLExtractor.java
@Override
public int run(String[] args)
        throws Exception
{
    Job job = Job.getInstance(getConf());

    for (Map.Entry<String, String> next : job.getConfiguration()) {
        System.out.println(next.getKey() + ": " + next.getValue());
    }

    job.setJarByClass(PagesByURLExtractor.class);
    job.setJobName(PagesByURLExtractor.class.getName());

    // mapper
    job.setMapperClass(MapperClass.class);

    // input
    job.setInputFormatClass(WARCInputFormat.class);

    // output
    job.setOutputFormatClass(WARCOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(WARCWritable.class);
    FileOutputFormat.setCompressOutput(job, true);

    // paths
    String commaSeparatedInputFiles = args[0];
    String outputPath = args[1];

    // load IDs to be searched for
    job.getConfiguration().set(MAPREDUCE_MAPPER_URLS, loadURLs(args[2]));

    FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    return job.waitForCompletion(true) ? 0 : 1;
}
 
源代码10 项目: kite   文件: DatasetSourceTarget.java
@Override
@SuppressWarnings("unchecked")
public void configureSource(Job job, int inputId) throws IOException {
  Configuration conf = job.getConfiguration();
  if (inputId == -1) {
    job.setMapperClass(CrunchMapper.class);
    job.setInputFormatClass(formatBundle.getFormatClass());
    formatBundle.configure(conf);
  } else {
    Path dummy = new Path("/view/" + view.getDataset().getName());
    CrunchInputs.addInputPath(job, dummy, formatBundle, inputId);
  }
}
 
/**
 * Set the mapper class implementation to use in the job,
 * as well as any related configuration (e.g., map output types).
 */
protected void configureMapper(Job job, String tableName,
    String tableClassName) throws ClassNotFoundException, IOException {
  job.setMapperClass(getMapperClass());
  job.setOutputKeyClass(String.class);
  job.setOutputValueClass(NullWritable.class);
}
 
public static void run() throws IOException, ClassNotFoundException,
		InterruptedException {

	String inputPath = ItemBasedCFDriver.path.get("step2InputPath");
	String outputPath = ItemBasedCFDriver.path.get("step2OutputPath");

	Configuration conf = new Configuration();
	conf.set("mapred.textoutputformat.separator", ":");

	Job job = Job.getInstance(conf);

	HDFS hdfs = new HDFS(conf);
	hdfs.rmr(outputPath);

	job.setMapperClass(Step2_Mapper.class);
	job.setReducerClass(Step2_Reducer.class);
	job.setCombinerClass(Step2_Reducer.class);
	job.setNumReduceTasks(ItemBasedCFDriver.ReducerNumber);

	job.setJarByClass(CalculateSimilarityStep2.class);

	job.setMapOutputKeyClass(IntWritable.class);
	job.setMapOutputValueClass(Text.class);
	job.setOutputKeyClass(IntWritable.class);
	job.setOutputValueClass(Text.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.setInputPaths(job, new Path(inputPath));
	FileOutputFormat.setOutputPath(job, new Path(outputPath));

	job.waitForCompletion(true);
}
 
源代码13 项目: halvade   文件: MapReduceRunner.java
protected int runCombineJob(String halvadeOutDir, String mergeOutDir, boolean featureCount) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        Configuration combineConf = getConf();
        if(!halvadeOpts.out.endsWith("/")) halvadeOpts.out += "/";  
        HalvadeConf.setInputDir(combineConf, halvadeOutDir);
        HalvadeConf.setOutDir(combineConf, mergeOutDir);
        FileSystem outFs = FileSystem.get(new URI(mergeOutDir), combineConf);
        if (outFs.exists(new Path(mergeOutDir))) {
            Logger.INFO("The output directory \'" + mergeOutDir + "\' already exists.");
            Logger.INFO("ERROR: Please remove this directory before trying again.");
            System.exit(-2);
        }
        HalvadeConf.setReportAllVariant(combineConf, halvadeOpts.reportAll);
        HalvadeResourceManager.setJobResources(halvadeOpts, combineConf, HalvadeResourceManager.COMBINE, false, halvadeOpts.useBamInput);
//        halvadeOpts.splitChromosomes(combineConf, 0);
        Job combineJob = Job.getInstance(combineConf, "HalvadeCombineVCF");            
        combineJob.setJarByClass(VCFCombineMapper.class);

        addInputFiles(halvadeOutDir, combineConf, combineJob, featureCount ? ".count" : ".vcf");
        FileOutputFormat.setOutputPath(combineJob, new Path(mergeOutDir));

        combineJob.setMapperClass(featureCount ? HTSeqCombineMapper.class : VCFCombineMapper.class);
        combineJob.setMapOutputKeyClass(featureCount ? Text.class : LongWritable.class);
        combineJob.setMapOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);
        combineJob.setInputFormatClass(featureCount ? TextInputFormat.class : VCFInputFormat.class);
        combineJob.setNumReduceTasks(1); 
        combineJob.setReducerClass(featureCount ? 
                be.ugent.intec.halvade.hadoop.mapreduce.HTSeqCombineReducer.class :
                be.ugent.intec.halvade.hadoop.mapreduce.VCFCombineReducer.class);
        combineJob.setOutputKeyClass(Text.class);
        combineJob.setOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);

        return runTimedJob(combineJob, (featureCount ? "featureCounts" : "VCF")  + " Combine Job");
    }
 
源代码14 项目: MapReduce-Demo   文件: PeopleRank.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	String jobName = "PeopleRank";
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(PeopleRank.class);
	job.setJar("export\\PeopleRank.jar");
	job.setMapperClass(PeopleRankMapper.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	job.setReducerClass(PeopleRankReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	
	String dataDir = "/expr/peoplerank/data";
	String outputDir = "/expr/peoplerank/output/adjacent";
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);		
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	System.out.println( "Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
源代码15 项目: geowave   文件: OSMRunner.java
@Override
public int run(final String[] args) throws Exception {

  final Configuration conf = getConf();
  conf.set("tableName", ingestOptions.getQualifiedTableName());
  conf.set("osmVisibility", ingestOptions.getVisibilityOptions().getVisibility());

  // job settings
  final Job job = Job.getInstance(conf, ingestOptions.getJobName());
  job.setJarByClass(OSMRunner.class);

  switch (ingestOptions.getMapperType()) {
    case "NODE": {
      configureSchema(AvroNode.getClassSchema());
      inputAvroFile = ingestOptions.getNodesBasePath();
      job.setMapperClass(OSMNodeMapper.class);
      break;
    }
    case "WAY": {
      configureSchema(AvroWay.getClassSchema());
      inputAvroFile = ingestOptions.getWaysBasePath();
      job.setMapperClass(OSMWayMapper.class);
      break;
    }
    case "RELATION": {
      configureSchema(AvroRelation.getClassSchema());
      inputAvroFile = ingestOptions.getRelationsBasePath();
      job.setMapperClass(OSMRelationMapper.class);
      break;
    }
    default:
      break;
  }
  if ((avroSchema == null) || (inputAvroFile == null)) {
    throw new MissingArgumentException(
        "argument for mapper type must be one of: NODE, WAY, or RELATION");
  }

  enableLocalityGroups(ingestOptions);

  // input format
  job.setInputFormatClass(AvroKeyInputFormat.class);
  FileInputFormat.setInputPaths(job, inputAvroFile);
  AvroJob.setInputKeySchema(job, avroSchema);

  // mappper

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Mutation.class);
  job.setOutputFormatClass(AccumuloOutputFormat.class);
  AccumuloOutputFormat.setConnectorInfo(
      job,
      accumuloOptions.getUser(),
      new PasswordToken(accumuloOptions.getPassword()));
  AccumuloOutputFormat.setCreateTables(job, true);
  AccumuloOutputFormat.setDefaultTableName(job, ingestOptions.getQualifiedTableName());
  AccumuloOutputFormat.setZooKeeperInstance(
      job,
      new ClientConfiguration().withInstance(accumuloOptions.getInstance()).withZkHosts(
          accumuloOptions.getZookeeper()));

  // reducer
  job.setNumReduceTasks(0);

  return job.waitForCompletion(true) ? 0 : -1;
}
 
源代码16 项目: MapReduce-Demo   文件: DateCount.java
public static void main(String[] args) throws Exception {		
	//1.设置HDFS配置信息
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2.设置MapReduce作业配置信息
	String jobName = "DateCount";					//作业名称
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(DateCount.class);				//指定运行时作业类
	job.setJar("export\\DateCount.jar");			//指定本地jar包
	job.setMapperClass(DateCountMapper.class);		//指定Mapper类
	job.setMapOutputKeyClass(Text.class);			//设置Mapper输出Key类型
	job.setMapOutputValueClass(IntWritable.class);	//设置Mapper输出Value类型
	job.setReducerClass(DateCountReducer.class);	//指定Reducer类
	job.setOutputKeyClass(Text.class);				//设置Reduce输出Key类型
	job.setOutputValueClass(IntWritable.class); 	//设置Reduce输出Value类型
	
	//3.设置作业输入和输出路径
	String dataDir = "/expr/datecount/data";		//实验数据目录	
	String outputDir = "/expr/datecount/output";	//实验输出目录
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4.运行作业
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
源代码17 项目: kite   文件: TestMapReduce.java
@SuppressWarnings("deprecation")
private Job createJob() throws Exception {
  Job job = new Job();

  DatasetKeyInputFormat.configure(job).readFrom(inputDataset).withType(GenericData.Record.class);

  job.setMapperClass(LineCountMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  job.setReducerClass(GenericStatsReducer.class);

  DatasetKeyOutputFormat.configure(job).writeTo(outputDataset).withType(GenericData.Record.class);

  return job;
}
 
源代码18 项目: halvade   文件: MapReduceRunner.java
protected int runPass1RNAJob(Configuration pass1Conf, String tmpOutDir) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
    HalvadeConf.setIsPass2(pass1Conf, false);
    HalvadeResourceManager.setJobResources(halvadeOpts, pass1Conf, HalvadeResourceManager.RNA_SHMEM_PASS1, halvadeOpts.nodes == 1, halvadeOpts.useBamInput);
    int pass2Reduces = HalvadeResourceManager.getPass2Reduces(halvadeOpts);
    halvadeOpts.splitChromosomes(pass1Conf, pass2Reduces);
    HalvadeConf.setPass2Suffix(pass1Conf, pass2suffix);
    
    Job pass1Job = Job.getInstance(pass1Conf, "Halvade pass 1 RNA pipeline");
    pass1Job.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
    pass1Job.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
    // set pass 2 suffix so only this job finds it!
    FileSystem fs = FileSystem.get(new URI(halvadeOpts.in), pass1Conf);
    try {
        if (fs.getFileStatus(new Path(halvadeOpts.in)).isDirectory()) {
            // add every file in directory
            FileStatus[] files = fs.listStatus(new Path(halvadeOpts.in));
            for(FileStatus file : files) {
                if (!file.isDirectory()) {
                    FileInputFormat.addInputPath(pass1Job, file.getPath());
                }
            }
        } else {
            FileInputFormat.addInputPath(pass1Job, new Path(halvadeOpts.in));
        }
    } catch (IOException | IllegalArgumentException e) {
        Logger.EXCEPTION(e);
    }

    FileSystem outFs = FileSystem.get(new URI(tmpOutDir), pass1Conf);
    boolean skipPass1 = false;
    if (outFs.exists(new Path(tmpOutDir))) {
        // check if genome already exists
        skipPass1 = outFs.exists(new Path(tmpOutDir + "/_SUCCESS"));
        if(skipPass1)
            Logger.DEBUG("pass1 genome already created, skipping pass 1");
        else {
            Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
            Logger.INFO("ERROR: Please remove this directory before trying again.");
            System.exit(-2);
        }
    }
    if(!skipPass1) {
        FileOutputFormat.setOutputPath(pass1Job, new Path(tmpOutDir));
        pass1Job.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);

        pass1Job.setInputFormatClass(HalvadeTextInputFormat.class);
        pass1Job.setMapOutputKeyClass(GenomeSJ.class);
        pass1Job.setMapOutputValueClass(Text.class);

        pass1Job.setSortComparatorClass(GenomeSJSortComparator.class);
        pass1Job.setGroupingComparatorClass(GenomeSJGroupingComparator.class);
        pass1Job.setNumReduceTasks(1); 
        pass1Job.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RebuildStarGenomeReducer.class);          
        pass1Job.setOutputKeyClass(LongWritable.class);
        pass1Job.setOutputValueClass(Text.class);

        return runTimedJob(pass1Job, "Halvade pass 1 Job");
    } else
        return 0;
}
 
源代码19 项目: aegisthus   文件: Aegisthus.java
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    Configuration configuration = job.getConfiguration();

    job.setJarByClass(Aegisthus.class);
    CommandLine cl = getOptions(args);
    if (cl == null) {
        return 1;
    }

    // Check all of the paths and load the sstable version from the input filenames
    List<Path> paths = Lists.newArrayList();
    if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
        for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
            paths.add(new Path(input));
        }
    }
    if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
        paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
    }
    LOG.info("Processing paths: {}", paths);

    // At this point we have the version of sstable that we can use for this run
    Descriptor.Version version = Descriptor.Version.CURRENT;
    if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
        version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
    }
    configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());

    if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
        setConfigurationFromCql(configuration);
    }

    if(cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
        job.setInputFormatClass(AegisthusCombinedInputFormat.class);
    } else {
        job.setInputFormatClass(AegisthusInputFormat.class);
    }
    job.setMapOutputKeyClass(AegisthusKey.class);
    job.setMapOutputValueClass(AtomWritable.class);
    job.setOutputKeyClass(AegisthusKey.class);
    job.setOutputValueClass(RowWritable.class);
    job.setMapperClass(AegisthusKeyMapper.class);
    job.setReducerClass(CassSSTableReducer.class);
    job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
    job.setPartitionerClass(AegisthusKeyPartitioner.class);
    job.setSortComparatorClass(AegisthusKeySortingComparator.class);

    TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));

    if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
        job.setOutputFormatClass(SSTableOutputFormat.class);
    } else {
        job.setOutputFormatClass(JsonOutputFormat.class);
    }
    CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));

    job.submit();
    if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
        Runtime.getRuntime().addShutdownHook(new JobKiller(job));
    }

    System.out.println(job.getJobID());
    System.out.println(job.getTrackingURL());
    boolean success = job.waitForCompletion(true);

    if (success) {
        Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
        long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
        int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
        if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
            LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
            success = false;
        } else if (errorCount > 0) {
            LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
                    errorCount, maxAllowed);
        }
    }

    return success ? 0 : 1;
}
 
public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
    ClassNotFoundException {
  Path input = getInDir();
  Path output = getOutDir();
  _fileSystem.delete(input, true);
  _fileSystem.delete(output, true);
  // 1500 * 50 = 75,000
  writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
  // 100 * 5000 = 500,000
  writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");

  Job job = Job.getInstance(_conf, "blur index");
  job.setJarByClass(BlurOutputFormatTest.class);
  job.setMapperClass(CsvBlurMapper.class);
  job.setInputFormatClass(TextInputFormat.class);

  FileInputFormat.addInputPath(job, input);
  CsvBlurMapper.addColumns(job, "cf1", "col");

  Path tablePath = new Path(new Path(_root, "table"), "test");

  TableDescriptor tableDescriptor = new TableDescriptor();
  tableDescriptor.setShardCount(2);
  tableDescriptor.setTableUri(tablePath.toString());
  tableDescriptor.setName("test");

  createShardDirectories(getOutDir(), 2);

  BlurOutputFormat.setupJob(job, tableDescriptor);
  BlurOutputFormat.setOutputPath(job, output);
  BlurOutputFormat.setIndexLocally(job, false);

  job.submit();
  boolean killCalled = false;
  while (!job.isComplete()) {
    Thread.sleep(1000);
    System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
        job.reduceProgress() * 100);
    if (job.reduceProgress() > 0.7 && !killCalled) {
      job.killJob();
      killCalled = true;
    }
  }

  assertFalse(job.isSuccessful());

  for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
    Path path = new Path(output, ShardUtil.getShardName(i));
    FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
    FileStatus[] listStatus = fileSystem.listStatus(path);
    assertEquals(toString(listStatus), 0, listStatus.length);
  }
}