org.apache.hadoop.mapreduce.task.ReduceContextImpl#org.apache.hadoop.mapreduce.TaskInputOutputContext source code examples

Listed below are example usages of org.apache.hadoop.mapreduce.task.ReduceContextImpl#org.apache.hadoop.mapreduce.TaskInputOutputContext, collected from the open-source projects named in each example heading.
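
All of the snippets below share the same basic pattern: a component receives a TaskInputOutputContext (at reduce time this interface is implemented by ReduceContextImpl) and uses it to emit records, increment counters, and report status. As a minimal, self-contained sketch of that pattern (the class and method names here are illustrative, not taken from any of the projects listed):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/** Illustrative helper: all output and bookkeeping goes through the supplied context. */
public class ContextEmitter {

    /** Emits one key/value pair and records it in a custom counter. */
    public static void emit(TaskInputOutputContext<?, ?, Text, LongWritable> context,
                            Text key, long count) throws IOException, InterruptedException {
        context.write(key, new LongWritable(count));                   // forward to the configured output
        context.getCounter("ContextEmitter", "records").increment(1);  // task-level bookkeeping
        context.setStatus("last key: " + key);                         // progress visible in the job UI
    }
}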

Example 1 - Project: big-c, File: Chain.java
/**
 * Add mapper that reads and writes from/to the queue
 */
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
    ChainBlockingQueue<KeyValuePair<?, ?>> output,
    TaskInputOutputContext context, int index) throws IOException,
    InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
  Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
      conf);
  MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
      rw, context, getConf(index)), rr, rw);
  threads.add(runner);
}
 
Example 2 - Project: datawave, File: TableCachingContextWriter.java
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
                InterruptedException {
    Multimap<BulkIngestKey,Value> residual = HashMultimap.create();
    for (BulkIngestKey key : entries.keySet()) {
        Collection<Value> values = entries.get(key);
        if (tableCacheConf.containsKey(key.getTableName())) {
            cache(key, values, context);
        } else {
            residual.putAll(key, values);
        }
    }
    if (!residual.isEmpty()) {
        contextWriter.write(residual, context);
    }
}
 
Example 3 - Project: big-c, File: Chain.java
/**
 * Add mapper(the first mapper) that reads input from the input
 * context and writes to queue
 */
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
    throws IOException, InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
      Object.class);

  RecordReader rr = new ChainRecordReader(inputContext);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
      conf);
  Mapper.Context mapperContext = createMapContext(rr, rw,
      (MapContext) inputContext, getConf(index));
  MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
  threads.add(runner);
}
 
Example 4 - Project: hadoop, File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
 
Example 5 - Project: kylin, File: HadoopMultipleOutputFormat.java
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);

    synchronized (OPEN_MULTIPLE_MUTEX) {
        try {
            TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
                    context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
                    recordWriter, outputCommitter, new DummyReporter(), null,
                    BytesWritable.class, BytesWritable.class);
            this.writer = new MultipleOutputs(taskInputOutputContext);
        } catch (InterruptedException e) {
            throw new IOException("Could not create MultipleOutputs.", e);
        }
    }
}
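
In the Kylin snippet above, the ReduceContextImpl exists only so that MultipleOutputs has a TaskInputOutputContext to write through. For context, a hypothetical caller (the class, method, and base path below are illustrative, not part of HadoopMultipleOutputFormat) would route records like this:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/** Illustrative only: routes one record through a MultipleOutputs backed by a ReduceContextImpl. */
public class MultipleOutputsUsage {

    static void writeRecord(MultipleOutputs<BytesWritable, BytesWritable> writer,
                            BytesWritable key, BytesWritable value)
            throws IOException, InterruptedException {
        // write(K, V, String baseOutputPath) uses the job's output format and the
        // given path as the file prefix, producing e.g. "F1/part-r-00000".
        writer.write(key, value, "F1/part");
    }
}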
 
Example 6 - Project: datawave, File: AbstractContextWriter.java
/**
 * Write the key, value to the cache.
 */
@Override
public void write(BulkIngestKey key, Value value, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
    if (constraintChecker != null && constraintChecker.isConfigured()) {
        constraintChecker.check(key.getTableName(), key.getKey().getColumnVisibilityData().getBackingArray());
    }
    
    cache.put(key, value);
    this.count++;
    if (counters != null) {
        counters.incrementCounter(key);
    }
    if (cache.size() > this.maxSize) {
        commit(context);
    }
}
 
Example 7 - Project: kite, File: DatasetSourceTarget.java
@Override
public ReadableData<E> asReadable() {
  return new ReadableData<E>() {

    @Override
    public Set<SourceTarget<?>> getSourceTargets() {
      return ImmutableSet.<SourceTarget<?>>of(DatasetSourceTarget.this);
    }

    @Override
    public void configure(Configuration conf) {
      // TODO: optimize for file-based datasets by using distributed cache
    }

    @Override
    public Iterable<E> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
      return view.newReader();
    }
  };
}
 
Example 8 - Project: big-c, File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
 
Example 9 - Project: datawave, File: ProtobufEdgeDataTypeHandler.java
protected long writeEdges(EdgeDataBundle value, TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context,
                ContextWriter<KEYOUT,VALUEOUT> contextWriter, boolean validActivtyDate, boolean sameActivityDate, long eventDate) throws IOException,
                InterruptedException {
    
    long edgesCreated = 0;
    if (eventDate < newFormatStartDate) {
        edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.OLD_EVENT);
    } else {
        if (validActivtyDate) {
            if (sameActivityDate) {
                edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.ACTIVITY_AND_EVENT);
            } else {
                edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.ACTIVITY_ONLY);
                edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.EVENT_ONLY);
            }
        } else {
            edgesCreated += writeEdges(value, context, contextWriter, EdgeKey.DATE_TYPE.EVENT_ONLY);
        }
    }
    
    return edgesCreated;
}
 
Example 10 - Project: halvade, File: HalvadeFileUtils.java
protected static int attemptDownloadFileFromHDFS(TaskInputOutputContext context, FileSystem fs, String from, String to, int tries) throws IOException {
    if(from.equalsIgnoreCase(to)) return 0;
    int val = privateDownloadFileFromHDFS(context, fs, from, to);
    int try_ = 1;
    while (val != 0 && try_ < tries) {
        val = privateDownloadFileFromHDFS(context, fs, from, to);
        try_++;
    }
    if(val == 0)
        Logger.DEBUG(from + " downloaded");
    else {
        Logger.DEBUG(from + " failed to download");   
        throw new IOException("expection downloading "  + from + " to " + to + " from fs <" + fs + "> with error val: " + val);
    }
    return val;            
}
 
Example 11 - Project: datawave, File: MultiTableRangePartitioner.java
@Override
public int getPartition(BulkIngestKey key, Value value, int numPartitions) {
    readCacheFilesIfNecessary();
    
    String tableName = key.getTableName().toString();
    Text[] cutPointArray = splitsByTable.get().get(tableName);
    
    if (null == cutPointArray)
        return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions;
    key.getKey().getRow(holder);
    int index = Arrays.binarySearch(cutPointArray, holder);
    index = calculateIndex(index, numPartitions, tableName, cutPointArray.length);
    
    index = partitionLimiter.limit(numPartitions, index);
    
    TaskInputOutputContext<?,?,?,?> c = context;
    if (c != null && collectStats) {
        c.getCounter("Partitions: " + key.getTableName(), "part." + formatter.format(index)).increment(1);
    }
    
    return index;
}
 
Example 12 - Project: big-c, File: DistSum.java
/** Compute sigma */
static void compute(Summation sigma,
    TaskInputOutputContext<?, ?, NullWritable, TaskResult> context
    ) throws IOException, InterruptedException {
  String s;
  LOG.info(s = "sigma=" + sigma);
  context.setStatus(s);

  final long start = System.currentTimeMillis();
  sigma.compute();
  final long duration = System.currentTimeMillis() - start;
  final TaskResult result = new TaskResult(sigma, duration);

  LOG.info(s = "result=" + result);
  context.setStatus(s);
  context.write(NullWritable.get(), result);
}
 
Example 13
@BeforeClass
@SuppressWarnings("unchecked")
public void setUp() {
  TaskInputOutputContext<Object, Object, Object, Object> mockContext = Mockito.mock(TaskInputOutputContext.class);

  this.recordsProcessedCount = Mockito.mock(Counter.class);
  Mockito.when(mockContext.getCounter(
      this.name, MetricRegistry.name(RECORDS_PROCESSED, Measurements.COUNT.getName())))
      .thenReturn(this.recordsProcessedCount);

  this.recordProcessRateCount = Mockito.mock(Counter.class);
  Mockito.when(mockContext.getCounter(
      this.name, MetricRegistry.name(RECORD_PROCESS_RATE, Measurements.COUNT.getName())))
      .thenReturn(this.recordProcessRateCount);

  this.recordSizeDistributionCount = Mockito.mock(Counter.class);
  Mockito.when(mockContext.getCounter(
      this.name, MetricRegistry.name(RECORD_SIZE_DISTRIBUTION, Measurements.COUNT.getName())))
      .thenReturn(this.recordSizeDistributionCount);

  this.totalDurationCount = Mockito.mock(Counter.class);
  Mockito.when(mockContext.getCounter(
      this.name, MetricRegistry.name(TOTAL_DURATION, Measurements.COUNT.getName())))
      .thenReturn(this.totalDurationCount);

  this.queueSize = Mockito.mock(Counter.class);
  Mockito.when(mockContext.getCounter(this.name, QUEUE_SIZE)).thenReturn(this.queueSize);

  this.hadoopCounterReporter = NewAPIHadoopCounterReporter.builder(mockContext)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.SECONDS)
      .filter(MetricFilter.ALL)
      .build(MetricContext.builder(this.name).build());
}
 
Example 14 - Project: big-c, File: Chain.java
/**
 * Add mapper(the last mapper) that reads input from
 * queue and writes output to the output context
 */
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
    TaskInputOutputContext outputContext, int index) throws IOException,
    InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
  Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
  RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
  RecordWriter rw = new ChainRecordWriter(outputContext);
  MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
      rw, outputContext, getConf(index)), rr, rw);
  threads.add(runner);
}
 
Example 15 - Project: datawave, File: AggregatingContextWriter.java
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
    // Note we are not calling the "countWrite" method as this will be done by the underlying ContextWriter
    // if so configured
    reducer.reduce(entries, context);
    reducer.flush(context);
}
 
Example 16 - Project: datawave, File: TableCachingContextWriter.java
private void flushAll(TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException, InterruptedException {
    // pass all of the data through the delegate and clear the cache
    for (Map.Entry<Text,Multimap<BulkIngestKey,Value>> entries : aggregatedCache.entrySet()) {
        if (!entries.getValue().isEmpty()) {
            getCounter(context, FLUSHED_BUFFER_TOTAL, entries.getKey().toString()).increment(entries.getValue().size());
            getCounter(context, FLUSHED_BUFFER_COUNTER, entries.getKey().toString()).increment(1);
            contextWriter.write(entries.getValue(), context);
        }
    }
    aggregatedCache.clear();
}
 
Example 17 - Project: datawave, File: TableCachingContextWriter.java
private void cache(BulkIngestKey key, Collection<Value> values, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
                InterruptedException {
    List<Value> valueList = new ArrayList<>();
    valueList.addAll(values);
    
    Multimap<BulkIngestKey,Value> entries = aggregatedCache.get(key.getTableName());
    if (entries != null) {
        valueList.addAll(entries.removeAll(key));
    } else {
        entries = HashMultimap.create();
        aggregatedCache.put(key.getTableName(), entries);
    }
    
    // reduce the entries as needed
    if (valueList.size() > 1) {
        entries.putAll(key, reduceValues(key, valueList, context));
    } else {
        entries.putAll(key, valueList);
    }
    
    // now flush this table's cache if needed
    if (entries.size() >= tableCacheConf.get(key.getTableName())) {
        // register that we overran the cache for this table
        getCounter(context, FLUSHED_BUFFER_TOTAL, key.getTableName().toString()).increment(entries.size());
        getCounter(context, FLUSHED_BUFFER_COUNTER, key.getTableName().toString()).increment(1);
        contextWriter.write(entries, context);
        aggregatedCache.remove(key.getTableName());
    }
}
 
Example 18 - Project: big-c, File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
 
Example 19 - Project: datawave, File: AbstractContextWriter.java
/**
 * Clean up the context writer. Default implementation executes the flush method.
 *
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public void cleanup(TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
    commit(context);
    // also flush the counters at this point
    if (simpleClassName != null) {
        getCounter(context, "ContextWriter", simpleClassName).increment(this.count);
        this.count = 0;
    }
    if (counters != null) {
        counters.flush(getContext(context));
    }
    super.close();
}
 
Example 20 - Project: datawave, File: AggregatingMetricsStore.java
public AggregatingMetricsStore(ContextWriter<OK,OV> contextWriter, TaskInputOutputContext<?,?,OK,OV> context) {
    this.contextWriter = contextWriter;
    this.context = context;
    this.maxSize = MetricsConfiguration.getAggBufferSize(context.getConfiguration());
    this.table = new Text(MetricsConfiguration.getTable(context.getConfiguration()));
    this.counts = new Counts<>(maxSize * 2);
    this.flusher = new FlushMetrics();
}
 
Example 21 - Project: hadoop, File: Chain.java
/**
 * Create a map context that is based on ChainMapContext and the given record
 * reader and record writer
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
    RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext = 
    new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
      context, rr, rw, conf);
  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext = 
    new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getMapContext(mapContext);
  return mapperContext;
}
 
Example 22 - Project: hadoop, File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
 
Example 23 - Project: datawave, File: MetricsService.java
public MetricsService(ContextWriter<OK,OV> contextWriter, TaskInputOutputContext<?,?,OK,OV> context) {
    Configuration conf = context.getConfiguration();
    
    this.date = DateHelper.format(new Date());
    
    this.fieldNames = MetricsConfiguration.getFieldNames(conf);
    this.enabledLabels = MetricsConfiguration.getLabels(conf);
    this.enabledKeys = enabledLabels.keySet();
    
    this.wildcardedLabels = new HashSet<>();
    for (Map.Entry<String,String> entry : enabledLabels.entries()) {
        if (WILDCARD.equals(entry.getValue())) {
            wildcardedLabels.add(entry.getKey());
        }
    }
    
    this.receivers = new HashMap<>();
    for (MetricsReceiver receiver : MetricsConfiguration.getReceivers(conf)) {
        this.receivers.put(receiver.getMetric(), receiver);
        receiver.configure(conf, date);
    }
    
    this.store = new AggregatingMetricsStore<>(contextWriter, context);
    
    if (logger.isInfoEnabled()) {
        logger.info("Metrics Service Initialized");
        logger.info("enabledLabels = " + enabledLabels);
        logger.info("receivers = " + receivers);
        logger.info("fieldNames = " + fieldNames);
    }
    
    Preconditions.checkNotNull(fieldNames);
    Preconditions.checkArgument(!enabledLabels.isEmpty());
    Preconditions.checkArgument(!receivers.isEmpty());
}
 
Example 24 - Project: jumbune, File: LogUtil.java
/**
 * Get desired information from reducer's context like job, reduce instance,
 * etc
 * 
 * @param context
 *            - reduce context to get all the required information of
 *            reducer like its job, reduce instance, etc
 * @param className
 *            - Class which is calling this method
 * @param methodName
 *            - Class Method which is calling this method
 */
@SuppressWarnings(RAW_TYPES)
public static void getReduceContextInfo(TaskInputOutputContext context,
		String className, String methodName) {
	Counter counter = context.getCounter(MAPRED_COUNTER,
			REDUCE_INPUT_RECORDS);
	getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
			counter.getValue());

	counter = context.getCounter(MAPRED_COUNTER, REDUCE_OUTPUT_RECORDS);
	getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
			counter.getValue());
}
 
Example 25 - Project: datawave, File: AggregatingReducer.java
/**
 * This method is called once for each key. Most applications will define their reduce class by overriding this method. The default implementation is an
 * identity function.
 */
@SuppressWarnings("unchecked")
public void doReduce(IK key, Iterable<IV> values, TaskInputOutputContext<?,?,OK,OV> context) throws IOException, InterruptedException {
    for (IV value : values) {
        context.write((OK) key, (OV) value);
    }
}
 
Example 26 - Project: datafu, File: PartitioningReducer.java
@Override
public void setContext(TaskInputOutputContext<Object,Object,Object,Object> context)
{           
  super.setContext(context);
  
  // ... and we also write the final output to multiple directories
  _multipleOutputs = new AvroMultipleOutputs(context);
}
 
Example 27 - Project: datawave, File: ProtobufEdgeDataTypeHandler.java
protected int writeKey(Key key, Value val, TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context,
                ContextWriter<KEYOUT,VALUEOUT> contextWriter) throws IOException, InterruptedException {
    if (key == null)
        return 0;
    BulkIngestKey bk = new BulkIngestKey(new Text(this.edgeTableName), key);
    contextWriter.write(bk, val, context);
    return 1;
}
 
Example 28
@Override
public long process(KEYIN key, RawRecordContainer event, Multimap<String,NormalizedContentInterface> eventFields,
                TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, ContextWriter<KEYOUT,VALUEOUT> contextWriter)
                throws IOException, InterruptedException {
    
    // Hold some event-specific variables to avoid re-processing
    this.shardId = getShardId(event);
    
    if (tokenHelper.isVerboseShardCounters()) {
        context.getCounter("EVENT_SHARD_ID", new String(this.shardId)).increment(1);
    }
    
    this.eventDataTypeName = event.getDataType().outputName();
    this.eventUid = event.getId().toString();
    
    // write the standard set of keys
    Multimap<BulkIngestKey,Value> keys = super.processBulk(key, event, eventFields, new ContextWrappedStatusReporter(context));
    long count = keys.size();
    contextWriter.write(keys, context);
    
    StatusReporter reporter = new ContextWrappedStatusReporter(context);
    
    // gc before we get into the tokenization piece
    keys = null;
    
    // stream the tokens to the context writer here
    count += tokenizeEvent(event, context, contextWriter, reporter);
    
    // return the number of records written
    return count;
}
 
Example 29 - Project: datawave, File: WikipediaDataTypeHandlerTest.java
@Override
protected void flush(Multimap<BulkIngestKey,Value> entries, TaskInputOutputContext<?,?,BulkIngestKey,Value> context) throws IOException,
                InterruptedException {
    for (Map.Entry<BulkIngestKey,Value> entry : entries.entries()) {
        cache.put(entry.getKey(), entry.getValue());
    }
}
 
Example 30 - Project: datawave, File: WikipediaDataTypeHandler.java
/**
 * Creates and writes the BulkIngestKey for the event's field/value to the ContextWriter (instead of the Multimap that the {@link ShardedDataTypeHandler}
 * uses).
 * 
 * @param event
 * @param contextWriter
 * @param context
 * @param nFV
 * @param shardId
 * @param visibility
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void createShardEventColumn(RawRecordContainer event, ContextWriter<KEYOUT,VALUEOUT> contextWriter,
                TaskInputOutputContext<KEYIN,? extends RawRecordContainer,KEYOUT,VALUEOUT> context, NormalizedContentInterface nFV, byte[] shardId,
                byte[] visibility) throws IOException, InterruptedException {
    
    String fieldName = nFV.getEventFieldName();
    String fieldValue = nFV.getEventFieldValue();
    
    if (this.ingestHelper.isIndexOnlyField(fieldName))
        return;
    
    if (this.ingestHelper.isCompositeField(fieldName) && !this.ingestHelper.isOverloadedCompositeField(fieldName))
        return;
    
    if (StringUtils.isEmpty(fieldValue))
        return;
    
    Text colf = new Text(event.getDataType().outputName());
    TextUtil.textAppend(colf, event.getId().toString(), this.eventReplaceMalformedUTF8);
    
    Text colq = new Text(fieldName);
    TextUtil.textAppend(colq, fieldValue, this.ingestHelper.getReplaceMalformedUTF8());
    Key k = createKey(shardId, colf, colq, visibility, event.getDate(), this.ingestHelper.getDeleteMode());
    BulkIngestKey bKey = new BulkIngestKey(new Text(this.getShardTableName()), k);
    contextWriter.write(bKey, DataTypeHandler.NULL_VALUE, context);
}