类 org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader 源码实例 Demo

下面列出了 org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader 类的 API 使用实例代码及写法，也可以点击链接到 GitHub 查看源代码。

/**
 * Instantiates the RecordReader that will actually read the current child split.
 *
 * <p>The reader class is chosen from the format of the file at position {@code index}
 * in {@code split}:
 * <ul>
 *   <li>{@link SequenceFileRecordReader} when {@code ExportJobBase.isSequenceFiles}
 *       reports a SequenceFile;</li>
 *   <li>{@link LineRecordReader} otherwise.</li>
 * </ul>
 * The instance is created via {@code ReflectionUtils.newInstance} with the configuration
 * taken from {@code context}, then stored in {@code this.rr}. It is not initialized here.
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));

  final Configuration conf = context.getConfiguration();

  // SequenceFiles need the binary reader; anything else is read line by line.
  final Class<?> rrClass = ExportJobBase.isSequenceFiles(conf, split.getPath(index))
      ? SequenceFileRecordReader.class
      : LineRecordReader.class;

  this.rr = (RecordReader<LongWritable, Object>) ReflectionUtils.newInstance(rrClass, conf);
}
 
源代码2 项目: beam   文件: HadoopFormatIOSequenceFileTest.java
/**
 * Reads every key/value pair from the SequenceFile at {@code fileName}.
 *
 * <p>The pairs are returned as a stream of freshly constructed {@code KV} copies. Copies
 * are made for each record, presumably because the reader may reuse its current key/value
 * objects between {@code nextKeyValue()} calls.
 *
 * @param fileName path of the SequenceFile to read
 * @return all records of the file, in read order
 * @throws RuntimeException wrapping any failure while opening, reading or closing the file;
 *     if the failure was an {@link InterruptedException}, the current thread's interrupt
 *     flag is restored first
 */
private Stream<KV<Text, LongWritable>> extractResultsFromFile(String fileName) {
  try (SequenceFileRecordReader<Text, LongWritable> reader = new SequenceFileRecordReader<>()) {
    Path path = new Path(fileName);
    TaskAttemptContext taskContext =
        HadoopFormats.createTaskAttemptContext(new Configuration(), new JobID("readJob", 0), 0);
    // One split starting at offset 0 with length Long.MAX_VALUE, i.e. the whole file.
    reader.initialize(
        new FileSplit(path, 0L, Long.MAX_VALUE, new String[] {"localhost"}), taskContext);
    List<KV<Text, LongWritable>> result = new ArrayList<>();

    while (reader.nextKeyValue()) {
      result.add(
          KV.of(
              new Text(reader.getCurrentKey().toString()),
              new LongWritable(reader.getCurrentValue().get())));
    }

    return result.stream();
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Wrapping must not swallow the interruption: restore the flag for callers.
      Thread.currentThread().interrupt();
    }
    throw new RuntimeException(e);
  }
}
 
源代码3 项目: hadoop   文件: DynamicInputChunk.java
/**
 * Creates a new SequenceFileRecordReader, stores it in {@code reader}, and initializes it.
 *
 * <p>The reader is initialized over one FileSplit covering {@code chunkFilePath}: offset 0,
 * length from {@code DistCpUtils.getFileSize}, and a {@code null} hosts array.
 *
 * @param taskAttemptContext context handed to the reader's {@code initialize} call
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();
  final long chunkLength = DistCpUtils.getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
 
源代码4 项目: hadoop   文件: GenerateDistCacheData.java
/**
 * Supplies the record reader for one split of the distributed-cache file list.
 *
 * <p>A new {@link SequenceFileRecordReader} is returned without being initialized, and
 * neither {@code split} nor {@code taskContext} is consulted. Initialization is presumably
 * left to the caller/framework.
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  final RecordReader<LongWritable, BytesWritable> listReader =
      new SequenceFileRecordReader<LongWritable, BytesWritable>();
  return listReader;
}
 
源代码5 项目: big-c   文件: DynamicInputChunk.java
/**
 * Creates a new SequenceFileRecordReader, stores it in {@code reader}, and initializes it.
 *
 * <p>The reader is initialized over one FileSplit covering {@code chunkFilePath}: offset 0,
 * length from {@code DistCpUtils.getFileSize}, and a {@code null} hosts array.
 *
 * @param taskAttemptContext context handed to the reader's {@code initialize} call
 */
private void openForRead(TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<K, V>();
  final long chunkLength = DistCpUtils.getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
 
源代码6 项目: big-c   文件: GenerateDistCacheData.java
/**
 * Supplies the record reader for one split of the distributed-cache file list.
 *
 * <p>A new {@link SequenceFileRecordReader} is returned without being initialized, and
 * neither {@code split} nor {@code taskContext} is consulted. Initialization is presumably
 * left to the caller/framework.
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  final RecordReader<LongWritable, BytesWritable> listReader =
      new SequenceFileRecordReader<LongWritable, BytesWritable>();
  return listReader;
}
 
源代码7 项目: kangaroo   文件: WritableValueInputFormat.java
/**
 * Returns a SequenceFile reader yielding {@code NullWritable} keys and {@code V} values.
 *
 * <p>The reader is deliberately returned uninitialized. Per the
 * {@code InputFormat#createRecordReader} contract, the MapReduce framework calls
 * {@code RecordReader#initialize(InputSplit, TaskAttemptContext)} itself before the split
 * is read.
 *
 * <p>Initializing here as well made {@link SequenceFileRecordReader} open the file twice.
 * The second {@code initialize} replaces the first underlying reader without closing it.
 * Callers using this format outside the framework must call {@code initialize} themselves.
 *
 * @param split the split to be read (not consulted here)
 * @param context the task-attempt context (not consulted here)
 * @return a new, uninitialized {@link SequenceFileRecordReader}
 */
@Override
public RecordReader<NullWritable, V> createRecordReader(final InputSplit split, final TaskAttemptContext context)
        throws IOException, InterruptedException {
    return new SequenceFileRecordReader<NullWritable, V>();
}
 
源代码8 项目: circus-train   文件: DynamicInputChunk.java
/**
 * Creates a new SequenceFileRecordReader, stores it in {@code reader}, and initializes it.
 *
 * <p>The reader is initialized over one FileSplit covering {@code chunkFilePath}: offset 0,
 * length from {@code getFileSize(chunkFilePath, configuration)}, and a {@code null} hosts
 * array.
 *
 * @param taskAttemptContext context handed to the reader's {@code initialize} call
 */
private void openForRead(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  reader = new SequenceFileRecordReader<>();
  final long chunkLength = getFileSize(chunkFilePath, configuration);
  final FileSplit wholeChunk = new FileSplit(chunkFilePath, 0, chunkLength, null);
  reader.initialize(wholeChunk, taskAttemptContext);
}
 
源代码9 项目: hadoop   文件: DynamicInputChunk.java
/**
 * Returns the SequenceFile reader opened over this chunk file.
 *
 * <p>With assertions enabled, an {@link AssertionError} is raised if no reader has been
 * created yet.
 *
 * @return the chunk-file reader held by this instance
 */
public SequenceFileRecordReader<K,V> getReader() {
  assert null != reader : "Reader un-initialized!";
  return this.reader;
}
 
源代码10 项目: big-c   文件: DynamicInputChunk.java
/**
 * Returns the SequenceFile reader opened over this chunk file.
 *
 * <p>With assertions enabled, an {@link AssertionError} is raised if no reader has been
 * created yet.
 *
 * @return the chunk-file reader held by this instance
 */
public SequenceFileRecordReader<K,V> getReader() {
  assert null != reader : "Reader un-initialized!";
  return this.reader;
}
 
源代码11 项目: hiped2   文件: SequenceFileStockLoader.java
/**
 * Keeps the reader supplied for this split, viewed as a SequenceFileRecordReader.
 * The {@code split} argument is not used.
 *
 * @throws ClassCastException if {@code reader} is not a SequenceFileRecordReader
 */
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  final SequenceFileRecordReader sequenceReader = (SequenceFileRecordReader) reader;
  this.reader = sequenceReader;
}
 
源代码12 项目: spork   文件: SequenceFileLoader.java
/**
 * Keeps the reader supplied for this split, viewed as a SequenceFileRecordReader.
 * The {@code split} argument is not used.
 *
 * @throws ClassCastException if {@code reader} is not a SequenceFileRecordReader
 */
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
      throws IOException {
  final SequenceFileRecordReader sequenceReader = (SequenceFileRecordReader) reader;
  this.reader = sequenceReader;
}
 
源代码13 项目: kangaroo   文件: S3SequenceFileInputFormat.java
/**
 * Returns a new, uninitialized SequenceFileRecordReader.
 * Neither {@code split} nor {@code context} is consulted here.
 */
@Override
public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
    final SequenceFileRecordReader<K, V> recordReader = new SequenceFileRecordReader<K, V>();
    return recordReader;
}
 
源代码14 项目: hadoop   文件: UniformSizeInputFormat.java
/**
 * Creates the RecordReader used to read one split of the copy-listing.
 *
 * <p>The copy-listing is a plain SequenceFile of {@code Text} /
 * {@code CopyListingFileStatus} pairs, so a stock SequenceFileRecordReader suffices.
 * The reader is returned uninitialized.
 *
 * @param split the split the reader is requested for (not consulted here)
 * @param context the current task-attempt context (not consulted here)
 * @return a new {@link SequenceFileRecordReader}
 * @throws IOException declared by the overridden method; not thrown here
 * @throws InterruptedException declared by the overridden method; not thrown here
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final SequenceFileRecordReader<Text, CopyListingFileStatus> listingReader =
      new SequenceFileRecordReader<Text, CopyListingFileStatus>();
  return listingReader;
}
 
源代码15 项目: big-c   文件: UniformSizeInputFormat.java
/**
 * Creates the RecordReader used to read one split of the copy-listing.
 *
 * <p>The copy-listing is a plain SequenceFile of {@code Text} /
 * {@code CopyListingFileStatus} pairs, so a stock SequenceFileRecordReader suffices.
 * The reader is returned uninitialized.
 *
 * @param split the split the reader is requested for (not consulted here)
 * @param context the current task-attempt context (not consulted here)
 * @return a new {@link SequenceFileRecordReader}
 * @throws IOException declared by the overridden method; not thrown here
 * @throws InterruptedException declared by the overridden method; not thrown here
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final SequenceFileRecordReader<Text, CopyListingFileStatus> listingReader =
      new SequenceFileRecordReader<Text, CopyListingFileStatus>();
  return listingReader;
}
 
源代码16 项目: circus-train   文件: UniformSizeInputFormat.java
/**
 * Creates the RecordReader used to read one split of the copy-listing.
 *
 * <p>The copy-listing is a plain SequenceFile of {@code Text} /
 * {@code CopyListingFileStatus} pairs, so a stock SequenceFileRecordReader suffices.
 * The reader is returned uninitialized.
 *
 * @param split the split the reader is requested for (not consulted here)
 * @param context the current task-attempt context (not consulted here)
 * @return a new {@link SequenceFileRecordReader}
 * @throws IOException declared by the overridden method; not thrown here
 * @throws InterruptedException declared by the overridden method; not thrown here
 */
@Override
public RecordReader<Text, CopyListingFileStatus> createRecordReader(InputSplit split, TaskAttemptContext context)
  throws IOException, InterruptedException {
  final SequenceFileRecordReader<Text, CopyListingFileStatus> listingReader = new SequenceFileRecordReader<>();
  return listingReader;
}
 
源代码17 项目: circus-train   文件: DynamicInputChunk.java
/**
 * Returns the SequenceFile reader opened over this chunk file.
 *
 * <p>With assertions enabled, an {@link AssertionError} is raised if no reader has been
 * created yet.
 *
 * @return the chunk-file reader held by this instance
 */
public SequenceFileRecordReader<K, V> getReader() {
  assert null != reader : "Reader un-initialized!";
  return this.reader;
}
 
 同包方法