The following are example usages of org.apache.hadoop.mapred.JobConf#setReduceSpeculativeExecution(), collected from real projects; you can follow the accompanying links to view the full source on GitHub.
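Before the project examples, here is a minimal, self-contained sketch of where the call fits in a classic mapred job setup. The class name, job name, and input/output arguments are placeholders invented for illustration; because no mapper or reducer is set, the job simply passes its input through the identity mapper and reducer.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SpeculationOffJob {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SpeculationOffJob.class);
    job.setJobName("speculation-off-example");

    // Disable speculative execution for both phases. This is the usual
    // pattern when tasks have side effects (writes to a database, an
    // external index, or a shared file) that must not run twice.
    job.setMapSpeculativeExecution(false);
    job.setReduceSpeculativeExecution(false);

    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobClient.runJob(job);
  }
}

The examples that follow show the same call in real projects, starting with two configure() implementations for a test job that discards its output via NullOutputFormat.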
public void configure(JobConf job) {
  // Set the mapper and reducers
  job.setMapperClass(ReadDataJob.TestMapper.class);

  // Make sure this jar is included
  job.setJarByClass(ReadDataJob.TestMapper.class);

  // Specify the input and output data formats
  job.setInputFormat(TextInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);

  // Turn off speculative execution
  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);

  // Add the job input path
  FileInputFormat.addInputPath(job, new Path(this.input_path));
}
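A second configure() implementation follows the same pattern, with the reducer and output-type settings left commented out.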
public void configure(JobConf job) {
  // Set the mapper and reducers
  job.setMapperClass(TestMapper.class);
  // job.setReducerClass(TestReducer.class);

  // Set the output types of the mapper and reducer
  // job.setMapOutputKeyClass(IntWritable.class);
  // job.setMapOutputValueClass(NullWritable.class);
  // job.setOutputKeyClass(NullWritable.class);
  // job.setOutputValueClass(NullWritable.class);

  // Make sure this jar is included
  job.setJarByClass(TestMapper.class);

  // Specify the input and output data formats
  job.setInputFormat(TextInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);

  // Turn off speculative execution
  job.setMapSpeculativeExecution(false);
  job.setReduceSpeculativeExecution(false);

  // Add the job input path
  FileInputFormat.addInputPath(job, new Path(this.input_filename));
}
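The next example is from DistCp: reduce-side speculative execution is turned off because HDFS does not handle multiple writers to the same file, and the reducers are additionally held back until every map has finished.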
private static JobConf createJobConf(Configuration conf, boolean useFastCopy) {
  Class<? extends InputFormat> inputFormat =
      (useFastCopy) ? FastCopyInputFormat.class : CopyInputFormat.class;
  JobConf jobconf = new JobConf(conf, DistCp.class);
  jobconf.setJobName(NAME);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  jobconf.setReduceSpeculativeExecution(false);

  jobconf.setMapOutputKeyClass(FilePairComparable.class);
  jobconf.setMapOutputValueClass(Text.class);
  jobconf.setOutputKeyClass(FilePairComparable.class);
  jobconf.setOutputValueClass(Text.class);

  jobconf.setInputFormat(inputFormat);
  jobconf.setMapperClass(CopyFilesTask.class);
  jobconf.setReducerClass(CopyFilesTask.class);
  jobconf.setNumReduceTasks(conf.getInt(MAX_REDUCE_LABEL, 1));

  // Prevent the reducer from starting until all maps are done.
  jobconf.setInt("mapred.job.rushreduce.reduce.threshold", 0);
  jobconf.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
  return jobconf;
}
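The following DBOutputFormat helpers disable speculative execution before configuring the output table, so that a speculative duplicate attempt cannot write the same rows to the database twice.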
/**
 * Initializes the reduce-part of the job with the appropriate output settings.
 *
 * @param job The job
 * @param dbOutputFormatClass The output format class to use; DBOutputFormat is used when null
 * @param tableName The table to insert data into
 * @param fieldNames The field names in the table. If unknown, supply the appropriate
 *                   number of nulls.
 * @param updateFields The field names to use in an update statement; ignored when null
 * @param batchSize The number of statements to batch together; -1 keeps the default
 */
public static void setOutput(JobConf job, Class<? extends DBOutputFormat> dbOutputFormatClass,
    String tableName, String[] fieldNames, String[] updateFields, int batchSize) {
  if (dbOutputFormatClass == null) {
    job.setOutputFormat(DBOutputFormat.class);
  } else {
    job.setOutputFormat(dbOutputFormatClass);
  }

  // writing doesn't always happen in reduce
  job.setReduceSpeculativeExecution(false);
  job.setMapSpeculativeExecution(false);

  DBConfiguration dbConf = new DBConfiguration(job);
  dbConf.setOutputTableName(tableName);
  dbConf.setOutputFieldNames(fieldNames);

  if (updateFields != null) {
    dbConf.setOutputUpdateFieldNames(updateFields);
  }
  if (batchSize != -1) {
    dbConf.setBatchStatementsNum(batchSize);
  }
}
private static DBConfiguration setOutput(JobConf job, String tableName) {
  job.setOutputFormat(DBOutputFormat.class);
  job.setReduceSpeculativeExecution(false);
  DBConfiguration dbConf = new DBConfiguration(job);
  dbConf.setOutputTableName(tableName);
  return dbConf;
}
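Apache Nutch's SolrIndexer applies the same setting before submitting its indexing job; the Solr commit is then issued once, after the job completes.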
public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
    List<Path> segments, boolean noCommit, boolean deleteGone, String solrParams,
    boolean filter, boolean normalize) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("SolrIndexer: starting at " + sdf.format(start));

  final JobConf job = new NutchJob(getConf());
  job.setJobName("index-solr " + solrUrl);

  LOG.info("SolrIndexer: deleting gone documents: " + deleteGone);
  LOG.info("SolrIndexer: URL filtering: " + filter);
  LOG.info("SolrIndexer: URL normalizing: " + normalize);

  IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);

  job.set(SolrConstants.SERVER_URL, solrUrl);
  job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
  job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
  job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
  if (solrParams != null) {
    job.set(SolrConstants.PARAMS, solrParams);
  }

  NutchIndexWriterFactory.addClassToConf(job, SolrWriter.class);
  job.setReduceSpeculativeExecution(false);

  final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-" +
      new Random().nextInt());
  FileOutputFormat.setOutputPath(job, tmp);

  try {
    JobClient.runJob(job);
    // do the commits once and for all the reducers in one go
    SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
    if (!noCommit) {
      solr.commit();
    }
    long end = System.currentTimeMillis();
    LOG.info("SolrIndexer: finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));
  } catch (Exception e) {
    LOG.error(e.toString());
  } finally {
    FileSystem.get(job).delete(tmp, true);
  }
}
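A later Nutch indexer variant, built on pluggable IndexWriters instead of a hard-wired Solr writer, keeps the same setReduceSpeculativeExecution(false) call.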
public void index(Path crawlDb, Path linkDb, List<Path> segments,
    boolean noCommit, boolean deleteGone, String params,
    boolean filter, boolean normalize) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("Indexer: starting at " + sdf.format(start));

  final JobConf job = new NutchJob(getConf());
  job.setJobName("Indexer");

  LOG.info("Indexer: deleting gone documents: " + deleteGone);
  LOG.info("Indexer: URL filtering: " + filter);
  LOG.info("Indexer: URL normalizing: " + normalize);

  IndexWriters writers = new IndexWriters(getConf());
  LOG.info(writers.describe());

  IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);

  // NOW PASSED ON THE COMMAND LINE AS A HADOOP PARAM
  // job.set(SolrConstants.SERVER_URL, solrUrl);

  job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
  job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
  job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
  if (params != null) {
    job.set(IndexerMapReduce.INDEXER_PARAMS, params);
  }

  job.setReduceSpeculativeExecution(false);

  final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
      + new Random().nextInt());
  FileOutputFormat.setOutputPath(job, tmp);

  try {
    JobClient.runJob(job);
    // do the commits once and for all the reducers in one go
    if (!noCommit) {
      writers.open(job, "commit");
      writers.commit();
    }
    long end = System.currentTimeMillis();
    LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));
  } finally {
    FileSystem.get(job).delete(tmp, true);
  }
}
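Finally, the varargs form of the DBOutputFormat helper, shown with its full Javadoc.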
/**
 * Initializes the reduce-part of the job with the appropriate output settings.
 *
 * @param job The job
 * @param tableName The table to insert data into
 * @param fieldNames The field names in the table. If unknown, supply the appropriate
 *                   number of nulls.
 */
public static void setOutput(JobConf job, String tableName, String... fieldNames) {
  job.setOutputFormat(DBOutputFormat.class);
  job.setReduceSpeculativeExecution(false);

  DBConfiguration dbConf = new DBConfiguration(job);
  dbConf.setOutputTableName(tableName);
  dbConf.setOutputFieldNames(fieldNames);
}