org.apache.hadoop.mapred.MapRunnable#org.apache.tez.runtime.library.api.KeyValueWriter源码实例Demo

下面列出了 org.apache.hadoop.mapred.MapRunnable#org.apache.tez.runtime.library.api.KeyValueWriter 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。

源代码1 项目: sequenceiq-samples   文件: TopK.java
/**
 * Splits every input line on commas and writes the column at {@code columnIndex} as the key,
 * paired with {@code ONE}, to the output named {@code SUM}. Lines with too few columns are
 * skipped.
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // Cast the reader/writer rather than the input/output: the concrete input/output class can
    // then be replaced without changing the data-type contract this code relies on.
    // Inputs and outputs are looked up by the names they were given in the DAG.
    KeyValueReader reader = (KeyValueReader) getInputs().get(INPUT).getReader();
    KeyValueWriter writer = (KeyValueWriter) getOutputs().get(SUM).getWriter();
    while (reader.next()) {
        String line = reader.getCurrentValue().toString();
        String[] columns = line.split(",");
        if (columns.length <= columnIndex) {
            continue; // not enough columns on this line
        }
        text.set(columns[columnIndex]);
        writer.write(text, ONE);
    }
}
 
源代码2 项目: sequenceiq-samples   文件: TopK.java
/**
 * Loads every (count, word) pair from the {@code SUM} input into {@code localTop}, then writes
 * the result of {@code localTop.getTopKSorted()} to {@code OUTPUT}. Each record has the words
 * for one count, joined with {@code ","}, as its key and that count as its value.
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueWriter writer = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    UnorderedKVReader reader = (UnorderedKVReader) getInputs().get(SUM).getReader();
    while (reader.next()) {
        Integer count = Integer.valueOf(reader.getCurrentKey().toString());
        String word = reader.getCurrentValue().toString();
        localTop.store(count, word);
    }
    Map<Integer, List<String>> sortedTop = localTop.getTopKSorted();
    for (Map.Entry<Integer, List<String>> entry : sortedTop.entrySet()) {
        int count = entry.getKey();
        Text joinedWords = new Text(join(entry.getValue(), ","));
        writer.write(joinedWords, new IntWritable(count));
    }
}
 
源代码3 项目: spork   文件: POValueOutputTez.java
/**
 * Attaches this operator to every output named in {@code outputKeys}. The
 * {@link KeyValueWriter} of each output is collected, in order, into {@code writers}. Then the
 * record counter is reset. When task-index/record-index keys are disabled, {@code key} is set
 * to {@code EMPTY_KEY}.
 *
 * @param outputs logical outputs of this task, keyed by output name
 * @param conf not used by this method
 * @throws ExecException if a named output is absent, or its writer cannot be obtained or is
 *     not a {@link KeyValueWriter}
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    writers = new ArrayList<KeyValueWriter>();
    for (String vertexName : outputKeys) {
        LogicalOutput logicalOutput = outputs.get(vertexName);
        if (logicalOutput == null) {
            throw new ExecException("Output to vertex " + vertexName
                    + " is missing");
        }
        try {
            KeyValueWriter kvWriter = (KeyValueWriter) logicalOutput.getWriter();
            writers.add(kvWriter);
            LOG.info("Attached output to vertex " + vertexName + " : output=" + logicalOutput
                    + ", writer=" + kvWriter);
        } catch (Exception cause) {
            throw new ExecException(cause);
        }
    }
    count = 0;
    // The constant key is installed only when record-index keys are not in use.
    if (!taskIndexWithRecordIndexAsKey) {
        key = EMPTY_KEY;
    }
}
 
源代码4 项目: incubator-tez   文件: IntersectExample.java
/**
 * Copies the single input to the single output. Every input value becomes an output key,
 * paired with {@link NullWritable}.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or the
 *     input's reader is not a {@link KeyValueReader}
 */
@Override
public void run() throws Exception {
  Preconditions.checkState(getInputs().size() == 1);
  Preconditions.checkState(getOutputs().size() == 1);
  Reader untypedReader = getInputs().values().iterator().next().getReader();
  Preconditions.checkState(untypedReader instanceof KeyValueReader);
  KeyValueReader source = (KeyValueReader) untypedReader;
  KeyValueWriter sink = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();

  while (source.next()) {
    sink.write(source.getCurrentValue(), NullWritable.get());
  }
}
 
源代码5 项目: incubator-tez   文件: BroadcastAndOneToOneExample.java
/**
 * Writes one record ({@code word}, this task's index) to the single output.
 *
 * <p>If the user payload is present, non-empty, and its first byte is positive, the task index
 * is also added to the DAG-lifecycle object registry. Presumably a later vertex uses it for a
 * locality check (TODO confirm).
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getOutputs().size() == 1);
  OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) getOutputs().values().iterator()
      .next();
  KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
  kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
  byte[] userPayload = getContext().getUserPayload();
  // An empty payload carries no flag; indexing it would throw ArrayIndexOutOfBoundsException.
  boolean doLocalityCheck = userPayload != null && userPayload.length > 0 && userPayload[0] > 0;
  if (doLocalityCheck) {
    ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
    String entry = String.valueOf(getContext().getTaskIndex());
    objectRegistry.add(ObjectLifeCycle.DAG, entry, entry);
  }
}
 
源代码6 项目: incubator-tez   文件: WordCount.java
/**
 * Tokenizer: splits every value of the single {@link MRInput} on whitespace and writes
 * ({@code word}, {@code one}) for each token to the single output.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  MRInput mrInput = (MRInput) getInputs().values().iterator().next();
  KeyValueReader reader = mrInput.getReader();
  KeyValueWriter writer = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();
  while (reader.next()) {
    String line = reader.getCurrentValue().toString();
    for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens(); ) {
      word.set(tokens.nextToken());
      writer.write(word, one);
    }
  }
}
 
源代码7 项目: incubator-tez   文件: WordCount.java
/**
 * Summation: for every word key, adds up its {@link IntWritable} counts and writes
 * (word, total) to the single {@link MROutput}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  MROutput mrOutput = (MROutput) getOutputs().values().iterator().next();
  KeyValueWriter writer = mrOutput.getWriter();
  KeyValuesReader grouped = (KeyValuesReader) getInputs().values().iterator().next()
      .getReader();
  while (grouped.next()) {
    Text currentWord = (Text) grouped.getCurrentKey();
    int total = 0;
    for (Object partial : grouped.getCurrentValues()) {
      IntWritable partialCount = (IntWritable) partial;
      total += partialCount.get();
    }
    writer.write(currentWord, new IntWritable(total));
  }
}
 
源代码8 项目: incubator-tez   文件: MROutput.java
/**
 * Returns a writer that sends each key/value pair to the MapReduce record writer selected by
 * {@code useNewApi}. The flag is captured when the writer is created. After every write,
 * {@code outputRecordCounter} is incremented.
 */
@Override
public KeyValueWriter getWriter() throws IOException {
  return new KeyValueWriter() {
    // Snapshot of the API choice at the moment this writer is handed out.
    private final boolean useNewWriter = useNewApi;

    @SuppressWarnings("unchecked")
    @Override
    public void write(Object key, Object value) throws IOException {
      if (!useNewWriter) {
        oldRecordWriter.write(key, value);
      } else {
        try {
          newRecordWriter.write(key, value);
        } catch (InterruptedException e) {
          // Restore the interrupt status before reporting the failure as an IOException.
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while writing next key-value",e);
        }
      }
      outputRecordCounter.increment(1);
    }
  };
}
 
源代码9 项目: tez   文件: CartesianProduct.java
/**
 * Writes (left, right) string pairs to {@code OUTPUT}. {@code left} is each key from VERTEX1
 * that also occurs among the VERTEX3 keys. {@code right} ranges over all distinct VERTEX2 keys.
 * VERTEX2 and VERTEX3 are read fully into memory before VERTEX1 is scanned.
 */
@Override
public void run() throws Exception {
  KeyValueWriter writer = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
  KeyValueReader leftReader = (KeyValueReader) getInputs().get(VERTEX1).getReader();
  KeyValueReader rightReader = (KeyValueReader) getInputs().get(VERTEX2).getReader();
  KeyValueReader filterReader = (KeyValueReader) getInputs().get(VERTEX3).getReader();
  Set<String> rightKeys = new HashSet<>();
  Set<String> filterKeys = new HashSet<>();

  while (rightReader.next()) {
    rightKeys.add(rightReader.getCurrentKey().toString());
  }
  while (filterReader.next()) {
    filterKeys.add(filterReader.getCurrentKey().toString());
  }

  while (leftReader.next()) {
    String left = leftReader.getCurrentKey().toString();
    if (!filterKeys.contains(left)) {
      continue;
    }
    for (String right : rightKeys) {
      writer.write(left, right);
    }
  }
}
 
源代码10 项目: tez   文件: SortMergeJoinExample.java
/**
 * Sort-merge joins the inputs named {@code input1} and {@code input2} and writes the joined
 * records, through {@code join(...)}, to the output named {@code joinOutput}.
 *
 * @throws IllegalStateException if there are not exactly two inputs and one output, or the
 *     readers/writer are not the expected key-value types
 */
@Override
public void run() throws Exception {
  Preconditions.checkState(getInputs().size() == 2);
  Preconditions.checkState(getOutputs().size() == 1);
  // Get the input data for the 2 sides of the join from the 2 inputs
  LogicalInput logicalInput1 = getInputs().get(input1);
  LogicalInput logicalInput2 = getInputs().get(input2);
  Reader inputReader1 = logicalInput1.getReader();
  Reader inputReader2 = logicalInput2.getReader();
  Preconditions.checkState(inputReader1 instanceof KeyValuesReader);
  Preconditions.checkState(inputReader2 instanceof KeyValuesReader);
  LogicalOutput lo = getOutputs().get(joinOutput);
  // Fetch the writer once. getWriter() may build a new writer on every call, so checking one
  // instance and then writing through another is both wasteful and inconsistent.
  Object rawWriter = lo.getWriter();
  Preconditions.checkState(rawWriter instanceof KeyValueWriter);
  KeyValueWriter writer = (KeyValueWriter) rawWriter;

  join((KeyValuesReader) inputReader1, (KeyValuesReader) inputReader2,
      writer);
}
 
源代码11 项目: tez   文件: WordCount.java
/**
 * Tokenizer vertex: reads lines from the input named {@code INPUT}, splits each on whitespace,
 * and writes (token, {@code one}) to the output named {@code SUMMATION}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  // Cast the reader/writer (not the input/output) so the concrete input/output classes can be
  // swapped without changing the data-type contract this code depends on. Inputs and outputs
  // are looked up by the names they were given in the DAG.
  KeyValueReader lines = (KeyValueReader) getInputs().get(INPUT).getReader();
  KeyValueWriter counts = (KeyValueWriter) getOutputs().get(SUMMATION).getWriter();
  while (lines.next()) {
    String line = lines.getCurrentValue().toString();
    for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens(); ) {
      word.set(tokens.nextToken());
      // Every occurrence of a word is emitted with a count of one.
      counts.write(word, one);
    }
  }
}
 
源代码12 项目: tez   文件: WordCount.java
/**
 * Summation vertex: for each word from the {@code TOKENIZER} input, adds up all its
 * {@link IntWritable} counts and writes (word, total) to the output named {@code OUTPUT}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  KeyValueWriter writer = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
  // A KeyValuesReader presents all values of one key together; the LogicalInput does the
  // grouping. Summing a word's grouped counts therefore yields its total.
  KeyValuesReader grouped = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
  while (grouped.next()) {
    Text currentWord = (Text) grouped.getCurrentKey();
    int total = 0;
    for (Object partial : grouped.getCurrentValues()) {
      IntWritable partialCount = (IntWritable) partial;
      total += partialCount.get();
    }
    writer.write(currentWord, new IntWritable(total));
  }
  // Committing the output is left to the SimpleMRProcessor base class, which runs the
  // OutputFormat commit logic when necessary.
}
 
源代码13 项目: tez   文件: HashJoinExample.java
/**
 * Copies the single input to the single output. Each input value is the join key; it is
 * written as the output key with a {@link NullWritable} value.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or the
 *     input's reader is not a {@link KeyValueReader}
 */
@Override
public void run() throws Exception {
  Preconditions.checkState(getInputs().size() == 1);
  Preconditions.checkState(getOutputs().size() == 1);
  // Inputs and outputs are not looked up by name: there is exactly one of each, and this
  // processor is reused in several DAGs that name them differently.
  Reader untypedReader = getInputs().values().iterator().next().getReader();
  Preconditions.checkState(untypedReader instanceof KeyValueReader);
  KeyValueReader source = (KeyValueReader) untypedReader;
  KeyValueWriter sink = (KeyValueWriter) getOutputs().values().iterator().next().getWriter();

  while (source.next()) {
    Object joinKey = source.getCurrentValue();
    sink.write(joinKey, NullWritable.get());
  }
}
 
源代码14 项目: tez   文件: OrderedWordCount.java
/**
 * For each word from the {@code TOKENIZER} input, adds up its {@link IntWritable} counts and
 * writes the inverted pair (total, word) to the output named {@code SORTER}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  // Cast the reader/writer (not the input/output) so the concrete input/output classes can be
  // swapped without changing the data-type contract this code depends on. Inputs and outputs
  // are looked up by the names they were given in the DAG.
  KeyValueWriter writer = (KeyValueWriter) getOutputs().get(SORTER).getWriter();
  KeyValuesReader grouped = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
  while (grouped.next()) {
    Text currentWord = (Text) grouped.getCurrentKey();
    int total = 0;
    for (Object partial : grouped.getCurrentValues()) {
      IntWritable partialCount = (IntWritable) partial;
      total += partialCount.get();
    }
    // Swap roles: the total becomes the key and the word becomes the value.
    IntWritable totalKey = new IntWritable(total);
    writer.write(totalKey, currentWord);
  }
}
 
源代码15 项目: tez   文件: BroadcastAndOneToOneExample.java
/**
 * Writes one record ({@code word}, this task's index) to the single output.
 *
 * <p>If the user payload is present, has at least one readable byte, and that first byte is
 * positive, the task index is also cached in the DAG-scoped object registry. Presumably a
 * later vertex uses it for a locality check (TODO confirm).
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getOutputs().size() == 1);
  UnorderedKVOutput output = (UnorderedKVOutput) getOutputs().values().iterator()
      .next();
  KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
  kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
  ByteBuffer userPayload =
      getContext().getUserPayload() == null ? null : getContext().getUserPayload().getPayload();
  // Reuse the buffer fetched above instead of fetching it again. Read the flag only when a
  // byte is available: an absolute get(0) on an empty buffer throws IndexOutOfBoundsException.
  // The readable bytes begin at position(), so that is where the flag is read.
  boolean doLocalityCheck = userPayload != null
      && userPayload.hasRemaining()
      && userPayload.get(userPayload.position()) > 0;
  if (doLocalityCheck) {
    ObjectRegistry objectRegistry = getContext().getObjectRegistry();
    String entry = String.valueOf(getContext().getTaskIndex());
    objectRegistry.cacheForDAG(entry, entry);
  }
}
 
源代码16 项目: tez   文件: MROutput.java
/**
 * Returns a key/value writer that produces MapReduce-compatible output.
 *
 * <p>Each pair goes to the new- or old-API record writer, chosen by {@code useNewApi} when the
 * writer is created. After each write, {@code outputRecordCounter} is incremented and progress
 * is reported to the task context.
 */
@Override
public KeyValueWriter getWriter() throws IOException {
  return new KeyValueWriter() {
    // Snapshot of the API choice at the moment this writer is handed out.
    private final boolean useNewWriter = useNewApi;

    @SuppressWarnings("unchecked")
    @Override
    public void write(Object key, Object value) throws IOException {
      if (!useNewWriter) {
        oldRecordWriter.write(key, value);
      } else {
        try {
          newRecordWriter.write(key, value);
        } catch (InterruptedException e) {
          // Restore the interrupt status before reporting the failure as an IOException.
          Thread.currentThread().interrupt();
          throw new IOInterruptedException("Interrupted while writing next key-value",e);
        }
      }
      outputRecordCounter.increment(1);
      getContext().notifyProgress();
    }
  };
}
 
源代码17 项目: sequenceiq-samples   文件: TopK.java
/**
 * Adds up the counts of each word from the {@code TOKENIZER} input and stores each total in
 * {@code localTop}. Then writes only the local top-K entries to the {@code WRITER} output, as
 * (count, word) pairs.
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // A KeyValuesReader presents all values of one key together; the LogicalInput does the
    // grouping. Summing a word's grouped counts therefore yields its total.
    KeyValueWriter writer = (KeyValueWriter) getOutputs().get(WRITER).getWriter();
    KeyValuesReader grouped = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
    while (grouped.next()) {
        Text currentWord = (Text) grouped.getCurrentKey();
        int total = 0;
        for (Object partial : grouped.getCurrentValues()) {
            total += ((IntWritable) partial).get();
        }
        localTop.store(total, currentWord.toString());
    }

    // Emit only this task's local top-K entries.
    Map<Integer, List<String>> topEntries = localTop.getTopK();
    for (Map.Entry<Integer, List<String>> entry : topEntries.entrySet()) {
        IntWritable countKey = new IntWritable(entry.getKey());
        for (String topWord : entry.getValue()) {
            word.set(topWord);
            writer.write(countKey, word);
        }
    }
}
 
源代码18 项目: sequenceiq-samples   文件: TopKDataGen.java
/**
 * Generates rows with {@code createRowString()} and writes each as a {@link Text} key with a
 * {@link NullWritable} value. It stops once the summed {@code Text.getLength()} of the written
 * rows reaches {@code streamOutputFileSize}.
 */
@Override
public void run() throws Exception {
    KeyValueWriter writer = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    for (long written = 0; written < streamOutputFileSize; ) {
        Text row = new Text(createRowString());
        int rowLength = row.getLength();
        writer.write(row, NullWritable.get());
        written += rowLength;
    }
}
 
源代码19 项目: spork   文件: POLocalRearrangeTez.java
/**
 * Binds this operator to the output named {@code outputKey} by storing that output's
 * {@link KeyValueWriter} in {@code writer}.
 *
 * @param outputs logical outputs of this task, keyed by output name
 * @param conf not used by this method
 * @throws ExecException if the output is absent, or its writer cannot be obtained or is not a
 *     {@link KeyValueWriter}
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput target = outputs.get(outputKey);
    if (target == null) {
        throw new ExecException("Output to vertex " + outputKey + " is missing");
    }
    try {
        writer = (KeyValueWriter) target.getWriter();
        String attachMessage = "Attached output to vertex " + outputKey
                + " : output=" + target + ", writer=" + writer;
        LOG.info(attachMessage);
    } catch (Exception cause) {
        throw new ExecException(cause);
    }
}
 
源代码20 项目: spork   文件: POCounterStatsTez.java
/**
 * Binds this operator to the output named {@code outputKey} by storing that output's
 * {@link KeyValueWriter} in {@code writer}.
 *
 * @param outputs logical outputs of this task, keyed by output name
 * @param conf not used by this method
 * @throws ExecException if the output is absent, or its writer cannot be obtained or is not a
 *     {@link KeyValueWriter}
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput target = outputs.get(outputKey);
    if (target == null) {
        throw new ExecException("Output to vertex " + outputKey + " is missing");
    }
    try {
        writer = (KeyValueWriter) target.getWriter();
        String attachMessage = "Attached output to vertex " + outputKey
                + " : output=" + target + ", writer=" + writer;
        LOG.info(attachMessage);
    } catch (Exception cause) {
        throw new ExecException(cause);
    }
}
 
源代码21 项目: spork   文件: POValueOutputTez.java
/**
 * Pulls input until a tuple is available and writes that tuple to every attached writer.
 *
 * <p>Results with {@code STATUS_NULL} are skipped. A {@code STATUS_EOP} or {@code STATUS_ERR}
 * result ends the loop and is returned to the caller unchanged.
 *
 * @return {@code RESULT_EMPTY} after a tuple has been written, otherwise the terminating
 *     EOP/ERR result
 * @throws ExecException wrapping any {@link IOException} thrown by a writer
 */
@Override
public Result getNextTuple() throws ExecException {
    Result inp;
    while (true) {
        inp = processInput();
        // End of input or error: stop and hand the result back unchanged.
        if (inp.returnStatus == POStatus.STATUS_EOP
                || inp.returnStatus == POStatus.STATUS_ERR) {
            break;
        }
        // Nothing to emit this time; fetch the next input.
        if (inp.returnStatus == POStatus.STATUS_NULL) {
            continue;
        }
        for (KeyValueWriter writer : writers) {
            try {
                if (taskIndexWithRecordIndexAsKey) {
                    // Key is the tuple (taskIndex, count). count is incremented once per
                    // writer, so with several writers the same record gets a different key
                    // in each output. NOTE(review): confirm per-writer numbering is intended.
                    Tuple tuple = tupleFactory.newTuple(2);
                    tuple.set(0, taskIndex);
                    tuple.set(1, count++);
                    key = tuple;
                }
                // Otherwise the previously assigned key field is reused as-is.
                writer.write(key, inp.result);
            } catch (IOException e) {
                throw new ExecException(e);
            }
        }
        return RESULT_EMPTY;
    }
    return inp;
}
 
源代码22 项目: spork   文件: POIdentityInOutTez.java
/**
 * Binds this operator to the output named {@code outputKey} by storing that output's
 * {@link KeyValueWriter} in {@code writer}.
 *
 * @param outputs logical outputs of this task, keyed by output name
 * @param conf not used by this method
 * @throws ExecException if the output is absent, or its writer cannot be obtained or is not a
 *     {@link KeyValueWriter}
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput target = outputs.get(outputKey);
    if (target == null) {
        throw new ExecException("Output to vertex " + outputKey + " is missing");
    }
    try {
        writer = (KeyValueWriter) target.getWriter();
        String attachMessage = "Attached output to vertex " + outputKey
                + " : output=" + target + ", writer=" + writer;
        LOG.info(attachMessage);
    } catch (Exception cause) {
        throw new ExecException(cause);
    }
}
 
源代码23 项目: incubator-tez   文件: IntersectExample.java
/**
 * Intersects the keys of two inputs. Keys from "partitioner2" are loaded into an in-memory
 * set. Every key from "partitioner1" that is in the set is then written, with a
 * {@link NullWritable} value, to the single {@link MROutput}. Finally the method waits until
 * the context allows a commit and commits the output.
 *
 * @throws IllegalStateException if the input/output counts, reader types, or output type are
 *     not as expected
 */
@Override
public void run() throws Exception {
  Preconditions.checkState(getInputs().size() == 2);
  Preconditions.checkState(getOutputs().size() == 1);
  LogicalInput streamInput = getInputs().get("partitioner1");
  LogicalInput hashInput = getInputs().get("partitioner2");
  Reader rawStreamReader = streamInput.getReader();
  Reader rawHashReader = hashInput.getReader();
  Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
  Preconditions.checkState(rawHashReader instanceof KeyValueReader);
  LogicalOutput lo = getOutputs().values().iterator().next();
  Preconditions.checkState(lo instanceof MROutput);
  MROutput output = (MROutput) lo;
  KeyValueWriter writer = output.getWriter();

  // Build the lookup set. Each key is copied because the reader may reuse its key object
  // between next() calls.
  KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
  Set<Text> keySet = new HashSet<Text>();
  while (hashKvReader.next()) {
    keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
  }

  // Stream the other side and emit keys present in both inputs.
  KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
  while (streamKvReader.next()) {
    Text key = (Text) streamKvReader.getCurrentKey();
    if (keySet.contains(key)) {
      writer.write(key, NullWritable.get());
    }
  }

  LOG.info("Completed Processing. Trying to commit");
  // Poll every 100 ms until the context reports that this task may commit.
  while (!getContext().canCommit()) {
    Thread.sleep(100L);
  }
  output.commit();
}
 
源代码24 项目: incubator-tez   文件: MapProcessor.java
/**
 * Runs the map phase through the old ({@code org.apache.hadoop.mapred}) API.
 *
 * <p>Passes the legacy input split to {@code updateJobWithSplit}, wraps the input and output in
 * old-API adapters, and hands them to a {@link MapRunnable} created from
 * {@code job.getMapRunnerClass()}. If the runner returns normally, the reporter's progress is
 * set to 1.0 and {@code statusUpdate()} is called.
 *
 * @param job job configuration, also used to create the map runner
 * @param reporter passed to the runner as its {@link Reporter}; receives final progress
 * @param input legacy MR input providing the old-API split and records
 * @param output writer that receives the map output via an {@link OutputCollector}
 */
void runOldMapper(
    final JobConf job,
    final MRTaskReporter reporter,
    final MRInputLegacy input,
    final KeyValueWriter output
    ) throws IOException, InterruptedException {
  // TODO use new method in MRInput to get required info
  InputSplit split = input.getOldInputSplit();
  updateJobWithSplit(job, split);

  RecordReader recordReader = new OldRecordReader(input);
  OutputCollector outputCollector = new OldOutputCollector(output);
  MapRunnable mapRunner =
      (MapRunnable) ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  mapRunner.run(recordReader, outputCollector, (Reporter) reporter);

  // Only reached when the runner did not throw: mark the map work as complete.
  reporter.setProgress(1.0f);
  this.statusUpdate();
}
 
源代码25 项目: tez   文件: CartesianProduct.java
/**
 * Splits every value of the input named {@code INPUT} on whitespace and writes
 * (token, 1) pairs to the output named {@code VERTEX4}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  KeyValueReader reader = (KeyValueReader) getInputs().get(INPUT).getReader();
  KeyValueWriter writer = (KeyValueWriter) getOutputs().get(VERTEX4).getWriter();
  while (reader.next()) {
    String line = reader.getCurrentValue().toString();
    for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens(); ) {
      Text token = new Text(tokens.nextToken());
      writer.write(token, new IntWritable(1));
    }
  }
}
 
源代码26 项目: tez   文件: HashJoinExample.java
/**
 * Hash join on keys. Keys from the {@code hashSide} input are loaded into an in-memory set.
 * Each key from the {@code streamingSide} input that is in the set is then written, with a
 * {@link NullWritable} value, to the output named {@code joinOutput}.
 *
 * @throws IllegalStateException if the input/output counts or reader/writer types are not as
 *     expected
 */
@Override
public void run() throws Exception {
  Preconditions.checkState(getInputs().size() == 2);
  Preconditions.checkState(getOutputs().size() == 1);
  // Get the input data for the 2 sides of the join from the 2 inputs
  LogicalInput streamInput = getInputs().get(streamingSide);
  LogicalInput hashInput = getInputs().get(hashSide);
  Reader rawStreamReader = streamInput.getReader();
  Reader rawHashReader = hashInput.getReader();
  Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
  Preconditions.checkState(rawHashReader instanceof KeyValueReader);
  LogicalOutput lo = getOutputs().get(joinOutput);
  // Fetch the writer once. getWriter() may build a new writer on every call, so checking one
  // instance and then writing through another is both wasteful and inconsistent.
  Object rawWriter = lo.getWriter();
  Preconditions.checkState(rawWriter instanceof KeyValueWriter);
  KeyValueWriter writer = (KeyValueWriter) rawWriter;

  // Create a hash table for the hash side. Keys are copied because the reader may reuse its
  // key object between next() calls.
  KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
  Set<Text> keySet = new HashSet<Text>();
  while (hashKvReader.next()) {
    keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
  }

  // Read the stream side and join it using the hash table.
  KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
  while (streamKvReader.next()) {
    Text key = (Text) streamKvReader.getCurrentKey();
    if (keySet.contains(key)) {
      writer.write(key, NullWritable.get());
    }
  }
}
 
源代码27 项目: tez   文件: OrderedWordCount.java
/**
 * Reads (sum, words...) groups from the {@code SUMMATION} input and writes each word back out
 * as (word, sum) to the output named {@code OUTPUT}.
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  KeyValueWriter writer = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
  KeyValuesReader grouped = (KeyValuesReader) getInputs().get(SUMMATION).getReader();
  while (grouped.next()) {
    Object total = grouped.getCurrentKey();
    Iterable<Object> wordsWithTotal = grouped.getCurrentValues();
    for (Object currentWord : wordsWithTotal) {
      // Restore the (word, count) orientation for the final output.
      writer.write(currentWord, total);
    }
  }
  // Committing the output is left to the SimpleMRProcessor base class.
}
 
源代码28 项目: tez   文件: CartesianProduct.java
/**
 * Converts each key of the input named {@code INPUT} (expected to be a {@link String}) to
 * {@link Text} and writes it with a count of 1 to the output named {@code VERTEX3}.
 *
 * @throws ClassCastException if an input key is not a {@link String}
 */
@Override
public void run() throws Exception {
  Preconditions.checkArgument(getInputs().size() == 1);
  Preconditions.checkArgument(getOutputs().size() == 1);
  KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
  KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
  while (kvReader.next()) {
    // Only the key is needed; the value was previously read into an unused local.
    Object key = kvReader.getCurrentKey();
    kvWriter.write(new Text((String) key), new IntWritable(1));
  }
}
 
源代码29 项目: tez   文件: CartesianProduct.java
/**
 * Returns a debugging writer that prints each pair to standard output as
 * {@code "<key> XXX <value>"} and does not persist anything.
 */
@Override
public Writer getWriter() throws Exception {
  KeyValueWriter printingWriter = new KeyValueWriter() {
    @Override
    public void write(Object key, Object value) throws IOException {
      String line = key + " XXX " + value;
      System.out.println(line);
    }
  };
  return printingWriter;
}
 
源代码30 项目: tez   文件: FilterByWordOutputProcessor.java
/**
 * Starts the single {@link UnorderedKVInput} and the single {@link MROutput}, then copies
 * every key/value pair from the input to the output unchanged.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or they are
 *     not an {@link UnorderedKVInput} and an {@link MROutput} respectively
 */
@Override
public void run() throws Exception {

  if (inputs.size() != 1) {
    throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single input");
  }

  if (outputs.size() != 1) {
    throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single output");
  }

  for (LogicalInput input : inputs.values()) {
    input.start();
  }
  for (LogicalOutput output : outputs.values()) {
    output.start();
  }

  LogicalInput li = inputs.values().iterator().next();
  if (! (li instanceof UnorderedKVInput)) {
    // The message names the type actually checked above.
    throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with UnorderedKVInput");
  }

  LogicalOutput lo = outputs.values().iterator().next();
  if (! (lo instanceof MROutput)) {
    throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with MROutput");
  }

  UnorderedKVInput kvInput = (UnorderedKVInput) li;
  MROutput mrOutput = (MROutput) lo;

  KeyValueReader kvReader = kvInput.getReader();
  KeyValueWriter kvWriter = mrOutput.getWriter();
  while (kvReader.next()) {
    Object key = kvReader.getCurrentKey();
    Object value = kvReader.getCurrentValue();

    kvWriter.write(key, value);
  }
}