org.apache.hadoop.mapred.JobHistory#JobInfo() Source Code Examples

The examples below show org.apache.hadoop.mapred.JobHistory#JobInfo() in use, drawn from open source projects; each entry names the project and file it comes from.
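
Every example follows the same three-step pattern from the old (pre-YARN) mapred API: decode the history file name, rebuild the job id from it, and let DefaultJobHistoryParser populate a JobHistory.JobInfo. Here is a minimal, self-contained sketch of that pattern; the file name and path are illustrative assumptions, not taken from any of the projects:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.DefaultJobHistoryParser;
import org.apache.hadoop.mapred.JobHistory;

public class JobInfoSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Hadoop 1.x history file; substitute a real one.
    Path historyFile = new Path(
        "/tmp/history/localhost_1339094189038_job_201206071541_0001_hadoop_wordcount");

    // History file names are encoded on disk; decode before splitting on "_".
    String[] parts = JobHistory.JobInfo
        .decodeJobHistoryFileName(historyFile.getName()).split("_");

    // Fields 2..4 reassemble the job id, e.g. "job_201206071541_0001".
    String jobId = parts[2] + "_" + parts[3] + "_" + parts[4];

    JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
    FileSystem fs = FileSystem.getLocal(new Configuration());
    DefaultJobHistoryParser.parseJobTasks(historyFile.toString(), job, fs);

    System.out.println("Parsed " + job.getAllTasks().size()
        + " tasks for " + jobId);
  }
}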

Example 1  Project: hiped2  File: TaskThroughput.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
      job);

  decorateHeader("MAP TASKS");

  dumpTasks(mapMetrics, "throughput",
      new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
      false, false);

  decorateHeader("REDUCE TASKS");

  dumpTasks(reduceMetrics, "throughput",
      new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
      false, true);
}
 
Example 2  Project: hiped2  File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromHdfsOutputDir(String outputDir, Configuration conf)
    throws IOException {
  Path output = new Path(outputDir);
  Path historyLogDir = new Path(output, "_logs/history");
  FileSystem fs = output.getFileSystem(conf);
  // Check the history subdirectory itself, so the check matches the message.
  if (!fs.exists(historyLogDir)) {
    throw new IOException("History directory " + historyLogDir.toString()
        + " does not exist");
  }
  Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(historyLogDir,
      jobLogFileFilter));
  if (jobFiles.length == 0) {
    throw new IOException("Not a valid history directory "
        + historyLogDir.toString());
  }
  // The decoded name splits into host, timestamp and the three job-id
  // fields; fields 2..4 rebuild an id such as "job_201206071541_0001".
  String[] jobDetails =
      JobHistory.JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).
          split("_");
  String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
  DefaultJobHistoryParser.parseJobTasks(jobFiles[0].toString(), job, fs);
  return job;
}
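
This helper works because Hadoop 1.x, with the default hadoop.job.history.user.location setting, copies a job's history file into _logs/history beneath the job output directory. A hypothetical call, assuming a completed job wrote to /user/hadoop/output:

JobHistory.JobInfo job =
    JobHistoryHelper.getJobInfoFromHdfsOutputDir("/user/hadoop/output",
        new Configuration());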
 
Example 3  Project: hiped2  File: DataSkewGnuplot.java
public static void dumpTaskTimes(String... args)
    throws Exception {
  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  List<TaskMetrics> mapMetrics =
      JobHistoryHelper.getMapTaskMetrics(job);
  List<TaskMetrics> reduceMetrics =
      JobHistoryHelper.getReduceTaskMetrics(
          job);

  System.out.println("# MAP-EXEC-TIME-SECS\tMAP_INPUT_BYTES");
  dumpTaskTimes(mapMetrics, new TaskMetrics.ExecTimeComparator());

  System.out.println();
  System.out.println("# REDUCE-EXEC-TIME-SECS\tREDUCE_INPUT_BYTES");
  dumpTaskTimes(reduceMetrics, new TaskMetrics.ExecTimeComparator());
}
 
Example 4  Project: hiped2  File: ExtractJobMetrics.java
public static void printAllTaskAttempts(JobHistory.JobInfo job)
    throws ParseException {
  PaddedTable table = new PaddedTable();
  table
      .addColumnTitle("Type")
      .addColumnTitle("TaskId")
      .addColumnTitle("Status")
      .addColumnTitle("Host")
      .addColumnTitle("OverallTime(HH:MM:SS)")
      .addColumnTitle("ShuffleTime(HH:MM:SS)")
      .addColumnTitle("SortTime(HH:MM:SS)")
      .addColumnTitle("MapInputBytes")
      .addColumnTitle("MapOutputBytes")
      .addColumnTitle("InputRecords")
      .addColumnTitle("OputputRecords");

  printAllTaskAttempts(table, job, JobHistory.Values.MAP.name());
  printAllTaskAttempts(table, job, JobHistory.Values.REDUCE.name());
  printAllTaskAttempts(table, job, JobHistory.Values.SETUP.name());
  printAllTaskAttempts(table, job, JobHistory.Values.CLEANUP.name());

  System.out.println(table);
}
 
Example 5  Project: hiped2  File: DataSkewMetrics.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
      job);

  decorateHeader("MAP TASKS");

  dumpTasks(mapMetrics, "execution time",
      new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis",
      true);
  dumpTasks(mapMetrics, "input records",
      new TaskMetrics.InputRecordsComparator(), "getInputRecords",
      false);
  dumpTasks(mapMetrics, "input bytes",
      new TaskMetrics.InputBytesComparator(), "getInputBytes",
      false);

  decorateHeader("REDUCE TASKS");

  dumpTasks(reduceMetrics, "execution time",
      new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis", true);
  dumpTasks(reduceMetrics, "input records",
      new TaskMetrics.InputRecordsComparator(), "getInputRecords",
      false);
  dumpTasks(reduceMetrics, "input bytes",
      new TaskMetrics.InputBytesComparator(), "getInputBytes",
      false);
}
 
Example 6  Project: hiped2  File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromCliArgs(Configuration conf, String ... args)
    throws IOException {
  String usage = "Expected 2 arguments, either --hdfsdir <dir> or --localfile <path>";
  if(args.length != 2) {
    throw new IOException(usage);
  }
  if("--hdfsdir".equals(args[0])) {
    return getJobInfoFromHdfsOutputDir(args[1], conf);
  } else if("--localfile".equals(args[0])) {
    return getJobInfoFromLocalFile(args[1], conf);
  }
  throw new IOException("Unexpected option '" + args[0] + "' \n" + usage);
}
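
Both flag forms resolve to the same parsing path; the directory and file below are hypothetical:

// Read the history file from _logs/history under a job's HDFS output directory:
JobHistory.JobInfo fromHdfs =
    JobHistoryHelper.getJobInfoFromCliArgs(conf, "--hdfsdir", "/user/hadoop/output");

// Or point straight at a history file on the local file system:
JobHistory.JobInfo fromLocal =
    JobHistoryHelper.getJobInfoFromCliArgs(conf, "--localfile",
        "/tmp/localhost_1339094189038_job_201206071541_0001_hadoop_wordcount");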
 
Example 7  Project: hiped2  File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromLocalFile(String outputFile, Configuration conf)
    throws IOException {
  FileSystem fs = FileSystem.getLocal(conf);

  Path outputFilePath = new Path(outputFile);

  String[] jobDetails =
      JobHistory.JobInfo.decodeJobHistoryFileName(outputFilePath.getName()).
          split("_");
  String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
  DefaultJobHistoryParser.parseJobTasks(outputFile, job, fs);
  return job;
}
 
Example 8  Project: hiped2  File: JobHistoryHelper.java
public static List<TaskMetrics> getMapTaskMetrics(
    JobHistory.JobInfo job)
    throws ParseException {
  List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
  addTask(metrics, job, JobHistory.Values.MAP.name());
  return metrics;
}
 
Example 9  Project: hiped2  File: JobHistoryHelper.java
public static List<TaskMetrics> getReduceTaskMetrics(
    JobHistory.JobInfo job)
    throws ParseException {
  List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
  addTask(metrics, job, JobHistory.Values.REDUCE.name());
  return metrics;
}
 
Example 10  Project: hiped2  File: ExtractJobTaskTimeline.java
public static void dumpTaskTimes(String... args)
    throws Exception {

  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  long startTime = job.getLong(JobHistory.Keys.LAUNCH_TIME);
  long endTime = job.getLong(JobHistory.Keys.FINISH_TIME);

  List<TimeRange> mapRanges = new ArrayList<TimeRange>();
  List<TimeRange> reduceRanges = new ArrayList<TimeRange>();
  List<TimeRange> shuffleRanges = new ArrayList<TimeRange>();
  List<TimeRange> sortRanges = new ArrayList<TimeRange>();


  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {

      String taskId = attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID);
      String taskType = task.get(JobHistory.Keys.TASK_TYPE);
      String taskStatus = task.get(JobHistory.Keys.TASK_STATUS);

      System.out.println(taskId + " " + taskType + " " + taskStatus);


      long taskStartTime =
          attempt.getLong(JobHistory.Keys.START_TIME);
      long taskEndTime =
          attempt.getLong(JobHistory.Keys.FINISH_TIME);

      TimeRange range =
          new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
              taskEndTime);

      if (JobHistory.Values.MAP.name().equals(taskType)) {
        mapRanges.add(range);
      } else if (JobHistory.Values.REDUCE.name().equals(taskType)) {

        long shuffleEndTime =
            attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
        long sortEndTime =
            attempt.getLong(JobHistory.Keys.SORT_FINISHED);

        shuffleRanges.add(
            new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
                shuffleEndTime));
        sortRanges.add(
            new TimeRange(TimeUnit.MILLISECONDS, shuffleEndTime,
                sortEndTime));
        reduceRanges.add(
            new TimeRange(TimeUnit.MILLISECONDS, sortEndTime,
                taskEndTime));
      }
    }
  }

  // output the data, tab-separated in the following order:
  // time-offset  #-map-tasks  #-reduce-tasks  #-shuffle-tasks  #-sort-tasks
  // in steps of 1 second
  StringBuilder sb = new StringBuilder();
  sb.append("time")
      .append("\tmap")
      .append("\treduce")
      .append("\tshuffle")
      .append("\tsort")
  ;
  System.err.println(sb);

  int timeOffset = 0;
  for (long i = startTime; i <= endTime; i += 1000) {
    sb = new StringBuilder();
    sb.append(timeOffset)
        .append("\t").append(countRangesForTime(mapRanges, i))
        .append("\t").append(countRangesForTime(reduceRanges, i))
        .append("\t").append(countRangesForTime(shuffleRanges, i))
        .append("\t").append(countRangesForTime(sortRanges, i))
    ;

    System.err.println(sb);
    timeOffset++;

  }

}
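
countRangesForTime is a hiped2 helper that is not part of this listing. A plausible reconstruction, assuming TimeRange exposes its millisecond bounds through getStartMillis()/getEndMillis() (assumed accessor names), counts how many attempts were active at a given instant:

// Hypothetical sketch; the real hiped2 implementation may differ.
private static int countRangesForTime(List<TimeRange> ranges, long time) {
  int count = 0;
  for (TimeRange range : ranges) {
    // An attempt is active if the instant falls within its range.
    if (time >= range.getStartMillis() && time <= range.getEndMillis()) {
      count++;
    }
  }
  return count;
}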
 
Example 11  Project: hiped2  File: JobHistoryHelper.java
public static JobHistory.JobInfo getJobInfoFromCliArgs(String ... args)
    throws IOException {
  return getJobInfoFromCliArgs(new Configuration(), args);
}
 
Example 12  Project: hiped2  File: JobHistoryHelper.java
public static void addTask(List<TaskMetrics> metrics,
                           JobHistory.JobInfo job,
                           String taskType)
    throws ParseException {
  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {
      if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {

        TaskMetrics metric = new TaskMetrics();
        metrics.add(metric);
        metric.setType(taskType)
            .setTaskId(attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
            .setHost(attempt.get(JobHistory.Keys.HOSTNAME))
            .setStatus(attempt.get(JobHistory.Keys.TASK_STATUS));

        long taskOverallTime =
            attempt.getLong(JobHistory.Keys.FINISH_TIME) -
                attempt.getLong(JobHistory.Keys.START_TIME);

        metric.setOverallTimeMillis(taskOverallTime);

        metric.setInputBytes(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_BYTES.name(),
                REDUCE_SHUFFLE_BYTES.name()));

        metric.setOutputBytes(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_BYTES.name(),
                "HDFS_BYTES_WRITTEN"));

        metric.setInputRecords(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_RECORDS.name(),
                REDUCE_INPUT_RECORDS.name()));

        metric.setOutputRecords(
            extractNumericCounter(
                attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_RECORDS.name(),
                REDUCE_OUTPUT_RECORDS.name()));

        if (JobHistory.Values.REDUCE.name()
            .equals(task.get(JobHistory.Keys.TASK_TYPE))) {
          long shuffleTime =
              attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
                  attempt.getLong(JobHistory.Keys.START_TIME);
          long sortTime =
              attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
                  attempt
                      .getLong(JobHistory.Keys.SHUFFLE_FINISHED);

          metric.setShuffleTimeMillis(shuffleTime);
          metric.setSortTimeMillis(sortTime);
        }

      }
    }
  }
}
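
extractNumericCounter is another hiped2 helper not reproduced here. A sketch of the idea, assuming the COUNTERS field holds the escaped compact string produced by the old mapred Counters API, with the map-side counter name tried alongside the reduce-side fallback:

// Hypothetical sketch; hiped2's actual helper may differ in details.
private static long extractNumericCounter(String counterString,
                                          String mapCounterName,
                                          String reduceCounterName)
    throws ParseException {
  Counters counters = Counters.fromEscapedCompactString(counterString);
  for (Counters.Group group : counters) {
    for (Counters.Counter counter : group) {
      if (mapCounterName.equals(counter.getName())
          || reduceCounterName.equals(counter.getName())) {
        return counter.getCounter();
      }
    }
  }
  return 0; // counter not recorded for this task type
}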
 
Example 13  Project: hiped2  File: MetricSummary.java
public static void printTasks(JobHistory.JobInfo job)
    throws ParseException, IllegalAccessException,
    InvocationTargetException, NoSuchMethodException {

  List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
      job);
  List<TaskMetrics> reduceMetrics =
      JobHistoryHelper.getReduceTaskMetrics(
          job);

  System.out.println("Job Statistics");
  System.out.println("");

  PaddedTable table = new PaddedTable();
  table
      .addColumnTitle("Item")
      .addColumnTitle("Min")
      .addColumnTitle("Max")
      .addColumnTitle("Median")
      .addColumnTitle("Mean")
      .addColumnTitle("StdDev");

  dumpTasks(table, mapMetrics, "execution time",
      "getOverallTimeMillis", true);
  dumpTasks(table, mapMetrics, "input records", "getInputRecords",
      false);
  dumpTasks(table, mapMetrics, "input bytes", "getInputBytes", false);
  dumpTasks(table, mapMetrics, "output records", "getOutputRecords",
      false);
  dumpTasks(table, mapMetrics, "output bytes", "getOutputBytes",
      false);

  DataSkewMetrics.decorateHeader("MAP TASKS");
  System.out.println("");
  System.out.println("Num Map Tasks:    " + mapMetrics.size());
  System.out.println("");
  System.out.println(table);
  table.clearRows();

  if(reduceMetrics.size() > 0) {

    dumpTasks(table, reduceMetrics, "execution time",
        "getOverallTimeMillis", true);
    dumpTasks(table, reduceMetrics, "shuffle time",
        "getShuffleTimeMillis", true);
    dumpTasks(table, reduceMetrics, "sort time", "getSortTimeMillis",
        true);
    dumpTasks(table, reduceMetrics, "input records", "getInputRecords",
        false);
    dumpTasks(table, reduceMetrics, "input bytes", "getInputBytes",
        false);
    dumpTasks(table, reduceMetrics, "output records",
        "getOutputRecords", false);
    dumpTasks(table, reduceMetrics, "output bytes", "getOutputBytes",
        false);

    DataSkewMetrics.decorateHeader("REDUCE TASKS");
    System.out.println("");
    System.out.println("Num Reduce Tasks: " + reduceMetrics.size());
    System.out.println("");
    System.out.println(table);
  }
}
 
Example 14  Project: hiped2  File: ExtractJobMetrics.java
public static void printAllTaskAttempts(PaddedTable table,
                                        JobHistory.JobInfo job,
                                        String taskType)
    throws ParseException {
  Map<String, JobHistory.Task> tasks = job.getAllTasks();
  for (JobHistory.Task task : tasks.values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
        .values()) {
      if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {

        long taskOverallTime =
                attempt.getLong(JobHistory.Keys.FINISH_TIME) -
                    attempt.getLong(JobHistory.Keys.START_TIME);
        long shuffleTime =
                attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
                    attempt.getLong(JobHistory.Keys.START_TIME);
        long taskSortTime =
                attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
                    attempt
                        .getLong(JobHistory.Keys.SHUFFLE_FINISHED);

        table.newRow()
            .addColumnValue(taskType)
            .addColumnValue(
                attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
            .addColumnValue(
                attempt.get(JobHistory.Keys.TASK_STATUS))
            .addColumnValue(attempt.get(JobHistory.Keys.HOSTNAME))
            .addColumnValue(formatTime(taskOverallTime));

        if (JobHistory.Values.REDUCE.name()
            .equals(task.get(JobHistory.Keys.TASK_TYPE))) {
          table.addColumnValue(formatTime(shuffleTime))
              .addColumnValue(formatTime(taskSortTime));
        } else {
          table.addColumnValue("").addColumnValue("");
        }
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_BYTES.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_BYTES.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_INPUT_RECORDS.name(),
                REDUCE_INPUT_RECORDS.name()));
        table.addColumnValue(
            extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
                MAP_OUTPUT_RECORDS.name(),
                REDUCE_OUTPUT_RECORDS.name()));
      }
    }
  }
}
 
Example 15  Project: spork  File: HadoopJobHistoryLoader.java
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (location != null) {
        LOG.info("load: " + location);  
        Path full = new Path(location);  
        String[] jobDetails = 
            JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
        String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
                + jobDetails[4];
        JobHistory.JobInfo job = new JobHistory.JobInfo(jobId); 
 
        value = new MRJobInfo();
                                    
        FileSystem fs = full.getFileSystem(conf);
        FileStatus fstat = fs.getFileStatus(full);
        
        LOG.info("file size: " + fstat.getLen());
        DefaultJobHistoryParser.parseJobTasks(location, job,
                full.getFileSystem(conf)); 
        LOG.info("job history parsed sucessfully");
        HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
        LOG.info("get parsed job history");
        
        // parse Hadoop job xml file
        Path parent = full.getParent();
        String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml";
        Path p = new Path(parent, jobXml);  
     
        FSDataInputStream fileIn = fs.open(p);
        Map<String, String> val = HadoopJobHistoryLoader
                .parseJobXML(fileIn);
        for (String key : val.keySet()) {
            value.job.put(key, val.get(key));
        }
        
        location = null;
        return true;
    }          
    value = null;
    return false;
}
 
Example 16  Project: hiped2  File: ExtractJobMetrics.java
public static void dumpTaskTimes(String... args)
    throws Exception {

  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  printAllTaskAttempts(job);
}
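
With the helpers above in place, driving the tool is a one-liner; the path below is hypothetical:

ExtractJobMetrics.dumpTaskTimes("--localfile",
    "/tmp/localhost_1339094189038_job_201206071541_0001_hadoop_wordcount");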