org.apache.hadoop.mapreduce.Job#setOutputValueClass ( )源码实例Demo

下面列出了org.apache.hadoop.mapreduce.Job#setOutputValueClass ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: wifi   文件: WordCount.java
public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
//		System.out.println(otherArgs);
		if(otherArgs.length != 2) {
			System.out.println("Usage:wordcount <in> <out>");
			System.exit(2);
		}
//		if(args.length != 2) {
//			System.out.println("param error!");
//			System.exit(-1);
//		}
		
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}
 
源代码2 项目: marklogic-contentpump   文件: CustomQuery.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length < 1) {
        System.err.println("Usage: CustomQuery configFile");
        System.exit(2);
    }
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    Job job = Job.getInstance(conf, "custom query");
    job.setJarByClass(CustomQuery.class);
    
    job.setInputFormatClass(NodeInputFormat.class);
    job.setMapperClass(QueryMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    
    job.setReducerClass(QueryReducer.class);
    job.setOutputFormatClass(KeyValueOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    conf = job.getConfiguration();
    conf.addResource(otherArgs[0]);
    
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
源代码3 项目: big-c   文件: TeraValidate.java
public int run(String[] args) throws Exception {
  Job job = Job.getInstance(getConf());
  if (args.length != 2) {
    usage();
    return 1;
  }
  TeraInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.setJobName("TeraValidate");
  job.setJarByClass(TeraValidate.class);
  job.setMapperClass(ValidateMapper.class);
  job.setReducerClass(ValidateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  // force a single reducer
  job.setNumReduceTasks(1);
  // force a single split 
  FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
  job.setInputFormatClass(TeraInputFormat.class);
  return job.waitForCompletion(true) ? 0 : 1;
}
 
源代码4 项目: bigdata-tutorial   文件: XflowStatic.java
public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 3) {
			System.err.println("Usage: xflowstatic <type> <in> <out>");
			System.exit(2);
		}
		conf.set(TYPE_KEY, otherArgs[0]);
		Job job = Job.getInstance();
		job.setJobName("xflowstatic");
		job.setJarByClass(XflowStatic.class);
		job.setMapperClass(XflowMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
 
源代码5 项目: IntroToHadoopAndMR__Udacity_Course   文件: P1.java
public final static void main(final String[] args) throws Exception {
	final Configuration conf = new Configuration();

	final Job job = new Job(conf, "P1");
	job.setJarByClass(P1.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(DoubleWritable.class);

	job.setMapperClass(P1Map.class);
	job.setCombinerClass(P1Reduce.class);
	job.setReducerClass(P1Reduce.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.addInputPath(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	job.waitForCompletion(true);
}
 
/**
 * Set the mapper class implementation to use in the job,
 * as well as any related configuration (e.g., map output types).
 */
protected void configureMapper(Job job, String tableName,
    String tableClassName) throws ClassNotFoundException, IOException {
  job.setMapperClass(getMapperClass());
  job.setOutputKeyClass(String.class);
  job.setOutputValueClass(NullWritable.class);
}
 
源代码7 项目: Hadoop-BAM   文件: TestBAM.java
public int run(String[] args) throws Exception {
    final Configuration conf = getConf();

    conf.set(MyOutputFormat.HEADER_FROM_FILE, args[0]);

    final Job job = new Job(conf);

    job.setJarByClass(TestBAM.class);
    job.setMapperClass (TestBAMMapper.class);
    job.setReducerClass(TestBAMReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SAMRecordWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass (SAMRecordWritable.class);

    job.setInputFormatClass(AnySAMInputFormat.class);
    job.setOutputFormatClass(TestBAM.MyOutputFormat.class);

    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(args[0]));

    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.submit();

    if (!job.waitForCompletion(true)) {
        System.err.println("sort :: Job failed.");
        return 1;
    }

  return 0;
}
 
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser parser = new GenericOptionsParser(conf, args);
	String[] otherArgs = parser.getRemainingArgs();
	if (otherArgs.length != 4) {
		printUsage();
	}
	Job job = new Job(conf, "ReduceSideJoin");
	job.setJarByClass(ReplicatedUserJoin.class);

	// Use MultipleInputs to set which input uses what mapper
	// This will keep parsing of each data set separate from a logical
	// standpoint
	// The first two elements of the args array are the two inputs
	MultipleInputs.addInputPath(job, new Path(args[0]),
			TextInputFormat.class, UserJoinMapper.class);
	MultipleInputs.addInputPath(job, new Path(args[1]),
			TextInputFormat.class, CommentJoinMapper.class);
	job.getConfiguration().set("join.type", args[2]);

	job.setReducerClass(UserJoinReducer.class);

	job.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(job, new Path(args[3]));

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	return job.waitForCompletion(true) ? 0 : 2;
}
 
源代码9 项目: hgraphdb   文件: IndexTool.java
/**
 * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
 * waits for the job completion based on runForeground parameter.
 * 
 * @param job job
 * @param outputPath output path
 * @param runForeground - if true, waits for job completion, else submits and returns
 *            immediately.
 * @throws Exception
 */
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, TableName outputTableName,
                                                   boolean skipDependencyJars, boolean runForeground)
        throws Exception {
    job.setMapperClass(getDirectMapperClass());
    job.setReducerClass(getDirectReducerClass());
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());

    //Set the Output classes
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    if (!skipDependencyJars) {
        TableMapReduceUtil.addDependencyJars(job);
    }
    job.setNumReduceTasks(1);

    if (!runForeground) {
        LOG.info("Running Index Build in Background - Submit async and exit");
        job.submit();
        return;
    }
    LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!.");
    boolean result = job.waitForCompletion(true);
    if (!result) {
        LOG.error("IndexTool job failed!");
        throw new Exception("IndexTool job failed: " + job.toString());
    }
    FileSystem.get(conf).delete(outputPath, true);
}
 
源代码10 项目: gemfirexd-oss   文件: BusyAirports.java
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();
    Configuration conf = getConf();

    Path outputPath = new Path(args[0]);
    String hdfsHomeDir = args[1];
    String tableName = args[2];

    outputPath.getFileSystem(conf).delete(outputPath, true);

    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);

    Job job = Job.getInstance(conf, "Busy Airport Count");

    job.setInputFormatClass(RowInputFormat.class);

    // configure mapper and reducer
    job.setMapperClass(SampleMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // configure output
    TextOutputFormat.setOutputPath(job, outputPath);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }
 
源代码11 项目: kite   文件: TestMapReduceHBase.java
@Test
@SuppressWarnings("deprecation")
public void testJobEmptyView() throws Exception {
  Job job = new Job(HBaseTestUtils.getConf());

  String datasetName = tableName + ".TestGenericEntity";

  Dataset<GenericRecord> inputDataset = repo.create("default", "in",
      new DatasetDescriptor.Builder()
          .schemaLiteral(testGenericEntity).build());

  DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
      .schemaLiteral(testGenericEntity)
      .build();
  Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);

  DatasetKeyInputFormat.configure(job).readFrom(inputDataset);

  job.setMapperClass(AvroKeyWrapperMapper.class);
  job.setMapOutputKeyClass(AvroKey.class);
  job.setMapOutputValueClass(NullWritable.class);
  AvroJob.setMapOutputKeySchema(job, new Schema.Parser().parse(testGenericEntity));

  job.setReducerClass(AvroKeyWrapperReducer.class);
  job.setOutputKeyClass(GenericData.Record.class);
  job.setOutputValueClass(Void.class);
  AvroJob.setOutputKeySchema(job, new Schema.Parser().parse(testGenericEntity));

  DatasetKeyOutputFormat.configure(job).writeTo(outputDataset);

  Assert.assertTrue(job.waitForCompletion(true));
}
 
源代码12 项目: MapReduce-Demo   文件: FlowCount.java
public static void main(String[] args) throws Exception {		
	//1.设置HDFS配置信息
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	//2.设置MapReduce作业配置信息
	String jobName = "FlowCount";					//作业名称
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FlowCount.class);				//指定运行时作业类
	job.setJar("export\\FlowCount.jar");			//指定本地jar包
	job.setMapperClass(FlowCountMapper.class);		//指定Mapper类
	job.setMapOutputKeyClass(Text.class);			//设置Mapper输出Key类型
	job.setMapOutputValueClass(IntWritable.class);	//设置Mapper输出Value类型
	job.setReducerClass(FlowCountReducer.class);	//指定Reducer类
	job.setOutputKeyClass(Text.class);				//设置Reduce输出Key类型
	job.setOutputValueClass(IntWritable.class); 	//设置Reduce输出Value类型
	
	//3.设置作业输入和输出路径
	String dataDir = "/expr/weblog/data";			//实验数据目录	
	String outputDir = "/expr/weblog/output1";		//实验输出目录
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	//4.运行作业
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

	if (otherArgs.length != 3) {
		System.err.println("Usage: JobChainingDriver <posts> <users> <out>");
		System.exit(2);
	}

	Path postInput = new Path(otherArgs[0]);
	Path userInput = new Path(otherArgs[1]);
	Path outputDirIntermediate = new Path(otherArgs[2] + "_int");
	Path outputDir = new Path(otherArgs[2]);

	// Setup first job to counter user posts
	Job countingJob = new Job(conf, "JobChaining-Counting");
	countingJob.setJarByClass(BasicJobChaining.class);

	// Set our mapper and reducer, we can use the API's long sum reducer for
	// a combiner!
	countingJob.setMapperClass(UserIdCountMapper.class);
	countingJob.setCombinerClass(LongSumReducer.class);
	countingJob.setReducerClass(UserIdSumReducer.class);

	countingJob.setOutputKeyClass(Text.class);
	countingJob.setOutputValueClass(LongWritable.class);

	countingJob.setInputFormatClass(TextInputFormat.class);

	TextInputFormat.addInputPath(countingJob, postInput);

	countingJob.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);

	// Execute job and grab exit code
	int code = countingJob.waitForCompletion(true) ? 0 : 1;

	if (code == 0) {
		// Calculate the average posts per user by getting counter values
		double numRecords = (double) countingJob.getCounters()
				.findCounter(AVERAGE_CALC_GROUP, UserIdCountMapper.RECORDS_COUNTER_NAME)
				.getValue();
		double numUsers = (double) countingJob.getCounters()
				.findCounter(AVERAGE_CALC_GROUP, UserIdSumReducer.USERS_COUNTER_NAME)
				.getValue();

		double averagePostsPerUser = numRecords / numUsers;

		// Setup binning job
		Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
		binningJob.setJarByClass(BasicJobChaining.class);

		// Set mapper and the average posts per user
		binningJob.setMapperClass(UserIdBinningMapper.class);
		UserIdBinningMapper.setAveragePostsPerUser(binningJob, averagePostsPerUser);

		binningJob.setNumReduceTasks(0);

		binningJob.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.addInputPath(binningJob, outputDirIntermediate);

		// Add two named outputs for below/above average
		MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
				TextOutputFormat.class, Text.class, Text.class);

		MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
				TextOutputFormat.class, Text.class, Text.class);
		MultipleOutputs.setCountersEnabled(binningJob, true);

		TextOutputFormat.setOutputPath(binningJob, outputDir);

		// Add the user files to the DistributedCache
		FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
		for (FileStatus status : userFiles) {
			DistributedCache.addCacheFile(status.getPath().toUri(),
					binningJob.getConfiguration());
		}

		// Execute job and grab exit code
		code = binningJob.waitForCompletion(true) ? 0 : 1;
	}

	// Clean up the intermediate output
	FileSystem.get(conf).delete(outputDirIntermediate, true);

	System.exit(code);
}
 
源代码14 项目: 163-bigdate-note   文件: ParseLogJob.java
public int run(String[] args) throws Exception {
        //创建job
        Configuration config = getConf();
        //添加自定义配置
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        //通过job设置一些参数
        //通过job设置一些参数
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        //设置CombineFileInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);

        //添加分布式缓存
        job.addCacheFile(new URI(config.get("ip.file.path")));

        //设置OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);


        //添加输入和输出数据
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);


        //设置压缩类型
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        //运行程序
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + "failed!");
        }
        return 0;
    }
 
源代码15 项目: incubator-iotdb   文件: TSFMRReadExample.java
public static void main(String[] args)
    throws IOException, ClassNotFoundException, TSFHadoopException, URISyntaxException {

  if (args.length != 3) {
    System.out.println("Please give hdfs url, input path, output path");
    return;
  }
  String HDFSURL = args[0];
  Path inputPath = new Path(args[1]);
  Path outputPath = new Path(args[2]);

  Configuration configuration = new Configuration();
  // set file system configuration
  //configuration.set("fs.defaultFS", HDFSURL);
  Job job = Job.getInstance(configuration);

  FileSystem fs = FileSystem.get(configuration);
  if(fs.exists(outputPath)){
    fs.delete(outputPath,true);
  }

  job.setJobName("TsFile read jar");
  job.setJarByClass(TSFMRReadExample.class);
  // set mapper and reducer
  job.setMapperClass(TSMapper.class);
  job.setReducerClass(TSReducer.class);
  // set inputformat and outputformat
  job.setInputFormatClass(TSFInputFormat.class);
  // set mapper output key and value
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(DoubleWritable.class);
  // set reducer output key and value
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);
  // set input file path
  TSFInputFormat.setInputPaths(job, inputPath);
  // set output file path
  TSFOutputFormat.setOutputPath(job, outputPath);

  /**
   * special configuration for reading tsfile with TSFInputFormat
   */
  TSFInputFormat.setReadTime(job, true); // configure reading time enable
  TSFInputFormat.setReadDeviceId(job, true); // configure reading deltaObjectId enable
  String[] deviceIds = {"device_1"};// configure reading which deviceIds
  TSFInputFormat.setReadDeviceIds(job, deviceIds);
  String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};// configure reading which measurementIds
  TSFInputFormat.setReadMeasurementIds(job, measurementIds);
  boolean isSuccess = false;
  try {
    isSuccess = job.waitForCompletion(true);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  if (isSuccess) {
    System.out.println("Execute successfully");
  } else {
    System.out.println("Execute unsuccessfully");
  }
}
 
源代码16 项目: hadoop-louvain-community   文件: TestJob.java
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName(TestJob.class.getName());
        job.setJarByClass(TestJob.class);
        job.setMapperClass(MapJob.class);
        job.setReducerClass(ReduceJob.class);

        // Hello there ZipFileInputFormat!
        job.setInputFormatClass(GraphInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));


        job.waitForCompletion(true);



    }
 
public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();

    Configuration conf = getConf();
    
    String hdfsHomeDir = args[0];
    String url         = args[1];
    String tableName   = args[2];

    System.out.println("TradeSecurityHdfsDataVerifier.run() invoked with " 
                       + " hdfsHomeDir = " + hdfsHomeDir 
                       + " url = " + url
                       + " tableName = " + tableName);

    // Job-specific params
    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
    conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
    conf.set(RowOutputFormat.OUTPUT_URL, url);
    
    
    Job job = Job.getInstance(conf, "TradeSecurityHdfsDataVerifierV2");
    job.setJobName("TradeSecurityHdfsDataVerifierV2");
    job.setInputFormatClass(RowInputFormat.class);
    job.setOutputFormatClass(RowOutputFormat.class);
    
      
    job.setMapperClass(HdfsDataMapper.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(TradeSecurityRow.class);   
    
    job.setReducerClass(HdfsDataReducer.class);  
    job.setOutputKeyClass(Key.class);
    job.setOutputValueClass(TradeSecurityOutputObject.class);
    
    StringBuffer aStr = new StringBuffer();
    aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
    aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
    aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
    aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
    System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
    
    return job.waitForCompletion(false) ? 0 : 1;
  }
 
源代码18 项目: big-c   文件: ChainReducer.java
/**
 * Sets the {@link Reducer} class to the chain job.
 * 
 * <p>
 * The key and values are passed from one element of the chain to the next, by
 * value. For the added Reducer the configuration given for it,
 * <code>reducerConf</code>, have precedence over the job's Configuration.
 * This precedence is in effect when the task is running.
 * </p>
 * <p>
 * IMPORTANT: There is no need to specify the output key/value classes for the
 * ChainReducer, this is done by the setReducer or the addMapper for the last
 * element in the chain.
 * </p>
 * 
 * @param job
 *          the job
 * @param klass
 *          the Reducer class to add.
 * @param inputKeyClass
 *          reducer input key class.
 * @param inputValueClass
 *          reducer input value class.
 * @param outputKeyClass
 *          reducer output key class.
 * @param outputValueClass
 *          reducer output value class.
 * @param reducerConf
 *          a configuration for the Reducer class. It is recommended to use a
 *          Configuration without default values using the
 *          <code>Configuration(boolean loadDefaults)</code> constructor with
 *          FALSE.
 */
public static void setReducer(Job job, Class<? extends Reducer> klass,
    Class<?> inputKeyClass, Class<?> inputValueClass,
    Class<?> outputKeyClass, Class<?> outputValueClass,
    Configuration reducerConf) {
  job.setReducerClass(ChainReducer.class);
  job.setOutputKeyClass(outputKeyClass);
  job.setOutputValueClass(outputValueClass);
  Chain.setReducer(job, klass, inputKeyClass, inputValueClass,
      outputKeyClass, outputValueClass, reducerConf);
}
 
/**
 * @throws Exception If failed.
 */
@Test
public void testSimpleTaskSubmit() throws Exception {
    String testInputFile = "/test";

    prepareTestFile(testInputFile);

    Configuration cfg = new Configuration();

    setupFileSystems(cfg);

    Job job = Job.getInstance(cfg);

    job.setMapperClass(TestMapper.class);
    job.setCombinerClass(TestReducer.class);
    job.setReducerClass(TestReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setNumReduceTasks(1);

    FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestIgniteInstanceName(0) + "@/" + testInputFile));
    FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestIgniteInstanceName(0) + "@/output"));

    job.setJarByClass(getClass());

    IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
        createJobInfo(job.getConfiguration(), null));

    fut.get();
}
 
源代码20 项目: ignite   文件: HadoopAbstractMapReduceTest.java
/**
 * Does actual test job
 *
 * @param useNewMapper flag to use new mapper API.
 * @param useNewCombiner flag to use new combiner API.
 * @param useNewReducer flag to use new reducer API.
 */
protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
    throws Exception {
    log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer);

    igfs.delete(new IgfsPath(PATH_OUTPUT), true);

    JobConf jobConf = new JobConf();

    jobConf.set(HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
    jobConf.setUser(USER);
    jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");

    //To split into about 40 items for v2
    jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);

    //For v1
    jobConf.setInt("fs.local.block.size", 65000);

    // File system coordinates.
    setupFileSystems(jobConf);

    HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);

    Job job = Job.getInstance(jobConf);

    HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
    FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));

    job.setJarByClass(HadoopWordCount2.class);

    HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

    IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));

    fut.get();

    checkJobStatistics(jobId);

    final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";

    checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));

    checkOwner(new IgfsPath(outFile));

    String actual = readAndSortFile(outFile, job.getConfiguration());

    assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
            useNewReducer,
        "blue\t" + blue + "\n" +
            "green\t" + green + "\n" +
            "red\t" + red + "\n" +
            "yellow\t" + yellow + "\n",
        actual
    );
}