下面列出了org.apache.hadoop.mapred.RunningJob#getCounters ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* @param jobClient
* @param jobReport
* @param jobId
* @param job
* @throws IOException
*/
private Map<String, Object> getDetailedJobReport(org.apache.hadoop.mapred.JobID jobId) throws IOException {
Map<String, Object> jobDetailedReport = new HashMap<String, Object>();
RunningJob job = jobClient.getJob(jobId);
Counters counters = job.getCounters();
List counterList = new ArrayList();
for (Group group : counters) {
Map<String, Object> counterMap = new HashMap<String, Object>();
counterMap.put("name", group.getDisplayName());
List subCounters = new ArrayList();
for (Counter counter : group) {
Map subCounter = new HashMap();
subCounter.put("name", counter.getDisplayName());
subCounter.put("value", counter.getCounter());
subCounters.add(subCounter);
}
counterMap.put("subCounters", subCounters);
counterList.add(counterMap);
}
jobDetailedReport.put("counters", counterList);
jobDetailedReport.put("mapReport",
getTaskReport(jobClient.getMapTaskReports(jobId)));
jobDetailedReport.put("reduceReport",
getTaskReport(jobClient.getReduceTaskReports(jobId)));
jobDetailedReport.put("cleanupReport",
getTaskReport(jobClient.getCleanupTaskReports(jobId)));
jobDetailedReport.put("setupReport",
getTaskReport(jobClient.getSetupTaskReports(jobId)));
return jobDetailedReport;
}
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException
{
String inpath = null;
String outpath = null;
int kmerlen = 0;
int numMappers = 1;
int numReducers = 1;
int showpos = 0;
int data = 1;
if (data == 0)
{
if (args.length != 6)
{
System.err.println("Usage: CountKmers filename outpath kmerlen showpos numMappers numReducers");
return;
}
inpath = args[0];
outpath = args[1];
kmerlen = Integer.parseInt(args[2]);
showpos = Integer.parseInt(args[3]);
numMappers = Integer.parseInt(args[4]);
numReducers = Integer.parseInt(args[5]);
}
else if (data == 1)
{
inpath = "/user/guest/cloudburst/s_suis.br";
outpath = "/user/mschatz/kmers";
kmerlen = 12;
showpos = 0;
numMappers = 1;
numReducers = 1;
}
System.out.println("inpath: " + inpath);
System.out.println("outpath: " + outpath);
System.out.println("kmerlen: " + kmerlen);
System.out.println("showpos: " + showpos);
System.out.println("nummappers: " + numMappers);
System.out.println("numreducers: " + numReducers);
JobConf conf = new JobConf(MerReduce.class);
conf.setNumMapTasks(numMappers);
conf.setNumReduceTasks(numReducers);
conf.addInputPath(new Path(inpath));;
conf.set("KMER_LEN", Integer.toString(kmerlen));
conf.set("SHOW_POS", Integer.toString(showpos));
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setMapOutputKeyClass(BytesWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
//conf.setCompressMapOutput(true);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setMapperClass(MerMapClass.class);
conf.setReducerClass(MerReduceClass.class);
Path oPath = new Path(outpath);
conf.setOutputPath(oPath);
System.err.println(" Removing old results");
FileSystem.get(conf).delete(oPath);
conf.setJobName("CountMers");
Timer t = new Timer();
RunningJob rj = JobClient.runJob(conf);
System.err.println("CountMers Finished");
System.err.println("Total Running time was " + t.get());
Counters counters = rj.getCounters( );
Counters.Group task = counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
long numDistinctMers = task.getCounter("REDUCE_INPUT_GROUPS");
System.err.println("Num Distinct Mers: " + numDistinctMers);
}
private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
Path program, Path inputPath, Path outputPath,
int numMaps, int numReduces, String[] expectedResults
) throws IOException {
Path wordExec = new Path("/testing/bin/application");
JobConf job = mr.createJobConf();
job.setNumMapTasks(numMaps);
job.setNumReduceTasks(numReduces);
{
FileSystem fs = dfs.getFileSystem();
fs.delete(wordExec.getParent(), true);
fs.copyFromLocalFile(program, wordExec);
Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
Submitter.setIsJavaRecordReader(job, true);
Submitter.setIsJavaRecordWriter(job, true);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
RunningJob rJob = null;
if (numReduces == 0) {
rJob = Submitter.jobSubmit(job);
while (!rJob.isComplete()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
} else {
rJob = Submitter.runJob(job);
}
assertTrue("pipes job failed", rJob.isSuccessful());
Counters counters = rJob.getCounters();
Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
int numCounters = 0;
for (Counter c : wordCountCounters) {
System.out.println(c);
++numCounters;
}
assertTrue("No counters found!", (numCounters > 0));
}
List<String> results = new ArrayList<String>();
for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
new OutputLogFilter()))) {
results.add(TestMiniMRWithDFS.readOutput(p, job));
}
assertEquals("number of reduces is wrong",
expectedResults.length, results.size());
for(int i=0; i < results.size(); i++) {
assertEquals("pipes program " + program + " output " + i + " wrong",
expectedResults[i], results.get(i));
}
}
private void validateOutput(RunningJob runningJob, boolean validateCount)
throws Exception {
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
if(validateCount) {
//validate counters
String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
Counters counters = runningJob.getCounters();
assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
getCounter(),MAPPER_BAD_RECORDS.size());
int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
getCounter(),mapRecs);
assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
getCounter(),mapRecs);
int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
getCounter(),REDUCER_BAD_RECORDS.size());
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
getCounter(),redRecs);
assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
getCounter(),redRecs);
}
List<String> badRecs = new ArrayList<String>();
badRecs.addAll(MAPPER_BAD_RECORDS);
badRecs.addAll(REDUCER_BAD_RECORDS);
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
int counter = 0;
while (line != null) {
counter++;
StringTokenizer tokeniz = new StringTokenizer(line, "\t");
String value = tokeniz.nextToken();
int index = value.indexOf("hey");
assertTrue(index>-1);
if(index>-1) {
String heyStr = value.substring(index);
assertTrue(!badRecs.contains(heyStr));
}
line = reader.readLine();
}
reader.close();
if(validateCount) {
assertEquals(INPUTSIZE-badRecs.size(), counter);
}
}
}