The examples below show how org.apache.hadoop.mapreduce.Job#killJob() is used in practice. Each snippet is taken from an open-source project, and the full source can be viewed on GitHub.
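Before the project-specific examples, a minimal sketch of the basic pattern may help: look the job up through a Cluster client and kill it only if it is still running. The class and method names here (KillJobSketch, kill) and the job-ID argument are illustrative, not taken from any of the projects below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;

public class KillJobSketch {
  /** Looks the job up through a Cluster client and kills it if it is still running. */
  public static void kill(Configuration conf, String jobIdString) throws IOException {
    Cluster cluster = new Cluster(conf);
    try {
      Job job = cluster.getJob(JobID.forName(jobIdString));
      if (job != null && !job.isComplete()) {
        job.killJob(); // asks the framework to kill the job; returns without waiting for it to die
      }
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      cluster.close();
    }
  }
}

The first real-world example below follows the same pattern inside a cancel(String jobId) method for a copy job: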
@Override
public void cancel(String jobId) throws IOException {
  JobID id = JobID.forName(jobId);
  Cluster cluster = new Cluster(this.getConf());
  try {
    Job job = cluster.getJob(id);
    if (job == null) {
      LOG.error("No job found for " + id);
      // should we throw an exception instead?
      return;
    }
    if (job.isComplete() || job.isRetired()) {
      return;
    }
    job.killJob();
    LOG.debug("Killed copy job " + id);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
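In the next example, a compaction job runner submits an MR job, registers it as running, and polls for completion; if an abort-as-soon-as-possible policy is set while waiting, it kills the job and returns: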
private void submitAndWait(Job job) throws ClassNotFoundException, IOException, InterruptedException {
  job.submit();
  MRCompactor.addRunningHadoopJob(this.dataset, job);
  LOG.info(String.format("MR job submitted for dataset %s, input %s, url: %s", this.dataset, getInputPaths(),
      job.getTrackingURL()));
  while (!job.isComplete()) {
    if (this.policy == Policy.ABORT_ASAP) {
      LOG.info(String.format(
          "MR job for dataset %s, input %s killed due to input data incompleteness. Will try again later",
          this.dataset, getInputPaths()));
      job.killJob();
      return;
    }
    Thread.sleep(MR_JOB_CHECK_COMPLETE_INTERVAL_MS);
  }
  if (!job.isSuccessful()) {
    throw new RuntimeException(String.format("MR job failed for dataset %s, input %s, url: %s", this.dataset,
        getInputPaths(), job.getTrackingURL()));
  }
}
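Here, a bulk cancel() walks the registry of running Hadoop jobs and kills every job that has not yet completed, then shuts down the job executor and closes the verifier in nested finally blocks so cleanup always runs: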
@Override
public void cancel() throws IOException {
  try {
    for (Map.Entry<Dataset, Job> entry : MRCompactor.RUNNING_MR_JOBS.entrySet()) {
      Job hadoopJob = entry.getValue();
      if (!hadoopJob.isComplete()) {
        LOG.info(String.format("Killing hadoop job %s for dataset %s", hadoopJob.getJobID(), entry.getKey()));
        hadoopJob.killJob();
      }
    }
  } finally {
    try {
      ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG), 0, TimeUnit.NANOSECONDS);
    } finally {
      if (this.verifier.isPresent()) {
        this.verifier.get().closeNow();
      }
    }
  }
}
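The following test, built on the Hadoop MapReduce test utilities, submits a job designed to be killed, waits until setup progress reaches 1.0, kills it, and then asserts which output files must exist and which must not: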
private void testKilledJob(String fileName,
    Class<? extends OutputFormat> output, String[] exclude) throws Exception {
  Path outDir = getNewOutputDir();
  Job job = MapReduceTestUtil.createKillJob(conf, outDir, inDir);
  job.setOutputFormatClass(output);
  job.submit();
  // wait for the setup to be completed
  while (job.setupProgress() != 1.0f) {
    UtilsForTests.waitFor(100);
  }
  job.killJob(); // kill the job
  assertFalse("Job did not get killed", job.waitForCompletion(true));
  if (fileName != null) {
    Path testFile = new Path(outDir, fileName);
    assertTrue("File " + testFile + " missing for job " + job.getJobID(),
        fs.exists(testFile));
  }
  // check that the files from the exclude set do not exist
  for (String ex : exclude) {
    Path file = new Path(outDir, ex);
    assertFalse("File " + file + " should not be present for killed job "
        + job.getJobID(), fs.exists(file));
  }
}
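This example sweeps all job statuses on the cluster and, for each incomplete job, reads its job file to check the BLUR_ENV setting, killing the job when the environment matches: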
private void stopAllExistingMRJobs(String blurEnv, Configuration conf) throws YarnException, IOException,
    InterruptedException {
  Cluster cluster = new Cluster(conf);
  JobStatus[] allJobStatuses = cluster.getAllJobStatuses();
  for (JobStatus jobStatus : allJobStatuses) {
    if (jobStatus.isJobComplete()) {
      continue;
    }
    String jobFile = jobStatus.getJobFile();
    JobID jobID = jobStatus.getJobID();
    Job job = cluster.getJob(jobID);
    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
    Configuration configuration = new Configuration(false);
    Path path = new Path(jobFile);
    Path makeQualified = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
    if (hasReadAccess(fileSystem, makeQualified)) {
      try (FSDataInputStream in = fileSystem.open(makeQualified)) {
        configuration.addResource(copy(in));
      }
      String jobBlurEnv = configuration.get(BLUR_ENV);
      LOG.info("Checking job [{0}] has env [{1}] current env set to [{2}]", jobID, jobBlurEnv, blurEnv);
      if (blurEnv.equals(jobBlurEnv)) {
        LOG.info("Killing running job [{0}]", jobID);
        job.killJob();
      }
    }
  }
}
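The next test starts stand-in RM, AM, and history-server services, then calls killJob() and the other client-facing Job methods while the AM is restarting, verifying that the client redirects without throwing: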
@Test
public void testRedirect() throws Exception {
  Configuration conf = new YarnConfiguration();
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
  conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
  // Start the RM.
  RMService rmService = new RMService("test");
  rmService.init(conf);
  rmService.start();
  // Start the AM.
  AMService amService = new AMService();
  amService.init(conf);
  amService.start(conf);
  // Start the HS.
  HistoryService historyService = new HistoryService();
  historyService.init(conf);
  historyService.start(conf);
  LOG.info("services started");
  Cluster cluster = new Cluster(conf);
  org.apache.hadoop.mapreduce.JobID jobID =
      new org.apache.hadoop.mapred.JobID("201103121733", 1);
  org.apache.hadoop.mapreduce.Counters counters =
      cluster.getJob(jobID).getCounters();
  validateCounters(counters);
  Assert.assertTrue(amContact);
  LOG.info("Sleeping for 5 seconds before stop for" +
      " the client socket to not get EOF immediately..");
  Thread.sleep(5000);
  // bring down the AM service
  amService.stop();
  LOG.info("Sleeping for 5 seconds after stop for" +
      " the server to exit cleanly..");
  Thread.sleep(5000);
  amRestarting = true;
  // Same client; results are returned from the fake (not started) job
  counters = cluster.getJob(jobID).getCounters();
  Assert.assertEquals(0, counters.countCounters());
  Job job = cluster.getJob(jobID);
  org.apache.hadoop.mapreduce.TaskID taskId =
      new org.apache.hadoop.mapreduce.TaskID(jobID, TaskType.MAP, 0);
  TaskAttemptID tId = new TaskAttemptID(taskId, 0);
  // invoke all methods to check that no exception is thrown
  job.killJob();
  job.killTask(tId);
  job.failTask(tId);
  job.getTaskCompletionEvents(0, 100);
  job.getStatus();
  job.getTaskDiagnostics(tId);
  job.getTaskReports(TaskType.MAP);
  job.getTrackingURL();
  amRestarting = false;
  amService = new AMService();
  amService.init(conf);
  amService.start(conf);
  amContact = false; // reset
  counters = cluster.getJob(jobID).getCounters();
  validateCounters(counters);
  Assert.assertTrue(amContact);
  // Stop the AM. It is not even restarting, so it should be treated as
  // completed.
  amService.stop();
  // Same client
  counters = cluster.getJob(jobID).getCounters();
  validateCounters(counters);
  Assert.assertTrue(hsContact);
  rmService.stop();
  historyService.stop();
}
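Finally, this test kills a Blur indexing job once reduce progress passes 70% and verifies that the kill leaves the shard output directories empty: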
public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
    ClassNotFoundException {
  Path input = getInDir();
  Path output = getOutDir();
  _fileSystem.delete(input, true);
  _fileSystem.delete(output, true);
  // 1500 * 50 = 75,000
  writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
  // 100 * 5000 = 500,000
  writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");
  Job job = Job.getInstance(_conf, "blur index");
  job.setJarByClass(BlurOutputFormatTest.class);
  job.setMapperClass(CsvBlurMapper.class);
  job.setInputFormatClass(TextInputFormat.class);
  FileInputFormat.addInputPath(job, input);
  CsvBlurMapper.addColumns(job, "cf1", "col");
  Path tablePath = new Path(new Path(_root, "table"), "test");
  TableDescriptor tableDescriptor = new TableDescriptor();
  tableDescriptor.setShardCount(2);
  tableDescriptor.setTableUri(tablePath.toString());
  tableDescriptor.setName("test");
  createShardDirectories(getOutDir(), 2);
  BlurOutputFormat.setupJob(job, tableDescriptor);
  BlurOutputFormat.setOutputPath(job, output);
  BlurOutputFormat.setIndexLocally(job, false);
  job.submit();
  boolean killCalled = false;
  while (!job.isComplete()) {
    Thread.sleep(1000);
    System.out.printf("Killed [%b] Map [%f] Reduce [%f]%n", killCalled, job.mapProgress() * 100,
        job.reduceProgress() * 100);
    if (job.reduceProgress() > 0.7 && !killCalled) {
      job.killJob();
      killCalled = true;
    }
  }
  assertFalse(job.isSuccessful());
  // a killed job should leave no files behind in the shard output directories
  for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
    Path path = new Path(output, ShardUtil.getShardName(i));
    FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
    FileStatus[] listStatus = fileSystem.listStatus(path);
    assertEquals(toString(listStatus), 0, listStatus.length);
  }
}