下面列出了org.apache.hadoop.mapred.TaskStatus.State#RUNNING 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Frees up bookkeeping memory used by completed tasks.
 * Has no effect on the events or logs produced by the SimulatorTaskTracker.
 * We need this in order not to report completed tasks multiple times and
 * to ensure that we do not run out of Java heap memory in larger
 * simulations.
 *
 * <p>Every tracked attempt whose run state is no longer RUNNING is removed
 * from {@code tasks}. Slot counters are deliberately not touched here.
 */
private void garbageCollectCompletedTasks() {
  // Iterate entries rather than keys so each attempt needs a single map
  // access; the entry iterator still supports safe in-loop removal.
  for (Iterator<java.util.Map.Entry<TaskAttemptID, SimulatorTaskInProgress>> iter =
           tasks.entrySet().iterator();
       iter.hasNext();) {
    java.util.Map.Entry<TaskAttemptID, SimulatorTaskInProgress> entry = iter.next();
    TaskAttemptID taskId = entry.getKey();
    SimulatorTaskInProgress tip = entry.getValue();
    if (tip.getTaskStatus().getRunState() != State.RUNNING) {
      iter.remove();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Garbage collected SimulatorTIP, taskId=" + taskId);
      }
      // We don't have to / must not touch usedMapSlots and usedReduceSlots
      // as those were already updated by processTaskAttemptCompletionEvent()
      // when the task switched its state from running
    }
  }
}
/**
 * Counts the reduce attempts in {@code localTasksInfo} whose state is RUNNING.
 *
 * @return number of running reduce attempts
 */
public int getRunningReduceTasks() {
  int count = 0;
  for (TaskInfo info : localTasksInfo) {
    if (info.isMap()) {
      continue; // only reduces are counted
    }
    if (info.getTaskState() == State.RUNNING) {
      count++;
    }
  }
  return count;
}
/**
 * Sums the running time of every map attempt (whatever its state) and of
 * every reduce attempt that is RUNNING with progress greater than zero.
 *
 * @return the accumulated running time
 */
public long getTotalWastedTime() {
  long wasted = 0;
  for (TaskInfo info : localTasksInfo) {
    boolean counted;
    if (info.isMap()) {
      counted = true;
    } else {
      // The reduces that did not yet progress can be considered not started
      counted = info.getTaskState() == State.RUNNING
          && info.getTaskProgress() > 0;
    }
    if (counted) {
      wasted += info.getRunningTime();
    }
  }
  return wasted;
}
/**
 * Sums the running time of RUNNING attempts that are either maps or reduces
 * with progress greater than zero.
 *
 * @return the accumulated running time of those attempts
 */
public long getRunningTimeWasted() {
  long wasted = 0;
  for (TaskInfo info : localTasksInfo) {
    if (info.getTaskState() != State.RUNNING) {
      continue;
    }
    boolean started = info.isMap() || info.getTaskProgress() > 0;
    if (started) {
      wasted += info.getRunningTime();
    }
  }
  // Another level of complexity here would be to count the time it would
  // take to rerun all the map tasks that were not fetched yet.
  return wasted;
}
/**
 * Counts the SimulatorLaunchTaskActions in a heartbeat response and starts
 * tracking every launched attempt that is not yet in {@code tasks}.
 *
 * @param response heartbeat response whose actions are scanned
 * @return number of launch actions found, including those for attempts
 *         that were already tracked
 */
int findLaunchTaskActions(HeartbeatResponse response) {
  int launchCount = 0;
  for (TaskTrackerAction action : response.getActions()) {
    if (!(action instanceof SimulatorLaunchTaskAction)) {
      continue;
    }
    SimulatorLaunchTaskAction launch = (SimulatorLaunchTaskAction) action;
    Task task = launch.getTask();
    launchCount++;
    TaskAttemptID attemptId = task.getTaskID();
    if (tasks.containsKey(attemptId)) {
      // Already tracked, so no new status has to be generated.
      continue;
    }
    TaskStatus status = task.isMapTask()
        ? new MapTaskStatus(attemptId, 0f, 1, State.RUNNING, "", "",
            taskTrackerName, Phase.MAP, new Counters())
        : new ReduceTaskStatus(attemptId, 0f, 1, State.RUNNING, "", "",
            taskTrackerName, Phase.SHUFFLE, new Counters());
    // NOTE(review): the status is created RUNNING and then immediately
    // marked SUCCEEDED; presumably intentional for this fake tracker - confirm.
    status.setRunState(State.SUCCEEDED);
    status.setStartTime(this.now);
    tasks.put(attemptId, new SimulatorTaskInProgress(launch, status, this.now));
  }
  return launchCount;
}
/**
 * Is the given task considered as 'running'?
 *
 * @param taskStatus status of the attempt to inspect
 * @return true if the run state is RUNNING or UNASSIGNED, or if the attempt
 *         is in its task-cleanup phase
 */
private boolean isTaskRunning(TaskStatus taskStatus) {
  final TaskStatus.State runState = taskStatus.getRunState();
  if (runState == State.RUNNING || runState == State.UNASSIGNED) {
    return true;
  }
  return taskStatus.inTaskCleanupPhase();
}
/**
 * Returns true when the given run state is UNASSIGNED or RUNNING; a null
 * state yields false.
 */
private boolean isTaskRunning(TaskStatus.State state) {
  return state == State.UNASSIGNED || state == State.RUNNING;
}
/**
 * Stops running a task attempt on the task tracker and releases the slots
 * it occupied.
 *
 * <p>{@code finalStatus} may come from a task attempt completion event this
 * tracker sent to itself, so it is checked against the recorded status
 * before anything changes. Only the run state, finish time and progress of
 * the recorded status are updated.
 *
 * @param finalStatus the TaskStatus containing the task id and final status
 *        (SUCCEEDED, FAILED or KILLED) of the task attempt
 * @param now current simulation time, used for consistency checks only
 * @throws IllegalArgumentException if the attempt is unknown, not RUNNING,
 *         or {@code finalStatus} is inconsistent with the recorded status
 * @throws IllegalStateException if releasing the slots makes the used map or
 *         reduce slot count negative
 */
private void finishRunningTask(TaskStatus finalStatus, long now) {
  final TaskAttemptID attemptId = finalStatus.getTaskID();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Finishing running task id=" + attemptId + ", now=" + now);
  }
  final SimulatorTaskInProgress tip = tasks.get(attemptId);
  if (tip == null) {
    throw new IllegalArgumentException("Unknown task attempt " + attemptId
        + " completed");
  }
  final TaskStatus current = tip.getTaskStatus();
  checkFinishableStatus(tip, current, finalStatus, now);

  // We can't assert getShuffleFinishTime() and getSortFinishTime() for
  // reduces as those were unknown when the task attempt completion event
  // was created. We have not called setMapFinishTime() for maps either.
  // All checks passed, so it is safe to copy the final values over.
  current.setRunState(finalStatus.getRunState());
  current.setFinishTime(finalStatus.getFinishTime());
  current.setProgress(finalStatus.getProgress());

  final int slots = current.getNumSlots();
  releaseFinishedTaskSlots(tip.isMapTask(), slots);
}

/**
 * Throws IllegalArgumentException unless {@code finalStatus} can complete
 * the RUNNING attempt described by {@code current}.
 */
private void checkFinishableStatus(SimulatorTaskInProgress tip,
    TaskStatus current, TaskStatus finalStatus, long now) {
  if (current.getRunState() != State.RUNNING) {
    throw new IllegalArgumentException(
        "Task attempt to finish is not running: " + tip);
  }
  final State outcome = finalStatus.getRunState();
  boolean terminal = outcome == State.SUCCEEDED || outcome == State.FAILED
      || outcome == State.KILLED;
  if (!terminal) {
    throw new IllegalArgumentException(
        "Final run state for completed task can't be : " + outcome
        + " " + tip);
  }
  if (now != finalStatus.getFinishTime()) {
    throw new IllegalArgumentException(
        "Current time does not match task finish time: now=" + now
        + ", finish=" + finalStatus.getFinishTime());
  }
  boolean consistent = current.getIsMap() == finalStatus.getIsMap()
      && current.getNumSlots() == finalStatus.getNumSlots()
      && current.getPhase() == finalStatus.getPhase()
      && current.getStartTime() == finalStatus.getStartTime();
  if (!consistent) {
    throw new IllegalArgumentException(
        "Current status does not match final status");
  }
}

/** Returns {@code slots} to the map or reduce pool, rejecting negative totals. */
private void releaseFinishedTaskSlots(boolean isMap, int slots) {
  if (isMap) {
    usedMapSlots -= slots;
    if (usedMapSlots < 0) {
      throw new IllegalStateException(
          "TaskTracker reaches negative map slots: " + usedMapSlots);
    }
    return;
  }
  usedReduceSlots -= slots;
  if (usedReduceSlots < 0) {
    throw new IllegalStateException(
        "TaskTracker reaches negative reduce slots: " + usedReduceSlots);
  }
}
/**
 * Launches a task on the simulated task tracker.
 *
 * <p>Creates the attempt's initial RUNNING status, charges its slots to the
 * map or reduce pool, records a SimulatorTaskInProgress for it and, for maps
 * only, schedules its completion event.
 *
 * @param action SimulatorLaunchTaskAction sent by the job tracker
 * @param now current simulation time
 * @return new events generated: a singleton list holding a
 *         TaskAttemptCompletionEvent for map tasks, EMPTY_EVENTS otherwise
 * @throws IllegalArgumentException if the attempt was already launched
 * @throws IllegalStateException if the launch would exceed the tracker's map
 *         or reduce slots; the slot counters are left unchanged in that case
 */
private List<SimulatorEvent> handleSimulatorLaunchTaskAction(
    SimulatorLaunchTaskAction action, long now) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Handling launch task action " + action);
  }
  // First, create statuses and update used slots for map and reduce
  // task separately
  Task task = action.getTask();
  TaskAttemptID taskId = task.getTaskID();
  if (tasks.containsKey(taskId)) {
    throw new IllegalArgumentException("Multiple launch of task id =" + taskId);
  }
  // Ctor of MapTaskStatus and ReduceTaskStatus need deprecated
  // o.a.h.mapred.TaskAttemptID, hence the downgrade
  org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
      org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
  TaskStatus status;
  int numSlotsRequired = task.getNumSlotsRequired();
  Counters emptyCounters = new Counters();
  if (task.isMapTask()) {
    // Check capacity before charging the slots so a rejected launch does
    // not leave usedMapSlots inflated.
    if (usedMapSlots + numSlotsRequired > maxMapSlots) {
      throw new IllegalStateException("usedMapSlots exceeds maxMapSlots: " +
          (usedMapSlots + numSlotsRequired) + " > " + maxMapSlots);
    }
    status = new MapTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
                               State.RUNNING, "", "", taskTrackerName,
                               Phase.MAP, emptyCounters);
    usedMapSlots += numSlotsRequired;
  } else {
    // Same check-then-commit ordering for the reduce pool; the message now
    // reports maxReduceSlots instead of repeating usedReduceSlots.
    if (usedReduceSlots + numSlotsRequired > maxReduceSlots) {
      throw new IllegalStateException("usedReduceSlots exceeds maxReduceSlots: " +
          (usedReduceSlots + numSlotsRequired) + " > " + maxReduceSlots);
    }
    status = new ReduceTaskStatus(taskIdOldApi, 0f, numSlotsRequired,
                                  State.RUNNING, "", "", taskTrackerName,
                                  Phase.SHUFFLE, emptyCounters);
    usedReduceSlots += numSlotsRequired;
  }
  // Second, create and store a TIP
  status.setStartTime(now);
  SimulatorTaskInProgress tip =
    new SimulatorTaskInProgress(action, status, now);
  tasks.put(taskId, tip);
  // Third, schedule events for ourselves
  if (task.isMapTask()) {
    // we know when this task attempts ends iff it is a map
    TaskAttemptCompletionEvent e = createTaskAttemptCompletionEvent(tip, now);
    return Collections.<SimulatorEvent>singletonList(e);
  } else {
    // reduce, completion time can only be determined when all maps are done
    return SimulatorEngine.EMPTY_EVENTS;
  }
}
/**
 * Updates the progress indicator of a task attempt if it is RUNNING.
 *
 * <p>Map progress is (now - start time) / user-space run time. Reduce
 * progress is 0 in SHUFFLE, 1/3 in SORT, and 2/3 plus a linear third in
 * REDUCE, measured from the sort finish time. The result is clamped to
 * [0, 1] after a tolerance check.
 *
 * @param tip simulator task in progress whose progress is to be updated
 * @param now current simulation time
 * @throws IllegalStateException if the estimate is outside [0, 1] by more
 *         than the tolerance
 * @throws IllegalArgumentException if a reduce is in an unexpected phase
 */
private void progressTaskStatus(SimulatorTaskInProgress tip, long now) {
  final TaskStatus status = tip.getTaskStatus();
  if (status.getRunState() != State.RUNNING) {
    return; // only running attempts make progress
  }
  final boolean mapTask = tip.isMapTask();
  // Time spent in the map, or only in the REDUCE phase of a reduce
  final long userRunTime = tip.getUserSpaceRunTime();
  final float estimate = mapTask
      ? ((float) (now - status.getStartTime())) / userRunTime
      : estimateReduceProgress(status, now, userRunTime);

  final float tolerance = 0.0001f;
  if (estimate < -tolerance || estimate > 1 + tolerance) {
    throw new IllegalStateException("Task progress out of range: " + estimate);
  }
  status.setProgress(Math.max(Math.min(1.0f, estimate), 0.0f));
  if (LOG.isDebugEnabled()) {
    LOG.debug("Updated task progress, taskId=" + status.getTaskID()
        + ", progress=" + status.getProgress());
  }
}

/**
 * Estimates a reduce attempt's raw progress from its current phase; SHUFFLE
 * and SORT are not modelled, REDUCE is linear in elapsed user-code time.
 */
private float estimateReduceProgress(TaskStatus status, long now,
    long userRunTime) {
  final Phase phase = status.getPhase();
  switch (phase) {
    case SHUFFLE:
      return 0.0f; // 0 of 3 phases done
    case SORT:
      return 1.0f/3; // 1 of 3 phases done
    case REDUCE:
      // The user-code REDUCE phase starts when sorting finishes; 2 of 3
      // phases are already done at that point.
      final long reduceStart = status.getSortFinishTime();
      return 2.0f/3 + (((float) (now - reduceStart)) / userRunTime) / 3.0f;
    default:
      // should never get here
      throw new IllegalArgumentException("Invalid reducePhase=" + phase);
  }
}
/**
 * Scripts the heartbeat traffic of one map attempt on a task tracker.
 *
 * <p>A launch action is queued at {@code mapStart}. If
 * {@code killHeartbeat >= 0`, a kill action is queued at that heartbeat.
 * A MapTaskStatus report is queued on every heartbeat from
 * {@code mapStart + heartbeatInterval} through the attempt's final
 * heartbeat. Reports are RUNNING before the final one, which is KILLED for
 * a killed attempt and SUCCEEDED otherwise.
 *
 * @param taskTrackerName tracker the attempt runs on
 * @param taskId attempt id
 * @param mapStart heartbeat at which the attempt is launched
 * @param mapRuntime run time of the map
 * @param killHeartbeat heartbeat of the kill action, or negative for none
 */
public void runMapTask(String taskTrackerName, TaskAttemptID taskId,
    long mapStart, long mapRuntime, long killHeartbeat) {
  final long mapDone = mapStart + mapRuntime;
  long lastHeartbeat = nextHeartbeat(mapDone);
  final boolean killed = killHeartbeat >= 0;
  if (killed) {
    lastHeartbeat = nextHeartbeat(killHeartbeat + 1);
  }
  LOG.debug("mapStart=" + mapStart + ", mapDone=" + mapDone +
      ", mapEndHeartbeat=" + lastHeartbeat +
      ", killHeartbeat=" + killHeartbeat);

  final int slots = 1;
  final org.apache.hadoop.mapred.TaskAttemptID oldApiId =
      org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
  final Task task = new MapTask("dummyjobfile", oldApiId, 0, "dummysplitclass",
      null, slots);
  // all byte counters are 0
  final MapTaskAttemptInfo attemptInfo = new MapTaskAttemptInfo(
      State.SUCCEEDED, new TaskInfo(0, 0, 0, 0, 0), mapRuntime);
  TaskTrackerAction launchAction =
      new SimulatorLaunchTaskAction(task, attemptInfo);
  heartbeats.get(mapStart).get(taskTrackerName).addTaskTrackerAction(launchAction);
  if (killed) {
    TaskTrackerAction killAction = new KillTaskAction(oldApiId);
    heartbeats.get(killHeartbeat).get(taskTrackerName)
        .addTaskTrackerAction(killAction);
  }

  for (long t = mapStart + heartbeatInterval; t <= lastHeartbeat;
       t += heartbeatInterval) {
    // Inside the loop t <= lastHeartbeat, so "not before" means "at" it.
    final State state;
    if (t < lastHeartbeat) {
      state = State.RUNNING;
    } else if (killed) {
      state = State.KILLED;
    } else {
      state = State.SUCCEEDED;
    }
    MapTaskStatus report = new MapTaskStatus(
        task.getTaskID(), 0.0f, 0, state, "", "", null, Phase.MAP, null);
    heartbeats.get(t).get(taskTrackerName).addTaskReport(report);
  }
}
/**
 * Scripts the heartbeat traffic of one reduce attempt on a task tracker.
 *
 * <p>A launch action is queued at {@code reduceStart}. An
 * AllMapsCompletedTaskAction is queued at
 * {@code nextHeartbeat(reduceStart + mapDoneDelay)}, unless the attempt is
 * killed at or before that heartbeat. A kill action is queued at
 * {@code killHeartbeat} when it is non-negative. A ReduceTaskStatus report is
 * queued on every heartbeat after the launch through the final one. The
 * phase is SHUFFLE up to the all-maps-done heartbeat and REDUCE afterwards.
 * The state is RUNNING before the final report, then KILLED or SUCCEEDED.
 *
 * @param taskTrackerName tracker the attempt runs on
 * @param taskId attempt id
 * @param reduceStart heartbeat at which the attempt is launched
 * @param mapDoneDelay delay after launch until all maps are done
 * @param reduceRuntime run time of the REDUCE phase
 * @param killHeartbeat heartbeat of the kill action, or negative for none
 */
public void runReduceTask(String taskTrackerName, TaskAttemptID taskId,
    long reduceStart, long mapDoneDelay,
    long reduceRuntime, long killHeartbeat) {
  final long mapsDone = nextHeartbeat(reduceStart + mapDoneDelay);
  final long reduceDone = mapsDone + reduceRuntime;
  long lastHeartbeat = nextHeartbeat(reduceDone);
  final boolean killed = killHeartbeat >= 0;
  if (killed) {
    lastHeartbeat = nextHeartbeat(killHeartbeat + 1);
  }
  LOG.debug("reduceStart=" + reduceStart + ", mapDone=" + mapsDone +
      ", reduceDone=" + reduceDone +
      ", reduceEndHeartbeat=" + lastHeartbeat +
      ", killHeartbeat=" + killHeartbeat);

  final int slots = 1;
  final org.apache.hadoop.mapred.TaskAttemptID oldApiId =
      org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
  final Task task = new ReduceTask("dummyjobfile", oldApiId, 0, 0, slots);
  // all byte counters are 0
  final ReduceTaskAttemptInfo attemptInfo = new ReduceTaskAttemptInfo(
      State.SUCCEEDED, new TaskInfo(0, 0, 0, 0, 0), 0, 0, reduceRuntime);
  TaskTrackerAction launchAction =
      new SimulatorLaunchTaskAction(task, attemptInfo);
  heartbeats.get(reduceStart).get(taskTrackerName)
      .addTaskTrackerAction(launchAction);
  boolean mapsFinishBeforeKill = !killed || mapsDone < killHeartbeat;
  if (mapsFinishBeforeKill) {
    TaskTrackerAction allMapsDone =
        new AllMapsCompletedTaskAction(task.getTaskID());
    heartbeats.get(mapsDone).get(taskTrackerName)
        .addTaskTrackerAction(allMapsDone);
  }
  if (killed) {
    TaskTrackerAction killAction = new KillTaskAction(oldApiId);
    heartbeats.get(killHeartbeat).get(taskTrackerName)
        .addTaskTrackerAction(killAction);
  }

  for (long t = reduceStart + heartbeatInterval; t <= lastHeartbeat;
       t += heartbeatInterval) {
    // Inside the loop t <= lastHeartbeat, so "not before" means "at" it.
    final State state;
    if (t < lastHeartbeat) {
      state = State.RUNNING;
    } else if (killed) {
      state = State.KILLED;
    } else {
      state = State.SUCCEEDED;
    }
    // mapsDone is when the all-maps-done event is delivered
    final Phase phase = (t > mapsDone) ? Phase.REDUCE : Phase.SHUFFLE;
    ReduceTaskStatus report = new ReduceTaskStatus(
        task.getTaskID(), 0.0f, 0, state, "", "", null, phase, null);
    heartbeats.get(t).get(taskTrackerName).addTaskReport(report);
  }
}