org.apache.hadoop.mapreduce.Job#setOutputKeyClass() Source Code Examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#setOutputKeyClass(), collected from open-source projects. Follow each project link to view the full source on GitHub.

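A quick note on semantics before the examples: setOutputKeyClass declares the key type of the job's final (reduce-phase) output, and it also serves as the map output key type unless setMapOutputKeyClass overrides it, which several examples below do. A minimal driver sketch follows (MyMapper and MyReducer are placeholders, not taken from any project on this page):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "minimal example");
        job.setJarByClass(MinimalDriver.class);
        job.setMapperClass(MyMapper.class);     // placeholder mapper
        job.setReducerClass(MyReducer.class);   // placeholder reducer
        // Final (reduce-side) output types; these also act as the map output
        // types unless setMapOutputKeyClass/setMapOutputValueClass override them.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
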
Example 1  Project: ES-Fastloader  File: HdfsUtil.java
public static Job getHdfsJob(Configuration conf, TaskConfig taskConfig, IndexInfo indexInfo) throws Exception {
    Job job = Job.getInstance(conf, MAIN_CLASS);
    job.setJobName("DidiFastIndex_" + taskConfig.getEsTemplate());
    job.setJarByClass(FastIndex.class);
    job.setMapperClass(FastIndexMapper.class);
    job.setInputFormatClass(HCatInputFormat.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(DefaultHCatRecord.class);
    HCatInputFormat.setInput(job, taskConfig.getHiveDB(), taskConfig.getHiveTable(), taskConfig.getFilterStr());

    job.setReducerClass(FastIndexReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(indexInfo.getReducerNum());
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(taskConfig.getHdfsMROutputPath()));

    return job;
}
 
Example 2  Project: MLHadoop  File: find_nth_driver.java
@SuppressWarnings("deprecation")
public static String runSafely (String[] args, long n) throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  FileSystem hdfs = FileSystem.get(conf);
  // Delete the previously stored nth row, if any
  hdfs.delete(new Path(args[1]), true);
  conf.setLong("n", n);
  Job job = new Job(conf);

  job.setJarByClass(find_nth_driver.class);

  job.setJobName("Finds the nth row of the HDFS file");

  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(find_nth_mapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(LongAndTextWritable.class);

  job.waitForCompletion(true);
  
  return readNthRow(args[1], conf);
}
 
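Example 3  File: DistributedGrep.java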
@Override
public int run(String[] args) throws Exception {
  Configuration conf = new Configuration();
  GenericOptionsParser parser = new GenericOptionsParser(conf, args);
  String[] otherArgs = parser.getRemainingArgs();
  if (otherArgs.length != 3) {
    System.err.println("Usage: DistributedGrep <regex> <in> <out>");
    ToolRunner.printGenericCommandUsage(System.err);
    System.exit(2);
  }
  Job job = new Job(conf, "Distributed Grep");
  job.setJarByClass(DistributedGrep.class);
  job.setMapperClass(GrepMapper.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);
  job.getConfiguration().set(REGEX_KEY, otherArgs[0]);
  FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
  boolean success = job.waitForCompletion(true);

  return success ? 0 : 1;
}
 
Example 4  Project: hiped2  File: Main.java
public static void runSortJob(Configuration conf, Path input, Path outputPath)
    throws Exception {

  Job job = new Job(conf);
  job.setJarByClass(Main.class);
  job.setMapperClass(SortMapReduce.Map.class);
  job.setReducerClass(SortMapReduce.Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(Person.class);
  job.setMapOutputValueClass(Person.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setPartitionerClass(PersonNamePartitioner.class);
  job.setSortComparatorClass(PersonComparator.class);
  job.setGroupingComparatorClass(PersonNameComparator.class);

  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, outputPath);

  job.waitForCompletion(true);
}
 
Example 5  Project: marklogic-contentpump  File: LinkCountHDFS.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: LinkCountHDFS inputDir outputDir");
        System.exit(2);
    }
    
    Job job = Job.getInstance(conf, "link count hdfs");
    job.setJarByClass(LinkCountHDFS.class);
    job.setInputFormatClass(HDFSInputFormat.class);
    job.setMapperClass(RefMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    
    job.setReducerClass(IntSumReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    HDFSInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 6  Project: hadoop  File: WordStandardDeviation.java
@Override
public int run(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: wordstddev <in> <out>");
    return 0;
  }

  Configuration conf = getConf();

  Job job = Job.getInstance(conf, "word stddev");
  job.setJarByClass(WordStandardDeviation.class);
  job.setMapperClass(WordStandardDeviationMapper.class);
  job.setCombinerClass(WordStandardDeviationReducer.class);
  job.setReducerClass(WordStandardDeviationReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  Path outputpath = new Path(args[1]);
  FileOutputFormat.setOutputPath(job, outputpath);
  boolean result = job.waitForCompletion(true);

  // read output and calculate standard deviation
  stddev = readAndCalcStdDev(outputpath, conf);

  return (result ? 0 : 1);
}
 
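Example 7  Project: elasticsearch-hadoop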
private Configuration createConf() throws IOException {
    Configuration conf = HdpBootstrap.hadoopConfig();
    HadoopCfgUtils.setGenericOptions(conf);
    Job job = new Job(conf);
    job.setInputFormatClass(EsInputFormat.class);
    job.setOutputFormatClass(PrintStreamOutputFormat.class);
    job.setOutputKeyClass(Text.class);

    boolean type = random.nextBoolean();
    Class<?> mapType = (type ? MapWritable.class : LinkedMapWritable.class);

    job.setOutputValueClass(mapType);
    conf.set(ConfigurationOptions.ES_QUERY, query);

    conf.set(ConfigurationOptions.ES_READ_METADATA, String.valueOf(readMetadata));
    conf.set(ConfigurationOptions.ES_OUTPUT_JSON, String.valueOf(readAsJson));

    new QueryTestParams(tempFolder).provisionQueries(conf);
    job.setNumReduceTasks(0);
    //PrintStreamOutputFormat.stream(conf, Stream.OUT);

    Configuration cfg = job.getConfiguration();
    HdpBootstrap.addProperties(cfg, TestSettings.TESTING_PROPS, false);
    return cfg;
}
 
Example 8  Project: wifi  File: WordCount.java
public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if(otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}
 
Example 9  Project: WIFIProbe  File: Task.java
private boolean analyze(final String inputFilePath,
                           final String outputFilePath,
                           final Long startTime) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(Holistic.START_TIME, startTime);
    conf.setLong(Holistic.EXECUTE_TIME, executeHourTime);

    Job jobAnalyze = Job.getInstance(conf, "analyze");

    jobAnalyze.setJarByClass(Holistic.class);

    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.NEW_OLD_CUSTOMER,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CUSTOMER_FLOW_KEY,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CYCLE,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.IN_STORE_HOUR,
            TextOutputFormat.class, KeyWrapper.class, Text.class);

    jobAnalyze.setMapperClass(AnalysisMapper.class);
    jobAnalyze.setReducerClass(AnalysisReducer.class);
    jobAnalyze.setCombinerClass(AnalysisCombiner.class);

    jobAnalyze.setOutputKeyClass(LongWritable.class);
    jobAnalyze.setOutputValueClass(Text.class);

    jobAnalyze.setMapOutputKeyClass(KeyWrapper.class);
    jobAnalyze.setMapOutputValueClass(ValueWrapper.class);

    FileInputFormat.addInputPath(jobAnalyze, new Path(inputFilePath));
    FileOutputFormat.setOutputPath(jobAnalyze, new Path(outputFilePath));

    return jobAnalyze.waitForCompletion(true);
}
 
Example 10  Project: BigDataArchitect  File: ActiveUserRunner.java
@Override
public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    // Initialize arguments
    this.processArgs(conf, args);

    // Create the job
    Job job = Job.getInstance(conf, "active_user");
    

    // Configure the job
    job.setJarByClass(ActiveUserRunner.class);
    // HBase input mapper settings
    // 1. Run locally
    TableMapReduceUtil.initTableMapperJob(this.initScans(job), ActiveUserMapper.class, StatsUserDimension.class, TimeOutputValue.class, job, false);
    // 2. Run on a cluster
    // TableMapReduceUtil.initTableMapperJob(null, ActiveUserMapper.class,
    // StatsUserDimension.class, TimeOutputValue.class, job);

    // Reducer settings
    job.setReducerClass(ActiveUserReducer.class);
    job.setOutputKeyClass(StatsUserDimension.class);
    job.setOutputValueClass(MapWritableValue.class);

    // Output settings
    job.setOutputFormatClass(TransformerOutputFormat.class);
    // Start time in milliseconds
    long startTime = System.currentTimeMillis();
    try {
        return job.waitForCompletion(true) ? 0 : -1;
    } finally {
        // End time in milliseconds
        long endTime = System.currentTimeMillis();
        logger.info("Job<" + job.getJobName() + "> succeeded: " + job.isSuccessful() + "; start: " + startTime + "; end: " + endTime + "; elapsed: " + (endTime - startTime) + "ms");
    }
}
 
Example 11  Project: gemfirexd-oss  File: BusyLegs.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();
    Configuration conf = getConf();

    Path outputPath = new Path(args[0]);
    String hdfsHomeDir = args[1];
    String tableName = args[2];

    outputPath.getFileSystem(conf).delete(outputPath, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    Job job = Job.getInstance(conf, "Busy Leg Count");

    job.setInputFormatClass(RowInputFormat.class);

    // configure mapper and reducer
    job.setMapperClass(SampleMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // configure output
    TextOutputFormat.setOutputPath(job, outputPath);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
 
Example 12  Project: hiped2  File: SortMapReduce.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

  Configuration conf = super.getConf();
  Job job = new Job(conf);
  job.setJarByClass(SortMapReduce.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(Person.class);
  job.setMapOutputValueClass(Text.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setPartitionerClass(PersonNamePartitioner.class);
  job.setSortComparatorClass(PersonComparator.class);
  job.setGroupingComparatorClass(PersonNameComparator.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
 
Example 13  Project: MapReduce-Demo  File: FlowCount.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "FlowCount";					// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FlowCount.class);				// main class for this job
	job.setJar("export\\FlowCount.jar");			// local jar to submit
	job.setMapperClass(FlowCountMapper.class);		// Mapper class
	job.setMapOutputKeyClass(Text.class);			// Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	// Mapper output value type
	job.setReducerClass(FlowCountReducer.class);	// Reducer class
	job.setOutputKeyClass(Text.class);				// Reducer output key type
	job.setOutputValueClass(IntWritable.class);		// Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/weblog/data";			// input data directory
	String outputDir = "/expr/weblog/output1";		// output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
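Example 14  File: DirectBigQueryWordCount.java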
public static void main(String[] args)
    throws IOException, InterruptedException, ClassNotFoundException {

  // GenericOptionsParser is a utility to parse command line arguments generic to the Hadoop
  // framework. This example won't cover the specifics, but will recognize several standard
  // command line arguments, enabling applications to easily specify a namenode, a
  // ResourceManager, additional configuration resources etc.
  GenericOptionsParser parser = new GenericOptionsParser(args);
  args = parser.getRemainingArgs();

  // Make sure we have the right parameters.
  if (args.length != 3) {
    System.out.println(
        "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
            + "[GcsOutputPath]\n"
            + "    ProjectId - Project under which to issue the BigQuery operations. Also "
            + "serves as the default project for table IDs which don't explicitly specify a "
            + "project for the table.\n"
            + "    QualifiedInputTableId - Input table ID of the form "
            + "(Optional ProjectId):[DatasetId].[TableId]\n"
            + "    OutputPath - The output path to write data, e.g. "
            + "gs://bucket/dir/");
    System.exit(1);
  }

  // Get the individual parameters from the command line.
  String projectId = args[0];
  String inputQualifiedTableId = args[1];
  String outputPath = args[2];

  // Create the job and get its configuration.
  Job job = new Job(parser.getConfiguration(), "wordcount");
  Configuration conf = job.getConfiguration();

  // Set the job-level projectId.
  conf.set(PROJECT_ID.getKey(), projectId);

  // Configure input and output.
  BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

  // Set column and predicate filters
  conf.set(SELECTED_FIELDS.getKey(), "word,word_count");
  conf.set(SQL_FILTER.getKey(), "word >= 'A' AND word <= 'zzz'");
  conf.set(MRJobConfig.NUM_MAPS, "999");

  // This helps Hadoop identify the Jar which contains the mapper and reducer by specifying a
  // class in that Jar. This is required if the jar is being passed on the command line to Hadoop.
  job.setJarByClass(DirectBigQueryWordCount.class);

  // Tell the job what the output will be.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(DirectBigQueryInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileOutputFormat.setOutputPath(job, new Path(outputPath));

  job.waitForCompletion(true);
}
 
Example 15  Project: Redis-4.x-Cookbook  File: Application.java
public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    if (args.length != 6) {
        System.err
                .println("Usage: AddBalance <redis hosts> <hash prefix> <balance> <partition length> <begin> <end>");
        System.exit(1);
    }

    String host = args[0];
    String hashPrefix = args[1];
    String balance = args[2];
    String pLength = args[3];
    String begin = args[4];
    String end = args[5];

    Job job = Job.getInstance(conf, "Add Balance");
    job.setNumReduceTasks(0);

    job.setJarByClass(Application.class);
    job.setMapperClass(RedisOutputMapper.class);

    RedisOutputMapper.setBalance(job, balance);

    job.setInputFormatClass(RedisHashInputFormat.class);
    RedisHashInputFormat.setRedisHost(job, host);
    RedisHashInputFormat.setHashPrefix(job, hashPrefix);
    RedisHashInputFormat.setBegin(job, begin);
    RedisHashInputFormat.setEnd(job, end);
    RedisHashInputFormat.setPLength(job, pLength);

    job.setOutputFormatClass(RedisHashOutputFormat.class);
    RedisHashOutputFormat.setRedisHost(job, host);
    RedisHashOutputFormat.setPLength(job, pLength);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    //Wait for job completion
    return (job.waitForCompletion(true) ? 0 : 1);
}
 
Example 16  Project: MapReduce-Demo  File: IPCount.java
public static void main(String[] args) throws Exception {		
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "IPCount";						// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(IPCount.class);				// main class for this job
	job.setJar("export\\IPCount.jar");				// local jar to submit
	job.setMapperClass(IPCountMapper.class);		// Mapper class
	job.setMapOutputKeyClass(DayAndIp.class);		// Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	// Mapper output value type
	job.setReducerClass(IPCountReducer.class);		// Reducer class
	job.setOutputKeyClass(DayAndIp.class);			// Reducer output key type
	job.setOutputValueClass(IntWritable.class);		// Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/weblog/data";			// input data directory
	String outputDir = "/expr/weblog/output4";		// output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 17  Project: MapReduce-Demo  File: MedianStdDevJob.java
public static void main(String[] args) throws Exception {
	//1. Set HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2. Set MapReduce job configuration
	String jobName = "MedianStdDevJob";					// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MedianStdDevJob.class);			// main class for this job
	job.setJar("export\\MedianStdDevJob.jar");			// local jar to submit
	job.setMapperClass(MedianStdDevMapper.class);		// Mapper class
	job.setMapOutputKeyClass(IntWritable.class);		// Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		// Mapper output value type
	job.setReducerClass(MedianStdDevReducer.class);		// Reducer class
	job.setOutputKeyClass(IntWritable.class);			// Reducer output key type
	job.setOutputValueClass(MedianStdDevTuple.class);	// Reducer output value type
	
	//3. Set job input and output paths
	String dataDir = "/expr/medianstddev/data";			// input data directory
	String outputDir = "/expr/medianstddev/output";		// output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
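Example 18  Project: incubator-pirk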
private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException
{
  boolean success;

  Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
  finalResponseJob.setSpeculativeExecution(false);

  String finalResponseJobName = "pir_finalResponse";

  // Set the same job configs as for the first iteration
  finalResponseJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
  finalResponseJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
  finalResponseJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
  finalResponseJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));

  finalResponseJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));
  finalResponseJob.getConfiguration().set("pirMR.outputFile", outputFile);

  finalResponseJob.getConfiguration().set("mapreduce.map.speculative", "false");
  finalResponseJob.getConfiguration().set("mapreduce.reduce.speculative", "false");

  finalResponseJob.setJobName(finalResponseJobName);
  finalResponseJob.setJarByClass(ColumnMultMapper.class);
  finalResponseJob.setNumReduceTasks(1);

  // Set the Mapper, InputFormat, and input path
  finalResponseJob.setMapperClass(ColumnMultMapper.class);
  finalResponseJob.setInputFormatClass(TextInputFormat.class);

  FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult));
  for (FileStatus fstat : status)
  {
    if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS))
    {
      logger.info("fstat.getPath() = " + fstat.getPath().toString());
      FileInputFormat.addInputPath(finalResponseJob, fstat.getPath());
    }
  }
  finalResponseJob.setMapOutputKeyClass(LongWritable.class);
  finalResponseJob.setMapOutputValueClass(Text.class);

  // Set the reducer and output options
  finalResponseJob.setReducerClass(FinalResponseReducer.class);
  finalResponseJob.setOutputKeyClass(LongWritable.class);
  finalResponseJob.setOutputValueClass(Text.class);
  finalResponseJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");

  // Delete the output file, if it exists
  if (fs.exists(outPathFinal))
  {
    fs.delete(outPathFinal, true);
  }
  FileOutputFormat.setOutputPath(finalResponseJob, outPathFinal);
  MultipleOutputs.addNamedOutput(finalResponseJob, FileConst.PIR_FINAL, TextOutputFormat.class, LongWritable.class, Text.class);

  // Submit job, wait for completion
  success = finalResponseJob.waitForCompletion(true);

  return success;
}
 
Example 19  Project: MapReduce-Demo  File: JobRun.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	// job1 configuration
	Job job1 = Job.getInstance(conf,"Decompose");
	job1.setJarByClass(JobRun.class);
	job1.setJar("export\\mutualFriend.jar");
	job1.setMapperClass(DecomposeFriendsMapper.class);
	job1.setReducerClass(DecomposeFriendsReducer.class);
	job1.setOutputKeyClass(Text.class);
	job1.setOutputValueClass(Text.class);
	
	Path input = new Path(hdfs+"/workspace/mutualFriends/data");
	Path output1 = new Path(hdfs+"/workspace/mutualFriends/output_Dec");
	FileInputFormat.addInputPath(job1, input);
	FileOutputFormat.setOutputPath(job1, output1);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(output1)) {
		fs.delete(output1, true);
	}
	
	// Proceed to job2 only if job1 succeeds
	if(job1.waitForCompletion(true)) {	// job2 depends entirely on job1's output, so start it only after job1 succeeds
		// job2 configuration
		Job job2 = Job.getInstance(conf, "Merge");
		job2.setJarByClass(JobRun.class);
		job2.setJar("export\\mutualFriend.jar");
		job2.setMapperClass(MergeFriendsMapper.class);
		job2.setReducerClass(MergeFriendsReducer.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		
		Path output2 = new Path(hdfs+"/workspace/mutualFriends/output_Meg");
		FileInputFormat.addInputPath(job2, output1);	// job1's output is job2's input
		FileOutputFormat.setOutputPath(job2, output2);
		if (fs.exists(output2)) {
			fs.delete(output2, true);
		}
		if(job2.waitForCompletion(true)) {
			System.out.println("sucessed");
		}else {
			System.out.println("failed");
		}
	}
}
 
Example 20  Project: hadoop  File: ChainReducer.java
/**
 * Adds a {@link Mapper} class to the chain reducer.
 * 
 * <p>
 * The key and values are passed from one element of the chain to the next, by
 * value. For the added Mapper, the configuration given for it,
 * <code>mapperConf</code>, has precedence over the job's Configuration. This
 * precedence is in effect when the task is running.
 * </p>
 * <p>
 * IMPORTANT: There is no need to specify the output key/value classes for the
 * ChainReducer; this is done by the addMapper call for the last mapper in the
 * chain.
 * </p>
 * 
 * @param job
 *          The job.
 * @param klass
 *          the Mapper class to add.
 * @param inputKeyClass
 *          mapper input key class.
 * @param inputValueClass
 *          mapper input value class.
 * @param outputKeyClass
 *          mapper output key class.
 * @param outputValueClass
 *          mapper output value class.
 * @param mapperConf
 *          a configuration for the Mapper class. It is recommended to use a
 *          Configuration without default values using the
 *          <code>Configuration(boolean loadDefaults)</code> constructor with
 *          FALSE.
 */
public static void addMapper(Job job, Class<? extends Mapper> klass,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration mapperConf) throws IOException {
  job.setOutputKeyClass(outputKeyClass);
  job.setOutputValueClass(outputValueClass);
  Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
      outputKeyClass, outputValueClass, mapperConf);
}
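 
The javadoc above notes that the last addMapper in a chain fixes the job's output key/value classes, which is why the method body calls setOutputKeyClass and setOutputValueClass. A brief usage sketch under assumed class names (ReduceText and PostProcessMapper are hypothetical, not part of Hadoop):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

public class ChainDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "chain example");
    job.setJarByClass(ChainDriver.class);
    // Set the reducer first; its input types must match the map output types.
    ChainReducer.setReducer(job, ReduceText.class,
        Text.class, Text.class,          // reducer input key/value (hypothetical types)
        Text.class, Text.class,          // reducer output key/value
        new Configuration(false));       // per-element conf without defaults, as recommended above
    // Append a post-reduce mapper; being last in the chain, this call also
    // fixes the job's final output key/value classes.
    ChainReducer.addMapper(job, PostProcessMapper.class,
        Text.class, Text.class,          // mapper input = reducer output
        LongWritable.class, Text.class,  // final job output key/value
        new Configuration(false));
  }
}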