The following examples show how org.apache.hadoop.mapreduce.Job#setReduceSpeculativeExecution() is used; they are taken from open-source projects hosted on GitHub.
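Before the collected examples, here is a minimal sketch of where the call usually sits (the class name SpeculativeExecutionExample and the job name are placeholders, not taken from any of the projects below): speculative execution is disabled right after the Job is created so that no duplicate task attempts are launched, which matters when reducers have side effects such as writing to a database or producing index/HFile output, as several of the examples below do.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculativeExecutionExample {
    public static Job createJob(Configuration conf) throws Exception {
        // Placeholder job name; mapper, reducer and output formats would be set as in the examples below.
        Job job = Job.getInstance(conf, "speculative-execution-demo");
        // Disable speculative attempts separately for map and reduce tasks ...
        job.setMapSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);
        // ... or for both task types at once.
        job.setSpeculativeExecution(false);
        return job;
    }
}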
@Override
protected void beforeRunJob(Job job) throws IOException {
super.beforeRunJob(job);
job.setNumReduceTasks(3); // one reducer per statistics dimension
job.setPartitionerClass(ActiveUserPartitioner.class); // set the partitioner class
// do not enable speculative execution
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
}
private static DBConfiguration setOutput(Job job,
String tableName) throws IOException {
job.setOutputFormatClass(DBOutputFormat.class);
job.setReduceSpeculativeExecution(false);
DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
dbConf.setOutputTableName(tableName);
return dbConf;
}
@Override
protected void configure(final Job job) throws Exception {
final ScopedJobConfiguration configWrapper =
new ScopedJobConfiguration(job.getConfiguration(), SimpleFeatureOutputReducer.class);
reducerCount = Math.max(configWrapper.getInt(ExtractParameters.Extract.REDUCER_COUNT, 8), 1);
outputBaseDir = configWrapper.getString(MapReduceParameters.MRConfig.HDFS_BASE_DIR, "/tmp");
LOGGER.info("Output base directory " + outputBaseDir);
super.configure(job);
@SuppressWarnings("rawtypes")
final Class<? extends DimensionExtractor> dimensionExtractorClass =
job.getConfiguration().getClass(
GeoWaveConfiguratorBase.enumToConfKey(
SimpleFeatureOutputReducer.class,
ExtractParameters.Extract.DIMENSION_EXTRACT_CLASS),
SimpleFeatureGeometryExtractor.class,
DimensionExtractor.class);
GeoWaveOutputFormat.addDataAdapter(
job.getConfiguration(),
createAdapter(
job.getConfiguration().get(
GeoWaveConfiguratorBase.enumToConfKey(
SimpleFeatureOutputReducer.class,
ExtractParameters.Extract.OUTPUT_DATA_TYPE_ID)),
job.getConfiguration().get(
GeoWaveConfiguratorBase.enumToConfKey(
SimpleFeatureOutputReducer.class,
ExtractParameters.Extract.DATA_NAMESPACE_URI)),
dimensionExtractorClass));
job.setJobName("GeoWave Extract (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setReduceSpeculativeExecution(false);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(ConvexHullMapReduce.ConvexHullMap.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setReducerClass(ConvexHullMapReduce.ConvexHullReducer.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(UpdateCentroidCostMapReduce.UpdateCentroidCostMap.class);
job.setMapOutputKeyClass(GroupIDText.class);
job.setMapOutputValueClass(CountofDoubleWritable.class);
job.setCombinerClass(UpdateCentroidCostMapReduce.UpdateCentroidCostCombiner.class);
job.setReducerClass(UpdateCentroidCostMapReduce.UpdateCentroidCostReducer.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(SimpleFeature.class);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(KMeansMapReduce.KMeansMapper.class);
job.setMapOutputKeyClass(GroupIDText.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setReducerClass(KMeansMapReduce.KMeansReduce.class);
job.setCombinerClass(KMeansMapReduce.KMeansCombiner.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(SimpleFeature.class);
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(KSamplerMapReduce.SampleMap.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setReducerClass(KSamplerMapReduce.SampleReducer.class);
job.setPartitionerClass(KSamplerMapReduce.SampleKeyPartitioner.class);
job.setReduceSpeculativeExecution(false);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
logger.info("input dir:" + dir);
Path inputPath = new Path(StringUtils.unEscapeString(dir));
Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
String indexPath = conf.get(LindenJobConfig.INDEX_PATH);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
if (fs.exists(new Path(indexPath))) {
fs.delete(new Path(indexPath), true);
}
int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
Shard[] shards = createShards(indexPath, numShards);
Shard.setIndexShards(conf, shards);
// empty the trash
(new Trash(conf)).expunge();
Job job = Job.getInstance(conf, "linden-hadoop-indexing");
job.setJarByClass(LindenJob.class);
job.setMapperClass(LindenMapper.class);
job.setCombinerClass(LindenCombiner.class);
job.setReducerClass(LindenReducer.class);
job.setMapOutputKeyClass(Shard.class);
job.setMapOutputValueClass(IntermediateForm.class);
job.setOutputKeyClass(Shard.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(IndexUpdateOutputFormat.class);
job.setReduceSpeculativeExecution(false);
job.setNumReduceTasks(numShards);
String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
if (lindenSchemaFile == null) {
throw new IOException("no schema file is found");
}
logger.info("Adding schema file: " + lindenSchemaFile);
job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
if (lindenPropertiesFile == null) {
throw new IOException("no linden properties file is found");
}
logger.info("Adding linden properties file: " + lindenPropertiesFile);
job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
Path[] inputs = FileInputFormat.getInputPaths(job);
StringBuilder buffer = new StringBuilder(inputs[0].toString());
for (int i = 1; i < inputs.length; i++) {
buffer.append(",");
buffer.append(inputs[i].toString());
}
logger.info("mapreduce.input.dir = " + buffer.toString());
logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));
job.waitForCompletion(true);
if (!job.isSuccessful()) {
throw new RuntimeException("Job failed");
}
return 0;
}
@Override
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('t');
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardDelete " + source);
if (cmd.hasOption('s')) {
job.getConfiguration().set(SUBJECT, cmd.getOptionValue('s'));
}
if (cmd.hasOption('p')) {
job.getConfiguration().set(PREDICATE, cmd.getOptionValue('p'));
}
if (cmd.hasOption('o')) {
job.getConfiguration().set(OBJECT, cmd.getOptionValue('o'));
}
if (cmd.hasOption('g')) {
job.getConfiguration().setStrings(CONTEXTS, cmd.getOptionValues('g'));
}
job.setJarByClass(HalyardBulkDelete.class);
TableMapReduceUtil.initCredentials(job);
Scan scan = HalyardTableUtils.scan(null, null);
TableMapReduceUtil.initTableMapperJob(source,
scan,
DeleteMapper.class,
ImmutableBytesWritable.class,
LongWritable.class,
job);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setSpeculativeExecution(false);
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileOutputFormat.setOutputPath(job, new Path(cmd.getOptionValue('f')));
TableMapReduceUtil.addDependencyJars(job);
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(cmd.getOptionValue('f')), hTable);
LOG.info("Bulk Delete Completed..");
return 0;
}
}
return -1;
}
@Override
protected int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String workdir = cmd.getOptionValue('w');
String target = cmd.getOptionValue('t');
getConf().setBoolean(SKIP_INVALID_PROPERTY, cmd.hasOption('i'));
getConf().setBoolean(VERIFY_DATATYPE_VALUES_PROPERTY, cmd.hasOption('d'));
getConf().setBoolean(TRUNCATE_PROPERTY, cmd.hasOption('r'));
getConf().setInt(SPLIT_BITS_PROPERTY, Integer.parseInt(cmd.getOptionValue('b', "3")));
if (cmd.hasOption('g')) getConf().set(DEFAULT_CONTEXT_PROPERTY, cmd.getOptionValue('g'));
getConf().setBoolean(OVERRIDE_CONTEXT_PROPERTY, cmd.hasOption('o'));
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('m')) getConf().setLong("mapreduce.input.fileinputformat.split.maxsize", Long.parseLong(cmd.getOptionValue('m')));
TableMapReduceUtil.addDependencyJars(getConf(),
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class);
HBaseConfiguration.addHbaseResources(getConf());
Job job = Job.getInstance(getConf(), "HalyardBulkLoad -> " + workdir + " -> " + target);
job.setJarByClass(HalyardBulkLoad.class);
job.setMapperClass(RDFMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(RioFileInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), target, true, getConf().getInt(SPLIT_BITS_PROPERTY, 3))) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.setInputPaths(job, source);
FileOutputFormat.setOutputPath(job, new Path(workdir));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (job.waitForCompletion(true)) {
if (getConf().getBoolean(TRUNCATE_PROPERTY, false)) {
HalyardTableUtils.truncateTable(hTable).close();
}
new LoadIncrementalHFiles(getConf()).doBulkLoad(new Path(workdir), hTable);
LOG.info("Bulk Load Completed..");
return 0;
}
}
return -1;
}
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String queryFiles = cmd.getOptionValue('q');
String workdir = cmd.getOptionValue('w');
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, Long.parseLong(cmd.getOptionValue('e', String.valueOf(System.currentTimeMillis()))));
if (cmd.hasOption('i')) getConf().set(ELASTIC_INDEX_URL, cmd.getOptionValue('i'));
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
NTriplesUtil.class,
Rio.class,
AbstractRDFHandler.class,
RDFFormat.class,
RDFParser.class,
HTable.class,
HBaseConfiguration.class,
AuthenticationProtos.class,
Trace.class,
Gauge.class);
HBaseConfiguration.addHbaseResources(getConf());
getConf().setStrings(TABLE_NAME_PROPERTY, source);
getConf().setLong(DEFAULT_TIMESTAMP_PROPERTY, getConf().getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis()));
int stages = 1;
for (int stage = 0; stage < stages; stage++) {
Job job = Job.getInstance(getConf(), "HalyardBulkUpdate -> " + workdir + " -> " + source + " stage #" + stage);
job.getConfiguration().setInt(STAGE_PROPERTY, stage);
job.setJarByClass(HalyardBulkUpdate.class);
job.setMapperClass(SPARQLUpdateMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setInputFormatClass(QueryInputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
try (HTable hTable = HalyardTableUtils.getTable(getConf(), source, false, 0)) {
HFileOutputFormat2.configureIncrementalLoad(job, hTable.getTableDescriptor(), hTable.getRegionLocator());
QueryInputFormat.setQueriesFromDirRecursive(job.getConfiguration(), queryFiles, true, stage);
Path outPath = new Path(workdir, "stage"+stage);
FileOutputFormat.setOutputPath(job, outPath);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
if (stage == 0) { //count real number of stages
for (InputSplit is : new QueryInputFormat().getSplits(job)) {
QueryInputFormat.QueryInputSplit qis = (QueryInputFormat.QueryInputSplit)is;
int updates = QueryParserUtil.parseUpdate(QueryLanguage.SPARQL, qis.getQuery(), null).getUpdateExprs().size();
if (updates > stages) {
stages = updates;
}
LOG.log(Level.INFO, "{0} contains {1} stages of the update sequence.", new Object[]{qis.getQueryName(), updates});
}
LOG.log(Level.INFO, "Bulk Update will process {0} MapReduce stages.", stages);
}
if (job.waitForCompletion(true)) {
new LoadIncrementalHFiles(getConf()).doBulkLoad(outPath, hTable);
LOG.log(Level.INFO, "Stage #{0} of {1} completed..", new Object[]{stage, stages});
} else {
return -1;
}
}
}
LOG.info("Bulk Update Completed..");
return 0;
}