类org.apache.hadoop.mapred.TaskStatus源码实例Demo

下面列出了 org.apache.hadoop.mapred.TaskStatus 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: hadoop   文件: JobBuilder.java
/**
 * Maps a name found in a pre-0.21 history log to the matching
 * {@code Values} constant.
 *
 * @param name the logged name; compared case-insensitively
 * @return the matching constant; names without a special mapping are
 *         upper-cased and resolved through {@code Values.valueOf}
 */
private static Values getPre21Value(String name) {
  final Values mapped;
  if (name.equalsIgnoreCase("JOB_CLEANUP")) {
    mapped = Values.CLEANUP;
  } else if (name.equalsIgnoreCase("JOB_SETUP")) {
    mapped = Values.SETUP;
  } else if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
    // Before 0.21 a successful task was logged as SUCCESS; from 0.21 on it
    // is logged as SUCCEEDED. Both are folded onto Values.SUCCESS.
    mapped = Values.SUCCESS;
  } else {
    mapped = Values.valueOf(StringUtils.toUpperCase(name));
  }
  return mapped;
}
 
源代码2 项目: hadoop   文件: SleepJob.java
/**
 * Walks attempt numbers upward from 0 for the given task and returns the
 * first attempt that {@code jobdesc} reports with run state SUCCEEDED.
 *
 * @param type the task type to look up
 * @param task the task number within that type
 * @return the first attempt whose run state is SUCCEEDED
 */
private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
  TaskAttemptInfo ret;
  int attemptNumber = 0;
  // This relies on Rumen making up an attempt when one is missing;
  // otherwise the scan cannot work at all. It is hard to discern what
  // happens inside getTaskAttemptInfo from here.
  do {
    ret = jobdesc.getTaskAttemptInfo(type, task, attemptNumber++);
  } while (ret.getRunState() != TaskStatus.State.SUCCEEDED);

  // NOTE(review): the loop above only exits once the state is SUCCEEDED,
  // so this warning cannot fire as the code is written.
  if (ret.getRunState() != TaskStatus.State.SUCCEEDED) {
    LOG.warn("No sucessful attempts tasktype " + type +" task "+ task);
  }

  return ret;
}
 
源代码3 项目: big-c   文件: JobBuilder.java
/**
 * Maps a name found in a pre-0.21 history log to the matching
 * {@code Values} constant.
 *
 * @param name the logged name; compared case-insensitively
 * @return the matching constant; names without a special mapping are
 *         upper-cased and resolved through {@code Values.valueOf}
 */
private static Values getPre21Value(String name) {
  final Values mapped;
  if (name.equalsIgnoreCase("JOB_CLEANUP")) {
    mapped = Values.CLEANUP;
  } else if (name.equalsIgnoreCase("JOB_SETUP")) {
    mapped = Values.SETUP;
  } else if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
    // Before 0.21 a successful task was logged as SUCCESS; from 0.21 on it
    // is logged as SUCCEEDED. Both are folded onto Values.SUCCESS.
    mapped = Values.SUCCESS;
  } else {
    mapped = Values.valueOf(StringUtils.toUpperCase(name));
  }
  return mapped;
}
 
源代码4 项目: big-c   文件: SleepJob.java
/**
 * Walks attempt numbers upward from 0 for the given task and returns the
 * first attempt that {@code jobdesc} reports with run state SUCCEEDED.
 *
 * @param type the task type to look up
 * @param task the task number within that type
 * @return the first attempt whose run state is SUCCEEDED
 */
private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
  TaskAttemptInfo ret;
  int attemptNumber = 0;
  // This relies on Rumen making up an attempt when one is missing;
  // otherwise the scan cannot work at all. It is hard to discern what
  // happens inside getTaskAttemptInfo from here.
  do {
    ret = jobdesc.getTaskAttemptInfo(type, task, attemptNumber++);
  } while (ret.getRunState() != TaskStatus.State.SUCCEEDED);

  // NOTE(review): the loop above only exits once the state is SUCCEEDED,
  // so this warning cannot fire as the code is written.
  if (ret.getRunState() != TaskStatus.State.SUCCEEDED) {
    LOG.warn("No sucessful attempts tasktype " + type +" task "+ task);
  }

  return ret;
}
 
源代码5 项目: hadoop   文件: JobHistoryParser.java
/**
 * Records a task's successful completion on its {@code TaskInfo}: counters,
 * finish time, SUCCEEDED status and the id of the successful attempt.
 *
 * <p>If the task id of the event is not present in {@code info.tasksMap}
 * the event is logged and ignored, mirroring the handling of unknown tasks
 * in {@code handleTaskAttemptFailedEvent}, instead of failing with a
 * {@code NullPointerException}.
 *
 * @param event the task-finished event read from the history file
 */
private void handleTaskFinishedEvent(TaskFinishedEvent event) {
  TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
  if (taskInfo == null) {
    LOG.warn("TaskInfo is null for TaskFinishedEvent"
        + " taskId:  " + event.getTaskId());
    return;
  }
  taskInfo.counters = event.getCounters();
  taskInfo.finishTime = event.getFinishTime();
  taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
  taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId();
}
 
源代码6 项目: hadoop   文件: JobHistoryParser.java
/**
 * Records a task failure on its {@code TaskInfo}: FAILED status, finish
 * time, interned error text, the attempt that caused the failure and the
 * counters carried by the event.
 *
 * <p>If the task id of the event is not present in {@code info.tasksMap}
 * the event is logged and ignored, mirroring the handling of unknown tasks
 * in {@code handleTaskAttemptFailedEvent}, instead of failing with a
 * {@code NullPointerException}.
 *
 * @param event the task-failed event read from the history file
 */
private void handleTaskFailedEvent(TaskFailedEvent event) {
  TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
  if (taskInfo == null) {
    LOG.warn("TaskInfo is null for TaskFailedEvent"
        + " taskId:  " + event.getTaskId());
    return;
  }
  taskInfo.status = TaskStatus.State.FAILED.toString();
  taskInfo.finishTime = event.getFinishTime();
  taskInfo.error = StringInterner.weakIntern(event.getError());
  taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
  taskInfo.counters = event.getCounters();
}
 
/**
 * Get the event type.
 *
 * @return the FAILED or KILLED attempt event type, in its MAP flavour when
 *         the task type is MAP and in its REDUCE flavour otherwise
 */
public EventType getEventType() {
  // The task type may be setup/map/reduce/cleanup, but the attempt type is
  // only ever map or reduce. Any status other than FAILED is reported as
  // KILLED.
  final boolean attemptFailed =
      TaskStatus.State.FAILED.toString().equals(getTaskStatus());
  final boolean mapAttempt = getTaskId().getTaskType() == TaskType.MAP;
  if (mapAttempt) {
    return attemptFailed
        ? EventType.MAP_ATTEMPT_FAILED
        : EventType.MAP_ATTEMPT_KILLED;
  }
  return attemptFailed
      ? EventType.REDUCE_ATTEMPT_FAILED
      : EventType.REDUCE_ATTEMPT_KILLED;
}
 
源代码8 项目: hadoop   文件: HistoryViewer.java
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, then the FAILED
 * and KILLED tasks of each task type (setup, map, reduce, cleanup). When
 * {@code printAll} is set, the SUCCEEDED tasks and all task attempts of each
 * type follow. Finally the FAILED and the KILLED attempts are printed.
 *
 * @throws IOException declared by this method; presumably raised by the
 *         underlying print helpers
 */
public void print() throws IOException{
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();
  final TaskType[] reportedTypes = {
      TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE, TaskType.JOB_CLEANUP
  };

  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Every type uses the TaskStatus state name. The cleanup/killed case used
  // to go through JobStatus.getJobRunState(JobStatus.KILLED), which is
  // expected to yield the same "KILLED" string.
  for (TaskType type : reportedTypes) {
    printTasks(type, failed);
    printTasks(type, killed);
  }

  if (printAll) {
    final String succeeded = TaskStatus.State.SUCCEEDED.toString();
    for (TaskType type : reportedTypes) {
      printTasks(type, succeeded);
    }
    for (TaskType type : reportedTypes) {
      printAllTaskAttempts(type);
    }
  }

  printFailedAttempts(new FilteredJob(job, failed));
  printFailedAttempts(new FilteredJob(job, killed));
}
 
源代码9 项目: hadoop   文件: ShuffleSchedulerImpl.java
/**
 * Creates the shuffle scheduler for one reduce attempt.
 *
 * <p>{@code referee.start()} is now the last statement, so whatever it
 * starts (presumably a background thread) never runs against a partially
 * initialized scheduler.
 *
 * @param job job configuration; supplies the number of map tasks and the
 *        shuffle fetch-failure / retry settings
 * @param status task status object stored for later use
 * @param reduceId id of the reduce attempt this scheduler serves
 * @param reporter stored for later use; presumably receives shuffle errors
 * @param progress progress object stored for later use
 * @param shuffledMapsCounter counter stored for later use
 * @param reduceShuffleBytes counter stored for later use
 * @param failedShuffleCounter counter stored for later use
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                        TaskAttemptID reduceId,
                        ExceptionReporter reporter,
                        Progress progress,
                        Counters.Counter shuffledMapsCounter,
                        Counters.Counter reduceShuffleBytes,
                        Counters.Counter failedShuffleCounter) {
  totalMaps = job.getNumMapTasks();
  // At least 30, or a tenth of the map count when that is larger.
  abortFailureLimit = Math.max(30, totalMaps / 10);
  copyTimeTracker = new CopyTimeTracker();
  remainingMaps = totalMaps;
  finishedMaps = new boolean[remainingMaps];
  this.reporter = reporter;
  this.status = status;
  this.reduceId = reduceId;
  this.progress = progress;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.startTime = Time.monotonicNow();
  lastProgressTime = startTime;
  // Never more than 5, and never more than the number of maps.
  this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
  this.maxFetchFailuresBeforeReporting = job.getInt(
      MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
  this.reportReadErrorImmediately = job.getBoolean(
      MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);

  this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
      MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
  this.maxHostFailures = job.getInt(
      MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
      MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);

  // Start the referee only once every field above has been assigned.
  referee.start();
}
 
源代码10 项目: hadoop   文件: TestShuffleScheduler.java
/**
 * With two map tasks configured, each {@code tipFailed} call is expected to
 * advance progress by one half, and the scheduler should report done only
 * after the second call.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  final JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Minimal reduce-side status: not a map, ignores fetch-failure reports.
  final TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  final Progress shuffleProgress = new Progress();
  final TaskAttemptID reducer =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  final ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(conf,
      reduceStatus, reducer, null, shuffleProgress, null, null, null);

  final JobID jobId = new JobID();

  // First failed tip: half way, not done yet.
  scheduler.tipFailed(new TaskID(jobId, TaskType.REDUCE, 1));
  Assert.assertEquals("Progress should be 0.5", 0.5f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(scheduler.waitUntilDone(1));

  // Second failed tip: complete.
  scheduler.tipFailed(new TaskID(jobId, TaskType.REDUCE, 0));
  Assert.assertEquals("Progress should be 1.0", 1.0f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(scheduler.waitUntilDone(1));
}
 
源代码11 项目: big-c   文件: JobHistoryParser.java
/**
 * Records a task's successful completion on its {@code TaskInfo}: counters,
 * finish time, SUCCEEDED status and the id of the successful attempt.
 *
 * <p>If the task id of the event is not present in {@code info.tasksMap}
 * the event is logged and ignored, mirroring the handling of unknown tasks
 * in {@code handleTaskAttemptFailedEvent}, instead of failing with a
 * {@code NullPointerException}.
 *
 * @param event the task-finished event read from the history file
 */
private void handleTaskFinishedEvent(TaskFinishedEvent event) {
  TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
  if (taskInfo == null) {
    LOG.warn("TaskInfo is null for TaskFinishedEvent"
        + " taskId:  " + event.getTaskId());
    return;
  }
  taskInfo.counters = event.getCounters();
  taskInfo.finishTime = event.getFinishTime();
  taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
  taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId();
}
 
源代码12 项目: big-c   文件: JobHistoryParser.java
/**
 * Records a task failure on its {@code TaskInfo}: FAILED status, finish
 * time, interned error text, the attempt that caused the failure and the
 * counters carried by the event.
 *
 * <p>If the task id of the event is not present in {@code info.tasksMap}
 * the event is logged and ignored, mirroring the handling of unknown tasks
 * in {@code handleTaskAttemptFailedEvent}, instead of failing with a
 * {@code NullPointerException}.
 *
 * @param event the task-failed event read from the history file
 */
private void handleTaskFailedEvent(TaskFailedEvent event) {
  TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
  if (taskInfo == null) {
    LOG.warn("TaskInfo is null for TaskFailedEvent"
        + " taskId:  " + event.getTaskId());
    return;
  }
  taskInfo.status = TaskStatus.State.FAILED.toString();
  taskInfo.finishTime = event.getFinishTime();
  taskInfo.error = StringInterner.weakIntern(event.getError());
  taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
  taskInfo.counters = event.getCounters();
}
 
/**
 * Get the event type.
 *
 * @return the FAILED or KILLED attempt event type, in its MAP flavour when
 *         the task type is MAP and in its REDUCE flavour otherwise
 */
public EventType getEventType() {
  // The task type may be setup/map/reduce/cleanup, but the attempt type is
  // only ever map or reduce. Any status other than FAILED is reported as
  // KILLED.
  final boolean attemptFailed =
      TaskStatus.State.FAILED.toString().equals(getTaskStatus());
  final boolean mapAttempt = getTaskId().getTaskType() == TaskType.MAP;
  if (mapAttempt) {
    return attemptFailed
        ? EventType.MAP_ATTEMPT_FAILED
        : EventType.MAP_ATTEMPT_KILLED;
  }
  return attemptFailed
      ? EventType.REDUCE_ATTEMPT_FAILED
      : EventType.REDUCE_ATTEMPT_KILLED;
}
 
源代码14 项目: big-c   文件: HistoryViewer.java
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, then the FAILED
 * and KILLED tasks of each task type (setup, map, reduce, cleanup). When
 * {@code printAll} is set, the SUCCEEDED tasks and all task attempts of each
 * type follow. Finally the FAILED and the KILLED attempts are printed.
 *
 * @throws IOException declared by this method; presumably raised by the
 *         underlying print helpers
 */
public void print() throws IOException{
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();
  final TaskType[] reportedTypes = {
      TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE, TaskType.JOB_CLEANUP
  };

  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Every type uses the TaskStatus state name. The cleanup/killed case used
  // to go through JobStatus.getJobRunState(JobStatus.KILLED), which is
  // expected to yield the same "KILLED" string.
  for (TaskType type : reportedTypes) {
    printTasks(type, failed);
    printTasks(type, killed);
  }

  if (printAll) {
    final String succeeded = TaskStatus.State.SUCCEEDED.toString();
    for (TaskType type : reportedTypes) {
      printTasks(type, succeeded);
    }
    for (TaskType type : reportedTypes) {
      printAllTaskAttempts(type);
    }
  }

  printFailedAttempts(new FilteredJob(job, failed));
  printFailedAttempts(new FilteredJob(job, killed));
}
 
源代码15 项目: big-c   文件: ShuffleSchedulerImpl.java
/**
 * Creates the shuffle scheduler for one reduce attempt.
 *
 * <p>{@code referee.start()} is now the last statement, so whatever it
 * starts (presumably a background thread) never runs against a partially
 * initialized scheduler.
 *
 * @param job job configuration; supplies the number of map tasks and the
 *        shuffle fetch-failure / retry settings
 * @param status task status object stored for later use
 * @param reduceId id of the reduce attempt this scheduler serves
 * @param reporter stored for later use; presumably receives shuffle errors
 * @param progress progress object stored for later use
 * @param shuffledMapsCounter counter stored for later use
 * @param reduceShuffleBytes counter stored for later use
 * @param failedShuffleCounter counter stored for later use
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                        TaskAttemptID reduceId,
                        ExceptionReporter reporter,
                        Progress progress,
                        Counters.Counter shuffledMapsCounter,
                        Counters.Counter reduceShuffleBytes,
                        Counters.Counter failedShuffleCounter) {
  totalMaps = job.getNumMapTasks();
  // At least 30, or a tenth of the map count when that is larger.
  abortFailureLimit = Math.max(30, totalMaps / 10);
  copyTimeTracker = new CopyTimeTracker();
  remainingMaps = totalMaps;
  finishedMaps = new boolean[remainingMaps];
  this.reporter = reporter;
  this.status = status;
  this.reduceId = reduceId;
  this.progress = progress;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.startTime = Time.monotonicNow();
  lastProgressTime = startTime;
  // Never more than 5, and never more than the number of maps.
  this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
  this.maxFetchFailuresBeforeReporting = job.getInt(
      MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
  this.reportReadErrorImmediately = job.getBoolean(
      MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);

  this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
      MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
  this.maxHostFailures = job.getInt(
      MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
      MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);

  // Start the referee only once every field above has been assigned.
  referee.start();
}
 
源代码16 项目: big-c   文件: TestShuffleScheduler.java
/**
 * With two map tasks configured, each {@code tipFailed} call is expected to
 * advance progress by one half, and the scheduler should report done only
 * after the second call.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  final JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Minimal reduce-side status: not a map, ignores fetch-failure reports.
  final TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  final Progress shuffleProgress = new Progress();
  final TaskAttemptID reducer =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  final ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(conf,
      reduceStatus, reducer, null, shuffleProgress, null, null, null);

  final JobID jobId = new JobID();

  // First failed tip: half way, not done yet.
  scheduler.tipFailed(new TaskID(jobId, TaskType.REDUCE, 1));
  Assert.assertEquals("Progress should be 0.5", 0.5f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(scheduler.waitUntilDone(1));

  // Second failed tip: complete.
  scheduler.tipFailed(new TaskID(jobId, TaskType.REDUCE, 0));
  Assert.assertEquals("Progress should be 1.0", 1.0f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(scheduler.waitUntilDone(1));
}
 
源代码17 项目: hadoop   文件: JobHistoryParser.java
/**
 * Applies an unsuccessful-completion event to the matching attempt record
 * and registers the attempt in {@code info.completedTaskAttemptsMap}.
 *
 * <p>Events that refer to an unknown task or an unknown attempt are logged
 * and dropped. If the attempt was the one that had made its task
 * successful, the task-level fields set by handleTaskFinishedEvent() are
 * reset.
 *
 * @param event the unsuccessful attempt completion event
 */
private void handleTaskAttemptFailedEvent(
    TaskAttemptUnsuccessfulCompletionEvent event) {
  final TaskInfo owner = info.tasksMap.get(event.getTaskId());
  if (owner == null) {
    LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskId:  " + event.getTaskId().toString());
    return;
  }

  final TaskAttemptInfo attempt =
      owner.attemptsMap.get(event.getTaskAttemptId());
  if (attempt == null) {
    LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskAttemptId:  " + event.getTaskAttemptId().toString());
    return;
  }

  // Outcome and location of the attempt.
  attempt.finishTime = event.getFinishTime();
  attempt.error = StringInterner.weakIntern(event.getError());
  attempt.status = StringInterner.weakIntern(event.getTaskStatus());
  attempt.hostname = StringInterner.weakIntern(event.getHostname());
  attempt.port = event.getPort();
  attempt.rackname = StringInterner.weakIntern(event.getRackName());

  // Every phase finish time is set to the event's finish time.
  attempt.shuffleFinishTime = event.getFinishTime();
  attempt.sortFinishTime = event.getFinishTime();
  attempt.mapFinishTime = event.getFinishTime();
  attempt.counters = event.getCounters();

  final boolean taskWasSuccessful =
      TaskStatus.State.SUCCEEDED.toString().equals(owner.status);
  if (taskWasSuccessful
      && attempt.getAttemptId().equals(owner.getSuccessfulAttemptId())) {
    // The attempt that made this task successful has now failed, so the
    // task is no longer successful: undo handleTaskFinishedEvent().
    owner.counters = null;
    owner.finishTime = -1;
    owner.status = null;
    owner.successfulAttemptId = null;
  }

  info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attempt);
}
 
源代码18 项目: hadoop   文件: HistoryViewer.java
/**
 * Create summary information for the parsed job.
 *
 * <p>For every attempt of every task, updates the earliest start time, the
 * latest finish time and the total/failed/killed counters of the attempt's
 * task type. Setup and cleanup attempts additionally count successes.
 * Attempts of any other task type are ignored.
 *
 * @param job the parsed job whose tasks are summarized
 */
public SummarizedJob(JobInfo job) {
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  tasks = job.getAllTasks();

  for (JobHistoryParser.TaskInfo task : tasks.values()) {
    for (JobHistoryParser.TaskAttemptInfo attempt
        : task.getAllTaskAttempts().values()) {
      final long startTime = attempt.getStartTime();
      final long finishTime = attempt.getFinishTime();
      final TaskType type = attempt.getTaskType();

      if (type.equals(TaskType.MAP)) {
        // A start time of 0 means "not recorded yet".
        if (mapStarted == 0 || startTime < mapStarted) {
          mapStarted = startTime;
        }
        if (finishTime > mapFinished) {
          mapFinished = finishTime;
        }
        totalMaps++;
        final String status = attempt.getTaskStatus();
        if (status.equals(failed)) {
          numFailedMaps++;
        } else if (status.equals(killed)) {
          numKilledMaps++;
        }
      } else if (type.equals(TaskType.REDUCE)) {
        if (reduceStarted == 0 || startTime < reduceStarted) {
          reduceStarted = startTime;
        }
        if (finishTime > reduceFinished) {
          reduceFinished = finishTime;
        }
        totalReduces++;
        final String status = attempt.getTaskStatus();
        if (status.equals(failed)) {
          numFailedReduces++;
        } else if (status.equals(killed)) {
          numKilledReduces++;
        }
      } else if (type.equals(TaskType.JOB_CLEANUP)) {
        if (cleanupStarted == 0 || startTime < cleanupStarted) {
          cleanupStarted = startTime;
        }
        if (finishTime > cleanupFinished) {
          cleanupFinished = finishTime;
        }
        totalCleanups++;
        final String status = attempt.getTaskStatus();
        if (status.equals(succeeded)) {
          numFinishedCleanups++;
        } else if (status.equals(failed)) {
          numFailedCleanups++;
        } else if (status.equals(killed)) {
          numKilledCleanups++;
        }
      } else if (type.equals(TaskType.JOB_SETUP)) {
        if (setupStarted == 0 || startTime < setupStarted) {
          setupStarted = startTime;
        }
        if (finishTime > setupFinished) {
          setupFinished = finishTime;
        }
        totalSetups++;
        final String status = attempt.getTaskStatus();
        if (status.equals(succeeded)) {
          numFinishedSetups++;
        } else if (status.equals(failed)) {
          numFailedSetups++;
        } else if (status.equals(killed)) {
          numKilledSetups++;
        }
      }
    }
  }
}
 
源代码19 项目: hadoop   文件: HistoryViewer.java
/**
 * Generate analysis information for the parsed job.
 *
 * <p>Collects the first SUCCEEDED attempt of each task into
 * {@code mapTasks} or {@code reduceTasks} and computes the average map,
 * shuffle and reduce times over the finished map/reduce counts reported by
 * the job.
 *
 * @param job the parsed job to analyze
 */
public AnalyzedJob (JobInfo job) {
  final Map<TaskID, JobHistoryParser.TaskInfo> allTasks = job.getAllTasks();
  final int finishedMaps = (int) job.getFinishedMaps();
  final int finishedReduces = (int) job.getFinishedReduces();
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();

  mapTasks = new JobHistoryParser.TaskAttemptInfo[finishedMaps];
  reduceTasks = new JobHistoryParser.TaskAttemptInfo[finishedReduces];
  avgMapTime = 0;
  avgReduceTime = 0;
  avgShuffleTime = 0;

  int nextMap = 0;
  int nextReduce = 0;
  for (JobHistoryParser.TaskInfo task : allTasks.values()) {
    for (JobHistoryParser.TaskAttemptInfo attempt
        : task.getAllTaskAttempts().values()) {
      if (!attempt.getTaskStatus().equals(succeeded)) {
        continue;
      }
      final long elapsed = attempt.getFinishTime() - attempt.getStartTime();
      final TaskType type = attempt.getTaskType();
      if (type.equals(TaskType.MAP)) {
        mapTasks[nextMap++] = attempt;
        avgMapTime += elapsed;
      } else if (type.equals(TaskType.REDUCE)) {
        reduceTasks[nextReduce++] = attempt;
        // Shuffle time runs from start to shuffle finish; reduce time is
        // the remainder up to the attempt's finish.
        avgShuffleTime += (attempt.getShuffleFinishTime() -
            attempt.getStartTime());
        avgReduceTime += (attempt.getFinishTime() -
            attempt.getShuffleFinishTime());
      }
      // Only the first successful attempt of a task is considered.
      break;
    }
  }

  // Turn the accumulated sums into averages; skip when nothing finished.
  if (finishedMaps > 0) {
    avgMapTime /= finishedMaps;
  }
  if (finishedReduces > 0) {
    avgReduceTime /= finishedReduces;
    avgShuffleTime /= finishedReduces;
  }
}
 
源代码20 项目: hadoop   文件: Shuffle.java
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits until the scheduler reports completion, shuts everything down,
 * switches the task to the SORT phase and returns the iterator produced by
 * closing the merger.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by this method
 * @throws InterruptedException declared by this method
 * @throws ShuffleError if {@code throwable} has been set (checked while
 *         waiting and again after the final merge), or if the final merge
 *         itself throws
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
  // on the ApplicationMaster when a thundering herd of reducers fetch events
  // TODO: This should not be necessary after HADOOP-8942
  int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
  int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

  // Start the map-completion events fetcher thread
  final EventFetcher<K,V> eventFetcher = 
    new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
        maxEventsToFetch);
  eventFetcher.start();
  
  // Start the map-output fetcher threads
  // Local mode (localMapFiles set) uses a single LocalFetcher; otherwise
  // the configured number of parallel copies is used, defaulting to 5.
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  // Java does not allow generic array creation, hence the raw Fetcher[].
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                     reporter, metrics, this, 
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  
  // Wait for shuffle to complete successfully
  // Each pass reports progress and rethrows any error recorded in
  // 'throwable' as a ShuffleError.
  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
    reporter.progress();
    
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
  }

  // Stop the event-fetcher thread
  eventFetcher.shutDown();
  
  // Stop the map-output fetcher threads
  for (Fetcher<K,V> fetcher : fetchers) {
    fetcher.shutDown();
  }
  
  // stop the scheduler
  scheduler.close();

  copyPhase.complete(); // copy is already complete
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    // Any failure of the final merge is wrapped, keeping the cause.
    throw new ShuffleError("Error while doing final merge " , e);
  }

  // Sanity check
  // An error may have been recorded after the wait loop finished.
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }
  
  return kvIter;
}
 
源代码21 项目: hadoop   文件: TestShufflePlugin.java
@Test
/**
 * Verifies the availability and accessibility of the API needed by
 * sub-classes of ShuffleConsumerPlugin: a Context built entirely from mocks
 * is passed through init(), run() and close(), and any exception thrown
 * along the way fails the test.
 */
public void testConsumerApi() {

  JobConf jobConf = new JobConf();
  // NOTE(review): K and V are not declared on this method; presumably they
  // are type parameters of the enclosing test class - confirm.
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();

  //mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V>  mockCombineOutputCollector =
    (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
    mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    String [] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    // The same mock counter and mock progress are reused for every counter
    // and progress argument; the last argument is passed as null.
    ShuffleConsumerPlugin.Context<K, V> context =
   new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
                                              mockUmbilical, mockLocalDirAllocator,
                                              mockReporter, mockCompressionCodec,
                                              combinerClass, mockCombineOutputCollector,
                                              mockCounter, mockCounter, mockCounter,
                                              mockCounter, mockCounter, mockCounter,
                                              mockTaskStatus, mockProgress, mockProgress,
                                              mockTask, mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  }
  catch (Exception e) {
    // Any exception from the calls above is reported as a test failure.
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  // The return values are ignored; only the calls themselves matter here.
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
 
源代码22 项目: hadoop   文件: TestShuffleScheduler.java
/**
 * Regression test for MAPREDUCE-6361: on a single host, reports a host
 * failure, then a successful copy for one map attempt, then a failed copy
 * for another map attempt, and expects no NullPointerException from the
 * final {@code copyFailed} call. The test has no explicit assertions; it
 * passes when nothing is thrown.
 */
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
  JobConf job = new JobConf();
  job.setNumMapTasks(2);
  //mock creation
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V>  mockCombineOutputCollector =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);
  @SuppressWarnings("unchecked")
  MapOutput<K, V> output = mock(MapOutput.class);

  // The context is only used below as the source of the three counters
  // handed to the scheduler.
  ShuffleConsumerPlugin.Context<K, V> context =
      new ShuffleConsumerPlugin.Context<K, V>(
          mockTaskAttemptID, job, mockFileSystem,
          mockUmbilical, mockLocalDirAllocator,
          mockReporter, mockCompressionCodec,
          combinerClass, mockCombineOutputCollector,
          mockCounter, mockCounter, mockCounter,
          mockCounter, mockCounter, mockCounter,
          mockTaskStatus, mockProgress, mockProgress,
          mockTask, mockMapOutputFile, null);
  // Minimal reduce-side status: not a map, ignores fetch-failure reports.
  TaskStatus status = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }
    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  Progress progress = new Progress();
  // The reduce id and the exception reporter are passed as null.
  ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
      status, null, null, progress, context.getShuffledMapsCounter(),
      context.getReduceShuffleBytes(), context.getFailedShuffleCounter());

  MapHost host1 = new MapHost("host1", null);
  TaskAttemptID failedAttemptID = new TaskAttemptID(
      new org.apache.hadoop.mapred.TaskID(
      new JobID("test",0), TaskType.MAP, 0), 0);

  TaskAttemptID succeedAttemptID = new TaskAttemptID(
      new org.apache.hadoop.mapred.TaskID(
      new JobID("test",0), TaskType.MAP, 1), 1);

  // handle output fetch failure for failedAttemptID, part I
  scheduler.hostFailed(host1.getHostName());

  // handle output fetch succeed for succeedAttemptID
  // 500 * 1024 * 1024 bytes; the cast to long comes first so the product is
  // computed in long arithmetic.
  long bytes = (long)500 * 1024 * 1024;
  scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);

  // handle output fetch failure for failedAttemptID, part II
  // for MAPREDUCE-6361: verify no NPE exception get thrown out
  scheduler.copyFailed(failedAttemptID, host1, true, false);
}
 
源代码23 项目: big-c   文件: JobHistoryParser.java
/**
 * Applies an unsuccessful-completion event to the matching attempt record
 * and registers the attempt in {@code info.completedTaskAttemptsMap}.
 *
 * <p>Events that refer to an unknown task or an unknown attempt are logged
 * and dropped. If the attempt was the one that had made its task
 * successful, the task-level fields set by handleTaskFinishedEvent() are
 * reset.
 *
 * @param event the unsuccessful attempt completion event
 */
private void handleTaskAttemptFailedEvent(
    TaskAttemptUnsuccessfulCompletionEvent event) {
  final TaskInfo owner = info.tasksMap.get(event.getTaskId());
  if (owner == null) {
    LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskId:  " + event.getTaskId().toString());
    return;
  }

  final TaskAttemptInfo attempt =
      owner.attemptsMap.get(event.getTaskAttemptId());
  if (attempt == null) {
    LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskAttemptId:  " + event.getTaskAttemptId().toString());
    return;
  }

  // Outcome and location of the attempt.
  attempt.finishTime = event.getFinishTime();
  attempt.error = StringInterner.weakIntern(event.getError());
  attempt.status = StringInterner.weakIntern(event.getTaskStatus());
  attempt.hostname = StringInterner.weakIntern(event.getHostname());
  attempt.port = event.getPort();
  attempt.rackname = StringInterner.weakIntern(event.getRackName());

  // Every phase finish time is set to the event's finish time.
  attempt.shuffleFinishTime = event.getFinishTime();
  attempt.sortFinishTime = event.getFinishTime();
  attempt.mapFinishTime = event.getFinishTime();
  attempt.counters = event.getCounters();

  final boolean taskWasSuccessful =
      TaskStatus.State.SUCCEEDED.toString().equals(owner.status);
  if (taskWasSuccessful
      && attempt.getAttemptId().equals(owner.getSuccessfulAttemptId())) {
    // The attempt that made this task successful has now failed, so the
    // task is no longer successful: undo handleTaskFinishedEvent().
    owner.counters = null;
    owner.finishTime = -1;
    owner.status = null;
    owner.successfulAttemptId = null;
  }

  info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attempt);
}
 
源代码24 项目: big-c   文件: HistoryViewer.java
/**
 * Create summary information for the parsed job.
 *
 * <p>For every attempt of every task, updates the earliest start time, the
 * latest finish time and the total/failed/killed counters of the attempt's
 * task type. Setup and cleanup attempts additionally count successes.
 * Attempts of any other task type are ignored.
 *
 * @param job the parsed job whose tasks are summarized
 */
public SummarizedJob(JobInfo job) {
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  tasks = job.getAllTasks();

  for (JobHistoryParser.TaskInfo task : tasks.values()) {
    for (JobHistoryParser.TaskAttemptInfo attempt
        : task.getAllTaskAttempts().values()) {
      final long startTime = attempt.getStartTime();
      final long finishTime = attempt.getFinishTime();
      final TaskType type = attempt.getTaskType();

      if (type.equals(TaskType.MAP)) {
        // A start time of 0 means "not recorded yet".
        if (mapStarted == 0 || startTime < mapStarted) {
          mapStarted = startTime;
        }
        if (finishTime > mapFinished) {
          mapFinished = finishTime;
        }
        totalMaps++;
        final String status = attempt.getTaskStatus();
        if (status.equals(failed)) {
          numFailedMaps++;
        } else if (status.equals(killed)) {
          numKilledMaps++;
        }
      } else if (type.equals(TaskType.REDUCE)) {
        if (reduceStarted == 0 || startTime < reduceStarted) {
          reduceStarted = startTime;
        }
        if (finishTime > reduceFinished) {
          reduceFinished = finishTime;
        }
        totalReduces++;
        final String status = attempt.getTaskStatus();
        if (status.equals(failed)) {
          numFailedReduces++;
        } else if (status.equals(killed)) {
          numKilledReduces++;
        }
      } else if (type.equals(TaskType.JOB_CLEANUP)) {
        if (cleanupStarted == 0 || startTime < cleanupStarted) {
          cleanupStarted = startTime;
        }
        if (finishTime > cleanupFinished) {
          cleanupFinished = finishTime;
        }
        totalCleanups++;
        final String status = attempt.getTaskStatus();
        if (status.equals(succeeded)) {
          numFinishedCleanups++;
        } else if (status.equals(failed)) {
          numFailedCleanups++;
        } else if (status.equals(killed)) {
          numKilledCleanups++;
        }
      } else if (type.equals(TaskType.JOB_SETUP)) {
        if (setupStarted == 0 || startTime < setupStarted) {
          setupStarted = startTime;
        }
        if (finishTime > setupFinished) {
          setupFinished = finishTime;
        }
        totalSetups++;
        final String status = attempt.getTaskStatus();
        if (status.equals(succeeded)) {
          numFinishedSetups++;
        } else if (status.equals(failed)) {
          numFailedSetups++;
        } else if (status.equals(killed)) {
          numKilledSetups++;
        }
      }
    }
  }
}
 
源代码25 项目: big-c   文件: HistoryViewer.java
/**
 * Generate analysis information for the parsed job.
 *
 * <p>Collects the first SUCCEEDED attempt of each task into
 * {@code mapTasks} or {@code reduceTasks} and computes the average map,
 * shuffle and reduce times over the finished map/reduce counts reported by
 * the job.
 *
 * @param job the parsed job to analyze
 */
public AnalyzedJob (JobInfo job) {
  final Map<TaskID, JobHistoryParser.TaskInfo> allTasks = job.getAllTasks();
  final int finishedMaps = (int) job.getFinishedMaps();
  final int finishedReduces = (int) job.getFinishedReduces();
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();

  mapTasks = new JobHistoryParser.TaskAttemptInfo[finishedMaps];
  reduceTasks = new JobHistoryParser.TaskAttemptInfo[finishedReduces];
  avgMapTime = 0;
  avgReduceTime = 0;
  avgShuffleTime = 0;

  int nextMap = 0;
  int nextReduce = 0;
  for (JobHistoryParser.TaskInfo task : allTasks.values()) {
    for (JobHistoryParser.TaskAttemptInfo attempt
        : task.getAllTaskAttempts().values()) {
      if (!attempt.getTaskStatus().equals(succeeded)) {
        continue;
      }
      final long elapsed = attempt.getFinishTime() - attempt.getStartTime();
      final TaskType type = attempt.getTaskType();
      if (type.equals(TaskType.MAP)) {
        mapTasks[nextMap++] = attempt;
        avgMapTime += elapsed;
      } else if (type.equals(TaskType.REDUCE)) {
        reduceTasks[nextReduce++] = attempt;
        // Shuffle time runs from start to shuffle finish; reduce time is
        // the remainder up to the attempt's finish.
        avgShuffleTime += (attempt.getShuffleFinishTime() -
            attempt.getStartTime());
        avgReduceTime += (attempt.getFinishTime() -
            attempt.getShuffleFinishTime());
      }
      // Only the first successful attempt of a task is considered.
      break;
    }
  }

  // Turn the accumulated sums into averages; skip when nothing finished.
  if (finishedMaps > 0) {
    avgMapTime /= finishedMaps;
  }
  if (finishedReduces > 0) {
    avgReduceTime /= finishedReduces;
    avgShuffleTime /= finishedReduces;
  }
}
 
源代码26 项目: big-c   文件: Shuffle.java
/**
 * Runs the shuffle: starts the map-completion event fetcher and the
 * map-output fetcher thread(s), waits until the scheduler reports that it is
 * done, shuts the helper threads down, switches the task status to the SORT
 * phase and finishes the outstanding merges.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by the signature; which call raises it is not
 *         visible in this method
 * @throws InterruptedException declared by the signature; presumably from
 *         waiting on the scheduler or shutting threads down (TODO confirm)
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
  // on the ApplicationMaster when a thundering herd of reducers fetch events
  // TODO: This should not be necessary after HADOOP-8942
  // The per-reducer share is never below MIN_EVENTS_TO_FETCH and the final
  // value is never above MAX_EVENTS_TO_FETCH.
  int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
  int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

  // Start the map-completion events fetcher thread
  final EventFetcher<K,V> eventFetcher = 
    new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
        maxEventsToFetch);
  eventFetcher.start();
  
  // Start the map-output fetcher threads
  // When localMapFiles is set a single LocalFetcher is used; otherwise the
  // number of fetchers comes from SHUFFLE_PARALLEL_COPIES (default 5).
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  // Java cannot create a generic array directly, hence the raw-typed array.
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                     reporter, metrics, this, 
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  
  // Wait for shuffle to complete successfully
  // Each time the wait returns false, report progress and surface any error
  // recorded in the throwable field (read under this object's monitor;
  // presumably set by the fetcher threads -- TODO confirm).
  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
    reporter.progress();
    
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
  }

  // Stop the event-fetcher thread
  eventFetcher.shutDown();
  
  // Stop the map-output fetcher threads
  for (Fetcher<K,V> fetcher : fetchers) {
    fetcher.shutDown();
  }
  
  // stop the scheduler
  scheduler.close();

  copyPhase.complete(); // copy is already complete
  // Move the task into the SORT phase and push the new status out.
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    // Any failure of the final merge is reported as a ShuffleError.
    throw new ShuffleError("Error while doing final merge " , e);
  }

  // Sanity check
  // Re-check for an error that may have been recorded after the wait loop.
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }
  
  return kvIter;
}
 
源代码27 项目: big-c   文件: TestShufflePlugin.java
/**
 * A testing method verifying availability and accessibility of API that is
 * needed for sub-classes of ShuffleConsumerPlugin.
 *
 * <p>A {@code ShuffleConsumerPlugin.Context} is built entirely from mocks and
 * passed through {@code init}, {@code run} and {@code close} of the test
 * plugin; any exception raised on the way fails the test. Afterwards a few
 * {@code ReduceTask} / {@code Reporter} accessors are invoked on mocks so that
 * the test stops compiling if they become inaccessible.
 */
@Test
public void testConsumerApi() {

  JobConf jobConf = new JobConf();
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();

  //mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V>  mockCombineOutputCollector =
    (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
    mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    // The result is unused; the call only exercises JobConf.getLocalDirs().
    String [] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> context =
        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
                                                mockUmbilical, mockLocalDirAllocator,
                                                mockReporter, mockCompressionCodec,
                                                combinerClass, mockCombineOutputCollector,
                                                mockCounter, mockCounter, mockCounter,
                                                mockCounter, mockCounter, mockCounter,
                                                mockTaskStatus, mockProgress, mockProgress,
                                                mockTask, mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  }
  catch (Exception e) {
    // Fail the test, keeping the caught exception as the cause so that its
    // stack trace is available in the test report.
    throw new AssertionError("Threw exception:" + e, e);
  }

  // verify that these APIs are available for 3rd party plugins
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
 
源代码28 项目: big-c   文件: TestShuffleScheduler.java
/**
 * Regression test for MAPREDUCE-6361: on one host, a host failure is
 * followed by a successful copy of another map output and then by a
 * reported copy failure; the sequence must complete without a
 * NullPointerException.
 */
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Collaborators needed to assemble the plugin context -- all mocked.
  TaskUmbilicalProtocol umbilicalMock = mock(TaskUmbilicalProtocol.class);
  Reporter reporterMock = mock(Reporter.class);
  FileSystem fsMock = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer> combiner =
      conf.getCombinerClass();
  @SuppressWarnings("unchecked")  // mock() cannot express the generic type
  CombineOutputCollector<K, V> collectorMock =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID attemptIdMock =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator dirAllocatorMock = mock(LocalDirAllocator.class);
  CompressionCodec codecMock = mock(CompressionCodec.class);
  Counter counterMock = mock(Counter.class);
  TaskStatus taskStatusMock = mock(TaskStatus.class);
  Progress progressMock = mock(Progress.class);
  MapOutputFile mapOutputFileMock = mock(MapOutputFile.class);
  Task taskMock = mock(Task.class);
  @SuppressWarnings("unchecked")
  MapOutput<K, V> fetchedOutput = mock(MapOutput.class);

  ShuffleConsumerPlugin.Context<K, V> pluginContext =
      new ShuffleConsumerPlugin.Context<K, V>(
          attemptIdMock, conf, fsMock,
          umbilicalMock, dirAllocatorMock,
          reporterMock, codecMock,
          combiner, collectorMock,
          counterMock, counterMock, counterMock,
          counterMock, counterMock, counterMock,
          taskStatusMock, progressMock, progressMock,
          taskMock, mapOutputFileMock, null);

  // Minimal reduce-side status whose fetch-failure bookkeeping is a no-op.
  TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }
    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(
      conf, reduceStatus, null, null, new Progress(),
      pluginContext.getShuffledMapsCounter(),
      pluginContext.getReduceShuffleBytes(),
      pluginContext.getFailedShuffleCounter());

  MapHost host = new MapHost("host1", null);

  org.apache.hadoop.mapred.TaskID failedTask =
      new org.apache.hadoop.mapred.TaskID(
          new JobID("test", 0), TaskType.MAP, 0);
  TaskAttemptID failedAttempt = new TaskAttemptID(failedTask, 0);

  org.apache.hadoop.mapred.TaskID succeededTask =
      new org.apache.hadoop.mapred.TaskID(
          new JobID("test", 0), TaskType.MAP, 1);
  TaskAttemptID succeededAttempt = new TaskAttemptID(succeededTask, 1);

  // Step 1: the host is reported as failed (first half of the fetch
  // failure handling for the failed attempt).
  scheduler.hostFailed(host.getHostName());

  // Step 2: a different attempt is copied successfully from the same host.
  long copiedBytes = 500L * 1024 * 1024;
  scheduler.copySucceeded(succeededAttempt, host, copiedBytes, 0, 500000,
      fetchedOutput);

  // Step 3: second half of the fetch failure handling; per MAPREDUCE-6361
  // this call must not throw a NullPointerException.
  scheduler.copyFailed(failedAttempt, host, true, false);
}
 
源代码29 项目: RDFS   文件: SimulatorJobTracker.java
/**
 * Builds the actions that tell a task tracker which of its reduce attempts
 * may leave the shuffle phase.
 *
 * <p>An {@code AllMapsCompletedTaskAction} is produced for every reported
 * attempt that satisfies all of the following: its job is RUNNING, the
 * attempt is running according to its TaskInProgress, no kill action is
 * pending for it, it is not a map, it is in the SHUFFLE phase, and the job's
 * finished map count equals its desired map count. Reports whose job is no
 * longer known are registered for cleanup on the tracker instead.
 *
 * @param status
 *            The status of the task tracker
 * @param tasksToKill
 *            kill actions already scheduled for this tracker; may be
 *            {@code null}. Attempts named here never receive a
 *            map-completion action.
 * @return List of TaskTrackerActions
 */
private List<TaskTrackerAction> getMapCompletionTasks(
    TaskTrackerStatus status, 
    List<TaskTrackerAction> tasksToKill) {
  final boolean debug = LOG.isDebugEnabled();

  // Attempts for which a kill action is already queued.
  Set<TaskAttemptID> doomedAttempts = new HashSet<TaskAttemptID>();
  if (tasksToKill != null) {
    for (TaskTrackerAction killAction : tasksToKill) {
      doomedAttempts.add(((KillTaskAction) killAction).getTaskID());
    }
  }

  String trackerName = status.getTrackerName();
  List<TaskTrackerAction> completionActions =
      new ArrayList<TaskTrackerAction>();

  for (TaskStatus report : status.getTaskReports()) {
    TaskAttemptID attemptId = report.getTaskID();
    SimulatorJobInProgress job = getSimulatorJob(attemptId.getJobID());

    if (job == null) {
      // The job is already gone, so this report comes from a zombie
      // attempt: remember the job for cleanup on this tracker.
      Set<JobID> pendingCleanup = trackerToJobsToCleanup.get(trackerName);
      if (pendingCleanup == null) {
        pendingCleanup = new HashSet<JobID>();
        trackerToJobsToCleanup.put(trackerName, pendingCleanup);
      }
      pendingCleanup.add(attemptId.getJobID());
      continue;
    }

    JobStatus jobStatus = job.getStatus();
    TaskInProgress tip = taskidToTIPMap.get(attemptId);

    // Running job, running attempt, not about to be killed, a reduce, and
    // still shuffling. Excluding doomed attempts guarantees that a single
    // reduce attempt never gets both a kill and a map-completion action.
    boolean shufflingReduce =
        jobStatus.getRunState() == JobStatus.RUNNING
        && tip.isRunningTask(attemptId)
        && !doomedAttempts.contains(attemptId)
        && !report.getIsMap()
        && report.getPhase() == TaskStatus.Phase.SHUFFLE;
    if (!shufflingReduce) {
      continue;
    }

    if (debug) {
      LOG.debug("Need map-completion information for REDUCEattempt "
          + attemptId + " in tracker " + trackerName);

      LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
          + job.pendingMaps());
    }

    // The action is only due once every desired map has finished.
    if (job.finishedMaps() != job.desiredMaps()) {
      continue;
    }

    if (debug) {
      LOG.debug("Adding MapCompletion for taskAttempt " + attemptId
          + " in tracker " + trackerName);

      LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
          + job.finishedMaps() + "/" + job.desiredMaps());

      LOG.debug("AllMapsCompleted for task " + attemptId + " time="
          + getClock().getTime());
    }
    completionActions.add(new AllMapsCompletedTaskAction(attemptId));
  }
  return completionActions;
}
 
源代码30 项目: RDFS   文件: SimulatorJobTracker.java
/**
 * Applies every task report of a tracker heartbeat to the owning job.
 *
 * <p>Each report is stamped with the tracker name. If its job is no longer
 * known, the job is registered for cleanup on that tracker. Otherwise a
 * clone of the report is handed to the job, and when the job's run state
 * differs before and after the update the job-in-progress listeners are
 * notified with a RUN_STATE_CHANGED event.
 *
 * @param status the status reported by the task tracker
 */
@Override
void updateTaskStatuses(TaskTrackerStatus status) {
  final boolean debug = LOG.isDebugEnabled();
  String trackerName = status.getTrackerName();
  if (debug) {
    LOG.debug("Updating task statuses for tracker " + trackerName);
  }

  for (TaskStatus taskReport : status.getTaskReports()) {
    taskReport.setTaskTracker(trackerName);
    TaskAttemptID attemptId = taskReport.getTaskID();
    JobID jobid = attemptId.getJobID();
    if (debug) {
      LOG.debug("Updating status for job " + jobid + " for task = "
          + attemptId + " status=" + taskReport.getProgress()
          + " for tracker: " + trackerName);
    }

    SimulatorJobInProgress simJob = getSimulatorJob(attemptId.getJobID());
    if (simJob == null) {
      // The job has completed before; schedule it for cleanup on this
      // tracker and move on to the next report.
      Set<JobID> pendingCleanup = trackerToJobsToCleanup.get(trackerName);
      if (pendingCleanup == null) {
        pendingCleanup = new HashSet<JobID>();
        trackerToJobsToCleanup.put(trackerName, pendingCleanup);
      }
      pendingCleanup.add(attemptId.getJobID());
      continue;
    }

    TaskInProgress tip = taskidToTIPMap.get(attemptId);

    // Snapshot the job status on both sides of the update so that a run
    // state transition caused by this report can be detected.
    JobStatus before = (JobStatus) simJob.getStatus().clone();
    simJob.updateTaskStatus(tip, (TaskStatus) taskReport.clone());
    JobStatus after = (JobStatus) simJob.getStatus().clone();

    if (tip.isComplete() && debug) {
      LOG.debug("Completed task attempt " + attemptId + " tracker:"
          + trackerName + " time=" + getClock().getTime());
    }

    if (before.getRunState() == after.getRunState()) {
      continue;
    }

    if (debug) {
      LOG.debug("Informing Listeners of job " + jobid + " of newStatus "
          + JobStatus.getJobRunState(after.getRunState()));
    }
    updateJobInProgressListeners(new JobStatusChangeEvent(simJob,
        EventType.RUN_STATE_CHANGED, before, after));
  }
}
 
 类所在包
 类方法
 同包方法