下面列出了org.apache.hadoop.mapreduce.TaskAttemptID#getTaskID ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Given the reduce taskAttemptID, returns the TaskAttemptInfo. Deconstructs
* the reduce taskAttemptID and looks up the jobStory with the parts taskType,
* id of task, id of task attempt.
*
* @param taskTracker
* tasktracker
* @param taskAttemptID
* task-attempt
* @return TaskAttemptInfo for the reduce task-attempt
*/
private TaskAttemptInfo getReduceTaskAttemptInfo(TaskTracker taskTracker,
TaskAttemptID taskAttemptID) {
assert (!taskAttemptID.isMap());
TaskID taskId = taskAttemptID.getTaskID();
TaskType taskType;
if (taskAttemptID.isMap()) {
taskType = TaskType.MAP;
} else {
taskType = TaskType.REDUCE;
}
TaskAttemptInfo taskAttemptInfo = jobStory.getTaskAttemptInfo(taskType,
taskId.getId(), taskAttemptID.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("get an attempt: "
+ taskAttemptID.toString()
+ ", state="
+ taskAttemptInfo.getRunState()
+ ", runtime="
+ ((taskAttemptID.isMap()) ? taskAttemptInfo.getRuntime()
: ((ReduceTaskAttemptInfo) taskAttemptInfo).getReduceRuntime()));
}
return taskAttemptInfo;
}
@Override
public void cleanUpPartialOutputForTask(TaskAttemptContext context)
throws IOException {
// we double check this is never invoked from a non-preemptable subclass.
// This should never happen, since the invoking codes is checking it too,
// but it is safer to double check. Errors handling this would produce
// inconsistent output.
if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
"from non @Preemptable class");
}
FileSystem fs =
fsFor(getTaskAttemptPath(context), context.getConfiguration());
LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
context.getTaskAttemptID().getTaskID() + " in: " +
getCommittedTaskPath(context).getParent());
final TaskAttemptID taid = context.getTaskAttemptID();
final TaskID tid = taid.getTaskID();
Path pCommit = getCommittedTaskPath(context).getParent();
// remove any committed output
for (int i = 0; i < taid.getId(); ++i) {
TaskAttemptID oldId = new TaskAttemptID(tid, i);
Path pTask = new Path(pCommit, oldId.toString());
if (fs.exists(pTask) && !fs.delete(pTask, true)) {
throw new IOException("Failed to delete " + pTask);
}
}
}
/**
* Mask the job ID part in a {@link TaskAttemptID}.
*
* @param attemptId
* raw {@link TaskAttemptID} read from trace
* @return masked {@link TaskAttemptID} with empty {@link JobID}.
*/
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
JobID jobId = new JobID();
TaskType taskType = attemptId.getTaskType();
TaskID taskId = attemptId.getTaskID();
return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
taskId.getId(), attemptId.getId());
}
@Override
public void cleanUpPartialOutputForTask(TaskAttemptContext context)
throws IOException {
// we double check this is never invoked from a non-preemptable subclass.
// This should never happen, since the invoking codes is checking it too,
// but it is safer to double check. Errors handling this would produce
// inconsistent output.
if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
"from non @Preemptable class");
}
FileSystem fs =
fsFor(getTaskAttemptPath(context), context.getConfiguration());
LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
context.getTaskAttemptID().getTaskID() + " in: " +
getCommittedTaskPath(context).getParent());
final TaskAttemptID taid = context.getTaskAttemptID();
final TaskID tid = taid.getTaskID();
Path pCommit = getCommittedTaskPath(context).getParent();
// remove any committed output
for (int i = 0; i < taid.getId(); ++i) {
TaskAttemptID oldId = new TaskAttemptID(tid, i);
Path pTask = new Path(pCommit, oldId.toString());
if (fs.exists(pTask) && !fs.delete(pTask, true)) {
throw new IOException("Failed to delete " + pTask);
}
}
}
/**
* Mask the job ID part in a {@link TaskAttemptID}.
*
* @param attemptId
* raw {@link TaskAttemptID} read from trace
* @return masked {@link TaskAttemptID} with empty {@link JobID}.
*/
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
JobID jobId = new JobID();
TaskType taskType = attemptId.getTaskType();
TaskID taskId = attemptId.getTaskID();
return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), taskType,
taskId.getId(), attemptId.getId());
}
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
/**
* Mask the job ID part in a {@link TaskAttemptID}.
*
* @param attemptId
* raw {@link TaskAttemptID} read from trace
* @return masked {@link TaskAttemptID} with empty {@link JobID}.
*/
private TaskAttemptID maskAttemptID(TaskAttemptID attemptId) {
JobID jobId = new JobID();
TaskID taskId = attemptId.getTaskID();
return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(),
attemptId.isMap(), taskId.getId(), attemptId.getId());
}