下面列出了 org.apache.hadoop.mapred.JobHistory.Keys 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Scans the attempts of {@code task} and returns the key/value map of the
 * first attempt, in the map's iteration order, whose TASK_STATUS value is
 * "SUCCESS". Entries with a null attempt, a null value map, or no
 * TASK_STATUS key are skipped.
 *
 * @param task the task whose attempts are inspected
 * @return the values of the first matching attempt, or {@code null} when no
 *         attempt matches
 */
private static Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(
    JobHistory.Task task) {
  Map<String, JobHistory.TaskAttempt> attemptsById = task.getTaskAttempts();
  // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
  for (Map.Entry<String, JobHistory.TaskAttempt> slot : attemptsById.entrySet()) {
    JobHistory.TaskAttempt candidate = slot.getValue();
    if (null == candidate || null == candidate.getValues()) {
      continue;
    }
    if (!candidate.getValues().containsKey(JobHistory.Keys.TASK_STATUS)) {
      continue;
    }
    if (candidate.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
      return candidate.getValues();
    }
  }
  return null;
}
/**
 * Records, per host name, the ids of tasks whose map/reduce attempt record
 * carries a TASK_STATUS equal to {@code failureType}. Records of any other
 * type, or with a different status, are ignored.
 *
 * @param recType type of the history record being handled
 * @param values key/value pairs of the record
 * @throws IOException declared by the signature; nothing in this body throws it
 */
public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
    throws IOException {
  boolean attemptRecord = recType.equals(JobHistory.RecordTypes.MapAttempt)
      || recType.equals(JobHistory.RecordTypes.ReduceAttempt);
  if (!attemptRecord) {
    return;
  }
  if (!failureType.equals(values.get(Keys.TASK_STATUS))) {
    return;
  }
  String host = values.get(Keys.HOSTNAME);
  String failedTaskId = values.get(Keys.TASKID);
  Set<String> knownFailures = badNodesToNumFailedTasks.get(host);
  if (null != knownFailures) {
    knownFailures.add(failedTaskId);
    return;
  }
  // First failure seen on this host: fill the set before publishing it.
  Set<String> firstFailures = new TreeSet<String>();
  firstFailures.add(failedTaskId);
  badNodesToNumFailedTasks.put(host, firstFailures);
}
/**
 * Returns the key/value map of the first attempt of {@code task}, in the
 * map's iteration order, whose TASK_STATUS value is "SUCCESS".
 *
 * Attempts that are null, have no value map, or carry no TASK_STATUS are
 * skipped instead of raising a NullPointerException.
 *
 * @param task the task whose attempts are inspected
 * @return the values of the first successful attempt, or {@code null} when
 *         there is none
 */
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
  for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
    if (attempt == null) {
      continue;
    }
    java.util.Map<JobHistory.Keys, String> attemptValues = attempt.getValues();
    // Constant-first equals tolerates a missing (null) status value.
    if (attemptValues != null
        && "SUCCESS".equals(attemptValues.get(JobHistory.Keys.TASK_STATUS))) {
      return attemptValues;
    }
  }
  return null;
}
/**
 * Logs launch time of job: writes a Job record holding the job id, launch
 * time, total map and reduce counts, and the PREP status. Does nothing when
 * history is disabled or there are no writers.
 *
 * @param startTime start time of job.
 * @param totalMaps total maps assigned by jobtracker.
 * @param totalReduces total reduces.
 */
public void logInited(long startTime, int totalMaps, int totalReduces) {
  if (disableHistory || null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS,
      Keys.TOTAL_REDUCES, Keys.JOB_STATUS};
  String[] fieldValues = {
      jobId.toString(), String.valueOf(startTime),
      String.valueOf(totalMaps), String.valueOf(totalReduces),
      Values.PREP.name()};
  log(writers, RecordTypes.Job, fields, fieldValues);
}
/**
 * Logs job failed event with status FAILED, then calls
 * {@code closeAndClear(writers)}. The record is logged with the sync flag
 * set. Does nothing when history is disabled or there are no writers.
 *
 * @param timestamp time when job failure was detected in ms.
 * @param finishedMaps no finished map tasks.
 * @param finishedReduces no of finished reduce tasks.
 * @param counters job counters, written in escaped compact string form.
 */
public void logFailed(long timestamp, int finishedMaps,
    int finishedReduces, Counters counters) {
  if (disableHistory || null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
      Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES, Keys.COUNTERS};
  String[] fieldValues = {
      jobId.toString(),
      String.valueOf(timestamp),
      Values.FAILED.name(),
      String.valueOf(finishedMaps),
      String.valueOf(finishedReduces),
      counters.makeEscapedCompactString()};
  log(writers, RecordTypes.Job, fields, fieldValues, true);
  closeAndClear(writers);
}
/**
 * Logs job killed event with status KILLED, then calls
 * {@code closeAndClear(writers)}. The record is logged with the sync flag
 * set. Does nothing when history is disabled or there are no writers.
 *
 * @param timestamp time when job killed was issued in ms.
 * @param finishedMaps no finished map tasks.
 * @param finishedReduces no of finished reduce tasks.
 * @param counters job counters, written in escaped compact string form.
 */
public void logKilled(long timestamp, int finishedMaps,
    int finishedReduces, Counters counters) {
  if (disableHistory || null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
      Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES, Keys.COUNTERS};
  String[] fieldValues = {
      jobId.toString(),
      String.valueOf(timestamp),
      Values.KILLED.name(),
      String.valueOf(finishedMaps),
      String.valueOf(finishedReduces),
      counters.makeEscapedCompactString()};
  log(writers, RecordTypes.Job, fields, fieldValues, true);
  closeAndClear(writers);
}
/**
 * Log start time of task (TIP) as a Task record. Returns immediately when
 * history is disabled; the job-id check runs before the writers check.
 *
 * @param taskId task id
 * @param taskType MAP or REDUCE
 * @param startTime startTime of tip.
 * @param splitLocations value written under the SPLITS key
 * @throws RuntimeException if the job id of {@code taskId} differs from this
 *         history's job id
 */
public void logTaskStarted(TaskID taskId, String taskType,
    long startTime, String splitLocations) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASKID, Keys.TASK_TYPE, Keys.START_TIME, Keys.SPLITS};
  String[] fieldValues = {
      taskId.toString(), taskType, String.valueOf(startTime), splitLocations};
  log(writers, RecordTypes.Task, fields, fieldValues);
}
/**
 * Log finish time of task as a Task record with status SUCCESS. Returns
 * immediately when history is disabled; the job-id check runs before the
 * writers check.
 *
 * @param taskId task id
 * @param taskType MAP or REDUCE
 * @param finishTime finish time of task in ms
 * @param counters task counters, written in escaped compact string form
 * @throws RuntimeException if the job id of {@code taskId} differs from this
 *         history's job id
 */
public void logTaskFinished(TaskID taskId, String taskType,
    long finishTime, Counters counters) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS,
      Keys.FINISH_TIME, Keys.COUNTERS};
  String[] fieldValues = {
      taskId.toString(), taskType, Values.SUCCESS.name(),
      String.valueOf(finishTime),
      counters.makeEscapedCompactString()};
  log(writers, RecordTypes.Task, fields, fieldValues);
}
/**
 * Update the finish time of task by writing a Task record that carries only
 * the task id and the finish time. Returns immediately when history is
 * disabled; the job-id check runs before the writers check.
 *
 * @param taskId task id
 * @param finishTime finish time of task in ms
 * @throws RuntimeException if the job id of {@code taskId} differs from this
 *         history's job id
 */
public void logTaskUpdates(TaskID taskId, long finishTime) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {Keys.TASKID, Keys.FINISH_TIME};
  String[] fieldValues = {taskId.toString(), String.valueOf(finishTime)};
  log(writers, RecordTypes.Task, fields, fieldValues);
}
/**
 * Log a number of keys and values with the record. This method allows to do
 * it in a synchronous fashion.
 *
 * The line is built as the record type name, a delimiter, then one
 * {@code KEY="escaped value"} pair plus delimiter per key, and finally the
 * line delimiter character. The caller's {@code values} array is only read:
 * escaped text goes into the output line and is never written back.
 *
 * @param writers the writers to send the data to
 * @param recordType the type to log
 * @param keys keys to log
 * @param values values to log; must hold at least {@code keys.length} entries
 * @param sync if true - will block until the data is written
 */
private void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
    Keys[] keys, String[] values, boolean sync) {
  // The buffer never leaves this method, so the unsynchronized builder suffices.
  StringBuilder buf = new StringBuilder(recordType.name());
  buf.append(JobHistory.DELIMITER);
  for (int i = 0; i < keys.length; i++) {
    buf.append(keys[i]);
    buf.append("=\"");
    buf.append(JobHistory.escapeString(values[i]));
    buf.append("\"");
    buf.append(JobHistory.DELIMITER);
  }
  buf.append(JobHistory.LINE_DELIMITER_CHAR);
  // Render once; every writer receives the same line.
  String line = buf.toString();
  for (PrintWriter out : writers) {
    LogTask task = new LogTask(out, line);
    if (sync) {
      task.run();
    } else {
      fileManager.addWriteTask(task);
    }
  }
}
/**
 * Adds a task-attempt in the listener. An attempt with an empty TASK_STATUS
 * is created; one with status SUCCESS is recorded in {@code hangingAttempts}
 * and added as successful; any other status is added as unsuccessful and
 * advances {@code numEventsRecovered} by two.
 *
 * @param taskAttemptId string form of the attempt id
 * @param attempt the attempt record read from history
 */
private void processTaskAttempt(String taskAttemptId,
    JobHistory.TaskAttempt attempt) {
  TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);

  // Check if the transaction for this attempt can be committed
  String status = attempt.get(Keys.TASK_STATUS);
  if (status.length() == 0) {
    createTaskAttempt(jip, id, attempt);
    return;
  }

  // A non-empty status means this is an update event
  if (status.equals(Values.SUCCESS.name())) {
    // Mark this attempt as hanging
    hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
    addSuccessfulAttempt(jip, id, attempt);
    return;
  }

  addUnsuccessfulAttempt(jip, id, attempt);
  numEventsRecovered += 2;
}
/**
 * Applies the priority and the submit/launch times stored in {@code job} to
 * {@code jip}, and reports the status before and after the time update.
 *
 * @param jip the in-progress job to update
 * @param job the job record read from history
 * @return a START_TIME_CHANGED event carrying clones of the job status taken
 *         before and after {@code updateJobInfo}
 */
private JobStatusChangeEvent updateJob(JobInProgress jip,
    JobHistory.JobInfo job) {
  // Change the job priority. It's important to do this via the jobtracker's
  // api as it will take care of updating the event listeners too
  JobPriority recoveredPriority = JobPriority.valueOf(job.get(Keys.JOB_PRIORITY));
  setJobPriority(jip.getJobID(), recoveredPriority);

  // Snapshot the status before the times are changed
  JobStatus statusBefore = (JobStatus) jip.getStatus().clone();

  // NOTE(review): the original comment says this also increments the job's
  // restart count; that is not visible from here, confirm in updateJobInfo
  jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
      job.getLong(JobHistory.Keys.LAUNCH_TIME));

  // Snapshot the status after the update
  JobStatus statusAfter = (JobStatus) jip.getStatus().clone();

  return new JobStatusChangeEvent(
      jip, EventType.START_TIME_CHANGED, statusBefore, statusAfter);
}
/**
 * Copies start and finish times from the history record onto {@code tip}
 * (zero values are skipped) and, when the record names a task attempt id,
 * reports that attempt to the job via {@code failedTask}.
 *
 * @param tip the task in progress to update
 * @param task the task record read from history
 */
private void updateTip(TaskInProgress tip, JobHistory.Task task) {
  long recordedStart = task.getLong(Keys.START_TIME);
  if (recordedStart != 0) {
    tip.setExecStartTime(recordedStart);
  }

  // For failed tasks finish-time will be missing
  long recordedFinish = task.getLong(Keys.FINISH_TIME);
  if (recordedFinish != 0) {
    tip.setExecFinishTime(recordedFinish);
  }

  String failingAttempt = task.get(Keys.TASK_ATTEMPT_ID);
  if (failingAttempt.length() == 0) {
    return;
  }

  // A non-empty attempt id means this is a FAILED event
  TaskAttemptID id = TaskAttemptID.forName(failingAttempt);
  TaskStatus status = tip.getTaskStatus(id);
  synchronized (JobTracker.this) {
    // This will add the tip failed event in the new log
    tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
        status.getPhase(), status.getRunState(),
        status.getTaskTracker());
  }
}
/**
 * Records, per host name, the ids of tasks whose map/reduce attempt record
 * carries a TASK_STATUS equal to {@code failureType}. Records of any other
 * type, or with a different status, are ignored.
 *
 * @param recType type of the history record being handled
 * @param values key/value pairs of the record
 * @throws IOException declared by the signature; nothing in this body throws it
 */
public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
    throws IOException {
  boolean attemptRecord = recType.equals(JobHistory.RecordTypes.MapAttempt)
      || recType.equals(JobHistory.RecordTypes.ReduceAttempt);
  if (!attemptRecord) {
    return;
  }
  if (!failureType.equals(values.get(Keys.TASK_STATUS))) {
    return;
  }
  String host = values.get(Keys.HOSTNAME);
  String failedTaskId = values.get(Keys.TASKID);
  Set<String> knownFailures = badNodesToNumFailedTasks.get(host);
  if (null != knownFailures) {
    knownFailures.add(failedTaskId);
    return;
  }
  // First failure seen on this host: fill the set before publishing it.
  Set<String> firstFailures = new TreeSet<String>();
  firstFailures.add(failedTaskId);
  badNodesToNumFailedTasks.put(host, firstFailures);
}
/**
 * Returns the key/value map of the first attempt of {@code task}, in the
 * map's iteration order, whose TASK_STATUS value is "SUCCESS".
 *
 * Attempts that are null, have no value map, or carry no TASK_STATUS are
 * skipped instead of raising a NullPointerException.
 *
 * @param task the task whose attempts are inspected
 * @return the values of the first successful attempt, or {@code null} when
 *         there is none
 */
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
  for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
    if (attempt == null) {
      continue;
    }
    java.util.Map<JobHistory.Keys, String> attemptValues = attempt.getValues();
    // Constant-first equals tolerates a missing (null) status value.
    if (attemptValues != null
        && "SUCCESS".equals(attemptValues.get(JobHistory.Keys.TASK_STATUS))) {
      return attemptValues;
    }
  }
  return null;
}
/**
 * Copies the recognised entries of a parsed job-history record into
 * {@code job}, keyed by the string form of the matching {@code JobKeys}
 * constant. The COUNTERS entry is handed to {@code parseAndAddJobCounters};
 * any other key is only reported at debug level.
 *
 * @param jobC key/value pairs of the job record read from history
 * @param job destination map that receives the translated entries
 */
private static void populateJob (Map<JobHistory.Keys, String> jobC, Map<String, String> job) {
  for (Map.Entry<JobHistory.Keys, String> entry : jobC.entrySet()) {
    JobHistory.Keys key = entry.getKey();
    String value = entry.getValue();
    switch (key) {
      case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID.toString(), value); break;
      case FINISH_TIME: job.put(JobKeys.FINISH_TIME.toString(), value); break;
      case JOBID: job.put(JobKeys.JOBID.toString(), value); break;
      case JOBNAME: job.put(JobKeys.JOBNAME.toString(), value); break;
      case USER: job.put(JobKeys.USER.toString(), value); break;
      case JOBCONF: job.put(JobKeys.JOBCONF.toString(), value); break;
      case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME.toString(), value); break;
      case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME.toString(), value); break;
      case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS.toString(), value); break;
      case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES.toString(), value); break;
      case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS.toString(), value); break;
      case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES.toString(), value); break;
      case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS.toString(), value); break;
      case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES.toString(), value); break;
      case JOB_STATUS: job.put(JobKeys.STATUS.toString(), value); break;
      case COUNTERS:
        // A former value.concat(",") here discarded its result (String is
        // immutable), so the parser has always received the unmodified
        // string; the dead call is dropped and that input is kept as-is.
        parseAndAddJobCounters(job, value);
        break;
      default:
        LOG.debug("JobHistory.Keys."+ key + " : NOT INCLUDED IN LOADER RETURN VALUE");
        break;
    }
  }
}
/**
 * Returns the task registered under {@code taskId} in the job's task map,
 * creating and registering a new one (with its TASKID set) when absent.
 *
 * @param taskId id of the task to look up
 * @return the existing or newly registered task
 */
private JobHistory.Task getTask(String taskId) {
  JobHistory.Task known = job.getAllTasks().get(taskId);
  if (null != known) {
    return known;
  }
  JobHistory.Task created = new JobHistory.Task();
  created.set(Keys.TASKID, taskId);
  job.getAllTasks().put(taskId, created);
  return created;
}
/**
 * Returns the map attempt registered under {@code taskAttemptId} for the
 * given task, creating and registering a new one (with its TASK_ATTEMPT_ID
 * set) when absent.
 *
 * @param jobid not used by this method
 * @param jobTrackerId not used by this method
 * @param taskId id of the owning task, resolved through {@code getTask}
 * @param taskAttemptId id of the attempt to look up
 * @return the existing or newly registered map attempt
 */
private JobHistory.MapAttempt getMapAttempt(
    String jobid, String jobTrackerId, String taskId, String taskAttemptId) {
  JobHistory.Task owner = getTask(taskId);
  JobHistory.MapAttempt known =
      (JobHistory.MapAttempt) owner.getTaskAttempts().get(taskAttemptId);
  if (null != known) {
    return known;
  }
  JobHistory.MapAttempt created = new JobHistory.MapAttempt();
  created.set(Keys.TASK_ATTEMPT_ID, taskAttemptId);
  owner.getTaskAttempts().put(taskAttemptId, created);
  return created;
}
/**
 * Returns the reduce attempt registered under {@code taskAttemptId} for the
 * given task, creating and registering a new one (with its TASK_ATTEMPT_ID
 * set) when absent.
 *
 * @param jobid not used by this method
 * @param jobTrackerId not used by this method
 * @param taskId id of the owning task, resolved through {@code getTask}
 * @param taskAttemptId id of the attempt to look up
 * @return the existing or newly registered reduce attempt
 */
private JobHistory.ReduceAttempt getReduceAttempt(
    String jobid, String jobTrackerId, String taskId, String taskAttemptId) {
  JobHistory.Task owner = getTask(taskId);
  JobHistory.ReduceAttempt known =
      (JobHistory.ReduceAttempt) owner.getTaskAttempts().get(taskAttemptId);
  if (null != known) {
    return known;
  }
  JobHistory.ReduceAttempt created = new JobHistory.ReduceAttempt();
  created.set(Keys.TASK_ATTEMPT_ID, taskAttemptId);
  owner.getTaskAttempts().put(taskAttemptId, created);
  return created;
}
/**
 * Copies the recognised entries of a parsed job-history record into
 * {@code job}, keyed by the matching {@code JobKeys} constant. The COUNTERS
 * entry is handed to {@code parseAndAddJobCounters}; any other key is
 * reported on standard error.
 *
 * @param job destination table that receives the translated entries
 * @param jobC key/value pairs of the job record read from history
 * @throws ParseException declared by the signature; presumably raised by
 *         {@code parseAndAddJobCounters} (confirm against its declaration)
 */
private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
  for (Map.Entry<JobHistory.Keys, String> entry : jobC.entrySet()) {
    JobHistory.Keys key = entry.getKey();
    String value = entry.getValue();
    switch (key) {
      case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
      case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
      case JOBID: job.put(JobKeys.JOBID, value); break;
      case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
      case USER: job.put(JobKeys.USER, value); break;
      case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
      case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
      case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
      case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
      case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
      case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
      case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
      case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
      case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
      case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
      case JOB_PRIORITY: job.put(JobKeys.JOB_PRIORITY, value); break;
      case COUNTERS:
        // A former value.concat(",") here discarded its result (String is
        // immutable), so the parser has always received the unmodified
        // string; the dead call is dropped and that input is kept as-is.
        parseAndAddJobCounters(job, value);
        break;
      default:
        System.err.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
        break;
    }
  }
}
/**
 * Logs job as running: writes a Job record with the job id and the RUNNING
 * status. Does nothing when history is disabled or there are no writers.
 */
public void logStarted() {
  if (disableHistory || null == writers) {
    return;
  }
  Keys[] fields = {Keys.JOBID, Keys.JOB_STATUS};
  String[] fieldValues = {jobId.toString(), Values.RUNNING.name()};
  log(writers, RecordTypes.Job, fields, fieldValues);
}
/**
 * Log job finished with status SUCCESS, then calls
 * {@code closeAndClear(writers)}. The record is logged with the sync flag
 * set. Does nothing when history is disabled or there are no writers.
 *
 * @param finishTime finish time of job in ms.
 * @param finishedMaps no of maps successfully finished.
 * @param finishedReduces no of reduces finished successfully.
 * @param failedMaps no of failed map tasks. (includes killed)
 * @param failedReduces no of failed reduce tasks. (includes killed)
 * @param killedMaps no of killed map tasks.
 * @param killedReduces no of killed reduce tasks.
 * @param mapCounters counters written under the MAP_COUNTERS key
 * @param reduceCounters counters written under the REDUCE_COUNTERS key
 * @param counters the counters from the job
 */
public void logFinished(long finishTime,
    int finishedMaps, int finishedReduces,
    int failedMaps, int failedReduces,
    int killedMaps, int killedReduces,
    Counters mapCounters,
    Counters reduceCounters,
    Counters counters) {
  if (disableHistory) {
    return;
  }
  if (null == writers) {
    // NOTE: history cleaning stuff deleted from here. We should do that
    // somewhere else!
    return;
  }
  Keys[] fields = {
      Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS,
      Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
      Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
      Keys.KILLED_MAPS, Keys.KILLED_REDUCES,
      Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
      Keys.COUNTERS};
  String[] fieldValues = {
      jobId.toString(), Long.toString(finishTime), Values.SUCCESS.name(),
      String.valueOf(finishedMaps), String.valueOf(finishedReduces),
      String.valueOf(failedMaps), String.valueOf(failedReduces),
      String.valueOf(killedMaps), String.valueOf(killedReduces),
      mapCounters.makeEscapedCompactString(),
      reduceCounters.makeEscapedCompactString(),
      counters.makeEscapedCompactString()};
  log(writers, RecordTypes.Job, fields, fieldValues, true);
  closeAndClear(writers);
  // NOTE: history cleaning stuff deleted from here. We should do that
  // somewhere else!
}
/**
 * Log job's priority as a Job record. Does nothing when history is disabled
 * or there are no writers.
 *
 * @param jobid not read by this method; the record carries this history's
 *        own {@code jobId} field. NOTE(review): confirm this is intended.
 * @param priority Jobs priority
 */
public void logJobPriority(JobID jobid, JobPriority priority) {
  if (disableHistory || null == writers) {
    return;
  }
  Keys[] fields = {Keys.JOBID, Keys.JOB_PRIORITY};
  String[] fieldValues = {jobId.toString(), priority.toString()};
  log(writers, RecordTypes.Job, fields, fieldValues);
}
/**
 * Log the task failure as a Task record with status FAILED. Returns
 * immediately when history is disabled; the job-id check runs before the
 * writers check.
 *
 * @param taskId the task that failed
 * @param taskType the type of the task
 * @param time the time of the failure
 * @param error the error the task failed with
 * @param failedDueToAttempt The attempt that caused the failure, if any;
 *        {@code null} is written as an empty string
 * @throws RuntimeException if the job id of {@code taskId} differs from this
 *         history's job id
 */
public void logTaskFailed(TaskID taskId, String taskType, long time,
    String error,
    TaskAttemptID failedDueToAttempt) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  String culprit;
  if (failedDueToAttempt == null) {
    culprit = "";
  } else {
    culprit = failedDueToAttempt.toString();
  }
  Keys[] fields = {
      Keys.TASKID, Keys.TASK_TYPE, Keys.TASK_STATUS,
      Keys.FINISH_TIME, Keys.ERROR, Keys.TASK_ATTEMPT_ID};
  String[] fieldValues = {
      taskId.toString(), taskType, Values.FAILED.name(),
      String.valueOf(time), error, culprit};
  log(writers, RecordTypes.Task, fields, fieldValues);
}
/**
 * Log start time of this map task attempt as a MapAttempt record. Returns
 * immediately when history is disabled; the job-id check runs before the
 * writers check.
 *
 * @param taskAttemptId task attempt id
 * @param startTime start time of task attempt as reported by task tracker.
 * @param trackerName name of the tracker executing the task attempt.
 * @param httpPort http port of the task tracker executing the task attempt;
 *        -1 is written as an empty string
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logMapTaskStarted(TaskAttemptID taskAttemptId, long startTime,
    String trackerName, int httpPort,
    String taskType) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.START_TIME, Keys.TRACKER_NAME, Keys.HTTP_PORT};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      String.valueOf(startTime),
      trackerName,
      httpPort == -1 ? "" : String.valueOf(httpPort)};
  log(writers, RecordTypes.MapAttempt, fields, fieldValues);
}
/**
 * Log finish time of map task attempt as a MapAttempt record with status
 * SUCCESS. Returns immediately when history is disabled; the job-id check
 * runs before the writers check.
 *
 * @param taskAttemptId task attempt id
 * @param finishTime finish time
 * @param hostName host name
 * @param taskType Whether the attempt is cleanup or setup or map
 * @param stateString state string of the task attempt
 * @param counter counters of the task attempt
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logMapTaskFinished(TaskAttemptID taskAttemptId,
    long finishTime,
    String hostName,
    String taskType,
    String stateString,
    Counters counter) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME,
      Keys.STATE_STRING, Keys.COUNTERS};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      Values.SUCCESS.name(),
      String.valueOf(finishTime),
      hostName,
      stateString,
      counter.makeEscapedCompactString()};
  log(writers, RecordTypes.MapAttempt, fields, fieldValues);
}
/**
 * Log task attempt failed event as a MapAttempt record with status FAILED.
 * Returns immediately when history is disabled; the job-id check runs before
 * the writers check.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp timestamp
 * @param hostName hostname of this task attempt.
 * @param error error message if any for this task attempt.
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logMapTaskFailed(TaskAttemptID taskAttemptId,
    long timestamp, String hostName,
    String error, String taskType) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      Values.FAILED.name(),
      String.valueOf(timestamp),
      hostName,
      error};
  log(writers, RecordTypes.MapAttempt, fields, fieldValues);
}
/**
 * Log task attempt killed event as a MapAttempt record with status KILLED.
 * Returns immediately when history is disabled; the job-id check runs before
 * the writers check.
 *
 * @param taskAttemptId task attempt id
 * @param timestamp timestamp
 * @param hostName hostname of this task attempt.
 * @param error error message if any for this task attempt.
 * @param taskType Whether the attempt is cleanup or setup or map
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logMapTaskKilled(TaskAttemptID taskAttemptId,
    long timestamp, String hostName,
    String error, String taskType) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      Values.KILLED.name(),
      String.valueOf(timestamp),
      hostName,
      error};
  log(writers, RecordTypes.MapAttempt, fields, fieldValues);
}
/**
 * Log start time of Reduce task attempt as a ReduceAttempt record. Returns
 * immediately when history is disabled; the job-id check runs before the
 * writers check.
 *
 * @param taskAttemptId task attempt id
 * @param startTime start time
 * @param trackerName tracker name
 * @param httpPort the http port of the tracker executing the task attempt;
 *        -1 is written as an empty string
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logReduceTaskStarted(TaskAttemptID taskAttemptId,
    long startTime, String trackerName,
    int httpPort,
    String taskType) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.START_TIME, Keys.TRACKER_NAME, Keys.HTTP_PORT};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      String.valueOf(startTime),
      trackerName,
      httpPort == -1 ? "" : String.valueOf(httpPort)};
  log(writers, RecordTypes.ReduceAttempt, fields, fieldValues);
}
/**
 * Log finished event of this task as a ReduceAttempt record with status
 * SUCCESS. Returns immediately when history is disabled; the job-id check
 * runs before the writers check.
 *
 * @param taskAttemptId task attempt id
 * @param shuffleFinished shuffle finish time
 * @param sortFinished sort finish time
 * @param finishTime finish time of task
 * @param hostName host name where task attempt executed
 * @param taskType Whether the attempt is cleanup or setup or reduce
 * @param stateString the state string of the attempt
 * @param counter counters of the attempt
 * @throws RuntimeException if the job id of {@code taskAttemptId} differs
 *         from this history's job id
 */
public void logReduceTaskFinished(TaskAttemptID taskAttemptId,
    long shuffleFinished,
    long sortFinished, long finishTime,
    String hostName, String taskType,
    String stateString, Counters counter) {
  if (disableHistory) {
    return;
  }
  JobID owningJob = taskAttemptId.getJobID();
  if (!this.jobId.equals(owningJob)) {
    throw new RuntimeException("JobId from task: " + owningJob
        + " does not match expected: " + jobId);
  }
  if (null == writers) {
    return;
  }
  Keys[] fields = {
      Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID,
      Keys.TASK_STATUS, Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
      Keys.FINISH_TIME, Keys.HOSTNAME,
      Keys.STATE_STRING, Keys.COUNTERS};
  String[] fieldValues = {
      taskType,
      taskAttemptId.getTaskID().toString(),
      taskAttemptId.toString(),
      Values.SUCCESS.name(),
      String.valueOf(shuffleFinished),
      String.valueOf(sortFinished),
      String.valueOf(finishTime),
      hostName,
      stateString,
      counter.makeEscapedCompactString()};
  log(writers, RecordTypes.ReduceAttempt, fields, fieldValues);
}