Listed below are example usages of org.apache.hadoop.mapreduce.Job#setPartitionerClass(), collected from open-source projects on GitHub.
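Every example below registers a custom Partitioner subclass via job.setPartitionerClass(). As a point of reference, here is a minimal, hypothetical partitioner sketch (the class name and key/value types are illustrative and not taken from any project below); it routes each record by the hash of its map output key, which is essentially what Hadoop's default HashPartitioner does:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative sketch only: partition map output by the hash of the Text key.
public class ExamplePartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask off the sign bit so the result always falls in [0, numPartitions).
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

The numPartitions argument is the number of reduce tasks configured on the job, so job.setPartitionerClass(ExamplePartitioner.class) is normally paired with job.setNumReduceTasks(n).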
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
private void setupSplitsFile(final Job job, final TableOperations parentTableOperations, final String parentTableName, final String childTableName) throws Exception {
final FileSystem fs = FileSystem.get(conf);
fs.setPermission(getPath(baseOutputDir, childTableName), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
final Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt");
final Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100);
log.info("Creating splits file at: " + splitsPath);
try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)), false, StandardCharsets.UTF_8.name())) {
for (final Text split : splits) {
final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)), StandardCharsets.UTF_8);
out.println(encoded);
}
}
fs.setPermission(splitsPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
final String userDir = PathUtils.clean(System.getProperty("user.dir"));
// For some reason a symlink to the splits file gets created in the user directory.
// Copying the entire file might work better on Windows, but a broken symlink in the
// user directory does not seem to cause any problems.
java.nio.file.Files.deleteIfExists(new File(userDir, "splits.txt").toPath());
//Files.copy(new File(splitsPath.toString()), new File(userDir, "splits.txt"));
job.setPartitionerClass(KeyRangePartitioner.class);
KeyRangePartitioner.setSplitFile(job, splitsPath.toString());
job.setNumReduceTasks(splits.size() + 1);
}
public static boolean runWithJob(Job job, String out_path) throws IOException, InterruptedException, ClassNotFoundException {
job.setJarByClass(merge_results_driver.class);
job.setJobName("Final Step: Merging results and creating separate LU decomposed components of input matrix");
FileOutputFormat.setOutputPath(job, new Path(out_path));
job.setMapperClass(lud.naiveGaussian.mergeResults.merge_results_mapper.class);
job.setReducerClass(lud.naiveGaussian.mergeResults.merge_results_reducer.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextPair.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(TextPairPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
job.setSortComparatorClass(TextPairComparator.class);
boolean success = job.waitForCompletion(true);
return success;
}
public static void setupReducer(Job job, CubeSegment cubeSegment, Path output) throws IOException {
// Output
//// prevent creating zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, output);
// Reducer
job.setReducerClass(ConvergeCuboidDataReducer.class);
job.setPartitionerClass(ConvergeCuboidDataPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Pair<Integer, Integer> numReduceTasks = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(cubeSegment);
job.setNumReduceTasks(numReduceTasks.getFirst());
int nBaseReduceTasks = numReduceTasks.getSecond();
boolean enableSharding = cubeSegment.isEnableSharding();
long baseCuboidId = cubeSegment.getCuboidScheduler().getBaseCuboidId();
String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
job.getConfiguration().set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, partiParams);
}
/**
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
* <code>splitPoints</code>. Cleans up the partitions file after the job exits.
*/
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
Configuration conf = job.getConfiguration();
// create the partitions file
FileSystem fs = FileSystem.get(conf);
Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
fs.makeQualified(partitionsPath);
writePartitions(conf, partitionsPath, splitPoints);
fs.deleteOnExit(partitionsPath);
// configure job to use it
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
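A hedged usage sketch for the helper above, assuming it lives in the same class as configurePartitioner, that "hbase.fs.tmp.dir" is set in the job configuration (the helper reads it), and that the split points come from HBase row-key boundaries. ImmutableBytesWritable and Bytes are the standard HBase classes (org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.util.Bytes); the boundary strings are invented for illustration:

// Sketch: build split points from example row-key boundaries and hand them to
// configurePartitioner(), which writes the partitions file and registers the
// TotalOrderPartitioner on the job.
static void configureTotalOrder(Job job) throws IOException {
    List<ImmutableBytesWritable> splitPoints = new ArrayList<>();
    for (String boundary : new String[] {"row-1000", "row-2000", "row-3000"}) {
        splitPoints.add(new ImmutableBytesWritable(Bytes.toBytes(boundary)));
    }
    configurePartitioner(job, splitPoints);        // writes and registers the partitions file
    job.setNumReduceTasks(splitPoints.size() + 1); // N split points define N + 1 key ranges
}

With a TotalOrderPartitioner each reducer receives one contiguous key range, so the concatenated reducer outputs are globally sorted.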
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: MRSessionize <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "MapReduce Sessionization");
job.setJarByClass(MRSessionize.class);
job.setMapperClass(SessionizeMapper.class);
job.setReducerClass(SessionizeReducer.class);
// WARNING: do NOT set the Combiner class here. A combiner would pre-aggregate each
// mapper's output, but we need all the records from the same IP in one place
// before we can do sessionization.
// Also, our reducer doesn't return the same key/value types it takes in,
// so it can't be used on the result of a previous reducer anyway.
job.setMapOutputKeyClass(IpTimestampKey.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// We need these for secondary sorting.
// We need to shuffle the records (between Map and Reduce phases) by using IP address as key, since that is
// the field we are using for determining uniqueness of users. However, when the records arrive to the reducers,
// we would like them to be sorted in ascending order of their timestamps. This concept is known as secondary
// sorting since we are "secondarily" sorting the records by another key (timestamp, in our case) in addition
// to the shuffle key (also called the "partition" key).
// So, to get some terminology straight.
// Natural key (aka Shuffle key or Partition key) is the key we use to shuffle. IP address in our case
// Secondary Sorting Key is the key we use to sort within each partition that gets sent to a reducer. Timestamp
// in our case.
// Together, the natural key and secondary sorting key form what we call the composite key. This key is called
// IpTimestampKey in our example.
// For secondary sorting, even though we are partitioning and shuffling by only the natural key, the map output
// key and the reduce input key are the composite key. We, however, use a custom partitioner and custom grouping
// comparator that only uses the natural key part of the composite key to partition and group respectively (both
// happen during the shuffle phase).
// However, we have a different sort comparator which also gets used in the shuffle phase but determines how
// the records are sorted when they enter the reduce phase. This custom sort comparator in our case will make use
// of the entire composite key.
// We found http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/
// to be very helpful, if you'd like to read more on the subject.
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyComparator.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
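To make the secondary-sorting comments in the example above concrete, here is a hedged sketch of what a partitioner such as NaturalKeyPartitioner could look like. The getIp() accessor on IpTimestampKey is an assumption made for illustration; the real composite-key class may expose its natural key differently:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: partition the composite key by its natural key (the IP address),
// so every record for a given IP reaches the same reducer regardless of timestamp.
public class NaturalKeyPartitioner extends Partitioner<IpTimestampKey, Text> {
    @Override
    public int getPartition(IpTimestampKey key, Text value, int numPartitions) {
        // getIp() is assumed here; only the natural key takes part in partitioning,
        // while the sort comparator still orders records by the full composite key.
        return (key.getIp().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}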
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Invalid no. of arguments provided");
System.err.println("Usage: terasort <input-dir> <output-dir>");
return -1;
}
LOG.info("starting");
Job job = Job.getInstance(getConf());
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
boolean useSimplePartitioner = getUseSimplePartitioner(job);
TeraInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraSort");
job.setJarByClass(TeraSort.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TeraInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
if (useSimplePartitioner) {
job.setPartitionerClass(SimplePartitioner.class);
} else {
long start = System.currentTimeMillis();
Path partitionFile = new Path(outputDir,
TeraInputFormat.PARTITION_FILENAME);
URI partitionUri = new URI(partitionFile.toString() +
"#" + TeraInputFormat.PARTITION_FILENAME);
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
LOG.error(e.getMessage());
return -1;
}
job.addCacheFile(partitionUri);
long end = System.currentTimeMillis();
System.out.println("Spent " + (end - start) + "ms computing partitions.");
job.setPartitionerClass(TotalOrderPartitioner.class);
}
job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
TeraOutputFormat.setFinalSync(job, true);
int ret = job.waitForCompletion(true) ? 0 : 1;
LOG.info("done");
return ret;
}
@VisibleForTesting
public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
long iReduceSleepTime, int iReduceSleepCount)
throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
// Configure intermediate reduces
conf.setInt(
org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
iReduceStagesCount);
LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
for (int i = 1; i <= iReduceStagesCount; ++i) {
// Set reducer class for intermediate reduce
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
// Set reducer output key class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.key.class"), IntWritable.class, Object.class);
// Set reducer output value class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.value.class"), IntWritable.class, Object.class);
conf.setInt(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduces"), numIReducer);
}
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(MRRSleepJob.class);
job.setNumReduceTasks(numReducer);
job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(SleepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(MRRSleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public static void configureJob(Job job) {
job.setPartitionerClass(RowPartitioner.class);
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
int numReducers = 2;
Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(TotalSortMapReduce.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
InputSampler.writePartitionFile(job, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(IOOptions.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path input = new Path(cli.getArgValueAsString(IOOptions.INPUT));
Path output = new Path(cli.getArgValueAsString(IOOptions.OUTPUT));
Configuration conf = super.getConf();
List<String> dates = Lists.newArrayList("2000-01-03",
"2001-01-02", "2002-01-02", "2003-01-02", "2004-01-02",
"2005-01-03", "2006-01-03", "2007-01-03", "2008-01-02",
"2009-01-02");
for (int partition = 0; partition < dates.size(); partition++) {
DatePartitioner.addPartitionToConfig(conf,
dates.get(partition), partition);
}
Job job = new Job(conf);
job.setJarByClass(CustomPartitionerJob.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(DatePartitioner.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setNumReduceTasks(10);
return job.waitForCompletion(true) ? 0 : 1;
}
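The driver above stores a date-to-partition mapping in the Configuration via DatePartitioner.addPartitionToConfig and then registers DatePartitioner as the partitioner. The actual DatePartitioner is not shown here; the following is a hypothetical reconstruction of how such a config-driven partitioner might work, where the property prefix and the fallback behaviour are assumptions:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical reconstruction; the real DatePartitioner may differ.
public class DatePartitioner extends Partitioner<Text, Text> implements Configurable {
    private static final String PREFIX = "date.partitioner."; // assumed property prefix
    private Configuration conf;

    // Mirrors the addPartitionToConfig(conf, date, partition) call in the driver:
    // record a fixed partition index for each known date.
    public static void addPartitionToConfig(Configuration conf, String date, int partition) {
        conf.setInt(PREFIX + date, partition);
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // Assumes the map output key is the date string; unknown dates fall back to hashing.
        int partition = conf.getInt(PREFIX + key.toString(), -1);
        return partition >= 0 ? partition % numPartitions
                              : (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void setConf(Configuration conf) { this.conf = conf; }

    @Override
    public Configuration getConf() { return conf; }
}

Because the framework instantiates the partitioner through ReflectionUtils, implementing Configurable is enough for the job Configuration to be injected before getPartition is called.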
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase4RemoveDuplicatesUsingReduceSideJoins.class);
job.setJobName(Phase4RemoveDuplicatesUsingReduceSideJoins.class.getName());
// paths
// text files of ids to be deleted
String textFilePath = args[0];
// corpus with *.warc.gz
String commaSeparatedInputFiles = args[1];
// output
String outputPath = args[2];
//second input the look up text file
MultipleInputs.addInputPath(job, new Path(textFilePath), TextInputFormat.class,
JoinTextMapper.class);
//first input the data set (check comma separated availability)
MultipleInputs.addInputPath(job, new Path(commaSeparatedInputFiles), WARCInputFormat.class,
JoinWARCMapper.class);
job.setPartitionerClass(SourceJoiningKeyPartitioner.class);
job.setGroupingComparatorClass(SourceJoiningGroupingComparator.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(WARCWritable.class);
job.setReducerClass(JoinReducer.class);
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String hdfs = "hdfs://192.168.17.10:9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// Set up the job configuration
String jobName = "TempSort";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(TempSort.class);
job.setJar("export\\TempSort.jar");
// Map
job.setMapperClass(TempSortMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Reduce
job.setReducerClass(TempSortReducer.class);
// Job-wide (final) output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Sort
// job.setSortComparatorClass(MySort.class);
// Partition
job.setPartitionerClass(YearPartitioner.class);
job.setNumReduceTasks(3);
// 3. Set the job input and output paths
String dataDir = "/expr/test/data"; // experiment data directory
String outputDir = "/expr/test/output"; // experiment output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
// 1. Set the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Set up the MapReduce job configuration
String jobName = "DatePartition"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(DatePartition.class); // job class to run
job.setJar("export\\DatePartition.jar"); // local jar file
// Map
job.setMapperClass(DatePartitionMapper.class); // Mapper class
job.setMapOutputKeyClass(Text.class); // Mapper output key type
job.setMapOutputValueClass(IntWritable.class); // Mapper output value type
// Reduce
job.setReducerClass(DatePartitionReducer.class); // Reducer class
// Job-wide (final) output types
job.setOutputKeyClass(Text.class); // Reduce output key type
job.setOutputValueClass(IntWritable.class); // Reduce output value type
// Partition
job.setPartitionerClass(YearPartitioner.class); // custom partitioning class
job.setNumReduceTasks(10); // number of reduce tasks; passed to Partitioner.getPartition() as its numPartitions argument
// 3. Set the job input and output paths
String dataDir = "/expr/datecount/data"; // experiment data directory
String outputDir = "/expr/datecount/output_partition"; // experiment output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
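Both of the drivers above rely on a custom YearPartitioner together with job.setNumReduceTasks(...), whose value reaches the partitioner as the numPartitions argument. The YearPartitioner itself is not shown; here is a hedged sketch of how such a partitioner might look, assuming the Text key begins with a four-digit year (for example "2015-07-01"); the real class may parse its key differently:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: route each record to a reducer based on the year prefix of its key.
public class YearPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        try {
            int year = Integer.parseInt(key.toString().substring(0, 4));
            // Spread consecutive years across the configured number of reduce tasks.
            return year % numPartitions;
        } catch (StringIndexOutOfBoundsException | NumberFormatException e) {
            // Malformed keys fall back to hash partitioning.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}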
protected int runHalvadeJob(Configuration halvadeConf, String tmpOutDir, int jobType) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
String pipeline = "";
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
HalvadeConf.setIsPass2(halvadeConf, true);
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = RNA_PASS2;
} else if(jobType == HalvadeResourceManager.DNA) {
HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
pipeline = DNA;
}
halvadeOpts.splitChromosomes(halvadeConf, 0);
HalvadeConf.setOutDir(halvadeConf, tmpOutDir);
FileSystem outFs = FileSystem.get(new URI(tmpOutDir), halvadeConf);
if (outFs.exists(new Path(tmpOutDir))) {
Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
Logger.INFO("ERROR: Please remove this directory before trying again.");
System.exit(-2);
}
if(halvadeOpts.useBamInput)
setHeaderFile(halvadeOpts.in, halvadeConf);
if(halvadeOpts.rnaPipeline)
HalvadeConf.setPass2Suffix(halvadeConf, pass2suffix);
Job halvadeJob = Job.getInstance(halvadeConf, "Halvade" + pipeline);
halvadeJob.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
halvadeJob.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
addInputFiles(halvadeOpts.in, halvadeConf, halvadeJob);
FileOutputFormat.setOutputPath(halvadeJob, new Path(tmpOutDir));
if(jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RnaGATKReducer.class);
} else if(jobType == HalvadeResourceManager.DNA){
halvadeJob.setMapperClass(halvadeOpts.alignmentTools[halvadeOpts.aln]);
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.DnaGATKReducer.class);
}
halvadeJob.setMapOutputKeyClass(ChromosomeRegion.class);
halvadeJob.setMapOutputValueClass(SAMRecordWritable.class);
halvadeJob.setInputFormatClass(HalvadeTextInputFormat.class);
halvadeJob.setOutputKeyClass(Text.class);
if(halvadeOpts.mergeBam) {
halvadeJob.setSortComparatorClass(SimpleChrRegionComparator.class);
halvadeJob.setOutputValueClass(SAMRecordWritable.class);
}else {
halvadeJob.setPartitionerClass(ChrRgPartitioner.class);
halvadeJob.setSortComparatorClass(ChrRgSortComparator.class);
halvadeJob.setGroupingComparatorClass(ChrRgGroupingComparator.class);
halvadeJob.setOutputValueClass(VariantContextWritable.class);
}
if(halvadeOpts.justAlign && !halvadeOpts.mergeBam)
halvadeJob.setNumReduceTasks(0);
else if (halvadeOpts.mergeBam) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.BamMergeReducer.class);
halvadeJob.setNumReduceTasks(1);
} else {
halvadeJob.setNumReduceTasks(halvadeOpts.reduces);
if(halvadeOpts.countOnly) {
halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.CountReadsReducer.class);
halvadeJob.setOutputValueClass(LongWritable.class);
}
}
if(halvadeOpts.useBamInput) {
halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.AlignedBamMapper.class);
halvadeJob.setInputFormatClass(BAMInputFormat.class);
}
return runTimedJob(halvadeJob, "Halvade Job");
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Combine small input files using CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Add the IP file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the compression type
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
Configuration configuration = job.getConfiguration();
job.setJarByClass(Aegisthus.class);
CommandLine cl = getOptions(args);
if (cl == null) {
return 1;
}
// Check all of the paths and load the sstable version from the input filenames
List<Path> paths = Lists.newArrayList();
if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
paths.add(new Path(input));
}
}
if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
}
LOG.info("Processing paths: {}", paths);
// At this point we have the version of sstable that we can use for this run
Descriptor.Version version = Descriptor.Version.CURRENT;
if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
}
configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());
if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
setConfigurationFromCql(configuration);
}
if(cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
job.setInputFormatClass(AegisthusCombinedInputFormat.class);
} else {
job.setInputFormatClass(AegisthusInputFormat.class);
}
job.setMapOutputKeyClass(AegisthusKey.class);
job.setMapOutputValueClass(AtomWritable.class);
job.setOutputKeyClass(AegisthusKey.class);
job.setOutputValueClass(RowWritable.class);
job.setMapperClass(AegisthusKeyMapper.class);
job.setReducerClass(CassSSTableReducer.class);
job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
job.setPartitionerClass(AegisthusKeyPartitioner.class);
job.setSortComparatorClass(AegisthusKeySortingComparator.class);
TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
job.setOutputFormatClass(SSTableOutputFormat.class);
} else {
job.setOutputFormatClass(JsonOutputFormat.class);
}
CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));
job.submit();
if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new JobKiller(job));
}
System.out.println(job.getJobID());
System.out.println(job.getTrackingURL());
boolean success = job.waitForCompletion(true);
if (success) {
Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
success = false;
} else if (errorCount > 0) {
LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
errorCount, maxAllowed);
}
}
return success ? 0 : 1;
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Combine small input files using CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Add the IP file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the compression type
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Add the IP file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the compression type
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}