Source-code examples of the class org.apache.hadoop.mapreduce.RecordReader

The following examples show how to use the org.apache.hadoop.mapreduce.RecordReader API, collected from open-source projects; click through to GitHub to view the full source.
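All of the examples share the same life cycle: the framework calls initialize() once per split, then alternates nextKeyValue() with getCurrentKey()/getCurrentValue() until nextKeyValue() returns false, and finally calls close(). As a reference, here is a minimal, self-contained sketch of that contract; the class name and the one-record-per-split behavior are illustrative only and do not come from any of the projects below.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative skeleton: emits exactly one key/value pair per split.
public class SingleRecordReader extends RecordReader<LongWritable, Text> {
	private final LongWritable key = new LongWritable(0);
	private final Text value = new Text();
	private boolean consumed = false;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
		// A real reader would open the file/stream backing the split here.
		value.set(split.toString());
	}

	@Override
	public boolean nextKeyValue() {
		// Advance to the next record; return false once the split is exhausted.
		if (consumed) {
			return false;
		}
		consumed = true;
		return true;
	}

	@Override
	public LongWritable getCurrentKey() { return key; }

	@Override
	public Text getCurrentValue() { return value; }

	@Override
	public float getProgress() { return consumed ? 1.0f : 0.0f; }

	@Override
	public void close() {
		// Release any resources opened in initialize().
	}
}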

@Test
public void readEthereumBlockInputFormatBlock1346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName="eth1346406.bin";
	String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
	Path file = new Path(fileNameBlock);
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();

	List<InputSplit> splits = format.getSplits(job);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	assertEquals(1, splits.size(), "Only one split generated for block 1346406");
	RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
	assertNotNull(reader, "Format returned null RecordReader");
	reader.initialize(splits.get(0), context);
	assertTrue(reader.nextKeyValue(), "Input Split for block 1346406 contains at least one block");
	BytesWritable key = reader.getCurrentKey();
	EthereumBlock block = reader.getCurrentValue();
	assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
	assertFalse(reader.nextKeyValue(), "No further blocks in block 1346406");
	reader.close();
}
 
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName="eth1.bin";
	String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
	Path file = new Path(fileNameBlock);
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();

	List<InputSplit> splits = format.getSplits(job);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	assertEquals(1, splits.size(), "Only one split generated for block 1");
	RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
	assertNotNull(reader, "Format returned null RecordReader");
	reader.initialize(splits.get(0), context);
	assertTrue(reader.nextKeyValue(), "Input Split for block 1 contains at least one block");
	BytesWritable key = reader.getCurrentKey();
	EthereumBlock block = reader.getCurrentValue();
	assertEquals(0, block.getEthereumTransactions().size(), "Block 1 must have 0 transactions");
	assertFalse(reader.nextKeyValue(), "No further blocks in block 1");
	reader.close();
}
 
Example 3 (project: components, file: ExcelFileInputFormat.java)
@Override
public RecordReader<Void, IndexedRecord> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
  String encoding = context.getConfiguration().get(TALEND_ENCODING);
  String sheet = context.getConfiguration().get(TALEND_EXCEL_SHEET_NAME);
  long header = context.getConfiguration().getLong(TALEND_HEADER, 0L);
  long footer = context.getConfiguration().getLong(TALEND_FOOTER, 0L);
  String excelFormat = context.getConfiguration().get(TALEND_EXCEL_FORMAT, "EXCEL2007");
  long limit = context.getConfiguration().getLong(TALEND_EXCEL_LIMIT, -1);

  if ("EXCEL2007".equals(excelFormat)) {
    return new Excel2007FileRecordReader(sheet, header, footer, limit);
  } else if ("EXCEL97".equals(excelFormat)) {
    return new Excel97FileRecordReader(sheet, header, footer, limit);
  } else if ("HTML".equals(excelFormat)) {
    return new ExcelHTMLFileRecordReader(encoding, header, footer, limit);
  }

  throw new IOException("Not a valid Excel format: " + excelFormat);
}
 
Example 4 (project: kite, file: JSONInputFormat.java)
@Override
public RecordReader<E, Void> createRecordReader(InputSplit split,
                                                TaskAttemptContext context)
    throws IOException, InterruptedException {
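  // Kite resolves getConfiguration() reflectively (Hadoop.TaskAttemptContext
  // is a compatibility shim) because TaskAttemptContext is a class in
  // Hadoop 1 but an interface in Hadoop 2.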
  Configuration conf = Hadoop.TaskAttemptContext
      .getConfiguration.invoke(context);
  Path path;
  if (split instanceof FileSplit) {
    path = ((FileSplit) split).getPath();
  } else {
    throw new DatasetOperationException(
        "Split is not a FileSplit: %s:%s",
        split.getClass().getCanonicalName(), split);
  }
  JSONFileReader<E> reader = new JSONFileReader<E>(
      path.getFileSystem(conf), path, accessor);
  reader.initialize();
  return reader.asRecordReader();
}
 
Example 5 (project: big-c, file: Chain.java)
/**
 * Adds a mapper (the first mapper in the chain) that reads input from the
 * input context and writes to the queue.
 */
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
    throws IOException, InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
      Object.class);

  RecordReader rr = new ChainRecordReader(inputContext);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
      conf);
  Mapper.Context mapperContext = createMapContext(rr, rw,
      (MapContext) inputContext, getConf(index));
  MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
  threads.add(runner);
}
 
/** {@inheritDoc} */
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
    DBInputSplit split, Configuration conf) throws IOException {

  DBConfiguration dbConf = getDBConf();
  Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
  String dbProductName = getDBProductName();
  LOG.debug("Creating db record reader for db product: " + dbProductName);

  try {
    return new SQLServerDBRecordReader<T>(split, inputClass,
        conf, getConnection(), dbConf, dbConf.getInputConditions(),
        dbConf.getInputFieldNames(), dbConf.getInputTableName(),
        dbProductName);
  } catch (SQLException ex) {
    throw new IOException(ex);
  }
}
 
@Test
public void readEthereumBlockInputFormatGenesisBlock() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName="ethgenesis.bin";
	String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
	Path file = new Path(fileNameBlock);
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();

	List<InputSplit> splits = format.getSplits(job);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	assertEquals(1, splits.size(), "Only one split generated for genesis block");
	RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
	assertNotNull(reader, "Format returned null RecordReader");
	reader.initialize(splits.get(0), context);
	assertTrue(reader.nextKeyValue(), "Input Split for genesis block contains at least one block");
	BytesWritable key = reader.getCurrentKey();
	EthereumBlock block = reader.getCurrentValue();
	assertEquals(0, block.getEthereumTransactions().size(), "Genesis Block must have 0 transactions");
	assertFalse(reader.nextKeyValue(), "No further blocks in genesis Block");
	reader.close();
}
 
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint()
		throws IOException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName = "excel2013encrypt.xlsx";
	String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
	Path file = new Path(fileNameSpreadSheet);
	// set locale to the one of the test data
	conf.set("hadoopoffice.read.locale.bcp47", "de");

	// low footprint
	conf.set("hadoopoffice.read.lowFootprint", "true");
	conf.set("hadoopoffice.read.lowFootprint.parser", "sax");
	// for decryption simply set the password
	conf.set("hadoopoffice.read.security.crypt.password", "test2");
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	ExcelFileInputFormat format = new ExcelFileInputFormat();
	List<InputSplit> splits = format.getSplits(job);
	assertEquals(1, splits.size(), "Only one split generated for Excel file");
	RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
	assertThrows(InterruptedException.class, () -> reader.initialize(splits.get(0), context),
			"Exception is thrown in case of wrong password");
}
 
Example 9 (project: vxquery, file: XmlCollectionWithTagInputFormat.java)
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    try {
        STARTING_TAG = context.getConfiguration().get("start_tag");
        ENDING_TAG = context.getConfiguration().get("end_tag");
        return new XmlRecordReader((FileSplit) split, context.getConfiguration());
    } catch (IOException ioe) {
        // Swallowing the exception and returning null defers the failure to a
        // NullPointerException inside the framework.
        return null;
    }
}
 
Example 10 (project: datafu, file: CombinedAvroKeyInputFormat.java)
@SuppressWarnings("unchecked")
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
                                                                 TaskAttemptContext context) throws IOException
{
  Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
  if (null == readerSchema) {
    LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
    LOG.info("Using a reader schema equal to the writer schema.");
  }
      
  Class<?> c = CombinedAvroKeyRecordReader.class;
  return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context,
      (Class<? extends RecordReader<AvroKey<T>, NullWritable>>) c);
}
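The warning above hints at the usual driver-side setup: the input key schema is registered on the job before submission. A minimal sketch (MyRecord stands in for any Avro-generated specific record class; AvroJob is org.apache.avro.mapreduce.AvroJob):

Job job = Job.getInstance(new Configuration());
// MyRecord is illustrative; any generated Avro specific record class works.
AvroJob.setInputKeySchema(job, MyRecord.getClassSchema());
job.setInputFormatClass(CombinedAvroKeyInputFormat.class);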
 
Example 11 (project: big-c, file: MapContextImpl.java)
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                      RecordReader<KEYIN,VALUEIN> reader,
                      RecordWriter<KEYOUT,VALUEOUT> writer,
                      OutputCommitter committer,
                      StatusReporter reporter,
                      InputSplit split) {
  super(conf, taskid, writer, committer, reporter);
  this.reader = reader;
  this.split = split;
}
 
Example 12 (project: datawave, file: Type.java)
public Type(String name, String outputName, Class<? extends IngestHelperInterface> helperClass, Class<? extends RecordReader<?,?>> readerClass,
                String[] defaultDataTypeHandlers, int filterPriority, String[] defaultDataTypeFilters) {
    this.name = name;
    this.outputName = outputName;
    this.helperClass = helperClass;
    this.readerClass = readerClass;
    this.defaultDataTypeHandlers = defaultDataTypeHandlers;
    this.defaultDataTypeFilters = defaultDataTypeFilters;
    this.filterPriority = filterPriority;
}
 
Example 13 (project: hadoop, file: DBInputFormat.java)
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
    Configuration conf) throws IOException {

  @SuppressWarnings("unchecked")
  Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
  try {
    // use database product name to determine appropriate record reader.
    if (dbProductName.startsWith("ORACLE")) {
      // use Oracle-specific db reader.
      return new OracleDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else if (dbProductName.startsWith("MYSQL")) {
      // use MySQL-specific db reader.
      return new MySQLDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else {
      // Generic reader.
      return new DBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    }
  } catch (SQLException ex) {
    // Wrap the SQLException as the cause instead of discarding the stack trace.
    throw new IOException(ex);
  }
}
 
public EdgeReader<Text, Text> createEdgeReader(final RecordReader<LongWritable,Text> rr) throws IOException {
    return new DGATextEdgeValueReader(){
        @Override
        protected RecordReader<LongWritable, Text> createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            return rr;
        }
    };
}
 
Example 15 (project: hiped2, file: CSVInputFormat.java)
@Override
public RecordReader<LongWritable, TextArrayWritable>
createRecordReader(InputSplit split,
                   TaskAttemptContext context) {
  String csvDelimiter = HadoopCompat.getConfiguration(context).get(
      CSV_TOKEN_SEPARATOR_CONFIG);

  Character separator = null;
  if(csvDelimiter != null && csvDelimiter.length() == 1) {
    separator = csvDelimiter.charAt(0);
  }

  return new CSVRecordReader(separator);
}
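The reader above takes its delimiter from the job configuration, so the driver must set it before record readers are created. A minimal sketch, assuming the CSV_TOKEN_SEPARATOR_CONFIG constant is publicly accessible on CSVInputFormat:

Configuration conf = new Configuration();
// Single-character delimiter; if the value is not exactly one character,
// the reader above falls back to its default separator.
conf.set(CSVInputFormat.CSV_TOKEN_SEPARATOR_CONFIG, ";");
Job job = Job.getInstance(conf);
job.setInputFormatClass(CSVInputFormat.class);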
 
@Override
public RecordReader<K,RawRecordContainer> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
    SecureEventSequenceFileRecordReader<K> reader = new SecureEventSequenceFileRecordReader<>();
    try {
        reader.initialize(split, context);
    } catch (InterruptedException e) {
        throw new IOException("Error initializing SecureEventSequenceFileRecordReader", e);
    }
    return reader;
}
 
Example 17 (project: iceberg, file: TestIcebergInputFormat.java)
private static <T> Iterable<T> readRecords(
    IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext context) {
  RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split, context);
  List<T> records = new ArrayList<>();
  try {
    recordReader.initialize(split, context);
    while (recordReader.nextKeyValue()) {
      records.add(recordReader.getCurrentValue());
    }
    // Close the reader once the split is exhausted.
    recordReader.close();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  return records;
}
 
Example 18 (project: big-c, file: DataDrivenDBInputFormat.java)
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
    Configuration conf) throws IOException {

  DBConfiguration dbConf = getDBConf();
  @SuppressWarnings("unchecked")
  Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
  String dbProductName = getDBProductName();

  LOG.debug("Creating db record reader for db product: " + dbProductName);

  try {
    // use database product name to determine appropriate record reader.
    if (dbProductName.startsWith("MYSQL")) {
      // use MySQL-specific db reader.
      return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
          conf, createConnection(), dbConf, dbConf.getInputConditions(),
          dbConf.getInputFieldNames(), dbConf.getInputTableName());
    } else {
      // Generic reader.
      return new DataDrivenDBRecordReader<T>(split, inputClass,
          conf, createConnection(), dbConf, dbConf.getInputConditions(),
          dbConf.getInputFieldNames(), dbConf.getInputTableName(),
          dbProductName);
    }
  } catch (SQLException ex) {
    // Wrap the SQLException as the cause instead of discarding the stack trace.
    throw new IOException(ex);
  }
}
 
Example 19 (project: big-c, file: GenerateDistCacheData.java)
/**
 * Returns a reader for this split of the distributed cache file list.
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  return new SequenceFileRecordReader<LongWritable, BytesWritable>();
}
 
@Test
public void readEthereumBlockInputFormatBlock0to10() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
	Configuration conf = new Configuration(defaultConf);
	ClassLoader classLoader = getClass().getClassLoader();
	String fileName="eth0to10.bin";
	String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
	Path file = new Path(fileNameBlock);
	Job job = Job.getInstance(conf);
	FileInputFormat.setInputPaths(job, file);
	EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();

	List<InputSplit> splits = format.getSplits(job);
	TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
	assertEquals(1, splits.size(), "Only one split generated for block 0..10");
	RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
	assertNotNull(reader, "Format returned null RecordReader");
	reader.initialize(splits.get(0), context);
	// Bound the loop on nextKeyValue() so the test cannot spin forever if the
	// file contains fewer than 11 blocks.
	int count = 0;
	while (count < 11 && reader.nextKeyValue()) {
		count++;
	}
	assertEquals(11, count, "Block 0..10 contains 11 blocks");

	assertFalse(reader.nextKeyValue(), "No further blocks in block 0..10");
	reader.close();
}
 
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException {
  @SuppressWarnings("unchecked")
  Class<? extends RecordReader<AvroKey<T>, NullWritable>> cls =
      (Class<? extends RecordReader<AvroKey<T>, NullWritable>>) (Class<?>) AvroKeyRecordReaderWrapper.class;

  return new CombineFileRecordReader<>((CombineFileSplit) split, context, cls);
}
 
Example 22 (project: kite, file: FileSystemViewKeyInputFormat.java)
@Override
public RecordReader<E, Void> createRecordReader(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  RecordReader<E, Void> unfilteredRecordReader =
      createUnfilteredRecordReader(inputSplit, taskAttemptContext);
  if (view != null) {
    // use the constraints to filter out entities from the reader
    return new FilteredRecordReader<E>(unfilteredRecordReader,
        ((AbstractRefinableView) view).getConstraints(), view.getAccessor());
  }
  return unfilteredRecordReader;
}
 
Example 23 (project: Cubert, file: PigAvroInputFormatAdaptor.java)
@Override
public RecordReader<NullWritable, Writable> createRecordReader(InputSplit split,
                                                               TaskAttemptContext context) throws IOException,
        InterruptedException
{
    return getDelegate(context.getConfiguration()).createRecordReader(split, context);
}
 
Example 24 (project: phoenix, file: PhoenixHBaseLoader.java)
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    this.reader = reader;
    final String resourceSchemaAsStr = getValueFromUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE);
    if (resourceSchemaAsStr == null) {
        throw new IOException("Could not find schema in UDF context");
    }
    schema = (ResourceSchema) ObjectSerializer.deserialize(resourceSchemaAsStr);
}
 
Example 25 (project: spork, file: AllLoader.java)
@Override
public RecordReader<Writable, Writable> createRecordReader(
        InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {

    // This method plugs the AllReader into the system; when called, the
    // AllReader selects the correct LoadFunc.
    return new AllReader(udfSignature);
}
 
Example 26 (project: big-c, file: FixedLengthInputFormat.java)
@Override
public RecordReader<LongWritable, BytesWritable>
    createRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  int recordLength = getRecordLength(context.getConfiguration());
  if (recordLength <= 0) {
    throw new IOException("Fixed record length " + recordLength
        + " is invalid.  It should be set to a value greater than zero");
  }
  return new FixedLengthRecordReader(recordLength);
}
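getRecordLength() reads the fixed width from the configuration; in Apache Hadoop the matching setter is a static helper on FixedLengthInputFormat, so a driver would typically configure it like this (minimal sketch):

Configuration conf = new Configuration();
// Must be > 0, or createRecordReader() above throws an IOException.
FixedLengthInputFormat.setRecordLength(conf, 64);
Job job = Job.getInstance(conf);
job.setInputFormatClass(FixedLengthInputFormat.class);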
 
Example 27 (project: emodb, file: EmoInputFormat.java)
@Override
public RecordReader<Text, Row> createRecordReader(InputSplit split, TaskAttemptContext context)
        throws IOException {
    FileSplit fileSplit = (FileSplit) split;
    Path path = fileSplit.getPath();
    return new EmoRecordReader(BaseInputFormat.createRecordReader(context.getConfiguration(), path));
}
 
Example 28 (project: jumbune, file: JsonFileInputFormat.java)
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
	RecordReader<LongWritable, Text> recordReader = null;
	try {
		recordReader = new JsonFileRecordReader(split, context);
	} catch (IOException ioe) {
		LOGGER.error(ioe);
	}
	return recordReader;
}
 
Example 29 (project: datafu, file: AvroMultipleInputsKeyInputFormat.java)
/** {@inheritDoc} */
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException 
{    
  Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
  if (readerSchema == null)
  {
    throw new RuntimeException("Could not determine input schema");
  }
  return new AvroKeyRecordReader<T>(readerSchema);
}
 
@Override
public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
    InputSplit split, TaskAttemptContext context) throws IOException,
    InterruptedException {
  context.setStatus(split.toString());
  return new AvroRecordReader<T>();
}
 