下面列出了org.apache.hadoop.mapred.TaskStatus.State#SUCCEEDED 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private long doMakeUpReduceRuntime(State state) {
long reduceTime;
try {
if (state == State.SUCCEEDED) {
reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
} else if (state == State.FAILED) {
reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return reduceTime;
} catch (NoValueToMakeUpRuntime e) {
return 0;
}
}
private State makeUpState(int taskAttemptNumber, double[] numAttempts) {
// if numAttempts == null we are returning FAILED.
if(numAttempts == null) {
return State.FAILED;
}
if (taskAttemptNumber >= numAttempts.length - 1) {
// always succeed
return State.SUCCEEDED;
} else {
double pSucceed = numAttempts[taskAttemptNumber];
double pFail = 0;
for (int i = taskAttemptNumber + 1; i < numAttempts.length; i++) {
pFail += numAttempts[i];
}
return (random.nextDouble() < pSucceed / (pSucceed + pFail)) ? State.SUCCEEDED
: State.FAILED;
}
}
@SuppressWarnings({ "deprecation", "incomplete-switch" })
@Override
public TaskAttemptInfo getTaskAttemptInfo(
TaskType taskType, int taskNumber, int taskAttemptNumber) {
switch (taskType) {
case MAP:
return new MapTaskAttemptInfo(
State.SUCCEEDED,
new TaskInfo(
m_bytesIn[taskNumber], m_recsIn[taskNumber],
m_bytesOut[taskNumber], m_recsOut[taskNumber], -1),
100);
case REDUCE:
return new ReduceTaskAttemptInfo(
State.SUCCEEDED,
new TaskInfo(
r_bytesIn[taskNumber], r_recsIn[taskNumber],
r_bytesOut[taskNumber], r_recsOut[taskNumber], -1),
100, 100, 100);
}
throw new UnsupportedOperationException();
}
private long doMakeUpReduceRuntime(State state) {
long reduceTime;
try {
if (state == State.SUCCEEDED) {
reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
} else if (state == State.FAILED) {
reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return reduceTime;
} catch (NoValueToMakeUpRuntime e) {
return 0;
}
}
private List<TaskStatus> collectAndCloneTaskStatuses() {
ArrayList<TaskStatus> statuses = new ArrayList<TaskStatus>();
Set<TaskAttemptID> mark = new HashSet<TaskAttemptID>();
for (SimulatorTaskInProgress tip : tasks.values()) {
statuses.add((TaskStatus) tip.getTaskStatus().clone());
if (tip.getFinalRunState() == State.SUCCEEDED) {
mark.add(tip.getTaskStatus().getTaskID());
}
}
for (TaskAttemptID taskId : mark) {
tasks.remove(taskId);
}
return statuses;
}
@SuppressWarnings({ "deprecation", "incomplete-switch" })
@Override
public TaskAttemptInfo getTaskAttemptInfo(
TaskType taskType, int taskNumber, int taskAttemptNumber) {
switch (taskType) {
case MAP:
return new MapTaskAttemptInfo(
State.SUCCEEDED,
new TaskInfo(
m_bytesIn[taskNumber], m_recsIn[taskNumber],
m_bytesOut[taskNumber], m_recsOut[taskNumber], -1),
100);
case REDUCE:
return new ReduceTaskAttemptInfo(
State.SUCCEEDED,
new TaskInfo(
r_bytesIn[taskNumber], r_recsIn[taskNumber],
r_bytesOut[taskNumber], r_recsOut[taskNumber], -1),
100, 100, 100);
}
throw new UnsupportedOperationException();
}
private long doMakeUpReduceRuntime(State state) {
long reduceTime;
try {
if (state == State.SUCCEEDED) {
reduceTime = makeUpRuntime(job.getSuccessfulReduceAttemptCDF());
} else if (state == State.FAILED) {
reduceTime = makeUpRuntime(job.getFailedReduceAttemptCDF());
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return reduceTime;
} catch (NoValueToMakeUpRuntime e) {
return 0;
}
}
private long makeUpMapRuntime(State state, int locality) {
long runtime;
// make up runtime
if (state == State.SUCCEEDED || state == State.FAILED) {
List<LoggedDiscreteCDF> cdfList =
state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
.getFailedMapAttemptCDFs();
// XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
// the last group is "distance cannot be determined". All pig jobs
// would have only the 4th group, and pig tasks usually do not have
// any locality, so this group should count as "distance=2".
// However, setup/cleanup tasks are also counted in the 4th group.
// These tasks do not make sense.
try {
runtime = makeUpRuntime(cdfList.get(locality));
} catch (NoValueToMakeUpRuntime e) {
runtime = makeUpRuntime(cdfList);
}
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return runtime;
}
protected TaskAttemptInfo
(State state, TaskInfo taskInfo, List<List<Integer>> allSplits) {
if (state == State.SUCCEEDED || state == State.FAILED) {
this.state = state;
} else {
throw new IllegalArgumentException("status cannot be " + state);
}
this.taskInfo = taskInfo;
this.allSplits = allSplits;
}
private static State convertState(Values status) {
if (status == Values.SUCCESS) {
return State.SUCCEEDED;
} else if (status == Values.FAILED) {
return State.FAILED;
} else if (status == Values.KILLED) {
return State.KILLED;
} else {
throw new IllegalArgumentException("unknown status " + status);
}
}
private long makeUpMapRuntime(State state, int locality) {
long runtime;
// make up runtime
if (state == State.SUCCEEDED || state == State.FAILED) {
List<LoggedDiscreteCDF> cdfList =
state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
.getFailedMapAttemptCDFs();
// XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
// the last group is "distance cannot be determined". All pig jobs
// would have only the 4th group, and pig tasks usually do not have
// any locality, so this group should count as "distance=2".
// However, setup/cleanup tasks are also counted in the 4th group.
// These tasks do not make sense.
if(cdfList==null) {
runtime = -1;
return runtime;
}
try {
runtime = makeUpRuntime(cdfList.get(locality));
} catch (NoValueToMakeUpRuntime e) {
runtime = makeUpRuntime(cdfList);
}
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return runtime;
}
public void expectReduceTask(SimulatorTaskTracker taskTracker,
TaskAttemptID taskId, long mapDone,
long reduceRuntime) {
long reduceDone = mapDone + reduceRuntime;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
ReduceTaskStatus status = new ReduceTaskStatus(taskIdOldApi, 1.0f, 1,
State.SUCCEEDED, null, null, null, Phase.REDUCE, null);
status.setFinishTime(reduceDone);
TaskAttemptCompletionEvent completionEvent =
new TaskAttemptCompletionEvent(taskTracker, status);
addExpected(mapDone, completionEvent);
}
private static State convertState(Values status) {
if (status == Values.SUCCESS) {
return State.SUCCEEDED;
} else if (status == Values.FAILED) {
return State.FAILED;
} else if (status == Values.KILLED) {
return State.KILLED;
} else {
throw new IllegalArgumentException("unknown status " + status);
}
}
private long makeUpMapRuntime(State state, int locality) {
long runtime;
// make up runtime
if (state == State.SUCCEEDED || state == State.FAILED) {
List<LoggedDiscreteCDF> cdfList =
state == State.SUCCEEDED ? job.getSuccessfulMapAttemptCDFs() : job
.getFailedMapAttemptCDFs();
// XXX MapCDFs is a ArrayList of 4 possible groups: distance=0, 1, 2, and
// the last group is "distance cannot be determined". All pig jobs
// would have only the 4th group, and pig tasks usually do not have
// any locality, so this group should count as "distance=2".
// However, setup/cleanup tasks are also counted in the 4th group.
// These tasks do not make sense.
if(cdfList==null) {
runtime = -1;
return runtime;
}
try {
runtime = makeUpRuntime(cdfList.get(locality));
} catch (NoValueToMakeUpRuntime e) {
runtime = makeUpRuntime(cdfList);
}
} else {
throw new IllegalArgumentException(
"state is neither SUCCEEDED nor FAILED: " + state);
}
return runtime;
}
protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
if (state == State.SUCCEEDED || state == State.FAILED) {
this.state = state;
} else {
throw new IllegalArgumentException("status cannot be " + state);
}
this.taskInfo = taskInfo;
}
private static State convertState(Values status) {
if (status == Values.SUCCESS) {
return State.SUCCEEDED;
} else if (status == Values.FAILED) {
return State.FAILED;
} else if (status == Values.KILLED) {
return State.KILLED;
} else {
throw new IllegalArgumentException("unknown status " + status);
}
}
public void expectMapTask(SimulatorTaskTracker taskTracker,
TaskAttemptID taskId,
long mapStart, long mapRuntime) {
long mapDone = mapStart + mapRuntime;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
MapTaskStatus status = new MapTaskStatus(taskIdOldApi, 1.0f, 1,
State.SUCCEEDED, null, null, null, Phase.MAP, null);
status.setFinishTime(mapDone);
TaskAttemptCompletionEvent completionEvent =
new TaskAttemptCompletionEvent(taskTracker, status);
addExpected(mapStart, completionEvent);
}
/**
* Stops running a task attempt on the task tracker. It also updates the
* number of available slots accordingly.
*
* @param finalStatus the TaskStatus containing the task id and final
* status of the task attempt. This rountine asserts a lot of the
* finalStatus params, in case it is coming from a task attempt
* completion event sent to ourselves. Only the run state, finish time,
* and progress fields of the task attempt are updated.
* @param now Current simulation time, used for assert only
*/
private void finishRunningTask(TaskStatus finalStatus, long now) {
TaskAttemptID taskId = finalStatus.getTaskID();
if (LOG.isDebugEnabled()) {
LOG.debug("Finishing running task id=" + taskId + ", now=" + now);
}
SimulatorTaskInProgress tip = tasks.get(taskId);
if (tip == null) {
throw new IllegalArgumentException("Unknown task attempt " + taskId
+ " completed");
}
TaskStatus currentStatus = tip.getTaskStatus();
if (currentStatus.getRunState() != State.RUNNING) {
throw new IllegalArgumentException(
"Task attempt to finish is not running: " + tip);
}
// Check that finalStatus describes a task attempt that has just been
// completed
State finalRunState = finalStatus.getRunState();
if (finalRunState != State.SUCCEEDED && finalRunState != State.FAILED
&& finalRunState != State.KILLED) {
throw new IllegalArgumentException(
"Final run state for completed task can't be : " + finalRunState
+ " " + tip);
}
if (now != finalStatus.getFinishTime()) {
throw new IllegalArgumentException(
"Current time does not match task finish time: now=" + now
+ ", finish=" + finalStatus.getFinishTime());
}
if (currentStatus.getIsMap() != finalStatus.getIsMap()
|| currentStatus.getNumSlots() != finalStatus.getNumSlots()
|| currentStatus.getPhase() != finalStatus.getPhase()
|| currentStatus.getStartTime() != finalStatus.getStartTime()) {
throw new IllegalArgumentException(
"Current status does not match final status");
}
// We can't assert getShuffleFinishTime() and getSortFinishTime() for
// reduces as those were unknown when the task attempt completion event
// was created. We have not called setMapFinishTime() for maps either.
// If we were really thorough we could update the progress of the task
// and check if it is consistent with finalStatus.
// If we've got this far it is safe to update the task status
currentStatus.setRunState(finalStatus.getRunState());
currentStatus.setFinishTime(finalStatus.getFinishTime());
currentStatus.setProgress(finalStatus.getProgress());
// and update the free slots
int numSlots = currentStatus.getNumSlots();
if (tip.isMapTask()) {
usedMapSlots -= numSlots;
if (usedMapSlots < 0) {
throw new IllegalStateException(
"TaskTracker reaches negative map slots: " + usedMapSlots);
}
} else {
usedReduceSlots -= numSlots;
if (usedReduceSlots < 0) {
throw new IllegalStateException(
"TaskTracker reaches negative reduce slots: " + usedReduceSlots);
}
}
}
public void runMapTask(String taskTrackerName, TaskAttemptID taskId,
long mapStart, long mapRuntime, long killHeartbeat) {
long mapDone = mapStart + mapRuntime;
long mapEndHeartbeat = nextHeartbeat(mapDone);
final boolean isKilled = (killHeartbeat>=0);
if (isKilled) {
mapEndHeartbeat = nextHeartbeat(killHeartbeat + 1);
}
LOG.debug("mapStart=" + mapStart + ", mapDone=" + mapDone +
", mapEndHeartbeat=" + mapEndHeartbeat +
", killHeartbeat=" + killHeartbeat);
final int numSlotsRequired = 1;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass",
null, numSlotsRequired);
// all byte counters are 0
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
MapTaskAttemptInfo taskAttemptInfo =
new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, mapRuntime);
TaskTrackerAction action =
new SimulatorLaunchTaskAction(task, taskAttemptInfo);
heartbeats.get(mapStart).get(taskTrackerName).addTaskTrackerAction(action);
if (isKilled) {
action = new KillTaskAction(taskIdOldApi);
heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction(
action);
}
for(long simulationTime = mapStart + heartbeatInterval;
simulationTime <= mapEndHeartbeat;
simulationTime += heartbeatInterval) {
State state = simulationTime < mapEndHeartbeat ?
State.RUNNING : State.SUCCEEDED;
if (simulationTime == mapEndHeartbeat && isKilled) {
state = State.KILLED;
}
MapTaskStatus mapStatus = new MapTaskStatus(
task.getTaskID(), 0.0f, 0, state, "", "", null, Phase.MAP, null);
heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport(
mapStatus);
}
}
public void runReduceTask(String taskTrackerName, TaskAttemptID taskId,
long reduceStart, long mapDoneDelay,
long reduceRuntime, long killHeartbeat) {
long mapDone = nextHeartbeat(reduceStart + mapDoneDelay);
long reduceDone = mapDone + reduceRuntime;
long reduceEndHeartbeat = nextHeartbeat(reduceDone);
final boolean isKilled = (killHeartbeat>=0);
if (isKilled) {
reduceEndHeartbeat = nextHeartbeat(killHeartbeat + 1);
}
LOG.debug("reduceStart=" + reduceStart + ", mapDone=" + mapDone +
", reduceDone=" + reduceDone +
", reduceEndHeartbeat=" + reduceEndHeartbeat +
", killHeartbeat=" + killHeartbeat);
final int numSlotsRequired = 1;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
Task task = new ReduceTask("dummyjobfile", taskIdOldApi, 0, 0,
numSlotsRequired);
// all byte counters are 0
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
ReduceTaskAttemptInfo taskAttemptInfo =
new ReduceTaskAttemptInfo(State.SUCCEEDED, taskInfo, 0, 0,
reduceRuntime);
TaskTrackerAction action =
new SimulatorLaunchTaskAction(task, taskAttemptInfo);
heartbeats.get(reduceStart).get(taskTrackerName).addTaskTrackerAction(
action);
if (!isKilled || mapDone < killHeartbeat) {
action = new AllMapsCompletedTaskAction(task.getTaskID());
heartbeats.get(mapDone).get(taskTrackerName).addTaskTrackerAction(
action);
}
if (isKilled) {
action = new KillTaskAction(taskIdOldApi);
heartbeats.get(killHeartbeat).get(taskTrackerName).addTaskTrackerAction(
action);
}
for(long simulationTime = reduceStart + heartbeatInterval;
simulationTime <= reduceEndHeartbeat;
simulationTime += heartbeatInterval) {
State state = simulationTime < reduceEndHeartbeat ?
State.RUNNING : State.SUCCEEDED;
if (simulationTime == reduceEndHeartbeat && isKilled) {
state = State.KILLED;
}
// mapDone is when the all maps done event delivered
Phase phase = simulationTime <= mapDone ? Phase.SHUFFLE : Phase.REDUCE;
ReduceTaskStatus reduceStatus = new ReduceTaskStatus(
task.getTaskID(), 0.0f, 0, state, "", "", null, phase, null);
heartbeats.get(simulationTime).get(taskTrackerName).addTaskReport(
reduceStatus);
}
}