Listed below are example usages of org.apache.hadoop.mapreduce.Job#setReducerClass().
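Before the individual examples, here is a minimal, self-contained sketch of the pattern they all share: a driver builds a Job, registers a Mapper, registers a Reducer via setReducerClass(), wires up the input and output paths, and submits. The class names (MinimalWordCount, CountMapper, CountReducer) are hypothetical placeholders, not taken from any of the examples below.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalWordCount {

  // Emits (word, 1) for every token in the input line.
  public static class CountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer it = new StringTokenizer(value.toString());
      while (it.hasMoreTokens()) {
        word.set(it.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Sums the counts for each word; this is the class handed to setReducerClass().
  public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "minimal word count");
    job.setJarByClass(MinimalWordCount.class);
    job.setMapperClass(CountMapper.class);
    job.setReducerClass(CountReducer.class); // the method all examples below demonstrate
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}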
private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
throws IOException {
LOG.info("Trying to merge avro files");
final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf);
final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf);
if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) {
throw new IOException("Invalid schema for input directories. Schema for old data: ["
+ oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]");
}
LOG.debug("Avro Schema:" + oldPathSchema);
job.setInputFormatClass(AvroInputFormat.class);
job.setOutputFormatClass(AvroOutputFormat.class);
job.setMapperClass(MergeAvroMapper.class);
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P1");
job.setJarByClass(P1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(P1Map.class);
job.setCombinerClass(P1Reduce.class);
job.setReducerClass(P1Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static void runJob(Configuration conf,
Path inputPath,
Path outputPath)
throws Exception {
Job job = new Job(conf);
job.setJarByClass(UniqueHashedKeyJob.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (!job.waitForCompletion(true)) {
throw new Exception("Job failed");
}
}
public static boolean run(Configuration config, Map<String, String> paths)
throws IOException, ClassNotFoundException, InterruptedException {
String jobName = "step4";
Job job = Job.getInstance(config, jobName);
job.setJarByClass(Step4.class);
job.setJar("export\\ItemCF.jar");
job.setMapperClass(Step4_Mapper.class);
job.setReducerClass(Step4_Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
Path[] inPaths = new Path[] {
new Path(paths.get("Step4Input1")),
new Path(paths.get("Step4Input2")) };
Path outpath = new Path(paths.get("Step4Output"));
FileInputFormat.setInputPaths(job, inPaths);
FileOutputFormat.setOutputPath(job, outpath);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
return job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> <out> [wordcount stop word file]");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
if(otherArgs.length > 2){
job.getConfiguration().set(STOP_WORDS_FILE, otherArgs[2]);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/** Create and setup a job */
private static Job createJob(String name, Configuration conf
) throws IOException {
final Job job = Job.getInstance(conf, NAME + "_" + name);
final Configuration jobconf = job.getConfiguration();
job.setJarByClass(BaileyBorweinPlouffe.class);
// setup mapper
job.setMapperClass(BbpMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
// setup reducer
job.setReducerClass(BbpReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setNumReduceTasks(1);
// setup input
job.setInputFormatClass(BbpInputFormat.class);
// disable task timeout
jobconf.setLong(MRJobConfig.TASK_TIMEOUT, 0);
// do not use speculative execution
jobconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
jobconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
return job;
}
public static void main(String[] args) throws Exception {
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
String jobName = "TopTenJob";
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(TopTenJob.class);
job.setJar("export\\TopTen.jar");
job.setMapperClass(TopTenMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TopTenReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1); // Computing the final Top N requires a single reduce task
String dataDir = "/expr/topten/data";
String outputDir = "/expr/topten/output";
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
System.out.println( "Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
/**
* In the main method, the Patent job pipeline is set up: Mapper -> Combiner -> Reducer -> Partitioner.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// Configure and initialize the Job
Job job = Job.getInstance(new Configuration());
// Set the job's main class
job.setJarByClass(PatentMainController.class);
// Set the job's Mapper
job.setMapperClass(PatentMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Set the job's input path
FileInputFormat.setInputPaths(job, new Path(args[0]));
// Set the job's Combiner
job.setCombinerClass(InverseIndexByKeywordCombiner.class);
// Set the job's Reducer
job.setReducerClass(PatentReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Set the job's output path
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setPartitionerClass(PatentPartitioner.class);
// Set the number of reduce tasks to the configured maximum
job.setNumReduceTasks(ConfUtil.getMax());
// Submit the job and wait for completion
job.waitForCompletion(true);
}
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
List<String> dirs = Splitter.on(",").splitToList(state.getProp(INPUT_DIRECTORIES_KEY));
String outputBase = state.getProp(OUTPUT_LOCATION);
List<WorkUnit> workUnits = Lists.newArrayList();
for (String dir : dirs) {
try {
Path input = new Path(dir);
Path output = new Path(outputBase, input.getName());
WorkUnit workUnit = new WorkUnit();
TaskUtils.setTaskFactoryClass(workUnit, MRTaskFactory.class);
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCount_" + input.getName());
job.setJarByClass(MRTaskFactoryTest.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
MRTask.serializeJobToState(workUnit, job);
workUnits.add(workUnit);
} catch (IOException ioe) {
log.error("Failed to create MR job for " + dir, ioe);
}
}
return workUnits;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: secondarysrot <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "secondary sort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// group and partition by the first int in the pair
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
// the map output is IntPair, IntWritable
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
// the reduce output is Text, IntWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static long run(String collection, Path input, Path output,
Configuration baseConf) throws IOException, ClassNotFoundException,
InterruptedException {
Configuration conf = new Configuration(baseConf);
Job job = Job.getInstance(conf);
job.setJarByClass(OnlineFeatureDriver.class);
job.setJobName("GROUP each record's feature BY identifier");
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OnlineVectorWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ListWritable.class);
job.setMapperClass(OnlineFeatureMapper.class);
job.setReducerClass(OnlineFeatureReducer.class);
HadoopUtil.delete(conf, output);
boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
throw new IllegalStateException("Job:Group feature, Failed!");
}
Counter counter = job.getCounters().findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"REDUCE_OUTPUT_RECORDS");
long reduceOutputRecords = counter.getValue();
LOG.info(
"Job: GROUP each record's feature BY identifier, output recordes = {}",
reduceOutputRecords);
return reduceOutputRecords;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: secondarysort <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "secondary sort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// group and partition by the first int in the pair
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
// the map output is IntPair, IntWritable
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
// the reduce output is Text, IntWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: secondarysort <in> <out>");
ToolRunner.printGenericCommandUsage(System.out);
return 2;
}
Job job = new Job(conf, "secondary sort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// group and partition by the first int in the pair
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);
// the map output is IntPair, IntWritable
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
// the reduce output is Text, IntWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception {
/*Configuration conf = getConf();
JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String join_reduces = conf.get(REDUCES_PER_HOST);
if (join_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(join_reduces);
}
// Set user-supplied (possibly default) job configs
job.setNumReduceTasks(num_reduces);*/
Configuration conf = new Configuration();
//conf.set("fs.defaultFS", "hdfs://node-01:9000");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
String commaSeparatedPaths = null;
String outputDir = null;
if (otherArgs.length == 2) {
commaSeparatedPaths = otherArgs[0];
outputDir = otherArgs[1];
} else {
System.err.println("Usage: <in>[,<in>...] <out>");
//System.exit(-1);
return -1;
}
Job job = Job.getInstance(conf);
job.setJobName("FlowPartitionJob");
job.setJarByClass(FlowPartitionJob.class);
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(FlowPartitionMapper.class);
//job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(FlowPartitionReducer.class);
job.setPartitionerClass(FlowPartition.class);
job.setNumReduceTasks(5);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, commaSeparatedPaths);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration resource
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Configure job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Use CombineTextInputFormat for the input
job.setInputFormatClass(CombineTextInputFormat.class);
// Add the IP file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// Configure the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the output compression type
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
GenericOptionsParser parser = new GenericOptionsParser(args);
String[] customArgs = parser.getRemainingArgs();
Configuration config = parser.getConfiguration();
if (customArgs.length != 5) {
System.out.println(
"Usage: hadoop jar wikipedia_bytes_deploy.jar "
+ "[projectId] [inputDatasetId] [inputTableId] [exportGcsBucket] [jobOutputPath]");
System.exit(1);
}
String projectId = customArgs[0];
String inputDatasetId = customArgs[1];
String inputTableId = customArgs[2];
String exportGcsBucket = customArgs[3];
String jobOutputPath = customArgs[4];
JobConf conf = new JobConf(config, WikipediaRequestBytes.class);
BigQueryConfiguration.configureBigQueryInput(conf, projectId, inputDatasetId, inputTableId);
conf.set(BigQueryConfiguration.GCS_BUCKET.getKey(), exportGcsBucket);
Job job = new Job(conf, "WikipediaRequestBytes");
job.setJarByClass(WikipediaRequestBytes.class);
job.setMapperClass(TitleBytesMapper.class);
job.setCombinerClass(TitleBytesSumReducer.class);
job.setReducerClass(TitleBytesSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(jobOutputPath));
// Read from BigQuery, write with plain TextOutputFormat to the provided 'Path'.
job.setInputFormatClass(GsonBigQueryInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
// Make sure to clean up the GCS export paths if desired, and possibly an intermediate input
// table if we did sharded export and thus didn't clean it up at setup time.
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
String hdfsHomeDir = args[0];
String url = args[1];
String tableName = args[2];
System.out.println("TradeNetworthHdfsDataVerifier.run() invoked with "
+ " hdfsHomeDir = " + hdfsHomeDir
+ " url = " + url
+ " tableName = " + tableName);
// Job-specific params
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
conf.set(RowOutputFormat.OUTPUT_URL, url);
Job job = Job.getInstance(conf, "TradeNetworthHdfsDataVerifierV2");
job.setJobName("TradeNetworthHdfsDataVerifierV2");
job.setInputFormatClass(RowInputFormat.class);
job.setOutputFormatClass(RowOutputFormat.class);
job.setMapperClass(HdfsDataMapper.class);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(TradeNetworthRow.class);
job.setReducerClass(HdfsDataReducer.class);
job.setOutputKeyClass(Key.class);
job.setOutputValueClass(TradeNetworthOutputObject.class);
StringBuffer aStr = new StringBuffer();
aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
return job.waitForCompletion(false) ? 0 : 1;
}
@Override
public int run(String[] args) throws Exception {
int c = 0;
if (args.length < 5) {
System.err
.println("Usage Driver <table> <mr inc working path> <output path> <zk connection> <reducer multipler> <extra config files...>");
return 1;
}
String table = args[c++];
String mrIncWorkingPathStr = args[c++];
String outputPathStr = args[c++];
String blurZkConnection = args[c++];
int reducerMultipler = Integer.parseInt(args[c++]);
for (; c < args.length; c++) {
String externalConfigFileToAdd = args[c];
getConf().addResource(new Path(externalConfigFileToAdd));
}
Path outputPath = new Path(outputPathStr);
Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
Path newData = new Path(mrIncWorkingPath, NEW);
Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
Path completeData = new Path(mrIncWorkingPath, COMPLETE);
Path fileCache = new Path(mrIncWorkingPath, CACHE);
fileSystem.mkdirs(newData);
fileSystem.mkdirs(inprogressData);
fileSystem.mkdirs(completeData);
fileSystem.mkdirs(fileCache);
List<Path> srcPathList = new ArrayList<Path>();
for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
srcPathList.add(fileStatus.getPath());
}
if (srcPathList.isEmpty()) {
return 0;
}
List<Path> inprogressPathList = new ArrayList<Path>();
boolean success = false;
Iface client = null;
try {
inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
client.createSnapshot(table, MRUPDATE_SNAPSHOT);
TableDescriptor descriptor = client.describe(table);
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.setLocalCachePath(job, fileCache);
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
} finally {
if (success) {
LOG.info("Indexing job succeeded!");
movePathList(fileSystem, completeData, inprogressPathList);
} else {
LOG.error("Indexing job failed!");
movePathList(fileSystem, newData, inprogressPathList);
}
if (client != null) {
client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
}
}
if (success) {
return 0;
} else {
return 1;
}
}
/**
* Sets up the actual job.
*
* @param conf
* The current configuration.
* @param args
* The command line parameters.
* @return The newly created job.
* @throws IOException
* When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException, ClassNotFoundException {
// Support non-XML supported characters
// by re-encoding the passed separator as a Base64 string.
String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
if (actualSeparator != null) {
conf.set(SEPARATOR_CONF_KEY,
new String(Base64.encodeBytes(actualSeparator.getBytes())));
}
// See if a non-default Mapper was set
String mapperClassName = conf.get(MAPPER_CONF_KEY);
Class mapperClass = mapperClassName != null ? Class
.forName(mapperClassName) : DEFAULT_MAPPER;
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(mapperClass);
FileInputFormat.setInputPaths(job, inputDir);
String inputCodec = conf.get(INPUT_LZO_KEY);
if (inputCodec == null) {
FileInputFormat.setMaxInputSplitSize(job, 67108864l); // max split size = 64m
job.setInputFormatClass(TextInputFormat.class);
} else {
if (inputCodec.equalsIgnoreCase("lzo"))
job.setInputFormatClass(LzoTextInputFormat.class);
else {
usage("not supported compression codec!");
System.exit(-1);
}
}
job.setMapperClass(mapperClass);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
HTable table = new HTable(conf, tableName);
job.setReducerClass(PutSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job, table);
} else {
// No reducers. Just write straight to table. Call
// initTableReducerJob
// to set up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
return job;
}
/**
* @throws Exception If failed.
*/
@Test
public void testMapCombineRun() throws Exception {
int lineCnt = 10001;
String fileName = "/testFile";
prepareFile(fileName, lineCnt);
totalLineCnt.set(0);
taskWorkDirs.clear();
Configuration cfg = new Configuration();
cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
cfg.setBoolean(MAP_WRITE, true);
Job job = Job.getInstance(cfg);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TestMapper.class);
job.setCombinerClass(TestCombiner.class);
job.setReducerClass(TestReducer.class);
job.setNumReduceTasks(2);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("igfs://" + igfsName + "@/"));
FileOutputFormat.setOutputPath(job, new Path("igfs://" + igfsName + "@/output/"));
job.setJarByClass(getClass());
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
assertEquals(lineCnt, totalLineCnt.get());
assertEquals(34, taskWorkDirs.size());
for (int g = 0; g < gridCount(); g++)
grid(g).hadoop().finishFuture(jobId).get();
}