类org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser源码实例Demo

下面列出了如何使用 org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hadoop   文件: TestJobHistoryParsing.java
/**
 * Parses the history file of a FAILED job and checks that the parsed job id
 * is {@code job_1393307629410_0001} and that the error info is the empty
 * string.
 *
 * @throws Exception if the history file cannot be opened or parsed
 */
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
  final Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
      .getFile());
  final FileSystem lfs = FileSystem.getLocal(new Configuration());
  final FSDataInputStream fsdis = lfs.open(histPath);
  try {
    JobHistoryParser parser = new JobHistoryParser(fsdis);
    JobInfo info = parser.parse();
    // assertEquals takes (message, expected, actual); the expected id goes
    // first so a failure report labels the two values correctly.
    assertEquals("History parsed jobId incorrectly",
        JobID.forName("job_1393307629410_0001"), info.getJobId());
    assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
  } finally {
    fsdis.close();
  }
}
 
源代码2 项目: big-c   文件: TestJobHistoryParsing.java
/**
 * Parses the history file of a FAILED job and checks that the parsed job id
 * is {@code job_1393307629410_0001} and that the error info is the empty
 * string.
 *
 * @throws Exception if the history file cannot be opened or parsed
 */
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
  final Path histPath = new Path(getClass().getClassLoader().getResource(
      "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
      .getFile());
  final FileSystem lfs = FileSystem.getLocal(new Configuration());
  final FSDataInputStream fsdis = lfs.open(histPath);
  try {
    JobHistoryParser parser = new JobHistoryParser(fsdis);
    JobInfo info = parser.parse();
    // assertEquals takes (message, expected, actual); the expected id goes
    // first so a failure report labels the two values correctly.
    assertEquals("History parsed jobId incorrectly",
        JobID.forName("job_1393307629410_0001"), info.getJobId());
    assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
  } finally {
    fsdis.close();
  }
}
 
源代码3 项目: dr-elephant   文件: MapReduceFSFetcherHadoop2.java
/**
 * Copies values from the parsed job history into {@code job}, but only for
 * fields that are still unset on {@code job}: non-positive start/finish
 * times and null-or-empty queue name, user and job name.
 *
 * @param job     the job whose missing fields are filled in
 * @param jobInfo the parsed job history supplying the values
 */
private void populateJobFromJobInfo(AnalyticJob job, JobHistoryParser.JobInfo jobInfo) {
  final long knownStart = job.getStartTime();
  if (knownStart <= 0) {
    // The start time is taken from the history's submit time.
    job.setStartTime(jobInfo.getSubmitTime());
  }

  final long knownFinish = job.getFinishTime();
  if (knownFinish <= 0) {
    job.setFinishTime(jobInfo.getFinishTime());
  }

  final String knownQueue = job.getQueueName();
  if (knownQueue == null || knownQueue.isEmpty()) {
    job.setQueueName(jobInfo.getJobQueueName());
  }

  final String knownUser = job.getUser();
  if (knownUser == null || knownUser.isEmpty()) {
    job.setUser(jobInfo.getUsername());
  }

  final String knownName = job.getName();
  if (knownName == null || knownName.isEmpty()) {
    job.setName(jobInfo.getJobname());
  }
}
 
源代码4 项目: dr-elephant   文件: MapReduceFSFetcherHadoop2.java
/**
 * Builds the five-element timing array for a task attempt:
 * {@code [total, shuffle, sort, start, finish]}.
 *
 * <p>For MAP attempts the shuffle and sort slots are 0. For all other
 * attempts, shuffle is measured from the attempt start to the shuffle finish
 * time, and sort from the shuffle finish time to the sort finish time.
 *
 * @param attempInfo the attempt to read times from
 * @return the timing array described above
 */
private long[] getTaskExecTime(JobHistoryParser.TaskAttemptInfo attempInfo) {
  final long begin = attempInfo.getStartTime();
  final long end = attempInfo.getFinishTime();

  if (attempInfo.getTaskType() == TaskType.MAP) {
    return new long[]{end - begin, 0, 0, begin, end};
  }

  final long shuffleEnd = attempInfo.getShuffleFinishTime();
  final long sortEnd = attempInfo.getSortFinishTime();
  return new long[]{end - begin, shuffleEnd - begin, sortEnd - shuffleEnd, begin, end};
}
 
源代码5 项目: hadoop   文件: CompletedJob.java
/**
 * Converts every AM record of the parsed job history into an {@code AMInfo}
 * built through {@code MRBuilderUtils.newAMInfo}.
 *
 * @return a new list with one converted entry per history AM record, in
 *         iteration order
 */
@Override
public List<AMInfo> getAMInfos() {
  final List<AMInfo> converted = new LinkedList<AMInfo>();
  for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo parsed
      : jobInfo.getAMInfos()) {
    converted.add(MRBuilderUtils.newAMInfo(
        parsed.getAppAttemptId(),
        parsed.getStartTime(),
        parsed.getContainerId(),
        parsed.getNodeManagerHost(),
        parsed.getNodeManagerPort(),
        parsed.getNodeManagerHttpPort()));
  }
  return converted;
}
 
源代码6 项目: hadoop   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 2.0.3-alpha history file
 * ({@code job_2.0.3-alpha-FAILED.jhist}) and logs a one-line summary of the
 * parsed job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_2.0.3-alpha-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码7 项目: hadoop   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 2.4.0 history file
 * ({@code job_2.4.0-FAILED.jhist}) and logs a one-line summary of the parsed
 * job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_2.4.0-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码8 项目: hadoop   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 0.23.9 history file
 * ({@code job_0.23.9-FAILED.jhist}) and logs a one-line summary of the parsed
 * job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_0.23.9-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码9 项目: big-c   文件: CompletedJob.java
/**
 * Converts every AM record of the parsed job history into an {@code AMInfo}
 * built through {@code MRBuilderUtils.newAMInfo}.
 *
 * @return a new list with one converted entry per history AM record, in
 *         iteration order
 */
@Override
public List<AMInfo> getAMInfos() {
  final List<AMInfo> converted = new LinkedList<AMInfo>();
  for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo parsed
      : jobInfo.getAMInfos()) {
    converted.add(MRBuilderUtils.newAMInfo(
        parsed.getAppAttemptId(),
        parsed.getStartTime(),
        parsed.getContainerId(),
        parsed.getNodeManagerHost(),
        parsed.getNodeManagerPort(),
        parsed.getNodeManagerHttpPort()));
  }
  return converted;
}
 
源代码10 项目: big-c   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 2.0.3-alpha history file
 * ({@code job_2.0.3-alpha-FAILED.jhist}) and logs a one-line summary of the
 * parsed job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_2.0.3-alpha-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码11 项目: big-c   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 2.4.0 history file
 * ({@code job_2.4.0-FAILED.jhist}) and logs a one-line summary of the parsed
 * job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_2.4.0-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码12 项目: big-c   文件: TestJobHistoryParsing.java
/**
 * Checks that JobHistoryParser can read a 0.23.9 history file
 * ({@code job_0.23.9-FAILED.jhist}) and logs a one-line summary of the parsed
 * job. The test makes no assertions; it fails only if parsing throws.
 *
 * @throws IOException propagated from file-system access or parsing
 */
@Test
public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException {
  final String resourceFile = getClass().getClassLoader()
      .getResource("job_0.23.9-FAILED.jhist").getFile();
  final Path histPath = new Path(resourceFile);
  final FileSystem localFs = FileSystem.getLocal(new Configuration());
  final JobInfo jobInfo = new JobHistoryParser(localFs, histPath).parse();

  final StringBuilder summary = new StringBuilder(" job info: ");
  summary.append(jobInfo.getJobname()).append(" ");
  summary.append(jobInfo.getFinishedMaps()).append(" ");
  summary.append(jobInfo.getTotalMaps()).append(" ");
  summary.append(jobInfo.getJobId());
  LOG.info(summary.toString());
}
 
源代码13 项目: dr-elephant   文件: MapReduceFSFetcherHadoop2.java
/**
 * Converts the first {@code sampleAndGetSize(jobId, infoList)} entries of
 * {@code infoList} into {@link MapReduceTaskData} records.
 *
 * <p>For a task whose status is {@code "SUCCEEDED"} the successful attempt is
 * used; for any other status the attempt the task failed due to is used. When
 * no such attempt id exists, or the task's attempt map holds no entry for it,
 * the record is created with {@code null} execution times instead of failing
 * with a {@link NullPointerException}.
 *
 * @param jobId    the job id, forwarded to {@code sampleAndGetSize}
 * @param infoList the task infos to convert
 * @return one record per converted task, in list order
 */
protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
  int sampleSize = sampleAndGetSize(jobId, infoList);

  List<MapReduceTaskData> taskList = new ArrayList<MapReduceTaskData>();
  for (int i = 0; i < sampleSize; i++) {
    JobHistoryParser.TaskInfo tInfo = infoList.get(i);

    String taskId = tInfo.getTaskId().toString();
    String taskStatus = tInfo.getTaskStatus();

    // Constant-first comparison so a task with no recorded status is handled
    // like any other non-succeeded task rather than throwing.
    TaskAttemptID attemptId;
    if ("SUCCEEDED".equals(taskStatus)) {
      attemptId = tInfo.getSuccessfulAttemptId();
    } else {
      attemptId = tInfo.getFailedDueToAttemptId();
    }

    MapReduceTaskData taskData =
        new MapReduceTaskData(taskId, attemptId == null ? "" : attemptId.toString(), taskStatus);

    MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());

    long[] taskExecTime = null;
    if (attemptId != null) {
      // The attempt map may hold no entry for this id; only compute the
      // times when the attempt info is actually present.
      JobHistoryParser.TaskAttemptInfo attemptInfo = tInfo.getAllTaskAttempts().get(attemptId);
      if (attemptInfo != null) {
        taskExecTime = getTaskExecTime(attemptInfo);
      }
    }

    taskData.setTimeAndCounter(taskExecTime, taskCounterData);
    taskList.add(taskData);
  }
  return taskList.toArray(new MapReduceTaskData[taskList.size()]);
}
 
/**
 * Feeds one succeeded and one non-succeeded mock task through
 * {@code getTaskData} and checks that two non-null records come back, exactly
 * one of them in state {@code "SUCCEEDED"}. An {@link IOException} raised
 * inside the try block fails the test.
 */
@Test
public void testGetTaskData() {
  FetcherConfiguration conf = new FetcherConfiguration(document9.getDocumentElement());

  try {
    MapReduceFSFetcherHadoop2 fsFetcher =
        new MapReduceFSFetcherHadoop2(conf.getFetchersConfigurationData().get(0));

    List<JobHistoryParser.TaskInfo> mockTasks = new ArrayList<JobHistoryParser.TaskInfo>();
    mockTasks.add(new MockTaskInfo(1, true));
    mockTasks.add(new MockTaskInfo(2, false));

    MapReduceTaskData[] results = fsFetcher.getTaskData("job_14000_001", mockTasks);
    Assert.assertNotNull("taskList should not be null.", results);

    int succeeded = 0;
    for (int idx = 0; idx < results.length; idx++) {
      Assert.assertNotNull("Null pointer in taskList.", results[idx]);
      succeeded += results[idx].getState().equals("SUCCEEDED") ? 1 : 0;
    }

    Assert.assertEquals("Should have total two tasks.", 2, results.length);
    Assert.assertEquals("Should have only one succeeded task.", 1, succeeded);
  } catch (IOException e) {
    // e is non-null here, so this assertion always fails the test.
    Assert.assertNull("Failed to initialize FileSystem.", e);
  }
}
 
/**
 * Creates a mock MAP task with two registered attempts: attempt 0 as the
 * attempt the task failed due to, and attempt 1 as the successful attempt.
 * The finish time is the current wall-clock time and the start time is
 * 10000 ms before it.
 *
 * @param id        the task number used to build the task id
 * @param succeeded the value stored in the {@code succeeded} field
 */
public MockTaskInfo(int id, boolean succeeded) {
  // Plain state.
  this.succeeded = succeeded;
  this.taskType = TaskType.MAP;
  this.counters = new Counters();

  // Identity: the task id and the two attempt ids derived from it.
  this.taskId = new TaskID("job1", 1, TaskType.MAP, id);
  this.failedDueToAttemptId = new TaskAttemptID(taskId, 0);
  this.successfulAttemptId = new TaskAttemptID(taskId, 1);

  // Timing.
  this.finishTime = System.currentTimeMillis();
  this.startTime = finishTime - 10000;

  // Both attempts are present in the attempt map, each with a default info.
  this.attemptsMap = new HashMap<TaskAttemptID, JobHistoryParser.TaskAttemptInfo>();
  this.attemptsMap.put(failedDueToAttemptId, new JobHistoryParser.TaskAttemptInfo());
  this.attemptsMap.put(successfulAttemptId, new JobHistoryParser.TaskAttemptInfo());
}
 
源代码16 项目: hadoop   文件: MRAppMaster.java
/**
 * Parses the job history of the previous attempt and restores state from it:
 * every task whose status is SUCCEEDED is stored in
 * {@code completedTasksFromPreviousRun} (after dropping its attempts that are
 * not among the job's completed attempts), the job launch time is stored in
 * {@code recoveredJobStartTime}, and the recorded AM infos are appended to
 * {@code amInfos}.
 *
 * <p>A parse exception reported by the parser is logged and otherwise
 * ignored.
 *
 * @throws IOException propagated from opening or parsing the history stream
 */
private void parsePreviousJobHistory() throws IOException {
  final FSDataInputStream historyStream =
      getPreviousJobHistoryStream(getConfig(), appAttemptID);
  final JobHistoryParser historyParser = new JobHistoryParser(historyStream);
  final JobInfo parsedJob = historyParser.parse();

  final Exception parseFailure = historyParser.getParseException();
  if (parseFailure != null) {
    LOG.info("Got an error parsing job-history file" +
        ", ignoring incomplete events.", parseFailure);
  }

  for (TaskInfo task : parsedJob.getAllTasks().values()) {
    if (!TaskState.SUCCEEDED.toString().equals(task.getTaskStatus())) {
      continue;
    }

    // Keep only the attempts that the job records as completed.
    for (Iterator<Map.Entry<TaskAttemptID, TaskAttemptInfo>> attempts =
             task.getAllTaskAttempts().entrySet().iterator();
         attempts.hasNext();) {
      final Map.Entry<TaskAttemptID, TaskAttemptInfo> attempt = attempts.next();
      if (!parsedJob.getAllCompletedTaskAttempts().containsKey(attempt.getKey())) {
        attempts.remove();
      }
    }

    completedTasksFromPreviousRun
        .put(TypeConverter.toYarn(task.getTaskId()), task);
    LOG.info("Read from history task "
        + TypeConverter.toYarn(task.getTaskId()));
  }
  LOG.info("Read completed tasks from history "
      + completedTasksFromPreviousRun.size());
  recoveredJobStartTime = parsedJob.getLaunchTime();

  // recover AMInfos
  final List<JobHistoryParser.AMInfo> previousAmInfos = parsedJob.getAMInfos();
  if (previousAmInfos == null) {
    return;
  }
  for (JobHistoryParser.AMInfo previous : previousAmInfos) {
    amInfos.add(MRBuilderUtils.newAMInfo(
        previous.getAppAttemptId(),
        previous.getStartTime(),
        previous.getContainerId(),
        previous.getNodeManagerHost(),
        previous.getNodeManagerPort(),
        previous.getNodeManagerHttpPort()));
  }
}
 
源代码17 项目: hadoop   文件: TestJobHistoryParsing.java
/**
 * Drives the parser with a mocked event reader that emits, in order, two
 * task-started events, two task-failed events (for tasks 0 and 1) and one
 * job-unsuccessful-completion event whose diagnostics name task 0, then
 * checks that the parsed error info mentions task 0.
 *
 * @throws Exception if parsing the mocked event stream fails
 */
@Test
public void testMultipleFailedTasks() throws Exception {
  final JobHistoryParser historyParser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  final EventReader mockReader = Mockito.mock(EventReader.class);
  // Counts getNextEvent() calls so the answer below can sequence its events.
  final AtomicInteger eventsServed = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType mapType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final JobID jobId = new JobID("1", 1);
  final TaskID[] taskIds = new TaskID[] {
      new TaskID(jobId, mapType, 0),
      new TaskID(jobId, mapType, 1)
  };

  Mockito.when(mockReader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        @Override
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          final int seq = eventsServed.getAndIncrement();
          final TaskID target = taskIds[seq & 0x1];
          if (seq < 2) {
            return new TaskStartedEvent(target, 0, mapType, "");
          } else if (seq < 4) {
            final TaskFailedEvent failed = new TaskFailedEvent(target, 0, mapType,
                "failed", "FAILED", null, new Counters());
            // Feeds the event's own datum back through setDatum before it is
            // handed to the parser.
            failed.setDatum(failed.getDatum());
            return failed;
          } else if (seq < 5) {
            return new JobUnsuccessfulCompletionEvent(jobId, 100L, 2, 0,
                "JOB_FAILED", Collections.singletonList(
                    "Task failed: " + taskIds[0].toString()));
          }
          // End of stream.
          return null;
        }
      });

  final JobInfo parsed = historyParser.parse(mockReader);
  assertTrue("Task 0 not implicated",
      parsed.getErrorInfo().contains(taskIds[0].toString()));
}
 
源代码18 项目: big-c   文件: MRAppMaster.java
/**
 * Parses the job history of the previous attempt and restores state from it:
 * every task whose status is SUCCEEDED is stored in
 * {@code completedTasksFromPreviousRun} (after dropping its attempts that are
 * not among the job's completed attempts), the job launch time is stored in
 * {@code recoveredJobStartTime}, and the recorded AM infos are appended to
 * {@code amInfos}.
 *
 * <p>A parse exception reported by the parser is logged and otherwise
 * ignored.
 *
 * @throws IOException propagated from opening or parsing the history stream
 */
private void parsePreviousJobHistory() throws IOException {
  final FSDataInputStream historyStream =
      getPreviousJobHistoryStream(getConfig(), appAttemptID);
  final JobHistoryParser historyParser = new JobHistoryParser(historyStream);
  final JobInfo parsedJob = historyParser.parse();

  final Exception parseFailure = historyParser.getParseException();
  if (parseFailure != null) {
    LOG.info("Got an error parsing job-history file" +
        ", ignoring incomplete events.", parseFailure);
  }

  for (TaskInfo task : parsedJob.getAllTasks().values()) {
    if (!TaskState.SUCCEEDED.toString().equals(task.getTaskStatus())) {
      continue;
    }

    // Keep only the attempts that the job records as completed.
    for (Iterator<Map.Entry<TaskAttemptID, TaskAttemptInfo>> attempts =
             task.getAllTaskAttempts().entrySet().iterator();
         attempts.hasNext();) {
      final Map.Entry<TaskAttemptID, TaskAttemptInfo> attempt = attempts.next();
      if (!parsedJob.getAllCompletedTaskAttempts().containsKey(attempt.getKey())) {
        attempts.remove();
      }
    }

    completedTasksFromPreviousRun
        .put(TypeConverter.toYarn(task.getTaskId()), task);
    LOG.info("Read from history task "
        + TypeConverter.toYarn(task.getTaskId()));
  }
  LOG.info("Read completed tasks from history "
      + completedTasksFromPreviousRun.size());
  recoveredJobStartTime = parsedJob.getLaunchTime();

  // recover AMInfos
  final List<JobHistoryParser.AMInfo> previousAmInfos = parsedJob.getAMInfos();
  if (previousAmInfos == null) {
    return;
  }
  for (JobHistoryParser.AMInfo previous : previousAmInfos) {
    amInfos.add(MRBuilderUtils.newAMInfo(
        previous.getAppAttemptId(),
        previous.getStartTime(),
        previous.getContainerId(),
        previous.getNodeManagerHost(),
        previous.getNodeManagerPort(),
        previous.getNodeManagerHttpPort()));
  }
}
 
源代码19 项目: big-c   文件: TestJobHistoryParsing.java
/**
 * Drives the parser with a mocked event reader that emits, in order, two
 * task-started events, two task-failed events (for tasks 0 and 1) and one
 * job-unsuccessful-completion event whose diagnostics name task 0, then
 * checks that the parsed error info mentions task 0.
 *
 * @throws Exception if parsing the mocked event stream fails
 */
@Test
public void testMultipleFailedTasks() throws Exception {
  final JobHistoryParser historyParser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  final EventReader mockReader = Mockito.mock(EventReader.class);
  // Counts getNextEvent() calls so the answer below can sequence its events.
  final AtomicInteger eventsServed = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType mapType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final JobID jobId = new JobID("1", 1);
  final TaskID[] taskIds = new TaskID[] {
      new TaskID(jobId, mapType, 0),
      new TaskID(jobId, mapType, 1)
  };

  Mockito.when(mockReader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        @Override
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          final int seq = eventsServed.getAndIncrement();
          final TaskID target = taskIds[seq & 0x1];
          if (seq < 2) {
            return new TaskStartedEvent(target, 0, mapType, "");
          } else if (seq < 4) {
            final TaskFailedEvent failed = new TaskFailedEvent(target, 0, mapType,
                "failed", "FAILED", null, new Counters());
            // Feeds the event's own datum back through setDatum before it is
            // handed to the parser.
            failed.setDatum(failed.getDatum());
            return failed;
          } else if (seq < 5) {
            return new JobUnsuccessfulCompletionEvent(jobId, 100L, 2, 0,
                "JOB_FAILED", Collections.singletonList(
                    "Task failed: " + taskIds[0].toString()));
          }
          // End of stream.
          return null;
        }
      });

  final JobInfo parsed = historyParser.parse(mockReader);
  assertTrue("Task 0 not implicated",
      parsed.getErrorInfo().contains(taskIds[0].toString()));
}
 
源代码20 项目: dr-elephant   文件: MapReduceFSFetcherHadoop2.java
/**
 * Builds the {@link MapReduceApplicationData} for {@code job} from its job
 * configuration file and job history file.
 *
 * <p>If the history file is larger than {@code _maxLogSizeInMB} megabytes it
 * is not parsed: the returned data carries only the ids, the job
 * configuration, a diagnostic message and {@code succeeded == false}.
 * Otherwise the history is parsed, fields missing on {@code job} are filled
 * in from it, and times, final state, counters and per-task data are copied
 * into the result.
 *
 * @param job the job to fetch data for
 * @return the collected application data
 * @throws IOException      propagated from file-system access or parsing
 * @throws RuntimeException if the parser reports a parse exception, or if the
 *                          parsed job status is neither {@code "SUCCEEDED"}
 *                          nor {@code "FAILED"} (including a missing status)
 */
@Override
public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
  DataFiles files = getHistoryFiles(job);
  String confFile = files.getJobConfPath();
  String histFile = files.getJobHistPath();
  String appId = job.getAppId();
  String jobId = Utils.getJobIdFromApplicationId(appId);

  MapReduceApplicationData jobData = new MapReduceApplicationData();
  jobData.setAppId(appId).setJobId(jobId);

  // Fetch job config
  Configuration jobConf = new Configuration(false);
  jobConf.addResource(_fs.open(new Path(confFile)), confFile);
  Properties jobConfProperties = new Properties();
  for (Map.Entry<String, String> entry : jobConf) {
    jobConfProperties.put(entry.getKey(), entry.getValue());
  }
  jobData.setJobConf(jobConfProperties);

  // Check if job history file is too large and should be throttled
  if (_fs.getFileStatus(new Path(histFile)).getLen() > _maxLogSizeInMB * FileUtils.ONE_MB) {
    String errMsg =
        "The history log of MapReduce application: " + appId + " is over the limit size of " + _maxLogSizeInMB + " MB, the parsing process gets throttled.";
    logger.warn(errMsg);
    jobData.setDiagnosticInfo(errMsg);
    jobData.setSucceeded(false);  // set succeeded to false to avoid heuristic analysis
    return jobData;
  }

  // Analyze job history file
  JobHistoryParser parser = new JobHistoryParser(_fs, histFile);
  JobHistoryParser.JobInfo jobInfo = parser.parse();
  IOException parseException = parser.getParseException();
  if (parseException != null) {
    throw new RuntimeException("Could not parse history file " + histFile, parseException);
  }
  // Populate missing fields from parsed job info. This info will be missing for backfilled jobs.
  populateJobFromJobInfo(job, jobInfo);

  jobData.setSubmitTime(jobInfo.getSubmitTime());
  jobData.setStartTime(jobInfo.getLaunchTime());
  jobData.setFinishTime(jobInfo.getFinishTime());

  // Constant-first comparisons: a history without a recorded job status
  // yields a null state, which must reach the descriptive exception below
  // rather than fail with a bare NullPointerException.
  String state = jobInfo.getJobStatus();
  if ("SUCCEEDED".equals(state)) {
    jobData.setSucceeded(true);
  } else if ("FAILED".equals(state)) {
    jobData.setSucceeded(false);
    jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
  } else {
    throw new RuntimeException("job neither succeeded or failed. can not process it ");
  }

  // Fetch job counter
  MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

  // Fetch task data, split into mappers and everything else (treated as reducers)
  Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
  List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
    if (taskInfo.getTaskType() == TaskType.MAP) {
      mapperInfoList.add(taskInfo);
    } else {
      reducerInfoList.add(taskInfo);
    }
  }
  if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total mappers: " + mapperInfoList.size());
  }
  if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total reducers: " + reducerInfoList.size());
  }
  MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
  MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);

  jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);

  return jobData;
}
 
/**
 * Returns the map of task attempt ids to their attempt infos.
 *
 * <p>The backing {@code attemptsMap} is returned directly, not a copy.
 *
 * @return the {@code attemptsMap} reference
 */
public Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> getAllTaskAttempts() {
  return attemptsMap;
}
 
 同包方法