The following are Java code examples of org.apache.hadoop.mapreduce.Job#setSortComparatorClass(). Each snippet shows a job registering a custom comparator that controls the order in which map-output keys are sorted before they reach the reducers.
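Before the individual examples, here is a minimal sketch of the pattern they all share (the class name below is hypothetical and not taken from any of the projects): a WritableComparator subclass is registered with Job#setSortComparatorClass() and decides how map-output keys are ordered on their way to the reducers.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator that reverses the natural Text ordering.
public class DescendingTextComparator extends WritableComparator {
    public DescendingTextComparator() {
        super(Text.class, true); // true = instantiate key objects so compare() receives real Text instances
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // negate to invert the order
    }
}

// Wiring it into a job (sketch):
// Job job = Job.getInstance(conf, "descending-sort");
// job.setMapOutputKeyClass(Text.class);
// job.setSortComparatorClass(DescendingTextComparator.class);
The examples below follow this shape, differing only in the key type and the ordering logic.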
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
job.setJarByClass(merge_results_driver.class);
job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");
FileOutputFormat.setOutputPath(job, new Path(out_path));
job.setMapperClass(lud.naiveGaussian.mergeResults.merge_results_mapper.class);
job.setReducerClass(lud.naiveGaussian.mergeResults.merge_results_reducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(TextPairPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(TextPairComparator.class);
boolean success = job.waitForCompletion(true);
return success;
}
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
job.setJarByClass(merge_results_driver.class);
job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");
FileOutputFormat.setOutputPath(job, new Path(out_path));
job.setMapperClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_mapper.class);
job.setReducerClass(lu_decomposition.naive_gausssian.MergeResults.merge_results_reducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(TextPairPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(TextPairComparator.class);
boolean success = job.waitForCompletion(true);
return success;
}
public static void runSortJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(SortMapReduce.Map.class);
job.setReducerClass(SortMapReduce.Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Person.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String outpath = conf.get(OUTPUTPATH);
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) ,
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
job.setSortComparatorClass(JoinSelectSortComparator.class);
job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
job.setPartitionerClass(JoinSelectPartitioner.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
protected void configureMapper(Job job) {
job.setInputFormatClass(OrcValueCombineFileInputFormat.class);
job.setMapperClass(OrcValueMapper.class);
job.setMapOutputKeyClass(OrcKey.class);
job.setMapOutputValueClass(OrcValue.class);
job.setGroupingComparatorClass(OrcKeyComparator.class);
job.setSortComparatorClass(OrcKeyComparator.class);
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set up the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Configure the MapReduce job
String jobName = "DateSortDesc"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateSortDesc.class); // main job class
job.setJar("export\\DateSortDesc.jar"); // local jar to submit
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Specify the comparator used for sorting (a possible implementation is sketched after this example)
job.setSortComparatorClass(MyComparator.class);
// 3. Set the job input and output paths
String dataDir = "/workspace/dateSort/data"; // input data directory
String outputDir = "/workspace/dateSort/output"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
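MyComparator itself is not reproduced on this page. For a descending sort on IntWritable keys, a plausible shape is the following sketch; the tutorial's actual class may differ.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

// Sketch of a descending-order comparator for IntWritable keys (assumed shape of MyComparator).
public class MyComparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Negate the byte-level comparison used during the shuffle.
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }
}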
public static void main(String[] args) throws Exception {
// 1. Set up the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Configure the MapReduce job
String jobName = "DateSort2"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DateSort2.class); // main job class
job.setJar("export\\DateSort2.jar"); // local jar to submit
// Map
job.setMapperClass(DateSort2Mapper.class); // Mapper class
job.setMapOutputKeyClass(IntWritable.class); // Mapper output key type
job.setMapOutputValueClass(Text.class); // Mapper output value type
// Reduce
job.setReducerClass(DateSort2Reducer.class); // Reducer class
job.setOutputKeyClass(Text.class); // Reducer output key type
job.setOutputValueClass(IntWritable.class); // Reducer output value type
// Custom sort
job.setSortComparatorClass(MySort.class); // custom sort comparator
// 3. Set the job input and output paths
String dataDir = "/expr/datecount/output/part-r-00000"; // input data path
String outputDir = "/expr/datecount/output_sort2"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
private void testComparator(String keySpec, int expect)
throws Exception {
String root = System.getProperty("test.build.data", "/tmp");
Path inDir = new Path(root, "test_cmp/in");
Path outDir = new Path(root, "test_cmp/out");
conf.set("mapreduce.partition.keycomparator.options", keySpec);
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
line1 +"\n" + line2 + "\n");
job.setMapperClass(InverseMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setSortComparatorClass(KeyFieldBasedComparator.class);
job.setPartitionerClass(KeyFieldBasedPartitioner.class);
job.waitForCompletion(true);
assertTrue(job.isSuccessful());
// validate output
Path[] outputFiles = FileUtil.stat2Paths(getFileSystem().listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
// Make sure we get what we expect as the first line, and also
// that we have two lines (both lines must end up in the same
// reducer since the partitioner takes the same key spec for all
// lines).
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
assertTrue(line.startsWith(line2));
}
line = reader.readLine();
if (expect == 1) {
assertTrue(line.startsWith(line2));
} else if (expect == 2) {
assertTrue(line.startsWith(line1));
}
reader.close();
}
}
/**
 * After adding data to the table, start a MapReduce job that verifies the linked-list chains.
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
LOG.info("Running check");
Configuration conf = getConf();
String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
Path p = util.getDataTestDirOnTestFS(jobName);
Job job = new Job(conf);
job.setJarByClass(getClass());
job.setJobName(jobName);
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
Scan scan = new Scan();
scan.addFamily(CHAIN_FAM);
scan.addFamily(SORT_FAM);
scan.readVersions(1);
scan.setCacheBlocks(false);
scan.setBatch(1000);
int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
scan.setConsistency(Consistency.TIMELINE);
}
TableMapReduceUtil.initTableMapperJob(
getTablename().getName(),
scan,
LinkedListCheckingMapper.class,
LinkKey.class,
LinkChain.class,
job
);
job.setReducerClass(LinkedListCheckingReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, p);
assertEquals(true, job.waitForCompletion(true));
// Delete the files.
util.getTestFileSystem().delete(p, true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MRSessionize <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "MapReduce Sessionization");
job.setJarByClass(MRSessionize.class);
job.setMapperClass(SessionizeMapper.class);
job.setReducerClass(SessionizeReducer.class);
// WARNING: do NOT set a Combiner class. The reducer must see all records
// from the same IP in one place before we can do sessionization.
// Also, our reducer doesn't return the same key/value types it takes,
// so it can't be used on the result of a previous (combine) pass.
job.setMapOutputKeyClass(IpTimestampKey.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// We need these for secondary sorting.
// We need to shuffle the records (between Map and Reduce phases) by using IP address as key, since that is
// the field we are using for determining uniqueness of users. However, when the records arrive to the reducers,
// we would like them to be sorted in ascending order of their timestamps. This concept is known as secondary
// sorting since we are "secondarily" sorting the records by another key (timestamp, in our case) in addition
// to the shuffle key (also called the "partition" key).
// So, to get some terminology straight.
// Natural key (aka Shuffle key or Partition key) is the key we use to shuffle. IP address in our case
// Secondary Sorting Key is the key we use to sort within each partition that gets sent to the reducer. Timestamp
// in our case.
// Together, the natural key and secondary sorting key form what we call the composite key. This key is called
// IpTimestampKey in our example.
// For secondary sorting, even though we are partitioning and shuffling by only the natural key, the map output
// key and the reduce input key is the composite key. We, however, use a custom partitioner and custom grouping
// comparator that only uses the natural key part of the composite key to partition and group respectively (both
// happen during the shuffle phase).
// However, we have a different sort comparator which also gets used in the shuffle phase but determines how
// the records are sorted when they enter the reduce phase. This custom sort comparator in our case will make use
// of the entire composite key.
// We found http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/
// to be very helpful, if you'd like to read more on the subject.
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
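The NaturalKeyPartitioner, NaturalKeyComparator, and CompositeKeyComparator referenced above are not shown on this page. Assuming IpTimestampKey exposes getIp() (a Text) and getTimestamp() (a long), they could look roughly like the nested sketches below; the project's real top-level classes may differ.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortHelpers {

    // Partition on the natural key (the IP) only, so every record for an IP reaches the same reducer.
    public static class NaturalKeyPartitioner extends Partitioner<IpTimestampKey, Text> {
        @Override
        public int getPartition(IpTimestampKey key, Text value, int numPartitions) {
            return (key.getIp().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group on the natural key only, so one reduce() call sees all records for a given IP.
    public static class NaturalKeyComparator extends WritableComparator {
        public NaturalKeyComparator() {
            super(IpTimestampKey.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            return ((IpTimestampKey) a).getIp().compareTo(((IpTimestampKey) b).getIp());
        }
    }

    // Sort on the full composite key: IP first, then timestamp ascending.
    public static class CompositeKeyComparator extends WritableComparator {
        public CompositeKeyComparator() {
            super(IpTimestampKey.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            IpTimestampKey k1 = (IpTimestampKey) a;
            IpTimestampKey k2 = (IpTimestampKey) b;
            int cmp = k1.getIp().compareTo(k2.getIp());
            return cmp != 0 ? cmp : Long.compare(k1.getTimestamp(), k2.getTimestamp());
        }
    }
}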
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
Configuration configuration = job.getConfiguration();
job.setJarByClass(Aegisthus.class);
CommandLine cl = getOptions(args);
if (cl == null) {
return 1;
}
// Check all of the paths and load the sstable version from the input filenames
List<Path> paths = Lists.newArrayList();
if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
paths.add(new Path(input));
}
}
if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
}
LOG.info("Processing paths: {}", paths);
// At this point we have the version of sstable that we can use for this run
Descriptor.Version version = Descriptor.Version.CURRENT;
if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
}
configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());
if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
setConfigurationFromCql(configuration);
}
if(cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
job.setInputFormatClass(AegisthusCombinedInputFormat.class);
} else {
job.setInputFormatClass(AegisthusInputFormat.class);
}
job.setMapOutputKeyClass(AegisthusKey.class);
job.setMapOutputValueClass(AtomWritable.class);
job.setOutputKeyClass(AegisthusKey.class);
job.setOutputValueClass(RowWritable.class);
job.setMapperClass(AegisthusKeyMapper.class);
job.setReducerClass(CassSSTableReducer.class);
job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
job.setPartitionerClass(AegisthusKeyPartitioner.class);
job.setSortComparatorClass(AegisthusKeySortingComparator.class);
TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
job.setOutputFormatClass(SSTableOutputFormat.class);
} else {
job.setOutputFormatClass(JsonOutputFormat.class);
}
CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));
job.submit();
if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new JobKiller(job));
}
System.out.println(job.getJobID());
System.out.println(job.getTrackingURL());
boolean success = job.waitForCompletion(true);
if (success) {
Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
success = false;
} else if (errorCount > 0) {
LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
errorCount, maxAllowed);
}
}
return success ? 0 : 1;
}
protected int runHalvadeJob(Configuration halvadeConf, String tmpOutDir, int jobType) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
String pipeline = "";
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
HalvadeConf.setIsPass2(halvadeConf, true);
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = RNA_PASS2;
} else if(jobType == HalvadeResourceManager.DNA) {
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = DNA;
}
halvadeOpts.splitChromosomes(halvadeConf, 0);
HalvadeConf.setOutDir(halvadeConf, tmpOutDir);
FileSystem outFs = FileSystem.get(new URI(tmpOutDir), halvadeConf);
if (outFs.exists(new Path(tmpOutDir))) {
Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
Logger.INFO("ERROR: Please remove this directory before trying again.");
System.exit(-2);
}
if(halvadeOpts.useBamInput)
setHeaderFile(halvadeOpts.in, halvadeConf);
if(halvadeOpts.rnaPipeline)
HalvadeConf.setPass2Suffix(halvadeConf, pass2suffix);
Job halvadeJob = Job.getInstance(halvadeConf, "Halvade" + pipeline);
halvadeJob.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
halvadeJob.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
addInputFiles(halvadeOpts.in, halvadeConf, halvadeJob);
FileOutputFormat.setOutputPath(halvadeJob, new Path(tmpOutDir));
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RnaGATKReducer.class);
} else if(jobType == HalvadeResourceManager.DNA){
halvadeJob.setMapperClass(halvadeOpts.alignmentTools[halvadeOpts.aln]);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.DnaGATKReducer.class);
}
halvadeJob.setMapOutputKeyClass(ChromosomeRegion.class);
halvadeJob.setMapOutputValueClass(SAMRecordWritable.class);
halvadeJob.setInputFormatClass(HalvadeTextInputFormat.class);
halvadeJob.setOutputKeyClass(Text.class);
if(halvadeOpts.mergeBam) {
halvadeJob.setSortComparatorClass(SimpleChrRegionComparator.class);
halvadeJob.setOutputValueClass(SAMRecordWritable.class);
}else {
halvadeJob.setPartitionerClass(ChrRgPartitioner.class);
halvadeJob.setSortComparatorClass(ChrRgSortComparator.class);
halvadeJob.setGroupingComparatorClass(ChrRgGroupingComparator.class);
halvadeJob.setOutputValueClass(VariantContextWritable.class);
}
if(halvadeOpts.justAlign && !halvadeOpts.mergeBam)
halvadeJob.setNumReduceTasks(0);
else if (halvadeOpts.mergeBam) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.BamMergeReducer.class);
halvadeJob.setNumReduceTasks(1);
} else {
halvadeJob.setNumReduceTasks(halvadeOpts.reduces);
if(halvadeOpts.countOnly) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.CountReadsReducer.class);
halvadeJob.setOutputValueClass(LongWritable.class);
}
}
if(halvadeOpts.useBamInput) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.AlignedBamMapper.class);
halvadeJob.setInputFormatClass(BAMInputFormat.class);
}
return runTimedJob(halvadeJob, "Halvade Job");
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(PersonSortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(PersonBinaryComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(CloneReduce.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSortComparatorClass(PersonBinaryComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void runSortJob(String input, String output)
throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(SampleMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(true);
conf.set("mapreduce.framework.name","local");
conf.set("mapreduce.app-submission.cross-platform","true");
String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf);
job.setJarByClass(MyTopN.class);
job.setJobName("TopN");
job.setJar("C:\\Users\\Administrator\\IdeaProjects\\msbhadoop\\target\\hadoop-hdfs-1.0-0.1.jar");
// When planning the job on the client side, cache the right-hand table of the join onto the nodes where the map tasks will run
job.addCacheFile(new Path("/data/topn/dict/dict.txt").toUri());
// For beginners, focus on the client-side code: once this part is clear, you really understand how the job is put together.
// map task
// input
TextInputFormat.addInputPath(job,new Path(other[0]));
Path outPath = new Path(other[1]);
if(outPath.getFileSystem(conf).exists(outPath)) outPath.getFileSystem(conf).delete(outPath,true);
TextOutputFormat.setOutputPath(job,outPath);
//key
//map
job.setMapperClass(TMapper.class);
job.setMapOutputKeyClass(TKey.class);
job.setMapOutputValueClass(IntWritable.class);
// Partitioner: originally by year and month, but a partition must be at least as coarse as a group, so partition by year!
// The partitioner's implicit contract: identical keys must receive the same partition number.
job.setPartitionerClass(TPartitioner.class);
// Sort comparator: year and month ascending, temperature descending (a possible shape is sketched after this example)
job.setSortComparatorClass(TSortComparator.class);
//combine
// job.setCombinerClass();
//reducetask
//groupingComparator
job.setGroupingComparatorClass(TGroupingComparator.class);
//reduce
job.setReducerClass(TReducer.class);
job.waitForCompletion(true);
}
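TSortComparator is not shown above. Given the ordering described in the comments (year and month ascending, temperature descending), and assuming TKey exposes getYear(), getMonth() and getTemperature() as ints, a plausible shape is the following sketch; the course's real class may differ.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch: order composite TKey records by year, then month, then temperature descending.
public class TSortComparator extends WritableComparator {
    public TSortComparator() {
        super(TKey.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        TKey k1 = (TKey) a;
        TKey k2 = (TKey) b;
        int c = Integer.compare(k1.getYear(), k2.getYear());
        if (c != 0) return c;
        c = Integer.compare(k1.getMonth(), k2.getMonth());
        if (c != 0) return c;
        return -Integer.compare(k1.getTemperature(), k2.getTemperature()); // temperature descending
    }
}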
/**
* @throws Exception If failed.
*/
@Test
public void testSortSimple() throws Exception {
// Generate test data.
Job job = Job.getInstance();
job.setInputFormatClass(InFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setMapperClass(Mapper.class);
job.setNumReduceTasks(0);
setupFileSystems(job.getConfiguration());
FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
X.printerrln("Data generation started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Data generation complete.");
// Run main map-reduce job.
job = Job.getInstance();
setupFileSystems(job.getConfiguration());
job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
"," + WritableSerialization.class.getName());
FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
job.setSortComparatorClass(JavaSerializationComparator.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2);
job.setMapOutputKeyClass(UUID.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
X.printerrln("Job started.");
grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
createJobInfo(job.getConfiguration(), null)).get(180000);
X.printerrln("Job complete.");
// Check result.
Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
for (FileStatus file : fs.listStatus(outDir)) {
X.printerrln("__ file: " + file);
if (file.getLen() == 0)
continue;
FSDataInputStream in = fs.open(file.getPath());
Scanner sc = new Scanner(in);
UUID prev = null;
while (sc.hasNextLine()) {
UUID next = UUID.fromString(sc.nextLine());
// X.printerrln("___ check: " + next);
if (prev != null)
assertTrue(prev.compareTo(next) < 0);
prev = next;
}
}
}
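For reference, the part of this test that matters for setSortComparatorClass() can be condensed as below: the map-output key (UUID) is not a Writable, so JavaSerialization has to be registered alongside WritableSerialization, and JavaSerializationComparator supplies the ordering by deserializing the keys and calling their compareTo(). This is a minimal sketch mirroring the test's configuration, not part of the test itself.
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;

public class JavaSerializationSortWiring {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Register Java serialization alongside the default Writable serialization so that
        // plain Serializable keys (UUID here) can cross the shuffle.
        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
                JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
        Job job = Job.getInstance(conf);
        job.setMapOutputKeyClass(UUID.class);            // Serializable and Comparable, but not a Writable
        job.setMapOutputValueClass(NullWritable.class);
        job.setSortComparatorClass(JavaSerializationComparator.class);
        return job;
    }
}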