Listed below are example usages of org.apache.hadoop.mapred.JobHistory#JobInfo, collected from open-source projects.
public static void printTasks(JobHistory.JobInfo job)
throws ParseException, IllegalAccessException,
InvocationTargetException, NoSuchMethodException {
List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
job);
List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
job);
decorateHeader("MAP TASKS");
dumpTasks(mapMetrics, "throughput",
new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
false, false);
decorateHeader("REDUCE TASKS");
dumpTasks(reduceMetrics, "throughput",
new TaskMetrics.OverallThroughputComparator(), "getOverallThroughputBytesPerSecond",
false, true);
}
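decorateHeader is referenced above but is not part of this listing; its exact output format is unknown. A minimal sketch of such a banner printer might look like this:
public static void decorateHeader(String header) {
    // Print a simple banner around a section title,
    // e.g. "========== MAP TASKS =========="
    String rule = "==========";
    System.out.println(rule + " " + header + " " + rule);
}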
public static JobHistory.JobInfo getJobInfoFromHdfsOutputDir(String outputDir, Configuration conf)
throws IOException {
Path output = new Path(outputDir);
Path historyLogDir = new Path(output, "_logs/history");
FileSystem fs = output.getFileSystem(conf);
if (!fs.exists(historyLogDir)) {
throw new IOException("History directory " + historyLogDir.toString()
+ " does not exist");
}
Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(historyLogDir,
jobLogFileFilter));
if (jobFiles.length == 0) {
throw new IOException("Not a valid history directory "
+ historyLogDir.toString());
}
String[] jobDetails =
JobHistory.JobInfo.decodeJobHistoryFileName(jobFiles[0].getName()).
split("_");
String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
DefaultJobHistoryParser.parseJobTasks(jobFiles[0].toString(), job, fs);
return job;
}
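The jobDetails indexing above relies on the pre-0.21 job history file naming scheme, in which the decoded name concatenates the job tracker host, its start time, the three-part job ID, the user, and the job name. An illustration (the concrete values below are invented):
// Hypothetical decoded history file name:
String decoded = "localhost_1291130522985_job_201012241825_0001_hadoop_wordcount";
String[] parts = decoded.split("_");
// parts[0] = "localhost"       -> job tracker host
// parts[1] = "1291130522985"   -> job tracker start time
// parts[2..4] = "job", "201012241825", "0001"
String jobId = parts[2] + "_" + parts[3] + "_" + parts[4];
// jobId = "job_201012241825_0001"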
public static void dumpTaskTimes(String... args)
throws Exception {
JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);
List<TaskMetrics> mapMetrics =
JobHistoryHelper.getMapTaskMetrics(job);
List<TaskMetrics> reduceMetrics =
JobHistoryHelper.getReduceTaskMetrics(
job);
System.out.println("# MAP-EXEC-TIME-SECS\tMAP_INPUT_BYTES");
dumpTaskTimes(mapMetrics, new TaskMetrics.ExecTimeComparator());
System.out.println();
System.out.println("# REDUCE-EXEC-TIME-SECS\tREDUCE_INPUT_BYTES");
dumpTaskTimes(reduceMetrics, new TaskMetrics.ExecTimeComparator());
}
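The two-argument dumpTaskTimes overload used above is not included in this listing. A plausible sketch, assuming TaskMetrics exposes the getOverallTimeMillis() and getInputBytes() getters seen elsewhere on this page:
public static void dumpTaskTimes(List<TaskMetrics> metrics,
                                 Comparator<TaskMetrics> comparator) {
    // One tab-separated line per task: execution time in seconds,
    // then input bytes, matching the headers printed by the caller.
    Collections.sort(metrics, comparator);
    for (TaskMetrics metric : metrics) {
        System.out.println((metric.getOverallTimeMillis() / 1000) + "\t"
            + metric.getInputBytes());
    }
}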
public static void printAllTaskAttempts(JobHistory.JobInfo job)
throws ParseException {
PaddedTable table = new PaddedTable();
table
.addColumnTitle("Type")
.addColumnTitle("TaskId")
.addColumnTitle("Status")
.addColumnTitle("Host")
.addColumnTitle("OverallTime(HH:MM:SS)")
.addColumnTitle("ShuffleTime(HH:MM:SS)")
.addColumnTitle("SortTime(HH:MM:SS)")
.addColumnTitle("MapInputBytes")
.addColumnTitle("MapOutputBytes")
.addColumnTitle("InputRecords")
.addColumnTitle("OputputRecords");
printAllTaskAttempts(table, job, JobHistory.Values.MAP.name());
printAllTaskAttempts(table, job, JobHistory.Values.REDUCE.name());
printAllTaskAttempts(table, job, JobHistory.Values.SETUP.name());
printAllTaskAttempts(table, job, JobHistory.Values.CLEANUP.name());
System.out.println(table);
}
public static void printTasks(JobHistory.JobInfo job)
throws ParseException, IllegalAccessException,
InvocationTargetException, NoSuchMethodException {
List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
job);
List<TaskMetrics> reduceMetrics = JobHistoryHelper.getReduceTaskMetrics(
job);
decorateHeader("MAP TASKS");
dumpTasks(mapMetrics, "execution time",
new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis",
true);
dumpTasks(mapMetrics, "input records",
new TaskMetrics.InputRecordsComparator(), "getInputRecords",
false);
dumpTasks(mapMetrics, "input bytes",
new TaskMetrics.InputBytesComparator(), "getInputBytes",
false);
decorateHeader("REDUCE TASKS");
dumpTasks(reduceMetrics, "execution time",
new TaskMetrics.ExecTimeComparator(), "getOverallTimeMillis", true);
dumpTasks(reduceMetrics, "input records",
new TaskMetrics.InputRecordsComparator(), "getInputRecords",
false);
dumpTasks(reduceMetrics, "input bytes",
new TaskMetrics.InputBytesComparator(), "getInputBytes",
false);
}
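The getter names passed as strings ("getOverallTimeMillis" and so on) suggest that dumpTasks resolves each metric via reflection, which would also explain the IllegalAccessException, InvocationTargetException, and NoSuchMethodException in the signature. A minimal sketch of that lookup (the real dumpTasks is not shown here):
static long extractMetric(TaskMetrics metric, String getterName)
    throws IllegalAccessException, InvocationTargetException,
           NoSuchMethodException {
    // Resolve e.g. "getOverallTimeMillis" to the actual getter and invoke it.
    Method getter = TaskMetrics.class.getMethod(getterName);
    return ((Number) getter.invoke(metric)).longValue();
}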
public static JobHistory.JobInfo getJobInfoFromCliArgs(Configuration conf, String... args)
throws IOException {
String usage = "Expected 2 arguments, either --hdfsdir <dir> or --localfile <path>";
if(args.length != 2) {
throw new IOException(usage);
}
if("--hdfsdir".equals(args[0])) {
return getJobInfoFromHdfsOutputDir(args[1], conf);
} else if("--localfile".equals(args[0])) {
return getJobInfoFromLocalFile(args[1], conf);
}
throw new IOException("Unexpected option '" + args[0] + "' \n" + usage);
}
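For reference, both accepted invocations, given a Configuration conf (the paths and file names below are placeholders):
// History file located under a job's HDFS output directory:
JobHistory.JobInfo fromHdfs =
    getJobInfoFromCliArgs(conf, "--hdfsdir", "/user/hadoop/output");
// History file copied to the local file system:
JobHistory.JobInfo fromLocal =
    getJobInfoFromCliArgs(conf, "--localfile",
        "/tmp/localhost_1291130522985_job_201012241825_0001_hadoop_wordcount");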
public static JobHistory.JobInfo getJobInfoFromLocalFile(String outputFile, Configuration conf)
throws IOException {
FileSystem fs = FileSystem.getLocal(conf);
Path outputFilePath = new Path(outputFile);
String[] jobDetails =
JobHistory.JobInfo.decodeJobHistoryFileName(outputFilePath.getName()).
split("_");
String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
DefaultJobHistoryParser.parseJobTasks(outputFile, job, fs);
return job;
}
public static List<TaskMetrics> getMapTaskMetrics(
JobHistory.JobInfo job)
throws ParseException {
List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
addTask(metrics, job, JobHistory.Values.MAP.name());
return metrics;
}
public static List<TaskMetrics> getReduceTaskMetrics(
JobHistory.JobInfo job)
throws ParseException {
List<TaskMetrics> metrics = new ArrayList<TaskMetrics>();
addTask(metrics, job, JobHistory.Values.REDUCE.name());
return metrics;
}
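As a usage sketch, given a JobHistory.JobInfo job, the returned lists combine directly with the comparators seen elsewhere in this listing. getTaskId() below is assumed to exist as the counterpart of the setTaskId() call shown later, and ExecTimeComparator is assumed to order by ascending execution time:
List<TaskMetrics> maps = JobHistoryHelper.getMapTaskMetrics(job);
TaskMetrics slowest = Collections.max(maps,
    new TaskMetrics.ExecTimeComparator());
System.out.println("Slowest map attempt: " + slowest.getTaskId()
    + " (" + slowest.getOverallTimeMillis() + " ms)");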
public static void dumpTaskTimes(String... args)
throws Exception {
JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);
long startTime = job.getLong(JobHistory.Keys.LAUNCH_TIME);
long endTime = job.getLong(JobHistory.Keys.FINISH_TIME);
List<TimeRange> mapRanges = new ArrayList<TimeRange>();
List<TimeRange> reduceRanges = new ArrayList<TimeRange>();
List<TimeRange> shuffleRanges = new ArrayList<TimeRange>();
List<TimeRange> sortRanges = new ArrayList<TimeRange>();
Map<String, JobHistory.Task> tasks = job.getAllTasks();
for (JobHistory.Task task : tasks.values()) {
for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
.values()) {
String taskId = attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID);
String taskType = task.get(JobHistory.Keys.TASK_TYPE);
String taskStatus = task.get(JobHistory.Keys.TASK_STATUS);
System.out.println(taskId + " " + taskType + " " + taskStatus);
long taskStartTime =
attempt.getLong(JobHistory.Keys.START_TIME);
long taskEndTime =
attempt.getLong(JobHistory.Keys.FINISH_TIME);
TimeRange range =
new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
taskEndTime);
if (JobHistory.Values.MAP.name().equals(taskType)) {
mapRanges.add(range);
} else if (JobHistory.Values.REDUCE.name().equals(taskType)) {
long shuffleEndTime =
attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
long sortEndTime =
attempt.getLong(JobHistory.Keys.SORT_FINISHED);
shuffleRanges.add(
new TimeRange(TimeUnit.MILLISECONDS, taskStartTime,
shuffleEndTime));
sortRanges.add(
new TimeRange(TimeUnit.MILLISECONDS, shuffleEndTime,
sortEndTime));
reduceRanges.add(
new TimeRange(TimeUnit.MILLISECONDS, sortEndTime,
taskEndTime));
}
}
}
// output the data, tab-separated, in the following order:
// time-offset #-map-tasks #-reduce-tasks #-shuffle-tasks #-sort-tasks
// in steps of 1 second
StringBuilder sb = new StringBuilder();
sb.append("time")
.append("\tmap")
.append("\treduce")
.append("\tshuffle")
.append("\tsort")
;
System.err.println(sb);
int timeOffset = 0;
for (long i = startTime; i <= endTime; i += 1000) {
sb = new StringBuilder();
sb.append(timeOffset)
.append("\t").append(countRangesForTime(mapRanges, i))
.append("\t").append(countRangesForTime(reduceRanges, i))
.append("\t").append(countRangesForTime(shuffleRanges, i))
.append("\t").append(countRangesForTime(sortRanges, i))
;
System.err.println(sb);
timeOffset++;
}
}
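countRangesForTime is referenced above but not included in this listing. A straightforward sketch, assuming TimeRange keeps its bounds in milliseconds and exposes them via getStartMillis()/getEndMillis() (hypothetical accessors):
public static int countRangesForTime(List<TimeRange> ranges, long time) {
    // Count the task intervals that cover the given instant.
    int count = 0;
    for (TimeRange range : ranges) {
        if (range.getStartMillis() <= time && time <= range.getEndMillis()) {
            count++;
        }
    }
    return count;
}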
public static JobHistory.JobInfo getJobInfoFromCliArgs(String... args)
throws IOException {
return getJobInfoFromCliArgs(new Configuration(), args);
}
public static void addTask(List<TaskMetrics> metrics,
JobHistory.JobInfo job,
String taskType)
throws ParseException {
Map<String, JobHistory.Task> tasks = job.getAllTasks();
for (JobHistory.Task task : tasks.values()) {
for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
.values()) {
if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
TaskMetrics metric = new TaskMetrics();
metrics.add(metric);
metric.setType(taskType)
.setTaskId(attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
.setHost(attempt.get(JobHistory.Keys.HOSTNAME))
.setStatus(attempt.get(JobHistory.Keys.TASK_STATUS));
long taskOverallTime =
attempt.getLong(JobHistory.Keys.FINISH_TIME) -
attempt.getLong(JobHistory.Keys.START_TIME);
metric.setOverallTimeMillis(taskOverallTime);
metric.setInputBytes(
extractNumericCounter(
attempt.get(JobHistory.Keys.COUNTERS),
MAP_INPUT_BYTES.name(),
REDUCE_SHUFFLE_BYTES.name()));
metric.setOutputBytes(
extractNumericCounter(
attempt.get(JobHistory.Keys.COUNTERS),
MAP_OUTPUT_BYTES.name(),
"HDFS_BYTES_WRITTEN"));
metric.setInputRecords(
extractNumericCounter(
attempt.get(JobHistory.Keys.COUNTERS),
MAP_INPUT_RECORDS.name(),
REDUCE_INPUT_RECORDS.name()));
metric.setOutputRecords(
extractNumericCounter(
attempt.get(JobHistory.Keys.COUNTERS),
MAP_OUTPUT_RECORDS.name(),
REDUCE_OUTPUT_RECORDS.name()));
if (JobHistory.Values.REDUCE.name()
.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
long shuffleTime =
attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
attempt.getLong(JobHistory.Keys.START_TIME);
long sortTime =
attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
attempt
.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
metric.setShuffleTimeMillis(shuffleTime);
metric.setSortTimeMillis(sortTime);
}
}
}
}
}
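extractNumericCounter is likewise external to this listing. Since addTask declares throws ParseException, a plausible implementation decodes the escaped compact counter string stored in the history file and returns whichever of the two counter names it finds. This is a sketch under those assumptions, not the original helper:
public static long extractNumericCounter(String counterString,
                                         String mapCounterName,
                                         String reduceCounterName)
    throws ParseException {
    // Decode the serialized counters recorded in the job history file.
    Counters counters = Counters.fromEscapedCompactString(counterString);
    for (Counters.Group group : counters) {
        for (Counters.Counter counter : group) {
            String name = counter.getName();
            if (mapCounterName.equals(name) || reduceCounterName.equals(name)) {
                return counter.getValue();
            }
        }
    }
    return 0;
}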
public static void printTasks(JobHistory.JobInfo job)
throws ParseException, IllegalAccessException,
InvocationTargetException, NoSuchMethodException {
List<TaskMetrics> mapMetrics = JobHistoryHelper.getMapTaskMetrics(
job);
List<TaskMetrics> reduceMetrics =
JobHistoryHelper.getReduceTaskMetrics(
job);
System.out.println("Job Statistics");
System.out.println("");
PaddedTable table = new PaddedTable();
table
.addColumnTitle("Item")
.addColumnTitle("Min")
.addColumnTitle("Max")
.addColumnTitle("Median")
.addColumnTitle("Mean")
.addColumnTitle("StdDev");
dumpTasks(table, mapMetrics, "execution time",
"getOverallTimeMillis", true);
dumpTasks(table, mapMetrics, "input records", "getInputRecords",
false);
dumpTasks(table, mapMetrics, "input bytes", "getInputBytes", false);
dumpTasks(table, mapMetrics, "output records", "getOutputRecords",
false);
dumpTasks(table, mapMetrics, "output bytes", "getOutputBytes",
false);
DataSkewMetrics.decorateHeader("MAP TASKS");
System.out.println("");
System.out.println("Num Map Tasks: " + mapMetrics.size());
System.out.println("");
System.out.println(table);
table.clearRows();
if(reduceMetrics.size() > 0) {
dumpTasks(table, reduceMetrics, "execution time",
"getOverallTimeMillis", true);
dumpTasks(table, reduceMetrics, "shuffle time",
"getShuffleTimeMillis", true);
dumpTasks(table, reduceMetrics, "sort time", "getSortTimeMillis",
true);
dumpTasks(table, reduceMetrics, "input records", "getInputRecords",
false);
dumpTasks(table, reduceMetrics, "input bytes", "getInputBytes",
false);
dumpTasks(table, reduceMetrics, "output records",
"getOutputRecords", false);
dumpTasks(table, reduceMetrics, "output bytes", "getOutputBytes",
false);
DataSkewMetrics.decorateHeader("REDUCE TASKS");
System.out.println("");
System.out.println("Num Reduce Tasks: " + reduceMetrics.size());
System.out.println("");
System.out.println(table);
}
}
public static void printAllTaskAttempts(PaddedTable table,
JobHistory.JobInfo job,
String taskType)
throws ParseException {
Map<String, JobHistory.Task> tasks = job.getAllTasks();
for (JobHistory.Task task : tasks.values()) {
for (JobHistory.TaskAttempt attempt : task.getTaskAttempts()
.values()) {
if (taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
long taskOverallTime =
attempt.getLong(JobHistory.Keys.FINISH_TIME) -
attempt.getLong(JobHistory.Keys.START_TIME);
long shuffleTime =
attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED) -
attempt.getLong(JobHistory.Keys.START_TIME);
long taskSortTime =
attempt.getLong(JobHistory.Keys.SORT_FINISHED) -
attempt
.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
table.newRow()
.addColumnValue(taskType)
.addColumnValue(
attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
.addColumnValue(
attempt.get(JobHistory.Keys.TASK_STATUS))
.addColumnValue(attempt.get(JobHistory.Keys.HOSTNAME))
.addColumnValue(formatTime(taskOverallTime));
if (JobHistory.Values.REDUCE.name()
.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
table.addColumnValue(formatTime(shuffleTime))
.addColumnValue(formatTime(taskSortTime));
} else {
table.addColumnValue("").addColumnValue("");
}
table.addColumnValue(
extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
MAP_INPUT_BYTES.name()));
table.addColumnValue(
extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
MAP_OUTPUT_BYTES.name()));
table.addColumnValue(
extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
MAP_INPUT_RECORDS.name(),
REDUCE_INPUT_RECORDS.name()));
table.addColumnValue(
extractCounter(attempt.get(JobHistory.Keys.COUNTERS),
MAP_OUTPUT_RECORDS.name(),
REDUCE_OUTPUT_RECORDS.name()));
}
}
}
}
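formatTime is also missing from this listing; a simple sketch that renders a millisecond duration in the HH:MM:SS form promised by the column titles:
public static String formatTime(long millis) {
    // Render a duration as HH:MM:SS, as used in the table headers.
    long seconds = millis / 1000;
    return String.format("%02d:%02d:%02d",
        seconds / 3600, (seconds % 3600) / 60, seconds % 60);
}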
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (location != null) {
LOG.info("load: " + location);
Path full = new Path(location);
String[] jobDetails =
JobInfo.decodeJobHistoryFileName(full.getName()).split("_");
String jobId = jobDetails[2] + "_" + jobDetails[3] + "_"
+ jobDetails[4];
JobHistory.JobInfo job = new JobHistory.JobInfo(jobId);
value = new MRJobInfo();
FileSystem fs = full.getFileSystem(conf);
FileStatus fstat = fs.getFileStatus(full);
LOG.info("file size: " + fstat.getLen());
DefaultJobHistoryParser.parseJobTasks(location, job,
full.getFileSystem(conf));
LOG.info("job history parsed sucessfully");
HadoopJobHistoryLoader.parseJobHistory(conf, job, value);
LOG.info("get parsed job history");
// parse Hadoop job xml file
Path parent = full.getParent();
String jobXml = jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2] + "_conf.xml";
Path p = new Path(parent, jobXml);
FSDataInputStream fileIn = fs.open(p);
Map<String, String> val = HadoopJobHistoryLoader
.parseJobXML(fileIn);
fileIn.close();
for (Map.Entry<String, String> entry : val.entrySet()) {
value.job.put(entry.getKey(), entry.getValue());
}
location = null;
return true;
}
value = null;
return false;
}
public static void dumpTaskTimes(String... args)
throws Exception {
JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);
printAllTaskAttempts(job);
}
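Finally, a hypothetical command-line entry point tying the helpers together (the class and jar names below are invented):
public class TaskTimes {
    // e.g. hadoop jar metrics.jar TaskTimes --hdfsdir /user/hadoop/output
    public static void main(String... args) throws Exception {
        dumpTaskTimes(args);
    }
}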