Listed below are example usages of org.apache.hadoop.mapreduce.Job#addCacheArchive(). You can follow the links to view the full source code on GitHub, or leave a comment on the right.
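Before the project-specific examples, here is a minimal, self-contained sketch of the API itself. It is only a sketch: the job name and the HDFS path are placeholders and are not taken from any of the examples below.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class AddCacheArchiveBasics {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "add-cache-archive-demo");
    // Ship an archive (jar/zip/tar.gz) to every task of this job; it is
    // localized and unpacked on the worker nodes before the tasks start.
    // The path below is a placeholder, not taken from the examples on this page.
    job.addCacheArchive(new URI("hdfs:///apps/demo/libs.tar.gz"));
    // ... set mapper/reducer, input and output paths, then submit as usual.
  }
}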
protected void packPluginsToDistributedCache(Job job) {
  String pluginsRootDir = PluginManager.get().getPluginsRootDir();
  if (new File(pluginsRootDir).exists()) {
    File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
    try {
      TarGzCompressionUtils.createTarGzOfDirectory(pluginsRootDir, pluginsTarGzFile.getPath());
    } catch (IOException e) {
      LOGGER.error("Failed to tar plugins directory", e);
      throw new RuntimeException(e);
    }
    job.addCacheArchive(pluginsTarGzFile.toURI());
    String pluginsIncludes = System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME);
    if (pluginsIncludes != null) {
      job.getConfiguration().set(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsIncludes);
    }
  } else {
    LOGGER.warn("Cannot find local Pinot plugins directory at [{}]", pluginsRootDir);
  }
}
protected void addDepsJarToDistributedCache(Job job, String depsJarDir)
    throws IOException {
  if (depsJarDir != null) {
    URI depsJarDirURI = URI.create(depsJarDir);
    if (depsJarDirURI.getScheme() == null) {
      depsJarDirURI = new File(depsJarDir).toURI();
    }
    PinotFS pinotFS = PinotFSFactory.create(depsJarDirURI.getScheme());
    String[] files = pinotFS.listFiles(depsJarDirURI, true);
    for (String file : files) {
      URI fileURI = URI.create(file);
      if (!pinotFS.isDirectory(fileURI)) {
        if (file.endsWith(".jar")) {
          LOGGER.info("Adding deps jar: {} to distributed cache", file);
          job.addCacheArchive(fileURI);
        }
      }
    }
  }
}
private void testWithConf(Configuration conf) throws IOException,
    InterruptedException, ClassNotFoundException, URISyntaxException {
  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars, each with a single file inside.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
  Job job = Job.getInstance(conf);
  job.setMapperClass(DistributedCacheCheckerMapper.class);
  job.setReducerClass(DistributedCacheCheckerReducer.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(job, first);
  // Populate the distributed cache entries in the job configuration.
  job.addCacheFile(
      new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures
  job.submit();
  assertTrue(job.waitForCompletion(false));
}
public static void addDepsJarToDistributedCacheHelper(FileSystem fileSystem, Job job, Path depsJarDir)
    throws IOException {
  FileStatus[] fileStatuses = fileSystem.listStatus(depsJarDir);
  for (FileStatus fileStatus : fileStatuses) {
    if (fileStatus.isDirectory()) {
      addDepsJarToDistributedCacheHelper(fileSystem, job, fileStatus.getPath());
    } else {
      Path depJarPath = fileStatus.getPath();
      if (depJarPath.getName().endsWith(".jar")) {
        _logger.info("Adding deps jar: {} to distributed cache", depJarPath);
        job.addCacheArchive(depJarPath.toUri());
      }
    }
  }
}
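The helper above is a static method whose enclosing class is not shown on this page. As a rough sketch of how it might be driven from that same class (the directory path, job name, and method name below are illustrative assumptions, not taken from the original code):

// Assumed to live in the same class as addDepsJarToDistributedCacheHelper.
protected Job buildJobWithDepsJars(Configuration conf) throws IOException {
  Job job = Job.getInstance(conf, "deps-jar-cache-demo");
  FileSystem fs = FileSystem.get(conf);
  // "/apps/deps-jars" is a placeholder; any directory reachable by the cluster
  // (for example on HDFS) would work. Subdirectories are scanned recursively.
  addDepsJarToDistributedCacheHelper(fs, job, new Path("/apps/deps-jars"));
  return job;
}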
public void _testDistributedCache(String jobJarPath) throws Exception {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }
  // Create a temporary file of length 1.
  Path first = createTempFile("distributed.first", "x");
  // Create three jars, each with a single file inside.
  Path second =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
  Path third =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
  Path fourth =
      makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
  Job job = Job.getInstance(mrCluster.getConfig());
  // Set the job jar to a new "dummy" jar so we can check that it's extracted
  // properly.
  job.setJar(jobJarPath);
  // Because the job jar is a "dummy" jar, we need to include the jar with
  // DistributedCacheChecker or the job won't be able to find it.
  Path distributedCacheCheckerJar = new Path(
      JarFinder.getJar(DistributedCacheChecker.class));
  job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
      localFs.getUri(), distributedCacheCheckerJar.getParent()));
  job.setMapperClass(DistributedCacheChecker.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.setInputPaths(job, first);
  // Populate the distributed cache entries in the job configuration.
  job.addCacheFile(
      new URI(first.toUri().toString() + "#distributed.first.symlink"));
  job.addFileToClassPath(second);
  // The AppMaster jar itself.
  job.addFileToClassPath(
      APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
  job.addArchiveToClassPath(third);
  job.addCacheArchive(fourth.toUri());
  job.setMaxMapAttempts(1); // speed up failures
  job.submit();
  String trackingUrl = job.getTrackingURL();
  String jobId = job.getJobID().toString();
  Assert.assertTrue(job.waitForCompletion(false));
  Assert.assertTrue("Tracking URL was " + trackingUrl +
      " but didn't match Job ID " + jobId,
      trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
}
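The DistributedCacheCheckerMapper and DistributedCacheChecker classes used by the tests above are not shown on this page. As a hypothetical, simplified stand-in, a checker mapper could verify the localized entries through the JobContext API roughly like this (the class name and checks below are illustrative assumptions, not the real test classes):

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheCheckingMapper
    extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
  @Override
  protected void setup(Context context) throws IOException {
    // Entries registered with addCacheFile/addCacheArchive are visible here.
    URI[] files = context.getCacheFiles();
    URI[] archives = context.getCacheArchives();
    if (files == null || archives == null) {
      throw new IOException("Expected distributed cache entries were not set");
    }
    // The "#distributed.first.symlink" fragment used above becomes a symlink
    // in the task working directory.
    if (!new File("distributed.first.symlink").exists()) {
      throw new IOException("Cache file symlink was not localized");
    }
  }
}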
protected int runPass1RNAJob(Configuration pass1Conf, String tmpOutDir)
    throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
  HalvadeConf.setIsPass2(pass1Conf, false);
  HalvadeResourceManager.setJobResources(halvadeOpts, pass1Conf, HalvadeResourceManager.RNA_SHMEM_PASS1,
      halvadeOpts.nodes == 1, halvadeOpts.useBamInput);
  int pass2Reduces = HalvadeResourceManager.getPass2Reduces(halvadeOpts);
  halvadeOpts.splitChromosomes(pass1Conf, pass2Reduces);
  HalvadeConf.setPass2Suffix(pass1Conf, pass2suffix);
  Job pass1Job = Job.getInstance(pass1Conf, "Halvade pass 1 RNA pipeline");
  pass1Job.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
  pass1Job.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
  // set pass 2 suffix so only this job finds it!
  FileSystem fs = FileSystem.get(new URI(halvadeOpts.in), pass1Conf);
  try {
    if (fs.getFileStatus(new Path(halvadeOpts.in)).isDirectory()) {
      // add every file in directory
      FileStatus[] files = fs.listStatus(new Path(halvadeOpts.in));
      for (FileStatus file : files) {
        if (!file.isDirectory()) {
          FileInputFormat.addInputPath(pass1Job, file.getPath());
        }
      }
    } else {
      FileInputFormat.addInputPath(pass1Job, new Path(halvadeOpts.in));
    }
  } catch (IOException | IllegalArgumentException e) {
    Logger.EXCEPTION(e);
  }
  FileSystem outFs = FileSystem.get(new URI(tmpOutDir), pass1Conf);
  boolean skipPass1 = false;
  if (outFs.exists(new Path(tmpOutDir))) {
    // check if genome already exists
    skipPass1 = outFs.exists(new Path(tmpOutDir + "/_SUCCESS"));
    if (skipPass1) {
      Logger.DEBUG("pass1 genome already created, skipping pass 1");
    } else {
      Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
      Logger.INFO("ERROR: Please remove this directory before trying again.");
      System.exit(-2);
    }
  }
  if (!skipPass1) {
    FileOutputFormat.setOutputPath(pass1Job, new Path(tmpOutDir));
    pass1Job.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
    pass1Job.setInputFormatClass(HalvadeTextInputFormat.class);
    pass1Job.setMapOutputKeyClass(GenomeSJ.class);
    pass1Job.setMapOutputValueClass(Text.class);
    pass1Job.setSortComparatorClass(GenomeSJSortComparator.class);
    pass1Job.setGroupingComparatorClass(GenomeSJGroupingComparator.class);
    pass1Job.setNumReduceTasks(1);
    pass1Job.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RebuildStarGenomeReducer.class);
    pass1Job.setOutputKeyClass(LongWritable.class);
    pass1Job.setOutputValueClass(Text.class);
    return runTimedJob(pass1Job, "Halvade pass 1 Job");
  } else {
    return 0;
  }
}
protected int runHalvadeJob(Configuration halvadeConf, String tmpOutDir, int jobType)
    throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
  String pipeline = "";
  if (jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
    HalvadeConf.setIsPass2(halvadeConf, true);
    HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
    pipeline = RNA_PASS2;
  } else if (jobType == HalvadeResourceManager.DNA) {
    HalvadeResourceManager.setJobResources(halvadeOpts, halvadeConf, jobType, false, halvadeOpts.useBamInput);
    pipeline = DNA;
  }
  halvadeOpts.splitChromosomes(halvadeConf, 0);
  HalvadeConf.setOutDir(halvadeConf, tmpOutDir);
  FileSystem outFs = FileSystem.get(new URI(tmpOutDir), halvadeConf);
  if (outFs.exists(new Path(tmpOutDir))) {
    Logger.INFO("The output directory \'" + tmpOutDir + "\' already exists.");
    Logger.INFO("ERROR: Please remove this directory before trying again.");
    System.exit(-2);
  }
  if (halvadeOpts.useBamInput) {
    setHeaderFile(halvadeOpts.in, halvadeConf);
  }
  if (halvadeOpts.rnaPipeline) {
    HalvadeConf.setPass2Suffix(halvadeConf, pass2suffix);
  }
  Job halvadeJob = Job.getInstance(halvadeConf, "Halvade" + pipeline);
  halvadeJob.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
  halvadeJob.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
  addInputFiles(halvadeOpts.in, halvadeConf, halvadeJob);
  FileOutputFormat.setOutputPath(halvadeJob, new Path(tmpOutDir));
  if (jobType == HalvadeResourceManager.RNA_SHMEM_PASS2) {
    halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.StarAlignPassXMapper.class);
    halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.RnaGATKReducer.class);
  } else if (jobType == HalvadeResourceManager.DNA) {
    halvadeJob.setMapperClass(halvadeOpts.alignmentTools[halvadeOpts.aln]);
    halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.DnaGATKReducer.class);
  }
  halvadeJob.setMapOutputKeyClass(ChromosomeRegion.class);
  halvadeJob.setMapOutputValueClass(SAMRecordWritable.class);
  halvadeJob.setInputFormatClass(HalvadeTextInputFormat.class);
  halvadeJob.setOutputKeyClass(Text.class);
  if (halvadeOpts.mergeBam) {
    halvadeJob.setSortComparatorClass(SimpleChrRegionComparator.class);
    halvadeJob.setOutputValueClass(SAMRecordWritable.class);
  } else {
    halvadeJob.setPartitionerClass(ChrRgPartitioner.class);
    halvadeJob.setSortComparatorClass(ChrRgSortComparator.class);
    halvadeJob.setGroupingComparatorClass(ChrRgGroupingComparator.class);
    halvadeJob.setOutputValueClass(VariantContextWritable.class);
  }
  if (halvadeOpts.justAlign && !halvadeOpts.mergeBam) {
    halvadeJob.setNumReduceTasks(0);
  } else if (halvadeOpts.mergeBam) {
    halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.BamMergeReducer.class);
    halvadeJob.setNumReduceTasks(1);
  } else {
    halvadeJob.setNumReduceTasks(halvadeOpts.reduces);
    if (halvadeOpts.countOnly) {
      halvadeJob.setReducerClass(be.ugent.intec.halvade.hadoop.mapreduce.CountReadsReducer.class);
      halvadeJob.setOutputValueClass(LongWritable.class);
    }
  }
  if (halvadeOpts.useBamInput) {
    halvadeJob.setMapperClass(be.ugent.intec.halvade.hadoop.mapreduce.AlignedBamMapper.class);
    halvadeJob.setInputFormatClass(BAMInputFormat.class);
  }
  return runTimedJob(halvadeJob, "Halvade Job");
}
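In both Halvade methods above, the native binaries are shipped by passing a URI taken from halvadeOpts.halvadeBinaries to addCacheArchive. If such a URI carries a "#name" fragment, the unpacked archive is exposed under that link name in each task's working directory. A minimal sketch of that convention follows; the paths, job name, and "#bin" alias are placeholders and are not taken from Halvade:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class BinariesArchiveExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "binaries-archive-demo");
    // Placeholder location; in Halvade the value comes from halvadeOpts.halvadeBinaries.
    // The "#bin" fragment controls the link name under which the unpacked archive
    // appears in the task working directory, so tasks can invoke shipped tools
    // via relative paths such as "./bin/...".
    job.addCacheArchive(new URI("hdfs:///user/demo/halvade-binaries.tar.gz#bin"));
  }
}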