Listed below are example usages of org.apache.hadoop.mapreduce.task.ReduceContextImpl#org.apache.hadoop.mapreduce.TaskInputOutputContext, collected from several open-source projects.
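Before the examples, a minimal sketch (not taken from any of the projects below; the class name and counter group/name are illustrative only) of how a TaskInputOutputContext is typically used: both Mapper.Context and Reducer.Context extend it, so user code emits records with write() and reports progress through counters and status.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Minimal sketch: the mapper class name and the counter group/name are illustrative only.
public class WordLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final IntWritable length = new IntWritable();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Context is a TaskInputOutputContext: emit a record ...
    length.set(value.getLength());
    context.write(value, length);
    // ... and report progress through a counter and a status message
    context.getCounter("example", "records").increment(1);
    context.setStatus("processed offset " + key.get());
  }
}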
/**
* Add mapper that reads and writes from/to the queue
*/
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
ChainBlockingQueue<KeyValuePair<?, ?>> output,
TaskInputOutputContext context, int index) throws IOException,
InterruptedException {
Configuration conf = getConf(index);
Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
rw, context, getConf(index)), rr, rw);
threads.add(runner);
}
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
InterruptedException {
Multimap<BulkIngestKey,Value> residual = HashMultimap.create();
for (BulkIngestKey key : entries.keySet()) {
Collection<Value> values = entries.get(key);
if (tableCacheConf.containsKey(key.getTableName())) {
cache(key, values, context);
} else {
residual.putAll(key, values);
}
}
if (!residual.isEmpty()) {
contextWriter.write(residual, context);
}
}
/**
* Add mapper (the first mapper) that reads input from the input
* context and writes to the queue
*/
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
throws IOException, InterruptedException {
Configuration conf = getConf(index);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(inputContext);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
Mapper.Context mapperContext = createMapContext(rr, rw,
(MapContext) inputContext, getConf(index));
MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
threads.add(runner);
}
/**
* Add reducer that reads from context and writes to a queue
*/
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
InterruptedException {
Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
Object.class);
Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
Object.class);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
outputQueue, rConf);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) inputContext, rConf);
ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
threads.add(runner);
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
synchronized (OPEN_MULTIPLE_MUTEX) {
try {
TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
recordWriter, outputCommitter, new DummyReporter(), null,
BytesWritable.class, BytesWritable.class);
this.writer = new MultipleOutputs(taskInputOutputContext);
} catch (InterruptedException e) {
throw new IOException("Could not create MultipleOutputs.", e);
}
}
}
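The open() method above only constructs the MultipleOutputs wrapper. A hypothetical companion sketch (the helper method names and the "sideOutput" named output are assumptions, not part of the original class; a named output must also be registered on the job via MultipleOutputs.addNamedOutput) of how the wrapper might then be used and released:
// Hypothetical helpers around the MultipleOutputs created in open(); the named output
// "sideOutput" and these method names are assumptions.
void writeToNamedOutput(BytesWritable key, BytesWritable value) throws IOException {
  try {
    this.writer.write("sideOutput", key, value);
  } catch (InterruptedException e) {
    throw new IOException("Could not write to MultipleOutputs.", e);
  }
}

void closeNamedOutputs() throws IOException {
  try {
    this.writer.close(); // flushes and closes all named outputs
  } catch (InterruptedException e) {
    throw new IOException("Could not close MultipleOutputs.", e);
  }
}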
/**
* Write the key, value to the cache.
*/
@Override
public void write(BulkIngestKey key, Value value, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
if (constraintChecker != null && constraintChecker.isConfigured()) {
constraintChecker.check(key.getTableName(), key.getKey().getColumnVisibilityData().getBackingArray());
}
cache.put(key, value);
this.count++;
if (counters != null) {
counters.incrementCounter(key);
}
if (cache.size() > this.maxSize) {
commit(context);
}
}
@Override
public ReadableData<E> asReadable() {
return new ReadableData<E>() {
@Override
public Set<SourceTarget<?>> getSourceTargets() {
return ImmutableSet.<SourceTarget<?>>of(DatasetSourceTarget.this);
}
@Override
public void configure(Configuration conf) {
// TODO: optimize for file-based datasets by using distributed cache
}
@Override
public Iterable<E> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
return view.newReader();
}
};
}
protected long writeEdges(EdgeDataBundle value, TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context,
ContextWriter<KEYOUT,VALUEOUT> contextWriter, boolean validActivityDate, boolean sameActivityDate, long eventDate) throws IOException,
InterruptedException {
long edgesCreated = 0;
if (eventDate < newFormatStartDate) {
edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.OLD_EVENT);
} else {
if (validActivityDate) {
if (sameActivityDate) {
edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.ACTIVITY_AND_EVENT);
} else {
edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.ACTIVITY_ONLY);
edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.EVENT_ONLY);
}
} else {
edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.EVENT_ONLY);
}
}
return edgesCreated;
}
protected static int attemptDownloadFileFromHDFS(TaskInputOutputContext context, FileSystem fs, String from, String to, int tries) throws IOException {
if(from.equalsIgnoreCase(to)) return 0;
int val = privateDownloadFileFromHDFS(context, fs, from, to);
int try_ = 1;
while (val != 0 && try_ < tries) {
val = privateDownloadFileFromHDFS(context, fs, from, to);
try_++;
}
if(val == 0)
Logger.DEBUG(from + " downloaded");
else {
Logger.DEBUG(from + " failed to download");
throw new IOException("expection downloading " + from + " to " + to + " from fs <" + fs + "> with error val: " + val);
}
return val;
}
@Override
public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
readCacheFilesIfNecessary();
String tableName = key.getTableName().toString();
Text[] cutPointArray = splitsByTable.get().get(tableName);
if (null == cutPointArray)
return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions;
key.getKey().getRow(holder);
int index = Arrays.binarySearch(cutPointArray, holder);
index = calculateIndex(index, numPartitions, tableName, cutPointArray.length);
index = partitionLimiter.limit(numPartitions, index);
TaskInputOutputContext<?,?,?,?> c = context;
if (c != null && collectStats) {
c.getCounter("Partitions: " + key.getTableName(), "part." + formatter.format(index)).increment(1);
}
return index;
}
/** Compute sigma */
static void compute(Summation sigma,
TaskInputOutputContext<?, ?, NullWritable, TaskResult> context
) throws IOException, InterruptedException {
String s;
LOG.info(s = "sigma=" + sigma);
context.setStatus(s);
final long start = System.currentTimeMillis();
sigma.compute();
final long duration = System.currentTimeMillis() - start;
final TaskResult result = new TaskResult(sigma, duration);
LOG.info(s = "result=" + result);
context.setStatus(s);
context.write(NullWritable.get(), result);
}
@BeforeClass
@SuppressWarnings("unchecked")
public void setUp() {
TaskInputOutputContext<Object, Object, Object, Object> mockContext = Mockito.mock(TaskInputOutputContext.class);
this.recordsProcessedCount = Mockito.mock(Counter.class);
Mockito.when(mockContext.getCounter(
this.name, MetricRegistry.name(RECORDS_PROCESSED, Measurements.COUNT.getName())))
.thenReturn(this.recordsProcessedCount);
this.recordProcessRateCount = Mockito.mock(Counter.class);
Mockito.when(mockContext.getCounter(
this.name, MetricRegistry.name(RECORD_PROCESS_RATE, Measurements.COUNT.getName())))
.thenReturn(this.recordProcessRateCount);
this.recordSizeDistributionCount = Mockito.mock(Counter.class);
Mockito.when(mockContext.getCounter(
this.name, MetricRegistry.name(RECORD_SIZE_DISTRIBUTION, Measurements.COUNT.getName())))
.thenReturn(this.recordSizeDistributionCount);
this.totalDurationCount = Mockito.mock(Counter.class);
Mockito.when(mockContext.getCounter(
this.name, MetricRegistry.name(TOTAL_DURATION, Measurements.COUNT.getName())))
.thenReturn(this.totalDurationCount);
this.queueSize = Mockito.mock(Counter.class);
Mockito.when(mockContext.getCounter(this.name, QUEUE_SIZE)).thenReturn(this.queueSize);
this.hadoopCounterReporter = NewAPIHadoopCounterReporter.builder(mockContext)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
.build(MetricContext.builder(this.name).build());
}
/**
* Add mapper (the last mapper) that reads input from the
* queue and writes output to the output context
*/
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
TaskInputOutputContext outputContext, int index) throws IOException,
InterruptedException {
Configuration conf = getConf(index);
Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
RecordWriter rw = new ChainRecordWriter(outputContext);
MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
rw, outputContext, getConf(index)), rr, rw);
threads.add(runner);
}
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
// Note we are not calling the "countWrite" method as this will be done by the underlying ContextWriter
// if so configured
reducer.reduce(entries, context);
reducer.flush(context);
}
private void flushAll(TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException, InterruptedException {
// pass all of the data through the delegate and clear the cache
for (Map.Entry<Text,Multimap<BulkIngestKey,Value>> entries : aggregatedCache.entrySet()) {
if (!entries.getValue().isEmpty()) {
getCounter(context, FLUSHED_BUFFER_TOTAL, entries.getKey().toString()).increment(entries.getValue().size());
getCounter(context, FLUSHED_BUFFER_COUNTER, entries.getKey().toString()).increment(1);
contextWriter.write(entries.getValue(), context);
}
}
aggregatedCache.clear();
}
private void cache(BulkIngestKey key, Collection<Value> values, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
InterruptedException {
List<Value> valueList = new ArrayList<>();
valueList.addAll(values);
Multimap<BulkIngestKey,Value> entries = aggregatedCache.get(key.getTableName());
if (entries != null) {
valueList.addAll(entries.removeAll(key));
} else {
entries = HashMultimap.create();
aggregatedCache.put(key.getTableName(), entries);
}
// reduce the entries as needed
if (valueList.size() > 1) {
entries.putAll(key, reduceValues(key, valueList, context));
} else {
entries.putAll(key, valueList);
}
// now flush this table's cache if needed
if (entries.size() >= tableCacheConf.get(key.getTableName())) {
// register that we overran the cache for this table
getCounter(context, FLUSHED_BUFFER_TOTAL, key.getTableName().toString()).increment(entries.size());
getCounter(context, FLUSHED_BUFFER_COUNTER, key.getTableName().toString()).increment(1);
contextWriter.write(entries, context);
aggregatedCache.remove(key.getTableName());
}
}
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
/**
* Clean up the context writer. Default implementation executes the flush method
* and then flushes any counters.
*
* @param context
* - the task context that any remaining entries and counters are written to
* @throws IOException
* - if writing the remaining entries fails
* @throws InterruptedException
* - if the write is interrupted
*/
@Override
public void cleanup(TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
commit(context);
// also flush the counters at this point
if (simpleClassName != null) {
getCounter(context, "ContextWriter", simpleClassName).increment(this.count);
this.count = 0;
}
if (counters != null) {
counters.flush(getContext(context));
}
super.close();
}
public AggregatingMetricsStore(ContextWriter<OK,OV> contextWriter, TaskInputOutputContext<?,?,OK,OV> context) {
this.contextWriter = contextWriter;
this.context = context;
this.maxSize = MetricsConfiguration.getAggBufferSize(context.getConfiguration());
this.table = new Text(MetricsConfiguration.getTable(context.getConfiguration()));
this.counts = new Counts<>(maxSize * 2);
this.flusher = new FlushMetrics();
}
/**
* Create a map context that is based on ChainMapContext and the given record
* reader and record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rr, rw, conf);
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getMapContext(mapContext);
return mapperContext;
}
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
throws IOException, InterruptedException {
RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
context);
Reducer.Context reducerContext = createReduceContext(rw,
(ReduceContext) context, rConf);
reducer.run(reducerContext);
rw.close(context);
}
public MetricsService(ContextWriter<OK,OV> contextWriter, TaskInputOutputContext<?,?,OK,OV> context) {
Configuration conf = context.getConfiguration();
this.date = DateHelper.format(new Date());
this.fieldNames = MetricsConfiguration.getFieldNames(conf);
this.enabledLabels = MetricsConfiguration.getLabels(conf);
this.enabledKeys = enabledLabels.keySet();
this.wildcardedLabels = new HashSet<>();
for (Map.Entry<String,String> entry : enabledLabels.entries()) {
if (WILDCARD.equals(entry.getValue())) {
wildcardedLabels.add(entry.getKey());
}
}
this.receivers = new HashMap<>();
for (MetricsReceiver receiver : MetricsConfiguration.getReceivers(conf)) {
this.receivers.put(receiver.getMetric(), receiver);
receiver.configure(conf, date);
}
this.store = new AggregatingMetricsStore<>(contextWriter, context);
if (logger.isInfoEnabled()) {
logger.info("Metrics Service Initialized");
logger.info("enabledLabels = " + enabledLabels);
logger.info("receivers = " + receivers);
logger.info("fieldNames = " + fieldNames);
}
Preconditions.checkNotNull(fieldNames);
Preconditions.checkArgument(!enabledLabels.isEmpty());
Preconditions.checkArgument(!receivers.isEmpty());
}
/**
* Get desired information from the reducer's context, such as its reduce input
* and output record counters, and log it.
*
* @param context
* - reduce context holding the required information about the reducer,
* such as its counters
* @param className
* - Class which is calling this method
* @param methodName
* - Class Method which is calling this method
*/
@SuppressWarnings(RAW_TYPES)
public static void getReduceContextInfo(TaskInputOutputContext context,
String className, String methodName) {
Counter counter = context.getCounter(MAPRED_COUNTER,
REDUCE_INPUT_RECORDS);
getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
counter.getValue());
counter = context.getCounter(MAPRED_COUNTER, REDUCE_OUTPUT_RECORDS);
getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
counter.getValue());
}
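A hypothetical call site for getReduceContextInfo (placing it in a reducer's cleanup is an assumption): log the record counters once the reduce phase has finished.
// Hypothetical sketch: invoke the helper from a reducer's cleanup so the counter
// values reflect the whole reduce phase.
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  getReduceContextInfo(context, getClass().getSimpleName(), "cleanup");
}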
/**
* This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an
* identity function.
*/
@SuppressWarnings("unchecked")
public void doReduce(IK key, Iterable<IV> values, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
for (IV value : values) {
context.write((OK) key, (OV) value);
}
}
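Since doReduce above is an identity pass-through, a typical subclass overrides it to aggregate instead; a minimal sketch (the concrete Text/LongWritable types are assumptions in place of the generic IK/IV/OK/OV):
// Hypothetical override: sum the values for each key instead of writing each one through.
@Override
public void doReduce(Text key, Iterable<LongWritable> values, TaskInputOutputContext<?,?,Text,LongWritable> context)
    throws IOException, InterruptedException {
  long sum = 0;
  for (LongWritable value : values) {
    sum += value.get();
  }
  context.write(key, new LongWritable(sum));
}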
@Override
public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
{
super.setContext(context);
// ... and we also write the final output to multiple directories
_multipleOutputs = new AvroMultipleOutputs(context);
}
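A writer opened in setContext() above also needs to be released when the task finishes; a hypothetical companion sketch (the method name is an assumption, AvroMultipleOutputs.close() is the actual API call):
// Hypothetical companion to setContext(): close the AvroMultipleOutputs so that all
// named outputs are flushed before the task exits.
void closeMultipleOutputs() throws IOException, InterruptedException {
  if (_multipleOutputs != null) {
    _multipleOutputs.close();
  }
}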
protected int writeKey(Key key, Value val, TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context,
ContextWriter<KEYOUT,VALUEOUT> contextWriter) throws IOException, InterruptedException {
if (key == null)
return 0;
BulkIngestKey bk = new BulkIngestKey(new Text(this.edgeTableName), key);
contextWriter.write(bk, val, context);
return 1;
}
@Override
public long process(KEYIN key, RawRecordContainer event, Multimap<String,NormalizedContentInterface> eventFields,
TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, ContextWriter<KEYOUT,VALUEOUT> contextWriter)
throws IOException, InterruptedException {
// Hold some event-specific variables to avoid re-processing
this.shardId = getShardId(event);
if (tokenHelper.isVerboseShardCounters()) {
context.getCounter("EVENT_SHARD_ID", new String(this.shardId)).increment(1);
}
this.eventDataTypeName = event.getDataType().outputName();
this.eventUid = event.getId().toString();
// write the standard set of keys
Multimap<BulkIngestKey,Value> keys = super.processBulk(key, event, eventFields, new ContextWrappedStatusReporter(context));
long count = keys.size();
contextWriter.write(keys, context);
StatusReporter reporter = new ContextWrappedStatusReporter(context);
// gc before we get into the tokenization piece
keys = null;
// stream the tokens to the context writer here
count += tokenizeEvent(event, context, contextWriter, reporter);
// return the number of records written
return count;
}
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
InterruptedException {
for (Map.Entry<BulkIngestKey,Value> entry : entries.entries()) {
cache.put(entry.getKey(), entry.getValue());
}
}
/**
* Creates and writes the BulkIngestKey for the event's field/value to the ContextWriter (instead of the Multimap that the {@link ShardedDataTypeHandler}
* uses).
*
* @param event
* - the raw record whose field/value is being written
* @param contextWriter
* - the context writer used to emit the generated BulkIngestKey
* @param context
* - the task context passed through to the context writer
* @param nFV
* - the normalized field name/value pair for the event
* @param shardId
* - the shard row id used to build the key
* @param visibility
* - the column visibility applied to the key
* @throws IOException
* - if writing to the context writer fails
* @throws InterruptedException
* - if the write is interrupted
*/
@Override
protected void createShardEventColumn(RawRecordContainer event, ContextWriter<KEYOUT,VALUEOUT> contextWriter,
TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, NormalizedContentInterface nFV, byte[] shardId,
byte[] visibility) throws IOException, InterruptedException {
String fieldName = nFV.getEventFieldName();
String fieldValue = nFV.getEventFieldValue();
if (this.ingestHelper.isIndexOnlyField(fieldName))
return;
if (this.ingestHelper.isCompositeField(fieldName) && !this.ingestHelper.isOverloadedCompositeField(fieldName))
return;
if (StringUtils.isEmpty(fieldValue))
return;
Text colf = new Text(event.getDataType().outputName());
TextUtil.textAppend(colf, event.getId().toString(), this.eventReplaceMalformedUTF8);
Text colq = new Text(fieldName);
TextUtil.textAppend(colq, fieldValue, this.ingestHelper.getReplaceMalformedUTF8());
Key k = createKey(shardId, colf, colq, visibility, event.getDate(), this.ingestHelper.getDeleteMode());
BulkIngestKey bKey = new BulkIngestKey(new Text(this.getShardTableName()), k);
contextWriter.write(bKey, DataTypeHandler.NULL_VALUE, context);
}