下面列出了 org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Instantiates the record reader matching the format of the current child
 * path and stores it in {@code rr}.
 *
 * <p>{@code SequenceFileRecordReader} is selected when
 * {@code ExportJobBase.isSequenceFiles} returns true for the path at
 * {@code index}; otherwise {@code LineRecordReader} is used. The chosen class
 * is instantiated reflectively with the task's configuration.
 *
 * @throws IOException propagated from the calls made here, if any raises it
 * @throws InterruptedException propagated from the calls made here, if any raises it
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));

  final Configuration conf = context.getConfiguration();

  // Pick the reader implementation from the format of this child path.
  final Class rrClass = ExportJobBase.isSequenceFiles(conf, split.getPath(index))
      ? SequenceFileRecordReader.class
      : LineRecordReader.class;

  // Build the reader reflectively; the cast is unchecked because the class
  // token is raw.
  this.rr = (RecordReader<LongWritable, Object>) ReflectionUtils.newInstance(rrClass, conf);
}
/**
 * Reads every record of the sequence file at {@code fileName} into memory and
 * returns the records as a stream of key/value pairs.
 *
 * <p>The reader is opened over a single split starting at offset 0 with
 * length {@code Long.MAX_VALUE}, and is closed by the try-with-resources
 * block before this method returns.
 *
 * @param fileName path of the sequence file to read
 * @return a stream over the collected {@code (Text, LongWritable)} pairs
 * @throws RuntimeException wrapping any exception raised while reading; if the
 *     cause is an {@link InterruptedException}, the calling thread's interrupt
 *     flag is restored before the wrapper is thrown
 */
private Stream<KV<Text, LongWritable>> extractResultsFromFile(String fileName) {
  try (SequenceFileRecordReader<Text, LongWritable> reader = new SequenceFileRecordReader<>()) {
    Path path = new Path(fileName);
    TaskAttemptContext taskContext =
        HadoopFormats.createTaskAttemptContext(new Configuration(), new JobID("readJob", 0), 0);
    reader.initialize(
        new FileSplit(path, 0L, Long.MAX_VALUE, new String[] {"localhost"}), taskContext);

    List<KV<Text, LongWritable>> result = new ArrayList<>();
    while (reader.nextKeyValue()) {
      // Store fresh Text/LongWritable copies rather than the reader's own
      // objects (presumably the reader reuses them between records -- confirm).
      result.add(
          KV.of(
              new Text(reader.getCurrentKey().toString()),
              new LongWritable(reader.getCurrentValue().get())));
    }
    return result.stream();
  } catch (InterruptedException e) {
    // Wrapping alone would hide the interruption from code further up the
    // stack, so restore the flag before rethrowing.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Creates a new {@code SequenceFileRecordReader}, stores it in {@code reader},
 * and initializes it over one split that covers the chunk file from offset 0
 * up to the size reported by {@code DistCpUtils.getFileSize}.
 *
 * @param taskAttemptContext context passed to the reader's {@code initialize}
 * @throws IOException propagated from the size lookup or reader initialization
 * @throws InterruptedException propagated from reader initialization
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  // Assign the field first so it is set even if a later step throws.
  reader = new SequenceFileRecordReader<K, V>();

  final long chunkLength = DistCpUtils.getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
/**
 * Supplies the record reader for a split of the distributed cache file list.
 *
 * <p>The returned reader is newly constructed and not initialized here;
 * neither argument is consulted by this method.
 *
 * @param split the split a reader is requested for (unused here)
 * @param taskContext the current task-attempt context (unused here)
 * @return a new {@code SequenceFileRecordReader} over
 *     {@code LongWritable}/{@code BytesWritable} records
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  final RecordReader<LongWritable, BytesWritable> fileListReader =
      new SequenceFileRecordReader<LongWritable, BytesWritable>();
  return fileListReader;
}
/**
 * Opens the chunk file for reading: assigns a new
 * {@code SequenceFileRecordReader} to {@code reader} and initializes it with
 * a split spanning the file from offset 0 to its current size.
 *
 * @param taskAttemptContext context handed to the reader on initialization
 * @throws IOException propagated from the size lookup or reader initialization
 * @throws InterruptedException propagated from reader initialization
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();

  // The split length is the file size as reported by DistCpUtils.
  final FileSplit entireFile = new FileSplit(
      chunkFilePath, 0, DistCpUtils.getFileSize(chunkFilePath, configuration), null);
  reader.initialize(entireFile, taskAttemptContext);
}
/**
 * Creates the reader used for a split of the distributed cache file list.
 *
 * <p>This method only constructs the reader; it does not initialize it and
 * does not read either parameter.
 *
 * @param split the requested split (not used by this method)
 * @param taskContext the task-attempt context (not used by this method)
 * @return a freshly constructed {@code SequenceFileRecordReader}
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  final SequenceFileRecordReader<LongWritable, BytesWritable> cacheListReader =
      new SequenceFileRecordReader<LongWritable, BytesWritable>();
  return cacheListReader;
}
/**
 * Builds a {@code SequenceFileRecordReader} and initializes it with the given
 * split and context before returning it.
 *
 * @param split the split the reader is initialized with
 * @param context the task-attempt context the reader is initialized with
 * @return the already-initialized reader
 * @throws IOException propagated from reader initialization
 * @throws InterruptedException propagated from reader initialization
 */
@Override
public RecordReader<NullWritable, V> createRecordReader(final InputSplit split, final TaskAttemptContext context)
    throws IOException, InterruptedException {
  final SequenceFileRecordReader<NullWritable, V> sequenceReader =
      new SequenceFileRecordReader<NullWritable, V>();
  // Unlike a bare factory, this method hands back a reader that has already
  // been initialized for the split.
  sequenceReader.initialize(split, context);
  return sequenceReader;
}
/**
 * Assigns a new {@code SequenceFileRecordReader} to {@code reader} and
 * initializes it over a split covering the chunk file from offset 0 to the
 * length returned by {@code getFileSize}.
 *
 * @param taskAttemptContext context passed through to the reader
 * @throws IOException propagated from the size lookup or reader initialization
 * @throws InterruptedException propagated from reader initialization
 */
private void openForRead(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<>();

  final long fileLength = getFileSize(chunkFilePath, configuration);
  final FileSplit fullSplit = new FileSplit(chunkFilePath, 0, fileLength, null);
  reader.initialize(fullSplit, taskAttemptContext);
}
/**
 * Returns the record reader held by this object.
 *
 * <p>When assertions are enabled, fails with "Reader un-initialized!" if the
 * reader has not been set; with assertions disabled, {@code null} may be
 * returned.
 *
 * @return the sequence-file reader stored in {@code reader}
 */
public SequenceFileRecordReader<K,V> getReader() {
  assert this.reader != null : "Reader un-initialized!";
  return this.reader;
}
/**
 * Accessor for the sequence-file record reader stored in {@code reader}.
 *
 * <p>An assertion (active only when the JVM runs with assertions enabled)
 * guards against the field still being {@code null}.
 *
 * @return the stored reader; may be {@code null} if it was never set and
 *     assertions are disabled
 */
public SequenceFileRecordReader<K,V> getReader() {
  assert null != reader : "Reader un-initialized!";
  return reader;
}
/**
 * Stores the supplied record reader in the {@code reader} field for later use.
 *
 * <p>The reader is cast to {@code SequenceFileRecordReader}; a reader of any
 * other type fails the cast with a {@link ClassCastException}.
 *
 * @param reader the record reader to read from
 * @param split the split being read (not used by this method)
 */
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  final SequenceFileRecordReader sequenceReader = (SequenceFileRecordReader) reader;
  this.reader = sequenceReader;
}
/**
 * Remembers the record reader this loader will read from.
 *
 * <p>The argument must be a {@code SequenceFileRecordReader}; otherwise the
 * downcast throws a {@link ClassCastException}.
 *
 * @param reader the record reader handed to this loader
 * @param split the associated split (ignored here)
 */
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  // Raw downcast; the unchecked warning is suppressed on the method.
  final SequenceFileRecordReader downcast = (SequenceFileRecordReader) reader;
  this.reader = downcast;
}
/**
 * Returns a new, uninitialized {@code SequenceFileRecordReader}.
 *
 * @param split the split a reader is requested for (unused here)
 * @param context the task-attempt context (unused here)
 * @return a freshly constructed reader for {@code K}/{@code V} records
 */
@Override
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
  final RecordReader<K, V> sequenceReader = new SequenceFileRecordReader<K, V>();
  return sequenceReader;
}
/**
 * Implements {@code InputFormat::createRecordReader()} by returning a new
 * {@code SequenceFileRecordReader}.
 *
 * <p>The reader is only constructed here, not initialized; neither parameter
 * is read by this method.
 *
 * @param split the split for which the reader is sought (unused here)
 * @param context the context of the current task-attempt (unused here)
 * @return a new reader over {@code Text}/{@code CopyListingFileStatus} records
 * @throws IOException declared by the signature; not raised by this body
 * @throws InterruptedException declared by the signature; not raised by this body
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final RecordReader<Text, CopyListingFileStatus> listingReader =
      new SequenceFileRecordReader<Text, CopyListingFileStatus>();
  return listingReader;
}
/**
 * Provides the record reader for a split, as required by
 * {@code InputFormat::createRecordReader()}.
 *
 * <p>A new {@code SequenceFileRecordReader} is constructed on every call and
 * returned without being initialized. The arguments are not inspected.
 *
 * @param split the split the reader is requested for (not inspected)
 * @param context the current task-attempt context (not inspected)
 * @return a newly created {@code SequenceFileRecordReader}
 * @throws IOException part of the overridden signature; not thrown here
 * @throws InterruptedException part of the overridden signature; not thrown here
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final SequenceFileRecordReader<Text, CopyListingFileStatus> copyListingReader =
      new SequenceFileRecordReader<Text, CopyListingFileStatus>();
  return copyListingReader;
}
/**
 * Implementation of {@code InputFormat::createRecordReader()} that returns a
 * new {@code SequenceFileRecordReader}.
 *
 * <p>Only construction happens here: the reader is not initialized and the
 * parameters are left untouched.
 *
 * @param split the split for which the reader is sought (unused here)
 * @param context the context of the current task-attempt (unused here)
 * @return a new reader over {@code Text}/{@code CopyListingFileStatus} records
 * @throws IOException declared by the signature; not raised by this body
 * @throws InterruptedException declared by the signature; not raised by this body
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final RecordReader<Text, CopyListingFileStatus> listingReader = new SequenceFileRecordReader<>();
  return listingReader;
}
/**
 * Gives access to the sequence-file record reader stored in {@code reader}.
 *
 * <p>With assertions enabled, a {@code null} reader trips an assertion with
 * the message "Reader un-initialized!"; otherwise the field is returned as is.
 *
 * @return the stored sequence-file reader
 */
public SequenceFileRecordReader<K, V> getReader() {
  assert this.reader != null : "Reader un-initialized!";
  return this.reader;
}