下面列出了org.apache.hadoop.mapred.JobStatus#getRunState ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* The getMapCompletion method is intended to inform taskTrackes when to change the status
* of reduce tasks from "shuffle" to "reduce".
* For all reduce tasks in this TaskTracker that are
* in the shuffle phase, getMapCompletionTasks finds the number of finished maps for
* this job from the jobInProgress object. If this
* number equals the number of desired maps for this job, then it adds an
* AllMapsCompletedTaskAction for this reduce task-attempt.
*
* @param status
* The status of the task tracker
* @return List of TaskTrackerActions
*/
private List<TaskTrackerAction> getMapCompletionTasks(
TaskTrackerStatus status,
List<TaskTrackerAction> tasksToKill) {
boolean loggingEnabled = LOG.isDebugEnabled();
// Build up the list of tasks about to be killed
Set<TaskAttemptID> killedTasks = new HashSet<TaskAttemptID>();
if (tasksToKill != null) {
for (TaskTrackerAction taskToKill : tasksToKill) {
killedTasks.add(((KillTaskAction)taskToKill).getTaskID());
}
}
String trackerName = status.getTrackerName();
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
// loop through the list of task statuses
for (TaskStatus report : status.getTaskReports()) {
TaskAttemptID taskAttemptId = report.getTaskID();
SimulatorJobInProgress job = getSimulatorJob(taskAttemptId.getJobID());
if(job ==null) {
// This job has completed before.
// and this is a zombie reduce-task
Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
if (jobsToCleanup == null) {
jobsToCleanup = new HashSet<JobID>();
trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
}
jobsToCleanup.add(taskAttemptId.getJobID());
continue;
}
JobStatus jobStatus = job.getStatus();
TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
// if the job is running, attempt is running
// no KillTask is being sent for this attempt
// task is a reduce and attempt is in shuffle phase
// this precludes sending both KillTask and AllMapsCompletion
// for same reduce-attempt
if (jobStatus.getRunState()== JobStatus.RUNNING &&
tip.isRunningTask(taskAttemptId) &&
!killedTasks.contains(taskAttemptId) &&
!report.getIsMap() &&
report.getPhase() == TaskStatus.Phase.SHUFFLE) {
if (loggingEnabled) {
LOG.debug("Need map-completion information for REDUCEattempt "
+ taskAttemptId + " in tracker " + trackerName);
LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
+ job.pendingMaps());
}
// Check whether the number of finishedMaps equals the
// number of maps
boolean canSendMapCompletion = false;
canSendMapCompletion = (job.finishedMaps()==job.desiredMaps());
if (canSendMapCompletion) {
if (loggingEnabled) {
LOG.debug("Adding MapCompletion for taskAttempt " + taskAttemptId
+ " in tracker " + trackerName);
LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
+ job.finishedMaps() + "/" + job.desiredMaps());
LOG.debug("AllMapsCompleted for task " + taskAttemptId + " time="
+ getClock().getTime());
}
actions.add(new AllMapsCompletedTaskAction(taskAttemptId));
}
}
}
return actions;
}
@Override
void updateTaskStatuses(TaskTrackerStatus status) {
boolean loggingEnabled = LOG.isDebugEnabled();
String trackerName = status.getTrackerName();
// loop through the list of task statuses
if (loggingEnabled) {
LOG.debug("Updating task statuses for tracker " + trackerName);
}
for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(trackerName);
TaskAttemptID taskAttemptId = report.getTaskID();
JobID jobid = taskAttemptId.getJobID();
if (loggingEnabled) {
LOG.debug("Updating status for job " + jobid + " for task = "
+ taskAttemptId + " status=" + report.getProgress()
+ " for tracker: " + trackerName);
}
SimulatorJobInProgress job =
getSimulatorJob(taskAttemptId.getJobID());
if(job ==null) {
// This job bas completed before.
Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
if (jobsToCleanup == null) {
jobsToCleanup = new HashSet<JobID>();
trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
}
jobsToCleanup.add(taskAttemptId.getJobID());
continue;
}
TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);
JobStatus prevStatus = (JobStatus) job.getStatus().clone();
job.updateTaskStatus(tip, (TaskStatus) report.clone());
JobStatus newStatus = (JobStatus) job.getStatus().clone();
if (tip.isComplete()) {
if (loggingEnabled) {
LOG.debug("Completed task attempt " + taskAttemptId + " tracker:"
+ trackerName + " time=" + getClock().getTime());
}
}
if (prevStatus.getRunState() != newStatus.getRunState()) {
if (loggingEnabled) {
LOG.debug("Informing Listeners of job " + jobid + " of newStatus "
+ JobStatus.getJobRunState(newStatus.getRunState()));
}
JobStatusChangeEvent event = new JobStatusChangeEvent(job,
EventType.RUN_STATE_CHANGED, prevStatus, newStatus);
updateJobInProgressListeners(event);
}
}
}