Listed below are example usages of org.apache.hadoop.mapred.TaskCompletionEvent, as it is surfaced through org.apache.hadoop.mapred.TaskUmbilicalProtocol; the full source for each snippet is available on GitHub.
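Before the snippets themselves, here is a minimal, self-contained sketch of how client code typically consumes TaskCompletionEvent through the classic mapred API. It is illustrative only: the class name CompletionEventPoller, the one-second polling interval, and the printing of each event are assumptions made for this example, not code taken from the snippets below; RunningJob.getTaskCompletionEvents, getTaskStatus, getTaskAttemptId and getTaskTrackerHttp are the standard API calls also used further down.

import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

public class CompletionEventPoller {
  // Polls completion events for an already-submitted job until it finishes.
  public static void pollEvents(RunningJob job) throws Exception {
    int fromEventId = 0;
    while (!job.isComplete()) {
      TaskCompletionEvent[] events = job.getTaskCompletionEvents(fromEventId);
      for (TaskCompletionEvent event : events) {
        switch (event.getTaskStatus()) {
        case SUCCEEDED:
          // A succeeded map attempt advertises the HTTP address its output is served from.
          System.out.println("Succeeded: " + event.getTaskAttemptId()
              + " at " + event.getTaskTrackerHttp());
          break;
        default:
          // FAILED / KILLED / OBSOLETE / TIPFAILED: the attempt's output should not be fetched.
          System.out.println("Ignoring " + event.getTaskStatus()
              + " attempt " + event.getTaskAttemptId());
          break;
        }
      }
      fromEventId += events.length;
      Thread.sleep(1000); // hypothetical back-off between polls
    }
  }
}

The snippets follow; the first one returns a bounded slice of the map-attempt completion events under a read lock.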
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS;
readLock.lock();
try {
if (mapAttemptCompletionEvents.size() > startIndex) {
int actualMax = Math.min(maxEvents,
(mapAttemptCompletionEvents.size() - startIndex));
events = mapAttemptCompletionEvents.subList(startIndex,
actualMax + startIndex).toArray(events);
}
return events;
} finally {
readLock.unlock();
}
}
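// Maps a YARN TaskAttemptCompletionEventStatus to the equivalent classic-API TaskCompletionEvent.Status, throwing for unrecognized values.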
public static TaskCompletionEvent.Status fromYarn(
TaskAttemptCompletionEventStatus newStatus) {
switch (newStatus) {
case FAILED:
return TaskCompletionEvent.Status.FAILED;
case KILLED:
return TaskCompletionEvent.Status.KILLED;
case OBSOLETE:
return TaskCompletionEvent.Status.OBSOLETE;
case SUCCEEDED:
return TaskCompletionEvent.Status.SUCCEEDED;
case TIPFAILED:
return TaskCompletionEvent.Status.TIPFAILED;
}
throw new YarnRuntimeException("Unrecognized status: " + newStatus);
}
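// Reduce-side handling of a fetched completion event: SUCCEEDED attempts have their map output location registered, FAILED/KILLED/OBSOLETE outputs are marked obsolete, and TIPFAILED tasks are recorded as failed.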
@Override
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED:
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
break;
case FAILED:
case KILLED:
case OBSOLETE:
obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");
break;
case TIPFAILED:
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");
break;
}
}
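// abortJob override: on abort, collect the diagnostics of every non-SUCCEEDED task attempt via the JobClient and report them.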
@Override
public void abortJob(final JobContext context, final JobStatus.State runState) throws java.io.IOException {
super.abortJob(context, runState);
final JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
final RunningJob job = jobClient.getJob((org.apache.hadoop.mapred.JobID) JobID.forName(context.getConfiguration().get("mapred.job.id")));
String diag = "";
for (final TaskCompletionEvent event : job.getTaskCompletionEvents(0))
switch (event.getTaskStatus()) {
case SUCCEEDED:
break;
default:
diag += "Diagnostics for: " + event.getTaskTrackerHttp() + "\n";
for (final String s : job.getTaskDiagnostics(event.getTaskAttemptId()))
diag += s + "\n";
diag += "\n";
break;
}
updateStatus(diag, context.getConfiguration().getInt("boa.hadoop.jobid", 0));
}
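// Test helper: runs a small job (3 maps, 1 reduce) over a fake input format and returns its task completion events.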
public static TaskCompletionEvent[] runJob(JobConf conf, Class mapperClass,
boolean enableNoFetchEmptyMapOutputs) throws Exception {
conf.setMapperClass(mapperClass);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setNumMapTasks(3);
conf.setNumReduceTasks(1);
conf.setInputFormat(FakeIF.class);
conf.setBoolean("mapred.enable.no.fetch.map.outputs", enableNoFetchEmptyMapOutputs);
FileInputFormat.setInputPaths(conf, new Path("/in"));
final Path outp = new Path("/out");
FileOutputFormat.setOutputPath(conf, outp);
RunningJob job = null;
job = JobClient.runJob(conf);
assertTrue(job.isSuccessful());
return job.getTaskCompletionEvents(0);
}
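// Converts YARN TaskAttemptCompletionEvents to classic-API TaskCompletionEvents; the single-event overload below performs the per-field mapping.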
public static TaskCompletionEvent[] fromYarn(
TaskAttemptCompletionEvent[] newEvents) {
TaskCompletionEvent[] oldEvents =
new TaskCompletionEvent[newEvents.length];
int i = 0;
for (TaskAttemptCompletionEvent newEvent
: newEvents) {
oldEvents[i++] = fromYarn(newEvent);
}
return oldEvents;
}
public static TaskCompletionEvent fromYarn(
TaskAttemptCompletionEvent newEvent) {
return new TaskCompletionEvent(newEvent.getEventId(),
fromYarn(newEvent.getAttemptId()), newEvent.getAttemptId().getId(),
newEvent.getAttemptId().getTaskId().getTaskType().equals(TaskType.MAP),
fromYarn(newEvent.getStatus()),
newEvent.getMapOutputServerAddress());
}
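// Builds a MapTaskCompletionEventsUpdate holding numEvents SUCCEEDED map events starting at startIdx, for use with a mocked umbilical.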
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
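// Lazily builds the attempt completion events on first access, then returns the requested slice of map events converted to the classic API.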
@Override
public synchronized TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
if (mapCompletionEvents == null) {
constructTaskAttemptCompletionEvents();
}
return TypeConverter.fromYarn(getAttemptCompletionEvents(
mapCompletionEvents, startIndex, maxEvents));
}
/**
* Simple test of some methods of CompletedJob
* @throws Exception
*/
@Test (timeout=30000)
public void testGetTaskAttemptCompletionEvent() throws Exception{
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user",
info, jobAclsManager);
TaskCompletionEvent[] events= completedJob.getMapAttemptCompletionEvents(0,1000);
assertEquals(10, completedJob.getMapAttemptCompletionEvents(0,10).length);
int currentEventId=0;
for (TaskCompletionEvent taskAttemptCompletionEvent : events) {
int eventId= taskAttemptCompletionEvent.getEventId();
assertTrue(eventId>=currentEventId);
currentEventId=eventId;
}
assertNull(completedJob.loadConfFile() );
// job name
assertEquals("Sleep job",completedJob.getName());
// queue name
assertEquals("default",completedJob.getQueueName());
// progress
assertEquals(1.0, completedJob.getProgress(),0.001);
// 12 events in total
assertEquals(12,completedJob.getTaskAttemptCompletionEvents(0,1000).length);
// the first 10 events
assertEquals(10,completedJob.getTaskAttemptCompletionEvents(0,10).length);
// request 10 events starting at index 5; only 7 remain
assertEquals(7,completedJob.getTaskAttemptCompletionEvents(5,10).length);
// diagnostics hold a single empty entry, i.e. no errors
assertEquals(1,completedJob.getDiagnostics().size());
assertEquals("",completedJob.getDiagnostics().get(0));
assertEquals(0, completedJob.getJobACLs().size());
}
TaskCompletionEvent[]
getMapAttemptCompletionEvents(int startIndex, int maxEvents);
/**
* Note that this transition method is called directly (and synchronously)
* by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
* just plain sequential call within AM context), so we can trigger
* modifications in AM state from here (at least, if AM is written that
* way; MR version is).
*/
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName,
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
job.numMapTasks + job.numReduceTasks + 10);
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}
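// Appends a new task-attempt completion event; any earlier SUCCEEDED event for the same task is marked OBSOLETE (see MAPREDUCE-4330), and succeeded attempts are indexed by node.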
@Override
public void transition(JobImpl job, JobEvent event) {
TaskAttemptCompletionEvent tce =
((JobTaskAttemptCompletedEvent) event).getCompletionEvent();
// Add the TaskAttemptCompletionEvent
//eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
int mapEventIdx = -1;
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
// we track map completions separately from task completions because
// - getMapAttemptCompletionEvents uses index ranges specific to maps
// - type converting the same events over and over is expensive
mapEventIdx = job.mapAttemptCompletionEvents.size();
job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
}
job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);
TaskAttemptId attemptId = tce.getAttemptId();
TaskId taskId = attemptId.getTaskId();
//make the previous completion event as obsolete if it exists
Integer successEventNo =
job.successAttemptCompletionEventNoMap.remove(taskId);
if (successEventNo != null) {
TaskAttemptCompletionEvent successEvent =
job.taskAttemptCompletionEvents.get(successEventNo);
successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
int mapCompletionIdx =
job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
if (mapCompletionIdx >= 0) {
// update the corresponding TaskCompletionEvent for the map
TaskCompletionEvent mapEvent =
job.mapAttemptCompletionEvents.get(mapCompletionIdx);
job.mapAttemptCompletionEvents.set(mapCompletionIdx,
new TaskCompletionEvent(mapEvent.getEventId(),
mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
mapEvent.getTaskTrackerHttp()));
}
}
// if this attempt is not successful then why is the previous successful
// attempt being removed above - MAPREDUCE-4330
if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
// here we could have simply called Task.getSuccessfulAttempt() but
// the event that triggers this code is sent before
// Task.successfulAttempt is set and so there is no guarantee that it
// will be available now
Task task = job.tasks.get(taskId);
TaskAttempt attempt = task.getAttempt(attemptId);
NodeId nodeId = attempt.getNodeId();
assert (nodeId != null); // node must exist for a successful event
List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
.get(nodeId);
if (taskAttemptIdList == null) {
taskAttemptIdList = new ArrayList<TaskAttemptId>();
job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
}
taskAttemptIdList.add(attempt.getID());
}
}
@Override
public TaskCompletionEvent[]
getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
throw new UnsupportedOperationException("Not supported yet.");
}
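// Verifies that the event fetcher issues consecutive getMapCompletionEvents calls with advancing offsets until all events have been fetched.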
@Test
public void testConsecutiveFetch()
throws IOException, InterruptedException {
final int MAX_EVENTS_TO_FETCH = 100;
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
anyInt(), anyInt(), any(TaskAttemptID.class)))
.thenReturn(getMockedCompletionEventsUpdate(0, 0));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
@SuppressWarnings("unchecked")
ShuffleScheduler<String,String> scheduler =
mock(ShuffleScheduler.class);
ExceptionReporter reporter = mock(ExceptionReporter.class);
EventFetcherForTest<String,String> ef =
new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
reporter, MAX_EVENTS_TO_FETCH);
ef.getMapCompletionEvents();
verify(reporter, never()).reportException(any(Throwable.class));
InOrder inOrder = inOrder(umbilical);
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).resolve(
any(TaskCompletionEvent.class));
}
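// Test stubs: one returns null, the other two simply delegate to an underlying Job implementation.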
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return null;
}
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return mockJob.getMapAttemptCompletionEvents(startIndex, maxEvents);
}
@Override
public TaskCompletionEvent[] getMapAttemptCompletionEvents(
int startIndex, int maxEvents) {
return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
}