Listed below are examples showing how to use the org.apache.hadoop.mapreduce.TaskID API class, or you can follow the links through to GitHub to view the original source code.
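Before the collected snippets, here is a minimal, self-contained sketch of the core TaskID API: constructing an id from a JobID, round-tripping it through its canonical string form with TaskID.forName, and reading its parts back. The class name, job identifier, and numbers are invented for illustration.

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskIdBasics {
  public static void main(String[] args) {
    // Build a TaskID for the third map task of a job (identifiers are made up).
    JobID jobId = new JobID("201801011200", 42);
    TaskID taskId = new TaskID(jobId, TaskType.MAP, 2);

    // toString() yields the canonical form, here "task_201801011200_0042_m_000002".
    String text = taskId.toString();

    // forName() parses the canonical form back into an equal TaskID,
    // throwing IllegalArgumentException on malformed input.
    TaskID parsed = TaskID.forName(text);
    System.out.println(parsed.equals(taskId));           // true
    System.out.println(parsed.getTaskType());            // MAP
    System.out.println(parsed.getId());                  // 2
    System.out.println(parsed.getJobID().equals(jobId)); // true
  }
}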
@Override
public TaskID acquireTaskIdLock(Configuration conf) {
JobID jobId = HadoopFormats.getJobId(conf);
boolean lockAcquired = false;
int taskIdCandidate = 0;
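// Probe random candidate ids until this task wins the race to create the corresponding lock file.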
while (!lockAcquired) {
taskIdCandidate = RANDOM_GEN.nextInt(Integer.MAX_VALUE);
Path path =
new Path(
locksDir,
String.format(LOCKS_DIR_TASK_PATTERN, getJobJtIdentifier(conf), taskIdCandidate));
lockAcquired = tryCreateFile(conf, path);
}
return HadoopFormats.createTaskID(jobId, taskIdCandidate);
}
private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
int numSuccessfulMaps) {
if (numMaps == numSuccessfulMaps) {
return jobInfo.getFinishedMaps();
}
// Count only map tasks that succeeded; succeeded reduces must not inflate the map count.
long numFinishedMaps = 0;
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (taskInfo.getTaskType() == TaskType.MAP
&& TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
++numFinishedMaps;
}
}
return numFinishedMaps;
}
/**
* Test the getters of TaskAttemptFinishedEvent and TaskAttemptFinished.
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testTaskAttemptFinishedEvent() throws Exception {
JobID jid = new JobID("001", 1);
TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
Counters counters = new Counters();
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
counters);
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
assertEquals(test.getCounters(), counters);
assertEquals(test.getFinishTime(), 123L);
assertEquals(test.getHostname(), "HOSTNAME");
assertEquals(test.getRackName(), "RACKNAME");
assertEquals(test.getState(), "STATUS");
assertEquals(test.getTaskId(), tid);
assertEquals(test.getTaskStatus(), "TEST");
assertEquals(test.getTaskType(), TaskType.REDUCE);
}
/**
* Generate a unique filename, based on the task id, name, and extension
* @param context the task that is calling this
* @param name the base filename
* @param extension the filename extension
* @return a string like $name-[mrsct]-$id$extension
*/
public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
String extension) {
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
StringBuilder result = new StringBuilder();
result.append(name);
result.append('-');
result.append(
TaskID.getRepresentingCharacter(taskId.getTaskType()));
result.append('-');
result.append(NUMBER_FORMAT.format(partition));
result.append(extension);
return result.toString();
}
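For example, with FileOutputFormat's NUMBER_FORMAT (five digits, grouping disabled), getUniqueFile(context, "part", ".gz") called from the fourth map task (partition 3) returns "part-m-00003.gz".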
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}
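In both emitters, TaskID.forName parses the history-log string (canonical form task_&lt;jtIdentifier&gt;_&lt;jobId&gt;_&lt;typeChar&gt;_&lt;taskId&gt;) back into a TaskID, and throws IllegalArgumentException if the string is malformed.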
private void loadAllTasks() {
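// Double-checked locking: re-test the loaded flag under the lock so the
// history tasks are materialized exactly once even under concurrent calls.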
if (tasksLoaded.get()) {
return;
}
tasksLock.lock();
try {
if (tasksLoaded.get()) {
return;
}
for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
TaskInfo taskInfo = entry.getValue();
Task task = new CompletedTask(yarnTaskID, taskInfo);
tasks.put(yarnTaskID, task);
if (task.getType() == TaskType.MAP) {
mapTasks.put(task.getID(), task);
} else if (task.getType() == TaskType.REDUCE) {
reduceTasks.put(task.getID(), task);
}
}
tasksLoaded.set(true);
} finally {
tasksLock.unlock();
}
}
public void setDatum(Object odatum) {
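// Rehydrate all TaskFailedEvent fields from the deserialized Avro TaskFailed record.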
this.datum = (TaskFailed)odatum;
this.id =
TaskID.forName(datum.taskid.toString());
this.taskType =
TaskType.valueOf(datum.taskType.toString());
this.finishTime = datum.finishTime;
this.error = datum.error.toString();
this.failedDueToAttempt =
datum.failedDueToAttempt == null
? null
: TaskAttemptID.forName(
datum.failedDueToAttempt.toString());
this.status = datum.status.toString();
this.counters =
EventReader.fromAvro(datum.counters);
}
@Override
public void open(String uId) throws Exception {
this.hash = uId.hashCode();
Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
FileOutputFormat.setOutputPath(job, new Path(path));
// Each Writer is responsible for writing one bundle of elements and is represented by one
// unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
// handles retrying of failed bundles, each task has one attempt only.
JobID jobId = job.getJobID();
TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
configure(job);
context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
recordWriter = outputFormat.getRecordWriter(context);
outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
/**
* test some methods of CompletedTaskAttempt
*/
@Test (timeout=5000)
public void testCompletedTaskAttempt() {
TaskAttemptInfo attemptInfo = mock(TaskAttemptInfo.class);
when(attemptInfo.getRackname()).thenReturn("Rackname");
when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
when(attemptInfo.getSortFinishTime()).thenReturn(12L);
when(attemptInfo.getShufflePort()).thenReturn(10);
JobID jobId = new JobID("12345", 0);
TaskID taskId = new TaskID(jobId, TaskType.REDUCE, 0);
TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);
CompletedTaskAttempt taskAttempt = new CompletedTaskAttempt(null, attemptInfo);
assertEquals("Rackname", taskAttempt.getNodeRackName());
assertEquals(Phase.CLEANUP, taskAttempt.getPhase());
assertTrue(taskAttempt.isFinished());
assertEquals(11L, taskAttempt.getShuffleFinishTime());
assertEquals(12L, taskAttempt.getSortFinishTime());
assertEquals(10, taskAttempt.getShufflePort());
}
private static Set<String> runTasks(JobContext job, int numTasks, int numFiles)
throws IOException {
Set<String> uploads = Sets.newHashSet();
for (int taskId = 0; taskId < numTasks; taskId += 1) {
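// All tasks share JOB_ID; the attempt number is a deterministic value derived from the task id.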
TaskAttemptID attemptID = new TaskAttemptID(
new TaskID(JOB_ID, TaskType.REDUCE, taskId),
(taskId * 37) % numTasks);
TaskAttemptContext attempt = new TaskAttemptContextImpl(
new Configuration(job.getConfiguration()), attemptID);
MockedS3Committer taskCommitter = new MockedS3Committer(
S3_OUTPUT_PATH, attempt);
commitTask(taskCommitter, attempt, numFiles);
uploads.addAll(taskCommitter.results.getUploads());
}
return uploads;
}
public MRCombiner(TaskContext taskContext) throws IOException {
final Configuration userConf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
useNewApi = ConfigUtils.useNewApi(userConf);
if (useNewApi) {
conf = new JobConf(userConf);
} else {
conf = userConf;
}
assert(taskContext instanceof InputContext || taskContext instanceof OutputContext);
if (taskContext instanceof OutputContext) {
this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
this.reporter = new MRTaskReporter((OutputContext)taskContext);
} else {
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
this.reporter = new MRTaskReporter((InputContext)taskContext);
}
combineInputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combineOutputRecordsCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false);
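// Rebuild the MapReduce-level TaskAttemptID from the Tez application id, task index, and attempt number.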
this.mrTaskAttemptID = new TaskAttemptID(
new TaskID(String.valueOf(taskContext.getApplicationId()
.getClusterTimestamp()), taskContext.getApplicationId().getId(),
isMap ? TaskType.MAP : TaskType.REDUCE,
taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
}
public MRCombiner(TezTaskContext taskContext) throws IOException {
this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
if (taskContext instanceof TezOutputContext) {
this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
} else {
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
this.reporter = new MRTaskReporter((TezInputContext)taskContext);
}
this.useNewApi = ConfigUtils.useNewApi(conf);
combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false);
this.mrTaskAttemptID = new TaskAttemptID(
new TaskID(String.valueOf(taskContext.getApplicationId()
.getClusterTimestamp()), taskContext.getApplicationId().getId(),
isMap ? TaskType.MAP : TaskType.REDUCE,
taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
}
public void write(K key, V value) throws IOException, InterruptedException {
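// The key is folded into the per-file base name, so each record is written with a null key.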
TaskID taskID = job.getTaskAttemptID().getTaskID();
int partition = taskID.getId();
String baseName = getFileBaseName(key, NUMBER_FORMAT.format(partition));
RecordWriter<K, V> rw = this.recordWriter.get(baseName);
if (rw == null) {
rw = getBaseRecordWriter(job, baseName);
this.recordWriter.put(baseName, rw);
}
rw.write(null, value);
}
/** Create a job info object where job information will be stored
* after a parse
*/
public JobInfo() {
submitTime = launchTime = finishTime = -1;
totalMaps = totalReduces = failedMaps = failedReduces = 0;
finishedMaps = finishedReduces = 0;
username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
priority = JobPriority.NORMAL;
}
@Override
public void cleanUpPartialOutputForTask(TaskAttemptContext context)
throws IOException {
// We double-check that this is never invoked from a non-preemptable
// subclass. That should never happen, since the invoking code checks it
// too, but it is safer to double-check; mishandling this would produce
// inconsistent output.
if (!this.getClass().isAnnotationPresent(Checkpointable.class)) {
throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " +
"from non @Checkpointable class");
}
FileSystem fs =
fsFor(getTaskAttemptPath(context), context.getConfiguration());
LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " +
context.getTaskAttemptID().getTaskID() + " in: " +
getCommittedTaskPath(context).getParent());
final TaskAttemptID taid = context.getTaskAttemptID();
final TaskID tid = taid.getTaskID();
Path pCommit = getCommittedTaskPath(context).getParent();
// remove any committed output
for (int i = 0; i < taid.getId(); ++i) {
TaskAttemptID oldId = new TaskAttemptID(tid, i);
Path pTask = new Path(pCommit, oldId.toString());
if (fs.exists(pTask) && !fs.delete(pTask, true)) {
throw new IOException("Failed to delete " + pTask);
}
}
}
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
public void setDatum(Object oDatum) {
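// Rehydrate all TaskFinishedEvent fields from the deserialized Avro TaskFinished record.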
this.datum = (TaskFinished)oDatum;
this.taskid = TaskID.forName(datum.taskid.toString());
if (datum.successfulAttemptId != null) {
this.successfulAttemptId = TaskAttemptID
.forName(datum.successfulAttemptId.toString());
}
this.finishTime = datum.finishTime;
this.taskType = TaskType.valueOf(datum.taskType.toString());
this.status = datum.status.toString();
this.counters = EventReader.fromAvro(datum.counters);
}
protected String getUniqueName(String name, String extension) {
int partition;
NumberFormat numberFormat = NumberFormat.getInstance();
numberFormat.setMinimumIntegerDigits(5);
numberFormat.setGroupingUsed(false);
if (null != getTaskAttemptContext()) {
TaskID taskId = getTaskAttemptContext().getTaskAttemptID().getTaskID();
partition = taskId.getId();
} else {
partition = getConfiguration().getInt(JobContext.TASK_PARTITION, -1);
}
if (partition == -1) {
throw new IllegalArgumentException("This method can only be called from an application");
}
String taskType = getConfiguration().getBoolean(JobContext.TASK_ISMAP, JobContext.DEFAULT_TASK_ISMAP) ? "m" : "r";
StringBuilder result = new StringBuilder();
result.append(name);
result.append('-');
result.append(taskType);
result.append('-');
result.append(numberFormat.format(partition));
result.append(extension);
return result.toString();
}
/**
* Create an event to record the successful completion of a task
* @param id Task ID
* @param attemptId Task Attempt ID of the successful attempt for this task
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param status Status string
* @param counters Counters for the task
*/
public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
TaskType taskType,
String status, Counters counters) {
this.taskid = id;
this.successfulAttemptId = attemptId;
this.finishTime = finishTime;
this.taskType = taskType;
this.status = status;
this.counters = counters;
}
/**
* Create an event to record task failure
* @param id Task ID
* @param finishTime Finish time of the task
* @param taskType Type of the task
* @param error Error String
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt, Counters counters) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
this.error = error;
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
}
/** Apply the filter (status) on the parsed job and generate summary */
public FilteredJob(JobInfo job, String status) {
filter = status;
Map<TaskID, JobHistoryParser.TaskInfo> tasks = job.getAllTasks();
for (JobHistoryParser.TaskInfo task : tasks.values()) {
Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attempts =
task.getAllTaskAttempts();
for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) {
if (attempt.getTaskStatus().equals(status)) {
String hostname = attempt.getHostname();
TaskID id = attempt.getAttemptId().getTaskID();
Set<TaskID> set = badNodesToFilteredTasks.get(hostname);
if (set == null) {
set = new TreeSet<TaskID>();
set.add(id);
badNodesToFilteredTasks.put(hostname, set);
} else {
set.add(id);
}
}
}
}
}
public synchronized void tipFailed(TaskID taskId) {
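// Mark the failed map's output as no longer expected so the shuffle does not
// wait for it; wake up waiters once no maps remain.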
if (!finishedMaps[taskId.getId()]) {
finishedMaps[taskId.getId()] = true;
if (--remainingMaps == 0) {
notifyAll();
}
updateStatus();
}
}
/**
* Generate a unique filename, based on the task id, name, and extension
* @param context the task that is calling this
* @param name the base filename
* @param extension the filename extension
* @return a string like $name-$id$extension
*/
protected synchronized String getCustomFileName(TaskAttemptContext context,
String name,
String extension) {
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
return name + '-' + NUMBER_FORMAT.format(partition) + extension;
}