The following are example usages of org.apache.hadoop.mapreduce.Job#submit(), collected from real project source code.
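Before the collected examples, here is a minimal, self-contained sketch of the submit-and-monitor pattern they all share. Everything in it (the class name, paths, and identity Mapper) is an illustrative assumption rather than code from any project below; the point is that Job#submit() returns as soon as the job has been handed off to the cluster, while Job#waitForCompletion(boolean) blocks until the job finishes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "submit-example");
    job.setJarByClass(SubmitExample.class);
    job.setMapperClass(Mapper.class); // identity mapper, for illustration only
    FileInputFormat.setInputPaths(job, new Path(args[0]));  // hypothetical input dir
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // hypothetical output dir

    // Non-blocking: returns once the job has been submitted to the cluster.
    job.submit();
    System.out.println("Submitted " + job.getJobID() + " -> " + job.getTrackingURL());

    // Blocking: monitors the already-submitted job until it completes;
    // 'true' prints progress to stdout. Returns whether the job succeeded.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The first example, a GridMix data-generation test helper, follows exactly this shape: configure, submit, then block on waitForCompletion():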
/**
 * Runs a GridMix data-generation job.
 */
private static void runDataGenJob(Configuration conf, Path tempDir)
    throws IOException, ClassNotFoundException, InterruptedException {
  JobClient client = new JobClient(conf);

  // get the local job runner
  conf.setInt(MRJobConfig.NUM_MAPS, 1);

  Job job = Job.getInstance(conf);
  CompressionEmulationUtil.configure(job);
  job.setInputFormatClass(CustomInputFormat.class);

  // set the output path
  FileOutputFormat.setOutputPath(job, tempDir);

  // submit and wait for completion
  job.submit();
  int ret = job.waitForCompletion(true) ? 0 : 1;
  assertEquals("Job Failed", 0, ret);
}
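The next example is a small helper that submits a job and then polls Job#isComplete() every 100 ms, throwing a RuntimeException if the job finishes unsuccessfully: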
private void waitForJob(Job job) throws Exception {
  job.submit();
  while (!job.isComplete()) {
    sleep(100);
  }
  if (!job.isSuccessful()) {
    throw new RuntimeException("job failed " + job.getJobName());
  }
}
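This test exercises Job#submit() together with the distributed-cache API: it registers a cache file under a symlink name, puts jars on the classpath with addFileToClassPath() and addArchiveToClassPath(), adds a cache archive, and asserts that the job completes successfully: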
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars with a single file inside them.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

  Job job = Job.getInstance(conf);
  job.setMapperClass(DistributedCacheCheckerMapper.class);
  job.setReducerClass(DistributedCacheCheckerReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(job, first);

  // Set up the distributed cache
  job.addCacheFile(
      new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  assertTrue(job.waitForCompletion(false));
}
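A MiniTezCluster sleep-job test that configures map task 0 to throw an error; with setMaxMapAttempts(3), a retried attempt can still succeed, so the test asserts that the job nevertheless reaches the SUCCEEDED state: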
@Test (timeout = 60000)
public void testFailingAttempt() throws IOException, InterruptedException,
    ClassNotFoundException {
  LOG.info("\n\n\nStarting testFailingAttempt().");

  if (!(new File(MiniTezCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(sleepConf);
  Job job = sleepJob.createJob(1, 1, 1, 1, 1,
      1, 1, 1, 1, 1);
  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(3); // speed up failures
  job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
  job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");

  job.submit();
  boolean succeeded = job.waitForCompletion(true);
  Assert.assertTrue(succeeded);
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());

  // FIXME once counters and task progress can be obtained properly
  // TODO verify failed task diagnostics
}
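A more verbose variant of the polling helper shown earlier, adding debug logging while waiting and an INFO-level status line when the job completes: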
private void waitForJob(Job job) throws Exception {
  job.submit();
  while (!job.isComplete()) {
    LOG.debug("waiting for job {}", job.getJobName());
    sleep(100);
  }
  LOG.info("status for job {}: {}", job.getJobName(),
      (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
  if (!job.isSuccessful()) {
    throw new RuntimeException("job failed " + job.getJobName());
  }
}
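A fire-and-forget submitter: it starts the job, logs its ID and name, and records it in a job index without waiting for completion: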
void submitJob(Job job, List<String> filesInJob, Priority priority,
    Map<Job, List<LostFileInfo>> jobIndex)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.submit();
  LOG.info("Job " + job.getID() + "(" + job.getJobName() + ") started");
  jobIndex.put(job, null);
}
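This driver serializes submissions with a semaphore, sleeps a configurable interval between jobs to keep them separated, and only then blocks on waitForCompletion(), reporting the elapsed time in a finally block: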
/** Run a job. */
static void runJob(String name, Job job, Machine machine, String startmessage,
    Util.Timer timer) {
  JOB_SEMAPHORE.acquireUninterruptibly();
  Long starttime = null;
  try {
    try {
      starttime = timer.tick("starting " + name + " ...\n " + startmessage);

      // initialize and submit a job
      machine.init(job);
      job.submit();

      // Separate jobs
      final long sleeptime =
          1000L * job.getConfiguration().getInt(JOB_SEPARATION_PROPERTY, 10);
      if (sleeptime > 0) {
        Util.out.println(name + "> sleep(" + Util.millis2String(sleeptime) + ")");
        Thread.sleep(sleeptime);
      }
    } finally {
      JOB_SEMAPHORE.release();
    }

    if (!job.waitForCompletion(false)) {
      throw new RuntimeException(name + " failed.");
    }
  } catch (Exception e) {
    throw e instanceof RuntimeException ? (RuntimeException) e
        : new RuntimeException(e);
  } finally {
    if (starttime != null) {
      timer.tick(name + "> timetaken=" + Util.millis2String(timer.tick() - starttime));
    }
  }
}
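A Tool#run() implementation that wires up an Accumulo-backed metrics ingest job: it reads from AccumuloInputFormat over a computed day range, partitions by row so each day lands on a single reducer, submits, adjusts the job priority, and waits for completion: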
@Override
public int run(String[] args) throws Exception {
  Configuration conf = JobSetupUtil.configure(args, getConf(), log);
  JobSetupUtil.printConfig(getConf(), log);

  Job job = Job.getInstance(conf);
  Configuration jconf = job.getConfiguration();
  job.setJarByClass(this.getClass());

  boolean useHourlyPrecision = Boolean.valueOf(
      jconf.get(MetricsConfig.USE_HOURLY_PRECISION, MetricsConfig.DEFAULT_USE_HOURLY_PRECISION));
  if (useHourlyPrecision) {
    job.setJobName("IngestMetricsSummaries (hourly)");
  } else {
    job.setJobName("IngestMetricsSummaries");
  }

  try {
    Connections.initTables(conf);
  } catch (AccumuloException | AccumuloSecurityException e) {
    throw new IOException(e);
  }

  String inputTable = jconf.get(MetricsConfig.FILE_GRAPH_TABLE, MetricsConfig.DEFAULT_FILE_GRAPH_TABLE);
  String outputTable = HourlyPrecisionHelper.getOutputTable(jconf, useHourlyPrecision);
  String userName = jconf.get(MetricsConfig.USER);
  String password = jconf.get(MetricsConfig.PASS);
  String instance = jconf.get(MetricsConfig.INSTANCE);
  String zookeepers = jconf.get(MetricsConfig.ZOOKEEPERS, "localhost");

  Range dayRange = JobSetupUtil.computeTimeRange(jconf, log);
  long delta = Long.parseLong(dayRange.getEndKey().getRow().toString())
      - Long.parseLong(dayRange.getStartKey().getRow().toString());
  int numDays = (int) Math.max(1, delta / TimeUnit.DAYS.toMillis(1));

  job.setMapperClass(IngestMetricsMapper.class);
  job.setMapOutputKeyClass(Key.class);
  job.setMapOutputValueClass(Value.class);
  job.setInputFormatClass(AccumuloInputFormat.class);
  AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
  AccumuloInputFormat.setZooKeeperInstance(job,
      ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
  AccumuloInputFormat.setInputTableName(job, inputTable);
  AccumuloInputFormat.setScanAuthorizations(job, Authorizations.EMPTY);
  AccumuloInputFormat.setRanges(job, Collections.singletonList(dayRange));

  // Ensure all data for a day goes to the same reducer so that we aggregate
  // it correctly before sending to Accumulo
  RowPartitioner.configureJob(job);

  // Configure the reducer and output format to write out our metrics
  MetricsDailySummaryReducer.configureJob(job, numDays, jconf.get(MetricsConfig.INSTANCE),
      jconf.get(MetricsConfig.ZOOKEEPERS), userName, password, outputTable);

  job.submit();
  JobSetupUtil.changeJobPriority(job, log);
  job.waitForCompletion(true);
  return 0;
}
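A closely related Tool#run() for query metrics; in addition to the setup above, it scans sharded day ranges with the calling user's authorizations and attaches a RegExFilter iterator to the scan before submitting: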
@Override
public int run(String[] args) throws Exception {
  Configuration conf = JobSetupUtil.configure(args, getConf(), log);
  JobSetupUtil.printConfig(getConf(), log);

  Job job = Job.getInstance(conf);
  Configuration jconf = job.getConfiguration();
  job.setJarByClass(this.getClass());

  boolean useHourlyPrecision = Boolean.valueOf(
      jconf.get(MetricsConfig.USE_HOURLY_PRECISION, MetricsConfig.DEFAULT_USE_HOURLY_PRECISION));
  if (useHourlyPrecision) {
    job.setJobName("QueryMetricsSummaries (hourly)");
  } else {
    job.setJobName("QueryMetricsSummaries");
  }

  try {
    Connections.initTables(conf);
  } catch (AccumuloException | AccumuloSecurityException e) {
    throw new IOException(e);
  }

  String inputTable = jconf.get(MetricsConfig.QUERY_METRICS_EVENT_TABLE, MetricsConfig.DEFAULT_QUERY_METRICS_EVENT_TABLE);
  String outputTable = HourlyPrecisionHelper.getOutputTable(jconf, useHourlyPrecision);
  String userName = jconf.get(MetricsConfig.WAREHOUSE_USERNAME);
  String password = jconf.get(MetricsConfig.WAREHOUSE_PASSWORD);
  String instance = jconf.get(MetricsConfig.WAREHOUSE_INSTANCE);
  String zookeepers = jconf.get(MetricsConfig.WAREHOUSE_ZOOKEEPERS, "localhost");

  Connector con = Connections.warehouseConnection(jconf);
  Authorizations auths = con.securityOperations().getUserAuthorizations(con.whoami());

  Collection<Range> dayRanges = JobSetupUtil.computeShardedDayRange(jconf, log);
  Range timeRange = JobSetupUtil.computeTimeRange(jconf, log);
  long delta = Long.parseLong(timeRange.getEndKey().getRow().toString())
      - Long.parseLong(timeRange.getStartKey().getRow().toString());
  int numDays = (int) Math.max(1, delta / TimeUnit.DAYS.toMillis(1));

  job.setMapperClass(QueryMetricsMapper.class);
  job.setMapOutputKeyClass(Key.class);
  job.setMapOutputValueClass(Value.class);
  job.setInputFormatClass(AccumuloInputFormat.class);
  AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(password));
  AccumuloInputFormat.setZooKeeperInstance(job,
      ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
  AccumuloInputFormat.setRanges(job, dayRanges);
  AccumuloInputFormat.setAutoAdjustRanges(job, false);
  AccumuloInputFormat.setInputTableName(job, inputTable);
  AccumuloInputFormat.setScanAuthorizations(job, auths);

  IteratorSetting regex = new IteratorSetting(50, RegExFilter.class);
  regex.addOption(RegExFilter.COLF_REGEX, QUERY_METRICS_REGEX);
  AccumuloInputFormat.addIterator(job, regex);

  // Ensure all data for a day goes to the same reducer so that we aggregate
  // it correctly before sending to Accumulo
  RowPartitioner.configureJob(job);

  // Configure the reducer and output format to write out our metrics
  MetricsDailySummaryReducer.configureJob(job, numDays, jconf.get(MetricsConfig.INSTANCE),
      jconf.get(MetricsConfig.ZOOKEEPERS), jconf.get(MetricsConfig.USER),
      jconf.get(MetricsConfig.PASS), outputTable);

  job.submit();
  JobSetupUtil.changeJobPriority(job, log);
  job.waitForCompletion(true);
  return 0;
}
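A MiniMRYarnCluster test of the MapReduce job classloader. When useCustomClasses is set, the custom output format and speculator are excluded from the system-classes list so that the application master must load them through the job classloader: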
private void testJobClassloader(boolean useCustomClasses) throws IOException,
    InterruptedException, ClassNotFoundException {
  LOG.info("\n\n\nStarting testJobClassloader()"
      + " useCustomClasses=" + useCustomClasses);

  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  final Configuration sleepConf = new Configuration(mrCluster.getConfig());
  // set master address to local to test that local mode applied iff framework == local
  sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
  sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
  if (useCustomClasses) {
    // to test AM loading user classes such as output format class, we want
    // to blacklist them from the system classes (they need to be prepended
    // as the first match wins)
    String systemClasses = ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
    // exclude the custom classes from system classes
    systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
        CustomSpeculator.class.getName() + "," +
        systemClasses;
    sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
        systemClasses);
  }
  sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
  sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
  sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
  sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
  sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");

  final SleepJob sleepJob = new SleepJob();
  sleepJob.setConf(sleepConf);
  final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
  job.setMapperClass(ConfVerificationMapper.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.setJarByClass(SleepJob.class);
  job.setMaxMapAttempts(1); // speed up failures
  if (useCustomClasses) {
    // set custom output format class and speculator class
    job.setOutputFormatClass(CustomOutputFormat.class);
    final Configuration jobConf = job.getConfiguration();
    jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
        Speculator.class);
    // speculation needs to be enabled for the speculator to be loaded
    jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
  }

  job.submit();
  boolean succeeded = job.waitForCompletion(true);
  Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
      succeeded);
}
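A thin wrapper that submits the job and copies its external job ID and tracking URL back onto the originating request: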
private void submitToCluster(MRJobRequest request, Job job)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.submit();
  request.getJobSubmission().setExternalJobId(job.getJobID().toString());
  request.getJobSubmission().setExternalLink(job.getTrackingURL());
}
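An integration test that writes 100 numbered lines to a local input file, runs a job whose EventOutputFormat writes into a mini Accumulo cluster, and then verifies that a query against the event store returns all 100 events: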
public void runJob(Job job, EventStore eventStore) throws IOException,
    AccumuloSecurityException, ClassNotFoundException, InterruptedException,
    TableExistsException, AccumuloException, TableNotFoundException {
  File dir = temporaryFolder.newFolder("input");
  FileOutputStream fileOutputStream = new FileOutputStream(new File(dir, "uuids.txt"));
  PrintWriter printWriter = new PrintWriter(fileOutputStream);
  int countTotalResults = 100;
  try {
    for (int i = 0; i < countTotalResults; i++) {
      printWriter.println("" + i);
    }
  } finally {
    printWriter.flush();
    fileOutputStream.close();
  }

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  fs.setWorkingDirectory(new Path(dir.getAbsolutePath()));
  Path inputPath = fs.makeQualified(new Path(dir.getAbsolutePath())); // local path

  EventOutputFormat.setZooKeeperInstance(job, accumuloMiniClusterDriver.getClientConfiguration());
  EventOutputFormat.setConnectorInfo(job, PRINCIPAL, new PasswordToken(accumuloMiniClusterDriver.getRootPassword()));
  job.setJarByClass(getClass());
  job.setMapperClass(TestMapper.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(EventWritable.class);
  job.setOutputFormatClass(EventOutputFormat.class);
  FileInputFormat.setInputPaths(job, inputPath);

  job.submit();
  job.waitForCompletion(true);

  Iterable<Event> itr = eventStore.query(new Date(currentTimeMillis() - 25000),
      new Date(), Collections.singleton(TYPE),
      QueryBuilder.create().and().eq(KEY_1, VAL_1).end().build(), null, DEFAULT_AUTHS);

  List<Event> queryResults = Lists.newArrayList(itr);
  assertEquals(countTotalResults, queryResults.size());
}
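A minimal driver that submits the job, prints its ID and tracking URL, and blocks until it finishes: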
protected boolean runJob(Job job, CommandLine cl)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.submit();
  System.out.println(job.getJobID());
  System.out.println(job.getTrackingURL());
  return job.waitForCompletion(true);
}
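A MiniTezCluster sleep-job test with map-output compression enabled via DefaultCodec; it also checks that the job's tracking URL ends with the numeric suffix of its job ID: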
@Test (timeout = 60000)
public void testMRRSleepJobWithCompression() throws IOException,
    InterruptedException, ClassNotFoundException {
  LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");

  if (!(new File(MiniTezCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniTezCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
  MRRSleepJob sleepJob = new MRRSleepJob();
  sleepJob.setConf(sleepConf);
  Job job = sleepJob.createJob(1, 1, 2, 1, 1,
      1, 1, 1, 1, 1);
  job.setJarByClass(MRRSleepJob.class);
  job.setMaxMapAttempts(1); // speed up failures

  // enable compression
  job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
      DefaultCodec.class.getName());

  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  boolean succeeded = job.waitForCompletion(true);
  Assert.assertTrue(succeeded);
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
  Assert.assertTrue("Tracking URL was " + trackingUrl +
      " but didn't match job ID " + jobId,
      trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));

  // FIXME once counters and task progress can be obtained properly
  // TODO use dag client to test counters and task progress?
  // what about completed jobs?
}
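Finally, a MiniMRYarnCluster test that runs RandomTextWriterJob with three 1 KB maps (3072 total bytes at 1024 bytes per map), then counts the non-_SUCCESS files in the output directory and expects exactly three part files: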
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
    ClassNotFoundException {
  LOG.info("\n\n\nStarting testRandomWriter().");

  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
  mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
  mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
  Job job = randomWriterJob.createJob(mrCluster.getConfig());
  Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setSpeculativeExecution(false);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.setJarByClass(RandomTextWriterJob.class);
  job.setMaxMapAttempts(1); // speed up failures

  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  boolean succeeded = job.waitForCompletion(true);
  Assert.assertTrue(succeeded);
  Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
  Assert.assertTrue("Tracking URL was " + trackingUrl +
      " but didn't match job ID " + jobId,
      trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));

  // Make sure there are three files in the output-dir
  RemoteIterator<FileStatus> iterator =
      FileContext.getFileContext(mrCluster.getConfig()).listStatus(outputDir);
  int count = 0;
  while (iterator.hasNext()) {
    FileStatus file = iterator.next();
    if (!file.getPath().getName()
        .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
      count++;
    }
  }
  Assert.assertEquals("Number of part files is wrong!", 3, count);
  verifyRandomWriterCounters(job);

  // TODO later: add explicit "isUber()" checks of some sort
}