org.apache.hadoop.mapreduce.Job#submit() 源码实例 Demo

下面列出了 org.apache.hadoop.mapreduce.Job#submit() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: big-c   文件: TestCompressionEmulationUtils.java
/**
 * Submits a GridMix data-generation job that writes into {@code tempDir}
 * and asserts that the job finishes successfully.
 *
 * @param conf    configuration the job is created from; its
 *                {@code MRJobConfig.NUM_MAPS} entry is set to 1 here
 * @param tempDir output path of the generation job
 */
private static void runDataGenJob(Configuration conf, Path tempDir) 
throws IOException, ClassNotFoundException, InterruptedException {
  // NOTE(review): the client is never used afterwards; it is kept because
  // constructing it may have side effects -- TODO confirm.
  JobClient client = new JobClient(conf);

  // must be set before the Job is created from this configuration
  conf.setInt(MRJobConfig.NUM_MAPS, 1);

  Job dataGenJob = Job.getInstance(conf);
  CompressionEmulationUtil.configure(dataGenJob);
  dataGenJob.setInputFormatClass(CustomInputFormat.class);
  FileOutputFormat.setOutputPath(dataGenJob, tempDir);

  // submit, then block until the job is done and map the outcome to 0/1
  dataGenJob.submit();
  int exitCode;
  if (dataGenJob.waitForCompletion(true)) {
    exitCode = 0;
  } else {
    exitCode = 1;
  }

  assertEquals("Job Failed", 0, exitCode);
}
 
源代码2 项目: hadoop   文件: TestCompressionEmulationUtils.java
/**
 * Submits a GridMix data-generation job that writes into {@code tempDir}
 * and asserts that the job finishes successfully.
 *
 * @param conf    configuration the job is created from; its
 *                {@code MRJobConfig.NUM_MAPS} entry is set to 1 here
 * @param tempDir output path of the generation job
 */
private static void runDataGenJob(Configuration conf, Path tempDir) 
throws IOException, ClassNotFoundException, InterruptedException {
  // NOTE(review): the client is never used afterwards; it is kept because
  // constructing it may have side effects -- TODO confirm.
  JobClient client = new JobClient(conf);

  // must be set before the Job is created from this configuration
  conf.setInt(MRJobConfig.NUM_MAPS, 1);

  Job dataGenJob = Job.getInstance(conf);
  CompressionEmulationUtil.configure(dataGenJob);
  dataGenJob.setInputFormatClass(CustomInputFormat.class);
  FileOutputFormat.setOutputPath(dataGenJob, tempDir);

  // submit, then block until the job is done and map the outcome to 0/1
  dataGenJob.submit();
  int exitCode;
  if (dataGenJob.waitForCompletion(true)) {
    exitCode = 0;
  } else {
    exitCode = 1;
  }

  assertEquals("Job Failed", 0, exitCode);
}
 
/**
 * Submits {@code job}, polls it every 100 ms until it reports completion,
 * and throws a {@link RuntimeException} if it did not succeed.
 */
private void waitForJob(Job job) throws Exception {
  job.submit();
  for (;;) {
    if (job.isComplete()) {
      break;
    }
    sleep(100);
  }
  if (job.isSuccessful()) {
    return;
  }
  throw new RuntimeException("job failed " + job.getJobName());
}
 
源代码4 项目: big-c   文件: TestMRWithDistributedCache.java
/**
 * Runs a job that registers a cache file, a classpath file, a classpath
 * archive and a cache archive, and asserts that the job succeeds.
 *
 * @param conf configuration the job is created from
 */
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Temporary file whose content is the single character "x".
  Path cacheFile = createTempFile("distributed.first", "x");
  // Three jars built by makeJar; each gets a distinct index argument.
  Path classpathFile =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path classpathArchive =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path cacheArchive =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

  Job cacheJob = Job.getInstance(conf);
  cacheJob.setMapperClass(DistributedCacheCheckerMapper.class);
  cacheJob.setReducerClass(DistributedCacheCheckerReducer.class);
  cacheJob.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(cacheJob, cacheFile);

  // The URI fragment names the symlink under which the cache file is exposed.
  URI symlinkedCacheFile =
      new URI(cacheFile.toUri().toString() + "#distributed.first.symlink");
  cacheJob.addCacheFile(symlinkedCacheFile);
  cacheJob.addFileToClassPath(classpathFile);
  cacheJob.addArchiveToClassPath(classpathArchive);
  cacheJob.addCacheArchive(cacheArchive.toUri());
  // a single map attempt makes a failing run finish sooner
  cacheJob.setMaxMapAttempts(1);

  cacheJob.submit();
  boolean succeeded = cacheJob.waitForCompletion(false);
  assertTrue(succeeded);
}
 
源代码5 项目: incubator-tez   文件: TestMRRJobs.java
/**
 * Runs an MRR sleep job in which map task id "0" is configured to throw,
 * with up to three map attempts allowed, and asserts that the job still
 * ends in the SUCCEEDED state. The test is skipped when the app jar is
 * missing.
 */
@Test (timeout = 60000)
public void testFailingAttempt() throws IOException, InterruptedException,
    ClassNotFoundException {

  LOG.info("\n\n\nStarting testFailingAttempt().");

  File appJar = new File(MiniTezCluster.APPJAR);
  if (!appJar.exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  // work on a copy so the cluster's own configuration is left untouched
  Configuration clusterConfCopy = new Configuration(mrrTezCluster.getConfig());

  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(clusterConfCopy);

  Job job = sleepJob.createJob(1, 1, 1, 1, 1,
      1, 1, 1, 1, 1);
  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(3);

  // ask the sleep job's map task "0" to throw an error
  Configuration jobConf = job.getConfiguration();
  jobConf.setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
  jobConf.set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");

  job.submit();
  Assert.assertTrue(job.waitForCompletion(true));
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());

  // FIXME once counters and task progress can be obtained properly
  // TODO verify failed task diagnostics
}
 
源代码6 项目: tez   文件: TestMRRJobs.java
/**
 * Runs an MRR sleep job in which map task id "0" is configured to throw,
 * with up to three map attempts allowed, and asserts that the job still
 * ends in the SUCCEEDED state. The test is skipped when the app jar is
 * missing.
 */
@Test (timeout = 60000)
public void testFailingAttempt() throws IOException, InterruptedException,
    ClassNotFoundException {

  LOG.info("\n\n\nStarting testFailingAttempt().");

  File appJar = new File(MiniTezCluster.APPJAR);
  if (!appJar.exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  // work on a copy so the cluster's own configuration is left untouched
  Configuration clusterConfCopy = new Configuration(mrrTezCluster.getConfig());

  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(clusterConfCopy);

  Job job = sleepJob.createJob(1, 1, 1, 1, 1,
      1, 1, 1, 1, 1);
  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(3);

  // ask the sleep job's map task "0" to throw an error
  Configuration jobConf = job.getConfiguration();
  jobConf.setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
  jobConf.set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");

  job.submit();
  Assert.assertTrue(job.waitForCompletion(true));
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());

  // FIXME once counters and task progress can be obtained properly
  // TODO verify failed task diagnostics
}
 
源代码7 项目: parquet-mr   文件: TestReflectInputOutputFormat.java
/**
 * Submits {@code job}, polls it every 100 ms until it reports completion,
 * logs the outcome, and throws a {@link RuntimeException} if the job did
 * not succeed.
 */
private void waitForJob(Job job) throws Exception {
  job.submit();
  for (;;) {
    if (job.isComplete()) {
      break;
    }
    LOG.debug("waiting for job {}", job.getJobName());
    sleep(100);
  }

  String jobName = job.getJobName();
  String outcome = job.isSuccessful() ? "SUCCESS" : "FAILURE";
  LOG.info("status for job {}: {}", jobName, outcome);

  // success is queried again here, exactly as the log line above did
  if (job.isSuccessful()) {
    return;
  }
  throw new RuntimeException("job failed " + job.getJobName());
}
 
源代码8 项目: RDFS   文件: DistBlockIntegrityMonitor.java
/**
 * Submits {@code job}, logs that it started, and registers it in
 * {@code jobIndex} with a {@code null} value.
 *
 * @param job        job to submit
 * @param filesInJob not used by this method
 * @param priority   not used by this method
 * @param jobIndex   index that receives the submitted job as a key
 */
void submitJob(Job job, List<String> filesInJob, Priority priority, 
    Map<Job, List<LostFileInfo>> jobIndex)
        throws IOException, InterruptedException, ClassNotFoundException {
  job.submit();

  StringBuilder startedMessage = new StringBuilder("Job ");
  startedMessage.append(job.getID());
  startedMessage.append("(").append(job.getJobName()).append(") started");
  LOG.info(startedMessage.toString());

  // the job is recorded with no file list attached yet
  jobIndex.put(job, null);
}
 
源代码9 项目: hadoop   文件: TestMRWithDistributedCache.java
/**
 * Runs a job that registers a cache file, a classpath file, a classpath
 * archive and a cache archive, and asserts that the job succeeds.
 *
 * @param conf configuration the job is created from
 */
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Temporary file whose content is the single character "x".
  Path cacheFile = createTempFile("distributed.first", "x");
  // Three jars built by makeJar; each gets a distinct index argument.
  Path classpathFile =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path classpathArchive =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path cacheArchive =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

  Job cacheJob = Job.getInstance(conf);
  cacheJob.setMapperClass(DistributedCacheCheckerMapper.class);
  cacheJob.setReducerClass(DistributedCacheCheckerReducer.class);
  cacheJob.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(cacheJob, cacheFile);

  // The URI fragment names the symlink under which the cache file is exposed.
  URI symlinkedCacheFile =
      new URI(cacheFile.toUri().toString() + "#distributed.first.symlink");
  cacheJob.addCacheFile(symlinkedCacheFile);
  cacheJob.addFileToClassPath(classpathFile);
  cacheJob.addArchiveToClassPath(classpathArchive);
  cacheJob.addCacheArchive(cacheArchive.toUri());
  // a single map attempt makes a failing run finish sooner
  cacheJob.setMaxMapAttempts(1);

  cacheJob.submit();
  boolean succeeded = cacheJob.waitForCompletion(false);
  assertTrue(succeeded);
}
 
源代码10 项目: hadoop   文件: Util.java
/**
 * Initializes, submits and waits for a job.
 *
 * The job semaphore is held only while the job is initialized and
 * submitted and during the separation sleep that follows; waiting for
 * completion happens after it has been released. Checked exceptions are
 * rethrown wrapped in a {@link RuntimeException}.
 *
 * @param name         label used in log and error messages
 * @param job          job to run
 * @param machine      initializes the job before submission
 * @param startmessage extra text appended to the start log line
 * @param timer        records the start and the total time taken
 */
static void runJob(String name, Job job, Machine machine, String startmessage, Util.Timer timer) {
  JOB_SEMAPHORE.acquireUninterruptibly();
  // stays null if the first tick itself fails, so no duration is reported
  Long startedAt = null;
  try {
    try {
      startedAt = timer.tick("starting " + name + " ...\n  " + startmessage);

      machine.init(job);
      job.submit();

      // pause between submissions; the property is given in seconds
      final int separationSeconds = job.getConfiguration().getInt(JOB_SEPARATION_PROPERTY, 10);
      final long separationMillis = 1000L * separationSeconds;
      if (separationMillis > 0) {
        Util.out.println(name + "> sleep(" + Util.millis2String(separationMillis) + ")");
        Thread.sleep(separationMillis);
      }
    } finally {
      JOB_SEMAPHORE.release();
    }

    final boolean succeeded = job.waitForCompletion(false);
    if (!succeeded) {
      throw new RuntimeException(name + " failed.");
    }
  } catch (RuntimeException e) {
    // unchecked exceptions pass through untouched
    throw e;
  } catch (Exception e) {
    throw new RuntimeException(e);
  } finally {
    if (startedAt != null) {
      timer.tick(name + "> timetaken=" + Util.millis2String(timer.tick() - startedAt));
    }
  }
}
 
源代码11 项目: parquet-mr   文件: TestSpecificInputOutputFormat.java
/**
 * Submits {@code job}, polls it every 100 ms until it reports completion,
 * logs the outcome, and throws a {@link RuntimeException} if the job did
 * not succeed.
 */
private void waitForJob(Job job) throws Exception {
  job.submit();
  for (;;) {
    if (job.isComplete()) {
      break;
    }
    LOG.debug("waiting for job {}", job.getJobName());
    sleep(100);
  }

  String jobName = job.getJobName();
  String outcome = job.isSuccessful() ? "SUCCESS" : "FAILURE";
  LOG.info("status for job {}: {}", jobName, outcome);

  // success is queried again here, exactly as the log line above did
  if (job.isSuccessful()) {
    return;
  }
  throw new RuntimeException("job failed " + job.getJobName());
}
 
源代码12 项目: parquet-mr   文件: TestInputOutputFormat.java
/**
 * Submits {@code job}, polls it every 100 ms until it reports completion,
 * logs the outcome, and throws a {@link RuntimeException} if the job did
 * not succeed.
 */
private void waitForJob(Job job) throws Exception {
  job.submit();
  for (;;) {
    if (job.isComplete()) {
      break;
    }
    LOG.debug("waiting for job {}", job.getJobName());
    sleep(100);
  }

  String jobName = job.getJobName();
  String outcome = job.isSuccessful() ? "SUCCESS" : "FAILURE";
  LOG.info("status for job {}: {}", jobName, outcome);

  // success is queried again here, exactly as the log line above did
  if (job.isSuccessful()) {
    return;
  }
  throw new RuntimeException("job failed " + job.getJobName());
}
 
源代码13 项目: datawave   文件: IngestMetricsSummaryLoader.java
/**
 * Configures and runs the ingest-metrics summary job: reads the file graph
 * table through {@code AccumuloInputFormat}, maps with
 * {@code IngestMetricsMapper}, and writes summaries through the reducer set
 * up by {@code MetricsDailySummaryReducer.configureJob}.
 *
 * @param args command-line arguments passed to {@code JobSetupUtil.configure}
 * @return 0 if the job completed successfully, 1 if it failed
 * @throws Exception if configuration, table initialization, submission or
 *                   waiting for the job fails
 */
@Override
public int run(String[] args) throws Exception {
    Configuration conf = JobSetupUtil.configure(args, getConf(), log);
    
    JobSetupUtil.printConfig(getConf(), log);
    
    Job job = Job.getInstance(conf);
    Configuration jconf = job.getConfiguration();
    job.setJarByClass(this.getClass());
    
    // parseBoolean yields the primitive directly; valueOf would box and then unbox
    boolean useHourlyPrecision = Boolean.parseBoolean(jconf.get(MetricsConfig.USE_HOURLY_PRECISION, MetricsConfig.DEFAULT_USE_HOURLY_PRECISION));
    
    if (useHourlyPrecision) {
        job.setJobName("IngestMetricsSummaries (hourly)");
    } else {
        job.setJobName("IngestMetricsSummaries");
    }
    
    try {
        Connections.initTables(conf);
    } catch (AccumuloException | AccumuloSecurityException e) {
        // surface Accumulo failures as an IOException, keeping the cause
        throw new IOException(e);
    }
    
    String inputTable = jconf.get(MetricsConfig.FILE_GRAPH_TABLE, MetricsConfig.DEFAULT_FILE_GRAPH_TABLE);
    String outputTable = HourlyPrecisionHelper.getOutputTable(jconf, useHourlyPrecision);
    String userName = jconf.get(MetricsConfig.USER);
    String password = jconf.get(MetricsConfig.PASS);
    String instance = jconf.get(MetricsConfig.INSTANCE);
    String zookeepers = jconf.get(MetricsConfig.ZOOKEEPERS, "localhost");
    Range dayRange = JobSetupUtil.computeTimeRange(jconf, log);
    // the range's row keys are parsed as numbers; their difference is converted to whole days, at least 1
    long delta = Long.parseLong(dayRange.getEndKey().getRow().toString()) - Long.parseLong(dayRange.getStartKey().getRow().toString());
    int numDays = (int) Math.max(1, delta / TimeUnit.DAYS.toMillis(1));
    
    job.setMapperClass(IngestMetricsMapper.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(Value.class);
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
    AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
    AccumuloInputFormat.setInputTableName(job, inputTable);
    AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
    AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));
    
    // Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
    RowPartitioner.configureJob(job);
    
    // Configure the reducer and output format to write out our metrics
    // NOTE(review): the zookeepers value here is read without the "localhost" default used above -- confirm intended
    MetricsDailySummaryReducer.configureJob(job, numDays, jconf.get(MetricsConfig.INSTANCE), jconf.get(MetricsConfig.ZOOKEEPERS), userName, password,
                    outputTable);
    
    job.submit();
    JobSetupUtil.changeJobPriority(job, log);
    
    // Propagate the job outcome as the exit code instead of always reporting success
    return job.waitForCompletion(true) ? 0 : 1;
}
 
源代码14 项目: datawave   文件: QueryMetricsSummaryLoader.java
/**
 * Configures and runs the query-metrics summary job: reads the query
 * metrics event table through {@code AccumuloInputFormat} (filtered by a
 * column-family regex), maps with {@code QueryMetricsMapper}, and writes
 * summaries through the reducer set up by
 * {@code MetricsDailySummaryReducer.configureJob}.
 *
 * @param args command-line arguments passed to {@code JobSetupUtil.configure}
 * @return 0 if the job completed successfully, 1 if it failed
 * @throws Exception if configuration, table initialization, submission or
 *                   waiting for the job fails
 */
@Override
public int run(String[] args) throws Exception {
    Configuration conf = JobSetupUtil.configure(args, getConf(), log);
    
    JobSetupUtil.printConfig(getConf(), log);
    
    Job job = Job.getInstance(conf);
    Configuration jconf = job.getConfiguration();
    job.setJarByClass(this.getClass());
    
    // parseBoolean yields the primitive directly; valueOf would box and then unbox
    boolean useHourlyPrecision = Boolean.parseBoolean(jconf.get(MetricsConfig.USE_HOURLY_PRECISION, MetricsConfig.DEFAULT_USE_HOURLY_PRECISION));
    
    if (useHourlyPrecision) {
        job.setJobName("QueryMetricsSummaries (hourly)");
    } else {
        job.setJobName("QueryMetricsSummaries");
    }
    
    try {
        Connections.initTables(conf);
    } catch (AccumuloException | AccumuloSecurityException e) {
        // surface Accumulo failures as an IOException, keeping the cause
        throw new IOException(e);
    }
    
    String inputTable = jconf.get(MetricsConfig.QUERY_METRICS_EVENT_TABLE, MetricsConfig.DEFAULT_QUERY_METRICS_EVENT_TABLE);
    String outputTable = HourlyPrecisionHelper.getOutputTable(jconf, useHourlyPrecision);
    
    // the input side is configured from the WAREHOUSE_* settings
    String userName = jconf.get(MetricsConfig.WAREHOUSE_USERNAME);
    String password = jconf.get(MetricsConfig.WAREHOUSE_PASSWORD);
    String instance = jconf.get(MetricsConfig.WAREHOUSE_INSTANCE);
    String zookeepers = jconf.get(MetricsConfig.WAREHOUSE_ZOOKEEPERS, "localhost");
    Connector con = Connections.warehouseConnection(jconf);
    // scan with the authorizations of the user the warehouse connection runs as
    Authorizations auths = con.securityOperations().getUserAuthorizations(con.whoami());
    Collection<Range> dayRanges = JobSetupUtil.computeShardedDayRange(jconf, log);
    Range timeRange = JobSetupUtil.computeTimeRange(jconf, log);
    // the range's row keys are parsed as numbers; their difference is converted to whole days, at least 1
    long delta = Long.parseLong(timeRange.getEndKey().getRow().toString()) - Long.parseLong(timeRange.getStartKey().getRow().toString());
    int numDays = (int) Math.max(1, delta / TimeUnit.DAYS.toMillis(1));
    
    job.setMapperClass(QueryMetricsMapper.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(Value.class);
    job.setInputFormatClass(AccumuloInputFormat.class);
    
    AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
    AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
    AccumuloInputFormat.setRanges(job, dayRanges);
    AccumuloInputFormat.setAutoAdjustRanges(job, false);
    AccumuloInputFormat.setInputTableName(job, inputTable);
    AccumuloInputFormat.setScanAuthorizations(job, auths);
    
    // only pass entries whose column family matches QUERY_METRICS_REGEX
    IteratorSetting regex = new IteratorSetting(50, RegExFilter.class);
    regex.addOption(RegExFilter.COLF_REGEX, QUERY_METRICS_REGEX);
    AccumuloInputFormat.addIterator(job, regex);
    
    // Ensure all data for a day goes to the same reducer so that we aggregate it correctly before sending to Accumulo
    RowPartitioner.configureJob(job);
    
    // Configure the reducer and output format to write out our metrics
    // NOTE(review): output uses the INSTANCE/ZOOKEEPERS/USER/PASS settings rather than the WAREHOUSE_* ones read above -- presumably a separate metrics instance; confirm
    MetricsDailySummaryReducer.configureJob(job, numDays, jconf.get(MetricsConfig.INSTANCE), jconf.get(MetricsConfig.ZOOKEEPERS),
                    jconf.get(MetricsConfig.USER), jconf.get(MetricsConfig.PASS), outputTable);
    
    job.submit();
    JobSetupUtil.changeJobPriority(job, log);
    
    // Propagate the job outcome as the exit code instead of always reporting success
    return job.waitForCompletion(true) ? 0 : 1;
}
 
源代码15 项目: hadoop   文件: TestMRJobs.java
/**
 * Runs a sleep job with the job classloader enabled and asserts that the
 * job succeeds. The test is skipped when the app jar is missing.
 *
 * @param useCustomClasses when true, the custom output format and custom
 *        speculator are excluded from the system classes and configured on
 *        the job
 */
private void testJobClassloader(boolean useCustomClasses) throws IOException,
    InterruptedException, ClassNotFoundException {
  LOG.info("\n\n\nStarting testJobClassloader()"
      + " useCustomClasses=" + useCustomClasses);

  final File appJarFile = new File(MiniMRYarnCluster.APPJAR);
  if (!appJarFile.exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  final Configuration sleepConf = new Configuration(mrCluster.getConfig());
  // set master address to local to test that local mode applied iff framework == local
  sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
  sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
  if (useCustomClasses) {
    // To have the AM load user classes such as the output format class,
    // the custom classes are excluded from the system classes. The
    // exclusions are prepended because the first match wins.
    final String systemClasses = "-" + CustomOutputFormat.class.getName()
        + ",-" + CustomSpeculator.class.getName()
        + "," + ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
    sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
        systemClasses);
  }
  sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
  // most verbose logging for the AM and for both task kinds
  final String allLevels = Level.ALL.toString();
  sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, allLevels);
  sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, allLevels);
  sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, allLevels);
  sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");

  final SleepJob sleepJob = new SleepJob();
  sleepJob.setConf(sleepConf);
  final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
  job.setMapperClass(ConfVerificationMapper.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.setJarByClass(SleepJob.class);
  job.setMaxMapAttempts(1); // speed up failures
  if (useCustomClasses) {
    job.setOutputFormatClass(CustomOutputFormat.class);
    final Configuration jobConf = job.getConfiguration();
    jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
        Speculator.class);
    // speculation needs to be enabled for the speculator to be loaded
    jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
  }

  job.submit();
  final boolean succeeded = job.waitForCompletion(true);
  final String failureMessage =
      "Job status: " + job.getStatus().getFailureInfo();
  Assert.assertTrue(failureMessage, succeeded);
}
 
源代码16 项目: sqoop-on-spark   文件: MapreduceSubmissionEngine.java
/**
 * Submits {@code job} without waiting for it, then records its job id and
 * tracking URL on the request's job submission.
 */
private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException {
  job.submit();

  String externalJobId = job.getJobID().toString();
  request.getJobSubmission().setExternalJobId(externalJobId);

  String externalLink = job.getTrackingURL();
  request.getJobSubmission().setExternalLink(externalLink);
}
 
源代码17 项目: accumulo-recipes   文件: EventOutputFormatIT.java
/**
 * Writes an input file containing the numbers 0..99 (one per line), runs
 * {@code job} over it with {@code TestMapper} and {@code EventOutputFormat},
 * and asserts that querying {@code eventStore} returns as many events as
 * lines were written.
 *
 * @param job        job to configure and run
 * @param eventStore store queried for the events the job wrote
 */
public void runJob(Job job, EventStore eventStore) throws IOException, AccumuloSecurityException, ClassNotFoundException, InterruptedException, TableExistsException, AccumuloException, TableNotFoundException {
    File dir = temporaryFolder.newFolder("input");

    int countTotalResults = 100;
    // try-with-resources flushes and closes the writer, then the stream, even if writing fails
    try (FileOutputStream fileOutputStream = new FileOutputStream(new File(dir,"uuids.txt"));
         PrintWriter printWriter = new PrintWriter(fileOutputStream)) {
        for (int i = 0; i < countTotalResults; i++) {
            printWriter.println(""+i);
        }
    }

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    fs.setWorkingDirectory(new Path(dir.getAbsolutePath()));

    Path inputPath = fs.makeQualified(new Path(dir.getAbsolutePath()));  // local path


    EventOutputFormat.setZooKeeperInstance(job, accumuloMiniClusterDriver.getClientConfiguration());
    EventOutputFormat.setConnectorInfo(job, PRINCIPAL, new PasswordToken(accumuloMiniClusterDriver.getRootPassword()));
    job.setJarByClass(getClass());
    job.setMapperClass(TestMapper.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(EventWritable.class);
    job.setOutputFormatClass(EventOutputFormat.class);

    FileInputFormat.setInputPaths(job, inputPath);

    job.submit();
    // Fail right away on a failed job rather than via a misleading result-count mismatch below
    if (!job.waitForCompletion(true)) {
        throw new AssertionError("MapReduce job did not complete successfully");
    }

    // query the last 25 seconds for events of TYPE whose KEY_1 equals VAL_1
    Iterable<Event> itr = eventStore.query(new Date(currentTimeMillis() - 25000),
            new Date(), Collections.singleton(TYPE), QueryBuilder.create().and().eq(KEY_1, VAL_1).end().build(), null, DEFAULT_AUTHS);

    List<Event> queryResults = Lists.newArrayList(itr);
    assertEquals(countTotalResults,queryResults.size());
}
 
源代码18 项目: aegisthus   文件: Distcp.java
/**
 * Submits {@code job}, prints its job id and tracking URL to standard
 * output, and waits for it to finish.
 *
 * @param job job to run
 * @param cl  not used by this method
 * @return the result of {@code job.waitForCompletion(true)}
 */
protected boolean runJob(Job job, CommandLine cl) throws IOException, InterruptedException, ClassNotFoundException {
	job.submit();

	// id and tracking URL are only available once the job is submitted
	System.out.println(job.getJobID());
	System.out.println(job.getTrackingURL());

	final boolean succeeded = job.waitForCompletion(true);
	return succeeded;
}
 
源代码19 项目: incubator-tez   文件: TestMRRJobs.java
/**
 * Runs an MRR sleep job with map-output compression enabled and asserts
 * that it succeeds and that its tracking URL ends with the numeric tail of
 * its job id. The test is skipped when the app jar is missing.
 */
@Test (timeout = 60000)
public void testMRRSleepJobWithCompression() throws IOException,
    InterruptedException, ClassNotFoundException {
  LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");

  File appJar = new File(MiniTezCluster.APPJAR);
  if (!appJar.exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  // work on a copy so the cluster's own configuration is left untouched
  Configuration clusterConfCopy = new Configuration(mrrTezCluster.getConfig());

  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(clusterConfCopy);

  Job job = sleepJob.createJob(1, 1, 2, 1, 1,
      1, 1, 1, 1, 1);
  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(1); // speed up failures

  // compress map output with the default codec
  Configuration jobConf = job.getConfiguration();
  jobConf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  jobConf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
      DefaultCodec.class.getName());

  job.submit();
  // capture both right after submission, before blocking on completion
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();

  Assert.assertTrue(job.waitForCompletion(true));
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());

  // everything from the last '_' of the job id, followed by a slash
  String expectedUrlSuffix = jobId.substring(jobId.lastIndexOf("_")) + "/";
  String mismatchMessage = "Tracking URL was " + trackingUrl +
      " but didn't Match Job ID " + jobId;
  Assert.assertTrue(mismatchMessage, trackingUrl.endsWith(expectedUrlSuffix));

  // FIXME once counters and task progress can be obtained properly
  // TODO use dag client to test counters and task progress?
  // what about completed jobs?

}
 
源代码20 项目: big-c   文件: TestMRJobs.java
/**
 * Runs the random text writer job configured for 3072 total bytes at 1024
 * bytes per map, then asserts that it succeeded, that its tracking URL ends
 * with the numeric tail of its job id, and that the output directory holds
 * three files besides the success marker. The test is skipped when the app
 * jar is missing.
 */
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
    ClassNotFoundException {

  LOG.info("\n\n\nStarting testRandomWriter().");
  File appJar = new File(MiniMRYarnCluster.APPJAR);
  if (!appJar.exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
             + " not found. Not running test.");
    return;
  }

  RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
  // these are written into the cluster's configuration before the job is created from it
  mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
  mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
  Job job = randomWriterJob.createJob(mrCluster.getConfig());

  Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setSpeculativeExecution(false);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.setJarByClass(RandomTextWriterJob.class);
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  // capture both right after submission, before blocking on completion
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();

  Assert.assertTrue(job.waitForCompletion(true));
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());

  // everything from the last '_' of the job id, followed by a slash
  String expectedUrlSuffix = jobId.substring(jobId.lastIndexOf("_")) + "/";
  String mismatchMessage = "Tracking URL was " + trackingUrl +
      " but didn't Match Job ID " + jobId;
  Assert.assertTrue(mismatchMessage, trackingUrl.endsWith(expectedUrlSuffix));

  // Count everything in the output dir except the success marker file
  RemoteIterator<FileStatus> outputFiles =
      FileContext.getFileContext(mrCluster.getConfig()).listStatus(
          outputDir);
  int partFileCount = 0;
  while (outputFiles.hasNext()) {
    String fileName = outputFiles.next().getPath().getName();
    if (fileName.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
      continue;
    }
    partFileCount++;
  }
  Assert.assertEquals("Number of part files is wrong!", 3, partFileCount);
  verifyRandomWriterCounters(job);

  // TODO later:  add explicit "isUber()" checks of some sort
}