下面列出了org.apache.hadoop.mapreduce.TaskType#REDUCE 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Given the reduce taskAttemptID, returns the TaskAttemptInfo. Deconstructs
* the reduce taskAttemptID and looks up the jobStory with the parts taskType,
* id of task, id of task attempt.
*
* @param taskTracker
* tasktracker
* @param taskAttemptID
* task-attempt
* @return TaskAttemptInfo for the reduce task-attempt
*/
private TaskAttemptInfo getReduceTaskAttemptInfo(TaskTracker taskTracker,
TaskAttemptID taskAttemptID) {
assert (!taskAttemptID.isMap());
TaskID taskId = taskAttemptID.getTaskID();
TaskType taskType;
if (taskAttemptID.isMap()) {
taskType = TaskType.MAP;
} else {
taskType = TaskType.REDUCE;
}
TaskAttemptInfo taskAttemptInfo = jobStory.getTaskAttemptInfo(taskType,
taskId.getId(), taskAttemptID.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("get an attempt: "
+ taskAttemptID.toString()
+ ", state="
+ taskAttemptInfo.getRunState()
+ ", runtime="
+ ((taskAttemptID.isMap()) ? taskAttemptInfo.getRuntime()
: ((ReduceTaskAttemptInfo) taskAttemptInfo).getReduceRuntime()));
}
return taskAttemptInfo;
}
/**
* test a getters of TaskAttemptFinishedEvent and TaskAttemptFinished
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testTaskAttemptFinishedEvent() throws Exception {
JobID jid = new JobID("001", 1);
TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
counters);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
assertEquals(test.getFinishTime(), 123L);
assertEquals(test.getHostname(), "HOSTNAME");
assertEquals(test.getRackName(), "RAKNAME");
assertEquals(test.getState(), "STATUS");
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
}
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// Make one reducer slower for speculative execution
TaskAttemptID taid = context.getTaskAttemptID();
long sleepTime = 100;
Configuration conf = context.getConfiguration();
boolean test_speculate_reduce =
conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
// IF TESTING REDUCE SPECULATIVE EXECUTION:
// Make the "*_r_000000_0" attempt take much longer than the others.
// When speculative execution is enabled, this should cause the attempt
// to be killed and restarted. At that point, the attempt ID will be
// "*_r_000000_1", so sleepTime will still remain 100ms.
if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
&& (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
sleepTime = 10000;
}
try{
Thread.sleep(sleepTime);
} catch(InterruptedException ie) {
// Ignore
}
context.write(key,new IntWritable(0));
}
/**
* test deprecated methods of TaskCompletionEvent
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testTaskCompletionEvent() {
TaskAttemptID taid = new TaskAttemptID("001", 1, TaskType.REDUCE, 2, 3);
TaskCompletionEvent template = new TaskCompletionEvent(12, taid, 13, true,
Status.SUCCEEDED, "httptracker");
TaskCompletionEvent testEl = TaskCompletionEvent.downgrade(template);
testEl.setTaskAttemptId(taid);
testEl.setTaskTrackerHttp("httpTracker");
testEl.setTaskId("attempt_001_0001_m_000002_04");
assertEquals("attempt_001_0001_m_000002_4",testEl.getTaskId());
testEl.setTaskStatus(Status.OBSOLETE);
assertEquals(Status.OBSOLETE.toString(), testEl.getStatus().toString());
testEl.setTaskRunTime(20);
assertEquals(testEl.getTaskRunTime(), 20);
testEl.setEventId(16);
assertEquals(testEl.getEventId(), 16);
}
/**
* Add the update refill msecs to the metrics. This method needs to be
* synchronized with numFreeSlots and is currently only called in run()
* under synchronization of numFreeSlots.
*
* @param usedSlots Number of slots refilled
*/
private void updateRefillMsecs(int usedSlots) {
long currentTime = System.currentTimeMillis();
for (int i = 0; i < usedSlots; ++i) {
// There should also be at least usedSlots entries in
// lastFreeMsecsQueue, but Corona can violate this
// principle by scheduling tasks before another task resource is
// confirmed to have been released.
if (lastFreeMsecsQueue.isEmpty()) {
LOG.warn("updateRefillMsecs: Only obtained refill times for " + i +
" out of " + usedSlots + " slots.");
break;
}
int refillMsecs = (int) (currentTime - lastFreeMsecsQueue.remove());
if (taskType == TaskType.MAP) {
addAveMapSlotRefillMsecs(refillMsecs);
} else if (taskType == TaskType.REDUCE) {
addAveReduceSlotRefillMsecs(refillMsecs);
} else {
throw new RuntimeException("updateRefillMsecs doesn't " +
"suppport task type " + taskType);
}
}
}
private static Set<String> runTasks(JobContext job, int numTasks, int numFiles)
throws IOException {
Set<String> uploads = Sets.newHashSet();
for (int taskId = 0; taskId < numTasks; taskId += 1) {
TaskAttemptID attemptID = new TaskAttemptID(
new TaskID(JOB_ID, TaskType.REDUCE, taskId),
(taskId * 37) % numTasks);
TaskAttemptContext attempt = new TaskAttemptContextImpl(
new Configuration(job.getConfiguration()), attemptID);
MockedS3Committer taskCommitter = new MockedS3Committer(
S3_OUTPUT_PATH, attempt);
commitTask(taskCommitter, attempt, numFiles);
uploads.addAll(taskCommitter.results.getUploads());
}
return uploads;
}
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter)
throws IOException {
this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null,
TaskType.REDUCE);
}
public static org.apache.hadoop.mapred.TaskID
createMockTaskAttemptIDFromTezTaskId(TezTaskID tezTaId, boolean isMap) {
TezVertexID vId = tezTaId.getVertexID();
ApplicationId appId = vId.getDAGId().getApplicationId();
return new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp())
+ String.valueOf(vId.getId()), appId.getId(),
isMap ? TaskType.MAP : TaskType.REDUCE, tezTaId.getId());
}
/**
* Returns the type of the {@link TaskAttemptID} passed.
* The type of an attempt is determined by the nature of the task and not its
* id.
* For example,
* - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it
* has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP
* instead of MAP.
* - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
* now supposed to do the task-level cleanup. In such a case this attempt
* will be of type TASK_CLEANUP instead of REDUCE.
*/
TaskType getAttemptType (TaskAttemptID id) {
if (isCleanupAttempt(id)) {
return TaskType.TASK_CLEANUP;
} else if (isJobSetupTask()) {
return TaskType.JOB_SETUP;
} else if (isJobCleanupTask()) {
return TaskType.JOB_CLEANUP;
} else if (isMapTask()) {
return TaskType.MAP;
} else {
return TaskType.REDUCE;
}
}
@Test
public void testConsecutiveFetch()
throws IOException, InterruptedException {
final int MAX_EVENTS_TO_FETCH = 100;
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
anyInt(), anyInt(), any(TaskAttemptID.class)))
.thenReturn(getMockedCompletionEventsUpdate(0, 0));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
@SuppressWarnings("unchecked")
ShuffleScheduler<String,String> scheduler =
mock(ShuffleScheduler.class);
ExceptionReporter reporter = mock(ExceptionReporter.class);
EventFetcherForTest<String,String> ef =
new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
reporter, MAX_EVENTS_TO_FETCH);
ef.getMapCompletionEvents();
verify(reporter, never()).reportException(any(Throwable.class));
InOrder inOrder = inOrder(umbilical);
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).resolve(
any(TaskCompletionEvent.class));
}
public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
JobId jobId) {
// context.getJob could be used for some of this info as well.
switch (event.getEventType()) {
case JOB_SUBMITTED:
JobSubmittedEvent jse = (JobSubmittedEvent) event;
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
summary.setJobName(jse.getJobName());
break;
case NORMALIZED_RESOURCE:
NormalizedResourceEvent normalizedResourceEvent =
(NormalizedResourceEvent) event;
if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
} else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
}
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
summary.setJobLaunchTime(jie.getLaunchTime());
break;
case MAP_ATTEMPT_STARTED:
TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstMapTaskLaunchTime() == 0)
summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
break;
case REDUCE_ATTEMPT_STARTED:
TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstReduceTaskLaunchTime() == 0)
summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
break;
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
summary.setJobFinishTime(jfe.getFinishTime());
summary.setNumFinishedMaps(jfe.getFinishedMaps());
summary.setNumFailedMaps(jfe.getFailedMaps());
summary.setNumFinishedReduces(jfe.getFinishedReduces());
summary.setNumFailedReduces(jfe.getFailedReduces());
if (summary.getJobStatus() == null)
summary
.setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
.toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job.
setSummarySlotSeconds(summary, jfe.getTotalCounters());
break;
case JOB_FAILED:
case JOB_KILLED:
JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
summary.setJobStatus(juce.getStatus());
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
default:
break;
}
}
ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
super(schedulr);
type = TaskType.REDUCE;
queueComparator = reduceComparator;
}
public MergeQueue(Configuration conf, FileSystem fs,
List<Segment<K, V>> segments, RawComparator<K> comparator,
Progressable reporter) {
this(conf, fs, segments, comparator, reporter, false, TaskType.REDUCE);
}
/**
* Check if a TT has enough memory to run of task specified from this job.
* @param job
* @param taskType
* @param taskTracker
* @return true if this TT has enough memory for this job. False otherwise.
*/
boolean matchesMemoryRequirements(JobInProgress job,TaskType taskType,
TaskTrackerStatus taskTracker) {
LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+ " for scheduling on " + taskTracker.trackerName);
if (!isSchedulingBasedOnMemEnabled()) {
LOG.debug("Scheduling based on job's memory requirements is disabled."
+ " Ignoring any value set by job.");
return true;
}
Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
long totalMemUsableOnTT = 0;
long memForThisTask = 0;
if (taskType == TaskType.MAP) {
memForThisTask = job.getJobConf().getMemoryForMapTask();
totalMemUsableOnTT =
scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
} else if (taskType == TaskType.REDUCE) {
memForThisTask = job.getJobConf().getMemoryForReduceTask();
totalMemUsableOnTT =
scheduler.getMemSizeForReduceSlot()
* taskTracker.getMaxReduceSlots();
}
long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
if (memForThisTask > freeMemOnTT) {
if (LOG.isDebugEnabled()) {
LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+ freeMemOnTT + "). A " + taskType + " task from "
+ job.getJobID().toString() + " cannot be scheduled on TT "
+ taskTracker.trackerName);
}
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+ freeMemOnTT + ". A " + taskType.toString() + " task from "
+ job.getJobID().toString() + " matches memory requirements "
+ "on TT "+ taskTracker.trackerName);
}
return true;
}
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.OutputCommitter
getOutputCommitter(OutputCommitterContext context) {
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
newApiCommitter = false;
if (jobConf.getBoolean("mapred.reducer.new-api", false)
|| jobConf.getBoolean("mapred.mapper.new-api", false)) {
newApiCommitter = true;
}
LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
" using " + (newApiCommitter ? "new" : "old") + "mapred API");
if (newApiCommitter) {
TaskAttemptID taskAttemptID = new TaskAttemptID(
Long.toString(context.getApplicationId().getClusterTimestamp()),
context.getApplicationId().getId(),
((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
TaskType.MAP : TaskType.REDUCE)),
0, context.getDAGAttemptNumber());
TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
taskAttemptID);
try {
OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
.getOutputFormatClass(), jobConf);
committer = outputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
} else {
committer = ReflectionUtils.newInstance(jobConf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
}
LOG.info("OutputCommitter for outputName="
+ context.getOutputName()
+ ", vertexName=" + context.getVertexName()
+ ", outputCommitterClass="
+ committer.getClass().getName());
return committer;
}
@Test
public void testConsecutiveFetch()
throws IOException, InterruptedException {
final int MAX_EVENTS_TO_FETCH = 100;
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
anyInt(), anyInt(), any(TaskAttemptID.class)))
.thenReturn(getMockedCompletionEventsUpdate(0, 0));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
@SuppressWarnings("unchecked")
ShuffleScheduler<String,String> scheduler =
mock(ShuffleScheduler.class);
ExceptionReporter reporter = mock(ExceptionReporter.class);
EventFetcherForTest<String,String> ef =
new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
reporter, MAX_EVENTS_TO_FETCH);
ef.getMapCompletionEvents();
verify(reporter, never()).reportException(any(Throwable.class));
InOrder inOrder = inOrder(umbilical);
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).resolve(
any(TaskCompletionEvent.class));
}
public static TaskID toMRTaskId(TezTaskID taskid) {
return new TaskID(
toMRJobId(taskid.getVertexID().getDAGId()),
taskid.getVertexID().getId() == 0 ? TaskType.MAP : TaskType.REDUCE,
taskid.getId());
}
public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
JobId jobId) {
// context.getJob could be used for some of this info as well.
switch (event.getEventType()) {
case JOB_SUBMITTED:
JobSubmittedEvent jse = (JobSubmittedEvent) event;
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
summary.setJobName(jse.getJobName());
break;
case NORMALIZED_RESOURCE:
NormalizedResourceEvent normalizedResourceEvent =
(NormalizedResourceEvent) event;
if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
} else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
}
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
summary.setJobLaunchTime(jie.getLaunchTime());
break;
case MAP_ATTEMPT_STARTED:
TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstMapTaskLaunchTime() == 0)
summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
break;
case REDUCE_ATTEMPT_STARTED:
TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstReduceTaskLaunchTime() == 0)
summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
break;
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
summary.setJobFinishTime(jfe.getFinishTime());
summary.setNumFinishedMaps(jfe.getFinishedMaps());
summary.setNumFailedMaps(jfe.getFailedMaps());
summary.setNumFinishedReduces(jfe.getFinishedReduces());
summary.setNumFailedReduces(jfe.getFailedReduces());
if (summary.getJobStatus() == null)
summary
.setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
.toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job.
setSummarySlotSeconds(summary, jfe.getTotalCounters());
break;
case JOB_FAILED:
case JOB_KILLED:
JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
summary.setJobStatus(juce.getStatus());
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
default:
break;
}
}
/**
* Constructs a TaskId object from given parts.
* @param jtIdentifier jobTracker identifier
* @param jobId job number
* @param isMap whether the tip is a map
* @param taskId taskId number
* @param id the task attempt number
* @deprecated Use {@link #TaskAttemptID(String, int, TaskType, int, int)}.
*/
@Deprecated
public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
int taskId, int id) {
this(jtIdentifier, jobId, isMap ? TaskType.MAP : TaskType.REDUCE, taskId,
id);
}
/**
* Constructs a TaskId object from given parts.
* @param jtIdentifier jobTracker identifier
* @param jobId job number
* @param isMap whether the tip is a map
* @param taskId taskId number
* @param id the task attempt number
* @deprecated Use {@link #TaskAttemptID(String, int, TaskType, int, int)}.
*/
@Deprecated
public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
int taskId, int id) {
this(jtIdentifier, jobId, isMap ? TaskType.MAP : TaskType.REDUCE, taskId,
id);
}