
下面列出了org.apache.hadoop.mapred.TaskStatus.State#RUNNING 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: RDFS   文件:
 * Frees up bookkeeping memory used by completed tasks. 
 * Has no effect on the events or logs produced by the SimulatorTaskTracker.
 * We need this in order not to report completed task multiple times and 
 * to ensure that we do not run out of Java heap memory in larger 
 * simulations.
// Iterates over the keys of `tasks` and, for every task attempt whose run
// state is no longer State.RUNNING, writes a debug log entry.
// NOTE(review): this listing appears damaged by extraction -- the right-hand
// side of the `taskId` assignment is missing, no closing braces are visible,
// and no statement that actually drops the entry is shown. Presumably the
// original used iter.next() and iter.remove(); confirm against upstream.
private void garbageCollectCompletedTasks() {
  for (Iterator<TaskAttemptID> iter = tasks.keySet().iterator();
       iter.hasNext();) {
    TaskAttemptID taskId =;
    SimulatorTaskInProgress tip = tasks.get(taskId);
    // Only attempts that have left the RUNNING state are candidates.
    if (tip.getTaskStatus().getRunState() != State.RUNNING) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Garbage collected SimulatorTIP, taskId=" + taskId);
      // We don't have to / must not touch usedMapSlots and usedReduceSlots
      // as those were already updated by processTaskAttemptCompletionEvent() 
      // when the task switched its state from running
源代码2 项目: RDFS   文件:
// Scans localTasksInfo for tasks that are not maps and whose task state is
// State.RUNNING, and returns the `running` counter.
// NOTE(review): no statement incrementing `running` is visible, and the
// closing braces are absent; the listing looks truncated by extraction.
// As shown, the method would always return 0 -- confirm against upstream.
public int getRunningReduceTasks() {
  int running = 0;
  for (TaskInfo task : localTasksInfo) {
    // Non-map (reduce) tasks that are currently running.
    if (!task.isMap() &&
            task.getTaskState() == State.RUNNING) {
  return running;
源代码3 项目: RDFS   文件:
// Sums getRunningTime() over the entries of localTasksInfo that are either
// map tasks (in any state) or running tasks with progress greater than 0.
// NOTE(review): closing braces are not visible in this listing; they appear
// to have been lost during extraction.
public long getTotalWastedTime() {
  long total = 0;
  for (TaskInfo task : localTasksInfo) {
    // Every map counts; a non-map task counts only while RUNNING with
    // non-zero progress.
    if (task.isMap() ||
            (task.getTaskState() == State.RUNNING &&
            task.getTaskProgress() > 0)) {
      // The reduces that did not yet progress can be considered not started
      total += task.getRunningTime();
  return total;
源代码4 项目: RDFS   文件:
// Sums getRunningTime() over the entries of localTasksInfo that are in
// State.RUNNING and are either maps or have progress greater than 0.
// Unlike getTotalWastedTime(), the RUNNING check here applies to maps too.
// NOTE(review): closing braces are not visible in this listing; they appear
// to have been lost during extraction.
public long getRunningTimeWasted() {
  long runningTimeWasted = 0;
  for (TaskInfo task : localTasksInfo) {
    // Running maps always count; other running tasks count only once they
    // report some progress.
    if (task.getTaskState() == State.RUNNING &&
            (task.isMap() || task.getTaskProgress() > 0)) {
      runningTimeWasted += task.getRunningTime();

  // Another level of complexity here would be to count the time it would
  // take to rerun all the map tasks that were not fetched yet.
  return runningTimeWasted;
源代码5 项目: RDFS   文件:
// Walks the actions carried by a heartbeat response; for each
// SimulatorLaunchTaskAction it builds an initial RUNNING status (Phase.MAP
// for maps, Phase.SHUFFLE for reduces), wraps it in a
// SimulatorTaskInProgress and stores it in `tasks`. Returns
// numLaunchTaskActions.
// NOTE(review): the listing looks damaged by extraction -- closing braces
// are missing, the branch for an already-known task has no visible body, the
// last constructor argument on the SimulatorTaskInProgress call is missing,
// and no increment of numLaunchTaskActions is shown. Confirm against
// upstream before relying on any of this.
int findLaunchTaskActions(HeartbeatResponse response) {
  TaskTrackerAction[] actions = response.getActions();
  int numLaunchTaskActions = 0;
  // HashSet<> numLaunchTaskActions
  for (TaskTrackerAction action : actions) {
    if (action instanceof SimulatorLaunchTaskAction) {
      Task task = ((SimulatorLaunchTaskAction) action).getTask();

      TaskAttemptID taskId = task.getTaskID();
      if (tasks.containsKey(taskId)) {
        // already tracking this task; no need to generate a new status
      TaskStatus status;
      // Fresh status: progress 0, one slot, RUNNING, empty counters.
      if (task.isMapTask()) {
        status = new MapTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
            taskTrackerName, Phase.MAP, new Counters());
      } else {
        status = new ReduceTaskStatus(taskId, 0f, 1, State.RUNNING, "", "",
            taskTrackerName, Phase.SHUFFLE, new Counters());
      SimulatorTaskInProgress tip = new SimulatorTaskInProgress(
          (SimulatorLaunchTaskAction) action, status,;
      tasks.put(taskId, tip);
  return numLaunchTaskActions;
源代码6 项目: RDFS   文件:
 * Is the given task considered as 'running' ?
 * @param taskStatus
 * @return
// Reads the run state of the given status and treats RUNNING and UNASSIGNED
// as 'running'.
// NOTE(review): the return expression ends in a dangling `||`, so at least
// one further condition and the closing tokens were lost in extraction; the
// full predicate cannot be determined from this listing.
private boolean isTaskRunning(TaskStatus taskStatus) {
  TaskStatus.State state = taskStatus.getRunState();
  return (state == State.RUNNING || state == State.UNASSIGNED || 
源代码7 项目: RDFS   文件:
// Returns true when the given state is RUNNING or UNASSIGNED.
// NOTE(review): the closing brace of the method is not visible in this
// listing.
private boolean isTaskRunning(TaskStatus.State state) {
  return (state == State.RUNNING || state == State.UNASSIGNED);
源代码8 项目: RDFS   文件:
 * Stops running a task attempt on the task tracker. It also updates the 
 * number of available slots accordingly.
 * @param finalStatus the TaskStatus containing the task id and final 
 *        status of the task attempt. This routine asserts a lot of the
 *        finalStatus params, in case it is coming from a task attempt
 *        completion event sent to ourselves. Only the run state, finish time,
 *        and progress fields of the task attempt are updated.
 * @param now Current simulation time, used for assert only
// Validates a final task status against the tracked task attempt and then
// releases the slots the attempt was using.
// Throws IllegalArgumentException when the attempt is unknown, is not
// RUNNING, has a final state other than SUCCEEDED/FAILED/KILLED, has a
// finish time different from `now`, or disagrees with the tracked status on
// map-ness, slot count, phase or start time. Throws IllegalStateException
// when a used-slot counter would become negative.
// NOTE(review): closing braces are missing throughout, and the statement
// that stores the final status (announced by the "safe to update" comment)
// is not visible; both look like extraction losses.
private void finishRunningTask(TaskStatus finalStatus, long now) {
  TaskAttemptID taskId = finalStatus.getTaskID();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Finishing running task id=" + taskId + ", now=" + now);

  // The attempt must be known to this tracker ...
  SimulatorTaskInProgress tip = tasks.get(taskId);
  if (tip == null) {
    throw new IllegalArgumentException("Unknown task attempt " + taskId
        + " completed");
  // ... and must still be running.
  TaskStatus currentStatus = tip.getTaskStatus();
  if (currentStatus.getRunState() != State.RUNNING) {
    throw new IllegalArgumentException(
        "Task attempt to finish is not running: " + tip);

  // Check that finalStatus describes a task attempt that has just been
  // completed
  State finalRunState = finalStatus.getRunState();
  if (finalRunState != State.SUCCEEDED && finalRunState != State.FAILED
      && finalRunState != State.KILLED) {
    throw new IllegalArgumentException(
        "Final run state for completed task can't be : " + finalRunState
            + " " + tip);

  // The reported finish time must equal the current simulation time.
  if (now != finalStatus.getFinishTime()) {
    throw new IllegalArgumentException(
        "Current time does not match task finish time: now=" + now
            + ", finish=" + finalStatus.getFinishTime());

  // Fields that must be identical between the tracked and the final status.
  if (currentStatus.getIsMap() != finalStatus.getIsMap()
      || currentStatus.getNumSlots() != finalStatus.getNumSlots()
      || currentStatus.getPhase() != finalStatus.getPhase()
      || currentStatus.getStartTime() != finalStatus.getStartTime()) {
    throw new IllegalArgumentException(
        "Current status does not match final status");

  // We can't assert getShuffleFinishTime() and getSortFinishTime() for
  // reduces as those were unknown when the task attempt completion event
  // was created. We have not called setMapFinishTime() for maps either.
  // If we were really thorough we could update the progress of the task
  // and check if it is consistent with finalStatus.

  // If we've got this far it is safe to update the task status

  // and update the free slots
  int numSlots = currentStatus.getNumSlots();
  if (tip.isMapTask()) {
    usedMapSlots -= numSlots;
    if (usedMapSlots < 0) {
      throw new IllegalStateException(
          "TaskTracker reaches negative map slots: " + usedMapSlots);
  } else {
    usedReduceSlots -= numSlots;
    if (usedReduceSlots < 0) {
      throw new IllegalStateException(
          "TaskTracker reaches negative reduce slots: " + usedReduceSlots);
源代码9 项目: RDFS   文件:
 * Launches a task on the simulated task tracker. 
 * @param action SimulatorLaunchTaskAction sent by the job tracker
 * @param now current simulation time
 * @return new events generated, a TaskAttemptCompletionEvent for map
 *         tasks, empty otherwise
// Registers a newly launched task attempt: builds its initial RUNNING
// status, charges the required slots to the map or reduce counter, stores a
// SimulatorTaskInProgress in `tasks`, and returns the follow-up events (a
// completion event for maps, SimulatorEngine.EMPTY_EVENTS for reduces).
// Throws IllegalArgumentException if the task id is already tracked and
// IllegalStateException if a used-slot counter exceeds its maximum.
// NOTE(review): this listing is damaged by extraction -- closing braces are
// missing and the initializer of taskIdOldApi is cut off (presumably a
// TaskAttemptID downgrade of taskId, per the comment above it; confirm
// against upstream). Those parts are left exactly as found; the only code
// change here is the corrected reduce-slot error message.
private List<SimulatorEvent> handleSimulatorLaunchTaskAction(
                       SimulatorLaunchTaskAction action, long now) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Handling launch task action " + action);
  // First, create statuses and update used slots for map and reduce 
  // task separately
  Task task = action.getTask();
  TaskAttemptID taskId = task.getTaskID();
  if (tasks.containsKey(taskId)) {
    throw new IllegalArgumentException("Multiple launch of task id =" + taskId);

  // Ctor of MapTaskStatus and ReduceTaskStatus need deprecated 
  // o.a.h.mapred.TaskAttemptID, hence the downgrade
  org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
  TaskStatus status;
  int numSlotsRequired = task.getNumSlotsRequired();    
  Counters emptyCounters = new Counters();
  if (task.isMapTask()) {     
    status = new MapTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
                               State.RUNNING, "", "", taskTrackerName, 
                               Phase.MAP, emptyCounters);
    usedMapSlots += numSlotsRequired;
    if (usedMapSlots > maxMapSlots) {
      throw new IllegalStateException("usedMapSlots exceeds maxMapSlots: " + 
        usedMapSlots + " > " + maxMapSlots);
  } else {
    status = new ReduceTaskStatus(taskIdOldApi, 0f, numSlotsRequired, 
                                  State.RUNNING, "", "", taskTrackerName,
                                  Phase.SHUFFLE, emptyCounters);
    usedReduceSlots += numSlotsRequired;
    if (usedReduceSlots > maxReduceSlots) {
      // Report the limit that was actually exceeded (maxReduceSlots), in the
      // same form as the map-slot message above.
      throw new IllegalStateException("usedReduceSlots exceeds maxReduceSlots: " + 
          usedReduceSlots + " > " + maxReduceSlots);
  //  Second, create and store a TIP
  SimulatorTaskInProgress tip = 
    new SimulatorTaskInProgress(action, status, now);
  tasks.put(taskId, tip);

  // Third, schedule events for ourselves
  if (task.isMapTask()) {
    // we know when this task attempts ends iff it is a map 
    TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
    return Collections.<SimulatorEvent>singletonList(e);
  } else { 
    // reduce, completion time can only be determined when all maps are done
    return SimulatorEngine.EMPTY_EVENTS;
源代码10 项目: RDFS   文件:
 * Updates the progress indicator of a task if it is running.
 * @param tip simulator task in progress whose progress is to be updated
 * @param now current simulation time
// Recomputes the progress estimate of a task attempt that is still RUNNING;
// returns immediately for any other run state. Maps are estimated linearly
// from their start time; reduces are estimated from their phase, with a
// linear estimate inside the REDUCE phase. A value outside [0, 1] by more
// than EPSILON raises IllegalStateException; otherwise it is clamped to
// [0, 1].
// NOTE(review): the listing is damaged by extraction -- closing braces are
// missing; the switch shows no `break` statements or `default:` label, so as
// written every case would fall through to the throw; and no statement that
// stores `progress` back into `status` is visible, although the debug line
// prints status.getProgress(). Confirm all three against upstream.
private void progressTaskStatus(SimulatorTaskInProgress tip, long now) {
  TaskStatus status = tip.getTaskStatus();
  if (status.getRunState() != State.RUNNING) {
    return; // nothing to be done

  boolean isMap = tip.isMapTask();
  // Time when the user space code started
  long startTime = -1;
  // Time spent in map or just in the REDUCE phase of a reduce task
  long runTime = tip.getUserSpaceRunTime();
  float progress = 0.0f;
  if (isMap) {
    // We linearly estimate the progress of maps since their start 
    startTime = status.getStartTime();
    // Float division: elapsed time over the total user-space run time.
    progress = ((float)(now - startTime)) / runTime;
  } else {
    // We don't model reduce progress in the SHUFFLE or SORT phases
    // We use linear estimate for the 3rd, REDUCE phase
    Phase reducePhase = status.getPhase();
    switch (reducePhase) {
    case SHUFFLE:
      progress = 0.0f; // 0 phase is done out of 3
    case SORT:
      progress = 1.0f/3; // 1 phase is done out of 3
    case REDUCE: {
      // REDUCE phase with the user code started when sort finished
      startTime = status.getSortFinishTime();
      // 0.66f : 2 phases are done out of 3
      progress = 2.0f/3 + (((float) (now - startTime)) / runTime) / 3.0f;
      // should never get here
      throw new IllegalArgumentException("Invalid reducePhase=" + reducePhase);
  // Tolerate small floating-point overshoot before declaring an error.
  final float EPSILON = 0.0001f;
  if (progress < -EPSILON || progress > 1 + EPSILON) {
    throw new IllegalStateException("Task progress out of range: " + progress);
  // Clamp into [0, 1].
  progress = Math.max(Math.min(1.0f, progress), 0.0f);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Updated task progress, taskId=" + status.getTaskID()
        + ", progress=" + status.getProgress());
源代码11 项目: RDFS   文件:
// Drives a scripted map task attempt: computes when the map finishes and at
// which heartbeat its end is observed, builds the dummy MapTask and its
// launch (or kill) action, then steps through every heartbeat from
// mapStart + heartbeatInterval up to mapEndHeartbeat, building a
// MapTaskStatus for each one.
// A killHeartbeat >= 0 marks the attempt as killed; the end heartbeat then
// becomes nextHeartbeat(killHeartbeat + 1) and the final state is KILLED.
// NOTE(review): the listing is damaged by extraction -- closing braces are
// missing, the initializer of taskIdOldApi is cut off, and nothing is shown
// consuming `action` or `mapStatus`, so further statements were presumably
// lost. Confirm against upstream.
public void runMapTask(String taskTrackerName, TaskAttemptID taskId,
                       long mapStart, long mapRuntime, long killHeartbeat) {
  long mapDone = mapStart + mapRuntime;
  long mapEndHeartbeat = nextHeartbeat(mapDone);
  // A non-negative killHeartbeat means the attempt gets killed.
  final boolean isKilled = (killHeartbeat>=0);
  if (isKilled) {
    mapEndHeartbeat = nextHeartbeat(killHeartbeat + 1);

  LOG.debug("mapStart=" + mapStart + ", mapDone=" + mapDone + 
            ", mapEndHeartbeat=" + mapEndHeartbeat + 
            ", killHeartbeat=" + killHeartbeat);
  final int numSlotsRequired = 1;
  org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
  Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass",
                          null, numSlotsRequired);
  // all byte counters are 0
  TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
  MapTaskAttemptInfo taskAttemptInfo = 
      new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, mapRuntime);
  TaskTrackerAction action = 
      new SimulatorLaunchTaskAction(task, taskAttemptInfo);
  if (isKilled) {
    action = new KillTaskAction(taskIdOldApi);

  // One iteration per heartbeat between launch and the end heartbeat.
  for(long simulationTime = mapStart + heartbeatInterval; 
      simulationTime <= mapEndHeartbeat;
      simulationTime += heartbeatInterval) {
    // RUNNING until the last heartbeat, which reports SUCCEEDED or KILLED.
    State state = simulationTime < mapEndHeartbeat ? 
        State.RUNNING : State.SUCCEEDED;
    if (simulationTime == mapEndHeartbeat && isKilled) {
      state = State.KILLED;
    MapTaskStatus mapStatus = new MapTaskStatus(
        task.getTaskID(), 0.0f, 0, state, "", "", null, Phase.MAP, null);
源代码12 项目: RDFS   文件:
// Drives a scripted reduce task attempt: computes the heartbeat at which all
// maps are done, when the reduce finishes and at which heartbeat its end is
// observed, builds the dummy ReduceTask and the related actions, then steps
// through every heartbeat from reduceStart + heartbeatInterval up to
// reduceEndHeartbeat, building a ReduceTaskStatus for each one.
// A killHeartbeat >= 0 marks the attempt as killed; the end heartbeat then
// becomes nextHeartbeat(killHeartbeat + 1) and the final state is KILLED.
// NOTE(review): the listing is damaged by extraction -- closing braces are
// missing, the initializer of taskIdOldApi and the tails of the ReduceTask
// and ReduceTaskAttemptInfo constructor calls are cut off, and nothing is
// shown consuming `action` or `reduceStatus`, so further statements were
// presumably lost. Confirm against upstream.
public void runReduceTask(String taskTrackerName, TaskAttemptID taskId,
                          long reduceStart, long mapDoneDelay, 
                          long reduceRuntime, long killHeartbeat) {
  long mapDone = nextHeartbeat(reduceStart + mapDoneDelay);
  long reduceDone = mapDone + reduceRuntime;
  long reduceEndHeartbeat = nextHeartbeat(reduceDone);
  // A non-negative killHeartbeat means the attempt gets killed.
  final boolean isKilled = (killHeartbeat>=0);
  if (isKilled) {
    reduceEndHeartbeat = nextHeartbeat(killHeartbeat + 1);

  LOG.debug("reduceStart=" + reduceStart + ", mapDone=" + mapDone + 
            ", reduceDone=" + reduceDone + 
            ", reduceEndHeartbeat=" + reduceEndHeartbeat +
            ", killHeartbeat=" + killHeartbeat);

  final int numSlotsRequired = 1;
  org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
  Task task = new ReduceTask("dummyjobfile", taskIdOldApi, 0, 0,
  // all byte counters are 0
  TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
  ReduceTaskAttemptInfo taskAttemptInfo = 
      new ReduceTaskAttemptInfo(State.SUCCEEDED, taskInfo, 0, 0, 
  TaskTrackerAction action = 
      new SimulatorLaunchTaskAction(task, taskAttemptInfo);    
  // The all-maps-completed action applies unless the kill comes first.
  if (!isKilled || mapDone < killHeartbeat) {
    action = new AllMapsCompletedTaskAction(task.getTaskID());
  if (isKilled) {
    action = new KillTaskAction(taskIdOldApi);

  // One iteration per heartbeat between launch and the end heartbeat.
  for(long simulationTime = reduceStart + heartbeatInterval; 
      simulationTime <= reduceEndHeartbeat;
      simulationTime += heartbeatInterval) {
    // RUNNING until the last heartbeat, which reports SUCCEEDED or KILLED.
    State state = simulationTime < reduceEndHeartbeat ? 
        State.RUNNING : State.SUCCEEDED;
    if (simulationTime == reduceEndHeartbeat && isKilled) {
      state = State.KILLED;
    // mapDone is when the all maps done event delivered
    Phase phase = simulationTime <= mapDone ? Phase.SHUFFLE : Phase.REDUCE; 
    ReduceTaskStatus reduceStatus = new ReduceTaskStatus(
        task.getTaskID(), 0.0f, 0, state, "", "", null, phase, null);