The following are example usages of org.apache.hadoop.mapreduce.Reducer that declare or handle java.lang.InterruptedException, collected from open-source projects; follow the links to GitHub to view the full source of each snippet.
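Before the project snippets, here is a minimal, self-contained sketch (not taken from any of the projects below) of where InterruptedException appears in the Reducer API: reduce(), setup(), and cleanup() all declare it because the framework may interrupt a task while it blocks on the shuffle or on output. The class and field names (SumReducer, result) are illustrative only.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Minimal word-count style reducer: sums the LongWritable values for each key.
public class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable result = new LongWritable();
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        // Context.write() can block, which is why InterruptedException is declared.
        context.write(key, result);
    }
}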
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
long sum = 0;
for(final LongWritable count : counts) {
sum += count.get();
}
final String indexType = prospect.getTripleValueType().getIndexType();
// not sure if this is the best idea..
if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));
final String dataType = prospect.getDataType();
final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
m.put(COUNT, dataType, visibility, timestamp.getTime(), sumValue);
context.write(null, m);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
super.setup(context);
maxViolationsInReport = context.getConfiguration().getInt(DataValidationConstants.DV_NUM_REPORT_VIOLATION, 1000);
String dir = context.getConfiguration().get(SLAVE_FILE_LOC);
dirPath = JobUtil.getAndReplaceHolders(dir);
fileHandlerMap = new DVLRUCache(DataValidationConstants.TEN);
offsetLinesMap = new TreeMap<>();
ViolationPersistenceBean bean = new ViolationPersistenceBean();
bean.setLineNum(Integer.MAX_VALUE);
nullMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
dataTypeMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
regexMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
numFieldsMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
fileNames = new HashSet<String>();
}
private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
// number of reduce tasks
int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
// at least 1 reducer by default
numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
// no more than 500 reducers by default
numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
logger.info("Having total map input MB " + Math.round(totalSizeInM));
logger.info("Having per reduce MB " + perReduceInputMB);
logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
return numReduceTasks;
}
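As a rough worked example of the sizing formula above (the numbers are hypothetical; the real defaults come from KylinConfig):
// Hypothetical inputs, for illustration only.
double totalSizeInM = 2048;       // total map input in MB
double perReduceInputMB = 500;    // target input per reducer
double reduceCountRatio = 1.0;    // scaling ratio
int raw = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio); // round(4.096) = 4
int numReduceTasks = Math.min(500, Math.max(1, raw)); // clamped to the configured [min, max] range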
@Override
public void reduce(
final GroupIDText key,
final Iterable<CountofDoubleWritable> values,
final Reducer<GroupIDText, CountofDoubleWritable, GroupIDText, CountofDoubleWritable>.Context context)
throws IOException, InterruptedException {
double expectation = 0;
double ptCount = 0;
for (final CountofDoubleWritable value : values) {
expectation += value.getValue();
ptCount += value.getCount();
}
outputValue.set(expectation, ptCount);
context.write(key, outputValue);
}
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
Reducer<Text, FlowBean, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
//super.reduce(arg0, arg1, arg2);
long upFlow = 0;
long downFlow = 0;
//long flowSum = 0;
for (FlowBean flowBean : values) {
upFlow += flowBean.getUpFlow();
downFlow += flowBean.getDownFlow();
//flowSum += flowBean.getSumFlow();
}
result.setPhoneNum(key.toString());
result.setUpFlow(upFlow);
result.setDownFlow(downFlow);
//result.setSumFlow(flowSum);
result.setSumFlow(upFlow + downFlow);
context.write(key, result);
}
public static void convertCentersSequenceFileToText (Configuration conf, FileSystem fs, String seqFilePath, String outputPath) throws Exception {
Path seqFile = new Path (seqFilePath);
Path output = new Path (outputPath);
if (fs.exists(output)) {
fs.delete(output, true);
}
Job job = Job.getInstance(conf);
job.setMapperClass(CenterSequenceToTextConverter.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, seqFile);
FileOutputFormat.setOutputPath(job, output);
job.waitForCompletion(true);
}
/**
* Creates a simple fail job.
*
* @param conf Configuration object
* @param outdir Output directory.
* @param indirs Input directories.
* @return Job initialized for a simple fail job.
* @throws Exception If an error occurs creating job configuration.
*/
public static Job createFailJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
FileSystem fs = outdir.getFileSystem(conf);
if (fs.exists(outdir)) {
fs.delete(outdir, true);
}
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
Job theJob = Job.getInstance(conf);
theJob.setJobName("Fail-Job");
FileInputFormat.setInputPaths(theJob, indirs);
theJob.setMapperClass(FailMapper.class);
theJob.setReducerClass(Reducer.class);
theJob.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
return theJob;
}
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
throws IOException, InterruptedException {
final Iterator<Object> objects = values.iterator();
while (objects.hasNext()) {
final AdapterToIndexMapping mapping = store.getIndicesForAdapter(key.getInternalAdapterId());
context.write(
new GeoWaveOutputKey<>(
internalAdapterStore.getTypeName(mapping.getAdapterId()),
mapping.getIndexNames()),
objects.next());
}
}
/**
* Creates a simple kill job.
*
* @param conf Configuration object
* @param outdir Output directory.
* @param indirs Input directories.
* @return Job initialized for a simple kill job.
* @throws Exception If an error occurs creating job configuration.
*/
public static Job createKillJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
Job theJob = Job.getInstance(conf);
theJob.setJobName("Kill-Job");
FileInputFormat.setInputPaths(theJob, indirs);
theJob.setMapperClass(KillMapper.class);
theJob.setReducerClass(Reducer.class);
theJob.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(theJob, outdir);
theJob.setOutputKeyClass(Text.class);
theJob.setOutputValueClass(Text.class);
return theJob;
}
public void setupRunner(String jobName, Class<?> runnerClass,
Class<? extends TableMapper<?, ?>> mapperClass, Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
Class<? extends WritableComparable<?>> outputKeyClass,
Class<? extends Writable> outputValueClass,
Class<? extends OutputFormat<?, ?>> outputFormatClass) {
this.setupRunner(jobName, runnerClass, mapperClass, reducerClass, outputKeyClass, outputValueClass, outputKeyClass, outputValueClass, outputFormatClass);
}
@Override
protected void reduceNativeValues(
final GeoWaveInputKey key,
final Iterable<Object> values,
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, GridCoverage>.Context context)
throws IOException, InterruptedException {
final GridCoverage mergedCoverage = helper.getMergedCoverage(key, values);
if (mergedCoverage != null) {
context.write(helper.getGeoWaveOutputKey(), mergedCoverage);
}
}
protected void reduce(
CellWritableComparable row,
Iterable<Cell> kvs,
Reducer<CellWritableComparable,
Cell, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
int index = 0;
for (Cell kv : kvs) {
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
new MapReduceExtendedCell(kv));
if (++index % 100 == 0)
context.setStatus("Wrote " + index + " KeyValues, "
+ "and the last rowkey written is " + Bytes.toString(kv.getRowArray()));
}
}
/**
* Sets all of the job parameters.
*
* @param jobName
* @param runnerClass
* @param mapperClass
* @param reducerClass
* @param mapOutputKeyClass
* @param mapOutputValueClass
* @param outputKeyClass
* @param outputValueClass
* @param outputFormatClass
*/
public void setupRunner(String jobName, Class<?> runnerClass,
Class<? extends TableMapper<?, ?>> mapperClass, Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
Class<? extends WritableComparable<?>> mapOutputKeyClass,
Class<? extends Writable> mapOutputValueClass,
Class<? extends WritableComparable<?>> outputKeyClass,
Class<? extends Writable> outputValueClass,
Class<? extends OutputFormat<?, ?>> outputFormatClass) {
this.jobName = jobName;
this.runnerClass = runnerClass;
this.mapperClass = mapperClass;
this.reducerClass = reducerClass;
this.mapOutputKeyClass = mapOutputKeyClass;
this.mapOutputValueClass = mapOutputValueClass;
this.outputKeyClass = outputKeyClass;
this.outputValueClass = outputValueClass;
this.outputFormatClass = outputFormatClass;
this.isCallSetUpRunnerMethod = true;
}
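A hypothetical call to this nine-argument overload might look like the fragment below; MyRunner, MyTableMapper, and MyReducer are placeholder classes, not classes from the project above.
// Hypothetical usage; MyRunner, MyTableMapper and MyReducer are placeholders.
setupRunner("rowcount-job",             // jobName
    MyRunner.class,                     // runnerClass (the driver class)
    MyTableMapper.class,                // extends TableMapper<Text, LongWritable>
    MyReducer.class,                    // extends Reducer<Text, LongWritable, Text, LongWritable>
    Text.class, LongWritable.class,     // map output key/value classes
    Text.class, LongWritable.class,     // final output key/value classes
    TextOutputFormat.class);            // outputFormatClass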
@Override
protected void setup(
final Reducer<PartitionDataWritable, AdapterWithObjectWritable, KEYOUT, VALUEOUT>.Context context)
throws IOException, InterruptedException {
super.setup(context);
final ScopedJobConfiguration config =
new ScopedJobConfiguration(context.getConfiguration(), NNMapReduce.class);
// first run must at least form a triangle
minOwners = config.getInt(ClusteringParameters.Clustering.MINIMUM_SIZE, 2);
LOGGER.info("Minumum owners = {}", minOwners);
}
@Override
protected void processSummary(
final PartitionData partitionData,
final Boolean summary,
final org.apache.hadoop.mapreduce.Reducer.Context context) {
// do nothing
}
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
double sum = 0;
for (DoubleWritable value : values) {
sum = sum + value.get();
}
context.write(key, new DoubleWritable(sum));
}
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException,InterruptedException {
EthereumBlockReducer reducer = new EthereumBlockReducer();
final Text defaultKey = new Text("Transaction Count:");
final IntWritable oneInt = new IntWritable(1);
final IntWritable twoInt = new IntWritable(2);
final LongWritable resultLong = new LongWritable(3);
final ArrayList<IntWritable> al = new ArrayList<>();
al.add(oneInt);
al.add(twoInt);
new Expectations() {{
defaultContext.write(defaultKey,resultLong); times=1;
}};
reducer.reduce(defaultKey,al,defaultContext);
}
@Override
protected void setup(
final Reducer<GeoWaveInputKey, ObjectWritable, GeoWaveOutputKey, Object>.Context context)
throws IOException, InterruptedException {
super.setup(context);
store = GeoWaveOutputFormat.getJobContextAdapterIndexMappingStore(context);
internalAdapterStore = GeoWaveOutputFormat.getJobContextInternalAdapterStore(context);
}
List<MRPair<KEYOUT,VALUEOUT>> run() throws IOException, InterruptedException {
TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.REDUCE, 0, 0);
final MockReduceContext context = new MockReduceContext(this.conf, id, BulkIngestKey.class, Value.class);
context.reduceInput = this.input.iterator();
Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context con = new WrappedReducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>().getReducerContext(context);
this.reducer.run(con);
return context.output;
}
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//super.reduce(arg0, arg1, arg2);
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
result.set(count);
context.write(key, result);
}
public void setContext(Reducer.Context context) {
this.context = context;
// mem = context.getConfiguration().get("mapreduce.reduce.java.opts");
mem = "-Xmx" + (int)(0.8*Integer.parseInt(context.getConfiguration().get("mapreduce.reduce.memory.mb"))) + "m";
String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "java", "");
if(customArgs != null)
java.add(customArgs);
}
@Override
protected void
setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.kvCreator = new CellCreator(conf);
}
/**
* Creates and runs an MR job
*
* @param conf
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public void createAndRunJob(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(conf);
job.setJarByClass(TestLineRecordReaderJobs.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
FileInputFormat.addInputPath(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
job.waitForCompletion(true);
}
@Override
public int run(String[] strings) throws Exception {
conf.set(MRUtils.JOB_NAME_PROP, "Upgrade to Rya 3.2.2");
//faster
init();
Job job = new Job(conf);
job.setJarByClass(Upgrade322Tool.class);
setupAccumuloInput(job);
AccumuloInputFormat.setInputTableName(job, MRUtils.getTablePrefix(conf) + TBL_OSP_SUFFIX);
//we do not need to change any row that is a string, custom, or iri type
IteratorSetting regex = new IteratorSetting(30, "regex",
RegExFilter.class);
RegExFilter.setRegexs(regex, "\\w*" + TYPE_DELIM + "[\u0003|\u0008|\u0002]", null, null, null, false);
RegExFilter.setNegate(regex, true);
// set input output of the particular job
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
setupAccumuloOutput(job, MRUtils.getTablePrefix(conf) +
TBL_SPO_SUFFIX);
// set mapper and reducer classes
job.setMapperClass(Upgrade322Mapper.class);
job.setReducerClass(Reducer.class);
// Submit the job
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
protected void reduce(
final KEYIN key,
final Iterable<VALUEIN> values,
final Reducer<KEYIN, VALUEIN, GeoWaveInputKey, ObjectWritable>.Context context)
throws IOException, InterruptedException {
reduceWritableValues(key, values, context);
}
@Override
public int run(final String[] args) throws Exception {
final Configuration conf = getConf();
conf.set("fs.defaultFS", "file:///");
final Job job = Job.getInstance(conf, JOB_NAME);
job.setJarByClass(getClass());
FileInputFormat.setInputPaths(job, new Path(TEST_DATA_LOCATION));
FileOutputFormat.setOutputPath(job, cleanPathForReuse(conf, OUTPUT_PATH));
job.setMapperClass(SimpleFeatureToAccumuloKeyValueMapper.class);
job.setReducerClass(Reducer.class); // (Identity Reducer)
job.setInputFormatClass(GeonamesDataFileInputFormat.class);
job.setOutputFormatClass(AccumuloFileOutputFormat.class);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setOutputKeyClass(Key.class);
job.setOutputValueClass(Value.class);
job.setNumReduceTasks(1);
job.setSpeculativeExecution(false);
final boolean result = job.waitForCompletion(true);
mapInputRecords =
job.getCounters().findCounter(TASK_COUNTER_GROUP_NAME, MAP_INPUT_RECORDS).getValue();
mapOutputRecords =
job.getCounters().findCounter(TASK_COUNTER_GROUP_NAME, MAP_OUTPUT_RECORDS).getValue();
return result ? 0 : 1;
}