Listed below are example usages of org.apache.hadoop.mapreduce.Job#setMapperClass(), collected from open-source projects; the full source of each example is available on GitHub.
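Before the project examples, here is a minimal, self-contained driver sketch showing how setMapperClass() is typically wired into a job. It uses only mapper/reducer classes that ship with Hadoop (TokenCounterMapper, IntSumReducer); the driver class name and the two path arguments are placeholders for this sketch, not taken from any of the examples below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// Placeholder driver class for this sketch.
public class TokenCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "token count");
        job.setJarByClass(TokenCountDriver.class);
        // The call documented on this page: choose the Mapper implementation for the job.
        job.setMapperClass(TokenCounterMapper.class);   // emits (token, 1) for every token in the input
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        // With no explicit map output types set, these apply to both map and reduce output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory (must not exist yet)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar, it would be submitted with the standard launcher, e.g. hadoop jar tokencount.jar TokenCountDriver <input> <output>.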
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length < 2) {
        System.err.println("Usage: Test20772 configpath outputpath");
        System.exit(2);
    }
    Job job = Job.getInstance(conf);
    job.setJarByClass(Test20772.class);
    // Map related configuration
    job.setInputFormatClass(NodeInputFormat.class);
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(NodePath.class);
    job.setMapOutputValueClass(MarkLogicNode.class);
    job.setReducerClass(MyReducer.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    conf.setInt("mapred.reduce.tasks", 0);
    conf = job.getConfiguration();
    conf.addResource(args[0]);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    if (args.length != 2) {
        usage();
        return 1;
    }
    TeraInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJobName("TeraValidate");
    job.setJarByClass(TeraValidate.class);
    job.setMapperClass(ValidateMapper.class);
    job.setReducerClass(ValidateReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // force a single reducer
    job.setNumReduceTasks(1);
    // force a single split
    FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
    job.setInputFormatClass(TeraInputFormat.class);
    return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Job job1 = new Job(conf1, "step1");
    job1.setOutputFormatClass(SequenceFileOutputFormat.class);
    job1.setNumReduceTasks(1);
    job1.setJarByClass(Step1.class);
    job1.setMapperClass(WikiMapper1.class);
    job1.setMapOutputKeyClass(VarLongWritable.class);
    job1.setMapOutputValueClass(LongAndFloat.class);
    job1.setReducerClass(WiKiReducer1.class);
    job1.setOutputKeyClass(VarLongWritable.class);
    job1.setOutputValueClass(VectorWritable.class);
    FileInputFormat.addInputPath(job1, new Path(INPUT_PATH));
    SequenceFileOutputFormat.setOutputPath(job1, new Path(OUTPUT_PATH));
    if (!job1.waitForCompletion(true)) {
        System.exit(1);
    }
}
public void testOneRemoteJT() throws Exception {
    LOG.info("Starting testOneRemoteJT");
    String[] racks = "/rack-1".split(",");
    String[] trackers = "tracker-1".split(",");
    corona = new MiniCoronaCluster.Builder().numTaskTrackers(1).racks(racks)
        .hosts(trackers).build();
    Configuration conf = corona.createJobConf();
    conf.set("mapred.job.tracker", "corona");
    conf.set("mapred.job.tracker.class", CoronaJobTracker.class.getName());
    String locationsCsv = "tracker-1";
    conf.set("test.locations", locationsCsv);
    conf.setBoolean("mapred.coronajobtracker.forceremote", true);
    Job job = new Job(conf);
    job.setMapperClass(TstJob.TestMapper.class);
    job.setInputFormatClass(TstJob.TestInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    job.getConfiguration().set("io.sort.record.pct", "0.50");
    job.getConfiguration().set("io.sort.mb", "25");
    boolean success = job.waitForCompletion(true);
    assertTrue("Job did not succeed", success);
}
public static int run(Path model, Configuration baseConf) throws IOException, ClassNotFoundException,
        InterruptedException {
    Configuration conf = new Configuration(baseConf);
    conf.set("com.b5m.laser.msgpack.input.method", "ad_feature");
    conf.set("com.b5m.laser.msgpack.output.method", "precompute_ad_offline_model");
    conf.set("com.b5m.laser.offline.model", model.toString());
    Job job = Job.getInstance(conf);
    job.setJarByClass(Compute.class);
    job.setJobName("precompute stable part from offline model for each user");
    job.setInputFormatClass(MsgpackInputFormat.class);
    job.setOutputFormatClass(MsgpackOutputFormat.class);
    job.setOutputKeyClass(Long.class);
    job.setOutputValueClass(Result.class);
    job.setMapperClass(Mapper.class);
    job.setNumReduceTasks(0);
    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
        throw new IllegalStateException("Job failed!");
    }
    return 0;
}
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
    job.setJarByClass(merge_results_driver.class);
    job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");
    FileOutputFormat.setOutputPath(job, new Path(out_path));
    job.setMapperClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_mapper.class);
    job.setReducerClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_reducer.class);
    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(TextPair.class);
    job.setOutputValueClass(Text.class);
    job.setPartitionerClass(TextPairPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(TextPairComparator.class);
    boolean success = job.waitForCompletion(true);
    return success;
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "HDFS to TG");
    job.setJarByClass(Hdfs2Tg.class);
    job.setMapperClass(LineMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void runJob(Configuration conf,
                          Path userLogsPath,
                          Path usersPath,
                          Path outputPath)
        throws Exception {
    FileSystem fs = usersPath.getFileSystem(conf);
    FileStatus usersStatus = fs.getFileStatus(usersPath);
    if (usersStatus.isDir()) {
        for (FileStatus f : fs.listStatus(usersPath)) {
            if (f.getPath().getName().startsWith("part")) {
                DistributedCache.addCacheFile(f.getPath().toUri(), conf);
            }
        }
    } else {
        DistributedCache.addCacheFile(usersPath.toUri(), conf);
    }
    Job job = new Job(conf);
    job.setJarByClass(FinalJoinJob.class);
    job.setMapperClass(GenericReplicatedJoin.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    outputPath.getFileSystem(conf).delete(outputPath, true);
    FileInputFormat.setInputPaths(job, userLogsPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    if (!job.waitForCompletion(true)) {
        throw new Exception("Job failed");
    }
}
@Override
public int run(String[] args)
        throws Exception
{
    Job job = Job.getInstance(getConf());
    for (Map.Entry<String, String> next : job.getConfiguration()) {
        System.out.println(next.getKey() + ": " + next.getValue());
    }
    job.setJarByClass(PagesByURLExtractor.class);
    job.setJobName(PagesByURLExtractor.class.getName());
    // mapper
    job.setMapperClass(MapperClass.class);
    // input
    job.setInputFormatClass(WARCInputFormat.class);
    // output
    job.setOutputFormatClass(WARCOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(WARCWritable.class);
    FileOutputFormat.setCompressOutput(job, true);
    // paths
    String commaSeparatedInputFiles = args[0];
    String outputPath = args[1];
    // load IDs to be searched for
    job.getConfiguration().set(MAPREDUCE_MAPPER_URLS, loadURLs(args[2]));
    FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    return job.waitForCompletion(true) ? 0 : 1;
}
@Override
@SuppressWarnings("unchecked")
public void configureSource(Job job, int inputId) throws IOException {
    Configuration conf = job.getConfiguration();
    if (inputId == -1) {
        job.setMapperClass(CrunchMapper.class);
        job.setInputFormatClass(formatBundle.getFormatClass());
        formatBundle.configure(conf);
    } else {
        Path dummy = new Path("/view/" + view.getDataset().getName());
        CrunchInputs.addInputPath(job, dummy, formatBundle, inputId);
    }
}
/**
 * Set the mapper class implementation to use in the job,
 * as well as any related configuration (e.g., map output types).
 */
protected void configureMapper(Job job, String tableName,
        String tableClassName) throws ClassNotFoundException, IOException {
    job.setMapperClass(getMapperClass());
    job.setOutputKeyClass(String.class);
    job.setOutputValueClass(NullWritable.class);
}
public static void run() throws IOException, ClassNotFoundException,
        InterruptedException {
    String inputPath = ItemBasedCFDriver.path.get("step2InputPath");
    String outputPath = ItemBasedCFDriver.path.get("step2OutputPath");
    Configuration conf = new Configuration();
    conf.set("mapred.textoutputformat.separator", ":");
    Job job = Job.getInstance(conf);
    HDFS hdfs = new HDFS(conf);
    hdfs.rmr(outputPath);
    job.setMapperClass(Step2_Mapper.class);
    job.setReducerClass(Step2_Reducer.class);
    job.setCombinerClass(Step2_Reducer.class);
    job.setNumReduceTasks(ItemBasedCFDriver.ReducerNumber);
    job.setJarByClass(CalculateSimilarityStep2.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}
protected int runCombineJob(String halvadeOutDir, String mergeOutDir, boolean featureCount) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
    Configuration combineConf = getConf();
    if (!halvadeOpts.out.endsWith("/")) halvadeOpts.out += "/";
    HalvadeConf.setInputDir(combineConf, halvadeOutDir);
    HalvadeConf.setOutDir(combineConf, mergeOutDir);
    FileSystem outFs = FileSystem.get(new URI(mergeOutDir), combineConf);
    if (outFs.exists(new Path(mergeOutDir))) {
        Logger.INFO("The output directory \'" + mergeOutDir + "\' already exists.");
        Logger.INFO("ERROR: Please remove this directory before trying again.");
        System.exit(-2);
    }
    HalvadeConf.setReportAllVariant(combineConf, halvadeOpts.reportAll);
    HalvadeResourceManager.setJobResources(halvadeOpts, combineConf, HalvadeResourceManager.COMBINE, false, halvadeOpts.useBamInput);
    // halvadeOpts.splitChromosomes(combineConf, 0);
    Job combineJob = Job.getInstance(combineConf, "HalvadeCombineVCF");
    combineJob.setJarByClass(VCFCombineMapper.class);
    addInputFiles(halvadeOutDir, combineConf, combineJob, featureCount ? ".count" : ".vcf");
    FileOutputFormat.setOutputPath(combineJob, new Path(mergeOutDir));
    combineJob.setMapperClass(featureCount ? HTSeqCombineMapper.class : VCFCombineMapper.class);
    combineJob.setMapOutputKeyClass(featureCount ? Text.class : LongWritable.class);
    combineJob.setMapOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);
    combineJob.setInputFormatClass(featureCount ? TextInputFormat.class : VCFInputFormat.class);
    combineJob.setNumReduceTasks(1);
    combineJob.setReducerClass(featureCount ?
            be.ugent.intec.halvade.hadoop.mapreduce.HTSeqCombineReducer.class :
            be.ugent.intec.halvade.hadoop.mapreduce.VCFCombineReducer.class);
    combineJob.setOutputKeyClass(Text.class);
    combineJob.setOutputValueClass(featureCount ? LongWritable.class : VariantContextWritable.class);
    return runTimedJob(combineJob, (featureCount ? "featureCounts" : "VCF") + " Combine Job");
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    String namenode_ip = "192.168.17.10";
    String hdfs = "hdfs://" + namenode_ip + ":9000";
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", hdfs);
    conf.set("mapreduce.app-submission.cross-platform", "true");
    String jobName = "PeopleRank";
    Job job = Job.getInstance(conf, jobName);
    job.setJarByClass(PeopleRank.class);
    job.setJar("export\\PeopleRank.jar");
    job.setMapperClass(PeopleRankMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(PeopleRankReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    String dataDir = "/expr/peoplerank/data";
    String outputDir = "/expr/peoplerank/output/adjacent";
    Path inPath = new Path(hdfs + dataDir);
    Path outPath = new Path(hdfs + outputDir);
    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outPath)) {
        fs.delete(outPath, true);
    }
    System.out.println("Job: " + jobName + " is running...");
    if (job.waitForCompletion(true)) {
        System.out.println("success!");
        System.exit(0);
    } else {
        System.out.println("failed!");
        System.exit(1);
    }
}
@Override
public int run(final String[] args) throws Exception {
    final Configuration conf = getConf();
    conf.set("tableName", ingestOptions.getQualifiedTableName());
    conf.set("osmVisibility", ingestOptions.getVisibilityOptions().getVisibility());
    // job settings
    final Job job = Job.getInstance(conf, ingestOptions.getJobName());
    job.setJarByClass(OSMRunner.class);
    switch (ingestOptions.getMapperType()) {
        case "NODE": {
            configureSchema(AvroNode.getClassSchema());
            inputAvroFile = ingestOptions.getNodesBasePath();
            job.setMapperClass(OSMNodeMapper.class);
            break;
        }
        case "WAY": {
            configureSchema(AvroWay.getClassSchema());
            inputAvroFile = ingestOptions.getWaysBasePath();
            job.setMapperClass(OSMWayMapper.class);
            break;
        }
        case "RELATION": {
            configureSchema(AvroRelation.getClassSchema());
            inputAvroFile = ingestOptions.getRelationsBasePath();
            job.setMapperClass(OSMRelationMapper.class);
            break;
        }
        default:
            break;
    }
    if ((avroSchema == null) || (inputAvroFile == null)) {
        throw new MissingArgumentException(
                "argument for mapper type must be one of: NODE, WAY, or RELATION");
    }
    enableLocalityGroups(ingestOptions);
    // input format
    job.setInputFormatClass(AvroKeyInputFormat.class);
    FileInputFormat.setInputPaths(job, inputAvroFile);
    AvroJob.setInputKeySchema(job, avroSchema);
    // mapper
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Mutation.class);
    job.setOutputFormatClass(AccumuloOutputFormat.class);
    AccumuloOutputFormat.setConnectorInfo(
        job,
        accumuloOptions.getUser(),
        new PasswordToken(accumuloOptions.getPassword()));
    AccumuloOutputFormat.setCreateTables(job, true);
    AccumuloOutputFormat.setDefaultTableName(job, ingestOptions.getQualifiedTableName());
    AccumuloOutputFormat.setZooKeeperInstance(
        job,
        new ClientConfiguration().withInstance(accumuloOptions.getInstance()).withZkHosts(
            accumuloOptions.getZookeeper()));
    // reducer
    job.setNumReduceTasks(0);
    return job.waitForCompletion(true) ? 0 : -1;
}
public static void main(String[] args) throws Exception {
    // 1. HDFS connection settings
    String namenode_ip = "192.168.17.10";
    String hdfs = "hdfs://" + namenode_ip + ":9000";
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", hdfs);
    conf.set("mapreduce.app-submission.cross-platform", "true");
    // 2. MapReduce job configuration
    String jobName = "DateCount";  // job name
    Job job = Job.getInstance(conf, jobName);
    job.setJarByClass(DateCount.class);  // job class to run
    job.setJar("export\\DateCount.jar");  // local jar to submit
    job.setMapperClass(DateCountMapper.class);  // mapper class
    job.setMapOutputKeyClass(Text.class);  // mapper output key type
    job.setMapOutputValueClass(IntWritable.class);  // mapper output value type
    job.setReducerClass(DateCountReducer.class);  // reducer class
    job.setOutputKeyClass(Text.class);  // reducer output key type
    job.setOutputValueClass(IntWritable.class);  // reducer output value type
    // 3. Job input and output paths
    String dataDir = "/expr/datecount/data";  // experiment data directory
    String outputDir = "/expr/datecount/output";  // experiment output directory
    Path inPath = new Path(hdfs + dataDir);
    Path outPath = new Path(hdfs + outputDir);
    FileInputFormat.addInputPath(job, inPath);
    FileOutputFormat.setOutputPath(job, outPath);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outPath)) {
        fs.delete(outPath, true);
    }
    // 4. Run the job
    System.out.println("Job: " + jobName + " is running...");
    if (job.waitForCompletion(true)) {
        System.out.println("success!");
        System.exit(0);
    } else {
        System.out.println("failed!");
        System.exit(1);
    }
}
@SuppressWarnings("deprecation")
private Job createJob() throws Exception {
Job job = new Job();
DatasetKeyInputFormat.configure(job).readFrom(inputDataset).withType(GenericData.Record.class);
job.setMapperClass(LineCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(GenericStatsReducer.class);
DatasetKeyOutputFormat.configure(job).writeTo(outputDataset).withType(GenericData.Record.class);
return job;
}
protected int runPass1RNAJob(Configuration pass1Conf, String tmpOutDir) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
    HalvadeConf.setIsPass2(pass1Conf, false);
    HalvadeResourceManager.setJobResources(halvadeOpts, pass1Conf, HalvadeResourceManager.RNA_SHMEM_PASS1, halvadeOpts.nodes == 1, halvadeOpts.useBamInput);
    int pass2Reduces = HalvadeResourceManager.getPass2Reduces(halvadeOpts);
    halvadeOpts.splitChromosomes(pass1Conf, pass2Reduces);
    // set pass 2 suffix so only this job finds it!
    HalvadeConf.setPass2Suffix(pass1Conf, pass2suffix);
    Job pass1Job = Job.getInstance(pass1Conf, "Halvade pass 1 RNA pipeline");
    pass1Job.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
    pass1Job.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
    FileSystem fs = FileSystem.get(new URI(halvadeOpts.in), pass1Conf);
    try {
        if (fs.getFileStatus(new Path(halvadeOpts.in)).isDirectory()) {
            // add every file in directory
            FileStatus[] files = fs.listStatus(new Path(halvadeOpts.in));
            for (FileStatus file : files) {
                if (!file.isDirectory()) {
                    FileInputFormat.addInputPath(pass1Job, file.getPath());
                }
            }
        } else {
            FileInputFormat.addInputPath(pass1Job, new Path(halvadeOpts.in));
        }
    } catch (IOException | IllegalArgumentException e) {
        Logger.EXCEPTION(e);
    }
    FileSystem outFs = FileSystem.get(new URI(tmpOutDir), pass1Conf);
    boolean skipPass1 = false;
    if (outFs.exists(new Path(tmpOutDir))) {
        // check if genome already exists
        skipPass1 = outFs.exists(new Path(tmpOutDir + "/_SUCCESS"));
        if (skipPass1)
            Logger.DEBUG("pass1 genome already created, skipping pass 1");
        else {
            Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
            Logger.INFO("ERROR: Please remove this directory before trying again.");
            System.exit(-2);
        }
    }
    if (!skipPass1) {
        FileOutputFormat.setOutputPath(pass1Job, new Path(tmpOutDir));
        pass1Job.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
        pass1Job.setInputFormatClass(HalvadeTextInputFormat.class);
        pass1Job.setMapOutputKeyClass(GenomeSJ.class);
        pass1Job.setMapOutputValueClass(Text.class);
        pass1Job.setSortComparatorClass(GenomeSJSortComparator.class);
        pass1Job.setGroupingComparatorClass(GenomeSJGroupingComparator.class);
        pass1Job.setNumReduceTasks(1);
        pass1Job.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RebuildStarGenomeReducer.class);
        pass1Job.setOutputKeyClass(LongWritable.class);
        pass1Job.setOutputValueClass(Text.class);
        return runTimedJob(pass1Job, "Halvade pass 1 Job");
    } else
        return 0;
}
@Override
public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    Configuration configuration = job.getConfiguration();
    job.setJarByClass(Aegisthus.class);
    CommandLine cl = getOptions(args);
    if (cl == null) {
        return 1;
    }
    // Check all of the paths and load the sstable version from the input filenames
    List<Path> paths = Lists.newArrayList();
    if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
        for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
            paths.add(new Path(input));
        }
    }
    if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
        paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
    }
    LOG.info("Processing paths: {}", paths);
    // At this point we have the version of sstable that we can use for this run
    Descriptor.Version version = Descriptor.Version.CURRENT;
    if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
        version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
    }
    configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());
    if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
        setConfigurationFromCql(configuration);
    }
    if (cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
        job.setInputFormatClass(AegisthusCombinedInputFormat.class);
    } else {
        job.setInputFormatClass(AegisthusInputFormat.class);
    }
    job.setMapOutputKeyClass(AegisthusKey.class);
    job.setMapOutputValueClass(AtomWritable.class);
    job.setOutputKeyClass(AegisthusKey.class);
    job.setOutputValueClass(RowWritable.class);
    job.setMapperClass(AegisthusKeyMapper.class);
    job.setReducerClass(CassSSTableReducer.class);
    job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
    job.setPartitionerClass(AegisthusKeyPartitioner.class);
    job.setSortComparatorClass(AegisthusKeySortingComparator.class);
    TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
    if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
        job.setOutputFormatClass(SSTableOutputFormat.class);
    } else {
        job.setOutputFormatClass(JsonOutputFormat.class);
    }
    CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));
    job.submit();
    if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
        Runtime.getRuntime().addShutdownHook(new JobKiller(job));
    }
    System.out.println(job.getJobID());
    System.out.println(job.getTrackingURL());
    boolean success = job.waitForCompletion(true);
    if (success) {
        Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
        long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
        int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
        if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
            LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
            success = false;
        } else if (errorCount > 0) {
            LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
                    errorCount, maxAllowed);
        }
    }
    return success ? 0 : 1;
}
public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
        ClassNotFoundException {
    Path input = getInDir();
    Path output = getOutDir();
    _fileSystem.delete(input, true);
    _fileSystem.delete(output, true);
    // 1500 * 50 = 75,000
    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
    // 100 * 5000 = 500,000
    writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");
    Job job = Job.getInstance(_conf, "blur index");
    job.setJarByClass(BlurOutputFormatTest.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, input);
    CsvBlurMapper.addColumns(job, "cf1", "col");
    Path tablePath = new Path(new Path(_root, "table"), "test");
    TableDescriptor tableDescriptor = new TableDescriptor();
    tableDescriptor.setShardCount(2);
    tableDescriptor.setTableUri(tablePath.toString());
    tableDescriptor.setName("test");
    createShardDirectories(getOutDir(), 2);
    BlurOutputFormat.setupJob(job, tableDescriptor);
    BlurOutputFormat.setOutputPath(job, output);
    BlurOutputFormat.setIndexLocally(job, false);
    job.submit();
    boolean killCalled = false;
    while (!job.isComplete()) {
        Thread.sleep(1000);
        System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
                job.reduceProgress() * 100);
        if (job.reduceProgress() > 0.7 && !killCalled) {
            job.killJob();
            killCalled = true;
        }
    }
    assertFalse(job.isSuccessful());
    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
        Path path = new Path(output, ShardUtil.getShardName(i));
        FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
        FileStatus[] listStatus = fileSystem.listStatus(path);
        assertEquals(toString(listStatus), 0, listStatus.length);
    }
}