下面列出了 org.apache.hadoop.mapred.Reporter#getInputSplit() 的实例代码。可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  // Lazily resolve the delegate mapper the first time a record arrives:
  // the split carries the concrete mapper class to instantiate.
  if (mapper == null) {
    TaggedInputSplit taggedSplit = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        taggedSplit.getMapperClass(), conf);
  }
  // Hand the record straight through to the resolved mapper.
  mapper.map(key, value, outputCollector, reporter);
}
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  // On the first record, look up which mapper this split was tagged with
  // and instantiate it; subsequent calls reuse the cached instance.
  if (mapper == null) {
    TaggedInputSplit split = (TaggedInputSplit) reporter.getInputSplit();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(split.getMapperClass(), conf);
  }
  // Delegate the actual work to the per-split mapper.
  mapper.map(key, value, outputCollector, reporter);
}
@Override
public LWDocument[] toDocuments(Writable key, Writable value, Reporter reporter,
    Configuration conf) throws IOException {
  // Nothing to index without both a key and a value.
  if (key == null || value == null) {
    return null;
  }
  LWDocument doc = createDocument(key.toString() + "-" + System.currentTimeMillis(), null);
  // Pattern.matcher() never returns null, so the previous `matcher != null`
  // guard was dead code and has been removed.
  Matcher matcher = regex.matcher(value.toString());
  if (match) {
    // Whole-value mode: extract fields only if the entire value matches.
    if (matcher.matches()) {
      processMatch(doc, matcher);
    }
  } else {
    // Scan mode: process every occurrence found within the value.
    while (matcher.find()) {
      processMatch(doc, matcher);
      reporter.progress(); // keeps the task from being killed during long scans
    }
  }
  // Record the file this record came from and derive a path-qualified doc id.
  FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
  String originalLogFilePath = fileSplit.getPath().toUri().getPath();
  doc.addField(FIELD_PATH, originalLogFilePath);
  String docId = originalLogFilePath + "-" + doc.getId();
  doc.setId(docId);
  return new LWDocument[] {doc};
}
public void map(LongWritable key, Text val,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  // Tag every token in this line with the name of the file it came from,
  // building an inverted (word -> filename) index.
  FileSplit split = (FileSplit) reporter.getInputSplit();
  location.set(split.getPath().getName());
  StringTokenizer tokens = new StringTokenizer(val.toString().toLowerCase());
  while (tokens.hasMoreTokens()) {
    word.set(tokens.nextToken());
    output.collect(word, location);
  }
}
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  // First call: the tagged split tells us which mapper class to run,
  // so build it reflectively and cache it for the rest of the split.
  if (mapper == null) {
    TaggedInputSplit tagged = (TaggedInputSplit) reporter.getInputSplit();
    Class<?> delegateClass = tagged.getMapperClass();
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(delegateClass, conf);
  }
  // Forward the record to the delegate.
  mapper.map(key, value, outputCollector, reporter);
}
public void map(LongWritable key, Text val,
    OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {
  // Emit a (word, filename) pair for each lowercase token of the line,
  // so the reducer can assemble an inverted index.
  FileSplit currentSplit = (FileSplit) reporter.getInputSplit();
  location.set(currentSplit.getPath().getName());
  String lowered = val.toString().toLowerCase();
  StringTokenizer tokenizer = new StringTokenizer(lowered);
  while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    output.collect(word, location);
  }
}
@SuppressWarnings("unchecked")
public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
    Reporter reporter) throws IOException {
  // The real mapper is chosen per-split; instantiate it lazily from the
  // class name carried by the TaggedInputSplit.
  if (mapper == null) {
    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(
        ((TaggedInputSplit) reporter.getInputSplit()).getMapperClass(), conf);
  }
  // Pass the record along unchanged.
  mapper.map(key, value, outputCollector, reporter);
}
@Override
protected LWDocument[] toDocuments(LongWritable key, Text value, Reporter reporter,
    Configuration conf) throws IOException {
  // Hand the raw log line plus the configured filters to the ruby matcher.
  Map<String, Object> params = new HashMap<String, Object>();
  params.put(LOG_RUBY_PARAM, value.toString());
  params.put(FILTERS_ARRAY_RUBY_PARAM, filters);
  List<String> toRemoveList = new ArrayList<String>();
  toRemoveList.add(LOG_RUBY_PARAM);
  toRemoveList.add(FILTERS_ARRAY_RUBY_PARAM);
  Object response = executeScript(MATCHER_RUBY_CLASS, params, toRemoveList);
  try {
    // No match from the script means no document to emit.
    if (response == null) {
      return null;
    }
    // Only cast once we know the response is non-null (the old code cast first).
    RubyHash hash = (RubyHash) response;
    Set<String> keys = hash.keySet();
    LWDocument document = createDocument();
    for (String currentKey : keys) {
      document.addField(currentKey, hash.get(currentKey));
    }
    // Adding the file where this log was taken
    FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
    String originalLogFilePath = fileSplit.getPath().toUri().getPath();
    document.addField(PATH_FIELD_NAME, originalLogFilePath);
    // Adding offset value
    document.addField(BYTE_OFFSET_FIELD_NAME, key.toString());
    // Set ID
    document.setId(originalLogFilePath + "-" + key.toString() + "-" + System.currentTimeMillis());
    return new LWDocument[] {document};
  } catch (Exception e) {
    // Log the full exception and preserve it as the cause; the previous code
    // logged only the message and threw a bare RuntimeException, losing the
    // stack trace of the original failure.
    log.error("Error executing ruby script", e);
    throw new RuntimeException("Error executing ruby script", e);
  }
}