org.apache.hadoop.mapreduce.Job#setReducerClass() code examples

Listed below are example usages of org.apache.hadoop.mapreduce.Job#setReducerClass(), drawn from open-source projects on GitHub.
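Before the project examples, here is a minimal, self-contained sketch (hypothetical class and job names, not taken from any of the projects below) showing the contract behind setReducerClass: the argument must be a subclass of org.apache.hadoop.mapreduce.Reducer, and its input key/value types must match the job's map output key/value types.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class SetReducerClassDemo {

    // Sums all values seen for a key and emits a single (key, total) pair.
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "set-reducer-class-demo");
        job.setJarByClass(SetReducerClassDemo.class);
        job.setReducerClass(SumReducer.class);          // the method documented on this page
        job.setMapOutputKeyClass(Text.class);           // must match SumReducer's KEYIN
        job.setMapOutputValueClass(IntWritable.class);  // must match SumReducer's VALUEIN
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Mapper, input format, and input/output paths are omitted here;
        // the project examples below show complete job setups.
    }
}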

Example 1

private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
    throws IOException {
  LOG.info("Trying to merge avro files");
  final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf);
  final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf);
  if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) {
    throw new IOException("Invalid schema for input directories. Schema for old data: ["
        + oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]");
  }
  LOG.debug("Avro Schema:" + oldPathSchema);
  job.setInputFormatClass(AvroInputFormat.class);
  job.setOutputFormatClass(AvroOutputFormat.class);
  job.setMapperClass(MergeAvroMapper.class);
  job.setReducerClass(MergeAvroReducer.class);
  AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
 
Example 2  Project: IntroToHadoopAndMR__Udacity_Course   File: P1.java
public final static void main(final String[] args) throws Exception {
	final Configuration conf = new Configuration();

	final Job job = new Job(conf, "P1");
	job.setJarByClass(P1.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(DoubleWritable.class);

	job.setMapperClass(P1Map.class);
	job.setCombinerClass(P1Reduce.class);
	job.setReducerClass(P1Reduce.class);

	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	FileInputFormat.addInputPath(job, new Path(args[0]));
	FileOutputFormat.setOutputPath(job, new Path(args[1]));

	job.waitForCompletion(true);
}
 
Example 3  Project: hiped2   File: UniqueHashedKeyJob.java
public static void runJob(Configuration conf,
                          Path inputPath,
                          Path outputPath)
    throws Exception {

  Job job = new Job(conf);

  job.setJarByClass(UniqueHashedKeyJob.class);
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  outputPath.getFileSystem(conf).delete(outputPath, true);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  if (!job.waitForCompletion(true)) {
    throw new Exception("Job failed");
  }
}
 
Example 4  Project: MapReduce-Demo   File: Step4.java
public static boolean run(Configuration config, Map<String, String> paths) 
		throws IOException, ClassNotFoundException, InterruptedException {
	String jobName = "step4";
	Job job = Job.getInstance(config, jobName);
	job.setJarByClass(Step4.class);
	job.setJar("export\\ItemCF.jar");
	job.setMapperClass(Step4_Mapper.class);
	job.setReducerClass(Step4_Reducer.class);
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);

	Path[] inPaths = new Path[] { 
			new Path(paths.get("Step4Input1")),
			new Path(paths.get("Step4Input2")) };		
	Path outpath = new Path(paths.get("Step4Output"));
	FileInputFormat.setInputPaths(job, inPaths);
	FileOutputFormat.setOutputPath(job, outpath);		
	FileSystem fs = FileSystem.get(config);
	if (fs.exists(outpath)) {
		fs.delete(outpath, true);
	}
	
	return job.waitForCompletion(true);
}
 
Example 5  Project: spork   File: WordCount.java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: wordcount <in> <out> [wordcount stop word file]");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    if(otherArgs.length > 2){
        job.getConfiguration().set(STOP_WORDS_FILE, otherArgs[2]);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 6  Project: hadoop   File: BaileyBorweinPlouffe.java
/** Create and setup a job */
private static Job createJob(String name, Configuration conf
    ) throws IOException {
  final Job job = Job.getInstance(conf, NAME + "_" + name);
  final Configuration jobconf = job.getConfiguration();
  job.setJarByClass(BaileyBorweinPlouffe.class);

  // setup mapper
  job.setMapperClass(BbpMapper.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(BytesWritable.class);

  // setup reducer
  job.setReducerClass(BbpReducer.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(BytesWritable.class);
  job.setNumReduceTasks(1);

  // setup input
  job.setInputFormatClass(BbpInputFormat.class);

  // disable task timeout
  jobconf.setLong(MRJobConfig.TASK_TIMEOUT, 0);

  // do not use speculative execution
  jobconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
  jobconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
  return job;
}
 
Example 7  Project: MapReduce-Demo   File: TopTenJob.java
public static void main(String[] args) throws Exception {
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";			
	Configuration conf = new Configuration();		
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");
	
	String jobName = "TopTenJob";
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(TopTenJob.class);
	job.setJar("export\\TopTen.jar");
	job.setMapperClass(TopTenMapper.class);
	job.setMapOutputKeyClass(NullWritable.class);
	job.setMapOutputValueClass(Text.class);
	job.setReducerClass(TopTenReducer.class);
	job.setOutputKeyClass(NullWritable.class);
	job.setOutputValueClass(Text.class);
	job.setNumReduceTasks(1);		// computing the final Top N requires exactly one reduce task

	String dataDir = "/expr/topten/data";	
	String outputDir = "/expr/topten/output";
	Path inPath = new Path(hdfs + dataDir);
	Path outPath = new Path(hdfs + outputDir);
	FileInputFormat.addInputPath(job, inPath);
	FileOutputFormat.setOutputPath(job, outPath);		
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}
	
	System.out.println( "Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 8  Project: yuzhouwan   File: PatentMainController.java
/**
 * The main method sets up the Patent job pipeline: Mapper -> Combiner -> Reducer -> Partitioner.
 *
 * @param args
 * @throws Exception
 */
public static void main(String[] args) throws Exception {

    // Create and initialize the Job
    Job job = Job.getInstance(new Configuration());

    // Set the job's main class
    job.setJarByClass(PatentMainController.class);

    // Set the job's Mapper
    job.setMapperClass(PatentMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // Set the job's input path
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Set the job's Combiner. Note: the setReducerClass call below is redundant,
    // because it is overridden by setReducerClass(PatentReducer.class) further down
    // (see the note after this example).
    job.setCombinerClass(InverseIndexByKeywordCombiner.class);
    job.setReducerClass(InverseIndexByKeywordCombiner.class);

    // Set the job's Reducer
    job.setReducerClass(PatentReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Set the job's output path
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setPartitionerClass(PatentPartitioner.class);
    // Set the number of reduce tasks
    job.setNumReduceTasks(ConfUtil.getMax());

    // Submit the job and wait for it to finish
    job.waitForCompletion(true);
}
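A note on the example above: setReducerClass only records the chosen class in the job's configuration, so when it is called twice the later call wins and PatentReducer is the reducer that actually runs. The small sketch below (hypothetical stand-in classes, not from the project) illustrates this behavior with the standard Job API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReducerWinsDemo {
    // Two trivial reducers standing in for the combiner and reducer classes above.
    public static class FirstReducer extends Reducer<Text, Text, Text, Text> { }
    public static class SecondReducer extends Reducer<Text, Text, Text, Text> { }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setReducerClass(FirstReducer.class);   // overwritten by the next call
        job.setReducerClass(SecondReducer.class);  // last call wins
        // getReducerClass() reads the same configuration entry set above.
        System.out.println(job.getReducerClass().getSimpleName()); // prints "SecondReducer"
    }
}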
 
Example 9  Project: incubator-gobblin   File: MRTaskFactoryTest.java
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
  List<String> dirs = Splitter.on(",").splitToList(state.getProp(INPUT_DIRECTORIES_KEY));
  String outputBase = state.getProp(OUTPUT_LOCATION);
  List<WorkUnit> workUnits = Lists.newArrayList();

  for (String dir : dirs) {
    try {
      Path input = new Path(dir);
      Path output = new Path(outputBase, input.getName());

      WorkUnit workUnit = new WorkUnit();
      TaskUtils.setTaskFactoryClass(workUnit, MRTaskFactory.class);
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "WordCount_" + input.getName());
      job.setJarByClass(MRTaskFactoryTest.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      job.setNumReduceTasks(1);
      FileInputFormat.addInputPath(job, input);
      FileOutputFormat.setOutputPath(job, output);

      MRTask.serializeJobToState(workUnit, job);
      workUnits.add(workUnit);
    } catch (IOException ioe) {
      log.error("Failed to create MR job for " + dir, ioe);
    }
  }

  return workUnits;
}
 
Example 10  Project: hadoop-gpu   File: SecondarySort.java
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: secondarysrot <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "secondary sort");
  job.setJarByClass(SecondarySort.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  // group and partition by the first int in the pair
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);

  // the map output is IntPair, IntWritable
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  // the reduce output is Text, IntWritable
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 11  Project: laser   File: OnlineFeatureDriver.java
public static long run(String collection, Path input, Path output,
		Configuration baseConf) throws IOException, ClassNotFoundException,
		InterruptedException {
	Configuration conf = new Configuration(baseConf);
	Job job = Job.getInstance(conf);

	job.setJarByClass(OnlineFeatureDriver.class);
	job.setJobName("GROUP each record's feature BY identifier");

	FileInputFormat.setInputPaths(job, input);
	FileOutputFormat.setOutputPath(job, output);

	job.setInputFormatClass(SequenceFileInputFormat.class);
	job.setOutputFormatClass(SequenceFileOutputFormat.class);

	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(OnlineVectorWritable.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(ListWritable.class);

	job.setMapperClass(OnlineFeatureMapper.class);
	job.setReducerClass(OnlineFeatureReducer.class);

	HadoopUtil.delete(conf, output);
	boolean succeeded = job.waitForCompletion(true);
	if (!succeeded) {
		throw new IllegalStateException("Job:Group feature,  Failed!");
	}
	Counter counter = job.getCounters().findCounter(
			"org.apache.hadoop.mapred.Task$Counter",
			"REDUCE_OUTPUT_RECORDS");
	long reduceOutputRecords = counter.getValue();

	LOG.info(
			"Job: GROUP each record's feature BY identifier, output recordes = {}",
			reduceOutputRecords);

	return reduceOutputRecords;
}
 
Example 12  Project: hadoop   File: SecondarySort.java
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: secondarysort <in> <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "secondary sort");
  job.setJarByClass(SecondarySort.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  // group and partition by the first int in the pair
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);

  // the map output is IntPair, IntWritable
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  // the reduce output is Text, IntWritable
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
 
Example 13  Project: tez   File: SecondarySort.java
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
  Configuration conf = getConf();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: secondarysort <in> <out>");
    ToolRunner.printGenericCommandUsage(System.out);
    return 2;
  }
  Job job = new Job(conf, "secondary sort");
  job.setJarByClass(SecondarySort.class);
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  // group and partition by the first int in the pair
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);

  // the map output is IntPair, IntWritable
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  // the reduce output is Text, IntWritable
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  return job.waitForCompletion(true) ? 0 : 1;
}
 
Example 14  Project: xxhadoop   File: FlowPartitionJob.java
public int run(String[] args) throws Exception {
		
	    /*Configuration conf = getConf();
	    JobClient client = new JobClient(conf);
	    ClusterStatus cluster = client.getClusterStatus();
	    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
	    String join_reduces = conf.get(REDUCES_PER_HOST);
	    if (join_reduces != null) {
	       num_reduces = cluster.getTaskTrackers() *
	                       Integer.parseInt(join_reduces);
	    }
	    // Set user-supplied (possibly default) job configs
	    job.setNumReduceTasks(num_reduces);*/
	    
	    
		Configuration conf = new Configuration();
		//conf.set("fs.defaultFS", "hdfs://node-01:9000");
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		String commaSeparatedPaths = null;
		String outputDir = null;
		if (otherArgs.length == 2) {
			commaSeparatedPaths = otherArgs[0];
			outputDir = otherArgs[1];
		} else {
			System.err.println("Usage: <in>[,<in>...] <out>");
			//System.exit(-1);
			return -1;
		}
		

		Job job = Job.getInstance(conf);
		job.setJobName("FlowPartitionJob");
		job.setJarByClass(FlowPartitionJob.class);
		
//		job.setInputFormatClass(TextInputFormat.class);
//		job.setOutputFormatClass(TextOutputFormat.class);
		
		job.setMapperClass(FlowPartitionMapper.class);
		//job.setCombinerClass(WordCountReducer.class);
		job.setReducerClass(FlowPartitionReducer.class);
		
		job.setPartitionerClass(FlowPartition.class);
		job.setNumReduceTasks(5);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		FileInputFormat.setInputPaths(job, commaSeparatedPaths);
		FileOutputFormat.setOutputPath(job, new Path(outputDir));

		return job.waitForCompletion(true) ? 0 : 1;
	}
 
Example 15  Project: 163-bigdate-note   File: ParseLogJob.java
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration from mr.xml
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Use CombineTextInputFormat as the input format
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add a file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Set the OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);

        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Output compression (commented out)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }


        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        return 0;
    }
 
Example 16  Project: hadoop-connectors   File: WikipediaRequestBytes.java
public static void main(String[] args)
    throws IOException, InterruptedException, ClassNotFoundException {
  GenericOptionsParser parser = new GenericOptionsParser(args);
  String[] customArgs = parser.getRemainingArgs();
  Configuration config = parser.getConfiguration();

  if (customArgs.length != 5) {
    System.out.println(
        "Usage: hadoop jar wikipedia_bytes_deploy.jar "
            + "[projectId] [inputDatasetId] [inputTableId] [exportGcsBucket] [jobOutputPath]");
    System.exit(1);
  }

  String projectId = customArgs[0];
  String inputDatasetId = customArgs[1];
  String inputTableId = customArgs[2];
  String exportGcsBucket = customArgs[3];
  String jobOutputPath = customArgs[4];

  JobConf conf = new JobConf(config, WikipediaRequestBytes.class);
  BigQueryConfiguration.configureBigQueryInput(conf, projectId, inputDatasetId, inputTableId);
  conf.set(BigQueryConfiguration.GCS_BUCKET.getKey(), exportGcsBucket);

  Job job = new Job(conf, "WikipediaRequestBytes");
  job.setJarByClass(WikipediaRequestBytes.class);

  job.setMapperClass(TitleBytesMapper.class);
  job.setCombinerClass(TitleBytesSumReducer.class);
  job.setReducerClass(TitleBytesSumReducer.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  FileOutputFormat.setOutputPath(job, new Path(jobOutputPath));

  // Read from BigQuery, write with plain TextOutputFormat to the provided 'Path'.
  job.setInputFormatClass(GsonBigQueryInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  job.waitForCompletion(true);

  // Make sure to clean up the GCS export paths if desired, and possibly an intermediate input
  // table if we did sharded export and thus didn't clean it up at setup time.
  GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
}
 
Example 17

public int run(String[] args) throws Exception {

    GfxdDataSerializable.initTypes();

    Configuration conf = getConf();
    
    String hdfsHomeDir = args[0];
    String url         = args[1];
    String tableName   = args[2];

    System.out.println("TradeNetworthHdfsDataVerifier.run() invoked with " 
                       + " hdfsHomeDir = " + hdfsHomeDir 
                       + " url = " + url
                       + " tableName = " + tableName);

    // Job-specific params
    conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
    conf.set(RowInputFormat.INPUT_TABLE, tableName);
    conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
    conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
    conf.set(RowOutputFormat.OUTPUT_URL, url);
    
    
    Job job = Job.getInstance(conf, "TradeNetworthHdfsDataVerifierV2");
    job.setJobName("TradeNetworthHdfsDataVerifierV2");
    job.setInputFormatClass(RowInputFormat.class);
    job.setOutputFormatClass(RowOutputFormat.class);
    
      
    job.setMapperClass(HdfsDataMapper.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(TradeNetworthRow.class);   
    
    job.setReducerClass(HdfsDataReducer.class);  
    job.setOutputKeyClass(Key.class);
    job.setOutputValueClass(TradeNetworthOutputObject.class);
    
    StringBuffer aStr = new StringBuffer();
    aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
    aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
    aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
    aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
    System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
    
    return job.waitForCompletion(false) ? 0 : 1;  
  }
 
Example 18  Project: incubator-retired-blur   File: Driver.java
@Override
public int run(String[] args) throws Exception {
  int c = 0;
  if (args.length < 5) {
    System.err
        .println("Usage Driver <table> <mr inc working path> <output path> <zk connection> <reducer multipler> <extra config files...>");
    return 1;
  }
  String table = args[c++];
  String mrIncWorkingPathStr = args[c++];
  String outputPathStr = args[c++];
  String blurZkConnection = args[c++];
  int reducerMultipler = Integer.parseInt(args[c++]);
  for (; c < args.length; c++) {
    String externalConfigFileToAdd = args[c];
    getConf().addResource(new Path(externalConfigFileToAdd));
  }

  Path outputPath = new Path(outputPathStr);
  Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
  FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());

  Path newData = new Path(mrIncWorkingPath, NEW);
  Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
  Path completeData = new Path(mrIncWorkingPath, COMPLETE);
  Path fileCache = new Path(mrIncWorkingPath, CACHE);

  fileSystem.mkdirs(newData);
  fileSystem.mkdirs(inprogressData);
  fileSystem.mkdirs(completeData);
  fileSystem.mkdirs(fileCache);

  List<Path> srcPathList = new ArrayList<Path>();
  for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
    srcPathList.add(fileStatus.getPath());
  }
  if (srcPathList.isEmpty()) {
    return 0;
  }

  List<Path> inprogressPathList = new ArrayList<Path>();
  boolean success = false;
  Iface client = null;
  try {
    inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);

    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
    client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
    waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
    client.createSnapshot(table, MRUPDATE_SNAPSHOT);
    TableDescriptor descriptor = client.describe(table);
    Path tablePath = new Path(descriptor.getTableUri());

    BlurInputFormat.setLocalCachePath(job, fileCache);
    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
    for (Path p : inprogressPathList) {
      FileInputFormat.addInputPath(job, p);
      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
    }

    BlurOutputFormat.setOutputPath(job, outputPath);
    BlurOutputFormat.setupJob(job, descriptor);

    job.setReducerClass(UpdateReducer.class);
    job.setMapOutputKeyClass(IndexKey.class);
    job.setMapOutputValueClass(IndexValue.class);
    job.setPartitionerClass(IndexKeyPartitioner.class);
    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);

    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);

    success = job.waitForCompletion(true);
    Counters counters = job.getCounters();
    LOG.info("Counters [" + counters + "]");

  } finally {
    if (success) {
      LOG.info("Indexing job succeeded!");
      movePathList(fileSystem, completeData, inprogressPathList);
    } else {
      LOG.error("Indexing job failed!");
      movePathList(fileSystem, newData, inprogressPathList);
    }
    if (client != null) {
      client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
    }
  }

  if (success) {
    return 0;
  } else {
    return 1;
  }

}
 
Example 19  Project: learning-hadoop   File: ImportTsv.java
/**
 * Sets up the actual job.
 * 
 * @param conf
 *            The current configuration.
 * @param args
 *            The command line parameters.
 * @return The newly created job.
 * @throws IOException
 *             When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
		throws IOException, ClassNotFoundException {

	// Support non-XML supported characters
	// by re-encoding the passed separator as a Base64 string.
	String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
	if (actualSeparator != null) {
		conf.set(SEPARATOR_CONF_KEY,
				new String(Base64.encodeBytes(actualSeparator.getBytes())));
	}

	// See if a non-default Mapper was set
	String mapperClassName = conf.get(MAPPER_CONF_KEY);
	Class mapperClass = mapperClassName != null ? Class
			.forName(mapperClassName) : DEFAULT_MAPPER;

	String tableName = args[0];
	Path inputDir = new Path(args[1]);
	Job job = new Job(conf, NAME + "_" + tableName);
	job.setJarByClass(mapperClass);
	FileInputFormat.setInputPaths(job, inputDir);

	String inputCodec = conf.get(INPUT_LZO_KEY);
	if (inputCodec == null) {
		FileInputFormat.setMaxInputSplitSize(job, 67108864l); // max split size = 64m
		job.setInputFormatClass(TextInputFormat.class);
	} else {
		if (inputCodec.equalsIgnoreCase("lzo"))
			job.setInputFormatClass(LzoTextInputFormat.class);
		else {
			usage("not supported compression codec!");
			System.exit(-1);
		}
	}

	job.setMapperClass(mapperClass);

	String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
	if (hfileOutPath != null) {
		HTable table = new HTable(conf, tableName);
		job.setReducerClass(PutSortReducer.class);
		Path outputDir = new Path(hfileOutPath);
		FileOutputFormat.setOutputPath(job, outputDir);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		HFileOutputFormat.configureIncrementalLoad(job, table);
	} else {
		// No reducers. Just write straight to table. Call
		// initTableReducerJob
		// to set up the TableOutputFormat.
		TableMapReduceUtil.initTableReducerJob(tableName, null, job);
		job.setNumReduceTasks(0);
	}

	TableMapReduceUtil.addDependencyJars(job);
	TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
			com.google.common.base.Function.class /* Guava used by TsvParser */);
	return job;
}
 
Example 20  Project: ignite   File: HadoopTaskExecutionSelfTest.java
/**
 * @throws Exception If failed.
 */
@Test
public void testMapCombineRun() throws Exception {
    int lineCnt = 10001;
    String fileName = "/testFile";

    prepareFile(fileName, lineCnt);

    totalLineCnt.set(0);
    taskWorkDirs.clear();

    Configuration cfg = new Configuration();

    cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
    cfg.setBoolean(MAP_WRITE, true);

    Job job = Job.getInstance(cfg);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(TestMapper.class);
    job.setCombinerClass(TestCombiner.class);
    job.setReducerClass(TestReducer.class);

    job.setNumReduceTasks(2);

    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.setInputPaths(job, new Path("igfs://" + igfsName + "@/"));
    FileOutputFormat.setOutputPath(job, new Path("igfs://" + igfsName + "@/output/"));

    job.setJarByClass(getClass());

    HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);

    IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));

    fut.get();

    assertEquals(lineCnt, totalLineCnt.get());

    assertEquals(34, taskWorkDirs.size());

    for (int g = 0; g < gridCount(); g++)
        grid(g).hadoop().finishFuture(jobId).get();
}