下面列出了怎么用org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock();
try {
if (taskAttemptCompletionEvents.size() > fromEventId) {
int actualMax = Math.min(maxEvents,
(taskAttemptCompletionEvents.size() - fromEventId));
events = taskAttemptCompletionEvents.subList(fromEventId,
actualMax + fromEventId).toArray(events);
}
return events;
} finally {
readLock.unlock();
}
}
private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
TaskAttemptCompletionEventStatus status) {
TaskAttempt attempt = attempts.get(attemptId);
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) {
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
String scheme = (encryptedShuffle) ? "https://" : "http://";
tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort()));
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;
if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() !=0)
runTime = (int)(attempt.getFinishTime() - attempt.getLaunchTime());
tce.setAttemptRunTime(runTime);
//raise the event to job so that it adds the completion event to its
//data structures
eventHandler.handle(new JobTaskAttemptCompletedEvent(tce));
}
}
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock();
try {
if (taskAttemptCompletionEvents.size() > fromEventId) {
int actualMax = Math.min(maxEvents,
(taskAttemptCompletionEvents.size() - fromEventId));
events = taskAttemptCompletionEvents.subList(fromEventId,
actualMax + fromEventId).toArray(events);
}
return events;
} finally {
readLock.unlock();
}
}
private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
TaskAttemptCompletionEventStatus status) {
TaskAttempt attempt = attempts.get(attemptId);
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) {
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
String scheme = (encryptedShuffle) ? "https://" : "http://";
tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort()));
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;
if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() !=0)
runTime = (int)(attempt.getFinishTime() - attempt.getLaunchTime());
tce.setAttemptRunTime(runTime);
//raise the event to job so that it adds the completion event to its
//data structures
eventHandler.handle(new JobTaskAttemptCompletedEvent(tce));
}
}
private static TaskAttemptCompletionEvent createTce(int eventId,
boolean isMap, TaskAttemptCompletionEventStatus status) {
JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
tce.setAttemptId(attemptId);
tce.setStatus(status);
return tce;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws IOException {
GetTaskAttemptCompletionEventsResponse resp =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
return resp;
}
public static TaskCompletionEvent[] fromYarn(
TaskAttemptCompletionEvent[] newEvents) {
TaskCompletionEvent[] oldEvents =
new TaskCompletionEvent[newEvents.length];
int i = 0;
for (TaskAttemptCompletionEvent newEvent
: newEvents) {
oldEvents[i++] = fromYarn(newEvent);
}
return oldEvents;
}
public static TaskCompletionEvent fromYarn(
TaskAttemptCompletionEvent newEvent) {
return new TaskCompletionEvent(newEvent.getEventId(),
fromYarn(newEvent.getAttemptId()), newEvent.getAttemptId().getId(),
newEvent.getAttemptId().getTaskId().getTaskType().equals(TaskType.MAP),
fromYarn(newEvent.getStatus()),
newEvent.getMapOutputServerAddress());
}
private void initCompletionEvents() {
if (this.completionEvents != null) {
return;
}
GetTaskAttemptCompletionEventsResponseProtoOrBuilder p = viaProto ? proto : builder;
List<TaskAttemptCompletionEventProto> list = p.getCompletionEventsList();
this.completionEvents = new ArrayList<TaskAttemptCompletionEvent>();
for (TaskAttemptCompletionEventProto c : list) {
this.completionEvents.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllCompletionEvents(final List<TaskAttemptCompletionEvent> completionEvents) {
if (completionEvents == null)
return;
initCompletionEvents();
this.completionEvents.addAll(completionEvents);
}
private void addCompletionEventsToProto() {
maybeInitBuilder();
builder.clearCompletionEvents();
if (completionEvents == null)
return;
Iterable<TaskAttemptCompletionEventProto> iterable = new Iterable<TaskAttemptCompletionEventProto>() {
@Override
public Iterator<TaskAttemptCompletionEventProto> iterator() {
return new Iterator<TaskAttemptCompletionEventProto>() {
Iterator<TaskAttemptCompletionEvent> iter = completionEvents.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public TaskAttemptCompletionEventProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllCompletionEvents(iterable);
}
@Override
public synchronized TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
if (completionEvents == null) {
constructTaskAttemptCompletionEvents();
}
return getAttemptCompletionEvents(completionEvents,
fromEventId, maxEvents);
}
private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
List<TaskAttemptCompletionEvent> eventList,
int startIndex, int maxEvents) {
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
if (eventList.size() > startIndex) {
int actualMax = Math.min(maxEvents,
(eventList.size() - startIndex));
events = eventList.subList(startIndex, actualMax + startIndex)
.toArray(events);
}
return events;
}
private static TaskAttemptCompletionEvent createTce(int eventId,
boolean isMap, TaskAttemptCompletionEventStatus status) {
JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
tce.setAttemptId(attemptId);
tce.setStatus(status);
return tce;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws IOException {
GetTaskAttemptCompletionEventsResponse resp =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
return resp;
}
public static TaskCompletionEvent[] fromYarn(
TaskAttemptCompletionEvent[] newEvents) {
TaskCompletionEvent[] oldEvents =
new TaskCompletionEvent[newEvents.length];
int i = 0;
for (TaskAttemptCompletionEvent newEvent
: newEvents) {
oldEvents[i++] = fromYarn(newEvent);
}
return oldEvents;
}
public static TaskCompletionEvent fromYarn(
TaskAttemptCompletionEvent newEvent) {
return new TaskCompletionEvent(newEvent.getEventId(),
fromYarn(newEvent.getAttemptId()), newEvent.getAttemptId().getId(),
newEvent.getAttemptId().getTaskId().getTaskType().equals(TaskType.MAP),
fromYarn(newEvent.getStatus()),
newEvent.getMapOutputServerAddress());
}
private void initCompletionEvents() {
if (this.completionEvents != null) {
return;
}
GetTaskAttemptCompletionEventsResponseProtoOrBuilder p = viaProto ? proto : builder;
List<TaskAttemptCompletionEventProto> list = p.getCompletionEventsList();
this.completionEvents = new ArrayList<TaskAttemptCompletionEvent>();
for (TaskAttemptCompletionEventProto c : list) {
this.completionEvents.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllCompletionEvents(final List<TaskAttemptCompletionEvent> completionEvents) {
if (completionEvents == null)
return;
initCompletionEvents();
this.completionEvents.addAll(completionEvents);
}
private void addCompletionEventsToProto() {
maybeInitBuilder();
builder.clearCompletionEvents();
if (completionEvents == null)
return;
Iterable<TaskAttemptCompletionEventProto> iterable = new Iterable<TaskAttemptCompletionEventProto>() {
@Override
public Iterator<TaskAttemptCompletionEventProto> iterator() {
return new Iterator<TaskAttemptCompletionEventProto>() {
Iterator<TaskAttemptCompletionEvent> iter = completionEvents.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public TaskAttemptCompletionEventProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllCompletionEvents(iterable);
}
@Override
public synchronized TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
if (completionEvents == null) {
constructTaskAttemptCompletionEvents();
}
return getAttemptCompletionEvents(completionEvents,
fromEventId, maxEvents);
}
private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
List<TaskAttemptCompletionEvent> eventList,
int startIndex, int maxEvents) {
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
if (eventList.size() > startIndex) {
int actualMax = Math.min(maxEvents,
(eventList.size() - startIndex));
events = eventList.subList(startIndex, actualMax + startIndex)
.toArray(events);
}
return events;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws IOException {
GetTaskAttemptCompletionEventsResponse resp =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
return resp;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws IOException {
GetTaskAttemptCompletionEventsResponse resp =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
return resp;
}
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
public JobTaskAttemptCompletedEvent(TaskAttemptCompletionEvent completionEvent) {
super(completionEvent.getAttemptId().getTaskId().getJobId(),
JobEventType.JOB_TASK_ATTEMPT_COMPLETED);
this.completionEvent = completionEvent;
}
public TaskAttemptCompletionEvent getCompletionEvent() {
return completionEvent;
}
/**
* Note that this transition method is called directly (and synchronously)
* by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
* just plain sequential call within AM context), so we can trigger
* modifications in AM state from here (at least, if AM is written that
* way; MR version is).
*/
@Override
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName,
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
job.numMapTasks + job.numReduceTasks + 10);
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}
@Override
public void transition(JobImpl job, JobEvent event) {
TaskAttemptCompletionEvent tce =
((JobTaskAttemptCompletedEvent) event).getCompletionEvent();
// Add the TaskAttemptCompletionEvent
//eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
int mapEventIdx = -1;
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
// we track map completions separately from task completions because
// - getMapAttemptCompletionEvents uses index ranges specific to maps
// - type converting the same events over and over is expensive
mapEventIdx = job.mapAttemptCompletionEvents.size();
job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
}
job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);
TaskAttemptId attemptId = tce.getAttemptId();
TaskId taskId = attemptId.getTaskId();
//make the previous completion event as obsolete if it exists
Integer successEventNo =
job.successAttemptCompletionEventNoMap.remove(taskId);
if (successEventNo != null) {
TaskAttemptCompletionEvent successEvent =
job.taskAttemptCompletionEvents.get(successEventNo);
successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
int mapCompletionIdx =
job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
if (mapCompletionIdx >= 0) {
// update the corresponding TaskCompletionEvent for the map
TaskCompletionEvent mapEvent =
job.mapAttemptCompletionEvents.get(mapCompletionIdx);
job.mapAttemptCompletionEvents.set(mapCompletionIdx,
new TaskCompletionEvent(mapEvent.getEventId(),
mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
mapEvent.getTaskTrackerHttp()));
}
}
// if this attempt is not successful then why is the previous successful
// attempt being removed above - MAPREDUCE-4330
if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
// here we could have simply called Task.getSuccessfulAttempt() but
// the event that triggers this code is sent before
// Task.successfulAttempt is set and so there is no guarantee that it
// will be available now
Task task = job.tasks.get(taskId);
TaskAttempt attempt = task.getAttempt(attemptId);
NodeId nodeId = attempt.getNodeId();
assert (nodeId != null); // node must exist for a successful event
List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
.get(nodeId);
if (taskAttemptIdList == null) {
taskAttemptIdList = new ArrayList<TaskAttemptId>();
job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
}
taskAttemptIdList.add(attempt.getID());
}
}
@Override
public TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
throw new UnsupportedOperationException("Not supported yet.");
}