The following are examples showing how to use the org.apache.hadoop.mapreduce.ReduceContext API class; you can also follow the links through to GitHub to view the full source.
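Before the project-specific snippets, a minimal sketch of where ReduceContext sits in the API. Reducer.Context implements ReduceContext, so the context handed to reduce() already is one. The reducer below is illustrative only (a hypothetical word-count reducer, not taken from any of the projects featured here):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical word-count reducer, for orientation only. Reducer.Context
// implements ReduceContext, so this reduce() is already programming against
// the ReduceContext API: the values parameter streams the grouped input,
// and write() hands records to the context's RecordWriter.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

Every snippet that follows is a variation on the same theme: either consuming a ReduceContext inside reduce(), or constructing and wrapping one (via ReduceContextImpl and WrappedReducer) to run a reducer in a custom environment.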
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {
  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
@Override
protected void reduceNativeValues(
    final GeoWaveInputKey key,
    final Iterable<Object> values,
    final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context)
    throws IOException, InterruptedException {
  final Iterator<Object> valIt = values.iterator();
  if (valIt.hasNext()) {
    // TODO: this is a bit of a hack, but the adapter is seemingly completely
    // transient and never actually persisted - it seems unlikely that the
    // value for internal adapter ID even matters, but if it does this is the
    // best effort
    key.setInternalAdapterId(
        InternalAdapterStoreImpl.getLazyInitialAdapterId(outputAdapter.getTypeName()));
    final SimpleFeature feature = getSimpleFeature(key, valIt.next());
    context.write(key, feature);
  }
}
public ChainReduceContextImpl(
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
    RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
  this.base = base;
  this.rw = output;
  this.conf = conf;
}

/**
 * Create a reduce context that is based on ChainReduceContextImpl and the
 * given record writer
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
        RecordWriter<KEYOUT, VALUEOUT> rw,
        ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
        Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
          context, rw, conf);
  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
          .getReducerContext(reduceContext);
  return reducerContext;
}

@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
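Together, these three methods implement the decorator pattern behind Hadoop's ChainReducer: ChainReduceContextImpl wraps the task's real ReduceContext but redirects write() to the chain's own RecordWriter, and WrappedReducer.getReducerContext() re-exposes the wrapped context as a Reducer.Context, so user reducer code runs unchanged inside the chain.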
public static void create(ReduceContext context, Configuration conf) throws IOException
{
    redContext = context;
    isMapper = false;
    initCommonConfig(conf);
    PigStatusReporter.getInstance().setContext(new MRTaskContext(context));
}
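This static create() method (PhaseContext, used by the Cubert snippets here) stores the live ReduceContext for the reduce phase (isMapper = false) and wires it into PigStatusReporter, so status and counter updates from operators reach the underlying task; getRedContext() further below is the matching accessor.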
@SuppressWarnings("rawtypes")
@BeforeTest
void setupConf() throws IOException
{
Configuration conf = new JobConf();
conf.setBoolean(CubertStrings.USE_COMPACT_SERIALIZATION, false);
PhaseContext.create((MapContext) new TestContext(), conf);
PhaseContext.create((ReduceContext) new TestContext(), conf);
}
@SuppressWarnings("unchecked")
public void reduce(Object keyObj,
Iterable<Object> values,
ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
{
Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
if (acc == null)
{
throw new RuntimeException("No accumulator set for combiner!");
}
acc.cleanup();
long accumulatedCount = 0;
for (Object valueObj : values)
{
AvroValue<GenericRecord> value = (AvroValue<GenericRecord>)valueObj;
acc.accumulate(value.datum());
accumulatedCount++;
}
if (accumulatedCount > 0)
{
GenericRecord intermediateValue = acc.getFinal();
if (intermediateValue != null)
{
context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
}
}
}
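This combiner variant accumulates every value unconditionally; compare the longer variant further down, which inspects each value's Avro schema and passes some records through to the reducer instead of accumulating them.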
@Override
protected void reduceNativeValues(
    final GeoWaveInputKey key,
    final Iterable<Object> values,
    final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context)
    throws IOException, InterruptedException {
  final GridCoverage mergedCoverage = helper.getMergedCoverage(key, values);
  if (mergedCoverage != null) {
    context.write(key, mergedCoverage);
  }
}
public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext)
{
    this.reduceContext = reduceContext;
}
@Test (timeout=3000)
public void testLoadJobLoadReducer() throws Exception {
  LoadJob.LoadReducer test = new LoadJob.LoadReducer();

  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);

  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID taskid = new TaskAttemptID();

  RawKeyValueIterator input = new FakeRawKeyValueIterator();

  Counter counter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  LoadRecordWriter output = new LoadRecordWriter();

  OutputCommitter committer = new CustomOutputCommitter();

  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
      conf, taskid, input, counter, inputValueCounter, output, committer,
      reporter, comparator, GridmixKey.class, GridmixRecord.class);
  // read one record ahead of run()
  reduceContext.nextKeyValue();
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
      .getReducerContext(reduceContext);
  // test.setup(context);
  test.run(context);
  // 9 records have been read (-1 for the record consumed above)
  assertEquals(9, counter.getValue());
  assertEquals(10, inputValueCounter.getValue());
  assertEquals(1, output.getData().size());
  GridmixRecord record = output.getData().values().iterator().next();
  assertEquals(1593, record.getSize());
}
@Test (timeout=3000)
public void testSleepReducer() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID taskId = new TaskAttemptID();

  RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();

  Counter counter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();

  OutputCommitter committer = new CustomOutputCommitter();

  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
      conf, taskId, input, counter, inputValueCounter, output, committer,
      reporter, comparator, GridmixKey.class, NullWritable.class);
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
      .getReducerContext(reducecontext);

  SleepReducer test = new SleepReducer();
  long start = System.currentTimeMillis();
  test.setup(context);
  long sleeper = context.getCurrentKey().getReduceOutputBytes();
  // status has been changed
  assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus());
  // should sleep 0.9 sec
  assertTrue(System.currentTimeMillis() >= (start + sleeper));
  test.cleanup(context);
  // status has been changed again
  assertEquals("Slept for " + sleeper, context.getStatus());
}
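Both Gridmix tests follow the same recipe for driving a reducer without a cluster. Below is a distilled sketch of that recipe; the helper name runReducerForTest and all of its parameters are hypothetical test doubles, not Gridmix API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;

// Distilled from the tests above: build a ReduceContextImpl over fake
// collaborators, wrap it with WrappedReducer, and run the reducer directly.
// Every parameter is a test double supplied by the caller.
static <K, V> void runReducerForTest(Configuration conf,
    Reducer<K, V, K, V> reducer,
    RawKeyValueIterator fakeInput,        // serialized test records
    RecordWriter<K, V> output,            // captures what the reducer writes
    OutputCommitter committer,
    StatusReporter reporter,
    RawComparator<K> comparator,
    Class<K> keyClass, Class<V> valClass) throws Exception {
  ReduceContext<K, V, K, V> reduceContext =
      new ReduceContextImpl<K, V, K, V>(conf, new TaskAttemptID(),
          fakeInput,
          new GenericCounter(),           // input key counter
          new GenericCounter(),           // input value counter
          output, committer, reporter, comparator, keyClass, valClass);
  Reducer<K, V, K, V>.Context context =
      new WrappedReducer<K, V, K, V>().getReducerContext(reduceContext);
  reducer.run(context);                   // setup() -> reduce()* -> cleanup()
}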
public static ReduceContext getRedContext()
{
    return redContext;
}
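This is the matching accessor for the context stored by PhaseContext.create() above: operators retrieve the task's ReduceContext from the static field instead of having it threaded through every call, as the CUBE operator test below relies on.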
void validateGroupingSets(Object[][] rows, String[] expected, String[] groupingSets) throws JsonGenerationException,
        JsonMappingException,
        IOException,
        InterruptedException
{
    /* Step 1: create the input block schema */
    int ndims = rows[0].length - 1;
    String[] dimensions = new String[ndims];
    String[] columnNames = new String[ndims + 1];
    columnNames[0] = "clickCount";
    StringBuffer typeName = new StringBuffer();
    for (int i = 0; i < ndims; i++)
    {
        if (i > 0)
            typeName.append(",");
        typeName.append("int ");
        String name = "Dim" + i;
        typeName.append(name);
        columnNames[i + 1] = name;
        dimensions[i] = name;
    }
    BlockSchema inputSchema = new BlockSchema(typeName.toString());

    /* Step 2: create the json */
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode node = mapper.createObjectNode();

    Configuration conf = new JobConf();
    PhaseContext.create((MapContext) new TestContext(), conf);
    PhaseContext.create((ReduceContext) new TestContext(), conf);

    // add aggregates into the json
    ArrayNode measures = mapper.createArrayNode();
    measures.add(JsonUtils.createObjectNode("type",
                                            "SUM",
                                            "input",
                                            "clickCount",
                                            "output",
                                            "sum_clicks"));
    node.put("aggregates", measures);

    // add dimensions into the json
    ArrayNode dimensionNode = mapper.createArrayNode();
    for (int i = 0; i < dimensions.length; i++)
        dimensionNode.add(dimensions[i]);
    node.put("dimensions", dimensionNode);

    // add grouping sets into the json
    ArrayNode groupingSetNode = mapper.createArrayNode();
    if (groupingSets != null)
        for (String str : groupingSets)
            groupingSetNode.add(str);
    node.put("groupingSets", groupingSetNode);

    /* Step 3: create the input block */
    HashMap<String, Block> map = new HashMap<String, Block>();
    Block block = new ArrayBlock(Arrays.asList(rows), columnNames, 1);
    map.put("block", block);

    /* Step 4: create the CUBE operator and initialize it */
    CubeOperator cd = new CubeOperator();

    BlockSchema outputSchema = inputSchema.append(new BlockSchema("INT sum_clicks"));
    BlockProperties props =
            new BlockProperties(null, outputSchema, (BlockProperties) null);
    cd.setInput(map, node, props);

    /* Step 5: get the results from the CUBE operator and put them in a set */
    Set<String> computed = new HashSet<String>();
    Tuple tuple;
    while ((tuple = cd.next()) != null)
    {
        computed.add(tuple.toString());
    }

    /* Step 6: validate that the computed and brute-force results are the same */
    // System.out.println("Aggregated:" + computed);
    // System.out.println("Expected: " + java.util.Arrays.toString(expected));
    Assert.assertEquals(computed.size(), expected.length);
    for (String entry : expected)
    {
        Assert.assertTrue(computed.contains(entry));
    }
}
public IllustratorContext getReducerContext(
        ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
    return new IllustratorContext(reduceContext);
}

public IllustratorContext(
        ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
    super(reduceContext);
}
public abstract void reduce(Object key,
                            Iterable<Object> values,
                            ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException;
@SuppressWarnings("unchecked")
public void reduce(Object keyObj,
Iterable<Object> values,
ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
{
Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
if (acc == null)
{
throw new RuntimeException("No combiner factory set");
}
long accumulatedCount = 0;
acc.cleanup();
for (Object valueObj : values)
{
GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
if (value.getSchema().getFullName().equals(getSchemas().getIntermediateValueSchema().getFullName()))
{
acc.accumulate(value);
accumulatedCount++;
}
else if (value.getSchema().getFullName().equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
{
if (!_reusePreviousOutput)
{
throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
}
Long time = (Long)value.get("time");
GenericRecord data = (GenericData.Record)value.get("value");
if (time == null)
{
throw new RuntimeException("time is null");
}
if (data == null)
{
throw new RuntimeException("value is null");
}
if (time >= _beginTime && time <= _endTime)
{
acc.accumulate(data);
accumulatedCount++;
}
else if (time < _beginTime)
{
// pass through unchanged, reducer will handle it
context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
}
else
{
throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
}
}
else if (value.getSchema().getFullName().equals(getSchemas().getOutputValueSchema().getFullName()))
{
if (!_reusePreviousOutput)
{
throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
}
// pass through unchanged, reducer will handle it
context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(value));
}
else
{
throw new RuntimeException("Unexpected type: " + value.getSchema().getFullName());
}
}
if (accumulatedCount > 0)
{
GenericRecord intermediateValue = acc.getFinal();
if (intermediateValue != null)
{
context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(intermediateValue));
}
}
}
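The dispatch above is on each value's Avro schema full name: plain intermediate values are always accumulated; dated intermediate values are accumulated only inside the [_beginTime, _endTime] window (records older than the window are written through for the reducer to handle, newer ones are an error); previously produced output values are written through unchanged. The single accumulated aggregate, if any, is emitted last.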
protected abstract void reduceNativeValues(
    final KEYIN key,
    final Iterable<VALUEIN> values,
    final ReduceContext<KEYIN, VALUEIN, GeoWaveInputKey, Object> context)
    throws IOException, InterruptedException;

protected abstract void reduceNativeValues(
    final GeoWaveInputKey key,
    final Iterable<Object> values,
    final ReduceContext<GeoWaveInputKey, ObjectWritable, GeoWaveInputKey, Object> context)
    throws IOException, InterruptedException;

public NativeReduceContext(
    final ReduceContext<KEYIN, VALUEIN, GeoWaveInputKey, ObjectWritable> writableContext,
    final HadoopWritableSerializationTool serializationTool) {
  this.writableContext = writableContext;
  this.serializationTool = serializationTool;
}
private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    Configuration conf,
    TaskAttemptID mrTaskAttemptID,
    final TezRawKeyValueIterator rawIter,
    Counter combineInputKeyCounter,
    Counter combineInputValueCounter,
    RecordWriter<KEYOUT, VALUEOUT> recordWriter,
    MRTaskReporter reporter,
    RawComparator<KEYIN> comparator,
    Class<KEYIN> keyClass,
    Class<VALUEIN> valClass) throws InterruptedException, IOException {

  // adapt the Tez iterator to MapReduce's RawKeyValueIterator interface
  RawKeyValueIterator r = new RawKeyValueIterator() {

    @Override
    public boolean next() throws IOException {
      return rawIter.next();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }

    @Override
    public Progress getProgress() {
      return rawIter.getProgress();
    }

    @Override
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }

    @Override
    public void close() throws IOException {
      rawIter.close();
    }
  };

  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
      conf, mrTaskAttemptID, r, combineInputKeyCounter,
      combineInputValueCounter, recordWriter, null, reporter, comparator,
      keyClass, valClass);

  org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(rContext);
  return reducerContext;
}
private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    Configuration conf,
    TaskAttemptID mrTaskAttemptID,
    final TezRawKeyValueIterator rawIter,
    Counter combineInputRecordsCounter,
    Counter combineOutputRecordsCounter,
    RecordWriter<KEYOUT, VALUEOUT> recordWriter,
    MRTaskReporter reporter,
    RawComparator<KEYIN> comparator,
    Class<KEYIN> keyClass,
    Class<VALUEIN> valClass) throws InterruptedException, IOException {

  RawKeyValueIterator r = new RawKeyValueIterator() {

    @Override
    public boolean next() throws IOException {
      return rawIter.next();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }

    @Override
    public Progress getProgress() {
      return rawIter.getProgress();
    }

    @Override
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }

    @Override
    public void close() throws IOException {
      rawIter.close();
    }
  };

  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
      conf, mrTaskAttemptID, r, null,
      combineInputRecordsCounter, recordWriter, null, reporter, comparator,
      keyClass, valClass);

  org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(rContext);
  return reducerContext;
}
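Both Tez variants do the same three things: adapt the TezRawKeyValueIterator to MapReduce's RawKeyValueIterator interface, build a ReduceContextImpl over it, and wrap that in a Reducer.Context via WrappedReducer. They differ only in the combiner counters passed to ReduceContextImpl (the second passes no input key counter).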