下面列出了org.apache.hadoop.mapreduce.v2.LogParams#org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
// false is for retain compatibility
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(jobState);
jobReport.setUser(applicationReport.getUser());
jobReport.setStartTime(applicationReport.getStartTime());
jobReport.setDiagnostics(applicationReport.getDiagnostics());
jobReport.setJobName(applicationReport.getName());
jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
jobReport.setFinishTime(applicationReport.getFinishTime());
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
JobStatus jobStatus = null;
if (report != null) {
if (StringUtils.isEmpty(report.getJobFile())) {
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
report.setJobFile(jobFile);
}
String historyTrackingUrl = report.getTrackingUrl();
String url = StringUtils.isNotEmpty(historyTrackingUrl)
? historyTrackingUrl : trackingUrl;
jobStatus = TypeConverter.fromYarn(report, url);
}
return jobStatus;
}
@Test
public void testRetriesOnConnectionFailure() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
amContact = true;
JobReport jobReport = recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(JobState.RUNNING);
jobReport.setJobName("TestClientRedirect-jobname");
jobReport.setUser("TestClientRedirect-user");
jobReport.setStartTime(0L);
jobReport.setFinishTime(1L);
GetJobReportResponse response = recordFactory
.newRecordInstance(GetJobReportResponse.class);
response.setJobReport(jobReport);
return response;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
// false is for retain compatibility
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(jobState);
jobReport.setUser(applicationReport.getUser());
jobReport.setStartTime(applicationReport.getStartTime());
jobReport.setDiagnostics(applicationReport.getDiagnostics());
jobReport.setJobName(applicationReport.getName());
jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
jobReport.setFinishTime(applicationReport.getFinishTime());
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}
public JobStatus getJobStatus(JobID oldJobID) throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
JobStatus jobStatus = null;
if (report != null) {
if (StringUtils.isEmpty(report.getJobFile())) {
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
report.setJobFile(jobFile);
}
String historyTrackingUrl = report.getTrackingUrl();
String url = StringUtils.isNotEmpty(historyTrackingUrl)
? historyTrackingUrl : trackingUrl;
jobStatus = TypeConverter.fromYarn(report, url);
}
return jobStatus;
}
@Test
public void testRetriesOnConnectionFailure() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
new RuntimeException("1")).thenThrow(new RuntimeException("2"))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
verify(historyServerProxy, times(3)).getJobReport(
any(GetJobReportRequest.class));
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
amContact = true;
JobReport jobReport = recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(JobState.RUNNING);
jobReport.setJobName("TestClientRedirect-jobname");
jobReport.setUser("TestClientRedirect-user");
jobReport.setStartTime(0L);
jobReport.setFinishTime(1L);
GetJobReportResponse response = recordFactory
.newRecordInstance(GetJobReportResponse.class);
response.setJobReport(jobReport);
return response;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(jobState);
jobReport.setUser(applicationReport.getUser());
jobReport.setStartTime(applicationReport.getStartTime());
jobReport.setDiagnostics(applicationReport.getDiagnostics());
jobReport.setJobName(applicationReport.getName());
jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
jobReport.setFinishTime(applicationReport.getFinishTime());
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(jobState);
jobReport.setUser(applicationReport.getUser());
jobReport.setStartTime(applicationReport.getStartTime());
jobReport.setDiagnostics(applicationReport.getDiagnostics());
jobReport.setJobName(applicationReport.getName());
jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
jobReport.setFinishTime(applicationReport.getFinishTime());
GetJobReportResponse resp =
recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}
@Test
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2")))
.thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(
any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
} catch (YarnException e) {
throw new IOException(e);
}
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto();
try {
return new GetJobReportResponsePBImpl(proxy.getJobReport(null, requestProto));
} catch (ServiceException e) {
throw unwrapAndThrowException(e);
}
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
@Test
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2")))
.thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(
any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
} catch (YarnException e) {
throw new IOException(e);
}
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto();
try {
return new GetJobReportResponsePBImpl(proxy.getJobReport(null, requestProto));
} catch (ServiceException e) {
throw unwrapAndThrowException(e);
}
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
}
else {
response.setJobReport(null);
}
return response;
}
/**
* Given the required details (application id and suffix id) for JobId it gives the JobReport
* @param appid, the Application Id instance
* @param id, the suffix id
* @return the Job Report
* @throws IOException
*/
public JobReport getJobReport(ApplicationId appId, int id) throws IOException{
JobId jobId = YarnCommunicatorUtil.getJobId(appId, (int)1);
GetJobReportRequestProto proto = GetJobReportRequestProto.getDefaultInstance();
GetJobReportRequest request = new GetJobReportRequestPBImpl(proto);
request.setJobId(jobId);
GetJobReportResponse jobReportResponse = proxy.getJobReport(request);
return jobReportResponse.getJobReport();
}
public JobReport getJobReport(JobId jobId) throws IOException {
GetJobReportRequestProto proto = GetJobReportRequestProto.getDefaultInstance();
GetJobReportRequest request = new GetJobReportRequestPBImpl(proto);
request.setJobId(jobId);
GetJobReportResponse jobReportResponse = proxy.getJobReport(request);
return jobReportResponse.getJobReport();
}
public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report =
((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
JobState.ERROR).contains(report.getJobState())) {
if (oldTaskAttemptID != null) {
GetTaskAttemptReportRequest taRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
TaskAttemptReport taReport =
((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
GetTaskAttemptReportRequest.class, taRequest))
.getTaskAttemptReport();
if (taReport.getContainerId() == null
|| taReport.getNodeManagerHost() == null) {
throw new IOException("Unable to get log information for task: "
+ oldTaskAttemptID);
}
return new LogParams(
taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(),
NodeId.newInstance(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser());
} else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
throw new IOException("Unable to get log information for job: "
+ oldJobID);
}
AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
return new LogParams(
amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(),
NodeId.newInstance(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser());
}
} else {
throw new IOException("Cannot get log path for a in-progress job");
}
}
@Test
public void testRetriesOnAMConnectionFailures() throws Exception {
if (!isAMReachableFromClient) {
return;
}
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(getRunningApplicationReport("am1", 78));
// throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and
// succeed in the 5th call.
final MRClientProtocol amProxy = mock(MRClientProtocol.class);
when(amProxy.getJobReport(any(GetJobReportRequest.class)))
.thenThrow(new RuntimeException("11"))
.thenThrow(new RuntimeException("22"))
.thenThrow(new RuntimeException("33"))
.thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse());
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate =
new ClientServiceDelegate(conf, rm, oldJobId, null) {
@Override
MRClientProtocol instantiateAMProxy(
final InetSocketAddress serviceAddr) throws IOException {
super.instantiateAMProxy(serviceAddr);
return amProxy;
}
};
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
// assert maxClientRetry is not decremented.
Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
.getMaxClientRetry());
verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class));
}
@Test
public void testNoRetryOnAMAuthorizationException() throws Exception {
if (!isAMReachableFromClient) {
return;
}
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(getRunningApplicationReport("am1", 78));
// throw authorization exception on first invocation
final MRClientProtocol amProxy = mock(MRClientProtocol.class);
when(amProxy.getJobReport(any(GetJobReportRequest.class)))
.thenThrow(new AuthorizationException("Denied"));
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate =
new ClientServiceDelegate(conf, rm, oldJobId, null) {
@Override
MRClientProtocol instantiateAMProxy(
final InetSocketAddress serviceAddr) throws IOException {
super.instantiateAMProxy(serviceAddr);
return amProxy;
}
};
try {
clientServiceDelegate.getJobStatus(oldJobId);
Assert.fail("Exception should be thrown upon AuthorizationException");
} catch (IOException e) {
Assert.assertEquals(AuthorizationException.class.getName() + ": Denied",
e.getMessage());
}
// assert maxClientRetry is not decremented.
Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
.getMaxClientRetry());
verify(amProxy, times(1)).getJobReport(any(GetJobReportRequest.class));
}
@Test
public void testReconnectOnAMRestart() throws IOException {
//test not applicable when AM not reachable
//as instantiateAMProxy is not called at all
if(!isAMReachableFromClient) {
return;
}
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
// RM returns AM1 url, null, null and AM2 url on invocations.
// Nulls simulate the time when AM2 is in the process of restarting.
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport("am2", 90));
} catch (YarnException e) {
throw new IOException(e);
}
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
false, ""));
// First AM returns a report with jobName firstGen and simulates AM shutdown
// on second invocation.
MRClientProtocol firstGenAMProxy = mock(MRClientProtocol.class);
when(firstGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(jobReportResponse1).thenThrow(
new RuntimeException("AM is down!"));
GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
when(jobReportResponse2.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null,
false, ""));
// Second AM generation returns a report with jobName secondGen
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(jobReportResponse2);
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
historyServerProxy, rmDelegate));
// First time, connection should be to AM1, then to AM2. Further requests
// should use the same proxy to AM2 and so instantiateProxy shouldn't be
// called.
doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
clientServiceDelegate).instantiateAMProxy(any(InetSocketAddress.class));
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
Assert.assertEquals("jobName-firstGen", jobStatus.getJobName());
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
any(InetSocketAddress.class));
}
private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId);
return request;
}
@Test (timeout = 90000)
public void testJobHistoryData() throws IOException, InterruptedException,
AvroRemoteException, ClassNotFoundException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(mrCluster.getConfig());
// Job with 3 maps and 2 reduces
Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
job.setJarByClass(SleepJob.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
Counters counterMR = job.getCounters();
JobId jobId = TypeConverter.toYarn(job.getJobID());
ApplicationId appID = jobId.getAppId();
int pollElapsed = 0;
while (true) {
Thread.sleep(1000);
pollElapsed += 1000;
if (TERMINAL_RM_APP_STATES.contains(
mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
.getState())) {
break;
}
if (pollElapsed >= 60000) {
LOG.warn("application did not reach terminal state within 60 seconds");
break;
}
}
Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
.getRMContext().getRMApps().get(appID).getState());
Counters counterHS = job.getCounters();
//TODO the Assert below worked. need to check
//Should we compare each field or convert to V2 counter and compare
LOG.info("CounterHS " + counterHS);
LOG.info("CounterMR " + counterMR);
Assert.assertEquals(counterHS, counterMR);
HSClientProtocol historyClient = instantiateHistoryProxy();
GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
gjReq.setJobId(jobId);
JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
verifyJobReport(jobReport, jobId);
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
return null;
}
public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report =
((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
JobState.ERROR).contains(report.getJobState())) {
if (oldTaskAttemptID != null) {
GetTaskAttemptReportRequest taRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
TaskAttemptReport taReport =
((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
GetTaskAttemptReportRequest.class, taRequest))
.getTaskAttemptReport();
if (taReport.getContainerId() == null
|| taReport.getNodeManagerHost() == null) {
throw new IOException("Unable to get log information for task: "
+ oldTaskAttemptID);
}
return new LogParams(
taReport.getContainerId().toString(),
taReport.getContainerId().getApplicationAttemptId()
.getApplicationId().toString(),
NodeId.newInstance(taReport.getNodeManagerHost(),
taReport.getNodeManagerPort()).toString(), report.getUser());
} else {
if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
throw new IOException("Unable to get log information for job: "
+ oldJobID);
}
AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
return new LogParams(
amInfo.getContainerId().toString(),
amInfo.getAppAttemptId().getApplicationId().toString(),
NodeId.newInstance(amInfo.getNodeManagerHost(),
amInfo.getNodeManagerPort()).toString(), report.getUser());
}
} else {
throw new IOException("Cannot get log path for a in-progress job");
}
}
@Test
public void testRetriesOnAMConnectionFailures() throws Exception {
if (!isAMReachableFromClient) {
return;
}
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(getRunningApplicationReport("am1", 78));
// throw exception in 1st, 2nd, 3rd and 4th call of getJobReport, and
// succeed in the 5th call.
final MRClientProtocol amProxy = mock(MRClientProtocol.class);
when(amProxy.getJobReport(any(GetJobReportRequest.class)))
.thenThrow(new RuntimeException("11"))
.thenThrow(new RuntimeException("22"))
.thenThrow(new RuntimeException("33"))
.thenThrow(new RuntimeException("44")).thenReturn(getJobReportResponse());
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate =
new ClientServiceDelegate(conf, rm, oldJobId, null) {
@Override
MRClientProtocol instantiateAMProxy(
final InetSocketAddress serviceAddr) throws IOException {
super.instantiateAMProxy(serviceAddr);
return amProxy;
}
};
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
// assert maxClientRetry is not decremented.
Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
.getMaxClientRetry());
verify(amProxy, times(5)).getJobReport(any(GetJobReportRequest.class));
}
@Test
public void testNoRetryOnAMAuthorizationException() throws Exception {
if (!isAMReachableFromClient) {
return;
}
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(getRunningApplicationReport("am1", 78));
// throw authorization exception on first invocation
final MRClientProtocol amProxy = mock(MRClientProtocol.class);
when(amProxy.getJobReport(any(GetJobReportRequest.class)))
.thenThrow(new AuthorizationException("Denied"));
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate =
new ClientServiceDelegate(conf, rm, oldJobId, null) {
@Override
MRClientProtocol instantiateAMProxy(
final InetSocketAddress serviceAddr) throws IOException {
super.instantiateAMProxy(serviceAddr);
return amProxy;
}
};
try {
clientServiceDelegate.getJobStatus(oldJobId);
Assert.fail("Exception should be thrown upon AuthorizationException");
} catch (IOException e) {
Assert.assertEquals(AuthorizationException.class.getName() + ": Denied",
e.getMessage());
}
// assert maxClientRetry is not decremented.
Assert.assertEquals(conf.getInt(MRJobConfig.MR_CLIENT_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES), clientServiceDelegate
.getMaxClientRetry());
verify(amProxy, times(1)).getJobReport(any(GetJobReportRequest.class));
}