下面列出了 org.apache.hadoop.mapred.JobHistory#Task() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Looks up the successful attempt of a task and returns its recorded values.
 *
 * <p>Attempts are visited in the iteration order of {@code task.getTaskAttempts()};
 * the first attempt whose {@code TASK_STATUS} value equals {@code "SUCCESS"} is
 * returned. The code relies on there being at most one successful attempt.
 *
 * @param task the task whose attempts are scanned
 * @return the key/value map of the first successful attempt, or {@code null}
 *         when no attempt is marked successful
 */
private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(
    JobHistory.Task task) {
  for (JobHistory.TaskAttempt candidate : task.getTaskAttempts().values()) {
    if (candidate == null) {
      continue;
    }
    Map<JobHistory.Keys, String> fields = candidate.getValues();
    if (fields == null || !fields.containsKey(JobHistory.Keys.TASK_STATUS)) {
      // Attempts without a recorded status cannot be the successful one.
      continue;
    }
    if (fields.get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
      // CHECK_IT: only one successful attempt is expected per task.
      return fields;
    }
  }
  return null;
}
/**
 * Finds the successful attempt of a task and returns its recorded key/value pairs.
 *
 * <p>Attempts are scanned in the iteration order of {@code task.getTaskAttempts()}
 * and the first one whose {@code TASK_STATUS} is {@code "SUCCESS"} wins; the code
 * relies on there being at most one such attempt. Attempts that are {@code null},
 * have no value map, or carry no {@code TASK_STATUS} entry are skipped instead of
 * triggering a {@link NullPointerException}.
 *
 * @param task the task whose attempts are inspected
 * @return the values of the first successful attempt, or {@code null} if no
 *         attempt is marked successful
 */
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
    if (attempt == null) {
      continue;
    }
    java.util.Map<JobHistory.Keys, String> values = attempt.getValues();
    // Constant-first equals: a missing TASK_STATUS yields null from get() and
    // must simply compare as "not successful".
    if (values != null && "SUCCESS".equals(values.get(JobHistory.Keys.TASK_STATUS))) {
      // CHECK_IT: only one successful attempt is expected per task.
      return values;
    }
  }
  return null;
}
/**
 * Returns the recorded values of the first successful attempt of {@code task}.
 *
 * <p>The attempts are walked in the iteration order of
 * {@code task.getTaskAttempts()}; an attempt counts as successful when its
 * {@code TASK_STATUS} value equals {@code "SUCCESS"}. Only one successful attempt
 * is expected per task. A {@code null} attempt, a {@code null} value map, or a
 * missing {@code TASK_STATUS} entry is treated as "not successful" rather than
 * causing a {@link NullPointerException}.
 *
 * @param task the task whose attempts are inspected
 * @return the key/value map of the successful attempt, or {@code null} when
 *         there is none
 */
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
    java.util.Map<JobHistory.Keys, String> values =
        (attempt == null) ? null : attempt.getValues();
    if (values == null) {
      continue;
    }
    // get() returns null for an absent key, so compare from the literal side.
    String status = values.get(JobHistory.Keys.TASK_STATUS);
    if ("SUCCESS".equals(status)) {
      // CHECK_IT: only one successful attempt is expected per task.
      return values;
    }
  }
  return null;
}
/**
 * Dumps a per-second activity timeline for the job identified by {@code args}.
 *
 * <p>First, one line per task attempt (attempt id, task type, task status) is
 * written to standard output. Then a tab-separated table is written to standard
 * error: a header row followed by one row for every 1000 ms step between the
 * job's {@code LAUNCH_TIME} and {@code FINISH_TIME} (inclusive), holding the
 * result of {@code countRangesForTime} for the map, reduce, shuffle and sort
 * time ranges at that instant.
 *
 * @param args command-line arguments handed to
 *             {@code JobHistoryHelper.getJobInfoFromCliArgs}
 * @throws Exception propagated from loading the job history
 */
public static void dumpTaskTimes(String... args)
    throws Exception {
  JobHistory.JobInfo job = JobHistoryHelper.getJobInfoFromCliArgs(args);

  long jobLaunch = job.getLong(JobHistory.Keys.LAUNCH_TIME);
  long jobFinish = job.getLong(JobHistory.Keys.FINISH_TIME);

  List<TimeRange> mapRanges = new ArrayList<TimeRange>();
  List<TimeRange> reduceRanges = new ArrayList<TimeRange>();
  List<TimeRange> shuffleRanges = new ArrayList<TimeRange>();
  List<TimeRange> sortRanges = new ArrayList<TimeRange>();

  for (JobHistory.Task task : job.getAllTasks().values()) {
    // Type and status are read from the task, so they are the same for each
    // of its attempts.
    // NOTE(review): the status printed is the task's, not the attempt's — confirm intended.
    String type = task.get(JobHistory.Keys.TASK_TYPE);
    String status = task.get(JobHistory.Keys.TASK_STATUS);

    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
      String attemptId = attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID);
      System.out.println(attemptId + " " + type + " " + status);

      long begin = attempt.getLong(JobHistory.Keys.START_TIME);
      long finish = attempt.getLong(JobHistory.Keys.FINISH_TIME);
      TimeRange whole = new TimeRange(TimeUnit.MILLISECONDS, begin, finish);

      if (JobHistory.Values.MAP.name().equals(type)) {
        mapRanges.add(whole);
        continue;
      }
      if (!JobHistory.Values.REDUCE.name().equals(type)) {
        // Neither map nor reduce: not part of the timeline.
        continue;
      }

      // A reduce attempt is split into three consecutive phases:
      // start..shuffle end, shuffle end..sort end, sort end..finish.
      long shuffleDone = attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
      long sortDone = attempt.getLong(JobHistory.Keys.SORT_FINISHED);
      shuffleRanges.add(new TimeRange(TimeUnit.MILLISECONDS, begin, shuffleDone));
      sortRanges.add(new TimeRange(TimeUnit.MILLISECONDS, shuffleDone, sortDone));
      reduceRanges.add(new TimeRange(TimeUnit.MILLISECONDS, sortDone, finish));
    }
  }

  // Output is tab-separated, columns in this order:
  // time-offset  #-map-tasks  #-reduce-tasks  #-shuffle-tasks  #-sort-tasks
  StringBuilder header = new StringBuilder();
  header.append("time");
  header.append("\tmap");
  header.append("\treduce");
  header.append("\tshuffle");
  header.append("\tsort");
  System.err.println(header);

  // One row per second of job wall-clock time (timestamps are in milliseconds).
  int second = 0;
  for (long instant = jobLaunch; instant <= jobFinish; instant += 1000, second++) {
    StringBuilder row = new StringBuilder();
    row.append(second);
    row.append("\t").append(countRangesForTime(mapRanges, instant));
    row.append("\t").append(countRangesForTime(reduceRanges, instant));
    row.append("\t").append(countRangesForTime(shuffleRanges, instant));
    row.append("\t").append(countRangesForTime(sortRanges, instant));
    System.err.println(row);
  }
}
/**
 * Appends one {@code TaskMetrics} entry to {@code metrics} for every attempt of
 * every task in {@code job} whose {@code TASK_TYPE} equals {@code taskType}.
 *
 * <p>Each entry carries the attempt id, host, status, overall run time
 * ({@code FINISH_TIME - START_TIME}) and the input/output byte and record
 * figures obtained via {@code extractNumericCounter} from the attempt's
 * {@code COUNTERS} string. For reduce tasks the shuffle and sort durations are
 * set as well.
 *
 * @param metrics  list that receives the new entries
 * @param job      the job whose tasks are inspected
 * @param taskType the task type to select
 * @throws ParseException propagated from {@code extractNumericCounter}
 */
public static void addTask(List<TaskMetrics> metrics,
                           JobHistory.JobInfo job,
                           String taskType)
    throws ParseException {
  for (JobHistory.Task task : job.getAllTasks().values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
      if (!taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
        continue;
      }

      // The entry is registered before it is populated.
      TaskMetrics entry = new TaskMetrics();
      metrics.add(entry);

      entry.setType(taskType)
          .setTaskId(attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
          .setHost(attempt.get(JobHistory.Keys.HOSTNAME))
          .setStatus(attempt.get(JobHistory.Keys.TASK_STATUS));

      long started = attempt.getLong(JobHistory.Keys.START_TIME);
      long finished = attempt.getLong(JobHistory.Keys.FINISH_TIME);
      entry.setOverallTimeMillis(finished - started);

      // Every counter lookup works on the same serialized counters string.
      String counters = attempt.get(JobHistory.Keys.COUNTERS);

      long inputBytes = extractNumericCounter(
          counters, MAP_INPUT_BYTES.name(), REDUCE_SHUFFLE_BYTES.name());
      entry.setInputBytes(inputBytes);

      long outputBytes = extractNumericCounter(
          counters, MAP_OUTPUT_BYTES.name(), "HDFS_BYTES_WRITTEN");
      entry.setOutputBytes(outputBytes);

      long inputRecords = extractNumericCounter(
          counters, MAP_INPUT_RECORDS.name(), REDUCE_INPUT_RECORDS.name());
      entry.setInputRecords(inputRecords);

      long outputRecords = extractNumericCounter(
          counters, MAP_OUTPUT_RECORDS.name(), REDUCE_OUTPUT_RECORDS.name());
      entry.setOutputRecords(outputRecords);

      boolean isReduce = JobHistory.Values.REDUCE.name()
          .equals(task.get(JobHistory.Keys.TASK_TYPE));
      if (isReduce) {
        // Shuffle runs from attempt start; sort follows the shuffle.
        long shuffleDone = attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
        long shuffleMillis =
            shuffleDone - attempt.getLong(JobHistory.Keys.START_TIME);
        long sortMillis =
            attempt.getLong(JobHistory.Keys.SORT_FINISHED) - shuffleDone;
        entry.setShuffleTimeMillis(shuffleMillis);
        entry.setSortTimeMillis(sortMillis);
      }
    }
  }
}
/**
 * Adds one row to {@code table} for every attempt of every task in {@code job}
 * whose {@code TASK_TYPE} equals {@code taskType}.
 *
 * <p>Columns, in order: task type, attempt id, status, host, overall time,
 * shuffle time, sort time (both empty strings unless the task is a reduce),
 * then four values obtained via {@code extractCounter} from the attempt's
 * {@code COUNTERS} string: input bytes, output bytes, input records and
 * output records.
 *
 * @param table    the table that receives the rows
 * @param job      the job whose tasks are inspected
 * @param taskType the task type to select
 * @throws ParseException propagated from {@code extractCounter}
 */
public static void printAllTaskAttempts(PaddedTable table,
                                        JobHistory.JobInfo job,
                                        String taskType)
    throws ParseException {
  for (JobHistory.Task task : job.getAllTasks().values()) {
    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
      if (!taskType.equals(task.get(JobHistory.Keys.TASK_TYPE))) {
        continue;
      }

      // All durations are computed up front, before the row is opened.
      long started = attempt.getLong(JobHistory.Keys.START_TIME);
      long overallMillis =
          attempt.getLong(JobHistory.Keys.FINISH_TIME) - started;
      long shuffleDone = attempt.getLong(JobHistory.Keys.SHUFFLE_FINISHED);
      long shuffleMillis = shuffleDone - started;
      long sortMillis =
          attempt.getLong(JobHistory.Keys.SORT_FINISHED) - shuffleDone;

      table.newRow()
          .addColumnValue(taskType)
          .addColumnValue(attempt.get(JobHistory.Keys.TASK_ATTEMPT_ID))
          .addColumnValue(attempt.get(JobHistory.Keys.TASK_STATUS))
          .addColumnValue(attempt.get(JobHistory.Keys.HOSTNAME))
          .addColumnValue(formatTime(overallMillis));

      boolean isReduce = JobHistory.Values.REDUCE.name()
          .equals(task.get(JobHistory.Keys.TASK_TYPE));
      if (!isReduce) {
        // Shuffle and sort columns stay blank for non-reduce tasks.
        table.addColumnValue("").addColumnValue("");
      } else {
        table.addColumnValue(formatTime(shuffleMillis))
            .addColumnValue(formatTime(sortMillis));
      }

      // Every counter lookup works on the same serialized counters string.
      String counters = attempt.get(JobHistory.Keys.COUNTERS);
      table.addColumnValue(
          extractCounter(counters, MAP_INPUT_BYTES.name()));
      table.addColumnValue(
          extractCounter(counters, MAP_OUTPUT_BYTES.name()));
      table.addColumnValue(
          extractCounter(counters,
              MAP_INPUT_RECORDS.name(),
              REDUCE_INPUT_RECORDS.name()));
      table.addColumnValue(
          extractCounter(counters,
              MAP_OUTPUT_RECORDS.name(),
              REDUCE_OUTPUT_RECORDS.name()));
    }
  }
}