下面列出了 org.apache.hadoop.mapred.JobStatus 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Prints the "Analysis" section for the job: map-task figures first and,
 * when the job has at least one reduce task, shuffle and reduce figures.
 *
 * <p>The section is produced only when the job status equals the SUCCEEDED
 * run state; otherwise a notice is printed and the method returns. The
 * comparison is written constant-first so that a job for which no status was
 * recorded ({@code null}) is reported as "did not finish" instead of raising
 * a NullPointerException.
 */
private void printJobAnalysis() {
  // Constant-first equals: safe even when job.getJobStatus() returns null.
  if (!JobStatus.getJobRunState(JobStatus.SUCCEEDED)
      .equals(job.getJobStatus())) {
    System.out.println("No Analysis available as job did not finish");
    return;
  }

  AnalyzedJob avg = new AnalyzedJob(job);

  System.out.println("\nAnalysis");
  System.out.println("=========");
  printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10);
  printLast(avg.getMapTasks(), "map", cFinishMapRed);

  // Shuffle/reduce figures only make sense when reduce tasks exist.
  if (avg.getReduceTasks().length > 0) {
    printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle",
        avg.getAvgShuffleTime(), 10);
    printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle);

    printAnalysis(avg.getReduceTasks(), cReduce, "reduce",
        avg.getAvgReduceTime(), 10);
    printLast(avg.getReduceTasks(), "reduce", cFinishMapRed);
  }
  System.out.println("=========");
}
/**
 * Prints the "Analysis" section for the job: map-task figures first and,
 * when the job has at least one reduce task, shuffle and reduce figures.
 *
 * <p>The section is produced only when the job status equals the SUCCEEDED
 * run state; otherwise a notice is printed and the method returns. The
 * comparison is written constant-first so that a job for which no status was
 * recorded ({@code null}) is reported as "did not finish" instead of raising
 * a NullPointerException.
 */
private void printJobAnalysis() {
  // Constant-first equals: safe even when job.getJobStatus() returns null.
  if (!JobStatus.getJobRunState(JobStatus.SUCCEEDED)
      .equals(job.getJobStatus())) {
    System.out.println("No Analysis available as job did not finish");
    return;
  }

  AnalyzedJob avg = new AnalyzedJob(job);

  System.out.println("\nAnalysis");
  System.out.println("=========");
  printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10);
  printLast(avg.getMapTasks(), "map", cFinishMapRed);

  // Shuffle/reduce figures only make sense when reduce tasks exist.
  if (avg.getReduceTasks().length > 0) {
    printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle",
        avg.getAvgShuffleTime(), 10);
    printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle);

    printAnalysis(avg.getReduceTasks(), cReduce, "reduce",
        avg.getAvgReduceTime(), 10);
    printLast(avg.getReduceTasks(), "reduce", cFinishMapRed);
  }
  System.out.println("=========");
}
/**
 * Translates a map-reduce job state constant into the corresponding Sqoop
 * submission status.
 *
 * @param status one of the integer state constants defined on JobStatus
 * @return the equivalent submission status; FAILED and KILLED both map to
 *         {@code SubmissionStatus.FAILED}
 * @throws SqoopException with error MAPREDUCE_0004 when the constant is not
 *         one of the handled states
 */
private SubmissionStatus convertMapreduceState(int status) {
  switch (status) {
    case JobStatus.PREP:
      return SubmissionStatus.BOOTING;
    case JobStatus.RUNNING:
      return SubmissionStatus.RUNNING;
    case JobStatus.FAILED:
    case JobStatus.KILLED:
      // A killed job is reported the same way as a failed one.
      return SubmissionStatus.FAILED;
    case JobStatus.SUCCEEDED:
      return SubmissionStatus.SUCCEEDED;
    default:
      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0004,
          "Unknown status " + status);
  }
}
/** {@inheritDoc} */
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
    // The task context is expected to be the V2 implementation, which exposes the job context.
    HadoopV2TaskContext v2Ctx = (HadoopV2TaskContext)taskCtx;

    JobContext hadoopJobCtx = v2Ctx.jobContext();

    try {
        OutputCommitter outCommitter = hadoopJobCtx.getJobConf().getOutputCommitter();

        if (!abort) {
            outCommitter.commitJob(hadoopJobCtx);
        }
        else {
            // Aborted jobs are always reported to the committer as FAILED.
            outCommitter.abortJob(hadoopJobCtx, JobStatus.State.FAILED);
        }
    }
    catch (IOException ioErr) {
        // Surface committer I/O failures as the checked exception of this API.
        throw new IgniteCheckedException(ioErr);
    }
}
/**
 * Queries the given Hadoop JobClient for all submitted jobs.
 *
 * <p>NOTE(review): as visible here, the loop over the returned statuses has
 * an empty body and the method returns {@code null} on every non-error path,
 * including when {@code jobClient} is {@code null}. The {@code jobId}
 * argument is not used. This looks like a truncated implementation — confirm
 * against the full source before relying on the return value.
 *
 * @param jobClient client used to list jobs; when {@code null} nothing is queried
 * @param jobId     id of the job of interest (currently unused)
 * @return always {@code null} in the code visible here
 * @throws AnkushException when listing the jobs (or iterating the result)
 *         fails; the failure is logged first
 */
public Map<String, Object> getJobDetails(JobClient jobClient, String jobId)
    throws AnkushException {
  // Fixed typo in the user-facing message ("getch" -> "fetch").
  String errMsg = "Unable to fetch Hadoop jobs details, could not connect to Hadoop JobClient.";
  try {
    if (jobClient != null) {
      // Get the jobs that are submitted.
      JobStatus[] jobStatus = jobClient.getAllJobs();
      // Intentionally kept: iterating also turns a null result into the
      // logged AnkushException below, as before.
      for (JobStatus jobSts : jobStatus) {
      }
    }
  } catch (Exception e) {
    HadoopUtils.addAndLogError(this.LOG, this.clusterConfig, errMsg,
        Constant.Component.Name.HADOOP, e);
    throw new AnkushException(errMsg);
  }
  return null;
}
/**
 * Stores the given status on this job and refreshes the cached counters,
 * completion flags and progress values from the running-job handle.
 *
 * @param status the status object to keep for this job
 */
void update(JobStatus status) {
  this.status = status;

  try {
    counters = running.getCounters();
    completed = running.isComplete();
    successful = running.isSuccessful();
    mapProgress = running.mapProgress();
    reduceProgress = running.reduceProgress();
  } catch (IOException e) {
    // Values read before the failure are kept; the error is only reported.
    e.printStackTrace();
  }

  // Scale the task totals by the most recently stored progress values.
  completedMaps = (int) (totalMaps * mapProgress);
  completedReduces = (int) (totalReduces * reduceProgress);
}
/**
 * Submits the job with the given id to this simulated tracker.
 *
 * <p>If the id is already present in {@code jobs}, the status of that
 * existing job is returned and nothing new is started. Otherwise the job's
 * story is looked up in the SimulatorJobCache, the clock is validated/set
 * from the story's submission time, and a new SimulatorJobInProgress is
 * added.
 *
 * @param jobId id of the job to submit
 * @return the status of the existing or newly added job
 * @throws IllegalArgumentException if the SimulatorJobCache has no story for the id
 * @throws IOException declared by the overridden method
 */
@Override
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
  final boolean debug = LOG.isDebugEnabled();
  if (debug) {
    LOG.debug("submitJob for jobname = " + jobId);
  }

  // job already running, don't start twice
  if (jobs.containsKey(jobId)) {
    if (debug) {
      LOG.debug("Job '" + jobId.getId() + "' already present ");
    }
    return jobs.get(jobId).getStatus();
  }

  final JobStory story = SimulatorJobCache.get(jobId);
  if (story == null) {
    throw new IllegalArgumentException("Job not found in SimulatorJobCache: "+jobId);
  }

  validateAndSetClock(story.getSubmissionTime());

  return addJob(jobId,
      new SimulatorJobInProgress(jobId, this, this.conf, story));
}
/**
 * Cleans up after a job has ended (success/failure/killed). Besides
 * delegating to the parent's finalizeJob, this override first reports the
 * completed job to the SimulatorEngine and finally hands the job to
 * cleanupJob.
 *
 * @param job completed job.
 */
@Override
synchronized void finalizeJob(JobInProgress job) {
  // Report a copy of the status to the engine together with the current clock time.
  final JobStatus statusCopy = (JobStatus) job.getStatus().clone();
  engine.markCompletedJob(statusCopy,
      SimulatorJobTracker.getClock().getTime());

  final JobID finishedId = job.getStatus().getJobID();
  final String message = "Finished job " + finishedId + " endtime = "
      + getClock().getTime() + " with status: "
      + JobStatus.getJobRunState(job.getStatus().getRunState());
  LOG.info(message);

  // The parent implementation updates the metrics and JobHistory.
  super.finalizeJob(job);

  // Queue the job for later removal.
  cleanupJob(job);
}
/**
 * Follows a single job from RUNNING to SUCCEEDED and checks, after each
 * phase, the executor queue size and how often the model was queried and
 * updated. The verification counts are cumulative across the test, and the
 * phases are separated by 60 ms sleeps, so statement order matters.
 */
@Test
public void testRun_Running() throws Exception {
String jobId = "job_201407251005_0815";
createDefinition("mytest", jobId);
// The mocked job reports RUNNING to begin with.
RunningJob job = createJob(jobId, JobStatus.RUNNING);
when(job.getJobState()).thenReturn(JobStatus.RUNNING);
// Nothing is queued before the first check.
Assert.assertEquals(0, executorService.getQueue().size());
checkAllIndexes();
// After the check: one queued entry, one indexer lookup, no update yet.
Assert.assertEquals(1, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(1)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
Thread.sleep(60);
// Job still RUNNING after the wait: still one queued entry, a second lookup, still no update.
Assert.assertEquals(1, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(2)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
// Flip the job to SUCCEEDED and wait again.
when(job.getJobState()).thenReturn(JobStatus.SUCCEEDED);
Thread.sleep(60);
// Expected end state: empty queue, a third lookup and exactly one update.
Assert.assertEquals(0, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(3)).getIndexer(anyString());
verify(model, VerificationModeFactory.times(1)).updateIndexerInternal(any(IndexerDefinition.class));
}
/**
 * Stores the given status on this job and refreshes the cached counters,
 * completion flags and progress values from the running-job handle.
 *
 * @param status the status object to keep for this job
 */
void update(JobStatus status) {
  this.status = status;

  try {
    counters = running.getCounters();
    completed = running.isComplete();
    successful = running.isSuccessful();
    mapProgress = running.mapProgress();
    reduceProgress = running.reduceProgress();
  } catch (IOException e) {
    // Values read before the failure are kept; the error is only reported.
    e.printStackTrace();
  }

  // Scale the task totals by the most recently stored progress values.
  completedMaps = (int) (totalMaps * mapProgress);
  completedReduces = (int) (totalReduces * reduceProgress);
}
/**
 * Handles an unsuccessful map-reduce job: logs the failure, recursively
 * deletes the work directory and reports how the job ended.
 *
 * @param job        the job that did not succeed (used for its name in the log)
 * @param runningJob handle used to read the final job state
 * @param fs         file system holding the work directory
 * @param workDir    work directory to delete
 * @return {@code -2} when the job state is KILLED, {@code -3} otherwise
 * @throws IOException if deleting the directory or reading the job state fails
 */
protected int jobFailed(Job job, RunningJob runningJob, FileSystem fs, Path workDir) throws IOException {
  log.error("Map Reduce job " + job.getJobName() + " was unsuccessful. Check the logs.");
  log.error("Since job was not successful, deleting work directory: " + workDir);

  if (!fs.delete(workDir, true)) {
    log.error("Unable to remove job working directory: " + workDir);
  }

  if (runningJob.getJobState() != JobStatus.KILLED) {
    log.error("Job failed with a jobstate of " + runningJob.getJobState());
    return -3;
  }

  log.warn("Job was killed");
  return -2;
}
/**
 * Copies the figures carried by a job-finished event into {@code info} and
 * records the job status as the SUCCEEDED run state.
 *
 * @param event the job-finished event to read from
 */
private void handleJobFinishedEvent(JobFinishedEvent event) {
  // Completion time.
  info.finishTime = event.getFinishTime();

  // Finished and failed task counts.
  info.finishedMaps = event.getFinishedMaps();
  info.finishedReduces = event.getFinishedReduces();
  info.failedMaps = event.getFailedMaps();
  info.failedReduces = event.getFailedReduces();

  // Counters: total, map-side, reduce-side.
  info.totalCounters = event.getTotalCounters();
  info.mapCounters = event.getMapCounters();
  info.reduceCounters = event.getReduceCounters();

  // This event type always marks the job as succeeded.
  final String succeededState = JobStatus.getJobRunState(JobStatus.SUCCEEDED);
  info.jobStatus = succeededState;
}
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, then for each of
 * JOB_SETUP, MAP, REDUCE and JOB_CLEANUP the FAILED and KILLED tasks. When
 * {@code printAll} is set, the SUCCEEDED tasks of each type and then all task
 * attempts of each type follow. Finally the failed attempts of the
 * FAILED-filtered and KILLED-filtered job are printed.
 *
 * <p>All task filters use {@code TaskStatus.State}; previously the KILLED
 * filter for JOB_CLEANUP alone was taken from the job-level JobStatus.
 *
 * @throws IOException
 */
public void print() throws IOException{
  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Task types in the order their sections are printed.
  final TaskType[] taskTypes = {
      TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE, TaskType.JOB_CLEANUP
  };
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  for (TaskType type : taskTypes) {
    printTasks(type, failed);
    printTasks(type, killed);
  }

  if (printAll) {
    final String succeeded = TaskStatus.State.SUCCEEDED.toString();
    for (TaskType type : taskTypes) {
      printTasks(type, succeeded);
    }
    for (TaskType type : taskTypes) {
      printAllTaskAttempts(type);
    }
  }

  printFailedAttempts(new FilteredJob(job, failed));
  printFailedAttempts(new FilteredJob(job, killed));
}
/**
 * Copies the figures carried by a job-finished event into {@code info} and
 * records the job status as the SUCCEEDED run state.
 *
 * @param event the job-finished event to read from
 */
private void handleJobFinishedEvent(JobFinishedEvent event) {
  // Completion time.
  info.finishTime = event.getFinishTime();

  // Finished and failed task counts.
  info.finishedMaps = event.getFinishedMaps();
  info.finishedReduces = event.getFinishedReduces();
  info.failedMaps = event.getFailedMaps();
  info.failedReduces = event.getFailedReduces();

  // Counters: total, map-side, reduce-side.
  info.totalCounters = event.getTotalCounters();
  info.mapCounters = event.getMapCounters();
  info.reduceCounters = event.getReduceCounters();

  // This event type always marks the job as succeeded.
  final String succeededState = JobStatus.getJobRunState(JobStatus.SUCCEEDED);
  info.jobStatus = succeededState;
}
/**
 * Print the job/task/attempt summary information.
 *
 * <p>Output order: job details, task summary, job analysis, then for each of
 * JOB_SETUP, MAP, REDUCE and JOB_CLEANUP the FAILED and KILLED tasks. When
 * {@code printAll} is set, the SUCCEEDED tasks of each type and then all task
 * attempts of each type follow. Finally the failed attempts of the
 * FAILED-filtered and KILLED-filtered job are printed.
 *
 * <p>All task filters use {@code TaskStatus.State}; previously the KILLED
 * filter for JOB_CLEANUP alone was taken from the job-level JobStatus.
 *
 * @throws IOException
 */
public void print() throws IOException{
  printJobDetails();
  printTaskSummary();
  printJobAnalysis();

  // Task types in the order their sections are printed.
  final TaskType[] taskTypes = {
      TaskType.JOB_SETUP, TaskType.MAP, TaskType.REDUCE, TaskType.JOB_CLEANUP
  };
  final String failed = TaskStatus.State.FAILED.toString();
  final String killed = TaskStatus.State.KILLED.toString();

  for (TaskType type : taskTypes) {
    printTasks(type, failed);
    printTasks(type, killed);
  }

  if (printAll) {
    final String succeeded = TaskStatus.State.SUCCEEDED.toString();
    for (TaskType type : taskTypes) {
      printTasks(type, succeeded);
    }
    for (TaskType type : taskTypes) {
      printAllTaskAttempts(type);
    }
  }

  printFailedAttempts(new FilteredJob(job, failed));
  printFailedAttempts(new FilteredJob(job, killed));
}
/**
 * Writes one XML element per job in the list.
 * This is called repeatedly for different lists of jobs (e.g., running, completed, failed).
 *
 * <p>Each element is named {@code <label>_job} and carries the job id, user,
 * name, map/reduce progress percentages, and desired/finished task counts.
 * The job id, user and job name are XML-escaped before being written, so a
 * value containing {@code <}, {@code &} or quotes can no longer break the
 * document. An empty list produces no output.
 *
 * @param out   writer receiving the XML
 * @param label prefix of the element name; written as-is, so it must already
 *              be a valid XML name fragment
 * @param jobs  jobs to render
 * @throws IOException if writing to {@code out} fails
 */
public void generateJobTable(JspWriter out, String label, List<JobInProgress> jobs)
    throws IOException {
  for (JobInProgress job : jobs) {
    JobProfile profile = job.getProfile();
    JobStatus status = job.getStatus();
    String jobid = escapeXml(String.valueOf(profile.getJobID()));

    int desiredMaps = job.desiredMaps();
    int desiredReduces = job.desiredReduces();
    int completedMaps = job.finishedMaps();
    int completedReduces = job.finishedReduces();

    String user = escapeXml(String.valueOf(profile.getUser()));
    String rawName = profile.getJobName();
    // An empty name is rendered as a single space, as before.
    String name = escapeXml("".equals(rawName) ? " " : String.valueOf(rawName));

    out.print("<" + label + "_job jobid=\"" + jobid + "\">\n");
    out.print(" <jobid>" + jobid + "</jobid>\n");
    out.print(" <user>" + user + "</user>\n");
    out.print(" <name>" + name + "</name>\n");
    out.print(" <map_complete>" + StringUtils.formatPercent(status.mapProgress(), 2) + "</map_complete>\n");
    out.print(" <map_total>" + desiredMaps + "</map_total>\n");
    out.print(" <maps_completed>" + completedMaps + "</maps_completed>\n");
    out.print(" <reduce_complete>" + StringUtils.formatPercent(status.reduceProgress(), 2) + "</reduce_complete>\n");
    out.print(" <reduce_total>" + desiredReduces + "</reduce_total>\n");
    out.print(" <reduces_completed>" + completedReduces + "</reduces_completed>\n");
    out.print("</" + label + "_job>\n");
  }
}

/**
 * Replaces the five XML special characters with their predefined entities so
 * the text can be placed in element content or a double-quoted attribute.
 */
private String escapeXml(String text) {
  StringBuilder escaped = new StringBuilder(text.length() + 16);
  for (int i = 0; i < text.length(); i++) {
    char c = text.charAt(i);
    switch (c) {
      case '&':
        escaped.append("&amp;");
        break;
      case '<':
        escaped.append("&lt;");
        break;
      case '>':
        escaped.append("&gt;");
        break;
      case '"':
        escaped.append("&quot;");
        break;
      case '\'':
        escaped.append("&apos;");
        break;
      default:
        escaped.append(c);
    }
  }
  return escaped.toString();
}
/**
 * Applies a new status to a job, then schedules a job-changed notification
 * through the default Display's asyncExec.
 *
 * @param job the job to update
 * @param status the status handed to the job's update method
 */
private void updateJob(final HadoopJob job, JobStatus status) {
  job.update(status);

  final Display display = Display.getDefault();
  display.asyncExec(new Runnable() {
    public void run() {
      // Executed later by the Display, not inline.
      fireJobChanged(job);
    }
  });
}
/**
 * Maps a JobStatus integer run state onto the corresponding JobState.
 *
 * @param state run-state constant as defined on JobStatus
 * @return PREPARE, RUNNING, FAILED or SUCCEEDED for the matching constant;
 *         {@code null} for every other value
 */
static JobState ofInt(int state) {
  if (state == JobStatus.PREP) {
    return PREPARE;
  }
  if (state == JobStatus.RUNNING) {
    return RUNNING;
  }
  if (state == JobStatus.FAILED) {
    return FAILED;
  }
  if (state == JobStatus.SUCCEEDED) {
    return SUCCEEDED;
  }
  // Any state without an explicit mapping above.
  return null;
}
/**
 * Creates the representation of a Hadoop job: stores the given references,
 * loads the job file and then applies the initial status via
 * {@code update(status)}.
 *
 * @param location server this job belongs to
 * @param id identifier of the job
 * @param running handle on the running job
 * @param status initial status passed to {@code update}
 */
public HadoopJob(HadoopServer location, JobID id, RunningJob running,
    JobStatus status) {
  // Plain reference assignments; these must be in place before the calls below.
  this.running = running;
  this.jobId = id;
  this.location = location;

  loadJobFile();

  update(status);
}
/**
 * Registers a job story in the SimulatorJobCache and submits it to the job
 * tracker under the same (downgraded) job id.
 *
 * @param job the job story to submit
 * @return the status returned by the job tracker
 * @throws IOException declared for the job tracker calls
 * @throws InterruptedException declared for the job tracker calls
 */
@SuppressWarnings("deprecation")
private JobStatus submitJob(JobStory job)
    throws IOException, InterruptedException {
  // Prefer the id carried by the story; otherwise ask the JobTracker for a new one.
  JobID id = job.getJobID();
  if (id == null) {
    id = jobTracker.getNewJobId();
  }

  // Cache key and submission use the same downgraded id.
  final org.apache.hadoop.mapred.JobID mapredId =
      org.apache.hadoop.mapred.JobID.downgrade(id);
  SimulatorJobCache.put(mapredId, job);
  return jobTracker.submitJob(mapredId);
}
/**
 * With a job that is already SUCCEEDED, checking all indexes is expected to
 * update the indexer at least once.
 */
@Test
public void testRun() throws Exception {
  final String finishedJobId = "job_201407251005_0815";
  createDefinition("mytest", finishedJobId);
  createJob(finishedJobId, JobStatus.SUCCEEDED);

  checkAllIndexes();

  verify(model, VerificationModeFactory.atLeastOnce())
      .updateIndexerInternal(any(IndexerDefinition.class));
}
/**
 * Tracks two RUNNING jobs and lets them succeed one after the other,
 * checking the executor queue size and the cumulative number of indexer
 * updates after each 60 ms wait. Statement order matters because the
 * verification counts accumulate.
 */
@Test
public void testRun_MultiIndex() throws Exception {
String jobId1 = "job_201407251005_0001";
String jobId2 = "job_201407251005_0002";
createDefinition("mytest1", jobId1);
RunningJob job1 = createJob(jobId1, JobStatus.RUNNING);
createDefinition("mytest2", jobId2);
RunningJob job2 = createJob(jobId2, JobStatus.RUNNING);
// Nothing is queued before the first check.
Assert.assertEquals(0, executorService.getQueue().size());
checkAllIndexes();
// Both jobs are queued; no update has happened.
Assert.assertEquals(2, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
Thread.sleep(60);
// Both still RUNNING after the wait: unchanged queue, still no update.
Assert.assertEquals(2, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(0)).updateIndexerInternal(any(IndexerDefinition.class));
// First job succeeds: one entry leaves the queue, one update in total.
when(job1.getJobState()).thenReturn(JobStatus.SUCCEEDED);
Thread.sleep(60);
Assert.assertEquals(1, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(1)).updateIndexerInternal(any(IndexerDefinition.class));
// Second job succeeds: queue is empty, two updates in total.
when(job2.getJobState()).thenReturn(JobStatus.SUCCEEDED);
Thread.sleep(60);
Assert.assertEquals(0, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(2)).updateIndexerInternal(any(IndexerDefinition.class));
// A further wait must not change anything.
Thread.sleep(60);
Assert.assertEquals(0, executorService.getQueue().size());
verify(model, VerificationModeFactory.times(2)).updateIndexerInternal(any(IndexerDefinition.class));
}
/**
 * Applies a new status to a job, then schedules a job-changed notification
 * through the default Display's asyncExec.
 *
 * @param job the job to update
 * @param status the status handed to the job's update method
 */
private void updateJob(final HadoopJob job, JobStatus status) {
  job.update(status);

  final Display display = Display.getDefault();
  display.asyncExec(new Runnable() {
    public void run() {
      // Executed later by the Display, not inline.
      fireJobChanged(job);
    }
  });
}
/**
 * Maps a JobStatus integer run state onto the corresponding JobState.
 *
 * @param state run-state constant as defined on JobStatus
 * @return PREPARE, RUNNING, FAILED or SUCCEEDED for the matching constant;
 *         {@code null} for every other value
 */
static JobState ofInt(int state) {
  if (state == JobStatus.PREP) {
    return PREPARE;
  }
  if (state == JobStatus.RUNNING) {
    return RUNNING;
  }
  if (state == JobStatus.FAILED) {
    return FAILED;
  }
  if (state == JobStatus.SUCCEEDED) {
    return SUCCEEDED;
  }
  // Any state without an explicit mapping above.
  return null;
}
/**
 * Creates the representation of a Hadoop job: stores the given references,
 * loads the job file and then applies the initial status via
 * {@code update(status)}.
 *
 * @param location server this job belongs to
 * @param id identifier of the job
 * @param running handle on the running job
 * @param status initial status passed to {@code update}
 */
public HadoopJob(HadoopServer location, JobID id, RunningJob running,
    JobStatus status) {
  // Plain reference assignments; these must be in place before the calls below.
  this.running = running;
  this.jobId = id;
  this.location = location;

  loadJobFile();

  update(status);
}
/**
 * Recomputes the {@code runnable} flag of every tracked job.
 *
 * <p>All jobs are first marked not runnable. The jobs are then sorted with
 * FifoJobComparator and walked in that order; each job whose run state is
 * RUNNING is marked runnable as long as its user and its pool are both still
 * below their respective maximum job counts from the pool manager.
 */
private void updateRunnability() {
  // Reset: nothing is runnable until proven otherwise.
  for (JobInfo jobInfo : infos.values()) {
    jobInfo.runnable = false;
  }

  // Order the jobs the way FifoJobComparator ranks them.
  List<JobInProgress> ordered = new ArrayList<JobInProgress>(infos.keySet());
  Collections.sort(ordered, new FifoJobComparator());

  // Number of jobs already admitted, per user and per pool.
  Map<String, Integer> admittedPerUser = new HashMap<String, Integer>();
  Map<String, Integer> admittedPerPool = new HashMap<String, Integer>();

  for (JobInProgress candidate : ordered) {
    if (candidate.getStatus().getRunState() != JobStatus.RUNNING) {
      continue;
    }

    String user = candidate.getJobConf().getUser();
    String pool = poolMgr.getPoolName(candidate);

    Integer seenForUser = admittedPerUser.get(user);
    Integer seenForPool = admittedPerPool.get(pool);
    int userCount = (seenForUser == null) ? 0 : seenForUser;
    int poolCount = (seenForPool == null) ? 0 : seenForPool;

    // Skip the job once either limit has been reached.
    if (userCount >= poolMgr.getUserMaxJobs(user) ||
        poolCount >= poolMgr.getPoolMaxJobs(pool)) {
      continue;
    }

    infos.get(candidate).runnable = true;
    admittedPerUser.put(user, userCount + 1);
    admittedPerPool.put(pool, poolCount + 1);
  }
}
public FakeJobInProgress(JobConf jobConf,
FakeTaskTrackerManager taskTrackerManager) throws IOException {
super(new JobID("test", ++jobCounter), jobConf);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status = new JobStatus();
this.status.setRunState(JobStatus.PREP);
}
/**
 * Submits one job in each of the PREP, SUCCEEDED, FAILED and KILLED states
 * and expects the scheduler to assign no tasks, both immediately and after
 * time has advanced.
 */
public void testNonRunningJobsAreIgnored() throws IOException {
  final int[] nonRunningStates = {
      JobStatus.PREP, JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.KILLED
  };
  for (int state : nonRunningStates) {
    submitJobs(1, state, 10, 10);
  }

  assertNull(scheduler.assignTasks(tracker("tt1")));

  // Check that we still don't assign jobs after an update
  advanceTime(100);
  assertNull(scheduler.assignTasks(tracker("tt1")));
}
/**
 * With size-based weighting enabled, the job submitted with the larger
 * second argument (20 vs 2) is expected to get the larger map fair share,
 * and the job submitted with the larger third argument (10 vs 1) the larger
 * reduce fair share.
 */
public void testSizeBasedWeight() throws Exception {
  scheduler.sizeBasedWeight = true;

  JobInProgress smallMapJob = submitJob(JobStatus.RUNNING, 2, 10);
  JobInProgress largeMapJob = submitJob(JobStatus.RUNNING, 20, 1);

  assertTrue(scheduler.infos.get(smallMapJob).mapFairShare <
      scheduler.infos.get(largeMapJob).mapFairShare);
  assertTrue(scheduler.infos.get(largeMapJob).reduceFairShare <
      scheduler.infos.get(smallMapJob).reduceFairShare);
}
/**
 * This test submits jobs in two pools, poolA and poolB. None of the
 * jobs in poolA have maps, but this should not affect their reduce
 * share.
 *
 * <p>The allocation file gives poolA weight 2.0 and poolB weight 1.0. The
 * writer for that file is closed in a {@code finally} block so the file
 * handle is released even if writing fails.
 */
public void testPoolWeightsWhenNoMaps() throws Exception {
  // Set up pools file
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<pool name=\"poolA\">");
    out.println("<weight>2.0</weight>");
    out.println("</pool>");
    out.println("<pool name=\"poolB\">");
    out.println("<weight>1.0</weight>");
    out.println("</pool>");
    out.println("</allocations>");
  } finally {
    out.close();
  }
  scheduler.getPoolManager().reloadAllocs();

  // Submit the three jobs back to back (two in poolA without maps, one in
  // poolB with maps), then advance time once before checking the results.
  JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info1 = scheduler.infos.get(job1);
  JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  JobInfo info2 = scheduler.infos.get(job2);
  JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  JobInfo info3 = scheduler.infos.get(job3);
  advanceTime(10);

  // Weights: jobs without maps get a map weight of 0.
  assertEquals(0, info1.mapWeight, 0.01);
  assertEquals(1.0, info1.reduceWeight, 0.01);
  assertEquals(0, info2.mapWeight, 0.01);
  assertEquals(1.0, info2.reduceWeight, 0.01);
  assertEquals(1.0, info3.mapWeight, 0.01);
  assertEquals(1.0, info3.reduceWeight, 0.01);

  // Fair shares: the reduce share is expected to be split evenly across the three jobs.
  assertEquals(0, info1.mapFairShare, 0.01);
  assertEquals(1.33, info1.reduceFairShare, 0.01);
  assertEquals(0, info2.mapFairShare, 0.01);
  assertEquals(1.33, info2.reduceFairShare, 0.01);
  assertEquals(4, info3.mapFairShare, 0.01);
  assertEquals(1.33, info3.reduceFairShare, 0.01);
}