下面列出了org.apache.hadoop.mapreduce.v2.LogParams#org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
final TaskAttemptReport report = ta.getReport();
this.type = type.toString();
this.id = MRApps.toString(ta.getID());
this.nodeHttpAddress = ta.getNodeHttpAddress();
this.startTime = report.getStartTime();
this.finishTime = report.getFinishTime();
this.assignedContainerId = ConverterUtils.toString(report.getContainerId());
this.assignedContainer = report.getContainerId();
this.progress = report.getProgress() * 100;
this.status = report.getStateString();
this.state = report.getTaskAttemptState();
this.elapsedTime = Times
.elapsed(this.startTime, this.finishTime, isRunning);
if (this.elapsedTime == -1) {
this.elapsedTime = 0;
}
this.diagnostics = report.getDiagnosticInfo();
this.rack = ta.getNodeRackName();
}
private void testMRAppHistory(MRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.FAILED);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
Task task = tasks.values().iterator().next();
Assert.assertEquals("Task state not correct", TaskState.FAILED, task
.getReport().getTaskState());
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
.getAttempts();
Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
Iterator<TaskAttempt> it = attempts.values().iterator();
TaskAttemptReport report = it.next().getReport();
Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
report.getTaskAttemptState());
Assert.assertEquals("Diagnostic Information is not Correct",
"Test Diagnostic Event", report.getDiagnosticInfo());
report = it.next().getReport();
Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
report.getTaskAttemptState());
}
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
TaskAttemptStateInternal iState = attempt.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("TaskAttempt Internal State is : " + iState
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = attempt.getReport();
iState = attempt.getInternalState();
}
System.out.println("TaskAttempt Internal State is : " + iState);
Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForState(TaskAttempt attempt,
TaskAttemptState finalState) throws Exception {
int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
while (!finalState.equals(report.getTaskAttemptState()) &&
timeoutSecs++ < 20) {
System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() +
" Waiting for state : " + finalState +
" progress : " + report.getProgress());
report = attempt.getReport();
Thread.sleep(500);
}
System.out.println("TaskAttempt State is : " + report.getTaskAttemptState());
Assert.assertEquals("TaskAttempt state is not correct (timedout)",
finalState,
report.getTaskAttemptState());
}
public void verifyCompleted() {
for (Job job : getContext().getAllJobs().values()) {
JobReport jobReport = job.getReport();
System.out.println("Job start time :" + jobReport.getStartTime());
System.out.println("Job finish time :" + jobReport.getFinishTime());
Assert.assertTrue("Job start time is not less than finish time",
jobReport.getStartTime() <= jobReport.getFinishTime());
Assert.assertTrue("Job finish time is in future",
jobReport.getFinishTime() <= System.currentTimeMillis());
for (Task task : job.getTasks().values()) {
TaskReport taskReport = task.getReport();
System.out.println("Task start time : " + taskReport.getStartTime());
System.out.println("Task finish time : " + taskReport.getFinishTime());
Assert.assertTrue("Task start time is not less than finish time",
taskReport.getStartTime() <= taskReport.getFinishTime());
for (TaskAttempt attempt : task.getAttempts().values()) {
TaskAttemptReport attemptReport = attempt.getReport();
Assert.assertTrue("Attempt start time is not less than finish time",
attemptReport.getStartTime() <= attemptReport.getFinishTime());
}
}
}
}
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
final TaskAttemptReport report = ta.getReport();
this.type = type.toString();
this.id = MRApps.toString(ta.getID());
this.nodeHttpAddress = ta.getNodeHttpAddress();
this.startTime = report.getStartTime();
this.finishTime = report.getFinishTime();
this.assignedContainerId = ConverterUtils.toString(report.getContainerId());
this.assignedContainer = report.getContainerId();
this.progress = report.getProgress() * 100;
this.status = report.getStateString();
this.state = report.getTaskAttemptState();
this.elapsedTime = Times
.elapsed(this.startTime, this.finishTime, isRunning);
if (this.elapsedTime == -1) {
this.elapsedTime = 0;
}
this.diagnostics = report.getDiagnosticInfo();
this.rack = ta.getNodeRackName();
}
private void testMRAppHistory(MRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.FAILED);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
Task task = tasks.values().iterator().next();
Assert.assertEquals("Task state not correct", TaskState.FAILED, task
.getReport().getTaskState());
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
.getAttempts();
Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
Iterator<TaskAttempt> it = attempts.values().iterator();
TaskAttemptReport report = it.next().getReport();
Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
report.getTaskAttemptState());
Assert.assertEquals("Diagnostic Information is not Correct",
"Test Diagnostic Event", report.getDiagnosticInfo());
report = it.next().getReport();
Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
report.getTaskAttemptState());
}
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
TaskAttemptStateInternal iState = attempt.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("TaskAttempt Internal State is : " + iState
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = attempt.getReport();
iState = attempt.getInternalState();
}
System.out.println("TaskAttempt Internal State is : " + iState);
Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForState(TaskAttempt attempt,
TaskAttemptState finalState) throws Exception {
int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
while (!finalState.equals(report.getTaskAttemptState()) &&
timeoutSecs++ < 20) {
System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() +
" Waiting for state : " + finalState +
" progress : " + report.getProgress());
report = attempt.getReport();
Thread.sleep(500);
}
System.out.println("TaskAttempt State is : " + report.getTaskAttemptState());
Assert.assertEquals("TaskAttempt state is not correct (timedout)",
finalState,
report.getTaskAttemptState());
}
public void verifyCompleted() {
for (Job job : getContext().getAllJobs().values()) {
JobReport jobReport = job.getReport();
System.out.println("Job start time :" + jobReport.getStartTime());
System.out.println("Job finish time :" + jobReport.getFinishTime());
Assert.assertTrue("Job start time is not less than finish time",
jobReport.getStartTime() <= jobReport.getFinishTime());
Assert.assertTrue("Job finish time is in future",
jobReport.getFinishTime() <= System.currentTimeMillis());
for (Task task : job.getTasks().values()) {
TaskReport taskReport = task.getReport();
System.out.println("Task start time : " + taskReport.getStartTime());
System.out.println("Task finish time : " + taskReport.getFinishTime());
Assert.assertTrue("Task start time is not less than finish time",
taskReport.getStartTime() <= taskReport.getFinishTime());
for (TaskAttempt attempt : task.getAttempts().values()) {
TaskAttemptReport attemptReport = attempt.getReport();
Assert.assertTrue("Attempt start time is not less than finish time",
attemptReport.getStartTime() <= attemptReport.getFinishTime());
}
}
}
}
@Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class);
readLock.lock();
try {
result.setTaskAttemptId(attemptId);
//take the LOCAL state of attempt
//DO NOT take from reportedStatus
result.setTaskAttemptState(getState());
result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(TypeConverter.toYarn(getCounters()));
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
if (this.container != null) {
result.setNodeManagerPort(this.container.getNodeId().getPort());
}
return result;
} finally {
readLock.unlock();
}
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {
Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
Assert.assertNotNull("TaskAttemptReport is null", tar);
Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
.getAttemptId());
}
public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
id.getTaskId().getJobId().getAppId(), 0);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 0);
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
report.setTaskAttemptId(id);
report
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1);
if (id.getTaskId().getTaskType() == TaskType.REDUCE) {
report.setShuffleFinishTime(
(report.getFinishTime() + report.getStartTime()) / 2);
report.setSortFinishTime(
(report.getFinishTime() + report.getShuffleFinishTime()) / 2);
}
report.setPhase(PHASES.next());
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
report.setProgress((float) Math.random());
report.setCounters(TypeConverter.toYarn(newCounters()));
report.setContainerId(containerId);
report.setDiagnosticInfo(DIAGS.next());
report.setStateString("Moving average " + Math.random());
return report;
}
@Override
public TaskAttemptReport getTaskAttemptReport() {
GetTaskAttemptReportResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskAttemptReport != null) {
return this.taskAttemptReport;
}
if (!p.hasTaskAttemptReport()) {
return null;
}
this.taskAttemptReport = convertFromProtoFormat(p.getTaskAttemptReport());
return this.taskAttemptReport;
}
@Override
public void setTaskAttemptReport(TaskAttemptReport taskAttemptReport) {
maybeInitBuilder();
if (taskAttemptReport == null)
builder.clearTaskAttemptReport();
this.taskAttemptReport = taskAttemptReport;
}
@Override
public synchronized TaskAttemptReport getReport() {
if (report == null) {
constructTaskAttemptReport();
}
return report;
}
private void constructTaskAttemptReport() {
report = Records.newRecord(TaskAttemptReport.class);
report.setTaskAttemptId(attemptId);
report.setTaskAttemptState(state);
report.setProgress(getProgress());
report.setStartTime(attemptInfo.getStartTime());
report.setFinishTime(attemptInfo.getFinishTime());
report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
report.setSortFinishTime(attemptInfo.getSortFinishTime());
if (localDiagMessage != null) {
report
.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
} else {
report.setDiagnosticInfo(attemptInfo.getError());
}
// report.setPhase(attemptInfo.get); //TODO
report.setStateString(attemptInfo.getState());
report.setCounters(TypeConverter.toYarn(getCounters()));
report.setContainerId(attemptInfo.getContainerId());
if (attemptInfo.getHostname() == null) {
report.setNodeManagerHost("UNKNOWN");
} else {
report.setNodeManagerHost(attemptInfo.getHostname());
report.setNodeManagerPort(attemptInfo.getPort());
}
report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
@Test (timeout=10000)
public void testCompletedTaskAttempt() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
info, jobAclsManager);
TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
TaskAttemptId rta1Id = MRBuilderUtils.newTaskAttemptId(rt1Id, 0);
Task mt1 = completedJob.getTask(mt1Id);
Task rt1 = completedJob.getTask(rt1Id);
TaskAttempt mta1 = mt1.getAttempt(mta1Id);
assertEquals(TaskAttemptState.SUCCEEDED, mta1.getState());
assertEquals("localhost:45454", mta1.getAssignedContainerMgrAddress());
assertEquals("localhost:9999", mta1.getNodeHttpAddress());
TaskAttemptReport mta1Report = mta1.getReport();
assertEquals(TaskAttemptState.SUCCEEDED, mta1Report.getTaskAttemptState());
assertEquals("localhost", mta1Report.getNodeManagerHost());
assertEquals(45454, mta1Report.getNodeManagerPort());
assertEquals(9999, mta1Report.getNodeManagerHttpPort());
TaskAttempt rta1 = rt1.getAttempt(rta1Id);
assertEquals(TaskAttemptState.SUCCEEDED, rta1.getState());
assertEquals("localhost:45454", rta1.getAssignedContainerMgrAddress());
assertEquals("localhost:9999", rta1.getNodeHttpAddress());
TaskAttemptReport rta1Report = rta1.getReport();
assertEquals(TaskAttemptState.SUCCEEDED, rta1Report.getTaskAttemptState());
assertEquals("localhost", rta1Report.getNodeManagerHost());
assertEquals(45454, rta1Report.getNodeManagerPort());
assertEquals(9999, rta1Report.getNodeManagerHttpPort());
}
@Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class);
readLock.lock();
try {
result.setTaskAttemptId(attemptId);
//take the LOCAL state of attempt
//DO NOT take from reportedStatus
result.setTaskAttemptState(getState());
result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(TypeConverter.toYarn(getCounters()));
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
if (this.container != null) {
result.setNodeManagerPort(this.container.getNodeId().getPort());
}
return result;
} finally {
readLock.unlock();
}
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {
Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
Assert.assertNotNull("TaskAttemptReport is null", tar);
Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
.getAttemptId());
}
public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
id.getTaskId().getJobId().getAppId(), 0);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 0);
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
report.setTaskAttemptId(id);
report
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1);
if (id.getTaskId().getTaskType() == TaskType.REDUCE) {
report.setShuffleFinishTime(
(report.getFinishTime() + report.getStartTime()) / 2);
report.setSortFinishTime(
(report.getFinishTime() + report.getShuffleFinishTime()) / 2);
}
report.setPhase(PHASES.next());
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
report.setProgress((float) Math.random());
report.setCounters(TypeConverter.toYarn(newCounters()));
report.setContainerId(containerId);
report.setDiagnosticInfo(DIAGS.next());
report.setStateString("Moving average " + Math.random());
return report;
}
@Override
public TaskAttemptReport getTaskAttemptReport() {
GetTaskAttemptReportResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.taskAttemptReport != null) {
return this.taskAttemptReport;
}
if (!p.hasTaskAttemptReport()) {
return null;
}
this.taskAttemptReport = convertFromProtoFormat(p.getTaskAttemptReport());
return this.taskAttemptReport;
}
@Override
public void setTaskAttemptReport(TaskAttemptReport taskAttemptReport) {
maybeInitBuilder();
if (taskAttemptReport == null)
builder.clearTaskAttemptReport();
this.taskAttemptReport = taskAttemptReport;
}
@Override
public synchronized TaskAttemptReport getReport() {
if (report == null) {
constructTaskAttemptReport();
}
return report;
}
private void constructTaskAttemptReport() {
report = Records.newRecord(TaskAttemptReport.class);
report.setTaskAttemptId(attemptId);
report.setTaskAttemptState(state);
report.setProgress(getProgress());
report.setStartTime(attemptInfo.getStartTime());
report.setFinishTime(attemptInfo.getFinishTime());
report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
report.setSortFinishTime(attemptInfo.getSortFinishTime());
if (localDiagMessage != null) {
report
.setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
} else {
report.setDiagnosticInfo(attemptInfo.getError());
}
// report.setPhase(attemptInfo.get); //TODO
report.setStateString(attemptInfo.getState());
report.setCounters(TypeConverter.toYarn(getCounters()));
report.setContainerId(attemptInfo.getContainerId());
if (attemptInfo.getHostname() == null) {
report.setNodeManagerHost("UNKNOWN");
} else {
report.setNodeManagerHost(attemptInfo.getHostname());
report.setNodeManagerPort(attemptInfo.getPort());
}
report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
}
@Test (timeout=10000)
public void testCompletedTaskAttempt() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
info, jobAclsManager);
TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0);
TaskAttemptId rta1Id = MRBuilderUtils.newTaskAttemptId(rt1Id, 0);
Task mt1 = completedJob.getTask(mt1Id);
Task rt1 = completedJob.getTask(rt1Id);
TaskAttempt mta1 = mt1.getAttempt(mta1Id);
assertEquals(TaskAttemptState.SUCCEEDED, mta1.getState());
assertEquals("localhost:45454", mta1.getAssignedContainerMgrAddress());
assertEquals("localhost:9999", mta1.getNodeHttpAddress());
TaskAttemptReport mta1Report = mta1.getReport();
assertEquals(TaskAttemptState.SUCCEEDED, mta1Report.getTaskAttemptState());
assertEquals("localhost", mta1Report.getNodeManagerHost());
assertEquals(45454, mta1Report.getNodeManagerPort());
assertEquals(9999, mta1Report.getNodeManagerHttpPort());
TaskAttempt rta1 = rt1.getAttempt(rta1Id);
assertEquals(TaskAttemptState.SUCCEEDED, rta1.getState());
assertEquals("localhost:45454", rta1.getAssignedContainerMgrAddress());
assertEquals("localhost:9999", rta1.getNodeHttpAddress());
TaskAttemptReport rta1Report = rta1.getReport();
assertEquals(TaskAttemptState.SUCCEEDED, rta1Report.getTaskAttemptState());
assertEquals("localhost", rta1Report.getNodeManagerHost());
assertEquals(45454, rta1Report.getNodeManagerPort());
assertEquals(9999, rta1Report.getNodeManagerHttpPort());
}
/**
* Given the taskAttempt details (task id and attempt id), it gives the TaskAttemptReport
* @param taskId, the taskId instance
* @param attemptId, the attempt id as int
* @return the Task Attempt Report
* @throws IOException
*/
public TaskAttemptReport getTaskAttemptReport(TaskId taskId, int attemptId) throws IOException{
TaskAttemptId taskAttemptId = YarnCommunicatorUtil.getTaskAttemptId(taskId, 0);
GetTaskAttemptReportRequestProto request = GetTaskAttemptReportRequestProto.getDefaultInstance();
GetTaskAttemptReportRequest getTaskAttemptRequest = new GetTaskAttemptReportRequestPBImpl(request);
getTaskAttemptRequest.setTaskAttemptId(taskAttemptId);
GetTaskAttemptReportResponse taskAttemptReportResponse = proxy.getTaskAttemptReport(getTaskAttemptRequest);
return taskAttemptReportResponse.getTaskAttemptReport();
}
/**
* This method prepares a Map containing Node Manager details (hostname, port) on which successful attempts of the job ran
* @param jobId
* @return Map<String, Integer> containing hostname and rpc port of Node Managers
* @throws IOException
*/
public Map<String, Integer> getAttemptedNodes(JobId jobId) throws IOException{
Map<String, Integer> nodes = new HashMap<String, Integer>();
Map<TaskId, TaskReport> reports = getAllTaskReports(jobId);
for(Map.Entry<TaskId, TaskReport> report: reports.entrySet()){
TaskId taskId = report.getKey();
TaskReport taskReport = report.getValue();
TaskAttemptId attemptId = taskReport.getSuccessfulAttempt();
TaskAttemptReport taskAttemptReport = getTaskAttemptReport(taskId, attemptId.getId());
nodes.put(taskAttemptReport.getNodeManagerHost(), taskAttemptReport.getNodeManagerPort());
}
return nodes;
}
@Override
public TaskAttemptReport getReport() {
throw new UnsupportedOperationException("Not supported yet.");
}
public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report =
((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
JobState.ERROR).contains(report.getJobState())) {
if (oldTaskAttemptID != null) {
GetTaskAttemptReportRequest taRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
TaskAttemptReport taReport =
((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
GetTaskAttemptReportRequest.class, taRequest))
.getTaskAttemptReport();
if (taReport.getContainerId() == null
|| taReport.getNodeManagerHost() == null) {
throw new IOException("Unable to get log information for task: "
+ oldTaskAttemptID);
}
return new LogParams(
taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(),
NodeId.newInstance(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser());
} else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
throw new IOException("Unable to get log information for job: "
+ oldJobID);
}
AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
return new LogParams(
amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(),
NodeId.newInstance(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser());
}
} else {
throw new IOException("Cannot get log path for a in-progress job");
}
}