The following examples show how org.apache.hadoop.mapreduce.Job#setSpeculativeExecution() is used in open-source projects on GitHub.
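Before the project examples, here is a minimal, self-contained sketch of the typical pattern: build a Job, disable speculative execution, and submit. This sketch is not taken from any of the projects below; the class name SpeculativeExecutionExample and the command-line input/output paths are placeholders. Reduce-side speculation alone can be turned off with Job#setReduceSpeculativeExecution(false), as several of the examples below also do.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SpeculativeExecutionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "speculative execution example");
    job.setJarByClass(SpeculativeExecutionExample.class);
    // Identity mapper, map-only job: each input line is copied to the output.
    job.setMapperClass(Mapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    // Disable speculative execution so no duplicate attempts of slow tasks are
    // launched; useful when task side effects must not be duplicated.
    job.setSpeculativeExecution(false);
    // Placeholder input and output paths passed on the command line.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}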
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setNumReduceTasks(1);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
public Job createJob(boolean failMappers, boolean failReducers, Path inputFile)
throws IOException {
Configuration conf = getConf();
conf.setBoolean(FAIL_MAP, failMappers);
conf.setBoolean(FAIL_REDUCE, failReducers);
Job job = Job.getInstance(conf, "fail");
job.setJarByClass(FailJob.class);
job.setMapperClass(FailMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FailReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("Fail job");
FileInputFormat.addInputPath(job, inputFile);
return job;
}
@Override
protected void configure(final Job job) throws Exception {
job.setJobName("GeoWave Dedupe (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setMapperClass(GeoWaveDedupeMapper.class);
job.setCombinerClass(GeoWaveDedupeCombiner.class);
job.setReducerClass(getReducer());
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveInputKey.class);
job.setOutputValueClass(ObjectWritable.class);
job.setInputFormatClass(GeoWaveInputFormat.class);
job.setOutputFormatClass(getOutputFormatClass());
job.setNumReduceTasks(getNumReduceTasks());
job.setSpeculativeExecution(false);
try (final FileSystem fs = FileSystem.get(job.getConfiguration())) {
final Path outputPath = getHdfsOutputPath();
fs.delete(outputPath, true);
FileOutputFormat.setOutputPath(job, outputPath);
}
}
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(NGramIngest.class.getName(), args);
Job job = Job.getInstance(opts.getHadoopConfig());
job.setJobName(NGramIngest.class.getSimpleName());
job.setJarByClass(NGramIngest.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
.defaultTable(opts.tableName).store(job);
job.setMapperClass(NGramMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false);
try (AccumuloClient client = opts.createAccumuloClient()) {
if (!client.tableOperations().exists(opts.tableName)) {
log.info("Creating table " + opts.tableName);
client.tableOperations().create(opts.tableName);
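// Pre-split the new table at single-character boundaries (digits 1-9, lower- and upper-case letters) so the n-gram ingest spreads across tablets.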
SortedSet<Text> splits = new TreeSet<>();
String numbers[] = "1 2 3 4 5 6 7 8 9".split("\\s");
String lower[] = "a b c d e f g h i j k l m n o p q r s t u v w x y z".split("\\s");
String upper[] = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split("\\s");
for (String[] array : new String[][] {numbers, lower, upper}) {
for (String s : array) {
splits.add(new Text(s));
}
}
client.tableOperations().addSplits(opts.tableName, splits);
}
}
TextInputFormat.addInputPath(job, new Path(opts.inputDirectory));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(NNMapReduce.NNMapper.class);
job.setReducerClass(NNMapReduce.NNSimpleFeatureIDOutputReducer.class);
job.setMapOutputKeyClass(PartitionDataWritable.class);
job.setMapOutputValueClass(AdapterWithObjectWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setSpeculativeExecution(false);
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
}
@VisibleForTesting
public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
long iReduceSleepTime, int iReduceSleepCount)
throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
conf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
conf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
conf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
// Configure intermediate reduces
conf.setInt(
org.apache.tez.mapreduce.hadoop.MRJobConfig.MRR_INTERMEDIATE_STAGES,
iReduceStagesCount);
LOG.info("Running MRR with " + iReduceStagesCount + " IR stages");
for (int i = 1; i <= iReduceStagesCount; ++i) {
// Set reducer class for intermediate reduce
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
// Set reducer output key class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.key.class"), IntWritable.class, Object.class);
// Set reducer output value class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.map.output.value.class"), IntWritable.class, Object.class);
conf.setInt(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
"mapreduce.job.reduces"), numIReducer);
}
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(MRRSleepJob.class);
job.setNumReduceTasks(numReducer);
job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(SleepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(MRRSleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
private boolean multiplyColumns(Path outPathInit, Path outPathColumnMult) throws IOException, ClassNotFoundException, InterruptedException
{
boolean success;
Job columnMultJob = Job.getInstance(conf, "pir_columnMult");
columnMultJob.setSpeculativeExecution(false);
String columnMultJobName = "pir_columnMult";
// Set the same job configs as for the first iteration
columnMultJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
columnMultJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
columnMultJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
columnMultJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));
columnMultJob.getConfiguration().set("mapreduce.map.speculative", "false");
columnMultJob.getConfiguration().set("mapreduce.reduce.speculative", "false");
columnMultJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));
columnMultJob.setJobName(columnMultJobName);
columnMultJob.setJarByClass(ColumnMultMapper.class);
columnMultJob.setNumReduceTasks(numReduceTasks);
// Set the Mapper, InputFormat, and input path
columnMultJob.setMapperClass(ColumnMultMapper.class);
columnMultJob.setInputFormatClass(TextInputFormat.class);
FileStatus[] status = fs.listStatus(outPathInit);
for (FileStatus fstat : status)
{
if (fstat.getPath().getName().startsWith(FileConst.PIR))
{
logger.info("fstat.getPath() = " + fstat.getPath().toString());
FileInputFormat.addInputPath(columnMultJob, fstat.getPath());
}
}
columnMultJob.setMapOutputKeyClass(LongWritable.class);
columnMultJob.setMapOutputValueClass(Text.class);
// Set the reducer and output options
columnMultJob.setReducerClass(ColumnMultReducer.class);
columnMultJob.setOutputKeyClass(LongWritable.class);
columnMultJob.setOutputValueClass(Text.class);
columnMultJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
// Delete the output file, if it exists
if (fs.exists(outPathColumnMult))
{
fs.delete(outPathColumnMult, true);
}
FileOutputFormat.setOutputPath(columnMultJob, outPathColumnMult);
MultipleOutputs.addNamedOutput(columnMultJob, FileConst.PIR_COLS, TextOutputFormat.class, LongWritable.class, Text.class);
// Submit job, wait for completion
success = columnMultJob.waitForCompletion(true);
return success;
}
private boolean computeFinalResponse(Path outPathFinal) throws ClassNotFoundException, IOException, InterruptedException
{
boolean success;
Job finalResponseJob = Job.getInstance(conf, "pir_finalResponse");
finalResponseJob.setSpeculativeExecution(false);
String finalResponseJobName = "pir_finalResponse";
// Set the same job configs as for the first iteration
finalResponseJob.getConfiguration().set("mapreduce.map.memory.mb", SystemConfiguration.getProperty("mapreduce.map.memory.mb", "2000"));
finalResponseJob.getConfiguration().set("mapreduce.reduce.memory.mb", SystemConfiguration.getProperty("mapreduce.reduce.memory.mb", "2000"));
finalResponseJob.getConfiguration().set("mapreduce.map.java.opts", SystemConfiguration.getProperty("mapreduce.map.java.opts", "-Xmx1800m"));
finalResponseJob.getConfiguration().set("mapreduce.reduce.java.opts", SystemConfiguration.getProperty("mapreduce.reduce.java.opts", "-Xmx1800m"));
finalResponseJob.getConfiguration().set("pirMR.queryInputDir", SystemConfiguration.getProperty("pir.queryInput"));
finalResponseJob.getConfiguration().set("pirMR.outputFile", outputFile);
finalResponseJob.getConfiguration().set("mapreduce.map.speculative", "false");
finalResponseJob.getConfiguration().set("mapreduce.reduce.speculative", "false");
finalResponseJob.setJobName(finalResponseJobName);
finalResponseJob.setJarByClass(ColumnMultMapper.class);
finalResponseJob.setNumReduceTasks(1);
// Set the Mapper, InputFormat, and input path
finalResponseJob.setMapperClass(ColumnMultMapper.class);
finalResponseJob.setInputFormatClass(TextInputFormat.class);
FileStatus[] status = fs.listStatus(new Path(outputDirColumnMult));
for (FileStatus fstat : status)
{
if (fstat.getPath().getName().startsWith(FileConst.PIR_COLS))
{
logger.info("fstat.getPath() = " + fstat.getPath().toString());
FileInputFormat.addInputPath(finalResponseJob, fstat.getPath());
}
}
finalResponseJob.setMapOutputKeyClass(LongWritable.class);
finalResponseJob.setMapOutputValueClass(Text.class);
// Set the reducer and output options
finalResponseJob.setReducerClass(FinalResponseReducer.class);
finalResponseJob.setOutputKeyClass(LongWritable.class);
finalResponseJob.setOutputValueClass(Text.class);
finalResponseJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
// Delete the output file, if it exists
if (fs.exists(outPathFinal))
{
fs.delete(outPathFinal, true);
}
FileOutputFormat.setOutputPath(finalResponseJob, outPathFinal);
MultipleOutputs.addNamedOutput(finalResponseJob, FileConst.PIR_FINAL, TextOutputFormat.class, LongWritable.class, Text.class);
// Submit job, wait for completion
success = finalResponseJob.waitForCompletion(true);
return success;
}
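The two methods above disable speculation twice: once through the Job API (setSpeculativeExecution(false)) and once through the underlying configuration keys. For reference, a configuration-only sketch is shown below; the property names are the standard MRv2 keys also used above, while the class and method names (NoSpeculationConfig, createNonSpeculativeJob) are made up for illustration.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NoSpeculationConfig {
  // Returns a Job with speculative execution disabled via the configuration
  // keys alone; setting both keys to false is what
  // Job#setSpeculativeExecution(false) does under the covers.
  public static Job createNonSpeculativeJob(Configuration conf, String name) throws IOException {
    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    return Job.getInstance(conf, name);
  }
}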
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
verifyRandomWriterCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimate(int numMaps, long numPoints, Job job
) throws IOException {
//setup job conf
job.setJobName(PiEstimator.class.getSimpleName());
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(PiMapper.class);
job.setReducerClass(PiReducer.class);
job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
job.setSpeculativeExecution(false);
//setup input/output directories
//final Path inDir = new Path(TMP_DIR, "in");
final Path inDir = new Path("/home/hadoop1/tmp_dir", "in");
System.out.println("inDir =" + inDir.toString());
//final Path outDir = new Path(TMP_DIR, "out");
final Path outDir = new Path("/home/hadoop1/tmp_dir", "out");
System.out.println("outDir =" + outDir.toString());
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(job.getConfiguration());
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, job.getConfiguration(), file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
if (!job.waitForCompletion(true)) {
throw new IOException("Pi estimation job failed");
}
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, job.getConfiguration());
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} catch (InterruptedException e) {
// Restore the interrupt flag and surface the failure to the caller.
Thread.currentThread().interrupt();
throw new IOException("Job interrupted", e);
} finally {
fs.delete(TMP_DIR, true);
}
}
@Test (timeout = 60000)
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
LOG.info("\n\n\nStarting testRandomWriter().");
if (!(new File(MiniTezCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+ " not found. Not running test.");
return;
}
RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures
job.submit();
String trackingUrl = job.getTrackingURL();
String jobId = job.getJobID().toString();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Assert.assertTrue("Tracking URL was " + trackingUrl +
" but didn't Match Job ID " + jobId ,
trackingUrl.contains(jobId.substring(jobId.indexOf("_"))));
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
outputDir);
int count = 0;
while (iterator.hasNext()) {
FileStatus file = iterator.next();
if (!file.getPath().getName()
.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
count++;
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
}
@Override
protected int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String workdir = cmd.getOptionValue('w');
String target = cmd.getOptionValue('t');
getConf().setBoolean(SKIP_INVALID_PROPERTY, cmd.hasOption('i'));
getConf().setBoolean(VERIFY_DATATYPE_VALUES_PROPERTY, cmd.hasOption('d'));
getConf().setBoolean(TRUNCATE_PROPERTY, cmd.hasOption('r'));
getConf().setInt(SPLIT_BITS_PROPERTY, Integer.parseInt(cmd.getOptionValue('b', "3")));
if (cmd.hasOption('g')) getConf().set(DEFAULT_CONTEXT_PROPERTY, cmd.getOptionValue('g'));
getConf().setBoolean(OVERRIDE_CONTEXT_PROPERTY, cmd.hasOption('o'));
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('m')) getConf().setLong("mapreduce.input.fileinputformat.split.maxsize", Long.parseLong(cmd.getOptionValue('m')));
TableMapReduceUtil.addDependencyJars(getConf(),
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardBulkLoad -> " + workdir + " -> " + target);
job.setJarByClass(HalyardBulkLoad.class);
job.setMapperClass(RDFMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(RioFileInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
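// Configure the job to write HFiles laid out for the target table's regions; they are bulk-loaded into HBase after the job completes.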
try (HTable hTable = HalyardTableUtils.getTable(getConf(), target, true, getConf().getInt(SPLIT_BITS_PROPERTY, 3))) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.setInputPaths(job, source);
FileOutputFormat.setOutputPath(job, new Path(workdir));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (job.waitForCompletion(true)) {
if (getConf().getBoolean(TRUNCATE_PROPERTY, false)) {
HalyardTableUtils.truncateTable(hTable).close();
}
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(workdir), hTable);
LOG.info("Bulk Load Completed..");
return 0;
}
}
return -1;
}
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String queryFiles = cmd.getOptionValue('q');
String workdir = cmd.getOptionValue('w');
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('i')) getConf().set(ELASTIC_INDEX_URL, cmd.getOptionValue('i'));
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
getConf().setStrings(TABLE_NAME_PROPERTY, source);
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis()));
int stages = 1;
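// 'stages' starts at 1 and is recalculated during stage 0 from the number of update expressions in the SPARQL update files, so one MapReduce job runs per stage of the update sequence.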
for (int stage = 0; stage < stages; stage++) {
Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + workdir + " -> " + source + " stage #" + stage);
job.getConfiguration().setInt(STAGE_PROPERTY, stage);
job.setJarByClass(HalyardBulkUpdate.class);
job.setMapperClass(SPARQLUpdateMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(QueryInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
QueryInputFormat.setQueriesFromDirRecursive(job.getConfiguration(), queryFiles, true, stage);
Path outPath = new Path(workdir, "stage"+stage);
FileOutputFormat.setOutputPath(job, outPath);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (stage == 0) { //count real number of stages
for (InputSplit is : new QueryInputFormat().getSplits(job)) {
QueryInputFormat.QueryInputSplit qis = (QueryInputFormat.QueryInputSplit)is;
int updates = QueryParserUtil.parseUpdate(QueryLanguage.SPARQL, qis.getQuery(), null).getUpdateExprs().size();
if (updates > stages) {
stages = updates;
}
LOG.log(Level.INFO, "{0} contains {1} stages of the update sequence.", new Object[]{qis.getQueryName(), updates});
}
LOG.log(Level.INFO, "Bulk Update will process {0} MapReduce stages.", stages);
}
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(outPath, hTable);
LOG.log(Level.INFO, "Stage #{0} of {1} completed..", new Object[]{stage, stages});
} else {
return -1;
}
}
}
LOG.info("Bulk Update Completed..");
return 0;
}
/**
* Submit a map/reduce job for estimating Pi.
*
* @return the ID of the submitted job
*/
public static JobID submitPiEstimationMRApp(String jobName, int numMaps, long numPoints,
Path tmpDir, Configuration conf
) throws IOException, ClassNotFoundException, InterruptedException {
Job job = new Job(conf);
//setup job conf
job.setJobName(jobName);
job.setJarByClass(QuasiMonteCarlo.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(QmcMapper.class);
job.setReducerClass(QmcReducer.class);
job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
job.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(tmpDir, "in");
final Path outDir = new Path(tmpDir, "out");
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(conf);
if (fs.exists(tmpDir)) {
fs.delete(tmpDir, true);
// throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
// + " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
// try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
job.submit();
// final double duration = (System.currentTimeMillis() - startTime)/1000.0;
// System.out.println("Job Finished in " + duration + " seconds");
return job.getJobID();
// } finally {
// fs.delete(tmpDir, true);
// }
}
@Override
public void configure(final Job job) throws Exception {
super.configure(job);
job.setMapperClass(NNMapReduce.NNMapper.class);
job.setReducerClass(DBScanMapReduce.DBScanMapHullReducer.class);
job.setMapOutputKeyClass(PartitionDataWritable.class);
job.setMapOutputValueClass(AdapterWithObjectWritable.class);
job.setOutputKeyClass(GeoWaveInputKey.class);
job.setOutputValueClass(ObjectWritable.class);
job.setSpeculativeExecution(false);
final Configuration conf = job.getConfiguration();
conf.set("mapreduce.map.java.opts", "-Xmx" + memInMB + "m");
conf.set("mapreduce.reduce.java.opts", "-Xmx" + memInMB + "m");
conf.setLong("mapred.task.timeout", 2000000);
conf.setInt("mapreduce.task.io.sort.mb", 250);
job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
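// Select a codec for map-output compression: among the codecs available on this cluster, prefer the one that appears latest in CodecsRank and can be instantiated and provide a compressor here; otherwise fall back to DefaultCodec.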
Class<? extends CompressionCodec> bestCodecClass =
org.apache.hadoop.io.compress.DefaultCodec.class;
int rank = 0;
for (final Class<? extends CompressionCodec> codecClass : CompressionCodecFactory.getCodecClasses(
conf)) {
int r = 1;
for (final String codecs : CodecsRank) {
if (codecClass.getName().contains(codecs)) {
break;
}
r++;
}
if ((rank < r) && (r <= CodecsRank.length)) {
try {
final CompressionCodec codec = codecClass.newInstance();
if (Configurable.class.isAssignableFrom(codecClass)) {
((Configurable) codec).setConf(conf);
}
// throws an exception if not configurable in this context
CodecPool.getCompressor(codec);
bestCodecClass = codecClass;
rank = r;
} catch (final Throwable ex) {
// occurs when codec is not installed.
LOGGER.info("Not configuable in this context", ex);
}
}
}
LOGGER.warn("Compression with " + bestCodecClass.toString());
conf.setClass("mapreduce.map.output.compress.codec", bestCodecClass, CompressionCodec.class);
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setBooleanIfUnset("first.iteration", firstIteration);
}
@Override
public int runJob() throws Exception {
final boolean job1Success = (super.runJob() == 0);
Assert.assertTrue(job1Success);
// after the first job there should be a sequence file with the
// filtered results which should match the expected results
// resources
final Job job = Job.getInstance(super.getConf());
final Configuration conf = job.getConfiguration();
MapReduceTestUtils.filterConfiguration(conf);
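// Serialize the expected hashed centroids (count, then each hash) into the job configuration so the verification mapper can compare results against them.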
final ByteBuffer buf = ByteBuffer.allocate((8 * expectedResults.hashedCentroids.size()) + 4);
buf.putInt(expectedResults.hashedCentroids.size());
for (final Long hashedCentroid : expectedResults.hashedCentroids) {
buf.putLong(hashedCentroid);
}
conf.set(
MapReduceTestUtils.EXPECTED_RESULTS_KEY,
ByteArrayUtils.byteArrayToString(buf.array()));
GeoWaveInputFormat.setStoreOptions(conf, dataStoreOptions);
job.setJarByClass(this.getClass());
job.setJobName("GeoWave Test (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(VerifyExpectedResultsMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false);
FileInputFormat.setInputPaths(job, getHdfsOutputPath());
final boolean job2success = job.waitForCompletion(true);
final Counters jobCounters = job.getCounters();
final Counter expectedCnt = jobCounters.findCounter(ResultCounterType.EXPECTED);
Assert.assertNotNull(expectedCnt);
Assert.assertEquals(expectedResults.count, expectedCnt.getValue());
final Counter errorCnt = jobCounters.findCounter(ResultCounterType.ERROR);
if (errorCnt != null) {
Assert.assertEquals(0L, errorCnt.getValue());
}
final Counter unexpectedCnt = jobCounters.findCounter(ResultCounterType.UNEXPECTED);
if (unexpectedCnt != null) {
Assert.assertEquals(0L, unexpectedCnt.getValue());
}
return job2success ? 0 : 1;
}