下面列出了org.apache.hadoop.io.serializer.JavaSerializationComparator#org.apache.hadoop.mapreduce.Job 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected Job initJob(Configuration conf) throws IOException {
Job job = Job.getInstance(conf, this.jobName);
job.setJarByClass(this.runnerClass);
// 本地运行
// TableMapReduceUtil.initTableMapperJob(initScans(job), this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass, job, false);
TableMapReduceUtil.initTableMapperJob(initScans(job), this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass, job, true);
// 集群运行:本地提交和打包(jar)提交
// TableMapReduceUtil.initTableMapperJob(initScans(job),
// this.mapperClass, this.mapOutputKeyClass, this.mapOutputValueClass,
// job);
job.setReducerClass(this.reducerClass);
job.setOutputKeyClass(this.outputKeyClass);
job.setOutputValueClass(this.outputValueClass);
job.setOutputFormatClass(this.outputFormatClass);
return job;
}
/**
* Tests Reducer throwing exception.
*
* @throws Exception
*/
public void testReducerFail() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
job.setJobName("chain");
ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
ChainReducer.setReducer(job, FailReduce.class, LongWritable.class,
Text.class, LongWritable.class, Text.class, null);
ChainReducer.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
job.waitForCompletion(true);
assertTrue("Job Not failed", !job.isSuccessful());
}
/**
* Configure properties needed to connect to a Fluo application
*
* @param conf Job configuration
* @param config use {@link org.apache.fluo.api.config.FluoConfiguration} to configure
* programmatically
*/
public static void configure(Job conf, SimpleConfiguration config) {
try {
FluoConfiguration fconfig = new FluoConfiguration(config);
try (Environment env = new Environment(fconfig)) {
long ts =
env.getSharedResources().getTimestampTracker().allocateTimestamp().getTxTimestamp();
conf.getConfiguration().setLong(TIMESTAMP_CONF_KEY, ts);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
config.save(baos);
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
AccumuloInputFormat.setZooKeeperInstance(conf, fconfig.getAccumuloInstance(),
fconfig.getAccumuloZookeepers());
AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf, env.getAuthorizations());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void setAccumuloOutput(final String instStr, final String zooStr, final String userStr, final String passStr, final Job job, final String tableName)
throws AccumuloSecurityException {
final AuthenticationToken token = new PasswordToken(passStr);
AccumuloOutputFormat.setConnectorInfo(job, userStr, token);
AccumuloOutputFormat.setDefaultTableName(job, tableName);
AccumuloOutputFormat.setCreateTables(job, true);
//TODO best way to do this?
if (zooStr.equals("mock")) {
AccumuloOutputFormat.setMockInstance(job, instStr);
} else {
AccumuloOutputFormat.setZooKeeperInstance(job, instStr, zooStr);
}
job.setOutputFormatClass(AccumuloOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);
}
/**
* Parse arguments and then runs a map/reduce job.
* Print output in standard out.
*
* @return a non-zero if there is an error. Otherwise, return 0.
*/
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
final int nMaps = Integer.parseInt(args[0]);
final long nSamples = Long.parseLong(args[1]);
System.out.println("Number of Maps = " + nMaps);
System.out.println("Samples per Map = " + nSamples);
Configuration conf = new Configuration();
final Job job = new Job(conf, "PiEstimatior");
System.out.println("Estimated value of Pi is "
+ estimate(nMaps, nSamples, job));
return 0;
}
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2) {
System.err.println("Usage: AvgTemperature <input path> <output path>");
System.exit(-1);
}
Job job = Job.getInstance();
job.setJarByClass(AvgTemperature.class);
job.setJobName("MapReduce实验-气象数据集-求气温平均值");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
HadoopUtil.healSickConfig(hbaseConf);
Job job = Job.getInstance(hbaseConf, hbaseTableName);
HTable table = new HTable(hbaseConf, hbaseTableName);
HFileOutputFormat3.configureIncrementalLoadMap(job, table);
logger.info("Saving HBase configuration to {}", hbaseConfPath);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
FSDataOutputStream out = null;
try {
out = fs.create(new Path(hbaseConfPath));
job.getConfiguration().writeXml(out);
} finally {
IOUtils.closeQuietly(out);
}
}
private void seekInRightStream(Object firstLeftKey) throws IOException{
rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
// check if hadoop distributed cache is used
if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
loader.setIndexFile(indexFile);
}
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
rightLoader.setUDFContextSignature(signature);
Job job = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
rightLoader.setLocation(rightInputFileName, job);
((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration());
((IndexableLoadFunc)rightLoader).seekNear(
firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey));
}
protected void propagateOptionsToJob(Job job) {
super.propagateOptionsToJob(job);
SqoopOptions opts = context.getOptions();
Configuration conf = job.getConfiguration();
if (opts.getNullStringValue() != null) {
conf.set("postgresql.null.string", opts.getNullStringValue());
}
setDelimiter("postgresql.input.field.delim",
opts.getInputFieldDelim(), conf);
setDelimiter("postgresql.input.record.delim",
opts.getInputRecordDelim(), conf);
setDelimiter("postgresql.input.enclosedby",
opts.getInputEnclosedBy(), conf);
setDelimiter("postgresql.input.escapedby",
opts.getInputEscapedBy(), conf);
conf.setBoolean("postgresql.input.encloserequired",
opts.isInputEncloseRequired());
}
protected void writeProperties(String jobId, Job job, FileSystem fs, Path classpath, Properties properties) {
File f = null;
try {
f = File.createTempFile(jobId, ".properties");
try (FileOutputStream fos = new FileOutputStream(f)) {
properties.store(fos, "");
}
addSingleFile(f, new Path(classpath, "embedded-configuration.properties"), jobId, job, fs);
} catch (IOException e) {
log.error(e.getMessage(), e);
} finally {
if (f != null) {
f.delete();
}
}
}
protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
LOG.info("Verify output dir: " + outputDir);
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
setJobScannerConf(job);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
tableDescriptor.getTableName().getNameAsString(), scan, VerifyMapper.class,
BytesWritable.class, BytesWritable.class, job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
TableMapReduceUtil.setScannerCaching(job, scannerCaching);
job.setReducerClass(VerifyReducer.class);
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
FileOutputFormat.setOutputPath(job, outputDir);
assertTrue(job.waitForCompletion(true));
long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
assertEquals(0, numOutputRecords);
}
private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
HadoopUtil.healSickConfig(hbaseConf);
Job job = Job.getInstance(hbaseConf, hbaseTableName);
HTable table = new HTable(hbaseConf, hbaseTableName);
HFileOutputFormat3.configureIncrementalLoadMap(job, table);
logger.info("Saving HBase configuration to {}", hbaseConfPath);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
FSDataOutputStream out = null;
try {
out = fs.create(new Path(hbaseConfPath));
job.getConfiguration().writeXml(out);
} finally {
IOUtils.closeQuietly(out);
}
}
/**
* Initializes the reduce-part of the job with
* the appropriate output settings
*
* @param job The job
* @param tableName The table to insert data into
* @param fieldNames The field names in the table.
*/
public static void setOutput(Job job, String tableName,
String... fieldNames) throws IOException {
if(fieldNames.length > 0 && fieldNames[0] != null) {
DBConfiguration dbConf = setOutput(job, tableName);
dbConf.setOutputFieldNames(fieldNames);
} else {
if (fieldNames.length > 0) {
setOutput(job, tableName, fieldNames.length);
}
else {
throw new IllegalArgumentException(
"Field names must be greater than 0");
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
// A is an m-by-n matrix; B is an n-by-p matrix.
conf.set("m", args[0]);
conf.set("n", args[1]);
conf.set("p", args[2]);
Job job = new Job(conf, "Matrix_Multiplication");
job.setJarByClass(MatMulDriver.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MatMulMap.class);
//Don't use combiner if there is no scope of combining the output. Otherwise the job will get stuck.
//job.setCombinerClass(MatMulModGenReduce.class);
job.setReducerClass(MatMulReduce.class);
//args[3] is the input path.
FileInputFormat.addInputPath(job, new Path(args[3]));
//args[4] is the output path.
FileOutputFormat.setOutputPath(job, new Path(args[4]));
System.exit(job.waitForCompletion(true)?0:1);
}
private void configueAvroMergeJob(Configuration conf, Job job, Path oldPath, Path newPath)
throws IOException {
LOG.info("Trying to merge avro files");
final Schema oldPathSchema = AvroUtil.getAvroSchema(oldPath, conf);
final Schema newPathSchema = AvroUtil.getAvroSchema(newPath, conf);
if (oldPathSchema == null || newPathSchema == null || !oldPathSchema.equals(newPathSchema)) {
throw new IOException("Invalid schema for input directories. Schema for old data: ["
+ oldPathSchema + "]. Schema for new data: [" + newPathSchema + "]");
}
LOG.debug("Avro Schema:" + oldPathSchema);
job.setInputFormatClass(AvroInputFormat.class);
job.setOutputFormatClass(AvroOutputFormat.class);
job.setMapperClass(MergeAvroMapper.class);
job.setReducerClass(MergeAvroReducer.class);
AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
}
private void appendTmpDir(Job job, FileSystem fs, Path tmpDir, StringBuilder jarList, StringBuilder fileList) {
try {
FileStatus[] fList = fs.listStatus(tmpDir);
for (FileStatus file : fList) {
Path p = file.getPath();
if (fs.getFileStatus(p).isDirectory()) {
appendTmpDir(job, fs, p, jarList, fileList);
continue;
}
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
list.append(fs.getFileStatus(p).getPath().toString());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> getPredicateFields(String location, Job job) throws IOException {
LOG.info("[{}]: getPredicateFields() -> {}", signature, location);
Schema schema = load(location, job).schema();
List<String> result = Lists.newArrayList();
for (Types.NestedField nf : schema.columns()) {
switch (nf.type().typeId()) {
case MAP:
case LIST:
case STRUCT:
continue;
default:
result.add(nf.name());
}
}
return result;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
if (args.length < 2) {
System.err.println(
"Usage: LinkCountCooccurrences configFile outputDir");
System.exit(2);
}
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "link count cooccurrences");
job.setJarByClass(LinkCountCooccurrences.class);
job.setInputFormatClass(KeyValueInputFormat.class);
job.setMapperClass(RefMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
conf.setClass(MarkLogicConstants.INPUT_KEY_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
Writable.class);
conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
HrefTitleMap.class, ElemAttrValueCooccurrences.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Test
public void testStatusLimit() throws IOException, InterruptedException,
ClassNotFoundException {
Path test = new Path(testRootTempDir, "testStatusLimit");
Configuration conf = new Configuration();
Path inDir = new Path(test, "in");
Path outDir = new Path(test, "out");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(inDir)) {
fs.delete(inDir, true);
}
fs.mkdirs(inDir);
DataOutputStream file = fs.create(new Path(inDir, "part-" + 0));
file.writeBytes("testStatusLimit");
file.close();
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
Job job = Job.getInstance(conf, "testStatusLimit");
job.setMapperClass(StatusLimitMapper.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
}
@Test
public void testStatusLimit() throws IOException, InterruptedException,
ClassNotFoundException {
Path test = new Path(testRootTempDir, "testStatusLimit");
Configuration conf = new Configuration();
Path inDir = new Path(test, "in");
Path outDir = new Path(test, "out");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(inDir)) {
fs.delete(inDir, true);
}
fs.mkdirs(inDir);
DataOutputStream file = fs.create(new Path(inDir, "part-" + 0));
file.writeBytes("testStatusLimit");
file.close();
if (fs.exists(outDir)) {
fs.delete(outDir, true);
}
Job job = Job.getInstance(conf, "testStatusLimit");
job.setMapperClass(StatusLimitMapper.class);
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.waitForCompletion(true);
assertTrue("Job failed", job.isSuccessful());
}
protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
int retVal = 0;
long start = System.nanoTime();
if (isAsync) {
job.submit();
} else {
job.waitForCompletion(true);
retVal = job.isSuccessful() ? 0 : 1;
logger.debug("Job '" + job.getJobName() + "' finished "
+ (job.isSuccessful() ? "successfully in " : "with failures. Time taken ")
+ formatTime((System.nanoTime() - start) / 1000000L));
}
return retVal;
}
public static void main(final String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(MORPHLINE_FILE, args[2]);
conf.set(MORPHLINE_ID, args[3]);
Job job = Job.getInstance(conf, "data cleaning");
job.setJarByClass(MapperCleaner.class);
job.setMapperClass(Cleaner.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
* Builds and runs the Hadoop job.
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*/
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
//
Job job = new Job(conf);
job.setJarByClass(WARCTagCounter.class);
job.setNumReduceTasks(1);
String inputPath = "data/*.warc.gz";
//inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz";
//inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz";
LOG.info("Input path: " + inputPath);
FileInputFormat.addInputPath(job, new Path(inputPath));
String outputPath = "/tmp/cc/";
FileSystem fs = FileSystem.newInstance(conf);
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
}
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setInputFormatClass(WARCFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(TagCounterMap.TagCounterMapper.class);
job.setReducerClass(LongSumReducer.class);
return job.waitForCompletion(true) ? 0 : -1;
}
@Test
public void testInitTableMapperJob2() throws Exception {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
assertEquals(WALInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
assertNull(job.getCombinerClass());
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
}
private TableInputFormat getDelegate(Configuration conf) throws IOException {
TableInputFormat delegate = new TableInputFormat();
String tableName = HBaseMetadataProvider.getTableName(dataset.getName());
conf.set(TableInputFormat.INPUT_TABLE, tableName);
if (view != null) {
Job tempJob = new Job();
Scan scan = ((BaseEntityScanner) view.newEntityScanner()).getScan();
TableMapReduceUtil.initTableMapperJob(tableName, scan, TableMapper.class, null,
null, tempJob);
Configuration tempConf = Hadoop.JobContext.getConfiguration.invoke(tempJob);
conf.set(SCAN, tempConf.get(SCAN));
}
delegate.setConf(conf);
return delegate;
}
/**
* Submit an event when completeness verification is successful
*/
private void submitVerificationSuccessSlaEvent(Results.Result result) {
try {
CompactionSlaEventHelper.getEventSubmitterBuilder(result.dataset(), Optional.<Job> absent(), this.fs)
.eventSubmitter(this.eventSubmitter).eventName(CompactionSlaEventHelper.COMPLETION_VERIFICATION_SUCCESS_EVENT_NAME)
.additionalMetadata(Maps.transformValues(result.verificationContext(), Functions.toStringFunction())).build()
.submit();
} catch (Throwable t) {
LOG.warn("Failed to submit verification success event:" + t, t);
}
}
@SuppressWarnings("unchecked")
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException,
InterruptedException {
FileSplit fileSplit = (FileSplit) inputSplit;
path = fileSplit.getPath();
String fileName = path.toUri().toString();
// select the correct load function and initialise
loadFuncHelper = new LoadFuncHelper(
taskAttemptContext.getConfiguration());
FuncSpec funcSpec = loadFuncHelper.determineFunction(fileName);
if (funcSpec == null) {
throw new IOException("Cannot determine LoadFunc for "
+ fileName);
}
selectedLoadFunc = (LoadFunc) PigContext
.instantiateFuncFromSpec(funcSpec);
selectedLoadFunc.setUDFContextSignature(udfSignature);
selectedLoadFunc.setLocation(fileName,
new Job(taskAttemptContext.getConfiguration(),
taskAttemptContext.getJobName()));
selectedReader = selectedLoadFunc.getInputFormat()
.createRecordReader(fileSplit, taskAttemptContext);
selectedReader.initialize(fileSplit, taskAttemptContext);
LOG.info("Using LoadFunc " + selectedLoadFunc.getClass().getName()
+ " on " + fileName);
}
public void testJobControlWithKillJob() throws Exception {
LOG.info("Starting testJobControlWithKillJob");
Configuration conf = createJobConf();
cleanupData(conf);
Job job1 = MapReduceTestUtil.createKillJob(conf, outdir_1, indir);
JobControl theControl = createDependencies(conf, job1);
while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
// verify adding dependingJo to RUNNING job fails.
assertFalse(cjob1.addDependingJob(cjob2));
// suspend jobcontrol and resume it again
theControl.suspend();
assertTrue(
theControl.getThreadState() == JobControl.ThreadState.SUSPENDED);
theControl.resume();
// kill the first job.
cjob1.killJob();
// wait till all the jobs complete
waitTillAllFinished(theControl);
assertTrue(cjob1.getJobState() == ControlledJob.State.FAILED);
assertTrue(cjob2.getJobState() == ControlledJob.State.SUCCESS);
assertTrue(cjob3.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
assertTrue(cjob4.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
theControl.stop();
}
private void doTest(int expectedNumDocs) throws Exception {
new DirectoryIngestMapper().getFixture().init(jobConf);
Job job = Job.getInstance(jobConf);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LWDocumentWritable.class);
List<String> results = runJobSuccessfully(job, expectedNumDocs);
assertNumDocsProcessed(job, expectedNumDocs);
for (String docStr : results) {
assertNotNull(docStr);
}
}