Listed below are example snippets showing how org.apache.hadoop.mapreduce.Job#setOutputKeyClass() is used in real-world projects.
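For orientation: setOutputKeyClass declares the key class of the job's final output (the reducer output; when setMapOutputKeyClass is not called it also serves as the map output key class). Below is a minimal WordCount-style driver sketch, reusing the TokenizerMapper and IntSumReducer classes that appear in one of the examples further down; MinimalDriver and the paths are placeholders:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count sketch");
job.setJarByClass(MinimalDriver.class); // placeholder driver class
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class); // final (reducer) output key type
job.setOutputValueClass(IntWritable.class); // final (reducer) output value type
FileInputFormat.addInputPath(job, new Path("/input")); // placeholder input path
FileOutputFormat.setOutputPath(job, new Path("/output")); // placeholder output path
System.exit(job.waitForCompletion(true) ? 0 : 1);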
public static Job getHdfsJob(Configuration conf, TaskConfig taskConfig, IndexInfo indexInfo) throws Exception {
Job job = Job.getInstance(conf, MAIN_CLASS);
job.setJobName("DidiFastIndex_" + taskConfig.getEsTemplate());
job.setJarByClass(FastIndex.class);
job.setMapperClass(FastIndexMapper.class);
job.setInputFormatClass(HCatInputFormat.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DefaultHCatRecord.class);
HCatInputFormat.setInput(job, taskConfig.getHiveDB(), taskConfig.getHiveTable(), taskConfig.getFilterStr());
job.setReducerClass(FastIndexReducer.class);
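// setOutputKeyClass/setOutputValueClass declare the final (reducer) output types; the map output types were set separately above.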
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(indexInfo.getReducerNum());
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(taskConfig.getHdfsMROutputPath()));
return job;
}
@SuppressWarnings("deprecation")
public static String runSafely(String[] args, long n) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
// Deleting previous stored nth row
hdfs.delete(new Path(args[1]));
conf.setLong("n", n);
Job job = new Job(conf);
job.setJarByClass(find_nth_driver.class);
job.setJobName("Finds the nth row of the HDFS file");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(find_nth_mapper.class);
job.setNumReduceTasks(0);
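// Map-only job (zero reduce tasks): the output key/value classes describe what the mapper writes directly to the output format.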
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(LongAndTextWritable.class);
job.waitForCompletion(true);
return readNthRow(args[1], conf);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: DistributedGrep <regex> <in> <out>");
ToolRunner.printGenericCommandUsage(System.err);
System.exit(2);
}
Job job = new Job(conf, "Distributed Grep");
job.setJarByClass(DistributedGrep.class);
job.setMapperClass(GrepMapper.class);
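// No map output classes are set, so these final output classes also apply to the map output; with no reducer class given, the default identity Reducer simply forwards the pairs.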
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.getConfiguration().set(REGEX_KEY, otherArgs[0]);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void runSortJob(Configuration conf, Path input, Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(SortMapReduce.Map.class);
job.setReducerClass(SortMapReduce.Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Person.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
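// Secondary-sort plumbing: custom partitioner, sort comparator, and grouping comparator (presumably partitioning and grouping by person name while sorting on the full Person key).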
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: LinkCountHDFS inputDir outputDir");
System.exit(2);
}
Job job = Job.getInstance(conf, "link count hdfs");
job.setJarByClass(LinkCountHDFS.class);
job.setInputFormatClass(HDFSInputFormat.class);
job.setMapperClass(RefMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
HDFSInputFormat.setInputPaths(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordstddev <in> <out>");
return 0;
}
Configuration conf = getConf();
Job job = Job.getInstance(conf, "word stddev");
job.setJarByClass(WordStandardDeviation.class);
job.setMapperClass(WordStandardDeviationMapper.class);
job.setCombinerClass(WordStandardDeviationReducer.class);
job.setReducerClass(WordStandardDeviationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputpath);
boolean result = job.waitForCompletion(true);
// read output and calculate standard deviation
stddev = readAndCalcStdDev(outputpath, conf);
return (result ? 0 : 1);
}
private Configuration createConf() throws IOException {
Configuration conf = HdpBootstrap.hadoopConfig();
HadoopCfgUtils.setGenericOptions(conf);
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setOutputFormatClass(PrintStreamOutputFormat.class);
job.setOutputKeyClass(Text.class);
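// Exercise both MapWritable and LinkedMapWritable as the output value class, chosen at random per run.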
boolean type = random.nextBoolean();
Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);
job.setOutputValueClass(mapType);
conf.set(ConfigurationOptions.ES_QUERY, query);
conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));
new QueryTestParams(tempFolder).provisionQueries(conf);
job.setNumReduceTasks(0);
//PrintStreamOutputFormat.stream(conf, Stream.OUT);
Configuration cfg = job.getConfiguration();
HdpBootstrap.addProperties(cfg, TestSettings.TESTING_PROPS, false);
return cfg;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private boolean analyze(final String inputFilePath,
final String outputFilePath,
final Long startTime) throws Exception {
Configuration conf = new Configuration();
conf.setLong(Holistic.START_TIME, startTime);
conf.setLong(Holistic.EXECUTE_TIME, executeHourTime);
Job jobAnalyze = Job.getInstance(conf, "analyze");
jobAnalyze.setJarByClass(Holistic.class);
MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.NEW_OLD_CUSTOMER,
TextOutputFormat.class, KeyWrapper.class, Text.class);
MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CUSTOMER_FLOW_KEY,
TextOutputFormat.class, KeyWrapper.class, Text.class);
MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CYCLE,
TextOutputFormat.class, KeyWrapper.class, Text.class);
MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.IN_STORE_HOUR,
TextOutputFormat.class, KeyWrapper.class, Text.class);
jobAnalyze.setMapperClass(AnalysisMapper.class);
jobAnalyze.setReducerClass(AnalysisReducer.class);
jobAnalyze.setCombinerClass(AnalysisCombiner.class);
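// Map output and final (reducer) output use different key/value types, so both pairs are declared explicitly below.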
jobAnalyze.setOutputKeyClass(LongWritable.class);
jobAnalyze.setOutputValueClass(Text.class);
jobAnalyze.setMapOutputKeyClass(KeyWrapper.class);
jobAnalyze.setMapOutputValueClass(ValueWrapper.class);
FileInputFormat.addInputPath(jobAnalyze, new Path(inputFilePath));
FileOutputFormat.setOutputPath(jobAnalyze, new Path(outputFilePath));
return jobAnalyze.waitForCompletion(true);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
// Initialize arguments
this.processArgs(conf, args);
// Create the job
Job job = Job.getInstance(conf, "active_user");
// Set job-related configuration
job.setJarByClass(ActiveUserRunner.class);
// HBase input mapper parameters
// 1. Local run
TableMapReduceUtil.initTableMapperJob(this.initScans(job), ActiveUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job, false);
// 2. Cluster run
// TableMapReduceUtil.initTableMapperJob(null, ActiveUserMapper.class,
// StatsUserDimension.class, TimeOutputValue.class, job);
// Set reducer-related parameters
job.setReducerClass(ActiveUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// Set output-related parameters
job.setOutputFormatClass(TransformerOutputFormat.class);
// Start time in milliseconds
long startTime = System.currentTimeMillis();
try {
return job.waitForCompletion(true) ? 0 : -1;
} finally {
// End time in milliseconds
long endTime = System.currentTimeMillis();
logger.info("Job<" + job.getJobName() + "> succeeded: " + job.isSuccessful() + "; start time: " + startTime + "; end time: " + endTime + "; elapsed: " + (endTime - startTime) + "ms");
}
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Leg Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// configure output
TextOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SortMapReduce.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Person.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "FlowCount"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FlowCount.class); //job class to run
job.setJar("export\\FlowCount.jar"); //local jar file
job.setMapperClass(FlowCountMapper.class); //Mapper class
job.setMapOutputKeyClass(Text.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(FlowCountReducer.class); //Reducer class
job.setOutputKeyClass(Text.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/weblog/data"; //test data directory
String outputDir = "/expr/weblog/output1"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop
// framework. This example won't cover the specifics, but will recognize several standard
// command line arguments, enabling applications to easily specify a namenode, a
// ResourceManager, additional configuration resources etc.
GenericOptionsParser parser = new GenericOptionsParser(args);
args = parser.getRemainingArgs();
// Make sure we have the right parameters.
if (args.length != 3) {
System.out.println(
"Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
+ "[GcsOutputPath]\n"
+ " ProjectId - Project under which to issue the BigQuery operations. Also "
+ "serves as the default project for table IDs which don't explicitly specify a "
+ "project for the table.\n"
+ " QualifiedInputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " OutputPath - The output path to write data, e.g. "
+ "gs://bucket/dir/");
System.exit(1);
}
// Get the individual parameters from the command line.
String projectId = args[0];
String inputQualifiedTableId = args[1];
String outputPath = args[2];
// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(PROJECT_ID.getKey(), projectId);
// Configure input and output.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Set column and predicate filters
conf.set(SELECTED_FIELDS.getKey(), "word,word_count");
conf.set(SQL_FILTER.getKey(), "word >= 'A' AND word <= 'zzz'");
conf.set(MRJobConfig.NUM_MAPS, "999");
// This helps Hadoop identify the Jar which contains the mapper and reducer by specifying a
// class in that Jar. This is required if the jar is being passed on the command line to Hadoop.
job.setJarByClass(DirectBigQueryWordCount.class);
// Tell the job what the output will be.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(DirectBigQueryInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if (args.length != 6) {
System.err
.println("Usage: AddBalance <redis hosts> <hash prefix> <balance> <partition length> <begin> <end>");
System.exit(1);
}
String host = args[0];
String hashPrefix = args[1];
String balance = args[2];
String pLength = args[3];
String begin = args[4];
String end = args[5];
Job job = Job.getInstance(conf, "Add Balance");
job.setNumReduceTasks(0);
job.setJarByClass(Application.class);
job.setMapperClass(RedisOutputMapper.class);
RedisOutputMapper.setBalance(job, balance);
job.setInputFormatClass(RedisHashInputFormat.class);
RedisHashInputFormat.setRedisHost(job, host);
RedisHashInputFormat.setHashPrefix(job, hashPrefix);
RedisHashInputFormat.setBegin(job, begin);
RedisHashInputFormat.setEnd(job, end);
RedisHashInputFormat.setPLength(job, pLength);
job.setOutputFormatClass(RedisHashOutputFormat.class);
RedisHashOutputFormat.setRedisHost(job, host);
RedisHashOutputFormat.setPLength(job, pLength);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//Wait for job completion
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "IPCount"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(IPCount.class); //job class to run
job.setJar("export\\IPCount.jar"); //local jar file
job.setMapperClass(IPCountMapper.class); //Mapper class
job.setMapOutputKeyClass(DayAndIp.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(IPCountReducer.class); //Reducer class
job.setOutputKeyClass(DayAndIp.class); //Reducer output key type
job.setOutputValueClass(IntWritable.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/weblog/data"; //test data directory
String outputDir = "/expr/weblog/output4"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
//1. Set HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
//2. Set MapReduce job configuration
String jobName = "MedianStdDevJob"; //job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MedianStdDevJob.class); //job class to run
job.setJar("export\\MedianStdDevJob.jar"); //local jar file
job.setMapperClass(MedianStdDevMapper.class); //Mapper class
job.setMapOutputKeyClass(IntWritable.class); //Mapper output key type
job.setMapOutputValueClass(IntWritable.class); //Mapper output value type
job.setReducerClass(MedianStdDevReducer.class); //Reducer class
job.setOutputKeyClass(IntWritable.class); //Reducer output key type
job.setOutputValueClass(MedianStdDevTuple.class); //Reducer output value type
//3. Set job input and output paths
String dataDir = "/expr/medianstddev/data"; //test data directory
String outputDir = "/expr/medianstddev/output"; //output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
//4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException
{
boolean success;
Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
finalResponseJob.setSpeculativeExecution(false);
String finalResponseJobName = "pir_finalResponse";
// Set the same job configs as for the first iteration
finalResponseJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
finalResponseJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
finalResponseJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
finalResponseJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));
finalResponseJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));
finalResponseJob.getConfiguration().set("pirMR.outputFile", outputFile);
finalResponseJob.getConfiguration().set("mapreduce.map.speculative", "false");
finalResponseJob.getConfiguration().set("mapreduce.reduce.speculative", "false");
finalResponseJob.setJobName(finalResponseJobName);
finalResponseJob.setJarByClass(ColumnMultMapper.class);
finalResponseJob.setNumReduceTasks(1);
// Set the Mapper, InputFormat, and input path
finalResponseJob.setMapperClass(ColumnMultMapper.class);
finalResponseJob.setInputFormatClass(TextInputFormat.class);
FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult));
for (FileStatus fstat : status)
{
if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS))
{
logger.info("fstat.getPath() = " + fstat.getPath().toString());
FileInputFormat.addInputPath(finalResponseJob, fstat.getPath());
}
}
finalResponseJob.setMapOutputKeyClass(LongWritable.class);
finalResponseJob.setMapOutputValueClass(Text.class);
// Set the reducer and output options
finalResponseJob.setReducerClass(FinalResponseReducer.class);
finalResponseJob.setOutputKeyClass(LongWritable.class);
finalResponseJob.setOutputValueClass(Text.class);
finalResponseJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
// Delete the output file, if it exists
if (fs.exists(outPathFinal))
{
fs.delete(outPathFinal, true);
}
FileOutputFormat.setOutputPath(finalResponseJob, outPathFinal);
MultipleOutputs.addNamedOutput(finalResponseJob, FileConst.PIR_FINAL, TextOutputFormat.class, LongWritable.class, Text.class);
// Submit job, wait for completion
success = finalResponseJob.waitForCompletion(true);
return success;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// job1 configuration
Job job1 = Job.getInstance(conf,"Decompose");
job1.setJarByClass(JobRun.class);
job1.setJar("export\\mutualFriend.jar");
job1.setMapperClass(DecomposeFriendsMapper.class);
job1.setReducerClass(DecomposeFriendsReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
Path input = new Path(hdfs+"/workspace/mutualFriends/data");
Path output1 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
FileInputFormat.addInputPath(job1, input);
FileOutputFormat.setOutputPath(job1, output1);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output1)) {
fs.delete(output1, true);
}
// Proceed to job2 only if job1 succeeds
if(job1.waitForCompletion(true)) { // job2 depends entirely on job1's output, so it starts only after job1 completes successfully
// job2 configuration
Job job2 = Job.getInstance(conf, "Merge");
job2.setJarByClass(JobRun.class);
job2.setJar("export\\mutualFriend.jar");
job2.setMapperClass(MergeFriendsMapper.class);
job2.setReducerClass(MergeFriendsReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
Path output2 = new Path(hdfs+"/workspace/mutualFriends/output_Meg");
FileInputFormat.addInputPath(job2, output1); // job1's output is job2's input
FileOutputFormat.setOutputPath(job2, output2);
if (fs.exists(output2)) {
fs.delete(output2, true);
}
if(job2.waitForCompletion(true)) {
System.out.println("sucessed");
}else {
System.out.println("failed");
}
}
}
/**
* Adds a {@link Mapper} class to the chain reducer.
*
* <p>
* The key and values are passed from one element of the chain to the next, by
* value. For the added Mapper, the configuration given for it,
* <code>mapperConf</code>, has precedence over the job's Configuration. This
* precedence is in effect when the task is running.
* </p>
* <p>
* IMPORTANT: There is no need to specify the output key/value classes for the
* ChainMapper; this is done by the addMapper for the last mapper in the
* chain.
* </p>
*
* @param job
* The job.
* @param klass
* the Mapper class to add.
* @param inputKeyClass
* mapper input key class.
* @param inputValueClass
* mapper input value class.
* @param outputKeyClass
* mapper output key class.
* @param outputValueClass
* mapper output value class.
* @param mapperConf
* a configuration for the Mapper class. It is recommended to use a
* Configuration without default values using the
* <code>Configuration(boolean loadDefaults)</code> constructor with
* FALSE.
*/
public static void addMapper(Job job, Class<? extends Mapper> klass,
Class<?> inputKeyClass, Class<?> inputValueClass,
Class<?> outputKeyClass, Class<?> outputValueClass,
Configuration mapperConf) throws IOException {
job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);
Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
outputKeyClass, outputValueClass, mapperConf);
}
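For context, here is a minimal usage sketch of the chain API this method belongs to (ChainMapper/ChainReducer from org.apache.hadoop.mapreduce.lib.chain). AMap, BMap, XReduce, and ChainDriver are hypothetical classes, and conf is assumed to be an existing Configuration:
Job job = Job.getInstance(conf, "chain");
job.setJarByClass(ChainDriver.class); // hypothetical driver class
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
ChainReducer.setReducer(job, XReduce.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
// The last mapper added to the chain reducer determines the job's output key/value
// classes; the call below ends up invoking job.setOutputKeyClass/setOutputValueClass.
ChainReducer.addMapper(job, BMap.class, Text.class, Text.class, LongWritable.class, Text.class, new Configuration(false));
FileInputFormat.addInputPath(job, new Path("/chain/in")); // placeholder paths
FileOutputFormat.setOutputPath(job, new Path("/chain/out"));
job.waitForCompletion(true);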