下面列出了 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs#setCountersEnabled() 的示例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Configures the job to write human-readable text output.
 *
 * <p>The job's output path is the one {@code MRReasoningUtils.getOutputPath} resolves for
 * {@code destination}. The base format is {@link TextOutputFormat}, wrapped in
 * {@link LazyOutputFormat}. Five named outputs are registered for {@link MultipleOutputs}:
 * <ul>
 *   <li>the intermediate, terminal, schema and inconsistency outputs, which carry
 *       {@code NullWritable} keys and {@code Text} values;</li>
 *   <li>the debug output, which carries {@code Text} keys and {@code Text} values.</li>
 * </ul>
 * Per-named-output counters are enabled.
 *
 * @param destination destination name handed to {@code MRReasoningUtils.getOutputPath}
 */
protected void configureTextOutput(String destination) {
    TextOutputFormat.setOutputPath(job,
            MRReasoningUtils.getOutputPath(job.getConfiguration(), destination));
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    // These outputs share the same (NullWritable, Text) signature; registration order is kept.
    String[] valueOnlyOutputs = {
        MRReasoningUtils.INTERMEDIATE_OUT,
        MRReasoningUtils.TERMINAL_OUT,
        MRReasoningUtils.SCHEMA_OUT,
        MRReasoningUtils.INCONSISTENT_OUT
    };
    for (String outputName : valueOnlyOutputs) {
        MultipleOutputs.addNamedOutput(job, outputName,
                TextOutputFormat.class, NullWritable.class, Text.class);
    }

    // Unlike the outputs above, the debug output has a Text key.
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
/**
 * Configures the job to write a schema (TBox) as SequenceFile output.
 *
 * <p>Output goes to the path returned by {@code MRReasoningUtils.getSchemaPath}. The job's
 * output key and value classes are {@code NullWritable} and {@code SchemaWritable}, and the
 * base format is {@link SequenceFileOutputFormat} wrapped in {@link LazyOutputFormat}.
 * Two named outputs are registered for {@link MultipleOutputs}:
 * <ul>
 *   <li>{@code "schemaobj"}: SequenceFile output with NullWritable/SchemaWritable pairs;</li>
 *   <li>the debug output: text output with Text/Text pairs.</li>
 * </ul>
 * Per-named-output counters are enabled.
 */
protected void configureSchemaOutput() {
    Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(SchemaWritable.class);
    // LazyOutputFormat.setOutputFormatClass installs LazyOutputFormat as the job's output
    // format and records SequenceFileOutputFormat as the wrapped format. A preceding
    // job.setOutputFormatClass(SequenceFileOutputFormat.class) would just be overwritten
    // here, so that redundant call is not made.
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "schemaobj",
            SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
/**
 * Configures the job to write newly derived triples as SequenceFile output.
 *
 * <p>The output directory is resolved by {@code MRReasoningUtils.getOutputPath} from a
 * destination name. That name is {@code OUTPUT_BASE} followed by the current iteration
 * number, plus {@code TEMP_SUFFIX} for intermediate data. The base format is
 * {@link SequenceFileOutputFormat}, wrapped in {@link LazyOutputFormat}. Named outputs are
 * registered for {@link MultipleOutputs} as follows:
 * <ul>
 *   <li>intermediate, terminal and schema outputs: {@code Fact}/{@code NullWritable};</li>
 *   <li>inconsistency output: {@code Derivation}/{@code NullWritable};</li>
 *   <li>debug output: text format, {@code Text}/{@code Text}.</li>
 * </ul>
 * Per-named-output counters are enabled.
 *
 * @param intermediate true if this is intermediate data, in which case
 *     {@code TEMP_SUFFIX} is appended to the destination name
 */
protected void configureDerivationOutput(boolean intermediate) {
    Configuration conf = job.getConfiguration();
    int iteration = MRReasoningUtils.getCurrentIteration(conf);
    String destination = MRReasoningUtils.OUTPUT_BASE + iteration;
    if (intermediate) {
        destination += MRReasoningUtils.TEMP_SUFFIX;
    }
    SequenceFileOutputFormat.setOutputPath(job,
            MRReasoningUtils.getOutputPath(conf, destination));
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    // Outputs that emit Fact keys with no value, in the original registration order.
    String[] factOutputs = {
        MRReasoningUtils.INTERMEDIATE_OUT,
        MRReasoningUtils.TERMINAL_OUT,
        MRReasoningUtils.SCHEMA_OUT
    };
    for (String outputName : factOutputs) {
        MultipleOutputs.addNamedOutput(job, outputName,
                SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    }
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
            SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
    MultipleOutputs.setCountersEnabled(job, true);

    // Text output for diagnostic information, available to tasks when needed.
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
            TextOutputFormat.class, Text.class, Text.class);
}
/**
 * Configures and runs the map-only "Binning Tags" job.
 *
 * <p>After generic Hadoop options are parsed out, exactly two arguments are expected: the
 * input path and the output path. The job uses {@code BinningMapper} with no reducers. It
 * registers a text {@link MultipleOutputs} named output called {@code "bins"} with
 * Text/NullWritable pairs; presumably the mapper writes its bins there (verify against
 * BinningMapper). Per-named-output counters are enabled.
 *
 * @param args command-line arguments, possibly including generic Hadoop options
 * @return 0 if the job succeeded, 1 if it failed, 2 on a usage error
 * @throws Exception if job submission or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // NOTE(review): a fresh Configuration ignores any configuration handed to this Tool via
    // setConf()/ToolRunner; consider getConf() if this class extends Configured.
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: BinningTags <in> <out>");
        ToolRunner.printGenericCommandUsage(System.err);
        // Report the usage error through the Tool contract (the caller decides whether to
        // exit) instead of terminating the JVM from inside run().
        return 2;
    }
    Job job = Job.getInstance(conf, "Binning Tags");
    job.setJarByClass(BinningTags.class);
    // Configure the MultipleOutputs by adding an output called "bins",
    // with the proper output format and mapper key/value pairs.
    MultipleOutputs.addNamedOutput(job, "bins", TextOutputFormat.class,
            Text.class, NullWritable.class);
    // Enable the counters for the job. If there is a significant number of
    // different named outputs, this should be disabled.
    MultipleOutputs.setCountersEnabled(job, true);
    // Map-only job
    job.setNumReduceTasks(0);
    job.setMapperClass(BinningMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Entry point for the data-source comparison (DSC) validation job.
 *
 * <p>The non-generic arguments are concatenated and parsed as a JSON
 * {@code DataSourceCompValidationInfo}. The method then:
 * <ol>
 *   <li>collects the files under the source and destination paths;</li>
 *   <li>runs a MapReduce job over them;</li>
 *   <li>copies the result into the worker directory and renames the result files there;</li>
 *   <li>logs a report built from the job's counters.</li>
 * </ol>
 *
 * @param args command-line arguments; once generic Hadoop options are removed, the
 *     remaining arguments together form the validation-info JSON
 * @throws IOException if file-system access fails or the job does not complete successfully
 * @throws ClassNotFoundException if job submission cannot load a required class
 * @throws InterruptedException if interrupted while waiting for the job
 * @throws IllegalArgumentException if the arguments contain no validation info
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    // Remaining arguments are joined without a separator to form the JSON
    // (presumably because it can arrive split across several arguments).
    String validationInfoJson = String.join("", otherArgs);
    Gson gson = new Gson();
    DataSourceCompValidationInfo validationInfo =
            gson.fromJson(validationInfoJson, DataSourceCompValidationInfo.class);
    if (validationInfo == null) {
        // Gson returns null for empty JSON input; fail clearly instead of with an NPE below.
        throw new IllegalArgumentException("No validation info JSON was supplied in the arguments");
    }
    DataSourceCompJobExecutor dscJobExecutor = new DataSourceCompJobExecutor();
    dscJobExecutor.removeSlash(validationInfo);
    dscJobExecutor.addTransformationNumber(validationInfo);
    DataSourceCompMapperInfo mapperInfo = dscJobExecutor.createMapperInfo(validationInfo);
    // A millisecond timestamp suffix gives each run its own output directory.
    String outputPath = DataSourceCompConstants.OUTPUT_DIR_PATH + System.currentTimeMillis();
    conf.set("validationInfoJson", gson.toJson(validationInfo));
    conf.set("mapperInfoJson", gson.toJson(mapperInfo));
    // Pin the HDFS and local file-system implementations explicitly.
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    FileSystem fileSystem = FileSystem.get(conf);
    List<Path> mapperFilesList = dscJobExecutor.getFiles(validationInfo.getSourcePath(), fileSystem);
    mapperFilesList.addAll(dscJobExecutor.getFiles(validationInfo.getDestinationPath(), fileSystem));
    Map<String, String> filesMap = dscJobExecutor.encodeFilesMap(mapperFilesList);
    Map<String, String> reverseFilesMap = dscJobExecutor.invertMap(filesMap);
    Path[] inputPaths = mapperFilesList.toArray(new Path[0]);
    conf.set("filesMap", gson.toJson(filesMap));
    String recordSeparator = validationInfo.getRecordSeparator();
    if (recordSeparator == null || recordSeparator.trim().isEmpty()) {
        recordSeparator = "\n";
    }
    conf.set("textinputformat.record.delimiter", recordSeparator);
    // Job.getInstance copies conf, so every conf.set above must happen before this line.
    Job job = Job.getInstance(conf, "jumbune_dsc_" + validationInfo.getJobName());
    job.setJarByClass(DataSourceCompJobExecutor.class);
    job.setMapperClass(org.jumbune.datavalidation.dsc.DataSourceCompMapper.class);
    job.setReducerClass(DataSourceCompReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DataSourceCompMapValueWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, inputPaths);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    MultipleOutputs.setCountersEnabled(job, true);
    if (!job.waitForCompletion(true)) {
        // Do not copy, rename or report on the output of a failed job.
        throw new IOException("DSC job '" + job.getJobName() + "' did not complete successfully");
    }
    String workerDirPath = validationInfo.getSlaveFileLoc();
    dscJobExecutor.copyResult(conf, outputPath, workerDirPath);
    dscJobExecutor.renameFiles(workerDirPath, reverseFilesMap);
    DataSourceCompReportBean reportBean = dscJobExecutor.calculateCounters(job, outputPath, reverseFilesMap,
            validationInfo.getValidationsList());
    LOGGER.info(DataValidationConstants.DV_REPORT + gson.toJson(reportBean));
}
/**
 * Runs a two-stage job chain over post data.
 *
 * <p>Stage one, the counting job, runs {@code UserIdCountMapper} with
 * {@code UserIdSumReducer} (plus a {@code LongSumReducer} combiner). It writes to the
 * intermediate directory {@code <out>_int}, and its record and user counters yield the
 * average posts per user.
 *
 * <p>Stage two, the map-only binning job, reads that intermediate output. It is given the
 * average, loads every file under {@code <users>} into the distributed cache, and writes to
 * the two named outputs {@code MULTIPLE_OUTPUTS_BELOW_NAME} and
 * {@code MULTIPLE_OUTPUTS_ABOVE_NAME} under {@code <out>}.
 *
 * <p>The intermediate directory is then deleted. The JVM exits with 0 on success, 1 if
 * either job failed, or 2 on a usage error.
 *
 * @param args {@code <posts> <users> <out>}, optionally preceded by generic Hadoop options
 * @throws Exception if job submission, execution or file-system access fails
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: JobChainingDriver <posts> <users> <out>");
        System.exit(2);
    }
    Path postInput = new Path(otherArgs[0]);
    Path userInput = new Path(otherArgs[1]);
    Path outputDirIntermediate = new Path(otherArgs[2] + "_int");
    Path outputDir = new Path(otherArgs[2]);
    // Set up the first job to count user posts.
    Job countingJob = Job.getInstance(conf, "JobChaining-Counting");
    countingJob.setJarByClass(BasicJobChaining.class);
    // Set our mapper and reducer; the API's long sum reducer serves as the combiner.
    countingJob.setMapperClass(UserIdCountMapper.class);
    countingJob.setCombinerClass(LongSumReducer.class);
    countingJob.setReducerClass(UserIdSumReducer.class);
    countingJob.setOutputKeyClass(Text.class);
    countingJob.setOutputValueClass(LongWritable.class);
    countingJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(countingJob, postInput);
    countingJob.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);
    // Execute the job and grab its exit code.
    int code = countingJob.waitForCompletion(true) ? 0 : 1;
    if (code == 0) {
        // Calculate the average posts per user from the counter values.
        double numRecords = (double) countingJob.getCounters()
                .findCounter(AVERAGE_CALC_GROUP, UserIdCountMapper.RECORDS_COUNTER_NAME)
                .getValue();
        double numUsers = (double) countingJob.getCounters()
                .findCounter(AVERAGE_CALC_GROUP, UserIdSumReducer.USERS_COUNTER_NAME)
                .getValue();
        double averagePostsPerUser = numRecords / numUsers;
        // Set up the binning job. Reuse conf so that the generic options parsed from the
        // command line (-D, -fs, -files, ...) also apply to the second job; a fresh
        // Configuration would silently drop them. Job.getInstance copies conf, so settings
        // made on countingJob do not leak into this job.
        Job binningJob = Job.getInstance(conf, "JobChaining-Binning");
        binningJob.setJarByClass(BasicJobChaining.class);
        // Set the mapper and the average posts per user.
        binningJob.setMapperClass(UserIdBinningMapper.class);
        UserIdBinningMapper.setAveragePostsPerUser(binningJob, averagePostsPerUser);
        binningJob.setNumReduceTasks(0);
        binningJob.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(binningJob, outputDirIntermediate);
        // Add two named outputs, for below/above average.
        MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_BELOW_NAME,
                TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(binningJob, MULTIPLE_OUTPUTS_ABOVE_NAME,
                TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.setCountersEnabled(binningJob, true);
        TextOutputFormat.setOutputPath(binningJob, outputDir);
        // Add the user files to the DistributedCache.
        FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
        for (FileStatus status : userFiles) {
            DistributedCache.addCacheFile(status.getPath().toUri(),
                    binningJob.getConfiguration());
        }
        // Execute the job and grab its exit code.
        code = binningJob.waitForCompletion(true) ? 0 : 1;
    }
    // Clean up the intermediate output. This is deliberately not in a finally block: if
    // submission throws (e.g. because <out>_int already exists), a pre-existing directory
    // must not be deleted.
    FileSystem.get(conf).delete(outputDirIntermediate, true);
    System.exit(code);
}