下面列出了 org.apache.hadoop.mapred.MapRunnable#org.apache.tez.runtime.library.api.KeyValueWriter 的实例代码。您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Extracts column {@code columnIndex} from each comma-separated input record and emits it
 * paired with {@code ONE} to the {@code SUM} output.
 *
 * <p>Records that have too few columns to contain {@code columnIndex} are skipped.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // Cast the reader/writer rather than the input/output objects so the concrete
    // input/output classes can be replaced without changing the data-type guarantees
    // this processor relies on. Inputs/outputs are looked up by their DAG names.
    KeyValueReader recordReader = (KeyValueReader) getInputs().get(INPUT).getReader();
    KeyValueWriter sumWriter = (KeyValueWriter) getOutputs().get(SUM).getWriter();
    while (recordReader.next()) {
        String record = recordReader.getCurrentValue().toString();
        String[] columns = record.split(",");
        if (columns.length <= columnIndex) {
            continue;
        }
        text.set(columns[columnIndex]);
        sumWriter.write(text, ONE);
    }
}
/**
 * Loads (count, value) records from the {@code SUM} input into {@code localTop}, then writes
 * the sorted top-K groups to {@code OUTPUT}.
 *
 * <p>Each output record has the comma-joined values that share a count as its key and that
 * count as its value.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueWriter topWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    UnorderedKVReader countReader = (UnorderedKVReader) getInputs().get(SUM).getReader();
    while (countReader.next()) {
        Integer count = Integer.valueOf(countReader.getCurrentKey().toString());
        String value = countReader.getCurrentValue().toString();
        localTop.store(count, value);
    }
    Map<Integer, List<String>> sortedTop = localTop.getTopKSorted();
    for (Map.Entry<Integer, List<String>> group : sortedTop.entrySet()) {
        Text joinedValues = new Text(join(group.getValue(), ","));
        topWriter.write(joinedValues, new IntWritable(group.getKey()));
    }
}
/**
 * Resolves a {@link KeyValueWriter} for every vertex named in {@code outputKeys}, collects them
 * into {@code writers}, and resets the per-task record state.
 *
 * <p>{@code count} is reset to 0. When {@code taskIndexWithRecordIndexAsKey} is false,
 * {@code key} is set to {@code EMPTY_KEY}.
 *
 * @param outputs the task's logical outputs, keyed by name
 * @param conf configuration; not read by this method
 * @throws ExecException if a named output is missing, or obtaining/casting its writer fails
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    writers = new ArrayList<KeyValueWriter>();
    for (String vertexName : outputKeys) {
        LogicalOutput vertexOutput = outputs.get(vertexName);
        if (vertexOutput == null) {
            throw new ExecException("Output to vertex " + vertexName
                    + " is missing");
        }
        try {
            KeyValueWriter vertexWriter = (KeyValueWriter) vertexOutput.getWriter();
            writers.add(vertexWriter);
            LOG.info("Attached output to vertex " + vertexName + " : output=" + vertexOutput
                    + ", writer=" + vertexWriter);
        } catch (Exception cause) {
            throw new ExecException(cause);
        }
    }
    count = 0;
    if (!taskIndexWithRecordIndexAsKey) {
        key = EMPTY_KEY;
    }
}
/**
 * Forwards each value of the single input as the key of the single output, paired with
 * {@link NullWritable}.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or the
 *     input's reader is not a {@link KeyValueReader}
 */
@Override
public void run() throws Exception {
    Preconditions.checkState(getInputs().size() == 1);
    Preconditions.checkState(getOutputs().size() == 1);
    LogicalInput onlyInput = getInputs().values().iterator().next();
    Reader genericReader = onlyInput.getReader();
    Preconditions.checkState(genericReader instanceof KeyValueReader);
    LogicalOutput onlyOutput = getOutputs().values().iterator().next();
    KeyValueReader valueReader = (KeyValueReader) genericReader;
    KeyValueWriter keyWriter = (KeyValueWriter) onlyOutput.getWriter();
    while (valueReader.next()) {
        keyWriter.write(valueReader.getCurrentValue(), NullWritable.get());
    }
}
/**
 * Emits a single ({@code word}, task index) record and, when requested through the user
 * payload, registers the task index in the DAG-lifecycle object registry.
 *
 * <p>The first payload byte is a flag: a positive value enables the locality check. A missing
 * or empty payload disables it.
 *
 * @throws Exception if obtaining the writer or writing the record fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getOutputs().size() == 1);
    OnFileUnorderedKVOutput output = (OnFileUnorderedKVOutput) getOutputs().values().iterator()
            .next();
    KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
    kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
    byte[] userPayload = getContext().getUserPayload();
    // Check the length before reading the flag byte; an empty payload would otherwise throw
    // ArrayIndexOutOfBoundsException.
    boolean doLocalityCheck =
            userPayload != null && userPayload.length > 0 && userPayload[0] > 0;
    if (doLocalityCheck) {
        ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry();
        String entry = String.valueOf(getContext().getTaskIndex());
        objectRegistry.add(ObjectLifeCycle.DAG, entry, entry);
    }
}
/**
 * Tokenizes each line of the single {@link MRInput} on whitespace and emits
 * ({@code word}, {@code one}) for every token to the single output.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    MRInput mrInput = (MRInput) getInputs().values().iterator().next();
    KeyValueReader lineReader = mrInput.getReader();
    Output onlyOutput = getOutputs().values().iterator().next();
    KeyValueWriter countWriter = (KeyValueWriter) onlyOutput.getWriter();
    while (lineReader.next()) {
        String line = lineReader.getCurrentValue().toString();
        for (StringTokenizer tokens = new StringTokenizer(line); tokens.hasMoreTokens(); ) {
            word.set(tokens.nextToken());
            countWriter.write(word, one);
        }
    }
}
/**
 * Sums the {@link IntWritable} counts grouped under each {@link Text} word by the single input
 * and writes (word, total) to the single {@link MROutput}.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    MROutput mrOutput = (MROutput) getOutputs().values().iterator().next();
    KeyValueWriter totalWriter = mrOutput.getWriter();
    KeyValuesReader groupedReader =
            (KeyValuesReader) getInputs().values().iterator().next().getReader();
    while (groupedReader.next()) {
        Text currentWord = (Text) groupedReader.getCurrentKey();
        int total = 0;
        for (Object partialCount : groupedReader.getCurrentValues()) {
            total += ((IntWritable) partialCount).get();
        }
        totalWriter.write(currentWord, new IntWritable(total));
    }
}
/**
 * Returns a writer that forwards each key/value pair to the underlying record writer and
 * increments {@code outputRecordCounter} once per record.
 *
 * <p>The choice between {@code newRecordWriter} and {@code oldRecordWriter} is fixed from
 * {@code useNewApi} at the moment this method is called.
 *
 * @throws IOException declared by the interface; not thrown here
 */
@Override
public KeyValueWriter getWriter() throws IOException {
    final boolean newApi = useNewApi;
    return new KeyValueWriter() {
        @SuppressWarnings("unchecked")
        @Override
        public void write(Object key, Object value) throws IOException {
            if (!newApi) {
                oldRecordWriter.write(key, value);
            } else {
                try {
                    newRecordWriter.write(key, value);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag before converting to an IOException.
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while writing next key-value", ie);
                }
            }
            outputRecordCounter.increment(1);
        }
    };
}
/**
 * Joins three inputs. For every key read from {@code VERTEX1} that also appears among the keys
 * of {@code VERTEX3}, writes one (key, token) pair to {@code OUTPUT} for each distinct key
 * read from {@code VERTEX2}.
 *
 * @throws Exception if reading an input or writing the output fails
 */
@Override
public void run() throws Exception {
    KeyValueWriter pairWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    KeyValueReader leftReader = (KeyValueReader) getInputs().get(VERTEX1).getReader();
    KeyValueReader rightReader = (KeyValueReader) getInputs().get(VERTEX2).getReader();
    KeyValueReader filterReader = (KeyValueReader) getInputs().get(VERTEX3).getReader();
    Set<String> rightTokens = new HashSet<>();
    Set<String> filterTokens = new HashSet<>();
    while (rightReader.next()) {
        rightTokens.add(rightReader.getCurrentKey().toString());
    }
    while (filterReader.next()) {
        filterTokens.add(filterReader.getCurrentKey().toString());
    }
    while (leftReader.next()) {
        String leftToken = leftReader.getCurrentKey().toString();
        if (!filterTokens.contains(leftToken)) {
            continue;
        }
        for (String rightToken : rightTokens) {
            pairWriter.write(leftToken, rightToken);
        }
    }
}
/**
 * Joins the two grouped inputs named {@code input1} and {@code input2} using {@code join} and
 * writes the result to the output named {@code joinOutput}.
 *
 * @throws IllegalStateException if there are not exactly two inputs and one output, or the
 *     readers/writer are not {@link KeyValuesReader}s / a {@link KeyValueWriter}
 * @throws Exception if obtaining readers/writer or performing the join fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkState(getInputs().size() == 2);
    Preconditions.checkState(getOutputs().size() == 1);
    // Get the input data for the 2 sides of the join from the 2 inputs
    LogicalInput logicalInput1 = getInputs().get(input1);
    LogicalInput logicalInput2 = getInputs().get(input2);
    Reader inputReader1 = logicalInput1.getReader();
    Reader inputReader2 = logicalInput2.getReader();
    Preconditions.checkState(inputReader1 instanceof KeyValuesReader);
    Preconditions.checkState(inputReader2 instanceof KeyValuesReader);
    LogicalOutput lo = getOutputs().get(joinOutput);
    // Obtain the writer once. Calling getWriter() separately for the check and for the cast
    // does redundant work and may not return the same instance for every output type.
    Object rawWriter = lo.getWriter();
    Preconditions.checkState(rawWriter instanceof KeyValueWriter);
    KeyValueWriter writer = (KeyValueWriter) rawWriter;
    join((KeyValuesReader) inputReader1, (KeyValuesReader) inputReader2,
            writer);
}
/**
 * Splits each input line on whitespace and emits ({@code word}, {@code one}) for every token
 * to the {@code SUMMATION} output.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // Cast the reader and writer (not the input/output objects) so the concrete input/output
    // implementations can be swapped without changing the data-type guarantees used here.
    // Inputs and outputs are addressed by the names assigned in the DAG.
    KeyValueReader lineReader = (KeyValueReader) getInputs().get(INPUT).getReader();
    KeyValueWriter tokenWriter = (KeyValueWriter) getOutputs().get(SUMMATION).getWriter();
    while (lineReader.next()) {
        StringTokenizer tokens = new StringTokenizer(lineReader.getCurrentValue().toString());
        while (tokens.hasMoreTokens()) {
            String token = tokens.nextToken();
            word.set(token);
            // Each occurrence counts once: the word is the key and one is the value.
            tokenWriter.write(word, one);
        }
    }
}
/**
 * Sums the per-word counts delivered by the {@code TOKENIZER} input and writes
 * (word, total) to the {@code OUTPUT} output.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueWriter totalsWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    // A KeyValuesReader hands over every value that shares a key; the grouping is done by the
    // LogicalInput. Keys are words and values are the counts from each tokenizer task, so
    // adding the values gives the word's total.
    KeyValuesReader groupedCounts = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
    while (groupedCounts.next()) {
        Text currentWord = (Text) groupedCounts.getCurrentKey();
        int total = 0;
        for (Object partial : groupedCounts.getCurrentValues()) {
            total += ((IntWritable) partial).get();
        }
        totalsWriter.write(currentWord, new IntWritable(total));
    }
    // Committing the output is left to the SimpleMRProcessor base class, which invokes the
    // OutputFormat commit logic if necessary.
}
/**
 * Re-emits each input value as an output key paired with {@link NullWritable}; the value
 * itself is the join key.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or the
 *     input's reader is not a {@link KeyValueReader}
 */
@Override
public void run() throws Exception {
    Preconditions.checkState(getInputs().size() == 1);
    Preconditions.checkState(getOutputs().size() == 1);
    // Inputs and outputs are fetched positionally instead of by name: there is exactly one of
    // each, and this processor is reused in DAGs that name them differently.
    LogicalInput soleInput = getInputs().values().iterator().next();
    Reader untypedReader = soleInput.getReader();
    Preconditions.checkState(untypedReader instanceof KeyValueReader);
    LogicalOutput soleOutput = getOutputs().values().iterator().next();
    KeyValueReader valueReader = (KeyValueReader) untypedReader;
    KeyValueWriter keyWriter = (KeyValueWriter) soleOutput.getWriter();
    while (valueReader.next()) {
        Object joinKey = valueReader.getCurrentValue();
        keyWriter.write(joinKey, NullWritable.get());
    }
}
/**
 * Sums the per-word counts from the {@code TOKENIZER} input and writes each result to the
 * {@code SORTER} output as (total, word), i.e. with the count as the key.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // Cast the reader/writer rather than the input/output objects so the concrete
    // implementations can change without affecting the data-type guarantees used here.
    // Inputs/outputs are looked up by the names assigned in the DAG.
    KeyValueWriter sorterWriter = (KeyValueWriter) getOutputs().get(SORTER).getWriter();
    KeyValuesReader wordCounts = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
    while (wordCounts.next()) {
        Text currentWord = (Text) wordCounts.getCurrentKey();
        int total = 0;
        for (Object partial : wordCounts.getCurrentValues()) {
            total += ((IntWritable) partial).get();
        }
        // Count becomes the key and the word the value (presumably so the SORTER vertex
        // can order records by count).
        sorterWriter.write(new IntWritable(total), currentWord);
    }
}
/**
 * Emits a single ({@code word}, task index) record to the {@link UnorderedKVOutput} and, when
 * the user payload's flag byte is positive, caches the task index in the DAG-scoped object
 * registry.
 *
 * <p>A missing or empty payload disables the locality check.
 *
 * @throws Exception if obtaining the writer or writing the record fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getOutputs().size() == 1);
    UnorderedKVOutput output = (UnorderedKVOutput) getOutputs().values().iterator()
            .next();
    KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
    kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
    ByteBuffer userPayload =
            getContext().getUserPayload() == null ? null : getContext().getUserPayload().getPayload();
    // Read the flag from the buffer fetched above instead of fetching the payload again.
    // Only read when the buffer holds at least one byte; an empty buffer previously made
    // get(0) throw IndexOutOfBoundsException. The flag is the buffer's first unread byte.
    boolean doLocalityCheck = userPayload != null
            && userPayload.hasRemaining()
            && userPayload.get(userPayload.position()) > 0;
    if (doLocalityCheck) {
        ObjectRegistry objectRegistry = getContext().getObjectRegistry();
        String entry = String.valueOf(getContext().getTaskIndex());
        objectRegistry.cacheForDAG(entry, entry);
    }
}
/**
 * Returns a key/value writer that produces MapReduce-compatible output.
 *
 * <p>Each record goes to {@code newRecordWriter} or {@code oldRecordWriter}, selected from
 * {@code useNewApi} when the writer is created. After each record, {@code outputRecordCounter}
 * is incremented and progress is reported to the task context.
 *
 * @throws IOException declared by the interface; not thrown here
 */
@Override
public KeyValueWriter getWriter() throws IOException {
    return new KeyValueWriter() {
        private final boolean viaNewApi = useNewApi;

        @SuppressWarnings("unchecked")
        @Override
        public void write(Object key, Object value) throws IOException {
            if (!viaNewApi) {
                oldRecordWriter.write(key, value);
            } else {
                try {
                    newRecordWriter.write(key, value);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new IOInterruptedException("Interrupted while writing next key-value", interrupted);
                }
            }
            outputRecordCounter.increment(1);
            getContext().notifyProgress();
        }
    };
}
/**
 * Totals the counts for each word from the {@code TOKENIZER} input, records (total, word) in
 * {@code localTop}, and writes only the local top-K entries to {@code WRITER} as
 * (total, word) pairs.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    // The KeyValuesReader supplies every value for a key, grouped by the LogicalInput. Keys are
    // words and values are their counts from the individual TokenProcessors, so adding the
    // values gives each word's total.
    KeyValueWriter topWriter = (KeyValueWriter) getOutputs().get(WRITER).getWriter();
    KeyValuesReader groupedCounts = (KeyValuesReader) getInputs().get(TOKENIZER).getReader();
    while (groupedCounts.next()) {
        Text groupWord = (Text) groupedCounts.getCurrentKey();
        int total = 0;
        for (Object partial : groupedCounts.getCurrentValues()) {
            total += ((IntWritable) partial).get();
        }
        localTop.store(total, groupWord.toString());
    }
    // Emit only this task's local top-K results.
    Map<Integer, List<String>> localResults = localTop.getTopK();
    for (Map.Entry<Integer, List<String>> bucket : localResults.entrySet()) {
        IntWritable totalWritable = new IntWritable(bucket.getKey());
        for (String member : bucket.getValue()) {
            word.set(member);
            topWriter.write(totalWritable, word);
        }
    }
}
/**
 * Writes generated rows ({@link Text} key, {@link NullWritable} value) to {@code OUTPUT} until
 * the accumulated {@code Text.getLength()} of the rows reaches {@code streamOutputFileSize}.
 *
 * <p>NOTE(review): if {@code createRowString()} only ever returns empty strings, the running
 * total never grows and this loop does not terminate.
 *
 * @throws Exception if obtaining the writer or writing a row fails
 */
@Override
public void run() throws Exception {
    KeyValueWriter rowWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    for (long written = 0; written < streamOutputFileSize; ) {
        Text row = new Text(createRowString());
        int rowLength = row.getLength();
        rowWriter.write(row, NullWritable.get());
        written += rowLength;
    }
}
/**
 * Binds this operator to the logical output named {@code outputKey} by storing its
 * {@link KeyValueWriter} in {@code writer}.
 *
 * @param outputs the task's logical outputs, keyed by name
 * @param conf configuration; not read by this method
 * @throws ExecException if no output is registered under {@code outputKey}, or obtaining or
 *     casting its writer fails
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput target = outputs.get(outputKey);
    if (target == null) {
        String missingMessage = "Output to vertex " + outputKey + " is missing";
        throw new ExecException(missingMessage);
    }
    try {
        writer = (KeyValueWriter) target.getWriter();
        LOG.info("Attached output to vertex " + outputKey + " : output=" + target
                + ", writer=" + writer);
    } catch (Exception failure) {
        throw new ExecException(failure);
    }
}
/**
 * Looks up the logical output named {@code outputKey} and keeps its key/value writer in
 * {@code writer}.
 *
 * @param outputs the task's logical outputs, keyed by name
 * @param conf configuration; not read by this method
 * @throws ExecException if the output is absent, or its writer cannot be obtained or is not a
 *     {@link KeyValueWriter}
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput resolved = outputs.get(outputKey);
    if (resolved != null) {
        try {
            writer = (KeyValueWriter) resolved.getWriter();
            LOG.info("Attached output to vertex " + outputKey
                    + " : output=" + resolved + ", writer=" + writer);
        } catch (Exception cause) {
            throw new ExecException(cause);
        }
    } else {
        throw new ExecException("Output to vertex " + outputKey + " is missing");
    }
}
/**
 * Pulls tuples from this operator's input and writes each non-null result to every attached
 * writer in {@code writers}.
 *
 * <p>Inputs with {@code STATUS_NULL} are skipped. After one input tuple has been written,
 * {@code RESULT_EMPTY} is returned, so a caller must invoke this repeatedly to drain the input.
 * Once the input reports {@code STATUS_EOP} or {@code STATUS_ERR}, that input {@link Result} is
 * returned unchanged.
 *
 * @return {@code RESULT_EMPTY} after writing a tuple; otherwise the terminal input result
 * @throws ExecException if {@code processInput()} throws, or a writer throws an
 *     {@link IOException} (wrapped)
 */
@Override
public Result getNextTuple() throws ExecException {
Result inp;
while (true) {
inp = processInput();
// Stop on end-of-processing or error and hand the status back to the caller.
if (inp.returnStatus == POStatus.STATUS_EOP
|| inp.returnStatus == POStatus.STATUS_ERR) {
break;
}
// Nothing to write for a null result; fetch the next input.
if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
}
for (KeyValueWriter writer : writers) {
try {
// When enabled, the key is a fresh (taskIndex, recordIndex) tuple; otherwise the
// existing value of the key field is reused for every record.
// NOTE(review): count is incremented once per writer, so with several writers the same
// record gets a different record index in each output. Confirm this is intended.
if (taskIndexWithRecordIndexAsKey) {
Tuple tuple = tupleFactory.newTuple(2);
tuple.set(0, taskIndex);
tuple.set(1, count++);
key = tuple;
}
writer.write(key, inp.result);
} catch (IOException e) {
throw new ExecException(e);
}
}
// One record has been written to all outputs; report "no tuple produced" to the caller.
return RESULT_EMPTY;
}
return inp;
}
/**
 * Attaches the output named {@code outputKey}: its writer is cast to {@link KeyValueWriter} and
 * stored in {@code writer}.
 *
 * @param outputs the task's logical outputs, keyed by name
 * @param conf configuration; not read by this method
 * @throws ExecException if the output is missing, or obtaining or casting its writer fails
 */
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
        Configuration conf) throws ExecException {
    LogicalOutput vertexOutput = outputs.get(outputKey);
    if (vertexOutput == null) {
        throw new ExecException("Output to vertex " + outputKey + " is missing");
    }
    try {
        writer = (KeyValueWriter) vertexOutput.getWriter();
        String attachedMessage = "Attached output to vertex " + outputKey
                + " : output=" + vertexOutput + ", writer=" + writer;
        LOG.info(attachedMessage);
    } catch (Exception problem) {
        throw new ExecException(problem);
    }
}
/**
 * Hash-joins the {@code "partitioner1"} input (stream side) against the keys of the
 * {@code "partitioner2"} input (hash side). Each matching key is written to the single
 * {@link MROutput}; the method then waits for commit permission and commits the output.
 *
 * @throws IllegalStateException if the input/output counts or types are not as expected
 * @throws InterruptedException if interrupted while waiting for commit permission
 */
@Override
public void run() throws Exception {
    Preconditions.checkState(getInputs().size() == 2);
    Preconditions.checkState(getOutputs().size() == 1);
    LogicalInput streamSide = getInputs().get("partitioner1");
    LogicalInput hashSide = getInputs().get("partitioner2");
    Reader streamReaderRaw = streamSide.getReader();
    Reader hashReaderRaw = hashSide.getReader();
    Preconditions.checkState(streamReaderRaw instanceof KeyValueReader);
    Preconditions.checkState(hashReaderRaw instanceof KeyValueReader);
    LogicalOutput onlyOutput = getOutputs().values().iterator().next();
    Preconditions.checkState(onlyOutput instanceof MROutput);
    MROutput mrOutput = (MROutput) onlyOutput;
    KeyValueWriter joinWriter = mrOutput.getWriter();

    // Build the hash-side key set. Each key is copied, presumably because the reader may
    // reuse the same Text instance across records.
    KeyValueReader hashReader = (KeyValueReader) hashReaderRaw;
    Set<Text> hashKeys = new HashSet<Text>();
    while (hashReader.next()) {
        Text hashKey = (Text) hashReader.getCurrentKey();
        hashKeys.add(new Text(hashKey));
    }

    // Probe with the stream side and emit keys present on both sides.
    KeyValueReader streamReader = (KeyValueReader) streamReaderRaw;
    while (streamReader.next()) {
        Text streamKey = (Text) streamReader.getCurrentKey();
        if (hashKeys.contains(streamKey)) {
            joinWriter.write(streamKey, NullWritable.get());
        }
    }

    LOG.info("Completed Processing. Trying to commit");
    // Poll every 100 ms until the framework allows this task to commit.
    while (!getContext().canCommit()) {
        Thread.sleep(100L);
    }
    mrOutput.commit();
}
/**
 * Runs the job's old-API ({@code org.apache.hadoop.mapred}) {@link MapRunnable} over the given
 * legacy MR input.
 *
 * <p>The job configuration is updated with the input's split before the record reader is
 * created. If the runner returns normally, the reporter's progress is set to 1.0 and a status
 * update is issued.
 *
 * @param job job configuration; updated with the input split's details
 * @param reporter passed to the map runner as its {@link Reporter} and used for progress
 * @param input legacy MR input supplying the split and the records
 * @param output receives the key/value pairs collected by the mapper
 * @throws IOException if the map runner or record I/O fails
 * @throws InterruptedException declared by this method for its callers
 */
void runOldMapper(
        final JobConf job,
        final MRTaskReporter reporter,
        final MRInputLegacy input,
        final KeyValueWriter output
) throws IOException, InterruptedException {
    // TODO use new method in MRInput to get required info
    InputSplit split = input.getOldInputSplit();
    // Must run before the record reader is constructed so the job reflects this split.
    updateJobWithSplit(job, split);

    RecordReader recordReader = new OldRecordReader(input);
    OutputCollector outputCollector = new OldOutputCollector(output);
    MapRunnable mapRunner =
            (MapRunnable) ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
    mapRunner.run(recordReader, outputCollector, (Reporter) reporter);

    // Reached only when the runner completed without throwing.
    reporter.setProgress(1.0f);
    this.statusUpdate();
}
/**
 * Splits each line of the {@code INPUT} input on whitespace and emits (token, 1) to the
 * {@code VERTEX4} output, using new {@link Text}/{@link IntWritable} objects per record.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueReader lineReader = (KeyValueReader) getInputs().get(INPUT).getReader();
    KeyValueWriter tokenWriter = (KeyValueWriter) getOutputs().get(VERTEX4).getWriter();
    while (lineReader.next()) {
        String line = lineReader.getCurrentValue().toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            Text token = new Text(tokenizer.nextToken());
            tokenWriter.write(token, new IntWritable(1));
        }
    }
}
/**
 * Hash-joins the {@code streamingSide} input against the keys of the {@code hashSide} input
 * and writes each matching key, paired with {@link NullWritable}, to {@code joinOutput}.
 *
 * @throws IllegalStateException if there are not exactly two inputs and one output, or the
 *     readers/writer are not {@link KeyValueReader}s / a {@link KeyValueWriter}
 * @throws Exception if reading an input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkState(getInputs().size() == 2);
    Preconditions.checkState(getOutputs().size() == 1);
    // Get the input data for the 2 sides of the join from the 2 inputs
    LogicalInput streamInput = getInputs().get(streamingSide);
    LogicalInput hashInput = getInputs().get(hashSide);
    Reader rawStreamReader = streamInput.getReader();
    Reader rawHashReader = hashInput.getReader();
    Preconditions.checkState(rawStreamReader instanceof KeyValueReader);
    Preconditions.checkState(rawHashReader instanceof KeyValueReader);
    LogicalOutput lo = getOutputs().get(joinOutput);
    // Obtain the writer once. Calling getWriter() separately for the check and for the cast
    // does redundant work and may not return the same instance for every output type.
    Object rawWriter = lo.getWriter();
    Preconditions.checkState(rawWriter instanceof KeyValueWriter);
    KeyValueWriter writer = (KeyValueWriter) rawWriter;
    // create a hash table for the hash side; keys are copied, presumably because the reader
    // may reuse the same Text instance across records
    KeyValueReader hashKvReader = (KeyValueReader) rawHashReader;
    Set<Text> keySet = new HashSet<Text>();
    while (hashKvReader.next()) {
        keySet.add(new Text((Text) hashKvReader.getCurrentKey()));
    }
    // read the stream side and join it using the hash table
    KeyValueReader streamKvReader = (KeyValueReader) rawStreamReader;
    while (streamKvReader.next()) {
        Text key = (Text) streamKvReader.getCurrentKey();
        if (keySet.contains(key)) {
            writer.write(key, NullWritable.get());
        }
    }
}
/**
 * Inverts the grouped (count, words) records from the {@code SUMMATION} input, writing one
 * (word, count) pair per grouped value to {@code OUTPUT}.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueWriter invertedWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter();
    KeyValuesReader countGroups = (KeyValuesReader) getInputs().get(SUMMATION).getReader();
    while (countGroups.next()) {
        Object sharedCount = countGroups.getCurrentKey();
        for (Object groupedWord : countGroups.getCurrentValues()) {
            invertedWriter.write(groupedWord, sharedCount);
        }
    }
    // Committing the output is left to the SimpleMRProcessor base class.
}
/**
 * Emits (key as {@link Text}, 1) to the {@code VERTEX3} output for every record of the
 * {@code INPUT} input.
 *
 * @throws Exception if reading the input or writing the output fails
 */
@Override
public void run() throws Exception {
    Preconditions.checkArgument(getInputs().size() == 1);
    Preconditions.checkArgument(getOutputs().size() == 1);
    KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader();
    KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter();
    while (kvReader.next()) {
        Object key = kvReader.getCurrentKey();
        // Use toString() instead of a (String) cast: String keys are unchanged, and other key
        // types (e.g. Text or LongWritable) no longer fail with ClassCastException.
        kvWriter.write(new Text(key.toString()), new IntWritable(1));
    }
}
/**
 * Returns a debugging writer that prints each record to standard output as
 * {@code "<key> XXX <value>"} instead of persisting it.
 *
 * @throws Exception declared by the interface; not thrown here
 */
@Override
public Writer getWriter() throws Exception {
    return new KeyValueWriter() {
        @Override
        public void write(Object key, Object value) throws IOException {
            String rendered = key + " XXX " + value;
            System.out.println(rendered);
        }
    };
}
/**
 * Starts all inputs and outputs, then copies every record from the single
 * {@link UnorderedKVInput} to the single {@link MROutput} unchanged.
 *
 * @throws IllegalStateException if there is not exactly one input and one output, or they are
 *     not an {@code UnorderedKVInput} and an {@code MROutput} respectively
 * @throws Exception if starting, reading or writing fails
 */
@Override
public void run() throws Exception {
    if (inputs.size() != 1) {
        throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single input");
    }
    if (outputs.size() != 1) {
        throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with a single output");
    }
    for (LogicalInput input : inputs.values()) {
        input.start();
    }
    for (LogicalOutput output : outputs.values()) {
        output.start();
    }
    LogicalInput li = inputs.values().iterator().next();
    if (! (li instanceof UnorderedKVInput)) {
        // The message names the class that is actually checked.
        throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with UnorderedKVInput");
    }
    LogicalOutput lo = outputs.values().iterator().next();
    if (! (lo instanceof MROutput)) {
        throw new IllegalStateException("FilterByWordOutputProcessor processor can only work with MROutput");
    }
    UnorderedKVInput kvInput = (UnorderedKVInput) li;
    MROutput mrOutput = (MROutput) lo;
    KeyValueReader kvReader = kvInput.getReader();
    KeyValueWriter kvWriter = mrOutput.getWriter();
    while (kvReader.next()) {
        Object key = kvReader.getCurrentKey();
        Object value = kvReader.getCurrentValue();
        kvWriter.write(key, value);
    }
}