下面列出了 org.apache.hadoop.mapred.TaskStatus 类的 API 使用示例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Maps a name as written by a pre-21 history log onto a {@code Values}
 * constant.
 *
 * <p>Three names are special-cased (compared ignoring case); every other
 * name is upper-cased and resolved through {@code Values.valueOf}.
 *
 * @param name the logged name; must not be null
 * @return the matching {@code Values} constant
 */
private static Values getPre21Value(String name) {
  final Values mapped;
  if (name.equalsIgnoreCase("JOB_CLEANUP")) {
    mapped = Values.CLEANUP;
  } else if (name.equalsIgnoreCase("JOB_SETUP")) {
    mapped = Values.SETUP;
  } else if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
    // Pre-21 logs recorded a successful task as SUCCESS; from 21 onwards
    // the same state is logged as SUCCEEDED.
    mapped = Values.SUCCESS;
  } else {
    mapped = Values.valueOf(StringUtils.toUpperCase(name));
  }
  return mapped;
}
/**
 * Returns the first attempt of the given task whose run state is
 * {@code SUCCEEDED}, probing attempt numbers 0, 1, 2, ... in order.
 *
 * <p>The search has no upper bound: it ends only when {@code jobdesc}
 * hands back a succeeded attempt. The original trailing
 * "no successful attempts" warning could never execute, because the loop's
 * only exit was the {@code break} taken on {@code SUCCEEDED}; it has been
 * removed and the method now returns directly from the loop.
 *
 * @param type the task type to look up
 * @param task the task number within that type
 * @return the first attempt reported as {@code SUCCEEDED}
 */
private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
  for (int attemptNumber = 0; ; ++attemptNumber) {
    // Rumen should make up an attempt if it's missing. Or this won't work
    // at all. It's hard to discern what is happening in there.
    final TaskAttemptInfo candidate =
        jobdesc.getTaskAttemptInfo(type, task, attemptNumber);
    if (candidate.getRunState() == TaskStatus.State.SUCCEEDED) {
      return candidate;
    }
  }
}
/**
 * Maps a name as written by a pre-21 history log onto a {@code Values}
 * constant.
 *
 * <p>Three names are special-cased (compared ignoring case); every other
 * name is upper-cased and resolved through {@code Values.valueOf}.
 *
 * @param name the logged name; must not be null
 * @return the matching {@code Values} constant
 */
private static Values getPre21Value(String name) {
  final Values mapped;
  if (name.equalsIgnoreCase("JOB_CLEANUP")) {
    mapped = Values.CLEANUP;
  } else if (name.equalsIgnoreCase("JOB_SETUP")) {
    mapped = Values.SETUP;
  } else if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
    // Pre-21 logs recorded a successful task as SUCCESS; from 21 onwards
    // the same state is logged as SUCCEEDED.
    mapped = Values.SUCCESS;
  } else {
    mapped = Values.valueOf(StringUtils.toUpperCase(name));
  }
  return mapped;
}
/**
 * Returns the first attempt of the given task whose run state is
 * {@code SUCCEEDED}, probing attempt numbers 0, 1, 2, ... in order.
 *
 * <p>The search has no upper bound: it ends only when {@code jobdesc}
 * hands back a succeeded attempt. The original trailing
 * "no successful attempts" warning could never execute, because the loop's
 * only exit was the {@code break} taken on {@code SUCCEEDED}; it has been
 * removed and the method now returns directly from the loop.
 *
 * @param type the task type to look up
 * @param task the task number within that type
 * @return the first attempt reported as {@code SUCCEEDED}
 */
private TaskAttemptInfo getSuccessfulAttemptInfo(TaskType type, int task) {
  for (int attemptNumber = 0; ; ++attemptNumber) {
    // Rumen should make up an attempt if it's missing. Or this won't work
    // at all. It's hard to discern what is happening in there.
    final TaskAttemptInfo candidate =
        jobdesc.getTaskAttemptInfo(type, task, attemptNumber);
    if (candidate.getRunState() == TaskStatus.State.SUCCEEDED) {
      return candidate;
    }
  }
}
/**
 * Copies the outcome carried by a task-finished event onto the matching
 * entry of {@code info.tasksMap} and marks that task as SUCCEEDED.
 *
 * @param event the finished-task event; its task id selects the entry
 */
private void handleTaskFinishedEvent(TaskFinishedEvent event) {
  final TaskInfo finishedTask = info.tasksMap.get(event.getTaskId());
  finishedTask.status = TaskStatus.State.SUCCEEDED.toString();
  finishedTask.successfulAttemptId = event.getSuccessfulTaskAttemptId();
  finishedTask.finishTime = event.getFinishTime();
  finishedTask.counters = event.getCounters();
}
/**
 * Copies the outcome carried by a task-failed event onto the matching
 * entry of {@code info.tasksMap} and marks that task as FAILED.
 *
 * @param event the failed-task event; its task id selects the entry
 */
private void handleTaskFailedEvent(TaskFailedEvent event) {
  final TaskInfo failedTask = info.tasksMap.get(event.getTaskId());
  failedTask.finishTime = event.getFinishTime();
  failedTask.status = TaskStatus.State.FAILED.toString();
  failedTask.counters = event.getCounters();
  failedTask.failedDueToAttemptId = event.getFailedAttemptID();
  // The error text is passed through the interner before being stored.
  failedTask.error = StringInterner.weakIntern(event.getError());
}
/**
 * Get the event type.
 *
 * <p>The task type can be setup/map/reduce/cleanup, but the attempt type is
 * only ever reported as map or reduce: anything that is not a MAP task is
 * classified as a reduce attempt. A status other than FAILED is reported as
 * killed.
 */
public EventType getEventType() {
  final boolean failed =
      TaskStatus.State.FAILED.toString().equals(getTaskStatus());
  if (getTaskId().getTaskType() == TaskType.MAP) {
    return failed
        ? EventType.MAP_ATTEMPT_FAILED
        : EventType.MAP_ATTEMPT_KILLED;
  }
  return failed
      ? EventType.REDUCE_ATTEMPT_FAILED
      : EventType.REDUCE_ATTEMPT_KILLED;
}
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, the failed and
 * killed tasks of each task type, then (only when {@code printAll} is set)
 * the succeeded tasks and all attempts of each type, and finally the
 * failed and killed attempts.
 *
 * @throws IOException if one of the print steps raises it
 */
public void print() throws IOException{
  final String failedState = TaskStatus.State.FAILED.toString();
  final String killedState = TaskStatus.State.KILLED.toString();

  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Unsuccessful tasks: FAILED then KILLED for each task type.
  final TaskType[] typesBeforeCleanup =
      {TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE};
  for (TaskType type : typesBeforeCleanup) {
    printTasks(type, failedState);
    printTasks(type, killedState);
  }
  printTasks(TaskType.JOB_CLEANUP, failedState);
  // Killed cleanup tasks are selected via the job-level run-state name.
  printTasks(TaskType.JOB_CLEANUP,
      JobStatus.getJobRunState(JobStatus.KILLED));

  if (printAll) {
    final String succeededState = TaskStatus.State.SUCCEEDED.toString();
    final TaskType[] everyType = {
        TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE,
        TaskType.JOB_CLEANUP};
    for (TaskType type : everyType) {
      printTasks(type, succeededState);
    }
    for (TaskType type : everyType) {
      printAllTaskAttempts(type);
    }
  }

  for (String state : new String[] {failedState, killedState}) {
    printFailedAttempts(new FilteredJob(job, state));
  }
}
/**
 * Initializes the scheduler's bookkeeping.
 *
 * <p>Sizes the per-map tracking from {@code job.getNumMapTasks()}, stores
 * the collaborators passed in, records the start time, starts
 * {@code referee} (declared outside this constructor), and reads the
 * fetch-failure and retry limits from the job configuration.
 *
 * @param job configuration supplying the map-task count and the shuffle
 *     settings read below
 * @param status stored in {@code this.status}
 * @param reduceId stored in {@code this.reduceId}
 * @param reporter stored in {@code this.reporter}
 * @param progress stored in {@code this.progress}
 * @param shuffledMapsCounter stored in {@code this.shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code this.reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code this.failedShuffleCounter}
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
TaskAttemptID reduceId,
ExceptionReporter reporter,
Progress progress,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter) {
totalMaps = job.getNumMapTasks();
// The limit is a tenth of the map count, but never less than 30.
abortFailureLimit = Math.max(30, totalMaps / 10);
copyTimeTracker = new CopyTimeTracker();
// Every map starts out as remaining; one finished flag per map.
remainingMaps = totalMaps;
finishedMaps = new boolean[remainingMaps];
this.reporter = reporter;
this.status = status;
this.reduceId = reduceId;
this.progress = progress;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = Time.monotonicNow();
lastProgressTime = startTime;
// NOTE(review): referee is started before the limits below are assigned;
// confirm it does not read them at startup.
referee.start();
// Capped at 5, or at the map count when there are fewer than 5 maps.
this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
this.maxFetchFailuresBeforeReporting = job.getInt(
MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
// Defaults to true when the key is absent from the configuration.
this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
}
/**
 * With two map tasks configured, reporting each task as failed through
 * {@code tipFailed} should advance progress by one half, and the scheduler
 * should only report completion after the second call.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  final JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Stub status: reports itself as a non-map task and ignores
  // fetch-failure notifications.
  final TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  final Progress shuffleProgress = new Progress();
  final TaskAttemptID reduceAttempt =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  final ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(
      conf, reduceStatus, reduceAttempt, null, shuffleProgress,
      null, null, null);
  final JobID jobId = new JobID();

  // First failure: half way, not done yet.
  final TaskID secondTask = new TaskID(jobId, TaskType.REDUCE, 1);
  scheduler.tipFailed(secondTask);
  Assert.assertEquals("Progress should be 0.5",
      0.5f, shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(scheduler.waitUntilDone(1));

  // Second failure: complete.
  final TaskID firstTask = new TaskID(jobId, TaskType.REDUCE, 0);
  scheduler.tipFailed(firstTask);
  Assert.assertEquals("Progress should be 1.0",
      1.0f, shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(scheduler.waitUntilDone(1));
}
/**
 * Copies the outcome carried by a task-finished event onto the matching
 * entry of {@code info.tasksMap} and marks that task as SUCCEEDED.
 *
 * @param event the finished-task event; its task id selects the entry
 */
private void handleTaskFinishedEvent(TaskFinishedEvent event) {
  final TaskInfo finishedTask = info.tasksMap.get(event.getTaskId());
  finishedTask.status = TaskStatus.State.SUCCEEDED.toString();
  finishedTask.successfulAttemptId = event.getSuccessfulTaskAttemptId();
  finishedTask.finishTime = event.getFinishTime();
  finishedTask.counters = event.getCounters();
}
/**
 * Copies the outcome carried by a task-failed event onto the matching
 * entry of {@code info.tasksMap} and marks that task as FAILED.
 *
 * @param event the failed-task event; its task id selects the entry
 */
private void handleTaskFailedEvent(TaskFailedEvent event) {
  final TaskInfo failedTask = info.tasksMap.get(event.getTaskId());
  failedTask.finishTime = event.getFinishTime();
  failedTask.status = TaskStatus.State.FAILED.toString();
  failedTask.counters = event.getCounters();
  failedTask.failedDueToAttemptId = event.getFailedAttemptID();
  // The error text is passed through the interner before being stored.
  failedTask.error = StringInterner.weakIntern(event.getError());
}
/**
 * Get the event type.
 *
 * <p>The task type can be setup/map/reduce/cleanup, but the attempt type is
 * only ever reported as map or reduce: anything that is not a MAP task is
 * classified as a reduce attempt. A status other than FAILED is reported as
 * killed.
 */
public EventType getEventType() {
  final boolean failed =
      TaskStatus.State.FAILED.toString().equals(getTaskStatus());
  if (getTaskId().getTaskType() == TaskType.MAP) {
    return failed
        ? EventType.MAP_ATTEMPT_FAILED
        : EventType.MAP_ATTEMPT_KILLED;
  }
  return failed
      ? EventType.REDUCE_ATTEMPT_FAILED
      : EventType.REDUCE_ATTEMPT_KILLED;
}
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, the failed and
 * killed tasks of each task type, then (only when {@code printAll} is set)
 * the succeeded tasks and all attempts of each type, and finally the
 * failed and killed attempts.
 *
 * @throws IOException if one of the print steps raises it
 */
public void print() throws IOException{
  final String failedState = TaskStatus.State.FAILED.toString();
  final String killedState = TaskStatus.State.KILLED.toString();

  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Unsuccessful tasks: FAILED then KILLED for each task type.
  final TaskType[] typesBeforeCleanup =
      {TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE};
  for (TaskType type : typesBeforeCleanup) {
    printTasks(type, failedState);
    printTasks(type, killedState);
  }
  printTasks(TaskType.JOB_CLEANUP, failedState);
  // Killed cleanup tasks are selected via the job-level run-state name.
  printTasks(TaskType.JOB_CLEANUP,
      JobStatus.getJobRunState(JobStatus.KILLED));

  if (printAll) {
    final String succeededState = TaskStatus.State.SUCCEEDED.toString();
    final TaskType[] everyType = {
        TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE,
        TaskType.JOB_CLEANUP};
    for (TaskType type : everyType) {
      printTasks(type, succeededState);
    }
    for (TaskType type : everyType) {
      printAllTaskAttempts(type);
    }
  }

  for (String state : new String[] {failedState, killedState}) {
    printFailedAttempts(new FilteredJob(job, state));
  }
}
/**
 * Initializes the scheduler's bookkeeping.
 *
 * <p>Sizes the per-map tracking from {@code job.getNumMapTasks()}, stores
 * the collaborators passed in, records the start time, starts
 * {@code referee} (declared outside this constructor), and reads the
 * fetch-failure and retry limits from the job configuration.
 *
 * @param job configuration supplying the map-task count and the shuffle
 *     settings read below
 * @param status stored in {@code this.status}
 * @param reduceId stored in {@code this.reduceId}
 * @param reporter stored in {@code this.reporter}
 * @param progress stored in {@code this.progress}
 * @param shuffledMapsCounter stored in {@code this.shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code this.reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code this.failedShuffleCounter}
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
TaskAttemptID reduceId,
ExceptionReporter reporter,
Progress progress,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter) {
totalMaps = job.getNumMapTasks();
// The limit is a tenth of the map count, but never less than 30.
abortFailureLimit = Math.max(30, totalMaps / 10);
copyTimeTracker = new CopyTimeTracker();
// Every map starts out as remaining; one finished flag per map.
remainingMaps = totalMaps;
finishedMaps = new boolean[remainingMaps];
this.reporter = reporter;
this.status = status;
this.reduceId = reduceId;
this.progress = progress;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = Time.monotonicNow();
lastProgressTime = startTime;
// NOTE(review): referee is started before the limits below are assigned;
// confirm it does not read them at startup.
referee.start();
// Capped at 5, or at the map count when there are fewer than 5 maps.
this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
this.maxFetchFailuresBeforeReporting = job.getInt(
MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
// Defaults to true when the key is absent from the configuration.
this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
}
/**
 * With two map tasks configured, reporting each task as failed through
 * {@code tipFailed} should advance progress by one half, and the scheduler
 * should only report completion after the second call.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  final JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Stub status: reports itself as a non-map task and ignores
  // fetch-failure notifications.
  final TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  final Progress shuffleProgress = new Progress();
  final TaskAttemptID reduceAttempt =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  final ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(
      conf, reduceStatus, reduceAttempt, null, shuffleProgress,
      null, null, null);
  final JobID jobId = new JobID();

  // First failure: half way, not done yet.
  final TaskID secondTask = new TaskID(jobId, TaskType.REDUCE, 1);
  scheduler.tipFailed(secondTask);
  Assert.assertEquals("Progress should be 0.5",
      0.5f, shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(scheduler.waitUntilDone(1));

  // Second failure: complete.
  final TaskID firstTask = new TaskID(jobId, TaskType.REDUCE, 0);
  scheduler.tipFailed(firstTask);
  Assert.assertEquals("Progress should be 1.0",
      1.0f, shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(scheduler.waitUntilDone(1));
}
/**
 * Records an unsuccessful attempt completion on the matching attempt entry
 * and files it in {@code info.completedTaskAttemptsMap}.
 *
 * <p>If either the task or the attempt is unknown, a warning is logged and
 * the event is dropped. If the attempt is the one that had previously made
 * its task successful, the task-level fields set by
 * {@code handleTaskFinishedEvent()} are reset.
 *
 * @param event the unsuccessful-completion event to apply
 */
private void handleTaskAttemptFailedEvent(
    TaskAttemptUnsuccessfulCompletionEvent event) {
  final TaskInfo owningTask = info.tasksMap.get(event.getTaskId());
  if (owningTask == null) {
    LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskId: " + event.getTaskId().toString());
    return;
  }

  final TaskAttemptInfo failedAttempt =
      owningTask.attemptsMap.get(event.getTaskAttemptId());
  if (failedAttempt == null) {
    LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskAttemptId: " + event.getTaskAttemptId().toString());
    return;
  }

  failedAttempt.finishTime = event.getFinishTime();
  failedAttempt.error = StringInterner.weakIntern(event.getError());
  failedAttempt.status = StringInterner.weakIntern(event.getTaskStatus());
  failedAttempt.hostname = StringInterner.weakIntern(event.getHostname());
  failedAttempt.port = event.getPort();
  failedAttempt.rackname = StringInterner.weakIntern(event.getRackName());
  // All three phase timestamps are set to the event's finish time.
  failedAttempt.shuffleFinishTime = event.getFinishTime();
  failedAttempt.sortFinishTime = event.getFinishTime();
  failedAttempt.mapFinishTime = event.getFinishTime();
  failedAttempt.counters = event.getCounters();

  // The task counts as successful only while its status says SUCCEEDED; if
  // this attempt is the one that made it so, the task is no longer
  // successful and the fields set in handleTaskFinishedEvent() are reset.
  if (TaskStatus.State.SUCCEEDED.toString().equals(owningTask.status)
      && failedAttempt.getAttemptId()
          .equals(owningTask.getSuccessfulAttemptId())) {
    owningTask.counters = null;
    owningTask.finishTime = -1;
    owningTask.status = null;
    owningTask.successfulAttemptId = null;
  }

  info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), failedAttempt);
}
/**
 * Create summary information for the parsed job.
 *
 * <p>Walks every attempt of every task and, per task type (map, reduce,
 * job cleanup, job setup), tracks the earliest start time, the latest
 * finish time, the attempt count, and the failed/killed counts. Succeeded
 * attempts are additionally counted for cleanup and setup only. Attempts
 * of any other task type are ignored.
 *
 * @param job the parsed job whose tasks are summarized
 */
public SummarizedJob(JobInfo job) {
  tasks = job.getAllTasks();
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  for (JobHistoryParser.TaskInfo task : tasks.values()) {
    final Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attemptsById =
        task.getAllTaskAttempts();
    for (JobHistoryParser.TaskAttemptInfo attempt : attemptsById.values()) {
      final long begun = attempt.getStartTime();
      final long ended = attempt.getFinishTime();
      final TaskType kind = attempt.getTaskType();

      if (kind.equals(TaskType.MAP)) {
        // A start time of 0 means "not recorded yet".
        if (mapStarted == 0 || begun < mapStarted) {
          mapStarted = begun;
        }
        mapFinished = Math.max(mapFinished, ended);
        totalMaps++;
        final String state = attempt.getTaskStatus();
        if (state.equals(failed)) {
          numFailedMaps++;
        } else if (state.equals(killed)) {
          numKilledMaps++;
        }
      } else if (kind.equals(TaskType.REDUCE)) {
        if (reduceStarted == 0 || begun < reduceStarted) {
          reduceStarted = begun;
        }
        reduceFinished = Math.max(reduceFinished, ended);
        totalReduces++;
        final String state = attempt.getTaskStatus();
        if (state.equals(failed)) {
          numFailedReduces++;
        } else if (state.equals(killed)) {
          numKilledReduces++;
        }
      } else if (kind.equals(TaskType.JOB_CLEANUP)) {
        if (cleanupStarted == 0 || begun < cleanupStarted) {
          cleanupStarted = begun;
        }
        cleanupFinished = Math.max(cleanupFinished, ended);
        totalCleanups++;
        final String state = attempt.getTaskStatus();
        if (state.equals(succeeded)) {
          numFinishedCleanups++;
        } else if (state.equals(failed)) {
          numFailedCleanups++;
        } else if (state.equals(killed)) {
          numKilledCleanups++;
        }
      } else if (kind.equals(TaskType.JOB_SETUP)) {
        if (setupStarted == 0 || begun < setupStarted) {
          setupStarted = begun;
        }
        setupFinished = Math.max(setupFinished, ended);
        totalSetups++;
        final String state = attempt.getTaskStatus();
        if (state.equals(succeeded)) {
          numFinishedSetups++;
        } else if (state.equals(failed)) {
          numFailedSetups++;
        } else if (state.equals(killed)) {
          numKilledSetups++;
        }
      }
    }
  }
}
/**
 * Generate analysis information for the parsed job.
 *
 * <p>For each task, the first attempt found with status SUCCEEDED is taken
 * (later attempts of that task are not examined). Map attempts contribute
 * their run time to the map average; reduce attempts contribute their
 * shuffle time and their post-shuffle time separately. Sums are divided by
 * the job's finished map/reduce counts when those are positive.
 *
 * @param job the parsed job to analyze
 */
public AnalyzedJob (JobInfo job) {
  final Map<TaskID, JobHistoryParser.TaskInfo> allTasks = job.getAllTasks();
  final int mapCount = (int) job.getFinishedMaps();
  final int reduceCount = (int) job.getFinishedReduces();
  mapTasks = new JobHistoryParser.TaskAttemptInfo[mapCount];
  reduceTasks = new JobHistoryParser.TaskAttemptInfo[reduceCount];
  int nextMapSlot = 0;
  int nextReduceSlot = 0;
  avgMapTime = 0;
  avgReduceTime = 0;
  avgShuffleTime = 0;

  for (JobHistoryParser.TaskInfo task : allTasks.values()) {
    final Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attemptsById =
        task.getAllTaskAttempts();
    for (JobHistoryParser.TaskAttemptInfo attempt : attemptsById.values()) {
      if (!attempt.getTaskStatus()
          .equals(TaskStatus.State.SUCCEEDED.toString())) {
        continue;
      }
      final long elapsed = attempt.getFinishTime() - attempt.getStartTime();
      if (attempt.getTaskType().equals(TaskType.MAP)) {
        mapTasks[nextMapSlot++] = attempt;
        avgMapTime += elapsed;
      } else if (attempt.getTaskType().equals(TaskType.REDUCE)) {
        reduceTasks[nextReduceSlot++] = attempt;
        avgShuffleTime +=
            (attempt.getShuffleFinishTime() - attempt.getStartTime());
        avgReduceTime +=
            (attempt.getFinishTime() - attempt.getShuffleFinishTime());
      }
      // Only the first succeeded attempt of a task is considered.
      break;
    }
  }

  // Turn the accumulated sums into averages; skip when nothing finished.
  if (mapCount > 0) {
    avgMapTime /= mapCount;
  }
  if (reduceCount > 0) {
    avgReduceTime /= reduceCount;
    avgShuffleTime /= reduceCount;
  }
}
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits until the scheduler reports completion, shuts everything down,
 * moves the task status to the SORT phase, and returns the iterator
 * produced by closing the merger.
 *
 * <p>While waiting, and once more after the final merge, the shared
 * {@code throwable} field is checked under this object's lock; if it is
 * set, it is rethrown wrapped in a {@code ShuffleError}.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
// Result is clamped to [MIN_EVENTS_TO_FETCH, MAX_EVENTS_TO_FETCH].
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
// Local mode (localMapFiles present) uses exactly one LocalFetcher;
// otherwise the fetcher count comes from the configuration (default 5).
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
// Raw array creation: Java does not allow creating a generic array.
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully
// Each time the wait returns false, report progress and surface any
// error recorded in the throwable field.
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
// Any failure of the final merge is rethrown as a ShuffleError.
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
/**
 * A testing method verifying availability and accessibility of API that is
 * needed for sub-classes of ShuffleConsumerPlugin.
 *
 * <p>Builds a plugin context entirely from mocks, then drives the plugin
 * through init/run/close; any exception fails the test.
 */
@Test
public void testConsumerApi() {
  final JobConf conf = new JobConf();
  final ShuffleConsumerPlugin<K, V> plugin =
      new TestShuffleConsumerPlugin<K, V>();

  // Mocked collaborators.
  final ReduceTask reduceTaskMock = mock(ReduceTask.class);
  final TaskUmbilicalProtocol umbilicalMock =
      mock(TaskUmbilicalProtocol.class);
  final Reporter reporterMock = mock(Reporter.class);
  final FileSystem fileSystemMock = mock(FileSystem.class);
  final Class<? extends org.apache.hadoop.mapred.Reducer> combiner =
      conf.getCombinerClass();
  @SuppressWarnings("unchecked") // needed for mock with generic
  final CombineOutputCollector<K, V> collectorMock =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  final org.apache.hadoop.mapreduce.TaskAttemptID attemptIdMock =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  final LocalDirAllocator dirAllocatorMock = mock(LocalDirAllocator.class);
  final CompressionCodec codecMock = mock(CompressionCodec.class);
  final Counter counterMock = mock(Counter.class);
  final TaskStatus statusMock = mock(TaskStatus.class);
  final Progress progressMock = mock(Progress.class);
  final MapOutputFile outputFileMock = mock(MapOutputFile.class);
  final Task taskMock = mock(Task.class);

  try {
    String[] localDirs = conf.getLocalDirs();
    // verify that these APIs are available through super class handler
    final ShuffleConsumerPlugin.Context<K, V> pluginContext =
        new ShuffleConsumerPlugin.Context<K, V>(
            attemptIdMock, conf, fileSystemMock,
            umbilicalMock, dirAllocatorMock,
            reporterMock, codecMock,
            combiner, collectorMock,
            counterMock, counterMock, counterMock,
            counterMock, counterMock, counterMock,
            statusMock, progressMock, progressMock,
            taskMock, outputFileMock, null);
    plugin.init(pluginContext);
    plugin.run();
    plugin.close();
  } catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  reduceTaskMock.getTaskID();
  reduceTaskMock.getJobID();
  reduceTaskMock.getNumMaps();
  reduceTaskMock.getPartition();
  reporterMock.progress();
}
/**
 * Drives a scheduler through a host failure, a successful copy, and then a
 * failed copy on the same host, checking that no exception escapes
 * (MAPREDUCE-6361).
 */
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
  final JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Mocked collaborators for the plugin context.
  final TaskUmbilicalProtocol umbilicalMock =
      mock(TaskUmbilicalProtocol.class);
  final Reporter reporterMock = mock(Reporter.class);
  final FileSystem fileSystemMock = mock(FileSystem.class);
  final Class<? extends org.apache.hadoop.mapred.Reducer> combiner =
      conf.getCombinerClass();
  @SuppressWarnings("unchecked") // needed for mock with generic
  final CombineOutputCollector<K, V> collectorMock =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  final org.apache.hadoop.mapreduce.TaskAttemptID attemptIdMock =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  final LocalDirAllocator dirAllocatorMock = mock(LocalDirAllocator.class);
  final CompressionCodec codecMock = mock(CompressionCodec.class);
  final Counter counterMock = mock(Counter.class);
  final TaskStatus statusMock = mock(TaskStatus.class);
  final Progress progressMock = mock(Progress.class);
  final MapOutputFile outputFileMock = mock(MapOutputFile.class);
  final Task taskMock = mock(Task.class);
  @SuppressWarnings("unchecked")
  final MapOutput<K, V> fetchedOutput = mock(MapOutput.class);

  final ShuffleConsumerPlugin.Context<K, V> pluginContext =
      new ShuffleConsumerPlugin.Context<K, V>(
          attemptIdMock, conf, fileSystemMock,
          umbilicalMock, dirAllocatorMock,
          reporterMock, codecMock,
          combiner, collectorMock,
          counterMock, counterMock, counterMock,
          counterMock, counterMock, counterMock,
          statusMock, progressMock, progressMock,
          taskMock, outputFileMock, null);

  // Stub status: reports itself as a non-map task and ignores
  // fetch-failure notifications.
  final TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  final Progress shuffleProgress = new Progress();
  final ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(
      conf, reduceStatus, null, null, shuffleProgress,
      pluginContext.getShuffledMapsCounter(),
      pluginContext.getReduceShuffleBytes(),
      pluginContext.getFailedShuffleCounter());

  final MapHost sourceHost = new MapHost("host1", null);
  final TaskAttemptID failingAttempt = new TaskAttemptID(
      new org.apache.hadoop.mapred.TaskID(
          new JobID("test",0), TaskType.MAP, 0), 0);
  final TaskAttemptID succeedingAttempt = new TaskAttemptID(
      new org.apache.hadoop.mapred.TaskID(
          new JobID("test",0), TaskType.MAP, 1), 1);

  // handle output fetch failure for the failing attempt, part I
  scheduler.hostFailed(sourceHost.getHostName());

  // handle output fetch succeed for the succeeding attempt
  final long copiedBytes = 500L * 1024 * 1024;
  scheduler.copySucceeded(
      succeedingAttempt, sourceHost, copiedBytes, 0, 500000, fetchedOutput);

  // handle output fetch failure for the failing attempt, part II
  // for MAPREDUCE-6361: verify no NPE exception get thrown out
  scheduler.copyFailed(failingAttempt, sourceHost, true, false);
}
/**
 * Records an unsuccessful attempt completion on the matching attempt entry
 * and files it in {@code info.completedTaskAttemptsMap}.
 *
 * <p>If either the task or the attempt is unknown, a warning is logged and
 * the event is dropped. If the attempt is the one that had previously made
 * its task successful, the task-level fields set by
 * {@code handleTaskFinishedEvent()} are reset.
 *
 * @param event the unsuccessful-completion event to apply
 */
private void handleTaskAttemptFailedEvent(
    TaskAttemptUnsuccessfulCompletionEvent event) {
  final TaskInfo owningTask = info.tasksMap.get(event.getTaskId());
  if (owningTask == null) {
    LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskId: " + event.getTaskId().toString());
    return;
  }

  final TaskAttemptInfo failedAttempt =
      owningTask.attemptsMap.get(event.getTaskAttemptId());
  if (failedAttempt == null) {
    LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
        + " taskAttemptId: " + event.getTaskAttemptId().toString());
    return;
  }

  failedAttempt.finishTime = event.getFinishTime();
  failedAttempt.error = StringInterner.weakIntern(event.getError());
  failedAttempt.status = StringInterner.weakIntern(event.getTaskStatus());
  failedAttempt.hostname = StringInterner.weakIntern(event.getHostname());
  failedAttempt.port = event.getPort();
  failedAttempt.rackname = StringInterner.weakIntern(event.getRackName());
  // All three phase timestamps are set to the event's finish time.
  failedAttempt.shuffleFinishTime = event.getFinishTime();
  failedAttempt.sortFinishTime = event.getFinishTime();
  failedAttempt.mapFinishTime = event.getFinishTime();
  failedAttempt.counters = event.getCounters();

  // The task counts as successful only while its status says SUCCEEDED; if
  // this attempt is the one that made it so, the task is no longer
  // successful and the fields set in handleTaskFinishedEvent() are reset.
  if (TaskStatus.State.SUCCEEDED.toString().equals(owningTask.status)
      && failedAttempt.getAttemptId()
          .equals(owningTask.getSuccessfulAttemptId())) {
    owningTask.counters = null;
    owningTask.finishTime = -1;
    owningTask.status = null;
    owningTask.successfulAttemptId = null;
  }

  info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), failedAttempt);
}
/**
 * Create summary information for the parsed job.
 *
 * <p>Walks every attempt of every task and, per task type (map, reduce,
 * job cleanup, job setup), tracks the earliest start time, the latest
 * finish time, the attempt count, and the failed/killed counts. Succeeded
 * attempts are additionally counted for cleanup and setup only. Attempts
 * of any other task type are ignored.
 *
 * @param job the parsed job whose tasks are summarized
 */
public SummarizedJob(JobInfo job) {
  tasks = job.getAllTasks();
  final String succeeded = TaskStatus.State.SUCCEEDED.toString();
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  for (JobHistoryParser.TaskInfo task : tasks.values()) {
    final Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attemptsById =
        task.getAllTaskAttempts();
    for (JobHistoryParser.TaskAttemptInfo attempt : attemptsById.values()) {
      final long begun = attempt.getStartTime();
      final long ended = attempt.getFinishTime();
      final TaskType kind = attempt.getTaskType();

      if (kind.equals(TaskType.MAP)) {
        // A start time of 0 means "not recorded yet".
        if (mapStarted == 0 || begun < mapStarted) {
          mapStarted = begun;
        }
        mapFinished = Math.max(mapFinished, ended);
        totalMaps++;
        final String state = attempt.getTaskStatus();
        if (state.equals(failed)) {
          numFailedMaps++;
        } else if (state.equals(killed)) {
          numKilledMaps++;
        }
      } else if (kind.equals(TaskType.REDUCE)) {
        if (reduceStarted == 0 || begun < reduceStarted) {
          reduceStarted = begun;
        }
        reduceFinished = Math.max(reduceFinished, ended);
        totalReduces++;
        final String state = attempt.getTaskStatus();
        if (state.equals(failed)) {
          numFailedReduces++;
        } else if (state.equals(killed)) {
          numKilledReduces++;
        }
      } else if (kind.equals(TaskType.JOB_CLEANUP)) {
        if (cleanupStarted == 0 || begun < cleanupStarted) {
          cleanupStarted = begun;
        }
        cleanupFinished = Math.max(cleanupFinished, ended);
        totalCleanups++;
        final String state = attempt.getTaskStatus();
        if (state.equals(succeeded)) {
          numFinishedCleanups++;
        } else if (state.equals(failed)) {
          numFailedCleanups++;
        } else if (state.equals(killed)) {
          numKilledCleanups++;
        }
      } else if (kind.equals(TaskType.JOB_SETUP)) {
        if (setupStarted == 0 || begun < setupStarted) {
          setupStarted = begun;
        }
        setupFinished = Math.max(setupFinished, ended);
        totalSetups++;
        final String state = attempt.getTaskStatus();
        if (state.equals(succeeded)) {
          numFinishedSetups++;
        } else if (state.equals(failed)) {
          numFailedSetups++;
        } else if (state.equals(killed)) {
          numKilledSetups++;
        }
      }
    }
  }
}
/**
 * Generate analysis information for the parsed job.
 *
 * <p>For each task, the first attempt found with status SUCCEEDED is taken
 * (later attempts of that task are not examined). Map attempts contribute
 * their run time to the map average; reduce attempts contribute their
 * shuffle time and their post-shuffle time separately. Sums are divided by
 * the job's finished map/reduce counts when those are positive.
 *
 * @param job the parsed job to analyze
 */
public AnalyzedJob (JobInfo job) {
  final Map<TaskID, JobHistoryParser.TaskInfo> allTasks = job.getAllTasks();
  final int mapCount = (int) job.getFinishedMaps();
  final int reduceCount = (int) job.getFinishedReduces();
  mapTasks = new JobHistoryParser.TaskAttemptInfo[mapCount];
  reduceTasks = new JobHistoryParser.TaskAttemptInfo[reduceCount];
  int nextMapSlot = 0;
  int nextReduceSlot = 0;
  avgMapTime = 0;
  avgReduceTime = 0;
  avgShuffleTime = 0;

  for (JobHistoryParser.TaskInfo task : allTasks.values()) {
    final Map<TaskAttemptID, JobHistoryParser.TaskAttemptInfo> attemptsById =
        task.getAllTaskAttempts();
    for (JobHistoryParser.TaskAttemptInfo attempt : attemptsById.values()) {
      if (!attempt.getTaskStatus()
          .equals(TaskStatus.State.SUCCEEDED.toString())) {
        continue;
      }
      final long elapsed = attempt.getFinishTime() - attempt.getStartTime();
      if (attempt.getTaskType().equals(TaskType.MAP)) {
        mapTasks[nextMapSlot++] = attempt;
        avgMapTime += elapsed;
      } else if (attempt.getTaskType().equals(TaskType.REDUCE)) {
        reduceTasks[nextReduceSlot++] = attempt;
        avgShuffleTime +=
            (attempt.getShuffleFinishTime() - attempt.getStartTime());
        avgReduceTime +=
            (attempt.getFinishTime() - attempt.getShuffleFinishTime());
      }
      // Only the first succeeded attempt of a task is considered.
      break;
    }
  }

  // Turn the accumulated sums into averages; skip when nothing finished.
  if (mapCount > 0) {
    avgMapTime /= mapCount;
  }
  if (reduceCount > 0) {
    avgReduceTime /= reduceCount;
    avgShuffleTime /= reduceCount;
  }
}
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits until the scheduler reports completion, shuts everything down,
 * moves the task status to the SORT phase, and returns the iterator
 * produced by closing the merger.
 *
 * <p>While waiting, and once more after the final merge, the shared
 * {@code throwable} field is checked under this object's lock; if it is
 * set, it is rethrown wrapped in a {@code ShuffleError}.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
// Result is clamped to [MIN_EVENTS_TO_FETCH, MAX_EVENTS_TO_FETCH].
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
// Local mode (localMapFiles present) uses exactly one LocalFetcher;
// otherwise the fetcher count comes from the configuration (default 5).
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
// Raw array creation: Java does not allow creating a generic array.
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully
// Each time the wait returns false, report progress and surface any
// error recorded in the throwable field.
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
// Any failure of the final merge is rethrown as a ShuffleError.
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
/**
 * Checks that the API needed by sub-classes of ShuffleConsumerPlugin is
 * available and accessible: a plugin context is assembled from mocks, the
 * plugin is driven through init, run and close, and the ReduceTask and
 * Reporter accessors that 3rd party plugins rely on are invoked.
 */
@Test
public void testConsumerApi() {
  JobConf conf = new JobConf();
  ShuffleConsumerPlugin<K, V> plugin = new TestShuffleConsumerPlugin<K, V>();

  // Every collaborator handed to the plugin context below is a mock.
  ReduceTask reduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
  Reporter reporter = mock(Reporter.class);
  FileSystem fileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer> combiner = conf.getCombinerClass();
  @SuppressWarnings("unchecked") // needed for mock with generic
  CombineOutputCollector<K, V> collector =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID attemptId =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator dirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec codec = mock(CompressionCodec.class);
  Counter counter = mock(Counter.class);
  TaskStatus taskStatus = mock(TaskStatus.class);
  Progress phaseProgress = mock(Progress.class);
  MapOutputFile mapOutputFile = mock(MapOutputFile.class);
  Task task = mock(Task.class);

  try {
    // The returned directories are not used; only the call itself matters here.
    conf.getLocalDirs();

    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> pluginContext =
        new ShuffleConsumerPlugin.Context<K, V>(
            attemptId, conf, fileSystem,
            umbilical, dirAllocator,
            reporter, codec,
            combiner, collector,
            counter, counter, counter,
            counter, counter, counter,
            taskStatus, phaseProgress, phaseProgress,
            task, mapOutputFile, null);
    plugin.init(pluginContext);
    plugin.run();
    plugin.close();
  } catch (Exception e) {
    // Any exception from the plugin life cycle fails the test.
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  reduceTask.getTaskID();
  reduceTask.getJobID();
  reduceTask.getNumMaps();
  reduceTask.getPartition();
  reporter.progress();
}
/**
 * Drives a ShuffleSchedulerImpl through a host failure, a successful copy of
 * one map attempt and then a failed copy of another attempt on the same host.
 * For MAPREDUCE-6361: the final copyFailed call must not throw an NPE.
 */
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Mocked collaborators used only to assemble the plugin context.
  TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
  Reporter reporter = mock(Reporter.class);
  FileSystem fileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer> combiner = conf.getCombinerClass();
  @SuppressWarnings("unchecked") // needed for mock with generic
  CombineOutputCollector<K, V> collector =
      (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID reduceAttemptId =
      mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator dirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec codec = mock(CompressionCodec.class);
  Counter counter = mock(Counter.class);
  TaskStatus contextStatus = mock(TaskStatus.class);
  Progress contextProgress = mock(Progress.class);
  MapOutputFile mapOutputFile = mock(MapOutputFile.class);
  Task task = mock(Task.class);
  @SuppressWarnings("unchecked")
  MapOutput<K, V> mapOutput = mock(MapOutput.class);

  ShuffleConsumerPlugin.Context<K, V> pluginContext =
      new ShuffleConsumerPlugin.Context<K, V>(
          reduceAttemptId, conf, fileSystem,
          umbilical, dirAllocator,
          reporter, codec,
          combiner, collector,
          counter, counter, counter,
          counter, counter, counter,
          contextStatus, contextProgress, contextProgress,
          task, mapOutputFile, null);

  // Minimal non-map status whose fetch-failure hook does nothing.
  TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  Progress shuffleProgress = new Progress();
  ShuffleSchedulerImpl<K, V> shuffleScheduler = new ShuffleSchedulerImpl<K, V>(
      conf, reduceStatus, null, null, shuffleProgress,
      pluginContext.getShuffledMapsCounter(),
      pluginContext.getReduceShuffleBytes(),
      pluginContext.getFailedShuffleCounter());

  MapHost mapHost = new MapHost("host1", null);

  // Map task 0 / attempt 0 is the one whose fetch fails.
  org.apache.hadoop.mapred.TaskID failedTask =
      new org.apache.hadoop.mapred.TaskID(new JobID("test", 0), TaskType.MAP, 0);
  TaskAttemptID failedAttempt = new TaskAttemptID(failedTask, 0);

  // Map task 1 / attempt 1 is the one whose fetch succeeds.
  org.apache.hadoop.mapred.TaskID succeededTask =
      new org.apache.hadoop.mapred.TaskID(new JobID("test", 0), TaskType.MAP, 1);
  TaskAttemptID succeededAttempt = new TaskAttemptID(succeededTask, 1);

  // handle output fetch failure for the failed attempt, part I
  shuffleScheduler.hostFailed(mapHost.getHostName());

  // handle output fetch success for the succeeded attempt
  long copiedBytes = 500L * 1024 * 1024;
  shuffleScheduler.copySucceeded(succeededAttempt, mapHost, copiedBytes, 0, 500000,
      mapOutput);

  // handle output fetch failure for the failed attempt, part II
  // for MAPREDUCE-6361: verify no NPE exception get thrown out
  shuffleScheduler.copyFailed(failedAttempt, mapHost, true, false);
}
/**
 * Builds the actions that tell a task tracker which of its reduce attempts
 * may move on from the "shuffle" phase.
 * For every reported reduce attempt that is running in the shuffle phase, is
 * not about to be killed and belongs to a running job, the number of finished
 * maps of the job is compared with its number of desired maps. When the two
 * are equal an AllMapsCompletedTaskAction is added for that attempt.
 * Attempts whose job can no longer be found are recorded in
 * trackerToJobsToCleanup for this tracker and otherwise ignored.
 *
 * @param status
 *            The status of the task tracker
 * @param tasksToKill
 *            Kill actions already scheduled for this tracker, may be null;
 *            each element is cast to KillTaskAction
 * @return List of TaskTrackerActions
 */
private List<TaskTrackerAction> getMapCompletionTasks(
    TaskTrackerStatus status,
    List<TaskTrackerAction> tasksToKill) {
  final boolean debug = LOG.isDebugEnabled();

  // Attempts that already have a KillTask pending must not also receive an
  // AllMapsCompleted action.
  Set<TaskAttemptID> beingKilled = new HashSet<TaskAttemptID>();
  if (tasksToKill != null) {
    for (TaskTrackerAction killAction : tasksToKill) {
      KillTaskAction kill = (KillTaskAction) killAction;
      beingKilled.add(kill.getTaskID());
    }
  }

  final String tracker = status.getTrackerName();
  List<TaskTrackerAction> completionActions = new ArrayList<TaskTrackerAction>();

  for (TaskStatus report : status.getTaskReports()) {
    TaskAttemptID attemptId = report.getTaskID();
    SimulatorJobInProgress job = getSimulatorJob(attemptId.getJobID());

    if (job == null) {
      // The job has completed before and this is a zombie reduce-task:
      // remember the job for cleanup on this tracker.
      Set<JobID> cleanupSet = trackerToJobsToCleanup.get(tracker);
      if (cleanupSet == null) {
        cleanupSet = new HashSet<JobID>();
        trackerToJobsToCleanup.put(tracker, cleanupSet);
      }
      cleanupSet.add(attemptId.getJobID());
      continue;
    }

    JobStatus jobStatus = job.getStatus();
    TaskInProgress tip = taskidToTIPMap.get(attemptId);

    // Skip unless: the job is running, the attempt is running, no KillTask
    // is being sent for it, and it is a reduce in the shuffle phase. This
    // precludes sending both KillTask and AllMapsCompletion for the same
    // reduce-attempt.
    if (jobStatus.getRunState() != JobStatus.RUNNING
        || !tip.isRunningTask(attemptId)
        || beingKilled.contains(attemptId)
        || report.getIsMap()
        || report.getPhase() != TaskStatus.Phase.SHUFFLE) {
      continue;
    }

    if (debug) {
      LOG.debug("Need map-completion information for REDUCEattempt "
          + attemptId + " in tracker " + tracker);
      LOG.debug("getMapCompletion: job=" + job.getJobID() + " pendingMaps="
          + job.pendingMaps());
    }

    // Only notify once every desired map of the job has finished.
    if (job.finishedMaps() != job.desiredMaps()) {
      continue;
    }

    if (debug) {
      LOG.debug("Adding MapCompletion for taskAttempt " + attemptId
          + " in tracker " + tracker);
      LOG.debug("FinishedMaps for job:" + job.getJobID() + " is = "
          + job.finishedMaps() + "/" + job.desiredMaps());
      LOG.debug("AllMapsCompleted for task " + attemptId + " time="
          + getClock().getTime());
    }
    completionActions.add(new AllMapsCompletedTaskAction(attemptId));
  }
  return completionActions;
}
/**
 * Applies every task report carried by a tracker status to the owning job.
 * Each report is tagged with the tracker name, a clone of it is passed to
 * the job, and job listeners are informed when the job's run state differs
 * before and after the update. Reports whose job can no longer be found
 * only cause the job id to be recorded in trackerToJobsToCleanup for this
 * tracker.
 *
 * @param status
 *            The status of the task tracker
 */
@Override
void updateTaskStatuses(TaskTrackerStatus status) {
  final boolean debug = LOG.isDebugEnabled();
  final String tracker = status.getTrackerName();
  if (debug) {
    LOG.debug("Updating task statuses for tracker " + tracker);
  }

  for (TaskStatus report : status.getTaskReports()) {
    report.setTaskTracker(tracker);
    TaskAttemptID attemptId = report.getTaskID();
    JobID jobId = attemptId.getJobID();
    if (debug) {
      LOG.debug("Updating status for job " + jobId + " for task = "
          + attemptId + " status=" + report.getProgress()
          + " for tracker: " + tracker);
    }

    SimulatorJobInProgress job = getSimulatorJob(jobId);
    if (job == null) {
      // This job has completed before; remember it for cleanup on this tracker.
      Set<JobID> cleanupSet = trackerToJobsToCleanup.get(tracker);
      if (cleanupSet == null) {
        cleanupSet = new HashSet<JobID>();
        trackerToJobsToCleanup.put(tracker, cleanupSet);
      }
      cleanupSet.add(jobId);
      continue;
    }

    TaskInProgress tip = taskidToTIPMap.get(attemptId);

    // Snapshot the job status on both sides of the update so that a change
    // of run state can be detected afterwards.
    JobStatus before = (JobStatus) job.getStatus().clone();
    job.updateTaskStatus(tip, (TaskStatus) report.clone());
    JobStatus after = (JobStatus) job.getStatus().clone();

    if (tip.isComplete() && debug) {
      LOG.debug("Completed task attempt " + attemptId + " tracker:"
          + tracker + " time=" + getClock().getTime());
    }

    if (before.getRunState() == after.getRunState()) {
      continue;
    }

    if (debug) {
      LOG.debug("Informing Listeners of job " + jobId + " of newStatus "
          + JobStatus.getJobRunState(after.getRunState()));
    }
    JobStatusChangeEvent change = new JobStatusChangeEvent(job,
        EventType.RUN_STATE_CHANGED, before, after);
    updateJobInProgressListeners(change);
  }
}