下面列出了org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId#getId ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher() {
@Override
public void handle(ContainerLauncherEvent event) {
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
// Pass everything except the 2nd attempt of the first task.
if (taskAttemptID.getId() != 1
|| taskAttemptID.getTaskId().getId() != 0) {
super.handle(event);
}
}
};
launcher.shufflePort = 5467;
return launcher;
}
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
if (attemptEvent.getType() == this.attemptEventTypeToWait
&& attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
super.dispatch(event);
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
//this blocks the first task's first attempt
//the subsequent ones are completed
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_DONE));
}
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher() {
@Override
public void handle(ContainerLauncherEvent event) {
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
// Pass everything except the 2nd attempt of the first task.
if (taskAttemptID.getId() != 1
|| taskAttemptID.getTaskId().getId() != 0) {
super.handle(event);
}
}
};
launcher.shufflePort = 5467;
return launcher;
}
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
if (attemptEvent.getType() == this.attemptEventTypeToWait
&& attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
super.dispatch(event);
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
//this blocks the first task's first attempt
//the subsequent ones are completed
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_DONE));
}
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
//check if it is first task's first attempt
// send the Fail event
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_DONE));
}
}
@Override
public long estimatedRuntime(TaskAttemptId id) {
if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) {
return SPECULATE_THIS;
}
return super.estimatedRuntime(id);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
//check if it is first task's first attempt
// send the Fail event
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_DONE));
}
}
@Override
public long estimatedRuntime(TaskAttemptId id) {
if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) {
return SPECULATE_THIS;
}
return super.estimatedRuntime(id);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
@Test
public void testKillTaskWait() throws Exception {
final Dispatcher dispatcher = new AsyncDispatcher() {
private TaskAttemptEvent cachedKillEvent;
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptId taID = killEvent.getTaskAttemptID();
if (taID.getTaskId().getTaskType() == TaskType.REDUCE
&& taID.getTaskId().getId() == 0 && taID.getId() == 0) {
// Task is asking the reduce TA to kill itself. 'Create' a race
// condition. Make the task succeed and then inform the task that
// TA has succeeded. Once Task gets the TA succeeded event at
// KILL_WAIT, then relay the actual kill signal to TA
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent;
return;
}
}
} else if (event instanceof TaskEvent) {
TaskEvent taskEvent = (TaskEvent) event;
if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
&& this.cachedKillEvent != null) {
// When the TA comes and reports that it is done, send the
// cachedKillEvent
super.dispatch(this.cachedKillEvent);
return;
}
}
super.dispatch(event);
}
};
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
@Override
public Dispatcher createDispatcher() {
return dispatcher;
}
};
Job job = app.submit(new Configuration());
JobId jobId = app.getJobId();
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
Task reduceTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// Finish map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapAttempt.getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(mapTask, TaskState.SUCCEEDED);
// Now kill the job
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
public static org.apache.hadoop.mapred.TaskAttemptID fromYarn(
TaskAttemptId id) {
return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.getTaskId()),
id.getId());
}
@Test
public void testKillTaskWait() throws Exception {
final Dispatcher dispatcher = new AsyncDispatcher() {
private TaskAttemptEvent cachedKillEvent;
@Override
protected void dispatch(Event event) {
if (event instanceof TaskAttemptEvent) {
TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptId taID = killEvent.getTaskAttemptID();
if (taID.getTaskId().getTaskType() == TaskType.REDUCE
&& taID.getTaskId().getId() == 0 && taID.getId() == 0) {
// Task is asking the reduce TA to kill itself. 'Create' a race
// condition. Make the task succeed and then inform the task that
// TA has succeeded. Once Task gets the TA succeeded event at
// KILL_WAIT, then relay the actual kill signal to TA
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent;
return;
}
}
} else if (event instanceof TaskEvent) {
TaskEvent taskEvent = (TaskEvent) event;
if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
&& this.cachedKillEvent != null) {
// When the TA comes and reports that it is done, send the
// cachedKillEvent
super.dispatch(this.cachedKillEvent);
return;
}
}
super.dispatch(event);
}
};
MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
@Override
public Dispatcher createDispatcher() {
return dispatcher;
}
};
Job job = app.submit(new Configuration());
JobId jobId = app.getJobId();
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask = it.next();
Task reduceTask = it.next();
app.waitForState(mapTask, TaskState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
// Finish map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapAttempt.getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(mapTask, TaskState.SUCCEEDED);
// Now kill the job
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
public static org.apache.hadoop.mapred.TaskAttemptID fromYarn(
TaskAttemptId id) {
return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.getTaskId()),
id.getId());
}