下面列出了 org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent#org.apache.hadoop.mapreduce.jobhistory.HistoryEvent 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Emits a {@link TaskStartedEvent} when the line carries both a
 * {@code START_TIME} and a {@code TASK_TYPE} attribute.
 *
 * <p>As a side effect, the parsed start time and task type are stored on
 * the supplied emitter (cast to {@link Task20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg emitter whose state is updated when an event is produced
 * @return the started event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String typeText = line.get("TASK_TYPE");
  final String startText = line.get("START_TIME");
  final String splitInfo = line.get("SPLITS");

  if (startText == null || typeText == null) {
    return null;
  }

  // The cast is deferred until we know an event will be produced.
  final Task20LineHistoryEventEmitter emitter =
      (Task20LineHistoryEventEmitter) thatg;
  emitter.originalStartTime = Long.parseLong(startText);
  emitter.originalTaskType =
      Version20LogInterfaceUtils.get20TaskType(typeText);
  return new TaskStartedEvent(id, emitter.originalStartTime,
      emitter.originalTaskType, splitInfo);
}
/**
 * Emits a {@link TaskUpdatedEvent} when the line carries a
 * {@code FINISH_TIME} attribute.
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the update event, or {@code null} if no finish time is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String finishText = line.get("FINISH_TIME");
  return finishText == null
      ? null
      : new TaskUpdatedEvent(id, Long.parseLong(finishText));
}
/**
 * Dispatches one {@link HistoryEvent} to the handler matching its concrete
 * type.
 *
 * <p>Only the event types tested below are handled; any other event is
 * ignored.
 *
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent(
        (TaskAttemptUnsuccessfulCompletionEvent) event);
    return;
  }
  if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
    return;
  }
  if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }
  // The checks above are deliberately NOT exhaustive.
}
/**
 * Emits a {@link JobPriorityChangeEvent} when the line carries a
 * {@code JOB_PRIORITY} attribute.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the priority-change event, or {@code null} if no priority is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  // Reject a missing ID before attempting to parse it, so the parser is
  // never handed a null name.
  if (jobIDName == null) {
    return null;
  }
  JobID jobID = JobID.forName(jobIDName);
  String priority = line.get("JOB_PRIORITY");
  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }
  return null;
}
/**
 * Emits a {@link JobInitedEvent} when the line carries {@code LAUNCH_TIME},
 * {@code TOTAL_MAPS} and {@code TOTAL_REDUCES} attributes.
 *
 * <p>{@code JOB_STATUS} and {@code UBERIZED} are read as well but are not
 * required for the event to be emitted.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the inited event, or {@code null} if a required attribute is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String launchText = line.get("LAUNCH_TIME");
  final String jobStatus = line.get("JOB_STATUS");
  final String mapsText = line.get("TOTAL_MAPS");
  final String reducesText = line.get("TOTAL_REDUCES");
  final String uberText = line.get("UBERIZED");

  if (launchText == null || mapsText == null || reducesText == null) {
    return null;
  }

  return new JobInitedEvent(
      id,
      Long.parseLong(launchText),
      Integer.parseInt(mapsText),
      Integer.parseInt(reducesText),
      jobStatus,
      Boolean.parseBoolean(uberText));
}
/**
 * Emits a {@link JobStatusChangedEvent} when the line carries a
 * {@code JOB_STATUS} attribute.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the status-change event, or {@code null} if no status is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String jobStatus = line.get("JOB_STATUS");
  return jobStatus == null ? null : new JobStatusChangedEvent(id, jobStatus);
}
/**
 * Emits a {@link JobInfoChangeEvent} when the line carries a
 * {@code LAUNCH_TIME} attribute.
 *
 * <p>The submit time is taken from the supplied emitter (cast to
 * {@link Job20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original submit time
 * @return the info-change event, or {@code null} if no launch time is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String launchText = line.get("LAUNCH_TIME");
  if (launchText == null) {
    return null;
  }

  final Job20LineHistoryEventEmitter emitter =
      (Job20LineHistoryEventEmitter) thatg;
  return new JobInfoChangeEvent(id, emitter.originalSubmitTime,
      Long.parseLong(launchText));
}
/**
 * Emits a {@link JobUnsuccessfulCompletionEvent} when the line reports a
 * non-"success" {@code JOB_STATUS} together with {@code FINISH_TIME},
 * {@code FINISHED_MAPS} and {@code FINISHED_REDUCES}.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the completion event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String finishText = line.get("FINISH_TIME");
  final String jobStatus = line.get("JOB_STATUS");
  final String mapsText = line.get("FINISHED_MAPS");
  final String reducesText = line.get("FINISHED_REDUCES");

  // Successful jobs and incomplete lines produce no event here.
  if (jobStatus == null || jobStatus.equalsIgnoreCase("success")
      || finishText == null || mapsText == null || reducesText == null) {
    return null;
  }

  return new JobUnsuccessfulCompletionEvent(
      id,
      Long.parseLong(finishText),
      Integer.parseInt(mapsText),
      Integer.parseInt(reducesText),
      jobStatus);
}
/**
 * Emits a {@link TaskStartedEvent} when the line carries both a
 * {@code START_TIME} and a {@code TASK_TYPE} attribute.
 *
 * <p>As a side effect, the parsed start time and task type are stored on
 * the supplied emitter (cast to {@link Task20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg emitter whose state is updated when an event is produced
 * @return the started event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String typeText = line.get("TASK_TYPE");
  final String startText = line.get("START_TIME");
  final String splitInfo = line.get("SPLITS");

  if (startText == null || typeText == null) {
    return null;
  }

  // The cast is deferred until we know an event will be produced.
  final Task20LineHistoryEventEmitter emitter =
      (Task20LineHistoryEventEmitter) thatg;
  emitter.originalStartTime = Long.parseLong(startText);
  emitter.originalTaskType =
      Version20LogInterfaceUtils.get20TaskType(typeText);
  return new TaskStartedEvent(id, emitter.originalStartTime,
      emitter.originalTaskType, splitInfo);
}
/**
 * Emits a {@link TaskUpdatedEvent} when the line carries a
 * {@code FINISH_TIME} attribute.
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the update event, or {@code null} if no finish time is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String finishText = line.get("FINISH_TIME");
  return finishText == null
      ? null
      : new TaskUpdatedEvent(id, Long.parseLong(finishText));
}
/**
 * Dispatches one {@link HistoryEvent} to the handler matching its concrete
 * type.
 *
 * <p>Only the event types tested below are handled; any other event is
 * ignored.
 *
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent(
        (TaskAttemptUnsuccessfulCompletionEvent) event);
    return;
  }
  if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
    return;
  }
  if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
    return;
  }
  if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }
  // The checks above are deliberately NOT exhaustive.
}
/**
 * Emits a {@link JobPriorityChangeEvent} when the line carries a
 * {@code JOB_PRIORITY} attribute.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the priority-change event, or {@code null} if no priority is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  // Reject a missing ID before attempting to parse it, so the parser is
  // never handed a null name.
  if (jobIDName == null) {
    return null;
  }
  JobID jobID = JobID.forName(jobIDName);
  String priority = line.get("JOB_PRIORITY");
  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }
  return null;
}
/**
 * Emits a {@link JobInitedEvent} when the line carries {@code LAUNCH_TIME},
 * {@code TOTAL_MAPS} and {@code TOTAL_REDUCES} attributes.
 *
 * <p>{@code JOB_STATUS} and {@code UBERIZED} are read as well but are not
 * required for the event to be emitted.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the inited event, or {@code null} if a required attribute is absent
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String launchText = line.get("LAUNCH_TIME");
  final String jobStatus = line.get("JOB_STATUS");
  final String mapsText = line.get("TOTAL_MAPS");
  final String reducesText = line.get("TOTAL_REDUCES");
  final String uberText = line.get("UBERIZED");

  if (launchText == null || mapsText == null || reducesText == null) {
    return null;
  }

  return new JobInitedEvent(
      id,
      Long.parseLong(launchText),
      Integer.parseInt(mapsText),
      Integer.parseInt(reducesText),
      jobStatus,
      Boolean.parseBoolean(uberText));
}
/**
 * Emits a {@link JobStatusChangedEvent} when the line carries a
 * {@code JOB_STATUS} attribute.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the status-change event, or {@code null} if no status is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String jobStatus = line.get("JOB_STATUS");
  return jobStatus == null ? null : new JobStatusChangedEvent(id, jobStatus);
}
/**
 * Emits a {@link JobInfoChangeEvent} when the line carries a
 * {@code LAUNCH_TIME} attribute.
 *
 * <p>The submit time is taken from the supplied emitter (cast to
 * {@link Job20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original submit time
 * @return the info-change event, or {@code null} if no launch time is present
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String launchText = line.get("LAUNCH_TIME");
  if (launchText == null) {
    return null;
  }

  final Job20LineHistoryEventEmitter emitter =
      (Job20LineHistoryEventEmitter) thatg;
  return new JobInfoChangeEvent(id, emitter.originalSubmitTime,
      Long.parseLong(launchText));
}
/**
 * Emits a {@link JobUnsuccessfulCompletionEvent} when the line reports a
 * non-"success" {@code JOB_STATUS} together with {@code FINISH_TIME},
 * {@code FINISHED_MAPS} and {@code FINISHED_REDUCES}.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the completion event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  final JobID id = JobID.forName(jobIDName);
  final String finishText = line.get("FINISH_TIME");
  final String jobStatus = line.get("JOB_STATUS");
  final String mapsText = line.get("FINISHED_MAPS");
  final String reducesText = line.get("FINISHED_REDUCES");

  // Successful jobs and incomplete lines produce no event here.
  if (jobStatus == null || jobStatus.equalsIgnoreCase("success")
      || finishText == null || mapsText == null || reducesText == null) {
    return null;
  }

  return new JobUnsuccessfulCompletionEvent(
      id,
      Long.parseLong(finishText),
      Integer.parseInt(mapsText),
      Integer.parseInt(reducesText),
      jobStatus);
}
/**
 * Applies local handling to map and reduce attempt-finished events, then
 * forwards every event to the superclass handler.
 *
 * @param event the history event to handle
 */
@Override
public void handleEvent(HistoryEvent event) {
  final EventType eventType = event.getEventType();
  switch (eventType) {
    case MAP_ATTEMPT_FINISHED:
      handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
      break;
    case REDUCE_ATTEMPT_FINISHED:
      handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
      break;
    default:
      // No local handling for other event types.
      break;
  }
  // Every path delegates to the superclass once local handling is done.
  super.handleEvent(event);
}
/**
 * Verifies that the spied history service handled exactly two events and
 * that the captured event is a {@link JobUnsuccessfulCompletionEvent}
 * carrying the expected status.
 *
 * @param appMaster the app master whose spied history service is checked
 * @param expectedJobState the status the completion event must report
 */
private void verifyFailedStatus(MRAppMasterTest appMaster,
    String expectedJobState) {
  ArgumentCaptor<JobHistoryEvent> captor =
      ArgumentCaptor.forClass(JobHistoryEvent.class);
  // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
  verify(appMaster.spyHistoryService, times(2))
      .handleEvent(captor.capture());
  HistoryEvent event = captor.getValue().getHistoryEvent();
  assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
  // NOTE(review): assumes JUnit's assertEquals(expected, actual) ordering so
  // that a failure message labels the two values correctly.
  assertEquals(expectedJobState,
      ((JobUnsuccessfulCompletionEvent) event).getStatus());
}
/**
 * Emits a {@link ReduceAttemptFinishedEvent} for a successful attempt whose
 * line carries {@code FINISH_TIME}, {@code SHUFFLE_FINISHED} and
 * {@code SORT_FINISHED}.
 *
 * <p>The task type is taken from the supplied emitter (cast to
 * {@link ReduceAttempt20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  final TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);
  final String finishText = line.get("FINISH_TIME");
  final String status = line.get("TASK_STATUS");
  if (finishText == null || status == null
      || !status.equalsIgnoreCase("success")) {
    return null;
  }

  final String host = line.get("HOSTNAME");
  final String counterText = line.get("COUNTERS");
  final String stateText = line.get("STATE_STRING");
  final String shuffleText = line.get("SHUFFLE_FINISHED");
  final String sortText = line.get("SORT_FINISHED");

  // The status is already known to be "success" at this point, so only the
  // two phase timestamps remain to be checked.
  if (shuffleText == null || sortText == null) {
    return null;
  }

  final ReduceAttempt20LineHistoryEventEmitter emitter =
      (ReduceAttempt20LineHistoryEventEmitter) thatg;
  return new ReduceAttemptFinishedEvent(
      attemptId,
      emitter.originalTaskType,
      status,
      Long.parseLong(shuffleText),
      Long.parseLong(sortText),
      Long.parseLong(finishText),
      host,
      -1,
      null,
      stateText,
      maybeParseCounters(counterText),
      null);
}
/**
 * Emits a {@link TaskFinishedEvent} for a line with a {@code FINISH_TIME},
 * no {@code ERROR}, and a "success" {@code TASK_STATUS}.
 *
 * <p>No event is produced when the supplied emitter (cast to
 * {@link Task20LineHistoryEventEmitter}) has no recorded task type.
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String status = line.get("TASK_STATUS");
  final String finishText = line.get("FINISH_TIME");
  final String errorText = line.get("ERROR");
  final String counterText = line.get("COUNTERS");

  if (finishText == null || errorText != null || status == null
      || !status.equalsIgnoreCase("success")) {
    return null;
  }

  // Counters are parsed before the emitter is consulted, as in the
  // original statement order.
  final Counters parsedCounters = maybeParseCounters(counterText);
  final Task20LineHistoryEventEmitter emitter =
      (Task20LineHistoryEventEmitter) thatg;
  if (emitter.originalTaskType == null) {
    return null;
  }

  return new TaskFinishedEvent(id, null, Long.parseLong(finishText),
      emitter.originalTaskType, status, parsedCounters);
}
/**
 * Emits a {@link TaskFailedEvent} for a line with a {@code FINISH_TIME} and
 * either an {@code ERROR} attribute or a non-"success" {@code TASK_STATUS}.
 *
 * <p>The task type comes from the supplied emitter (cast to
 * {@link Task20LineHistoryEventEmitter}); when the emitter has none, it is
 * derived from the line's {@code TASK_TYPE} attribute.
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type, if any
 * @return the failed event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String status = line.get("TASK_STATUS");
  final String finishText = line.get("FINISH_TIME");
  final String typeText = line.get("TASK_TYPE");
  final String errorText = line.get("ERROR");

  final boolean looksSuccessful =
      errorText == null
          && (status == null || status.equalsIgnoreCase("success"));
  if (finishText == null || looksSuccessful) {
    return null;
  }

  final Task20LineHistoryEventEmitter emitter =
      (Task20LineHistoryEventEmitter) thatg;
  TaskType effectiveType = emitter.originalTaskType;
  if (effectiveType == null) {
    // Fall back to the type named on this line.
    effectiveType = Version20LogInterfaceUtils.get20TaskType(typeText);
  }

  return new TaskFailedEvent(id, Long.parseLong(finishText),
      effectiveType, errorText, status, null);
}
/**
 * Emits a {@link MapAttemptFinishedEvent} for a line with a
 * {@code FINISH_TIME} and a "success" {@code TASK_STATUS}.
 *
 * <p>The parsed finish time is passed for both time arguments of the event.
 * The task type is taken from the supplied emitter (cast to
 * {@link MapAttempt20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  final TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);
  final String finishText = line.get("FINISH_TIME");
  final String status = line.get("TASK_STATUS");
  if (finishText == null || status == null
      || !status.equalsIgnoreCase("success")) {
    return null;
  }

  // From here on the status is known to be "success".
  final String host = line.get("HOSTNAME");
  final String counterText = line.get("COUNTERS");
  final String stateText = line.get("STATE_STRING");
  final MapAttempt20LineHistoryEventEmitter emitter =
      (MapAttempt20LineHistoryEventEmitter) thatg;

  return new MapAttemptFinishedEvent(
      attemptId,
      emitter.originalTaskType,
      status,
      Long.parseLong(finishText),
      Long.parseLong(finishText),
      host,
      -1,
      null,
      stateText,
      maybeParseCounters(counterText),
      null);
}
/**
 * Emits a {@link JobFinishedEvent} when the line reports a "success"
 * {@code JOB_STATUS} together with {@code FINISH_TIME},
 * {@code FINISHED_MAPS} and {@code FINISHED_REDUCES}.
 *
 * <p>{@code FAILED_MAPS} and {@code FAILED_REDUCES} are optional: a missing
 * value is treated as zero instead of being handed to
 * {@link Integer#parseInt(String)}, which rejects {@code null}.
 *
 * @param line the parsed history line to inspect
 * @param jobIDName textual job ID; {@code null} yields {@code null}
 * @param thatg unused by this emitter
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }
  JobID jobID = JobID.forName(jobIDName);
  String finishTime = line.get("FINISH_TIME");
  String status = line.get("JOB_STATUS");
  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");
  String failedMaps = line.get("FAILED_MAPS");
  String failedReduces = line.get("FAILED_REDUCES");
  String counters = line.get("COUNTERS");
  if (status != null && status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    long finishMillis = Long.parseLong(finishTime);
    int finishedMapCount = Integer.parseInt(finishedMaps);
    int finishedReduceCount = Integer.parseInt(finishedReduces);
    // The guard above does not cover the failure counts, so default an
    // absent count to 0 rather than failing with NumberFormatException.
    int failedMapCount =
        failedMaps == null ? 0 : Integer.parseInt(failedMaps);
    int failedReduceCount =
        failedReduces == null ? 0 : Integer.parseInt(failedReduces);
    return new JobFinishedEvent(jobID, finishMillis, finishedMapCount,
        finishedReduceCount, failedMapCount, failedReduceCount, null, null,
        maybeParseCounters(counters));
  }
  return null;
}
/**
 * Emits a {@link TaskAttemptStartedEvent} when the line carries both a
 * {@code START_TIME} and a {@code TASK_TYPE} attribute.
 *
 * <p>Missing {@code LOCALITY} and {@code AVATAAR} attributes are replaced by
 * empty strings, and a missing or empty {@code HTTP_PORT} falls back to
 * {@code DEFAULT_HTTP_PORT}. As a side effect, the parsed start time and task
 * type are stored on the supplied emitter (cast to
 * {@link TaskAttempt20LineEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter whose state is updated when an event is produced
 * @return the started event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String startTime = line.get("START_TIME");
  String taskType = line.get("TASK_TYPE");
  String trackerName = line.get("TRACKER_NAME");
  String httpPort = line.get("HTTP_PORT");
  String locality = line.get("LOCALITY");
  if (locality == null) {
    locality = "";
  }
  String avataar = line.get("AVATAAR");
  if (avataar == null) {
    avataar = "";
  }
  if (startTime != null && taskType != null) {
    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;
    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);
    // An absent HTTP_PORT attribute is handled like an empty one; calling
    // equals() on it directly would throw a NullPointerException.
    int port =
        (httpPort == null || httpPort.isEmpty())
            ? DEFAULT_HTTP_PORT
            : Integer.parseInt(httpPort);
    return new TaskAttemptStartedEvent(taskAttemptID,
        that.originalTaskType, that.originalStartTime, trackerName, port, -1,
        locality, avataar);
  }
  return null;
}
/**
 * Emits a {@link TaskAttemptFinishedEvent} for a line with a
 * {@code FINISH_TIME} and a "success" {@code TASK_STATUS}.
 *
 * <p>The {@code HOSTNAME} attribute is split into rack and node names via
 * {@link ParsedHost#parse}. When parsing yields {@code null}, the raw host
 * name is used as the node name and the rack name is left {@code null}.
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }
  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");
  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;
    ParsedHost pHost = ParsedHost.parse(hostName);
    // The parse result can be null (the unsuccessful-attempt emitter guards
    // against it), so do not dereference it unconditionally.
    String rackName = null;
    String nodeName = hostName;
    if (pHost != null) {
      rackName = pHost.getRackName();
      nodeName = pHost.getNodeName();
    }
    return new TaskAttemptFinishedEvent(taskAttemptID,
        that.originalTaskType, status, Long.parseLong(finishTime),
        rackName, nodeName, state,
        maybeParseCounters(counters));
  }
  return null;
}
/**
 * Emits a {@link TaskAttemptUnsuccessfulCompletionEvent} for a line with a
 * {@code FINISH_TIME} and a non-"success" {@code TASK_STATUS}.
 *
 * <p>When the {@code HOSTNAME} attribute parses into a {@link ParsedHost},
 * its rack and node names are used; otherwise the raw host name is passed
 * through with a {@code null} rack name.
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the completion event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  final TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);
  final String finishText = line.get("FINISH_TIME");
  final String status = line.get("TASK_STATUS");
  if (finishText == null || status == null
      || status.equalsIgnoreCase("success")) {
    return null;
  }

  String host = line.get("HOSTNAME");
  final String errorText = line.get("ERROR");
  final TaskAttempt20LineEventEmitter emitter =
      (TaskAttempt20LineEventEmitter) thatg;
  final ParsedHost parsed = ParsedHost.parse(host);

  // Earlier versions of MR logged only hostnames (without a rack name) for
  // unsuccessful attempts, in which case the raw value is kept.
  final String rack;
  if (parsed == null) {
    rack = null;
  } else {
    rack = parsed.getRackName();
    host = parsed.getNodeName();
  }

  return new TaskAttemptUnsuccessfulCompletionEvent(
      attemptId,
      emitter.originalTaskType,
      status,
      Long.parseLong(finishText),
      host,
      -1,
      rack,
      errorText,
      null);
}
/**
 * Feeds every event produced by the parser to both the job builder and the
 * topology builder, then closes the parser.
 *
 * <p>The parser is closed even when reading or processing an event throws,
 * so a failure part-way through does not leak it.
 *
 * @param parser source of history events; closed before this method returns
 * @param jobBuilder builder that receives each event
 * @throws IOException declared for failures while reading from or closing
 *     the parser
 */
void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
    throws IOException {
  try {
    HistoryEvent e;
    while ((e = parser.nextEvent()) != null) {
      jobBuilder.process(e);
      topologyBuilder.process(e);
    }
  } finally {
    parser.close();
  }
}
/**
 * Verifies that the spied history service handled exactly two events and
 * that the captured event is a {@link JobUnsuccessfulCompletionEvent}
 * carrying the expected status.
 *
 * @param appMaster the app master whose spied history service is checked
 * @param expectedJobState the status the completion event must report
 */
private void verifyFailedStatus(MRAppMasterTest appMaster,
    String expectedJobState) {
  ArgumentCaptor<JobHistoryEvent> captor =
      ArgumentCaptor.forClass(JobHistoryEvent.class);
  // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
  verify(appMaster.spyHistoryService, times(2))
      .handleEvent(captor.capture());
  HistoryEvent event = captor.getValue().getHistoryEvent();
  assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
  // NOTE(review): assumes JUnit's assertEquals(expected, actual) ordering so
  // that a failure message labels the two values correctly.
  assertEquals(expectedJobState,
      ((JobUnsuccessfulCompletionEvent) event).getStatus());
}
/**
 * Emits a {@link ReduceAttemptFinishedEvent} for a successful attempt whose
 * line carries {@code FINISH_TIME}, {@code SHUFFLE_FINISHED} and
 * {@code SORT_FINISHED}.
 *
 * <p>The task type is taken from the supplied emitter (cast to
 * {@link ReduceAttempt20LineHistoryEventEmitter}).
 *
 * @param line the parsed history line to inspect
 * @param taskAttemptIDName textual attempt ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  final TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);
  final String finishText = line.get("FINISH_TIME");
  final String status = line.get("TASK_STATUS");
  if (finishText == null || status == null
      || !status.equalsIgnoreCase("success")) {
    return null;
  }

  final String host = line.get("HOSTNAME");
  final String counterText = line.get("COUNTERS");
  final String stateText = line.get("STATE_STRING");
  final String shuffleText = line.get("SHUFFLE_FINISHED");
  final String sortText = line.get("SORT_FINISHED");

  // The status is already known to be "success" at this point, so only the
  // two phase timestamps remain to be checked.
  if (shuffleText == null || sortText == null) {
    return null;
  }

  final ReduceAttempt20LineHistoryEventEmitter emitter =
      (ReduceAttempt20LineHistoryEventEmitter) thatg;
  return new ReduceAttemptFinishedEvent(
      attemptId,
      emitter.originalTaskType,
      status,
      Long.parseLong(shuffleText),
      Long.parseLong(sortText),
      Long.parseLong(finishText),
      host,
      -1,
      null,
      stateText,
      maybeParseCounters(counterText),
      null);
}
/**
 * Emits a {@link TaskFinishedEvent} for a line with a {@code FINISH_TIME},
 * no {@code ERROR}, and a "success" {@code TASK_STATUS}.
 *
 * <p>No event is produced when the supplied emitter (cast to
 * {@link Task20LineHistoryEventEmitter}) has no recorded task type.
 *
 * @param line the parsed history line to inspect
 * @param taskIDName textual task ID; {@code null} yields {@code null}
 * @param thatg emitter holding the original task type
 * @return the finished event, or {@code null} if the line does not qualify
 */
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  final TaskID id = TaskID.forName(taskIDName);
  final String status = line.get("TASK_STATUS");
  final String finishText = line.get("FINISH_TIME");
  final String errorText = line.get("ERROR");
  final String counterText = line.get("COUNTERS");

  if (finishText == null || errorText != null || status == null
      || !status.equalsIgnoreCase("success")) {
    return null;
  }

  // Counters are parsed before the emitter is consulted, as in the
  // original statement order.
  final Counters parsedCounters = maybeParseCounters(counterText);
  final Task20LineHistoryEventEmitter emitter =
      (Task20LineHistoryEventEmitter) thatg;
  if (emitter.originalTaskType == null) {
    return null;
  }

  return new TaskFinishedEvent(id, null, Long.parseLong(finishText),
      emitter.originalTaskType, status, parsedCounters);
}