下面列出了org.apache.hadoop.mapreduce.v2.api.records.TaskId#getJobId ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private AtomicInteger containerNeed(TaskId taskID) {
JobId jobID = taskID.getJobId();
TaskType taskType = taskID.getTaskType();
ConcurrentMap<JobId, AtomicInteger> relevantMap
= taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
AtomicInteger result = relevantMap.get(jobID);
if (result == null) {
relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
result = relevantMap.get(jobID);
}
return result;
}
protected DataStatistics dataStatisticsForTask(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return null;
}
Task task = job.getTask(taskID);
if (task == null) {
return null;
}
return task.getType() == TaskType.MAP
? mapperStatistics.get(job)
: task.getType() == TaskType.REDUCE
? reducerStatistics.get(job)
: null;
}
private long storedPerAttemptValue
(Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
if (task == null) {
return -1L;
}
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt == null) {
return -1L;
}
AtomicLong estimate = data.get(taskAttempt);
return estimate == null ? -1L : estimate.get();
}
private AtomicInteger containerNeed(TaskId taskID) {
JobId jobID = taskID.getJobId();
TaskType taskType = taskID.getTaskType();
ConcurrentMap<JobId, AtomicInteger> relevantMap
= taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
AtomicInteger result = relevantMap.get(jobID);
if (result == null) {
relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
result = relevantMap.get(jobID);
}
return result;
}
protected DataStatistics dataStatisticsForTask(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return null;
}
Task task = job.getTask(taskID);
if (task == null) {
return null;
}
return task.getType() == TaskType.MAP
? mapperStatistics.get(job)
: task.getType() == TaskType.REDUCE
? reducerStatistics.get(job)
: null;
}
private long storedPerAttemptValue
(Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
if (task == null) {
return -1L;
}
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt == null) {
return -1L;
}
AtomicLong estimate = data.get(taskAttempt);
return estimate == null ? -1L : estimate.get();
}
boolean canSpeculate(AppContext context, TaskId taskID) {
// This class rejects speculating any task that already has speculations,
// or isn't running.
// Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
// can be even more restrictive.
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
return task.getAttempts().size() == 1;
}
@Override
public long thresholdRuntime(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
TaskType type = taskID.getTaskType();
DataStatistics statistics
= dataStatisticsForTask(taskID);
int completedTasksOfType
= type == TaskType.MAP
? job.getCompletedMaps() : job.getCompletedReduces();
int totalTasksOfType
= type == TaskType.MAP
? job.getTotalMaps() : job.getTotalReduces();
if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
|| (((float)completedTasksOfType) / totalTasksOfType)
< MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
return Long.MAX_VALUE;
}
long result = statistics == null
? Long.MAX_VALUE
: (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
return result;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
updateMillisCounters(jce, taskAttempt);
return jce;
}
boolean canSpeculate(AppContext context, TaskId taskID) {
// This class rejects speculating any task that already has speculations,
// or isn't running.
// Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
// can be even more restrictive.
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
Task task = job.getTask(taskID);
return task.getAttempts().size() == 1;
}
@Override
public long thresholdRuntime(TaskId taskID) {
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
TaskType type = taskID.getTaskType();
DataStatistics statistics
= dataStatisticsForTask(taskID);
int completedTasksOfType
= type == TaskType.MAP
? job.getCompletedMaps() : job.getCompletedReduces();
int totalTasksOfType
= type == TaskType.MAP
? job.getTotalMaps() : job.getTotalReduces();
if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
|| (((float)completedTasksOfType) / totalTasksOfType)
< MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
return Long.MAX_VALUE;
}
long result = statistics == null
? Long.MAX_VALUE
: (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
return result;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
updateMillisCounters(jce, taskAttempt);
return jce;
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
TaskAttemptId attemptID = status.id;
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return;
}
Task task = job.getTask(taskID);
if (task == null) {
return;
}
Long boxedStart = startTimes.get(attemptID);
long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
boolean isNew = false;
// is this a new success?
synchronized (doneTasks) {
if (!doneTasks.contains(task)) {
doneTasks.add(task);
isNew = true;
}
}
// It's a new completion
// Note that if a task completes twice [because of a previous speculation
// and a race, or a success followed by loss of the machine with the
// local data] we only count the first one.
if (isNew) {
long finish = timestamp;
if (start > 1L && finish > 1L && start <= finish) {
long duration = finish - start;
DataStatistics statistics
= dataStatisticsForTask(taskID);
if (statistics != null) {
statistics.add(duration);
}
}
}
}
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
super.updateAttempt(status, timestamp);
TaskAttemptId attemptID = status.id;
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return;
}
Task task = job.getTask(taskID);
if (task == null) {
return;
}
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt == null) {
return;
}
Long boxedStart = startTimes.get(attemptID);
long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
// We need to do two things.
// 1: If this is a completion, we accumulate statistics in the superclass
// 2: If this is not a completion, we learn more about it.
// This is not a completion, but we're cooking.
//
if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
// See if this task is already in the registry
AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
AtomicLong estimateVarianceContainer
= attemptRuntimeEstimateVariances.get(taskAttempt);
if (estimateContainer == null) {
if (attemptRuntimeEstimates.get(taskAttempt) == null) {
attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
}
}
if (estimateVarianceContainer == null) {
attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
}
long estimate = -1;
long varianceEstimate = -1;
// This code assumes that we'll never consider starting a third
// speculative task attempt if two are already running for this task
if (start > 0 && timestamp > start) {
estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
varianceEstimate = (long) (estimate * status.progress / 10);
}
if (estimateContainer != null) {
estimateContainer.set(estimate);
}
if (estimateVarianceContainer != null) {
estimateVarianceContainer.set(varianceEstimate);
}
}
}
public JobMapTaskRescheduledEvent(TaskId taskID) {
super(taskID.getJobId(), JobEventType.JOB_MAP_TASK_RESCHEDULED);
this.taskID = taskID;
}
public JobTaskEvent(TaskId taskID, TaskState taskState) {
super(taskID.getJobId(), JobEventType.JOB_TASK_COMPLETED);
this.taskID = taskID;
this.taskState = taskState;
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
TaskAttemptId attemptID = status.id;
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return;
}
Task task = job.getTask(taskID);
if (task == null) {
return;
}
Long boxedStart = startTimes.get(attemptID);
long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
boolean isNew = false;
// is this a new success?
synchronized (doneTasks) {
if (!doneTasks.contains(task)) {
doneTasks.add(task);
isNew = true;
}
}
// It's a new completion
// Note that if a task completes twice [because of a previous speculation
// and a race, or a success followed by loss of the machine with the
// local data] we only count the first one.
if (isNew) {
long finish = timestamp;
if (start > 1L && finish > 1L && start <= finish) {
long duration = finish - start;
DataStatistics statistics
= dataStatisticsForTask(taskID);
if (statistics != null) {
statistics.add(duration);
}
}
}
}
}
@Override
public void updateAttempt(TaskAttemptStatus status, long timestamp) {
super.updateAttempt(status, timestamp);
TaskAttemptId attemptID = status.id;
TaskId taskID = attemptID.getTaskId();
JobId jobID = taskID.getJobId();
Job job = context.getJob(jobID);
if (job == null) {
return;
}
Task task = job.getTask(taskID);
if (task == null) {
return;
}
TaskAttempt taskAttempt = task.getAttempt(attemptID);
if (taskAttempt == null) {
return;
}
Long boxedStart = startTimes.get(attemptID);
long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
// We need to do two things.
// 1: If this is a completion, we accumulate statistics in the superclass
// 2: If this is not a completion, we learn more about it.
// This is not a completion, but we're cooking.
//
if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
// See if this task is already in the registry
AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
AtomicLong estimateVarianceContainer
= attemptRuntimeEstimateVariances.get(taskAttempt);
if (estimateContainer == null) {
if (attemptRuntimeEstimates.get(taskAttempt) == null) {
attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
}
}
if (estimateVarianceContainer == null) {
attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
}
long estimate = -1;
long varianceEstimate = -1;
// This code assumes that we'll never consider starting a third
// speculative task attempt if two are already running for this task
if (start > 0 && timestamp > start) {
estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
varianceEstimate = (long) (estimate * status.progress / 10);
}
if (estimateContainer != null) {
estimateContainer.set(estimate);
}
if (estimateVarianceContainer != null) {
estimateVarianceContainer.set(varianceEstimate);
}
}
}
public JobMapTaskRescheduledEvent(TaskId taskID) {
super(taskID.getJobId(), JobEventType.JOB_MAP_TASK_RESCHEDULED);
this.taskID = taskID;
}
public JobTaskEvent(TaskId taskID, TaskState taskState) {
super(taskID.getJobId(), JobEventType.JOB_TASK_COMPLETED);
this.taskID = taskID;
this.taskState = taskState;
}