下面列出了怎么用org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils的API类实例代码及写法,或者点击链接到github查看源代码。
private void addDirectoryToJobListCache(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + path + " to job list cache.");
}
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
doneDirFc);
for (FileStatus fs : historyFileList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding in history for " + fs.getPath());
}
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
jobListCache.addIfAbsent(fileInfo);
}
}
/**
* Searches the job history file FileStatus list for the specified JobId.
*
* @param fileStatusList
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
* @return A FileInfo object for the jobId, null if not found.
* @throws IOException
*/
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
fs.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
return fileInfo;
}
}
return null;
}
private void addDirectoryToJobListCache(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + path + " to job list cache.");
}
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
doneDirFc);
for (FileStatus fs : historyFileList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding in history for " + fs.getPath());
}
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
jobListCache.addIfAbsent(fileInfo);
}
}
/**
* Searches the job history file FileStatus list for the specified JobId.
*
* @param fileStatusList
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
* @return A FileInfo object for the jobId, null if not found.
* @throws IOException
*/
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
fs.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
return fileInfo;
}
}
return null;
}
private void addJobsForHistoryDir(String historyDir, List<AnalyticJob> jobs, long startTime, long endTime)
throws Exception {
if (_fs.exists(new Path(historyDir))) {
RemoteIterator<LocatedFileStatus> it = _fs.listFiles(new Path(historyDir), true);
while (it.hasNext()) {
String histFilename = it.next().getPath().getName();
if (histFilename.endsWith(".jhist")) {
try {
JobIndexInfo indexInfo = FileNameIndexUtils.getIndexInfo(histFilename);
String appId = Utils.getApplicationIdFromJobId(indexInfo.getJobId().toString());
// Add the job only if required.
if (indexInfo.getFinishTime() >= startTime && indexInfo.getFinishTime() <= endTime) {
jobs.add(new AnalyticJob().setAppId(appId).setStartTime(indexInfo.getSubmitTime()).
setFinishTime(indexInfo.getFinishTime()).setName(indexInfo.getJobName()).
setUser(indexInfo.getUser()).setQueueName(indexInfo.getQueueName()).
setAppType(_fetcherConfigurationData.getAppType()));
}
} catch (IOException e) {
// Fall back to parsing the filename by ourselves.
String[] jobDetails = histFilename.split("-");
jobs.add(new AnalyticJob().setAppId(Utils.getApplicationIdFromJobId(jobDetails[0])).
setAppType(_fetcherConfigurationData.getAppType()));
}
}
}
}
}
/**
* Clean up older history files.
*
* @throws IOException
* on any error trying to remove the entries.
*/
@SuppressWarnings("unchecked")
void clean() throws IOException {
long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
.getPath().getName());
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
.getJobId());
if (fileInfo == null) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo, true);
}
deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
}
}
if (!halted) {
deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}
}
}
/**
* Clean up older history files.
*
* @throws IOException
* on any error trying to remove the entries.
*/
@SuppressWarnings("unchecked")
void clean() throws IOException {
long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
.getPath().getName());
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
.getJobId());
if (fileInfo == null) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo, true);
}
deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
}
}
if (!halted) {
deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}
}
}