org.apache.hadoop.mapreduce.Job#setMapOutputKeyClass() source code examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#setMapOutputKeyClass(), collected from a range of open-source projects.
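Before the project examples, here is a minimal, self-contained sketch (class names and the input/output layout are illustrative, not taken from any project below) of why the call matters: the shuffle has to know the map-side key and value types, and they must be declared explicitly whenever they differ from the job's final output types.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapOutputTypesDemo {

    // Emits <Text, IntWritable>: one count per token.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer it = new StringTokenizer(value.toString());
            while (it.hasMoreTokens()) {
                word.set(it.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Emits <Text, LongWritable>: the value type differs from the map output.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-output-types-demo");
        job.setJarByClass(MapOutputTypesDemo.class);
        job.setMapperClass(TokenMapper.class);
        job.setReducerClass(SumReducer.class);

        // The map output value type (IntWritable) differs from the final output
        // value type (LongWritable), so the map-side types must be declared
        // explicitly; without these two calls the shuffle assumes the final
        // types and fails at runtime with a type mismatch.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}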

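Example 1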
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: ElementAttributeValuesTest configFile outputDir");
        System.exit(2);
    }

    Job job = Job.getInstance(conf);
    job.setJarByClass(ElementAttributeValuesTest.class);
    job.setInputFormatClass(ValueInputFormat.class);
    job.setMapperClass(ElementAttrValueMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    conf = job.getConfiguration();
    conf.addResource(otherArgs[0]);
    conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class, 
            Writable.class);
    conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS, 
        ElementAttributeValuesFunction.class, ElementAttributeValues.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 2 (project: sqoop-on-spark, file: MRJobTestUtil.java)
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
    Class<? extends InputFormat<?,?>> inputFormatClass,
    Class<? extends Mapper<?,?,?,?>> mapperClass,
    Class<? extends OutputFormat<?,?>> outputFormatClass) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = new Job(conf);
  job.setInputFormatClass(inputFormatClass);
  job.setMapperClass(mapperClass);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputFormatClass(outputFormatClass);
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  boolean ret = job.waitForCompletion(true);

  // Hadoop 1.0 (and 0.20) has a nasty bug where the job committer is not
  // called in LocalJobRunner.
  if (isHadoop1()) {
    callOutputCommitter(job, outputFormatClass);
  }

  return ret;
}
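The @SuppressWarnings("deprecation") is there because the Job(Configuration) constructor was deprecated in Hadoop 2; the drop-in replacement, used by most of the other examples on this page, is the static factory:

// Non-deprecated equivalent of "new Job(conf)":
Job job = Job.getInstance(conf);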
 
Example 3 (project: cloud-bigtable-examples, file: CellCounter.java)
/**
 * Sets up the actual job.
 *
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException {
  String tableName = args[0];
  Path outputDir = new Path(args[1]);
  String reportSeparatorString = (args.length > 2) ? args[2]: ":";
  conf.set("ReportSeparator", reportSeparatorString);
  Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
  job.setJarByClass(CellCounter.class);
  Scan scan = getConfiguredScanForJob(conf, args);
  TableMapReduceUtil.initTableMapperJob(tableName, scan,
      CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
  job.setNumReduceTasks(1);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setReducerClass(IntSumReducer.class);
  return job;
}
 
Example 4 (project: MapReduce-Demo, file: PeopleRank.java)
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	String jobName = "PeopleRank";
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(PeopleRank.class);
	job.setJar("export\\PeopleRank.jar");
	job.setMapperClass(PeopleRankMapper.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);
	job.setReducerClass(PeopleRankReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);
	
	String dataDir = "/expr/peoplerank/data";
	String outputDir = "/expr/peoplerank/output/adjacent";
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);		
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	System.out.println( "Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 5 (project: big-c, file: DistCp.java)
/**
 * Creates the Job object to be submitted, with all of its configuration.
 *
 * @return Reference to the job object.
 * @throws IOException if job creation fails.
 */
private Job createJob() throws IOException {
  String jobName = "distcp";
  String userChosenName = getConf().get(JobContext.JOB_NAME);
  if (userChosenName != null)
    jobName += ": " + userChosenName;
  Job job = Job.getInstance(getConf());
  job.setJobName(jobName);
  job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
  job.setJarByClass(CopyMapper.class);
  configureOutputFormat(job);

  job.setMapperClass(CopyMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputFormatClass(CopyOutputFormat.class);
  job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
  job.getConfiguration().set(JobContext.NUM_MAPS,
                String.valueOf(inputOptions.getMaxMaps()));

  if (inputOptions.getSslConfigurationFile() != null) {
    setupSSLConfig(job);
  }

  inputOptions.appendToConf(job.getConfiguration());
  return job;
}
 
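Example 6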
@Override
protected void configureMapper(Job job, String tableName,
    String tableClassName) throws ClassNotFoundException, IOException {

  job.setMapperClass(getMapperClass());

  // Concurrent writes of the same records would be problematic.
  ConfigurationHelper.setJobMapSpeculativeExecution(job, false);

  job.setMapOutputKeyClass(SqoopRecord.class);
  job.setMapOutputValueClass(NullWritable.class);
}
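Sqoop routes the speculative-execution switch through its own ConfigurationHelper; in plain MapReduce code the same guard can be expressed directly on the Job (a one-line sketch using the standard Hadoop 2 API):

// Disable speculative map attempts so the same record is never written twice.
job.setMapSpeculativeExecution(false);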
 
Example 7 (project: rya, file: JoinSelectStatisticsTest.java)
@Override
public int run(String[] args) throws Exception {

    Configuration conf = getConf();
    String outpath = conf.get(OUTPUTPATH);
    
    Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
    job.setJarByClass(this.getClass());
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
    
    MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()), 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) , 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    job.setMapOutputKeyClass(CompositeType.class);
    job.setMapOutputValueClass(TripleCard.class);

    tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
    SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(TripleEntry.class);
    job.setOutputValueClass(CardList.class);


    job.setSortComparatorClass(JoinSelectSortComparator.class);
    job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
    job.setPartitionerClass(JoinSelectPartitioner.class);
    job.setReducerClass(JoinReducer.class);
    job.setNumReduceTasks(32);
    job.waitForCompletion(true);
    
    return job.isSuccessful() ? 0 : 1;          
}
 
Example 8 (project: hbase, file: TestTableInputFormatScanBase.java)
/**
 * Tests an MR Scan initialized from properties set in the Configuration.
 */
protected void testScanFromConfiguration(String start, String stop, String last)
    throws IOException, InterruptedException, ClassNotFoundException {
  String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
    "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
  Configuration c = new Configuration(TEST_UTIL.getConfiguration());
  c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
  c.set(TableInputFormat.SCAN_COLUMN_FAMILY,
    Bytes.toString(INPUT_FAMILYS[0]) + ", " + Bytes.toString(INPUT_FAMILYS[1]));
  c.set(KEY_STARTROW, start != null ? start : "");
  c.set(KEY_LASTROW, last != null ? last : "");

  if (start != null) {
    c.set(TableInputFormat.SCAN_ROW_START, start);
  }

  if (stop != null) {
    c.set(TableInputFormat.SCAN_ROW_STOP, stop);
  }

  Job job = Job.getInstance(c, jobName);
  job.setMapperClass(ScanMapper.class);
  job.setReducerClass(ScanReducer.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(ImmutableBytesWritable.class);
  job.setInputFormatClass(TableInputFormat.class);
  job.setNumReduceTasks(1);
  FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
  TableMapReduceUtil.addDependencyJars(job);
  assertTrue(job.waitForCompletion(true));
}
 
Example 9 (project: MapReduce-Demo, file: TopTenJob.java)
public static void main(String[] args) throws Exception {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	String jobName = "TopTenJob";
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(TopTenJob.class);
	job.setJar("export\\TopTen.jar");
	job.setMapperClass(TopTenMapper.class);
	job.setMapOutputKeyClass(NullWritable.class);
	job.setMapOutputValueClass(Text.class);
	job.setReducerClass(TopTenReducer.class);
	job.setOutputKeyClass(NullWritable.class);
	job.setOutputValueClass(Text.class);
	job.setNumReduceTasks(1);		// a single reduce task is required to compute the final Top N

	String dataDir = "/expr/topten/data";	
	String outputDir = "/expr/topten/output";
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);		
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	System.out.println( "Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 10 (project: rya, file: JoinSelectStatsUtil.java)
public static void initTableMRJob(Job job, String intable, String outtable, String auths) throws AccumuloSecurityException {
  Configuration conf = job.getConfiguration();
  String username = conf.get(USERNAME);
  String password = conf.get(PASSWORD);
  String instance = conf.get(INSTANCE);
  String zookeepers = conf.get(ZOOKEEPERS);

  System.out.println("Zookeepers are " + auths);

  if (zookeepers != null) {
    AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
    AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
  } else {
    throw new IllegalArgumentException("Must specify either mock or zookeepers");
  }

  AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
  AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
  AccumuloInputFormat.setInputTableName(job, intable);
  job.setInputFormatClass(AccumuloInputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  // OUTPUT
  AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
  AccumuloOutputFormat.setDefaultTableName(job, outtable);
  job.setOutputFormatClass(AccumuloOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Mutation.class);

}
 
Example 11 (project: RDFS, file: SecondarySort.java)
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: secondarysrot <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "secondary sort");
  job.setJarByClass(SecondarySort.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  // group and partition by the first int in the pair
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);

  // the map output is IntPair, IntWritable
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  // the reduce output is Text, IntWritable
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
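For context, the partitioner referenced above looks roughly like this in the stock Hadoop SecondarySort example (a sketch assuming IntPair exposes a getFirst() accessor):

// Partition by the first int of the composite key only, so every pair with
// the same first element reaches the same reducer.
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
  @Override
  public int getPartition(IntPair key, IntWritable value, int numPartitions) {
    return Math.abs(key.getFirst() * 127) % numPartitions;
  }
}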
 
Example 12 (project: hiped2, file: SortMapReduce.java)
/**
 * The MapReduce driver: sets up and launches the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

  Configuration conf = super.getConf();
  Job job = new Job(conf);
  job.setJarByClass(SortMapReduce.class);

  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  job.setMapOutputKeyClass(Person.class);
  job.setMapOutputValueClass(Text.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  job.setPartitionerClass(PersonNamePartitioner.class);
  job.setSortComparatorClass(PersonComparator.class);
  job.setGroupingComparatorClass(PersonNameComparator.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
 
Example 13 (project: MapReduce-Demo, file: PVMinMax2.java)
public static void main(String[] args) throws Exception {		
	// 1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. MapReduce job configuration
	String jobName = "PVMinMax2";					// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(PVMinMax2.class);				// job class to run
	job.setJar("export\\PVMinMax2.jar");			// local jar file
	job.setMapperClass(PVMinMax2Mapper.class);		// Mapper class
	job.setMapOutputKeyClass(Text.class);			// Mapper output key type
	job.setMapOutputValueClass(Text.class);			// Mapper output value type
	job.setReducerClass(PVMinMax2Reducer.class);	// Reducer class
	job.setOutputKeyClass(Text.class);				// Reducer output key type
	job.setOutputValueClass(Text.class); 	// Reducer output value type
	
	// 3. Job input and output paths
	String dataDir = "/expr/weblog/output5_1";			// input data directory
	String outputDir = "/expr/weblog/output5_2";		// output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	// 4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 14 (project: stratio-cassandra, file: WordCount.java)
public int run(String[] args) throws Exception
{
    String outputReducerType = "filesystem";
    if (args != null && args.length > 0 && args[0].startsWith(OUTPUT_REDUCER_VAR))
    {
        String[] s = args[0].split("=");
        if (s != null && s.length == 2)
            outputReducerType = s[1];
    }
    logger.info("output reducer type: " + outputReducerType);

    // use a smaller page size that doesn't divide the row count evenly to exercise the paging logic better
    ConfigHelper.setRangeBatchSize(getConf(), 99);

    for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
    {
        String columnName = "text" + i;

        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);

        if (outputReducerType.equalsIgnoreCase("filesystem"))
        {
            job.setCombinerClass(ReducerToFilesystem.class);
            job.setReducerClass(ReducerToFilesystem.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
        }
        else
        {
            job.setReducerClass(ReducerToCassandra.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(ByteBuffer.class);
            job.setOutputValueClass(List.class);

            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
            job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
        }

        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        if (i == 4)
        {
            IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, ByteBufferUtil.bytes(0));
            ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr));
        }

        if (i == 5)
        {
            // this will cause the predicate to be ignored in favor of scanning everything as a wide row
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
        }

        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        job.waitForCompletion(true);
    }
    return 0;
}
 
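Example 15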
public int run(String[] args) throws Exception {
  GfxdDataSerializable.initTypes();

  Configuration conf = getConf();
  
  String hdfsHomeDir = args[0];
  String url         = args[1];
  String tableName   = args[2];

  System.out.println("TradeBuyOrdersHdfsDataVerifier.run() invoked with " 
                     + " hdfsHomeDir = " + hdfsHomeDir 
                     + " url = " + url
                     + " tableName = " + tableName);

  // Job-specific params
  conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
  conf.set(RowInputFormat.INPUT_TABLE, tableName);
  conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
  conf.set(RowOutputFormat.OUTPUT_URL, url);
  
  
  Job job = Job.getInstance(conf, "TradeBuyOrdersHdfsDataVerifierV2");
  job.setJobName("TradeBuyOrdersHdfsDataVerifierV2");
  job.setInputFormatClass(RowInputFormat.class);
  job.setOutputFormatClass(RowOutputFormat.class);
  
    
  job.setMapperClass(HdfsDataMapper.class);
  job.setMapOutputKeyClass(Key.class);
  job.setMapOutputValueClass(TradeBuyOrdersRow.class);   
  
  job.setReducerClass(HdfsDataReducer.class);  
  job.setOutputKeyClass(Key.class);
  job.setOutputValueClass(TradeBuyOrdersOutputObject.class);
  
  StringBuffer aStr = new StringBuffer();
  aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
  aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
  aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
  aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
  System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
  
  return job.waitForCompletion(false) ? 0 : 1;
}
 
Example 16 (project: clickstream-tutorial, file: MRSessionize.java)
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: MRSessionize <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "MapReduce Sessionization");
    job.setJarByClass(MRSessionize.class);
    job.setMapperClass(SessionizeMapper.class);
    job.setReducerClass(SessionizeReducer.class);

    // WARNING: do NOT set the Combiner class. The reducer needs to see all
    // records from the same IP in one place before we can do sessionization.
    // Also, our reducer doesn't return the same key/value types it takes in,
    // so it can't be used on the result of a previous reducer.

    job.setMapOutputKeyClass(IpTimestampKey.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    // We need these for secondary sorting.
    // We need to shuffle the records (between Map and Reduce phases) by using IP address as key, since that is
    // the field we are using for determining uniqueness of users. However, when the records arrive to the reducers,
    // we would like them to be sorted in ascending order of their timestamps. This concept is known as secondary
    // sorting since we are "secondarily" sorting the records by another key (timestamp, in our case) in addition
    // to the shuffle key (also called the "partition" key).

    // So, to get some terminology straight.
    // Natural key (aka Shuffle key or Partition key) is the key we use to shuffle. IP address in our case
    // Secondary Sorting Key is the key we use to sort within each partition that gets sent to the user. Timestamp
    // in our case.
    // Together, the natural key and secondary sorting key form what we call the composite key. This key is called
    // IpTimestampKey in our example.

    // For secondary sorting, even though we are partitioning and shuffling by only the natural key, the map output
    // key and the reduce input key is the composite key. We, however, use a custom partitioner and custom grouping
    // comparator that only uses the natural key part of the composite key to partition and group respectively (both
    // happen during the shuffle phase).

    // However, we have a different sort comparator which also gets used in the shuffle phase but determines how
    // the records are sorted when they enter the reduce phase. This custom sort comparator in our case will make use
    // of the entire composite key.

    // We found http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/
    // to be very helpful, if you'd like to read more on the subject.
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
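The comment block above describes the natural-key partitioner, grouping comparator, and sort comparator without showing them; here is a minimal sketch of the partitioner half, assuming IpTimestampKey exposes a getIp() accessor (the accessor name is an assumption, not taken from the project):

// Hypothetical natural-key partitioner: hash only the IP (the natural key) so
// all records for one IP land in the same partition; the timestamp half of the
// composite key is deliberately ignored here.
public static class NaturalKeyPartitioner extends Partitioner<IpTimestampKey, Text> {
  @Override
  public int getPartition(IpTimestampKey key, Text value, int numPartitions) {
    return (key.getIp().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}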
 
Example 17 (project: MapReduce-Demo, file: FixedLengthInput2.java)
public static void main(String[] args) throws Exception {		
	// 1. HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 13);
	
	// 2. MapReduce job configuration
	String jobName = "FixedLengthInput2";					// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(FixedLengthInput2.class);				// job class to run
	job.setJar("export\\FixedLengthInput2.jar");				// local jar file
	job.setMapperClass(FixedLengthInput2Mapper.class);		// Mapper class
	job.setMapOutputKeyClass(Text.class);			// Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);		// Mapper output value type
	job.setReducerClass(FixedLengthInput2Reducer.class);		// Reducer class
	job.setOutputKeyClass(Text.class);				// Reducer output key type
	job.setOutputValueClass(IntWritable.class); 			// Reducer output value type
	
	job.setInputFormatClass(FixedLengthInputFormat.class);	// input format class
	
	// 3. Job input and output paths
	String dataDir = "/expr/fixedinput/data";			// input data directory
	String outputDir = "/expr/fixedinput/output";		// output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	// 4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 18 (project: incubator-tez, file: MRRSleepJob.java)
@VisibleForTesting
public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
    int numIReducer, long mapSleepTime, int mapSleepCount,
    long reduceSleepTime, int reduceSleepCount,
    long iReduceSleepTime, int iReduceSleepCount)
        throws IOException {
  Configuration conf = getConf();
  conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
  conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
  conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
  conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
  conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
  conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
  conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
  conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
  conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);

  // Configure intermediate reduces
  conf.setInt(
      org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
      iReduceStagesCount);
  LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");

  for (int i = 1; i <= iReduceStagesCount; ++i) {
    // Set reducer class for intermediate reduce
    conf.setClass(
        MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
            "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
    // Set reducer output key class
    conf.setClass(
        MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
            "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
    // Set reducer output value class
    conf.setClass(
        MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
            "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
    conf.setInt(
        MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
            "mapreduce.job.reduces"), numIReducer);
  }

  Job job = Job.getInstance(conf, "sleep");
  job.setNumReduceTasks(numReducer);
  job.setJarByClass(MRRSleepJob.class);
  job.setMapperClass(SleepMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setReducerClass(SleepReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setInputFormatClass(SleepInputFormat.class);
  job.setPartitionerClass(MRRSleepJobPartitioner.class);
  job.setSpeculativeExecution(false);
  job.setJobName("Sleep job");

  FileInputFormat.addInputPath(job, new Path("ignored"));
  return job;
}
 
Example 19 (project: MapReduce-Demo, file: FlowSort.java)
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// HDFS configuration
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// Job configuration
	Job job = Job.getInstance(conf, "FlowSort");
	job.setJarByClass(FlowSort.class);
	job.setJar("export\\FlowSort.jar");
	// Mapper
	job.setMapperClass(SortMapper.class);
	job.setMapOutputKeyClass(MySortKey.class);
	job.setMapOutputValueClass(Text.class);
	// Reducer
	job.setReducerClass(SortReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(MySortKey.class);
	// Job input and output paths
	String dataDir = "/workspace/flowStatistics/output/part-r-00000"; // input data path
	String outputDir = "/workspace/flowStatistics/output_sort"; // output directory
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	// Run the job
	System.out.println("Job: FlowSort is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 20 (project: big-c, file: DBCountPageView.java)
@Override
// Usage: DBCountPageView [driverClass dburl]
public int run(String[] args) throws Exception {
  
  String driverClassName = DRIVER_CLASS;
  String url = DB_URL;
  
  if(args.length > 1) {
    driverClassName = args[0];
    url = args[1];
  }
  
  initialize(driverClassName, url);
  Configuration conf = getConf();

  DBConfiguration.configureDB(conf, driverClassName, url);

  Job job = new Job(conf);
      
  job.setJobName("Count Pageviews of URLs");
  job.setJarByClass(DBCountPageView.class);
  job.setMapperClass(PageviewMapper.class);
  job.setCombinerClass(LongSumReducer.class);
  job.setReducerClass(PageviewReducer.class);

  DBInputFormat.setInput(job, AccessRecord.class, "Access"
      , null, "url", AccessFieldNames);

  DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);

  job.setOutputKeyClass(PageviewRecord.class);
  job.setOutputValueClass(NullWritable.class);
  int ret;
  try {
    ret = job.waitForCompletion(true) ? 0 : 1;
    boolean correct = verify();
    if(!correct) {
      throw new RuntimeException("Evaluation was not correct!");
    }
  } finally {
    shutdown();    
  }
  return ret;
}