

/** Actually instantiate the user's chosen RecordReader implementation. */
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));

  Configuration conf = context.getConfiguration();

  // Determine the file format we're reading.
  Class rrClass;
  if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
    rrClass = SequenceFileRecordReader.class;
  } else {
    rrClass = LineRecordReader.class;

  // Create the appropriate record reader.
  this.rr = (RecordReader<LongWritable, Object>)
      ReflectionUtils.newInstance(rrClass, conf);
// Source 2 | project: incubator-retired-pirk | file: JSONRecordReader.java
// Sets up this reader for one input split: allocates the key/value holders and the
// JSON parser, initializes the delegate line reader, reads the query string, loads
// the data schemas and resolves the schema named in the job configuration.
// NOTE(review): this listing has no opening brace for the method body, no `try`
// paired with the `catch` below, no catch body and no closing brace — restore them
// from the upstream file before compiling.
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException
  // Fresh key/value holders and JSON parser for this split.
  key = new Text();
  value = new MapWritable();
  jsonParser = new JSONParser();

  // Line-level reading is delegated to a LineRecordReader bound to the same split.
  lineReader = new LineRecordReader();
  lineReader.initialize(inputSplit, context);

  // Read the "query" setting; "?q=*" is used when it is not configured.
  queryString = context.getConfiguration().get("query", "?q=*");

  // Load the data schemas
  FileSystem fs = FileSystem.get(context.getConfiguration());
    // Copy the job's "data.schemas" setting into SystemConfiguration before loading.
    SystemConfiguration.setProperty("data.schemas", context.getConfiguration().get("data.schemas"));
    DataSchemaLoader.initialize(true, fs);
  } catch (Exception e)
  // Look up the schema named by the "dataSchemaName" setting.
  String dataSchemaName = context.getConfiguration().get("dataSchemaName");
  dataSchema = DataSchemaRegistry.get(dataSchemaName);
// Source 3 | project: components | file: ConfigurableHDFSFileSource.java
// Creates a new Job and copies every entry of `map` into its Configuration, then
// disables the FileSystem cache for the gs, s3t, file and hdfs schemes.
// NOTE(review): the closing braces of the for loop and of the method are absent
// from this listing.
protected Job jobInstance() throws IOException {
    Job job = Job.getInstance();
    // deserialize map to conf
    Configuration conf = job.getConfiguration();
    for (Map.Entry<String, String> entry : map.entrySet()) {
        conf.set(entry.getKey(), entry.getValue());
    // TODO: We've explicitly listed all the schemas supported here, but the filesystem schema could be dynamically
    // generated from the path (resolved against the default name node).
    conf.set("fs.gs.impl.disable.cache", "true");
    conf.set("fs.s3t.impl.disable.cache", "true");
    conf.set("fs.file.impl.disable.cache", "true");
    conf.set("fs.hdfs.impl.disable.cache", "true");
    // NOTE(review): the next line is only the tail of a call — its opening (receiver
    // and key argument) is missing here. The value is "maxRowSize" from the
    // configuration, defaulting to 10*1024*1024.
             Integer.toString(conf.getInt("maxRowSize", 10*1024*1024))); // 10 MB max per line.

    return job;
// Source 4 | project: hadoop-connectors | file: GsonRecordReader.java
/**
 * Called once at initialization to initialize the RecordReader.
 *
 * @param genericSplit the split that defines the range of records to read.
 * @param context the information about the task.
 * @throws IOException on IO Error.
 */
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (logger.atFine().isEnabled()) {
          "initialize('%s', '%s')",
          HadoopToStringUtil.toString(genericSplit), HadoopToStringUtil.toString(context));
  Preconditions.checkArgument(genericSplit instanceof FileSplit,
      "InputSplit genericSplit should be an instance of FileSplit.");
  // Get FileSplit.
  FileSplit fileSplit = (FileSplit) genericSplit;
  // Create the JsonParser.
  jsonParser = new JsonParser();
  // Initialize the LineRecordReader.
  lineReader = new LineRecordReader();
  lineReader.initialize(fileSplit, context);
// Source 5 | project: components | file: LimitedLineRecordReader.java
public static <KEYIN, VALUEIN>  RecordReader<KEYIN, VALUEIN> buildReader(RecordReader<KEYIN, VALUEIN> reader) {
    if (reader instanceof LimitedLineRecordReader) {
        // already encapsulate
        return reader;
    if (!LineRecordReader.class.isInstance(reader) &&
        !CSVFileRecordReader.class.isInstance(reader)) {
        // only line file reader.
        return reader;
    return (RecordReader<KEYIN, VALUEIN>) new LimitedLineRecordReader(reader);
// Source 6 | project: mrgeo | file: DelimitedVectorRecordReader.java
// Initializes this reader from a FileSplit: builds the delimited parser from the
// split's path, creates and initializes the delegate LineRecordReader, and handles
// the optional skip-first-line setting. Throws IOException for any other split type.
// NOTE(review): this listing is truncated — the throws clause ends on a comma, the
// getDelimitedParser(...) call has no closing arguments, the inner `if` has no
// body, and the braces/else around the final throw are absent.
@SuppressWarnings("squid:S2095") // recordReader is closed explicitly in the close() method
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
  if (split instanceof FileSplit)
    FileSplit fsplit = (FileSplit) split;
    delimitedParser = getDelimitedParser(fsplit.getPath().toString(),
    recordReader = new LineRecordReader();
    recordReader.initialize(fsplit, context);
    // Skip the first line
    if (delimitedParser.getSkipFirstLine())
      // Only skip the first line of the first split. The other
      // splits are somewhere in the middle of the original file,
      // so their first lines should not be skipped.
      if (fsplit.getStart() != 0)
    throw new IOException("input split is not a FileSplit");
// Source 7 | project: presto | file: GenericHiveRecordCursorProvider.java
// Builds a record cursor for one file range and returns it, together with the
// projected reader columns, wrapped in an Optional.
// NOTE(review): this listing is truncated — the method's opening brace, the closing
// brace of the try, the arguments of HiveUtil.createRecordReader(...) and of the
// GenericHiveRecordCursor constructor, the end of the lambda and the method's
// closing brace are all absent.
public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
        Configuration configuration,
        ConnectorSession session,
        Path path,
        long start,
        long length,
        long fileSize,
        Properties schema,
        List<HiveColumnHandle> columns,
        TupleDomain<HiveColumnHandle> effectivePredicate,
        DateTimeZone hiveStorageTimeZone,
        TypeManager typeManager,
        boolean s3SelectPushdownEnabled)
    // Set the line reader's maximum line length on the configuration passed in.
    configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes);

    // make sure the FileSystem is created with the proper Configuration object
    try {
        this.hdfsEnvironment.getFileSystem(session.getUser(), path, configuration);
    catch (IOException e) {
        // Surface the failure as a PrestoException, keeping the path and the cause.
        throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);

    Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);

    // The reader and cursor are created inside doAs for the session's user.
    RecordCursor cursor = hdfsEnvironment.doAs(session.getUser(), () -> {
        RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(

        return new GenericHiveRecordCursor<>(

    return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns));
// Source 8 | project: tinkerpop | file: ScriptRecordReader.java
/** Creates a reader whose line-level input comes from a new {@code LineRecordReader}. */
public ScriptRecordReader() {
    this.lineRecordReader = new LineRecordReader();
}
// Source 9 | project: tinkerpop | file: GraphSONRecordReader.java
/** Creates a reader whose line-level input comes from a new {@code LineRecordReader}. */
public GraphSONRecordReader() {
    this.lineRecordReader = new LineRecordReader();
}
public RecordReader<LongWritable, Text> createDelegateRecordReader(
    InputSplit split, Configuration configuration) throws IOException, InterruptedException {
  logger.atFine().log("createDelegateRecordReader -> new LineRecordReader");
  return new LineRecordReader();
// Source 11 | project: spork | file: RegExLoader.java
public void prepareToRead(RecordReader reader, PigSplit split)
        throws IOException {
    in = (LineRecordReader) reader;
// Source 12 | project: phoenix | file: PhoenixTextInputFormat.java
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
  RecordReader<LongWritable,Text> rr = super.createRecordReader(split, context);
  return new PhoenixLineRecordReader((LineRecordReader) rr);
// Source 13 | project: phoenix | file: PhoenixTextInputFormat.java
/**
 * Creates a wrapper around the given line reader.
 *
 * @param rr the reader to delegate to
 */
private PhoenixLineRecordReader(LineRecordReader rr) {
  this.rr = rr;
}
// Source 14 | project: mrgeo | file: DelimitedVectorRecordReader.java
/**
 * Creates a line producer that reads from the given reader.
 *
 * @param recordReader the reader to take lines from
 */
public VectorLineProducer(LineRecordReader recordReader)
{
  lineRecordReader = recordReader;
}
// Source 15 | project: geowave | file: GeonamesDataFileInputFormat.java
public RecordReader<LongWritable, Text> createRecordReader(
    final InputSplit split,
    final TaskAttemptContext context) throws IOException, InterruptedException {
  return new LineRecordReader();
// Source 16 | project: kangaroo | file: S3TextInputFormat.java
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    final String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
    return new LineRecordReader(delimiter != null ? delimiter.getBytes() : null);