Source-code examples of org.apache.hadoop.mapreduce.lib.input.MultipleInputs

The examples below show how the org.apache.hadoop.mapreduce.lib.input.MultipleInputs API is used in real projects; you can also follow each example's link to view the full source on GitHub.
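Before the project examples, here is a minimal, self-contained driver sketch of the pattern they all share: MultipleInputs.addInputPath binds one InputFormat and one Mapper to each input path, and a single reducer joins the tagged records. Everything in it (MultipleInputsSketch, OrdersMapper, CustomersMapper, JoinReducer, the tab-separated input layout) is hypothetical and invented for illustration; it does not come from any project listed on this page.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultipleInputsSketch {

    // Tags records from the first input with "O:" so the reducer can tell the sources apart.
    public static class OrdersMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]),
                    new Text("O:" + (fields.length > 1 ? fields[1] : "")));
        }
    }

    // Tags records from the second input with "C:".
    public static class CustomersMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            context.write(new Text(fields[0]),
                    new Text("C:" + (fields.length > 1 ? fields[1] : "")));
        }
    }

    // Concatenates the tagged values that arrive under the same join key.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text v : values) {
                joined.append(v).append(' ');
            }
            context.write(key, new Text(joined.toString().trim()));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "MultipleInputsSketch");
        job.setJarByClass(MultipleInputsSketch.class);

        // One InputFormat/Mapper pair per path; MultipleInputs installs a
        // DelegatingInputFormat and DelegatingMapper, so there is no separate
        // setInputFormatClass() or setMapperClass() call here.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, OrdersMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CustomersMapper.class);

        job.setReducerClass(JoinReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Run it with two input directories and a not-yet-existing output directory as arguments; each output line is one key followed by its tagged values from both inputs.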

@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	Job job = new Job(conf, "PostCommentHierarchy");
	job.setJarByClass(PostCommentHierarchy.class);

	MultipleInputs.addInputPath(job, new Path(args[0]),
			TextInputFormat.class, PostMapper.class);
	MultipleInputs.addInputPath(job, new Path(args[1]),
			TextInputFormat.class, CommentMapper.class);

	job.setReducerClass(PostCommentHierarchyReducer.class);

	job.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(job, new Path(args[2]));

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	return job.waitForCompletion(true) ? 0 : 2;
}
 
Example 2 (project: incubator-retired-mrql, file: MapOperation.java)
/** The cMap physical operator
 * @param map_fnc       mapper function
 * @param acc_fnc       optional accumulator function
 * @param zero          optional zero value for the accumulator
 * @param source        input data source
 * @param stop_counter  optional counter used in the repeat operation
 * @return a new data source that contains the result
 */
public final static DataSet cMap ( Tree map_fnc,         // mapper function
                                   Tree acc_fnc,         // optional accumulator function
                                   Tree zero,            // optional zero value for the accumulator
                                   DataSet source,       // input data source
                                   String stop_counter ) // optional counter used in the repeat operation
                            throws Exception {
    conf = MapReduceEvaluator.clear_configuration(conf);
    String newpath = new_path(conf);
    conf.set("mrql.mapper",map_fnc.toString());
    conf.set("mrql.counter",stop_counter);
    if (zero != null) {
        conf.set("mrql.accumulator",acc_fnc.toString());
        conf.set("mrql.zero",zero.toString());
    } else conf.set("mrql.zero","");
    setupSplits(source,conf);
    Job job = new Job(conf,newpath);
    distribute_compiled_arguments(job.getConfiguration());
    job.setJarByClass(MapReducePlan.class);
    job.setOutputKeyClass(MRContainer.class);
    job.setOutputValueClass(MRContainer.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    for (DataSource p: source.source)
        MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,cMapMapper.class);
    FileOutputFormat.setOutputPath(job,new Path(newpath));
    job.setNumReduceTasks(0);
    job.waitForCompletion(true);
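    // by convention in this operator, a stop_counter of "-" means no counter was requested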
    long c = (stop_counter.equals("-")) ? 0
             : job.getCounters().findCounter("mrql",stop_counter).getValue();
    return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
}
 
Example 3 (project: rya, file: JoinSelectStatsUtil.java)
public static void initJoinMRJob(Job job, String prospectsPath, String spoPath, Class<? extends Mapper<CompositeType,TripleCard,?,?>> mapperClass,
    String outPath, String auths) throws AccumuloSecurityException {

  MultipleInputs.addInputPath(job, new Path(prospectsPath), SequenceFileInputFormat.class, mapperClass);
  MultipleInputs.addInputPath(job, new Path(spoPath), SequenceFileInputFormat.class, mapperClass);
  job.setMapOutputKeyClass(CompositeType.class);
  job.setMapOutputValueClass(TripleCard.class);

  SequenceFileOutputFormat.setOutputPath(job, new Path(outPath));
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(TripleEntry.class);
  job.setOutputValueClass(CardList.class);

}
 
Example 4 (project: rya, file: JoinSelectStatisticsTest.java)
@Override
public int run(String[] args) throws Exception {

    Configuration conf = getConf();
    String outpath = conf.get(OUTPUTPATH);
    
    Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
    job.setJarByClass(this.getClass());
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
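    // Note: new Job(conf, ...) above already copied the Configuration, so this flag set
    // afterwards does not reach the job; job.getConfiguration().setBoolean(...) would.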
    
    MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()), 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) , 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    job.setMapOutputKeyClass(CompositeType.class);
    job.setMapOutputValueClass(TripleCard.class);

    tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
    SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(TripleEntry.class);
    job.setOutputValueClass(CardList.class);


    job.setSortComparatorClass(JoinSelectSortComparator.class);
    job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
    job.setPartitionerClass(JoinSelectPartitioner.class);
    job.setReducerClass(JoinReducer.class);
    job.setNumReduceTasks(32);
    job.waitForCompletion(true);
    
    return job.isSuccessful() ? 0 : 1;          
}
 
Example 5 (project: rya, file: AbstractReasoningTool.java)
/**
 * Set up the MapReduce job to use Accumulo as an input.
 * @param tableMapper Mapper class to use
 */
protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper)
        throws AccumuloSecurityException {
    MRReasoningUtils.configureAccumuloInput(job);
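    // The "/tmp/input" path below is effectively a placeholder: AccumuloInputFormat reads
    // from Accumulo rather than HDFS, and MultipleInputs only needs a Path to bind the mapper.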
    MultipleInputs.addInputPath(job, new Path("/tmp/input"),
        AccumuloInputFormat.class, tableMapper);
}
 
Example 6 (project: rya, file: AbstractReasoningTool.java)
/**
 * Set up the MapReduce job to use an RDF file as an input.
 * @param rdfMapper Mapper class to use
 */
protected void configureRdfInput(Path inputPath,
        Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
    Configuration conf = job.getConfiguration();
    String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
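    // Write the (possibly defaulted) RDF format back so the job configuration
    // always carries an explicit value.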
    conf.set(MRUtils.FORMAT_PROP, format);
    MultipleInputs.addInputPath(job, inputPath,
        RdfFileInputFormat.class, rdfMapper);
}
 
Example 7 (project: hiped2, file: StreamingRepartitionJoin.java)
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(StreamingRepartitionJoin.class);

  MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
  MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

  ShuffleUtils.configBuilder()
      .useNewApi()
      .setSortIndices(KeyFields.USER, KeyFields.DATASET)
      .setPartitionerIndices(KeyFields.USER)
      .setGroupIndices(KeyFields.USER)
      .configure(job.getConfiguration());

  job.setReducerClass(Reduce.class);

  job.setMapOutputKeyClass(Tuple.class);
  job.setMapOutputValueClass(Tuple.class);

  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
 
Example 8 (project: hiped2, file: BloomJoin.java)
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path bloomPath = new Path(cli.getArgValueAsString(Options.BLOOM_FILE));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);

  job.setJarByClass(BloomJoin.class);
  MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
  MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Tuple.class);

  job.setReducerClass(Reduce.class);

  job.addCacheFile(bloomPath.toUri());
  job.getConfiguration().set(AbstractFilterMap.DISTCACHE_FILENAME_CONFIG, bloomPath.getName());

  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
 
Example 9 (project: incubator-retired-blur, file: IndexerJobDriver.java)
private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
    Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
    throws ClassNotFoundException, IOException, InterruptedException {
  PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
      fileCache);

  Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");

  ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
  FileInputFormat.addInputPath(job, result._partitionedInputData);
  MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
      ExistingDataIndexLookupMapper.class);

  for (Path p : inprogressPathList) {
    FileInputFormat.addInputPath(job, p);
    MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
  }

  BlurOutputFormat.setOutputPath(job, outputPath);
  BlurOutputFormat.setupJob(job, descriptor);

  job.setReducerClass(UpdateReducer.class);
  job.setMapOutputKeyClass(IndexKey.class);
  job.setMapOutputValueClass(IndexValue.class);
  job.setPartitionerClass(IndexKeyPartitioner.class);
  job.setGroupingComparatorClass(IndexKeyWritableComparator.class);

  BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);

  boolean success = job.waitForCompletion(true);
  Counters counters = job.getCounters();
  LOG.info("Counters [" + counters + "]");
  return success;
}
 
Example 10 (project: incubator-retired-blur, file: IndexerJobDriver.java)
private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
    Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
  Path tablePath = new Path(descriptor.getTableUri());
  BlurInputFormat.setLocalCachePath(job, fileCache);
  BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
  MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, ExistingDataMapper.class);

  for (Path p : inprogressPathList) {
    FileInputFormat.addInputPath(job, p);
    MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
  }

  BlurOutputFormat.setOutputPath(job, outputPath);
  BlurOutputFormat.setupJob(job, descriptor);

  job.setReducerClass(UpdateReducer.class);
  job.setMapOutputKeyClass(IndexKey.class);
  job.setMapOutputValueClass(IndexValue.class);
  job.setPartitionerClass(IndexKeyPartitioner.class);
  job.setGroupingComparatorClass(IndexKeyWritableComparator.class);

  BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);

  boolean success = job.waitForCompletion(true);
  Counters counters = job.getCounters();
  LOG.info("Counters [" + counters + "]");
  return success;
}
 
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser parser = new GenericOptionsParser(conf, args);
	String[] otherArgs = parser.getRemainingArgs();
	if (otherArgs.length != 4) {
		printUsage();
	}
	Job job = new Job(conf, "ReduceSideJoin");
	job.setJarByClass(ReplicatedUserJoin.class);

	// Use MultipleInputs to set which input uses what mapper
	// This will keep parsing of each data set separate from a logical
	// standpoint
	// The first two elements of the args array are the two inputs
	MultipleInputs.addInputPath(job, new Path(args[0]),
			TextInputFormat.class, UserJoinMapper.class);
	MultipleInputs.addInputPath(job, new Path(args[1]),
			TextInputFormat.class, CommentJoinMapper.class);
	job.getConfiguration().set("join.type", args[2]);

	job.setReducerClass(UserJoinReducer.class);

	job.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(job, new Path(args[3]));

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	return job.waitForCompletion(true) ? 0 : 2;
}
 
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser parser = new GenericOptionsParser(conf, args);
	String[] otherArgs = parser.getRemainingArgs();
	if (otherArgs.length != 4) {
		printUsage();
	}
	Job job = new Job(conf, "ReduceSideJoinBloomFilter");
	job.setJarByClass(ReduceSideJoinBloomFilter.class);

	// Use MultipleInputs to set which input uses what mapper
	// This will keep parsing of each data set separate from a logical
	// standpoint
	// The first two elements of the args array are the two inputs
	MultipleInputs.addInputPath(job, new Path(args[0]),
			TextInputFormat.class, UserJoinMapper.class);
	MultipleInputs.addInputPath(job, new Path(args[1]),
			TextInputFormat.class, CommentJoinMapperWithBloom.class);
	job.getConfiguration().set("join.type", args[2]);

	job.setReducerClass(UserJoinReducer.class);

	job.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(job, new Path(args[3]));

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	return job.waitForCompletion(true) ? 0 : 2;
}
 
@Override
public int run(String[] args) throws Exception {
	Configuration conf = new Configuration();
	GenericOptionsParser parser = new GenericOptionsParser(conf, args);
	String[] otherArgs = parser.getRemainingArgs();
	if (otherArgs.length != 4) {
		printUsage();
	}
	Job job = new Job(conf, "ReduceSideJoin");
	job.setJarByClass(ReduceSideJoin.class);

	// Use MultipleInputs to set which input uses what mapper
	// This will keep parsing of each data set separate from a logical
	// standpoint
	// The first two elements of the args array are the two inputs
	MultipleInputs.addInputPath(job, new Path(args[0]),
			TextInputFormat.class, UserJoinMapper.class);
	MultipleInputs.addInputPath(job, new Path(args[1]),
			TextInputFormat.class, CommentJoinMapper.class);
	job.getConfiguration().set("join.type", args[2]);

	job.setReducerClass(UserJoinReducer.class);

	job.setOutputFormatClass(TextOutputFormat.class);
	TextOutputFormat.setOutputPath(job, new Path(args[3]));

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	return job.waitForCompletion(true) ? 0 : 2;
}
 
Example 14 (project: MapReduce-Demo, file: MultiInOutput.java)
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
	// 1. Configure the HDFS connection
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. Configure the MapReduce job
	String jobName = "MultInputOutput"; // job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultiInOutput.class); // main job class at runtime
	job.setJar("export\\MultiInOutput.jar"); // local jar to submit
	job.setMapOutputKeyClass(Text.class); // Mapper output key type
	job.setMapOutputValueClass(IntWritable.class); // Mapper output value type
	job.setReducerClass(MultOutputReducer.class); // Reducer class
	// job.setOutputKeyClass(Text.class); // Reducer output key type
	// job.setOutputValueClass(IntWritable.class); // Reducer output value type

	// 3. Register the job's input paths and the Mapper class used for each
	MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multiinoutput/data/txt"), TextInputFormat.class, TxtFileMapper.class);
	MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multiinoutput/data/csv"), TextInputFormat.class, CsvFileMapper.class);

	// Define the named outputs: file name, output format, and Reducer output key/value types
	MultipleOutputs.addNamedOutput(job, "f2015", TextOutputFormat.class, Text.class, IntWritable.class);
	MultipleOutputs.addNamedOutput(job, "f2016", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
	MultipleOutputs.addNamedOutput(job, "f2017", MapFileOutputFormat.class, Text.class, IntWritable.class);

	// Set the job output path
	String outputDir = "/expr/multiinoutput/output"; // output directory for this exercise
	Path outPath = new Path(hdfs + outputDir);
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if (fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// 4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if (job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 15 (project: MapReduce-Demo, file: MultInput2.java)
public static void main(String[] args) throws Exception {
	// 1. Configure the HDFS connection
	String namenode_ip = "192.168.17.10";
	String hdfs = "hdfs://" + namenode_ip + ":9000";
	Configuration conf = new Configuration();
	conf.set("fs.defaultFS", hdfs);
	conf.set("mapreduce.app-submission.cross-platform", "true");

	// 2. Configure the MapReduce job
	String jobName = "MultInput2";					// job name
	Job job = Job.getInstance(conf, jobName);
	job.setJarByClass(MultInput2.class);			// main job class at runtime
	job.setJar("export\\MultInput2.jar");			// local jar to submit

	//job.setMapperClass(MultInput2Mapper.class);	// no single Mapper class needed; each is given in MultipleInputs.addInputPath() instead
	job.setMapOutputKeyClass(Text.class);			// Mapper output key type
	job.setMapOutputValueClass(IntWritable.class);	// Mapper output value type
	job.setReducerClass(MultInput2Reducer.class);	// Reducer class
	job.setOutputKeyClass(Text.class);				// Reducer output key type
	job.setOutputValueClass(IntWritable.class); 	// Reducer output value type

	// 3. Set the job's input and output paths
	// Approach 5: MultipleInputs.addInputPath()
	MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1"), TextInputFormat.class, TxtFileMapper.class);
	MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multinput/data/csv"), TextInputFormat.class, CsvFileMapper.class);

	Path outPath = new Path(hdfs + "/expr/multinput/output3");		// output directory
	FileOutputFormat.setOutputPath(job, outPath);
	FileSystem fs = FileSystem.get(conf);
	if(fs.exists(outPath)) {
		fs.delete(outPath, true);
	}

	// 4. Run the job
	System.out.println("Job: " + jobName + " is running...");
	if(job.waitForCompletion(true)) {
		System.out.println("success!");
		System.exit(0);
	} else {
		System.out.println("failed!");
		System.exit(1);
	}
}
 
Example 16 (project: 163-bigdate-note, file: ParseLogJob_End.java)
public int run(String[] args) throws Exception {
        // Create the job
        Configuration config = getConf();
        // Add custom configuration
        config.addResource("mr.xml");
        Job job = Job.getInstance(config);
        // Set job parameters
        job.setJarByClass(ParseLogJob_End.class);
        job.setJobName("parselog");
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        job.setMapOutputKeyClass(TextLongWritable.class);
        job.setGroupingComparatorClass(TextLongGroupComparator.class);
        job.setPartitionerClass(TextLongPartition.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputValueClass(Text.class);

        // Set CombineTextInputFormat (note: the MultipleInputs.addInputPath calls below
        // install DelegatingInputFormat, which supersedes this setting)
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Add the IP-lookup file to the distributed cache
        job.addCacheFile(new URI(config.get("ip.file.path")));

        // Set a custom OutputFormat (left disabled)
//        job.setOutputFormatClass(LogOutputFormat.class);


        // Set output compression (left disabled)
//        FileOutputFormat.setCompressOutput(job, true);
//        FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        FileSystem fs = FileSystem.get(config);
        // Add input and output paths
        FileStatus[] fileStatuses = fs.listStatus(new Path(args[0]));
        for (int i = 0; i < fileStatuses.length; i++) {
            MultipleInputs.addInputPath(job, fileStatuses[i].getPath(), TextInputFormat.class, LogMapper.class);
            String inputPath = fileStatuses[i].getPath().toString();
            String dir_name = inputPath.substring(inputPath.lastIndexOf('/')+1);
            System.out.println(dir_name);
        }

        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        // Run the job
        if (!job.waitForCompletion(true)) {
            throw new RuntimeException(job.getJobName() + " failed!");
        }
        }

        return 0;
    }
 
@Override
public int run(String[] args)
        throws Exception
{
    Job job = Job.getInstance(getConf());

    job.setJarByClass(Phase4RemoveDuplicatesUsingReduceSideJoins.class);
    job.setJobName(Phase4RemoveDuplicatesUsingReduceSideJoins.class.getName());

    // paths
    // text files of ids to be deleted
    String textFilePath = args[0];
    // corpus with *.warc.gz
    String commaSeparatedInputFiles = args[1];
    // output
    String outputPath = args[2];

    // second input: the lookup text file of ids
    MultipleInputs.addInputPath(job, new Path(textFilePath), TextInputFormat.class,
            JoinTextMapper.class);
    // first input: the data set (may be a comma-separated list of files)
    MultipleInputs.addInputPath(job, new Path(commaSeparatedInputFiles), WARCInputFormat.class,
            JoinWARCMapper.class);

    job.setPartitionerClass(SourceJoiningKeyPartitioner.class);
    job.setGroupingComparatorClass(SourceJoiningGroupingComparator.class);

    job.setMapOutputKeyClass(CompositeKey.class);
    job.setMapOutputValueClass(WARCWritable.class);

    job.setReducerClass(JoinReducer.class);

    job.setOutputFormatClass(WARCOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(WARCWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
 
/**
 * The MapReduce physical operator
 * @param map_fnc          the mapper function
 * @param combine_fnc      optional in-mapper combiner function
 * @param reduce_fnc       the reducer function
 * @param acc_fnc          optional accumulator function
 * @param zero             optional zero value for the accumulator
 * @param source           the input data source
 * @param num_reduces      number of reducers
 * @param stop_counter     optional counter used in the repeat operation
 * @param orderp           does the result need to be ordered?
 * @return a new data source that contains the result
 */
public final static DataSet mapReduce ( Tree map_fnc,         // mapper function
                                        Tree combine_fnc,     // optional in-mapper combiner function
                                        Tree reduce_fnc,      // reducer function
                                        Tree acc_fnc,         // optional accumulator function
                                        Tree zero,            // optional zero value for the accumulator
                                        DataSet source,       // input data source
                                        int num_reduces,      // number of reducers
                                        String stop_counter,  // optional counter used in the repeat operation
                                        boolean orderp )      // does the result need to be ordered?
                            throws Exception {
    conf = MapReduceEvaluator.clear_configuration(conf);
    String newpath = new_path(conf);
    conf.set("mrql.mapper",map_fnc.toString());
    if (combine_fnc != null)
        conf.set("mrql.combiner",combine_fnc.toString());
    conf.set("mrql.reducer",reduce_fnc.toString());
    if (zero != null) {   // will use in-mapper combiner
        conf.set("mrql.accumulator",acc_fnc.toString());
        conf.set("mrql.zero",zero.toString());
    } else conf.set("mrql.zero","");
    conf.set("mrql.counter",stop_counter);
    setupSplits(source,conf);
    Job job = new Job(conf,newpath);
    distribute_compiled_arguments(job.getConfiguration());
    job.setJarByClass(MapReducePlan.class);
    job.setOutputKeyClass(MRContainer.class);
    job.setOutputValueClass(MRContainer.class);
    job.setPartitionerClass(MRContainerPartitioner.class);
    job.setSortComparatorClass(MRContainerKeyComparator.class);
    job.setGroupingComparatorClass(MRContainerKeyComparator.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    for (DataSource p: source.source)
        MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MRMapper.class);
    FileOutputFormat.setOutputPath(job,new Path(newpath));
    job.setReducerClass(MRReducer.class);
    if (Config.trace && PlanGeneration.streamed_MapReduce_reducer(reduce_fnc))
        System.out.println("Streamed MapReduce reducer");
    if (num_reduces > 0)
        job.setNumReduceTasks(num_reduces);
    job.waitForCompletion(true);
    long c = (stop_counter.equals("-")) ? 0
             : job.getCounters().findCounter("mrql",stop_counter).getValue();
    DataSource s = new BinaryDataSource(newpath,conf);
    s.to_be_merged = orderp;
    return new DataSet(s,c,outputRecords(job));
}
 
Example 19 (project: incubator-retired-blur, file: IndexerJobDriver.java)
private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
    Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
    throws ClassNotFoundException, IOException, InterruptedException {
  PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
      fileCache);

  Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");

  InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
  InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
  InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
  InputSplitPruneUtil.setTable(job, table);

  BlurInputFormat.setLocalCachePath(job, fileCache);

  // Existing data - This adds the copy data files first open and stream
  // through all documents.
  {
    Path tablePath = new Path(descriptor.getTableUri());
    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
    MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, ExistingDataMapper.class);
  }

  // Existing data - This adds the row id lookup
  {
    ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
    FileInputFormat.addInputPath(job, result._partitionedInputData);
    MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
        ExistingDataIndexLookupMapper.class);
  }

  // New Data
  for (Path p : inprogressPathList) {
    FileInputFormat.addInputPath(job, p);
    MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
  }

  BlurOutputFormat.setOutputPath(job, outputPath);
  BlurOutputFormat.setupJob(job, descriptor);

  job.setReducerClass(UpdateReducer.class);
  job.setMapOutputKeyClass(IndexKey.class);
  job.setMapOutputValueClass(IndexValue.class);
  job.setPartitionerClass(IndexKeyPartitioner.class);
  job.setGroupingComparatorClass(IndexKeyWritableComparator.class);

  BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);

  boolean success = job.waitForCompletion(true);
  Counters counters = job.getCounters();
  LOG.info("Counters [" + counters + "]");
  return success;
}
 
Example 20 (project: incubator-retired-blur, file: Driver.java)
@Override
public int run(String[] args) throws Exception {
  int c = 0;
  if (args.length < 5) {
    System.err
        .println("Usage Driver <table> <mr inc working path> <output path> <zk connection> <reducer multipler> <extra config files...>");
    return 1;
  }
  String table = args[c++];
  String mrIncWorkingPathStr = args[c++];
  String outputPathStr = args[c++];
  String blurZkConnection = args[c++];
  int reducerMultipler = Integer.parseInt(args[c++]);
  for (; c < args.length; c++) {
    String externalConfigFileToAdd = args[c];
    getConf().addResource(new Path(externalConfigFileToAdd));
  }

  Path outputPath = new Path(outputPathStr);
  Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
  FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());

  Path newData = new Path(mrIncWorkingPath, NEW);
  Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
  Path completeData = new Path(mrIncWorkingPath, COMPLETE);
  Path fileCache = new Path(mrIncWorkingPath, CACHE);

  fileSystem.mkdirs(newData);
  fileSystem.mkdirs(inprogressData);
  fileSystem.mkdirs(completeData);
  fileSystem.mkdirs(fileCache);

  List<Path> srcPathList = new ArrayList<Path>();
  for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
    srcPathList.add(fileStatus.getPath());
  }
  if (srcPathList.isEmpty()) {
    return 0;
  }

  List<Path> inprogressPathList = new ArrayList<Path>();
  boolean success = false;
  Iface client = null;
  try {
    inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);

    Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
    client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
    waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
    client.createSnapshot(table, MRUPDATE_SNAPSHOT);
    TableDescriptor descriptor = client.describe(table);
    Path tablePath = new Path(descriptor.getTableUri());

    BlurInputFormat.setLocalCachePath(job, fileCache);
    BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
    MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
    for (Path p : inprogressPathList) {
      FileInputFormat.addInputPath(job, p);
      MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
    }

    BlurOutputFormat.setOutputPath(job, outputPath);
    BlurOutputFormat.setupJob(job, descriptor);

    job.setReducerClass(UpdateReducer.class);
    job.setMapOutputKeyClass(IndexKey.class);
    job.setMapOutputValueClass(IndexValue.class);
    job.setPartitionerClass(IndexKeyPartitioner.class);
    job.setGroupingComparatorClass(IndexKeyWritableComparator.class);

    BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);

    success = job.waitForCompletion(true);
    Counters counters = job.getCounters();
    LOG.info("Counters [" + counters + "]");

  } finally {
    if (success) {
      LOG.info("Indexing job succeeded!");
      movePathList(fileSystem, completeData, inprogressPathList);
    } else {
      LOG.error("Indexing job failed!");
      movePathList(fileSystem, newData, inprogressPathList);
    }
    if (client != null) {
      client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
    }
  }

  if (success) {
    return 0;
  } else {
    return 1;
  }

}
 
Example 21 (project: hiped2, file: SimpleRepartitionMapReduce2.java)
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(SimpleRepartitionMapReduce2.class);

  MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
  MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

  job.setReducerClass(Reduce.class);

  job.setMapOutputKeyClass(Tuple.class);
  job.setMapOutputValueClass(Tuple.class);

  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
 
Example 22 (project: hiped2, file: SimpleRepartitionJoin.java)
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(SimpleRepartitionJoin.class);

  MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
  MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

  job.setReducerClass(Reduce.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Tuple.class);

  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}
 