Below are example usages of org.apache.hadoop.mapreduce.Job#setOutputValueClass(), collected from open-source projects; the full source of each example can be viewed on GitHub.
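For orientation before the collected examples, here is a minimal driver sketch (not taken from any of the projects below; MinimalDriver, WordCountMapper and WordCountReducer are placeholder names) showing where setOutputValueClass fits: it declares the value class emitted by the Reducer and, unless setMapOutputValueClass is called separately, by the Mapper as well.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "minimal setOutputValueClass example");
    job.setJarByClass(MinimalDriver.class);        // placeholder driver class
    job.setMapperClass(WordCountMapper.class);     // placeholder Mapper<LongWritable, Text, Text, IntWritable>
    job.setReducerClass(WordCountReducer.class);   // placeholder Reducer<Text, IntWritable, Text, IntWritable>
    // Declare the Reducer's output types; these also apply to the Mapper
    // unless setMapOutputKeyClass/setMapOutputValueClass are set explicitly.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}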
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
// System.out.println(otherArgs);
if(otherArgs.length != 2) {
System.out.println("Usage:wordcount <in> <out>");
System.exit(2);
}
// if(args.length != 2) {
// System.out.println("param error!");
// System.exit(-1);
// }
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length < 1) {
System.err.println("Usage: CustomQuery configFile");
System.exit(2);
}
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "custom query");
job.setJarByClass(CustomQuery.class);
job.setInputFormatClass(NodeInputFormat.class);
job.setMapperClass(QueryMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(QueryReducer.class);
job.setOutputFormatClass(KeyValueOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 1;
}
TeraInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("TeraValidate");
job.setJarByClass(TeraValidate.class);
job.setMapperClass(ValidateMapper.class);
job.setReducerClass(ValidateReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// force a single reducer
job.setNumReduceTasks(1);
// force a single split
FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
job.setInputFormatClass(TeraInputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: xflowstatic <type> <in> <out>");
System.exit(2);
}
conf.set(TYPE_KEY, otherArgs[0]);
Job job = Job.getInstance(conf); // use the Configuration carrying TYPE_KEY set above
job.setJobName("xflowstatic");
job.setJarByClass(XflowStatic.class);
job.setMapperClass(XflowMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public final static void main(final String[] args) throws Exception {
final Configuration conf = new Configuration();
final Job job = new Job(conf, "P1");
job.setJarByClass(P1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setMapperClass(P1Map.class);
job.setCombinerClass(P1Reduce.class);
job.setReducerClass(P1Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
/**
* Set the mapper class implementation to use in the job,
* as well as any related configuration (e.g., map output types).
*/
protected void configureMapper(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
job.setMapperClass(getMapperClass());
job.setOutputKeyClass(String.class);
job.setOutputValueClass(NullWritable.class);
}
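A note on the output-type setters used above: when the map output types differ from the final job output types, the map-side classes must be declared separately, since setOutputKeyClass/setOutputValueClass declare the job's final (reduce-side) output and only default to the map output types when no map-side setters are called. A minimal sketch of that pattern, with hypothetical SumMapper and AverageReducer classes:
// Mapper emits <Text, IntWritable>, Reducer emits <Text, DoubleWritable>.
job.setMapperClass(SumMapper.class);              // hypothetical mapper
job.setReducerClass(AverageReducer.class);        // hypothetical reducer
job.setMapOutputKeyClass(Text.class);             // map-side key type
job.setMapOutputValueClass(IntWritable.class);    // map-side value type
job.setOutputKeyClass(Text.class);                // job (reduce-side) key type
job.setOutputValueClass(DoubleWritable.class);    // job (reduce-side) value type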
public int run(String[] args) throws Exception {
final Configuration conf = getConf();
conf.set(MyOutputFormat.HEADER_FROM_FILE, args[0]);
final Job job = new Job(conf);
job.setJarByClass(TestBAM.class);
job.setMapperClass(TestBAMMapper.class);
job.setReducerClass(TestBAMReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SAMRecordWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SAMRecordWritable.class);
job.setInputFormatClass(AnySAMInputFormat.class);
job.setOutputFormatClass(TestBAM.MyOutputFormat.class);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(args[0]));
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.submit();
if (!job.waitForCompletion(true)) {
System.err.println("sort :: Job failed.");
return 1;
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length != 4) {
printUsage();
}
Job job = new Job(conf, "ReduceSideJoin");
job.setJarByClass(ReplicatedUserJoin.class);
// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical
// standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, CommentJoinMapper.class);
job.getConfiguration().set("join.type", args[2]);
job.setReducerClass(UserJoinReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[3]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 2;
}
/**
* Uses the HBase front door API to write to the index table. Submits the job and either returns or
* waits for the job completion based on the runForeground parameter.
*
* @param job job
* @param outputPath output path
* @param runForeground - if true, waits for job completion, else submits and returns
* immediately.
* @throws Exception
*/
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, TableName outputTableName,
boolean skipDependencyJars, boolean runForeground)
throws Exception {
job.setMapperClass(getDirectMapperClass());
job.setReducerClass(getDirectReducerClass());
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());
//Set the Output classes
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
if (!skipDependencyJars) {
TableMapReduceUtil.addDependencyJars(job);
}
job.setNumReduceTasks(1);
if (!runForeground) {
LOG.info("Running Index Build in Background - Submit async and exit");
job.submit();
return;
}
LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!.");
boolean result = job.waitForCompletion(true);
if (!result) {
LOG.error("IndexTool job failed!");
throw new Exception("IndexTool job failed: " + job.toString());
}
FileSystem.get(conf).delete(outputPath, true);
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
Path outputPath = new Path(args[0]);
String hdfsHomeDir = args[1];
String tableName = args[2];
outputPath.getFileSystem(conf).delete(outputPath, true);
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
Job job = Job.getInstance(conf, "Busy Airport Count");
job.setInputFormatClass(RowInputFormat.class);
// configure mapper and reducer
job.setMapperClass(SampleMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// configure output
TextOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
@Test
@SuppressWarnings("deprecation")
public void testJobEmptyView() throws Exception {
Job job = new Job(HBaseTestUtils.getConf());
String datasetName = tableName + ".TestGenericEntity";
Dataset<GenericRecord> inputDataset = repo.create("default", "in",
new DatasetDescriptor.Builder()
.schemaLiteral(testGenericEntity).build());
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaLiteral(testGenericEntity)
.build();
Dataset<GenericRecord> outputDataset = repo.create("default", datasetName, descriptor);
DatasetKeyInputFormat.configure(job).readFrom(inputDataset);
job.setMapperClass(AvroKeyWrapperMapper.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(NullWritable.class);
AvroJob.setMapOutputKeySchema(job, new Schema.Parser().parse(testGenericEntity));
job.setReducerClass(AvroKeyWrapperReducer.class);
job.setOutputKeyClass(GenericData.Record.class);
job.setOutputValueClass(Void.class);
AvroJob.setOutputKeySchema(job, new Schema.Parser().parse(testGenericEntity));
DatasetKeyOutputFormat.configure(job).writeTo(outputDataset);
Assert.assertTrue(job.waitForCompletion(true));
}
public static void main(String[] args) throws Exception {
// 1. Set up HDFS configuration
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://" + namenode_ip + ":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
conf.set("mapreduce.app-submission.cross-platform", "true");
// 2. Set up the MapReduce job configuration
String jobName = "FlowCount"; // job name
Job job = Job.getInstance(conf, jobName);
job.setJarByClass(FlowCount.class); // job class to run
job.setJar("export\\FlowCount.jar"); // local jar file
job.setMapperClass(FlowCountMapper.class); // Mapper class
job.setMapOutputKeyClass(Text.class); // Mapper output key type
job.setMapOutputValueClass(IntWritable.class); // Mapper output value type
job.setReducerClass(FlowCountReducer.class); // Reducer class
job.setOutputKeyClass(Text.class); // Reducer output key type
job.setOutputValueClass(IntWritable.class); // Reducer output value type
// 3. Set job input and output paths
String dataDir = "/expr/weblog/data"; // input data directory
String outputDir = "/expr/weblog/output1"; // output directory
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
// 4. Run the job
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: JobChainingDriver <posts> <users> <out>");
System.exit(2);
}
Path postInput = new Path(otherArgs[0]);
Path userInput = new Path(otherArgs[1]);
Path outputDirIntermediate = new Path(otherArgs[2] + "_int");
Path outputDir = new Path(otherArgs[2]);
// Setup first job to counter user posts
Job countingJob = new Job(conf, "JobChaining-Counting");
countingJob.setJarByClass(BasicJobChaining.class);
// Set our mapper and reducer, we can use the API's long sum reducer for
// a combiner!
countingJob.setMapperClass(UserIdCountMapper.class);
countingJob.setCombinerClass(LongSumReducer.class);
countingJob.setReducerClass(UserIdSumReducer.class);
countingJob.setOutputKeyClass(Text.class);
countingJob.setOutputValueClass(LongWritable.class);
countingJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(countingJob, postInput);
countingJob.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);
// Execute job and grab exit code
int code = countingJob.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
// Calculate the average posts per user by getting counter values
double numRecords = (double) countingJob.getCounters()
.findCounter(AVERAGE_CALC_GROUP, UserIdCountMapper.RECORDS_COUNTER_NAME)
.getValue();
double numUsers = (double) countingJob.getCounters()
.findCounter(AVERAGE_CALC_GROUP, UserIdSumReducer.USERS_COUNTER_NAME)
.getValue();
double averagePostsPerUser = numRecords / numUsers;
// Setup binning job
Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
binningJob.setJarByClass(BasicJobChaining.class);
// Set mapper and the average posts per user
binningJob.setMapperClass(UserIdBinningMapper.class);
UserIdBinningMapper.setAveragePostsPerUser(binningJob, averagePostsPerUser);
binningJob.setNumReduceTasks(0);
binningJob.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(binningJob, outputDirIntermediate);
// Add two named outputs for below/above average
MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.setCountersEnabled(binningJob, true);
TextOutputFormat.setOutputPath(binningJob, outputDir);
// Add the user files to the DistributedCache
FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
for (FileStatus status : userFiles) {
DistributedCache.addCacheFile(status.getPath().toUri(),
binningJob.getConfiguration());
}
// Execute job and grab exit code
code = binningJob.waitForCompletion(true) ? 0 : 1;
}
// Clean up the intermediate output
FileSystem.get(conf).delete(outputDirIntermediate, true);
System.exit(code);
}
public int run(String[] args) throws Exception {
// Create the job
Configuration config = getConf();
// Add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// Set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// Set CombineTextInputFormat as the input format
job.setInputFormatClass(CombineTextInputFormat.class);
// Add file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// Set the OutputFormat
job.setOutputFormatClass(LogOutputFormat.class);
// Set input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Set the compression type (optional)
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// Run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
public static void main(String[] args)
throws IOException, ClassNotFoundException, TSFHadoopException, URISyntaxException {
if (args.length != 3) {
System.out.println("Please give hdfs url, input path, output path");
return;
}
String HDFSURL = args[0];
Path inputPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
Configuration configuration = new Configuration();
// set file system configuration
//configuration.set("fs.defaultFS", HDFSURL);
Job job = Job.getInstance(configuration);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(outputPath)){
fs.delete(outputPath,true);
}
job.setJobName("TsFile read jar");
job.setJarByClass(TSFMRReadExample.class);
// set mapper and reducer
job.setMapperClass(TSMapper.class);
job.setReducerClass(TSReducer.class);
// set inputformat and outputformat
job.setInputFormatClass(TSFInputFormat.class);
// set mapper output key and value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
// set reducer output key and value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
// set input file path
TSFInputFormat.setInputPaths(job, inputPath);
// set output file path
TSFOutputFormat.setOutputPath(job, outputPath);
/**
* special configuration for reading tsfile with TSFInputFormat
*/
TSFInputFormat.setReadTime(job, true); // configure reading time enable
TSFInputFormat.setReadDeviceId(job, true); // configure reading deltaObjectId enable
String[] deviceIds = {"device_1"};// configure reading which deviceIds
TSFInputFormat.setReadDeviceIds(job, deviceIds);
String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};// configure reading which measurementIds
TSFInputFormat.setReadMeasurementIds(job, measurementIds);
boolean isSuccess = false;
try {
isSuccess = job.waitForCompletion(true);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (isSuccess) {
System.out.println("Execute successfully");
} else {
System.out.println("Execute unsuccessfully");
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName(TestJob.class.getName());
job.setJarByClass(TestJob.class);
job.setMapperClass(MapJob.class);
job.setReducerClass(ReduceJob.class);
// Hello there ZipFileInputFormat!
job.setInputFormatClass(GraphInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public int run(String[] args) throws Exception {
GfxdDataSerializable.initTypes();
Configuration conf = getConf();
String hdfsHomeDir = args[0];
String url = args[1];
String tableName = args[2];
System.out.println("TradeSecurityHdfsDataVerifier.run() invoked with "
+ " hdfsHomeDir = " + hdfsHomeDir
+ " url = " + url
+ " tableName = " + tableName);
// Job-specific params
conf.set(RowInputFormat.HOME_DIR, hdfsHomeDir);
conf.set(RowInputFormat.INPUT_TABLE, tableName);
conf.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
conf.set(RowOutputFormat.OUTPUT_TABLE,tableName + "_HDFS");
conf.set(RowOutputFormat.OUTPUT_URL, url);
Job job = Job.getInstance(conf, "TradeSecurityHdfsDataVerifierV2");
job.setJobName("TradeSecurityHdfsDataVerifierV2");
job.setInputFormatClass(RowInputFormat.class);
job.setOutputFormatClass(RowOutputFormat.class);
job.setMapperClass(HdfsDataMapper.class);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(TradeSecurityRow.class);
job.setReducerClass(HdfsDataReducer.class);
job.setOutputKeyClass(Key.class);
job.setOutputValueClass(TradeSecurityOutputObject.class);
StringBuffer aStr = new StringBuffer();
aStr.append("HOME_DIR = " + conf.get(RowInputFormat.HOME_DIR) + " ");
aStr.append("INPUT_TABLE = " + conf.get(RowInputFormat.INPUT_TABLE) + " ");
aStr.append("OUTPUT_TABLE = " + conf.get(RowOutputFormat.OUTPUT_TABLE) + " ");
aStr.append("OUTPUT_URL = " + conf.get(RowOutputFormat.OUTPUT_URL) + " ");
System.out.println("VerifyHdfsData running with the following conf: " + aStr.toString());
return job.waitForCompletion(false) ? 0 : 1;
}
/**
* Sets the {@link Reducer} class to the chain job.
*
* <p>
* The key and values are passed from one element of the chain to the next, by
* value. For the added Reducer, the configuration given for it,
* <code>reducerConf</code>, has precedence over the job's Configuration.
* This precedence is in effect when the task is running.
* </p>
* <p>
* IMPORTANT: There is no need to specify the output key/value classes for the
* ChainReducer; this is done by setReducer or by addMapper for the last
* element in the chain.
* </p>
*
* @param job
* the job
* @param klass
* the Reducer class to add.
* @param inputKeyClass
* reducer input key class.
* @param inputValueClass
* reducer input value class.
* @param outputKeyClass
* reducer output key class.
* @param outputValueClass
* reducer output value class.
* @param reducerConf
* a configuration for the Reducer class. It is recommended to use a
* Configuration without default values using the
* <code>Configuration(boolean loadDefaults)</code> constructor with
* FALSE.
*/
public static void setReducer(Job job, Class<? extends Reducer> klass,
Class<?> inputKeyClass, Class<?> inputValueClass,
Class<?> outputKeyClass, Class<?> outputValueClass,
Configuration reducerConf) {
job.setReducerClass(ChainReducer.class);
job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);
Chain.setReducer(job, klass, inputKeyClass, inputValueClass,
outputKeyClass, outputValueClass, reducerConf);
}
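As a usage illustration of the contract described in the Javadoc above (ChainDriver, AMap and WordReduce are hypothetical classes; the ChainMapper/ChainReducer calls follow the standard org.apache.hadoop.mapreduce.lib.chain API):
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "chain example");
    job.setJarByClass(ChainDriver.class);                  // hypothetical driver class

    Configuration mapConf = new Configuration(false);      // no defaults, as recommended
    ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
        Text.class, Text.class, mapConf);

    Configuration reduceConf = new Configuration(false);
    ChainReducer.setReducer(job, WordReduce.class, Text.class, Text.class,
        Text.class, IntWritable.class, reduceConf);
    // No explicit setOutputKeyClass/setOutputValueClass calls are needed here:
    // setReducer (or the last addMapper in the chain) sets them, as noted above.

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}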
/**
* @throws Exception If failed.
*/
@Test
public void testSimpleTaskSubmit() throws Exception {
String testInputFile = "/test";
prepareTestFile(testInputFile);
Configuration cfg = new Configuration();
setupFileSystems(cfg);
Job job = Job.getInstance(cfg);
job.setMapperClass(TestMapper.class);
job.setCombinerClass(TestReducer.class);
job.setReducerClass(TestReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestIgniteInstanceName(0) + "@/" + testInputFile));
FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestIgniteInstanceName(0) + "@/output"));
job.setJarByClass(getClass());
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
createJobInfo(job.getConfiguration(), null));
fut.get();
}
/**
* Runs the actual test job.
*
* @param useNewMapper flag to use new mapper API.
* @param useNewCombiner flag to use new combiner API.
* @param useNewReducer flag to use new reducer API.
*/
protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
throws Exception {
log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer);
igfs.delete(new IgfsPath(PATH_OUTPUT), true);
JobConf jobConf = new JobConf();
jobConf.set(HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
jobConf.setUser(USER);
jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
//To split into about 40 items for v2
jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
//For v1
jobConf.setInt("fs.local.block.size", 65000);
// File system coordinates.
setupFileSystems(jobConf);
HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
Job job = Job.getInstance(jobConf);
HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
job.setJarByClass(HadoopWordCount2.class);
HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null));
fut.get();
checkJobStatistics(jobId);
final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
checkOwner(new IgfsPath(outFile));
String actual = readAndSortFile(outFile, job.getConfiguration());
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
useNewReducer,
"blue\t" + blue + "\n" +
"green\t" + green + "\n" +
"red\t" + red + "\n" +
"yellow\t" + yellow + "\n",
actual
);
}