The examples below show how to use the org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser API class, with typical usage patterns drawn from real projects; follow the linked source on GitHub to see each example in context.
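Before the individual examples, here is a minimal, self-contained sketch of the usual call sequence: open a .jhist history file, parse it, check getParseException() for a partial parse, then read fields from the returned JobInfo. The constructors and accessors are the ones used in the examples below; the file path is hypothetical and must be replaced with a real history file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class JobHistoryParserExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical local path; point this at an actual .jhist file.
    Path histPath = new Path("/tmp/job_1393307629410_0001.jhist");
    FileSystem fs = FileSystem.getLocal(new Configuration());

    JobHistoryParser parser = new JobHistoryParser(fs, histPath);
    JobInfo jobInfo = parser.parse();

    // parse() may return partial results; check for a swallowed parse error.
    if (parser.getParseException() != null) {
      System.err.println("Incomplete parse: " + parser.getParseException());
    }

    System.out.println(jobInfo.getJobId() + " " + jobInfo.getJobname()
        + " status=" + jobInfo.getJobStatus()
        + " tasks=" + jobInfo.getAllTasks().size());
  }
}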
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
  final Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
      .getFile());
  final FileSystem lfs = FileSystem.getLocal(new Configuration());
  final FSDataInputStream fsdis = lfs.open(histPath);
  try {
    JobHistoryParser parser = new JobHistoryParser(fsdis);
    JobInfo info = parser.parse();
    assertEquals("History parsed jobId incorrectly",
        info.getJobId(), JobID.forName("job_1393307629410_0001"));
    assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
  } finally {
    fsdis.close();
  }
}
private void populateJobFromJobInfo(AnalyticJob job, JobHistoryParser.JobInfo jobInfo) {
  if (job.getStartTime() <= 0) {
    job.setStartTime(jobInfo.getSubmitTime());
  }
  if (job.getFinishTime() <= 0) {
    job.setFinishTime(jobInfo.getFinishTime());
  }
  if (job.getQueueName() == null || job.getQueueName().isEmpty()) {
    job.setQueueName(jobInfo.getJobQueueName());
  }
  if (job.getUser() == null || job.getUser().isEmpty()) {
    job.setUser(jobInfo.getUsername());
  }
  if (job.getName() == null || job.getName().isEmpty()) {
    job.setName(jobInfo.getJobname());
  }
}
private long[] getTaskExecTime(JobHistoryParser.TaskAttemptInfo attempInfo) {
  long startTime = attempInfo.getStartTime();
  long finishTime = attempInfo.getFinishTime();
  boolean isMapper = (attempInfo.getTaskType() == TaskType.MAP);

  long[] time;
  if (isMapper) {
    time = new long[]{finishTime - startTime, 0, 0, startTime, finishTime};
  } else {
    long shuffleFinishTime = attempInfo.getShuffleFinishTime();
    long mergeFinishTime = attempInfo.getSortFinishTime();
    time = new long[]{finishTime - startTime, shuffleFinishTime - startTime,
        mergeFinishTime - shuffleFinishTime, startTime, finishTime};
  }
  return time;
}
@Override
public List<AMInfo> getAMInfos() {
  List<AMInfo> amInfos = new LinkedList<AMInfo>();
  for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
      .getAMInfos()) {
    AMInfo amInfo =
        MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
            jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
            jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
            jhAmInfo.getNodeManagerHttpPort());
    amInfos.add(amInfo);
  }
  return amInfos;
}
/**
 * Test compatibility of JobHistoryParser with 2.0.3-alpha history files
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_2.0.3-alpha-FAILED.jhist").getFile());
  JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal(new Configuration()),
      histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " "
      + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}

/**
 * Test compatibility of JobHistoryParser with 2.4.0 history files
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_2.4.0-FAILED.jhist").getFile());
  JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal(new Configuration()),
      histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " "
      + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}

/**
 * Test compatibility of JobHistoryParser with 0.23.9 history files
 * @throws IOException
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException {
  Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_0.23.9-FAILED.jhist").getFile());
  JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal(new Configuration()),
      histPath);
  JobInfo jobInfo = parser.parse();
  LOG.info(" job info: " + jobInfo.getJobname() + " "
      + jobInfo.getFinishedMaps() + " "
      + jobInfo.getTotalMaps() + " "
      + jobInfo.getJobId());
}
protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
  int sampleSize = sampleAndGetSize(jobId, infoList);

  List<MapReduceTaskData> taskList = new ArrayList<MapReduceTaskData>();
  for (int i = 0; i < sampleSize; i++) {
    JobHistoryParser.TaskInfo tInfo = infoList.get(i);

    String taskId = tInfo.getTaskId().toString();
    TaskAttemptID attemptId = null;
    if (tInfo.getTaskStatus().equals("SUCCEEDED")) {
      attemptId = tInfo.getSuccessfulAttemptId();
    } else {
      attemptId = tInfo.getFailedDueToAttemptId();
    }

    MapReduceTaskData taskData = new MapReduceTaskData(taskId,
        attemptId == null ? "" : attemptId.toString(), tInfo.getTaskStatus());

    MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());

    long[] taskExecTime = null;
    if (attemptId != null) {
      taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));
    }

    taskData.setTimeAndCounter(taskExecTime, taskCounterData);
    taskList.add(taskData);
  }
  return taskList.toArray(new MapReduceTaskData[taskList.size()]);
}
@Test
public void testGetTaskData() {
  FetcherConfiguration fetcherConf = new FetcherConfiguration(document9.getDocumentElement());
  try {
    MapReduceFSFetcherHadoop2 fetcher = new MapReduceFSFetcherHadoop2(
        fetcherConf.getFetchersConfigurationData().get(0));
    String jobId = "job_14000_001";
    List<JobHistoryParser.TaskInfo> infoList = new ArrayList<JobHistoryParser.TaskInfo>();
    infoList.add(new MockTaskInfo(1, true));
    infoList.add(new MockTaskInfo(2, false));

    MapReduceTaskData[] taskList = fetcher.getTaskData(jobId, infoList);
    Assert.assertNotNull("taskList should not be null.", taskList);

    int succeededTaskCount = 0;
    for (MapReduceTaskData task : taskList) {
      Assert.assertNotNull("Null pointer in taskList.", task);
      if (task.getState().equals("SUCCEEDED")) {
        succeededTaskCount++;
      }
    }
    Assert.assertEquals("Should have total two tasks.", 2, taskList.length);
    Assert.assertEquals("Should have only one succeeded task.", 1, succeededTaskCount);
  } catch (IOException e) {
    Assert.assertNull("Failed to initialize FileSystem.", e);
  }
}
public MockTaskInfo(int id, boolean succeeded) {
  this.taskId = new TaskID("job1", 1, TaskType.MAP, id);
  this.taskType = TaskType.MAP;
  this.succeeded = succeeded;
  this.counters = new Counters();
  this.finishTime = System.currentTimeMillis();
  this.startTime = finishTime - 10000;
  this.failedDueToAttemptId = new TaskAttemptID(taskId, 0);
  this.successfulAttemptId = new TaskAttemptID(taskId, 1);
  this.attemptsMap = new HashMap<TaskAttemptID, JobHistoryParser.TaskAttemptInfo>();
  this.attemptsMap.put(failedDueToAttemptId, new JobHistoryParser.TaskAttemptInfo());
  this.attemptsMap.put(successfulAttemptId, new JobHistoryParser.TaskAttemptInfo());
}
private void parsePreviousJobHistory() throws IOException {
  FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
      appAttemptID);
  JobHistoryParser parser = new JobHistoryParser(in);
  JobInfo jobInfo = parser.parse();
  Exception parseException = parser.getParseException();
  if (parseException != null) {
    LOG.info("Got an error parsing job-history file" +
        ", ignoring incomplete events.", parseException);
  }
  Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
      .getAllTasks();
  for (TaskInfo taskInfo : taskInfos.values()) {
    if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
      Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
          taskInfo.getAllTaskAttempts().entrySet().iterator();
      while (taskAttemptIterator.hasNext()) {
        Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
        if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
          taskAttemptIterator.remove();
        }
      }
      completedTasksFromPreviousRun
          .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
      LOG.info("Read from history task "
          + TypeConverter.toYarn(taskInfo.getTaskId()));
    }
  }
  LOG.info("Read completed tasks from history "
      + completedTasksFromPreviousRun.size());
  recoveredJobStartTime = jobInfo.getLaunchTime();

  // recover AMInfos
  List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
  if (jhAmInfoList != null) {
    for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
      AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
          jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
          jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
          jhAmInfo.getNodeManagerHttpPort());
      amInfos.add(amInfo);
    }
  }
}
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
@Override
public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
  DataFiles files = getHistoryFiles(job);
  String confFile = files.getJobConfPath();
  String histFile = files.getJobHistPath();
  String appId = job.getAppId();
  String jobId = Utils.getJobIdFromApplicationId(appId);

  MapReduceApplicationData jobData = new MapReduceApplicationData();
  jobData.setAppId(appId).setJobId(jobId);

  // Fetch job config
  Configuration jobConf = new Configuration(false);
  jobConf.addResource(_fs.open(new Path(confFile)), confFile);
  Properties jobConfProperties = new Properties();
  for (Map.Entry<String, String> entry : jobConf) {
    jobConfProperties.put(entry.getKey(), entry.getValue());
  }
  jobData.setJobConf(jobConfProperties);

  // Check if job history file is too large and should be throttled
  if (_fs.getFileStatus(new Path(histFile)).getLen() > _maxLogSizeInMB * FileUtils.ONE_MB) {
    String errMsg =
        "The history log of MapReduce application: " + appId + " is over the limit size of "
            + _maxLogSizeInMB + " MB, the parsing process gets throttled.";
    logger.warn(errMsg);
    jobData.setDiagnosticInfo(errMsg);
    jobData.setSucceeded(false); // set succeeded to false to avoid heuristic analysis
    return jobData;
  }

  // Analyze job history file
  JobHistoryParser parser = new JobHistoryParser(_fs, histFile);
  JobHistoryParser.JobInfo jobInfo = parser.parse();
  IOException parseException = parser.getParseException();
  if (parseException != null) {
    throw new RuntimeException("Could not parse history file " + histFile, parseException);
  }

  // Populate missing fields from parsed job info. This info will be missing for backfilled jobs.
  populateJobFromJobInfo(job, jobInfo);

  jobData.setSubmitTime(jobInfo.getSubmitTime());
  jobData.setStartTime(jobInfo.getLaunchTime());
  jobData.setFinishTime(jobInfo.getFinishTime());

  String state = jobInfo.getJobStatus();
  if (state.equals("SUCCEEDED")) {
    jobData.setSucceeded(true);
  } else if (state.equals("FAILED")) {
    jobData.setSucceeded(false);
    jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
  } else {
    throw new RuntimeException("job neither succeeded or failed. can not process it ");
  }

  // Fetch job counter
  MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

  // Fetch task data
  Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
  List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
    if (taskInfo.getTaskType() == TaskType.MAP) {
      mapperInfoList.add(taskInfo);
    } else {
      reducerInfoList.add(taskInfo);
    }
  }
  if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total mappers: " + mapperInfoList.size());
  }
  if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total reducers: " + reducerInfoList.size());
  }

  MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
  MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);
  jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);

  return jobData;
}
public Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> getAllTaskAttempts() {
  return attemptsMap;
}