The following examples show how to use the org.apache.hadoop.mapreduce.lib.input.MultipleInputs API class, or you can follow the links to view the source code on GitHub.
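Before the individual examples, here is a minimal driver sketch of the pattern they all share: each input path is registered with its own InputFormat and Mapper, and Hadoop routes each split to the right mapper through an internal DelegatingInputFormat. FirstMapper, SecondMapper, and JoinReducer are hypothetical placeholder classes, not taken from any example below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Minimal sketch: one mapper per input path, a single shared reducer.
// FirstMapper, SecondMapper, and JoinReducer are hypothetical classes.
Job job = Job.getInstance(new Configuration(), "MultipleInputsSketch");
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FirstMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SecondMapper.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);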
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "PostCommentHierarchy");
job.setJarByClass(PostCommentHierarchy.class);
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, PostMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentMapper.class);
job.setReducerClass(PostCommentHierarchyReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[2]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
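PostMapper and CommentMapper are not shown above; in this reduce-side pattern each mapper typically tags its output so the reducer can tell posts and comments apart. A sketch, assuming tab-separated records with the join key in the first field (an assumption, not the original code):
// Hypothetical source-tagging mapper for the job above; the record layout
// (join key in the first tab-separated field) is an assumption.
public static class PostMapper extends Mapper<Object, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t", 2);
        if (fields.length < 2) {
            return; // skip malformed records
        }
        outKey.set(fields[0]);          // join key
        outValue.set("P" + fields[1]);  // "P" prefix marks a post record
        context.write(outKey, outValue);
    }
}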
/** The cMap physical operator
* @param map_fnc mapper function
* @param acc_fnc optional accumulator function
* @param zero optional the zero value for the accumulator
* @param source input data source
* @param stop_counter optional counter used in repeat operation
* @return a new data source that contains the result
*/
public final static DataSet cMap ( Tree map_fnc, // mapper function
Tree acc_fnc, // optional accumulator function
Tree zero, // optional the zero value for the accumulator
DataSet source, // input data source
String stop_counter ) // optional counter used in repeat operation
throws Exception {
conf = MapReduceEvaluator.clear_configuration(conf);
String newpath = new_path(conf);
conf.set("mrql.mapper",map_fnc.toString());
conf.set("mrql.counter",stop_counter);
if (zero != null) {
conf.set("mrql.accumulator",acc_fnc.toString());
conf.set("mrql.zero",zero.toString());
} else conf.set("mrql.zero","");
setupSplits(source,conf);
Job job = new Job(conf,newpath);
distribute_compiled_arguments(job.getConfiguration());
job.setJarByClass(MapReducePlan.class);
job.setOutputKeyClass(MRContainer.class);
job.setOutputValueClass(MRContainer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
for (DataSource p: source.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,cMapMapper.class);
FileOutputFormat.setOutputPath(job,new Path(newpath));
job.setNumReduceTasks(0);
job.waitForCompletion(true);
long c = (stop_counter.equals("-")) ? 0
: job.getCounters().findCounter("mrql",stop_counter).getValue();
return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
}
public static void initJoinMRJob(Job job, String prospectsPath, String spoPath, Class<? extends Mapper<CompositeType,TripleCard,?,?>> mapperClass,
String outPath, String auths) throws AccumuloSecurityException {
MultipleInputs.addInputPath(job, new Path(prospectsPath), SequenceFileInputFormat.class, mapperClass);
MultipleInputs.addInputPath(job, new Path(spoPath), SequenceFileInputFormat.class, mapperClass);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
SequenceFileOutputFormat.setOutputPath(job, new Path(outPath));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String outpath = conf.get(OUTPUTPATH);
Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()),
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) ,
SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
job.setMapOutputKeyClass(CompositeType.class);
job.setMapOutputValueClass(TripleCard.class);
tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(TripleEntry.class);
job.setOutputValueClass(CardList.class);
job.setSortComparatorClass(JoinSelectSortComparator.class);
job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
job.setPartitionerClass(JoinSelectPartitioner.class);
job.setReducerClass(JoinReducer.class);
job.setNumReduceTasks(32);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* Set up the MapReduce job to use Accumulo as an input.
* @param tableMapper Mapper class to use
*/
protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper)
throws AccumuloSecurityException {
MRReasoningUtils.configureAccumuloInput(job);
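// Note: "/tmp/input" appears to be a placeholder path. AccumuloInputFormat
// reads from the configured table rather than from HDFS, but
// MultipleInputs.addInputPath() still requires a Path argument.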
MultipleInputs.addInputPath(job, new Path("/tmp/input"),
AccumuloInputFormat.class, tableMapper);
}
/**
* Set up the MapReduce job to use an RDF file as an input.
* @param inputPath the path to the RDF input
* @param rdfMapper the Mapper class to use
*/
protected void configureRdfInput(Path inputPath,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
Configuration conf = job.getConfiguration();
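// Pin the RDF serialization format explicitly, defaulting to RDF/XML when unset.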
String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
conf.set(MRUtils.FORMAT_PROP, format);
MultipleInputs.addInputPath(job, inputPath,
RdfFileInputFormat.class, rdfMapper);
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(StreamingRepartitionJoin.class);
MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);
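// Secondary sort: sort map output on (USER, DATASET) but partition and group
// on USER alone, so each reduce() call receives a single user's records
// ordered by source dataset, which is what lets the join stream rather than
// buffer one side in memory.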
ShuffleUtils.configBuilder()
.useNewApi()
.setSortIndices(KeyFields.USER, KeyFields.DATASET)
.setPartitionerIndices(KeyFields.USER)
.setGroupIndices(KeyFields.USER)
.configure(job.getConfiguration());
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Tuple.class);
job.setMapOutputValueClass(Tuple.class);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
Path bloomPath = new Path(cli.getArgValueAsString(Options.BLOOM_FILE));
Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(BloomJoin.class);
MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Tuple.class);
job.setReducerClass(Reduce.class);
job.addCacheFile(bloomPath.toUri());
job.getConfiguration().set(AbstractFilterMap.DISTCACHE_FILENAME_CONFIG, bloomPath.getName());
FileInputFormat.setInputPaths(job, userLogsPath);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
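UserLogMap and AbstractFilterMap are not shown; loading the cached Bloom filter back in a mapper's setup() usually looks like the sketch below. The record layout and output types are assumptions based on the driver above.
// Sketch: read a Hadoop BloomFilter from the distributed cache. The cached
// file is available by name in the task working directory; details assumed.
public static class UserLogMap extends Mapper<LongWritable, Text, Text, Tuple> {
    private final BloomFilter filter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException {
        String filename = context.getConfiguration()
                .get(AbstractFilterMap.DISTCACHE_FILENAME_CONFIG);
        try (DataInputStream in =
                new DataInputStream(new FileInputStream(filename))) {
            filter.readFields(in); // BloomFilter implements Writable
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String user = value.toString().split("\t", 2)[0]; // assumed layout
        if (filter.membershipTest(new Key(user.getBytes()))) {
            // forward only records whose user may be in the filter
        }
    }
}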
private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
throws ClassNotFoundException, IOException, InterruptedException {
PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
fileCache);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
FileInputFormat.addInputPath(job, result._partitionedInputData);
MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
ExistingDataIndexLookupMapper.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
Path outputPath, int reducerMultipler) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.setLocalCachePath(job, fileCache);
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, ExistingDataMapper.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 4) {
printUsage();
}
Job job = new Job(conf, "ReduceSideJoin");
job.setJarByClass(ReplicatedUserJoin.class);
// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical
// standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentJoinMapper.class);
job.getConfiguration().set("join.type", args[2]);
job.setReducerClass(UserJoinReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[3]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 4) {
printUsage();
}
Job job = new Job(conf, "ReduceSideJoinBloomFilter");
job.setJarByClass(ReduceSideJoinBloomFilter.class);
// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical
// standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentJoinMapperWithBloom.class);
job.getConfiguration().set("join.type", args[2]);
job.setReducerClass(UserJoinReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[3]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 4) {
printUsage();
}
Job job = new Job(conf, "ReduceSideJoin");
job.setJarByClass(ReduceSideJoin.class);
// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical
// standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentJoinMapper.class);
job.getConfiguration().set("join.type", args[2]);
job.setReducerClass(UserJoinReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[3]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Set up the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Configure the MapReduce job
String jobName = "MultInputOutput"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultiInOutput.class); // job class to run
job.setJar("export\\MultiInOutput.jar"); // local jar file to submit
job.setMapOutputKeyClass(Text.class); // mapper output key type
job.setMapOutputValueClass(IntWritable.class); // mapper output value type
job.setReducerClass(MultOutputReducer.class); // reducer class
// job.setOutputKeyClass(Text.class); // reducer output key type
// job.setOutputValueClass(IntWritable.class); // reducer output value type
// 3. Register the job's multiple input paths and the mapper class for each
MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multiinoutput/data/txt"), TextInputFormat.class, TxtFileMapper.class);
MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multiinoutput/data/csv"), TextInputFormat.class, CsvFileMapper.class);
// Define the named outputs: file name, output format, and reducer output key/value types
MultipleOutputs.addNamedOutput(job, "f2015", TextOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "f2016", SequenceFileOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, "f2017", MapFileOutputFormat.class, Text.class, IntWritable.class);
// Set the job output path
String outputDir = "/expr/multiinoutput/output"; // output directory
Path outPath = new Path(hdfs + outputDir);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if (job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
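MultOutputReducer is not shown above; the sketch below shows how such a reducer could write to the named outputs f2015/f2016/f2017 declared in the driver. The year-in-key assumption is illustrative only, not the original code.
// Sketch: route aggregated records to the named outputs registered above.
// Assumes the map output key starts with a four-digit year (an assumption).
public static class MultOutputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        String year = key.toString().substring(0, 4); // assumed key layout
        switch (year) {
            case "2015": mos.write("f2015", key, new IntWritable(sum)); break;
            case "2016": mos.write("f2016", key, new IntWritable(sum)); break;
            case "2017": mos.write("f2017", key, new IntWritable(sum)); break;
            default: context.write(key, new IntWritable(sum));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // required: flushes and closes every named output
    }
}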
public static void main(String[] args) throws Exception {
// 1. Set up the HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Configure the MapReduce job
String jobName = "MultInput2"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(MultInput2.class); // job class to run
job.setJar("export\\MultInput2.jar"); // local jar file to submit
//job.setMapperClass(MultInput2Mapper.class); // no global Mapper class needed; each input's mapper is set in MultipleInputs.addInputPath()
job.setMapOutputKeyClass(Text.class); // mapper output key type
job.setMapOutputValueClass(IntWritable.class); // mapper output value type
job.setReducerClass(MultInput2Reducer.class); // reducer class
job.setOutputKeyClass(Text.class); // reducer output key type
job.setOutputValueClass(IntWritable.class); // reducer output value type
// 3. Set the job input and output paths
// Method 5: MultipleInputs.addInputPath()
MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multinput/data/txt1"), TextInputFormat.class, TxtFileMapper.class);
MultipleInputs.addInputPath(job, new Path(hdfs+"/expr/multinput/data/csv"), TextInputFormat.class, CsvFileMapper.class);
Path outPath = new Path(hdfs + "/expr/multinput/output3"); // output directory
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
// add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// set job parameters
job.setJarByClass(ParseLogJob_End.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// set CombineTextInputFormat (combines small files into larger splits)
job.setInputFormatClass(CombineTextInputFormat.class);
// add a file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// set the OutputFormat
// job.setOutputFormatClass(LogOutputFormat.class);
// set output compression
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
// add the input and output paths
FileStatus[] fileStatuses = fs.listStatus(new Path(args[0]));
for (int i = 0; i < fileStatuses.length; i++) {
MultipleInputs.addInputPath(job, fileStatuses[i].getPath(), TextInputFormat.class, LogMapper.class);
String inputPath = fileStatuses[i].getPath().toString();
String dir_name = inputPath.substring(inputPath.lastIndexOf('/')+1);
System.out.println(dir_name);
}
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase4RemoveDuplicatesUsingReduceSideJoins.class);
job.setJobName(Phase4RemoveDuplicatesUsingReduceSideJoins.class.getName());
// paths
// text files of ids to be deleted
String textFilePath = args[0];
// corpus with *.warc.gz
String commaSeparatedInputFiles = args[1];
// output
String outputPath = args[2];
//second input the look up text file
MultipleInputs.addInputPath(job, new Path(textFilePath), TextInputFormat.class,
JoinTextMapper.class);
//first input the data set (check comma separated availability)
MultipleInputs.addInputPath(job, new Path(commaSeparatedInputFiles), WARCInputFormat.class,
JoinWARCMapper.class);
job.setPartitionerClass(SourceJoiningKeyPartitioner.class);
job.setGroupingComparatorClass(SourceJoiningGroupingComparator.class);
job.setMapOutputKeyClass(CompositeKey.class);
job.setMapOutputValueClass(WARCWritable.class);
job.setReducerClass(JoinReducer.class);
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce physical operator
* @param map_fnc the mapper function
* @param combine_fnc optional in-mapper combiner function
* @param reduce_fnc the reducer function
* @param acc_fnc optional accumulator function
* @param zero optional the zero value for the accumulator
* @param source the input data source
* @param num_reduces number of reducers
* @param stop_counter optional counter used in repeat operation
* @param orderp does the result need to be ordered?
* @return a new data source that contains the result
*/
public final static DataSet mapReduce ( Tree map_fnc, // mapper function
Tree combine_fnc, // optional in-mapper combiner function
Tree reduce_fnc, // reducer function
Tree acc_fnc, // optional accumulator function
Tree zero, // optional the zero value for the accumulator
DataSet source, // input data source
int num_reduces, // number of reducers
String stop_counter, // optional counter used in repeat operation
boolean orderp ) // does the result need to be ordered?
throws Exception {
conf = MapReduceEvaluator.clear_configuration(conf);
String newpath = new_path(conf);
conf.set("mrql.mapper",map_fnc.toString());
if (combine_fnc != null)
conf.set("mrql.combiner",combine_fnc.toString());
conf.set("mrql.reducer",reduce_fnc.toString());
if (zero != null) { // will use in-mapper combiner
conf.set("mrql.accumulator",acc_fnc.toString());
conf.set("mrql.zero",zero.toString());
} else conf.set("mrql.zero","");
conf.set("mrql.counter",stop_counter);
setupSplits(source,conf);
Job job = new Job(conf,newpath);
distribute_compiled_arguments(job.getConfiguration());
job.setJarByClass(MapReducePlan.class);
job.setOutputKeyClass(MRContainer.class);
job.setOutputValueClass(MRContainer.class);
job.setPartitionerClass(MRContainerPartitioner.class);
job.setSortComparatorClass(MRContainerKeyComparator.class);
job.setGroupingComparatorClass(MRContainerKeyComparator.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
for (DataSource p: source.source)
MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MRMapper.class);
FileOutputFormat.setOutputPath(job,new Path(newpath));
job.setReducerClass(MRReducer.class);
if (Config.trace && PlanGeneration.streamed_MapReduce_reducer(reduce_fnc))
System.out.println("Streamed MapReduce reducer");
if (num_reduces > 0)
job.setNumReduceTasks(num_reduces);
job.waitForCompletion(true);
long c = (stop_counter.equals("-")) ? 0
: job.getCounters().findCounter("mrql",stop_counter).getValue();
DataSource s = new BinaryDataSource(newpath,conf);
s.to_be_merged = orderp;
return new DataSet(s,c,outputRecords(job));
}
private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
throws ClassNotFoundException, IOException, InterruptedException {
PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
fileCache);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
InputSplitPruneUtil.setTable(job, table);
BlurInputFormat.setLocalCachePath(job, fileCache);
// Existing data - This adds the copy data files first open and stream
// through all documents.
{
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, ExistingDataMapper.class);
}
// Existing data - This adds the row id lookup
{
ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
FileInputFormat.addInputPath(job, result._partitionedInputData);
MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
ExistingDataIndexLookupMapper.class);
}
// New Data
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
@Override
public int run(String[] args) throws Exception {
int c = 0;
if (args.length < 5) {
System.err
.println("Usage Driver <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> <extra config files...>");
return 1;
}
String table = args[c++];
String mrIncWorkingPathStr = args[c++];
String outputPathStr = args[c++];
String blurZkConnection = args[c++];
int reducerMultipler = Integer.parseInt(args[c++]);
for (; c < args.length; c++) {
String externalConfigFileToAdd = args[c];
getConf().addResource(new Path(externalConfigFileToAdd));
}
Path outputPath = new Path(outputPathStr);
Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
Path newData = new Path(mrIncWorkingPath, NEW);
Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
Path completeData = new Path(mrIncWorkingPath, COMPLETE);
Path fileCache = new Path(mrIncWorkingPath, CACHE);
fileSystem.mkdirs(newData);
fileSystem.mkdirs(inprogressData);
fileSystem.mkdirs(completeData);
fileSystem.mkdirs(fileCache);
List<Path> srcPathList = new ArrayList<Path>();
for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
srcPathList.add(fileStatus.getPath());
}
if (srcPathList.isEmpty()) {
return 0;
}
List<Path> inprogressPathList = new ArrayList<Path>();
boolean success = false;
Iface client = null;
try {
inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
client.createSnapshot(table, MRUPDATE_SNAPSHOT);
TableDescriptor descriptor = client.describe(table);
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.setLocalCachePath(job, fileCache);
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
} finally {
if (success) {
LOG.info("Indexing job succeeded!");
movePathList(fileSystem, completeData, inprogressPathList);
} else {
LOG.error("Indexing job failed!");
movePathList(fileSystem, newData, inprogressPathList);
}
if (client != null) {
client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
}
}
if (success) {
return 0;
} else {
return 1;
}
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SimpleRepartitionMapReduce2.class);
MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Tuple.class);
job.setMapOutputValueClass(Tuple.class);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));
Configuration conf = super.getConf();
Job job = new Job(conf);
job.setJarByClass(SimpleRepartitionJoin.class);
MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Tuple.class);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}