下面列出了怎么用org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo的API类实例代码及写法,或者点击链接到github查看源代码。
private void addDirectoryToJobListCache(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + path + " to job list cache.");
}
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
doneDirFc);
for (FileStatus fs : historyFileList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding in history for " + fs.getPath());
}
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
jobListCache.addIfAbsent(fileInfo);
}
}
/**
* Searches the job history file FileStatus list for the specified JobId.
*
* @param fileStatusList
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
* @return A FileInfo object for the jobId, null if not found.
* @throws IOException
*/
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
fs.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
return fileInfo;
}
}
return null;
}
/**
* Simple test PartialJob
*/
@Test(timeout = 3000)
public void testPartialJob() throws Exception {
JobId jobId = new JobIdPBImpl();
jobId.setId(0);
JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
"jobName", jobId, 3, 2, "JobStatus");
PartialJob test = new PartialJob(jii, jobId);
assertEquals(1.0f, test.getProgress(), 0.001);
assertNull(test.getAllCounters());
assertNull(test.getTasks());
assertNull(test.getTasks(TaskType.MAP));
assertNull(test.getTask(new TaskIdPBImpl()));
assertNull(test.getTaskAttemptCompletionEvents(0, 100));
assertNull(test.getMapAttemptCompletionEvents(0, 100));
assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
assertNull(test.getAMInfos());
}
private static JobsPair split(Map<JobId, Job> mocked) throws IOException {
JobsPair ret = new JobsPair();
ret.full = Maps.newHashMap();
ret.partial = Maps.newHashMap();
for(Map.Entry<JobId, Job> entry: mocked.entrySet()) {
JobId id = entry.getKey();
Job j = entry.getValue();
MockCompletedJob mockJob = new MockCompletedJob(j);
// use MockCompletedJob to set everything below to make sure
// consistent with what history server would do
ret.full.put(id, mockJob);
JobReport report = mockJob.getReport();
JobIndexInfo info = new JobIndexInfo(report.getStartTime(),
report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id,
mockJob.getCompletedMaps(), mockJob.getCompletedReduces(),
String.valueOf(mockJob.getState()));
info.setJobStartTime(report.getStartTime());
info.setQueueName(mockJob.getQueueName());
ret.partial.put(id, new PartialJob(info, id));
}
return ret;
}
private void addDirectoryToJobListCache(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + path + " to job list cache.");
}
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
doneDirFc);
for (FileStatus fs : historyFileList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding in history for " + fs.getPath());
}
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
jobListCache.addIfAbsent(fileInfo);
}
}
/**
* Searches the job history file FileStatus list for the specified JobId.
*
* @param fileStatusList
* fileStatus list of Job History Files.
* @param jobId
* The JobId to find.
* @return A FileInfo object for the jobId, null if not found.
* @throws IOException
*/
private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
.getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
fs.getPath().getParent(), confFileName), new Path(fs.getPath()
.getParent(), summaryFileName), jobIndexInfo, true);
return fileInfo;
}
}
return null;
}
/**
* Simple test PartialJob
*/
@Test(timeout = 3000)
public void testPartialJob() throws Exception {
JobId jobId = new JobIdPBImpl();
jobId.setId(0);
JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
"jobName", jobId, 3, 2, "JobStatus");
PartialJob test = new PartialJob(jii, jobId);
assertEquals(1.0f, test.getProgress(), 0.001);
assertNull(test.getAllCounters());
assertNull(test.getTasks());
assertNull(test.getTasks(TaskType.MAP));
assertNull(test.getTask(new TaskIdPBImpl()));
assertNull(test.getTaskAttemptCompletionEvents(0, 100));
assertNull(test.getMapAttemptCompletionEvents(0, 100));
assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
assertNull(test.getAMInfos());
}
private static JobsPair split(Map<JobId, Job> mocked) throws IOException {
JobsPair ret = new JobsPair();
ret.full = Maps.newHashMap();
ret.partial = Maps.newHashMap();
for(Map.Entry<JobId, Job> entry: mocked.entrySet()) {
JobId id = entry.getKey();
Job j = entry.getValue();
MockCompletedJob mockJob = new MockCompletedJob(j);
// use MockCompletedJob to set everything below to make sure
// consistent with what history server would do
ret.full.put(id, mockJob);
JobReport report = mockJob.getReport();
JobIndexInfo info = new JobIndexInfo(report.getStartTime(),
report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id,
mockJob.getCompletedMaps(), mockJob.getCompletedReduces(),
String.valueOf(mockJob.getState()));
info.setJobStartTime(report.getStartTime());
info.setQueueName(mockJob.getQueueName());
ret.partial.put(id, new PartialJob(info, id));
}
return ret;
}
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
String jobName, JobId jobId, String forcedJobStateOnShutDown,
String queueName) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo =
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
queueName);
this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true);
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
public PartialJob(JobIndexInfo jobIndexInfo, JobId jobId) {
this.jobIndexInfo = jobIndexInfo;
this.jobId = jobId;
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setSubmitTime(jobIndexInfo.getSubmitTime());
jobReport.setStartTime(jobIndexInfo.getJobStartTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime());
jobReport.setJobState(getState());
}
public HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
state = isInDone ? HistoryInfoState.IN_DONE
: HistoryInfoState.IN_INTERMEDIATE;
}
/**
* Create a HistoryFileInfo instance that hangs on parsing job files.
*/
@Override
protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone) {
return new HistoryFileInfo(historyFile, confFile, summaryFile,
jobIndexInfo, isInDone,
scanningDoneSignals.get(jobIndexInfo .getJobId()));
}
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
String jobName, JobId jobId, String forcedJobStateOnShutDown,
String queueName) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo =
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
queueName);
this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true);
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
public PartialJob(JobIndexInfo jobIndexInfo, JobId jobId) {
this.jobIndexInfo = jobIndexInfo;
this.jobId = jobId;
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setSubmitTime(jobIndexInfo.getSubmitTime());
jobReport.setStartTime(jobIndexInfo.getJobStartTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime());
jobReport.setJobState(getState());
}
private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone) {
this.historyFile = historyFile;
this.confFile = confFile;
this.summaryFile = summaryFile;
this.jobIndexInfo = jobIndexInfo;
state = isInDone ? HistoryInfoState.IN_DONE
: HistoryInfoState.IN_INTERMEDIATE;
}
private void addJobsForHistoryDir(String historyDir, List<AnalyticJob> jobs, long startTime, long endTime)
throws Exception {
if (_fs.exists(new Path(historyDir))) {
RemoteIterator<LocatedFileStatus> it = _fs.listFiles(new Path(historyDir), true);
while (it.hasNext()) {
String histFilename = it.next().getPath().getName();
if (histFilename.endsWith(".jhist")) {
try {
JobIndexInfo indexInfo = FileNameIndexUtils.getIndexInfo(histFilename);
String appId = Utils.getApplicationIdFromJobId(indexInfo.getJobId().toString());
// Add the job only if required.
if (indexInfo.getFinishTime() >= startTime && indexInfo.getFinishTime() <= endTime) {
jobs.add(new AnalyticJob().setAppId(appId).setStartTime(indexInfo.getSubmitTime()).
setFinishTime(indexInfo.getFinishTime()).setName(indexInfo.getJobName()).
setUser(indexInfo.getUser()).setQueueName(indexInfo.getQueueName()).
setAppType(_fetcherConfigurationData.getAppType()));
}
} catch (IOException e) {
// Fall back to parsing the filename by ourselves.
String[] jobDetails = histFilename.split("-");
jobs.add(new AnalyticJob().setAppId(Utils.getApplicationIdFromJobId(jobDetails[0])).
setAppType(_fetcherConfigurationData.getAppType()));
}
}
}
}
}
JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
public JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
boolean isInDone) {
return new HistoryFileInfo(
historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
}
/**
* Clean up older history files.
*
* @throws IOException
* on any error trying to remove the entries.
*/
@SuppressWarnings("unchecked")
void clean() throws IOException {
long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
.getPath().getName());
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
.getJobId());
if (fileInfo == null) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo, true);
}
deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
}
}
if (!halted) {
deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}
}
}
HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
JobIndexInfo jobIndexInfo, boolean isInDone,
CountDownLatch scanningDoneSignal) {
super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
this.scanningDoneSignal = scanningDoneSignal;
}
JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
public JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
/**
* Clean up older history files.
*
* @throws IOException
* on any error trying to remove the entries.
*/
@SuppressWarnings("unchecked")
void clean() throws IOException {
long cutoff = System.currentTimeMillis() - maxHistoryAge;
boolean halted = false;
List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
// Sort in ascending order. Relies on YYYY/MM/DD/Serial
Collections.sort(serialDirList);
for (FileStatus serialDir : serialDirList) {
List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
serialDir.getPath(), doneDirFc);
for (FileStatus historyFile : historyFileList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
.getPath().getName());
long effectiveTimestamp = getEffectiveTimestamp(
jobIndexInfo.getFinishTime(), historyFile);
if (effectiveTimestamp <= cutoff) {
HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
.getJobId());
if (fileInfo == null) {
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
historyFile.getPath().getParent(), confFileName), null,
jobIndexInfo, true);
}
deleteJobFromDone(fileInfo);
} else {
halted = true;
break;
}
}
if (!halted) {
deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
break; // Don't scan any more directories.
}
}
}