org.apache.hadoop.mapred.JobStatus#getRunState ( )源码实例Demo

下面列出了org.apache.hadoop.mapred.JobStatus#getRunState ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: RDFS   文件: SimulatorJobTracker.java
/**
 * The getMapCompletion method is intended to inform taskTrackes when to change the status
 * of reduce tasks from "shuffle" to "reduce".
 * For all reduce tasks in this TaskTracker that are
 * in the shuffle phase, getMapCompletionTasks finds the number of finished maps for 
 * this job from the jobInProgress object. If this
 * number equals the number of desired maps for this job, then it adds an 
 * AllMapsCompletedTaskAction for this reduce task-attempt.
 * 
 * @param status
 *            The status of the task tracker
 * @return List of TaskTrackerActions
 */
private List<TaskTrackerAction> getMapCompletionTasks(
    TaskTrackerStatus status, 
    List<TaskTrackerAction> tasksToKill) {
  boolean loggingEnabled = LOG.isDebugEnabled();
  
  // Build up the list of tasks about to be killed
  Set<TaskAttemptID> killedTasks = new HashSet<TaskAttemptID>();
  if (tasksToKill != null) {
    for (TaskTrackerAction taskToKill : tasksToKill) {
      killedTasks.add(((KillTaskAction)taskToKill).getTaskID());
    }
  }

  String trackerName = status.getTrackerName();

  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

  // loop through the list of task statuses
  for (TaskStatus report : status.getTaskReports()) {

    TaskAttemptID taskAttemptId = report.getTaskID();
    SimulatorJobInProgress job = getSimulatorJob(taskAttemptId.getJobID());
    
    if(job ==null) {
      // This job has completed before.
      // and this is a zombie reduce-task
      Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
      if (jobsToCleanup == null) {
        jobsToCleanup = new HashSet<JobID>();
        trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
      }
      jobsToCleanup.add(taskAttemptId.getJobID());
      continue;
    }   
    JobStatus jobStatus = job.getStatus();
    TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);

    // if the  job is running, attempt is running
    // no KillTask is being sent for this attempt
    // task is a reduce and attempt is in shuffle phase
    // this precludes sending both KillTask and AllMapsCompletion 
    // for same reduce-attempt 

    if (jobStatus.getRunState()== JobStatus.RUNNING && 
        tip.isRunningTask(taskAttemptId) &&
        !killedTasks.contains(taskAttemptId) && 
        !report.getIsMap() &&
        report.getPhase() == TaskStatus.Phase.SHUFFLE) {

      if (loggingEnabled) {
        LOG.debug("Need map-completion information for REDUCEattempt "
            + taskAttemptId + " in tracker " + trackerName);

        LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
            + job.pendingMaps());
      }
      // Check whether the number of finishedMaps equals the
      // number of maps
      boolean canSendMapCompletion = false;
     
      canSendMapCompletion = (job.finishedMaps()==job.desiredMaps());	

      if (canSendMapCompletion) {
        if (loggingEnabled) {
          LOG.debug("Adding MapCompletion for taskAttempt " + taskAttemptId
              + " in tracker " + trackerName);

          LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
              + job.finishedMaps() + "/" + job.desiredMaps());

          LOG.debug("AllMapsCompleted for task " + taskAttemptId + " time="
              + getClock().getTime());
        }
        actions.add(new AllMapsCompletedTaskAction(taskAttemptId));
      }
    }
  }
  return actions;
}
 
源代码2 项目: RDFS   文件: SimulatorJobTracker.java
@Override
void updateTaskStatuses(TaskTrackerStatus status) {
  boolean loggingEnabled = LOG.isDebugEnabled();
  String trackerName = status.getTrackerName();
  // loop through the list of task statuses
  if (loggingEnabled) {
    LOG.debug("Updating task statuses for tracker " + trackerName);
  }
  for (TaskStatus report : status.getTaskReports()) {
    report.setTaskTracker(trackerName);
    TaskAttemptID taskAttemptId = report.getTaskID();
    JobID jobid = taskAttemptId.getJobID();
    if (loggingEnabled) {
      LOG.debug("Updating status for job " + jobid + " for task = "
          + taskAttemptId + " status=" + report.getProgress()
          + " for tracker: " + trackerName);
    }
    SimulatorJobInProgress job = 
      getSimulatorJob(taskAttemptId.getJobID());

    if(job ==null) {
      // This job bas completed before.
      Set<JobID> jobsToCleanup = trackerToJobsToCleanup.get(trackerName);
      if (jobsToCleanup == null) {
        jobsToCleanup = new HashSet<JobID>();
        trackerToJobsToCleanup.put(trackerName, jobsToCleanup);
      }
      jobsToCleanup.add(taskAttemptId.getJobID());
      continue;
    }
    TaskInProgress tip = taskidToTIPMap.get(taskAttemptId);

    JobStatus prevStatus = (JobStatus) job.getStatus().clone();
    job.updateTaskStatus(tip, (TaskStatus) report.clone());
    JobStatus newStatus = (JobStatus) job.getStatus().clone();
    if (tip.isComplete()) {
      if (loggingEnabled) {
        LOG.debug("Completed task attempt " + taskAttemptId + " tracker:"
            + trackerName + " time=" + getClock().getTime());
      }
    }

    if (prevStatus.getRunState() != newStatus.getRunState()) {
      if (loggingEnabled) {
        LOG.debug("Informing Listeners of job " + jobid + " of newStatus "
            + JobStatus.getJobRunState(newStatus.getRunState()));
      }
      JobStatusChangeEvent event = new JobStatusChangeEvent(job,
          EventType.RUN_STATE_CHANGED, prevStatus, newStatus);

      updateJobInProgressListeners(event);
    }

  }
}
 
 同类方法