The following examples show how to use the org.apache.hadoop.mapred.JobHistory.JobInfo API class and its typical calling patterns.
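Before the individual examples, here is a minimal sketch of the pattern they all share: construct a JobHistory.JobInfo for a job id and let DefaultJobHistoryParser populate it from a history log file. The job id and path below are placeholders, not values taken from the examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.DefaultJobHistoryParser;
import org.apache.hadoop.mapred.JobHistory;

public class JobInfoExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    // Placeholder job id and history file path.
    JobHistory.JobInfo jobInfo = new JobHistory.JobInfo("job_201110061539_0001");
    DefaultJobHistoryParser.parseJobTasks(
        "/tmp/history/job_201110061539_0001", jobInfo, fs);
    // jobInfo.getValues() holds the job-level fields;
    // jobInfo.getAllTasks() holds the per-task map.
    System.out.println("tasks parsed: " + jobInfo.getAllTasks().size());
  }
}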
public static JobInfo getJobInfo(HttpServletRequest request, FileSystem fs)
    throws IOException {
  String jobid = request.getParameter("jobid");
  String logFile = request.getParameter("logFile");
  synchronized (jobHistoryCache) {
    JobInfo jobInfo = jobHistoryCache.remove(jobid);
    if (jobInfo == null) {
      // Cache miss: parse the history log file into a fresh JobInfo.
      jobInfo = new JobHistory.JobInfo(jobid);
      LOG.info("Loading Job History file " + jobid + ". Cache size is " +
          jobHistoryCache.size());
      DefaultJobHistoryParser.parseJobTasks(logFile, jobInfo, fs);
    }
    // Re-insert so this entry becomes the most recently used.
    jobHistoryCache.put(jobid, jobInfo);
    if (jobHistoryCache.size() > CACHE_SIZE) {
      // Evict the eldest entry once the cache exceeds its bound.
      Iterator<Map.Entry<String, JobInfo>> it =
          jobHistoryCache.entrySet().iterator();
      String removeJobId = it.next().getKey();
      it.remove();
      LOG.info("Job History file removed from cache " + removeJobId);
    }
    return jobInfo;
  }
}
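The remove-then-put pattern above only gives LRU-style eviction if the cache preserves insertion order. A plausible declaration for the fields this snippet relies on (an assumption; they are not shown in the snippet) would be:

// Assumed declarations (java.util.LinkedHashMap / java.util.Map): a
// LinkedHashMap keeps insertion order, so remove()+put() moves a hit to
// the tail, and it.next() on the entry iterator yields the oldest entry.
private static final int CACHE_SIZE = 5; // illustrative bound
private static final Map<String, JobInfo> jobHistoryCache =
    new LinkedHashMap<String, JobInfo>();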
/**
 * @param jobConfFile - URL pointing to the job configuration (job_conf.xml) file
 * @param jobHistoryFile - URL pointing to the job history log file
 * @param testsConfFileIs - input stream for the test configuration file (optional).
 *        If not specified, the default path is: $HADOOP_HOME/contrib/vaidya/pxpd_tests_config.xml
 * @param reportFile - file path for storing the report (optional)
 */
public PostExPerformanceDiagnoser(String jobConfFile, String jobHistoryFile,
    InputStream testsConfFileIs, String reportFile) throws Exception {
  this._jobHistoryFile = jobHistoryFile;
  this._testsConfFileIs = testsConfFileIs;
  this._reportFile = reportFile;
  this._jobConfFile = jobConfFile;
  /*
   * Read the job information necessary for post-execution performance analysis.
   */
  JobConf jobConf = new JobConf();
  JobInfo jobInfo = new JobInfo("");
  readJobInformation(jobConf, jobInfo);
  this._jobExecutionStatistics = new JobStatistics(jobConf, jobInfo);
}
/**
 * Read and populate job statistics information.
 */
private void readJobInformation(JobConf jobConf, JobInfo jobInfo) throws Exception {
  /*
   * Convert the input strings to URLs.
   */
  URL jobConfFileUrl = new URL(this._jobConfFile);
  URL jobHistoryFileUrl = new URL(this._jobHistoryFile);
  /*
   * Read the job configuration from the jobConfFile URL.
   */
  jobConf.addResource(jobConfFileUrl);
  /*
   * Read the job history file and build job counters to evaluate the diagnostic rules.
   */
  if (jobHistoryFileUrl.getProtocol().equals("hdfs")) {
    DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
        FileSystem.get(jobConf));
  } else if (jobHistoryFileUrl.getProtocol().equals("file")) {
    DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo,
        FileSystem.getLocal(jobConf));
  } else {
    throw new Exception("Malformed URL. Protocol: " + jobHistoryFileUrl.getProtocol());
  }
}
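A hypothetical invocation of this constructor could look like the sketch below. All URLs and paths are placeholders, and passing null for the tests-config stream is an assumption meaning "use the default test configuration":

// Hypothetical usage; URLs and paths are placeholders.
PostExPerformanceDiagnoser diagnoser = new PostExPerformanceDiagnoser(
    "file:///tmp/job_201110061539_0001_conf.xml",    // job configuration URL
    "file:///tmp/job_201110061539_0001_history.log", // job history log URL
    null,                                            // tests config stream (assumed default)
    "/tmp/vaidya_report.xml");                       // report output path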
public static void parseJobHistory(Configuration jobConf, JobInfo jobInfo, MRJobInfo value) {
  value.job.clear();
  populateJob(jobInfo.getValues(), value.job);
  value.mapTask.clear();
  value.reduceTask.clear();
  populateMapReduceTaskLists(value, jobInfo.getAllTasks());
}
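This helper expects an already-parsed JobInfo. The typical call sequence, mirroring the record reader at the end of this page, is sketched below; historyFilePath, fs, and conf are assumed to be in scope, and the job id is a placeholder:

// Sketch of the call sequence; the job id and variables are placeholders.
JobHistory.JobInfo jobInfo = new JobHistory.JobInfo("job_201110061539_0001");
DefaultJobHistoryParser.parseJobTasks(historyFilePath, jobInfo, fs);
MRJobInfo value = new MRJobInfo();
HadoopJobHistoryLoader.parseJobHistory(conf, jobInfo, value);
// value.job, value.mapTask, and value.reduceTask are now populated.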
/**
 * Tests the JobHistory parser with different versions of job history files.
 */
public void testJobHistoryVersion() throws IOException {
  // If a new job history version comes up, the modified parser may fail for
  // the history file created by writeHistoryFile().
  for (long version = 0; version <= JobHistory.VERSION; version++) {
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.getLocal(conf);
    // cleanup
    fs.delete(TEST_DIR, true);
    Path historyPath = new Path(TEST_DIR + "/_logs/history/" +
        FILENAME + version);
    fs.delete(historyPath, false);
    FSDataOutputStream out = fs.create(historyPath);
    writeHistoryFile(out, version);
    out.close();
    JobInfo job = new JobHistory.JobInfo(JOB);
    DefaultJobHistoryParser.parseJobTasks(historyPath.toString(), job, fs);
    assertTrue("Failed to parse jobhistory files of version " + version,
        job.getAllTasks().size() > 0);
    // cleanup
    fs.delete(TEST_DIR, true);
  }
}
public JobStatistics(JobConf jobConf, JobInfo jobInfo) throws ParseException {
  this._jobConf = jobConf;
  this._jobInfo = jobInfo;
  this._job = new Hashtable<Enum, String>();
  populate_Job(this._job, this._jobInfo.getValues());
  populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList,
      this._jobInfo.getAllTasks());
  // Add the job type: MAP_REDUCE or MAP_ONLY.
  if (getLongValue(JobKeys.TOTAL_REDUCES) == 0) {
    this._job.put(JobKeys.JOBTYPE, "MAP_ONLY");
  } else {
    this._job.put(JobKeys.JOBTYPE, "MAP_REDUCE");
  }
}
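After construction, the populated statistics can be queried through the same accessor the constructor uses internally. A minimal sketch, assuming jobConf and jobInfo are prepared as in the diagnoser example above and that getLongValue is publicly accessible:

// Minimal sketch; jobConf and jobInfo are assumed already populated.
JobStatistics stats = new JobStatistics(jobConf, jobInfo);
long reduces = stats.getLongValue(JobKeys.TOTAL_REDUCES);
System.out.println(reduces == 0 ? "MAP_ONLY job" : "MAP_REDUCE job");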
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (location != null) {
    LOG.info("load: " + location);
    Path full = new Path(location);
    // Decode the history file name; the split indices below appear to assume
    // a name of the form <host>_<start-time>_job_<timestamp>_<sequence>_...
    String[] jobDetails =
        JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
        + jobDetails[4];
    JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
    value = new MRJobInfo();
    FileSystem fs = full.getFileSystem(conf);
    FileStatus fstat = fs.getFileStatus(full);
    LOG.info("file size: " + fstat.getLen());
    DefaultJobHistoryParser.parseJobTasks(location, job,
        full.getFileSystem(conf));
    LOG.info("job history parsed successfully");
    HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
    LOG.info("get parsed job history");
    // Parse the Hadoop job xml (configuration) file next to the history file.
    Path parent = full.getParent();
    String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2]
        + "_conf.xml";
    Path p = new Path(parent, jobXml);
    FSDataInputStream fileIn = fs.open(p);
    Map<String, String> val = HadoopJobHistoryLoader
        .parseJobXML(fileIn);
    for (String key : val.keySet()) {
      value.job.put(key, val.get(key));
    }
    location = null;
    return true;
  }
  value = null;
  return false;
}