Source code examples for the org.apache.hadoop.mapred.JobHistory class

The following examples show how the org.apache.hadoop.mapred.JobHistory API is used in real code; each snippet comes from the GitHub project named in its header, where the full source can be viewed.
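
Most of the examples below share the same basic pattern: decode the job history file name to recover the job id, construct a JobHistory.JobInfo for that id, parse the log with DefaultJobHistoryParser.parseJobTasks, and then walk the parsed tasks and their attempts. The minimal sketch below is not taken from any of the listed projects; the class name JobHistoryQuickStart and the single command-line argument are illustrative only, and it assumes the pre-YARN org.apache.hadoop.mapred API is on the classpath.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.DefaultJobHistoryParser;
import org.apache.hadoop.mapred.JobHistory;

public class JobHistoryQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path historyFile = new Path(args[0]);   // path to a completed job's history log
    FileSystem fs = historyFile.getFileSystem(conf);

    // Job history file names encode the job id; decode the name and
    // reassemble the id, exactly as the JobHistoryHelper examples below do.
    String[] jobDetails =
        JobHistory.JobInfo.decodeJobHistoryFileName(historyFile.getName())
            .split("_");
    String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];

    // Parse the history log into a JobHistory.JobInfo instance.
    JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
    DefaultJobHistoryParser.parseJobTasks(historyFile.toString(), job, fs);

    // Walk every parsed task and print its type and status.
    Map<String, JobHistory.Task> tasks = job.getAllTasks();
    for (JobHistory.Task task : tasks.values()) {
      System.out.println(task.get(JobHistory.Keys.TASK_TYPE) + "\t"
          + task.get(JobHistory.Keys.TASK_STATUS));
    }
  }
}

Run it with the path of a job history log file as its only argument; the project-specific examples that follow build on the same JobInfo object to extract task metrics, counters, and timelines.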

Example 1  Project: hiped2   File: TaskThroughput.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
      job);

  decorateHeader("MAP TASKS");

  dumpTasks(mapMetrics, "throughput",
      new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
      false, false);

  decorateHeader("REDUCE TASKS");

  dumpTasks(reduceMetrics, "throughput",
      new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
      false, true);
}
 
Example 2  Project: hiped2   File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromHdfsOutputDir(String outputDir, Configuration conf)
    throws IOException {
  Path output = new Path(outputDir);
  Path historyLogDir = new Path(output, "_logs/history");
  FileSystem fs = output.getFileSystem(conf);
  if (!fs.exists(historyLogDir)) {
    throw new IOException("History directory " + historyLogDir.toString()
        + " does not exist");
  }
  Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(historyLogDir,
      jobLogFileFilter));
  if (jobFiles.length == 0) {
    throw new IOException("Not a valid history directory "
        + historyLogDir.toString());
  }
  String[] jobDetails =
      JobHistory.JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).
          split("_");
  String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
  DefaultJobHistoryParser.parseJobTasks(jobFiles[0].toString(), job, fs);
  return job;
}
 
Example 3  Project: hiped2   File: DataSkewGnuplot.java
public static void dumpTaskTimes(String... args)
    throws Exception {
  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  List<TaskMetrics> mapMetrics =
      JobHistoryHelper.getMapTaskMetrics(job);
  List<TaskMetrics> reduceMetrics =
      JobHistoryHelper.getReduceTaskMetrics(
          job);

  System.out.println("# MAP-EXEC-TIME-SECS\tMAP_INPUT_BYTES");
  dumpTaskTimes(mapMetrics, new TaskMetrics.ExecTimeComparator());

  System.out.println();
  System.out.println("# REDUCE-EXEC-TIME-SECS\tREDUCE_INPUT_BYTES");
  dumpTaskTimes(reduceMetrics, new TaskMetrics.ExecTimeComparator());
}
 
Example 4  Project: hiped2   File: ExtractJobMetrics.java
public static void printAllTaskAttempts(JobHistory.JobInfo job)
    throws ParseException {
  PaddedTable table = new PaddedTable();
  table
      .addColumnTitle("Type")
      .addColumnTitle("TaskId")
      .addColumnTitle("Status")
      .addColumnTitle("Host")
      .addColumnTitle("OverallTime(HH:MM:SS)")
      .addColumnTitle("ShuffleTime(HH:MM:SS)")
      .addColumnTitle("SortTime(HH:MM:SS)")
      .addColumnTitle("MapInputBytes")
      .addColumnTitle("MapOutputBytes")
      .addColumnTitle("InputRecords")
      .addColumnTitle("OputputRecords");

  printAllTaskAttempts(table, job, JobHistory.Values.MAP.name());
  printAllTaskAttempts(table, job, JobHistory.Values.REDUCE.name());
  printAllTaskAttempts(table, job, JobHistory.Values.SETUP.name());
  printAllTaskAttempts(table, job, JobHistory.Values.CLEANUP.name());

  System.out.println(table);
}
 
Example 5  Project: spork   File: HadoopJobHistoryLoader.java
private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(
        JobHistory.Task task) {

    Map<String, JobHistory.TaskAttempt> taskAttempts = task
            .getTaskAttempts();
    int size = taskAttempts.size();
    Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts
            .entrySet().iterator();
    for (int i = 0; i < size; i++) {
        // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
        Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
        JobHistory.TaskAttempt attempt = tae.getValue();
        if (null != attempt && null != attempt.getValues()
                && attempt.getValues().containsKey(JobHistory.Keys.TASK_STATUS)
                && attempt.getValues().get(JobHistory.Keys.TASK_STATUS)
                        .equals("SUCCESS")) {
            return attempt.getValues();
        }
    }

    return null;
}
 
Example 6  Project: RDFS   File: JobStatistics.java
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  
  Map<String, JobHistory.TaskAttempt> taskAttempts = task.getTaskAttempts();
  int size = taskAttempts.size();
  java.util.Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts.entrySet().iterator();
  for (int i=0; i<size; i++) {
    // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
    Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
    JobHistory.TaskAttempt attempt = tae.getValue();
    if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
      return attempt.getValues();
    }
  }
  
  return null;
}
 
Example 7  Project: hadoop-gpu   File: JobStatistics.java
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  
  Map<String, JobHistory.TaskAttempt> taskAttempts = task.getTaskAttempts();
  int size = taskAttempts.size();
  java.util.Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts.entrySet().iterator();
  for (int i=0; i<size; i++) {
    // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
    Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
    JobHistory.TaskAttempt attempt = tae.getValue();
    if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
      return attempt.getValues();
    }
  }
  
  return null;
}
 
Example 8  Project: hiped2   File: DataSkewMetrics.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
      job);

  decorateHeader("MAP TASKS");

  dumpTasks(mapMetrics, "execution time",
      new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis",
      true);
  dumpTasks(mapMetrics, "input records",
      new TaskMetrics.InputRecordsComparator(), "getInputRecords",
      false);
  dumpTasks(mapMetrics, "input bytes",
      new TaskMetrics.InputBytesComparator(), "getInputBytes",
      false);

  decorateHeader("REDUCE TASKS");

  dumpTasks(reduceMetrics, "execution time",
      new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis", true);
  dumpTasks(reduceMetrics, "input records",
      new TaskMetrics.InputRecordsComparator(), "getInputRecords",
      false);
  dumpTasks(reduceMetrics, "input bytes",
      new TaskMetrics.InputBytesComparator(), "getInputBytes",
      false);
}
 
Example 9  Project: hiped2   File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromCliArgs(Configuration conf, String ... args)
    throws IOException {
  String usage = "Expected 2 arguments, either --hdfsdir <dir> or --localfile <path>";
  if(args.length != 2) {
    throw new IOException(usage);
  }
  if("--hdfsdir".equals(args[0])) {
    return getJobInfoFromHdfsOutputDir(args[1], conf);
  } else if("--localfile".equals(args[0])) {
    return getJobInfoFromLocalFile(args[1], conf);
  }
  throw new IOException("Unexpected option '" + args[0] + "' \n" + usage);
}
 
Example 10  Project: hiped2   File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromLocalFile(String outputFile, Configuration conf)
    throws IOException {
  FileSystem fs = FileSystem.getLocal(conf);

  Path outputFilePath = new Path(outputFile);

  String[] jobDetails =
      JobHistory.JobInfo.decodeJobHistoryFileName(outputFilePath.getName()).
          split("_");
  String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
  DefaultJobHistoryParser.parseJobTasks(outputFile, job, fs);
  return job;
}
 
Example 11  Project: hiped2   File: JobHistoryHelper.java
public static List<TaskMetrics> getMapTaskMetrics(
    JobHistory.JobInfo job)
    throws ParseException {
  List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
  addTask(metrics, job, JobHistory.Values.MAP.name());
  return metrics;
}
 
Example 12  Project: hiped2   File: JobHistoryHelper.java
public static List<TaskMetrics> getReduceTaskMetrics(
    JobHistory.JobInfo job)
    throws ParseException {
  List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
  addTask(metrics, job, JobHistory.Values.REDUCE.name());
  return metrics;
}
 
Example 13  Project: spork   File: HadoopJobHistoryLoader.java
private static void populateJob (Map<JobHistory.Keys, String> jobC, Map<String, String> job) {            
    int size = jobC.size();
    Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator();
    for (int i = 0; i < size; i++) {
        Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
        JobHistory.Keys key = entry.getKey();
        String value = entry.getValue();
        switch (key) {
        case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID.toString(), value); break;           
        case FINISH_TIME: job.put(JobKeys.FINISH_TIME.toString(), value); break;
        case JOBID: job.put(JobKeys.JOBID.toString(), value); break;
        case JOBNAME: job.put(JobKeys.JOBNAME.toString(), value); break;
        case USER: job.put(JobKeys.USER.toString(), value); break;
        case JOBCONF: job.put(JobKeys.JOBCONF.toString(), value); break;
        case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME.toString(), value); break;
        case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME.toString(), value); break;
        case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS.toString(), value); break;
        case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES.toString(), value); break;
        case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS.toString(), value); break;
        case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES.toString(), value); break;
        case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS.toString(), value); break;
        case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES.toString(), value); break;
        case JOB_STATUS: job.put(JobKeys.STATUS.toString(), value); break;
        case COUNTERS:
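            // Note: String.concat returns a new string; the result is discarded here, so this call has no effect.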
            value.concat(",");
            parseAndAddJobCounters(job, value);
            break;
        default: 
            LOG.debug("JobHistory.Keys."+ key + " : NOT INCLUDED IN LOADER RETURN VALUE");
            break;
        }
    }
}
 
Example 14  Project: RDFS   File: JobStatistics.java
private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
  int size = jobC.size(); 
  java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator();
  for (int i = 0; i < size; i++)
  {
    Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
    JobHistory.Keys key = entry.getKey();
    String value = entry.getValue();
    //System.out.println("JobHistory.JobKeys."+key+": "+value);
    switch (key) {
    case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
    case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
    case JOBID: job.put(JobKeys.JOBID, value); break;
    case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
    case USER: job.put(JobKeys.USER, value); break;
    case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
    case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
    case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
    case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
    case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
    case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
    case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
    case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
    case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
    case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
    case JOB_PRIORITY: job.put(JobKeys.JOB_PRIORITY, value); break;
    case COUNTERS:
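      // Note: String.concat returns a new string; the result is discarded here, so this call has no effect.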
      value.concat(",");
      parseAndAddJobCounters(job, value);
      break;
    default:   System.err.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
             break;
    }
  }
}
 
Example 15  Project: hadoop-gpu   File: JobStatistics.java
private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
  int size = jobC.size(); 
  java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator();
  for (int i = 0; i < size; i++)
  {
    Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
    JobHistory.Keys key = entry.getKey();
    String value = entry.getValue();
    switch (key) {
    case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
    //case START_TIME: job.put(JobKeys., value); break;
    case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
    case JOBID: job.put(JobKeys.JOBID, value); break;
    case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
    case USER: job.put(JobKeys.USER, value); break;
    case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
    case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
    case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
    case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
    case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
    case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
    case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
    case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
    case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
    case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
    case COUNTERS:
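      // Note: String.concat returns a new string; the result is discarded here, so this call has no effect.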
      value.concat(",");
      parseAndAddJobCounters(job, value);
      break;
    default:   System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
             break;
    }
  }
}
 
Example 16  Project: hiped2   File: ExtractJobTaskTimeline.java
public static void dumpTaskTimes(String... args)
    throws Exception {

  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  long startTime = job.getLong(JobHistory.Keys.LAUNCH_TIME);
  long endTime = job.getLong(JobHistory.Keys.FINISH_TIME);

  List<TimeRange> mapRanges = new ArrayList<TimeRange>();
  List<TimeRange> reduceRanges = new ArrayList<TimeRange>();
  List<TimeRange> shuffleRanges = new ArrayList<TimeRange>();
  List<TimeRange> sortRanges = new ArrayList<TimeRange>();


  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {

      String taskId = attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID);
      String taskType = task.get(JobHistory.Keys.TASK_TYPE);
      String taskStatus = task.get(JobHistory.Keys.TASK_STATUS);

      System.out.println(taskId + " " + taskType + " " + taskStatus);


      long taskStartTime =
          attempt.getLong(JobHistory.Keys.START_TIME);
      long taskEndTime =
          attempt.getLong(JobHistory.Keys.FINISH_TIME);

      TimeRange range =
          new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
              taskEndTime);

        if (JobHistory.Values.MAP.name().equals(taskType)) {
          mapRanges.add(range);
        } else if (JobHistory.Values.REDUCE.name().equals(taskType)) {

          long shuffleEndTime =
              attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
          long sortEndTime =
              attempt.getLong(JobHistory.Keys.SORT_FINISHED);

          shuffleRanges.add(
              new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
                  shuffleEndTime));
          sortRanges.add(
              new TimeRange(TimeUnit.MILLISECONDS, shuffleEndTime,
                  sortEndTime));
          reduceRanges.add(
              new TimeRange(TimeUnit.MILLISECONDS, sortEndTime,
                  taskEndTime));
        }
    }
  }

  // output the data, tab-separated in the following order:
  // time-offset  #-map-tasks  #-reduce-tasks  #-shuffle-tasks  #-sort-tasks
  // steps of 1 second
  StringBuilder sb = new StringBuilder();
  sb.append("time")
      .append("\tmap")
      .append("\treduce")
      .append("\tshuffle")
      .append("\tsort")
  ;
  System.err.println(sb);

  int timeOffset = 0;
  for (long i = startTime; i <= endTime; i += 1000) {
    sb = new StringBuilder();
    sb.append(timeOffset)
        .append("\t").append(countRangesForTime(mapRanges, i))
        .append("\t").append(countRangesForTime(reduceRanges, i))
        .append("\t").append(countRangesForTime(shuffleRanges, i))
        .append("\t").append(countRangesForTime(sortRanges, i))
    ;

    System.err.println(sb);
    timeOffset++;

  }

}
 
Example 17  Project: hiped2   File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromCliArgs(String ... args)
    throws IOException {
  return getJobInfoFromCliArgs(new Configuration(), args);
}
 
Example 18  Project: hiped2   File: JobHistoryHelper.java
public static void addTask(List<TaskMetrics> metrics,
                           JobHistory.JobInfo job,
                           String taskType)
    throws ParseException {
  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {
      if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {

        TaskMetrics metric = new TaskMetrics();
        metrics.add(metric);
        metric.setType(taskType)
            .setTaskId(attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
            .setHost(attempt.get(JobHistory.Keys.HOSTNAME))
            .setStatus(attempt.get(JobHistory.Keys.TASK_STATUS));

        long taskOverallTime =
            attempt.getLong(JobHistory.Keys.FINISH_TIME) -
                attempt.getLong(JobHistory.Keys.START_TIME);

        metric.setOverallTimeMillis(taskOverallTime);

        metric.setInputBytes(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_BYTES.name(),
                REDUCE_SHUFFLE_BYTES.name()));

        metric.setOutputBytes(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_BYTES.name(),
                "HDFS_BYTES_WRITTEN"));

        metric.setInputRecords(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_RECORDS.name(),
                REDUCE_INPUT_RECORDS.name()));

        metric.setOutputRecords(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_RECORDS.name(),
                REDUCE_OUTPUT_RECORDS.name()));

        if (JobHistory.Values.REDUCE.name()
            .equals(task.get(JobHistory.Keys.TASK_TYPE))) {
          long shuffleTime =
              attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
                  attempt.getLong(JobHistory.Keys.START_TIME);
          long sortTime =
              attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
                  attempt
                      .getLong(JobHistory.Keys.SHUFFLE_FINISHED);

          metric.setShuffleTimeMillis(shuffleTime);
          metric.setSortTimeMillis(sortTime);
        }

      }
    }
  }
}
 
Example 19  Project: hiped2   File: MetricSummary.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics =
      JobHistoryHelper.getReduceTaskMetrics(
          job);

  System.out.println("Job Statistics");
  System.out.println("");

  PaddedTable table = new PaddedTable();
  table
      .addColumnTitle("Item")
      .addColumnTitle("Min")
      .addColumnTitle("Max")
      .addColumnTitle("Median")
      .addColumnTitle("Mean")
      .addColumnTitle("StdDev");

  dumpTasks(table, mapMetrics, "execution time",
      "getOverallTimeMillis", true);
  dumpTasks(table, mapMetrics, "input records", "getInputRecords",
      false);
  dumpTasks(table, mapMetrics, "input bytes", "getInputBytes", false);
  dumpTasks(table, mapMetrics, "output records", "getOutputRecords",
      false);
  dumpTasks(table, mapMetrics, "output bytes", "getOutputBytes",
      false);

  DataSkewMetrics.decorateHeader("MAP TASKS");
  System.out.println("");
  System.out.println("Num Map Tasks:    " + mapMetrics.size());
  System.out.println("");
  System.out.println(table);
  table.clearRows();

  if(reduceMetrics.size() > 0) {

    dumpTasks(table, reduceMetrics, "execution time",
        "getOverallTimeMillis", true);
    dumpTasks(table, reduceMetrics, "shuffle time",
        "getShuffleTimeMillis", true);
    dumpTasks(table, reduceMetrics, "sort time", "getSortTimeMillis",
        true);
    dumpTasks(table, reduceMetrics, "input records", "getInputRecords",
        false);
    dumpTasks(table, reduceMetrics, "input bytes", "getInputBytes",
        false);
    dumpTasks(table, reduceMetrics, "output records",
        "getOutputRecords", false);
    dumpTasks(table, reduceMetrics, "output bytes", "getOutputBytes",
        false);

    DataSkewMetrics.decorateHeader("REDUCE TASKS");
    System.out.println("");
    System.out.println("Num Reduce Tasks: " + reduceMetrics.size());
    System.out.println("");
    System.out.println(table);
  }
}
 
Example 20  Project: hiped2   File: ExtractJobMetrics.java
public static void printAllTaskAttempts(PaddedTable table,
                                        JobHistory.JobInfo job,
                                        String taskType)
    throws ParseException {
  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {
      if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {

        long taskOverallTime =
                attempt.getLong(JobHistory.Keys.FINISH_TIME) -
                    attempt.getLong(JobHistory.Keys.START_TIME);
        long shuffleTime =
                attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
                    attempt.getLong(JobHistory.Keys.START_TIME);
        long taskSortTime =
                attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
                    attempt
                        .getLong(JobHistory.Keys.SHUFFLE_FINISHED);

        table.newRow()
            .addColumnValue(taskType)
            .addColumnValue(
                attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
            .addColumnValue(
                attempt.get(JobHistory.Keys.TASK_STATUS))
            .addColumnValue(attempt.get(JobHistory.Keys.HOSTNAME))
            .addColumnValue(formatTime(taskOverallTime));

        if (JobHistory.Values.REDUCE.name()
            .equals(task.get(JobHistory.Keys.TASK_TYPE))) {
          table.addColumnValue(formatTime(shuffleTime))
              .addColumnValue(formatTime(taskSortTime));
        } else {
          table.addColumnValue("").addColumnValue("");
        }
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_BYTES.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_BYTES.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_RECORDS.name(),
                REDUCE_INPUT_RECORDS.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_RECORDS.name(),
                REDUCE_OUTPUT_RECORDS.name()));
      }
    }
  }
}
 
Example 21  Project: spork   File: HadoopJobHistoryLoader.java
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (location != null) {
        LOG.info("load: " + location);  
        Path full = new Path(location);  
        String[] jobDetails = 
            JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
        String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
                + jobDetails[4];
        JobHistory.JobInfo job = new JobHistory.JobInfo(jobId); 
 
        value = new MRJobInfo();
                                    
        FileSystem fs = full.getFileSystem(conf);
        FileStatus fstat = fs.getFileStatus(full);
        
        LOG.info("file size: " + fstat.getLen());
        DefaultJobHistoryParser.parseJobTasks(location, job,
                full.getFileSystem(conf)); 
        LOG.info("job history parsed sucessfully");
        HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
        LOG.info("get parsed job history");
        
        // parse Hadoop job xml file
        Path parent = full.getParent();
        String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml";
        Path p = new Path(parent, jobXml);  
     
        FSDataInputStream fileIn = fs.open(p);
        Map<String, String> val = HadoopJobHistoryLoader
                .parseJobXML(fileIn);
        for (String key : val.keySet()) {
            value.job.put(key, val.get(key));
        }
        
        location = null;
        return true;
    }          
    value = null;
    return false;
}
 
Example 22  Project: hiped2   File: ExtractJobMetrics.java
public static void dumpTaskTimes(String... args)
    throws Exception {

  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  printAllTaskAttempts(job);
}
 