下面列出了org.apache.hadoop.mapreduce.Job#getTrackingURL ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected Job runFailingMapperJob()
throws IOException, InterruptedException, ClassNotFoundException {
Configuration myConf = new Configuration(mrCluster.getConfig());
myConf.setInt(MRJobConfig.NUM_MAPS, 1);
myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
Job job = Job.getInstance(myConf);
job.setJarByClass(FailingMapper.class);
job.setJobName("failmapper");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(FailingMapper.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_ROOT_DIR,
"failmapper-output"));
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertFalse(succeeded);
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
return job;
}
protected Job runFailingMapperJob()
throws IOException, InterruptedException, ClassNotFoundException {
Configuration myConf = new Configuration(mrCluster.getConfig());
myConf.setInt(MRJobConfig.NUM_MAPS, 1);
myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
Job job = Job.getInstance(myConf);
job.setJarByClass(FailingMapper.class);
job.setJobName("failmapper");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(FailingMapper.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_ROOT_DIR,
"failmapper-output"));
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertFalse(succeeded);
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
return job;
}
@Test (timeout = 60000)
public void testMRRSleepJob() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testMRRSleepJob().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
// what about completed jobs?
}
@Test (timeout = 60000)
public void testMRRSleepJob() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testMRRSleepJob().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 1, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
// There's one bug in YARN that there may be some suffix at the end of trackingURL (YARN-2246)
// After TEZ-1961, the tracking will change from http://localhost:53419/proxy/application_1430963524753_0005
// to http://localhost:53419/proxy/application_1430963524753_0005/ui/
// So here use String#contains to verify.
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
// what about completed jobs?
}
@Test
public void testRedirect() throws Exception {
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
// Start the RM.
RMService rmService = new RMService("test");
rmService.init(conf);
rmService.start();
// Start the AM.
AMService amService = new AMService();
amService.init(conf);
amService.start(conf);
// Start the HS.
HistoryService historyService = new HistoryService();
historyService.init(conf);
historyService.start(conf);
LOG.info("services started");
Cluster cluster = new Cluster(conf);
org.apache.hadoop.mapreduce.JobID jobID =
new org.apache.hadoop.mapred.JobID("201103121733", 1);
org.apache.hadoop.mapreduce.Counters counters =
cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);
LOG.info("Sleeping for 5 seconds before stop for" +
" the client socket to not get EOF immediately..");
Thread.sleep(5000);
//bring down the AM service
amService.stop();
LOG.info("Sleeping for 5 seconds after stop for" +
" the server to exit cleanly..");
Thread.sleep(5000);
amRestarting = true;
// Same client
//results are returned from fake (not started job)
counters = cluster.getJob(jobID).getCounters();
Assert.assertEquals(0, counters.countCounters());
Job job = cluster.getJob(jobID);
org.apache.hadoop.mapreduce.TaskID taskId =
new org.apache.hadoop.mapreduce.TaskID(jobID, TaskType.MAP, 0);
TaskAttemptID tId = new TaskAttemptID(taskId, 0);
//invoke all methods to check that no exception is thrown
job.killJob();
job.killTask(tId);
job.failTask(tId);
job.getTaskCompletionEvents(0, 100);
job.getStatus();
job.getTaskDiagnostics(tId);
job.getTaskReports(TaskType.MAP);
job.getTrackingURL();
amRestarting = false;
amService = new AMService();
amService.init(conf);
amService.start(conf);
amContact = false; //reset
counters = cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);
// Stop the AM. It is not even restarting. So it should be treated as
// completed.
amService.stop();
// Same client
counters = cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(hsContact);
rmService.stop();
historyService.stop();
}
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
if (useRemoteJar) {
final Path localJar = new Path(
ClassUtil.findContainingJar(SleepJob.class));
ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
localFs.makeQualified(localJar.getParent()).toUri());
job.setJar("viewfs:///jobjars/" + localJar.getName());
} else {
job.setJarByClass(SleepJob.class);
}
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
verifySleepJobCounters(job);
verifyTaskProgress(job);
// TODO later: add explicit "isUber()" checks of some sort (extend
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
verifyRandomWriterCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
public void _testDistributedCache(String jobJarPath) throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
// Create two jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
Path fourth =
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(mrCluster.getConfig());
// Set the job jar to a new "dummy" jar so we can check that its extracted
// properly
job.setJar(jobJarPath);
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Creates the Job Configuration
job.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"));
job.addFileToClassPath(second);
// The AppMaster jar itself
job.addFileToClassPath(
APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
Assert.assertTrue(job.waitForCompletion(false));
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
@Test
public void testRedirect() throws Exception {
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
// Start the RM.
RMService rmService = new RMService("test");
rmService.init(conf);
rmService.start();
// Start the AM.
AMService amService = new AMService();
amService.init(conf);
amService.start(conf);
// Start the HS.
HistoryService historyService = new HistoryService();
historyService.init(conf);
historyService.start(conf);
LOG.info("services started");
Cluster cluster = new Cluster(conf);
org.apache.hadoop.mapreduce.JobID jobID =
new org.apache.hadoop.mapred.JobID("201103121733", 1);
org.apache.hadoop.mapreduce.Counters counters =
cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);
LOG.info("Sleeping for 5 seconds before stop for" +
" the client socket to not get EOF immediately..");
Thread.sleep(5000);
//bring down the AM service
amService.stop();
LOG.info("Sleeping for 5 seconds after stop for" +
" the server to exit cleanly..");
Thread.sleep(5000);
amRestarting = true;
// Same client
//results are returned from fake (not started job)
counters = cluster.getJob(jobID).getCounters();
Assert.assertEquals(0, counters.countCounters());
Job job = cluster.getJob(jobID);
org.apache.hadoop.mapreduce.TaskID taskId =
new org.apache.hadoop.mapreduce.TaskID(jobID, TaskType.MAP, 0);
TaskAttemptID tId = new TaskAttemptID(taskId, 0);
//invoke all methods to check that no exception is thrown
job.killJob();
job.killTask(tId);
job.failTask(tId);
job.getTaskCompletionEvents(0, 100);
job.getStatus();
job.getTaskDiagnostics(tId);
job.getTaskReports(TaskType.MAP);
job.getTrackingURL();
amRestarting = false;
amService = new AMService();
amService.init(conf);
amService.start(conf);
amContact = false; //reset
counters = cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);
// Stop the AM. It is not even restarting. So it should be treated as
// completed.
amService.stop();
// Same client
counters = cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(hsContact);
rmService.stop();
historyService.stop();
}
private void testSleepJobInternal(boolean useRemoteJar) throws Exception {
LOG.info("\n\n\nStarting testSleepJob: useRemoteJar=" + useRemoteJar);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
Job job = sleepJob.createJob(3, numSleepReducers, 10000, 1, 5000, 1);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
if (useRemoteJar) {
final Path localJar = new Path(
ClassUtil.findContainingJar(SleepJob.class));
ConfigUtil.addLink(job.getConfiguration(), "/jobjars",
localFs.makeQualified(localJar.getParent()).toUri());
job.setJar("viewfs:///jobjars/" + localJar.getName());
} else {
job.setJarByClass(SleepJob.class);
}
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
verifySleepJobCounters(job);
verifyTaskProgress(job);
// TODO later: add explicit "isUber()" checks of some sort (extend
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
verifyRandomWriterCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
public void _testDistributedCache(String jobJarPath) throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
// Create two jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
Path fourth =
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(mrCluster.getConfig());
// Set the job jar to a new "dummy" jar so we can check that its extracted
// properly
job.setJar(jobJarPath);
// Because the job jar is a "dummy" jar, we need to include the jar with
// DistributedCacheChecker or it won't be able to find it
Path distributedCacheCheckerJar = new Path(
JarFinder.getJar(DistributedCacheChecker.class));
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
localFs.getUri(), distributedCacheCheckerJar.getParent()));
job.setMapperClass(DistributedCacheChecker.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Creates the Job Configuration
job.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"));
job.addFileToClassPath(second);
// The AppMaster jar itself
job.addFileToClassPath(
APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
Assert.assertTrue(job.waitForCompletion(false));
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
}
@Test (timeout = 60000)
public void testMRRSleepJobWithCompression() throws IOException,
InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 2, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
// enable compression
job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
DefaultCodec.class.getName());
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
// what about completed jobs?
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
}
@Test (timeout = 60000)
public void testMRRSleepJobWithCompression() throws IOException,
InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
MRRSleepJob sleepJob = new MRRSleepJob();
sleepJob.setConf(sleepConf);
Job job = sleepJob.createJob(1, 1, 2, 1, 1,
1, 1, 1, 1, 1);
job.setJarByClass(MRRSleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
// enable compression
job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
DefaultCodec.class.getName());
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
// FIXME once counters and task progress can be obtained properly
// TODO use dag client to test counters and task progress?
// what about completed jobs?
}