The examples below show how to use the org.apache.hadoop.mapreduce.RecordReader API class in practice; you can also follow the links to view the full source code on GitHub.
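Before the project examples, here is a minimal sketch of the contract every implementation below fulfills: a reader that emits exactly one key/value pair per split. The class name and record contents are illustrative only, not taken from any project shown here.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class SingleRecordReader extends RecordReader<LongWritable, Text> {
private boolean consumed = false;
private final LongWritable key = new LongWritable(0);
private final Text value = new Text("only record");
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
// a real reader would open the file behind the split here
}
@Override
public boolean nextKeyValue() {
if (consumed) {
return false; // no further records in this split
}
consumed = true; // advance to the single record
return true;
}
@Override
public LongWritable getCurrentKey() { return key; }
@Override
public Text getCurrentValue() { return value; }
@Override
public float getProgress() { return consumed ? 1.0f : 0.0f; }
@Override
public void close() {
// nothing to release in this sketch
}
}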
@Test
public void readEthereumBlockInputFormatBlock1346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth1346406.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block 1346406");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for block 1346406 contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 6, block.getEthereumTransactions().size(),"Block 1346406 must have 6 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in block 1346406");
reader.close();
}
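The same key/value types the test drives by hand can feed an ordinary MapReduce job. A minimal sketch of a mapper that counts transactions per block, assuming the hadoopcryptoledger types imported as in the tests above (the class name TxCountMapper is hypothetical):
public class TxCountMapper extends Mapper<BytesWritable, EthereumBlock, Text, IntWritable> {
private static final Text KEY = new Text("txCount");
@Override
protected void map(BytesWritable key, EthereumBlock block, Context context) throws IOException, InterruptedException {
// emit one count per block; a standard IntSumReducer can total them
context.write(KEY, new IntWritable(block.getEthereumTransactions().size()));
}
}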
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth1.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block 1");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for block 1 contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 0, block.getEthereumTransactions().size(),"Block 1 must have 0 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in block 1");
reader.close();
}
@Override
public RecordReader<Void, IndexedRecord> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
String encoding = context.getConfiguration().get(TALEND_ENCODING);
String sheet = context.getConfiguration().get(TALEND_EXCEL_SHEET_NAME);
long header = context.getConfiguration().getLong(TALEND_HEADER, 0L);
long footer = context.getConfiguration().getLong(TALEND_FOOTER, 0L);
String excelFormat = context.getConfiguration().get(TALEND_EXCEL_FORMAT, "EXCEL2007");
long limit = context.getConfiguration().getLong(TALEND_EXCEL_LIMIT, -1);
if("EXCEL2007".equals(excelFormat)) {
return new Excel2007FileRecordReader(sheet, header, footer, limit);
} else if("EXCEL97".equals(excelFormat)) {
return new Excel97FileRecordReader(sheet, header, footer, limit);
} else if("HTML".equals(excelFormat)) {
return new ExcelHTMLFileRecordReader(encoding, header, footer, limit);
}
throw new IOException("not a valid excel format");
}
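The dispatch above is driven entirely by job configuration. A sketch of the driver side, assuming the TALEND_* constants are visible to the caller (their underlying key strings are not shown in this excerpt):
Configuration conf = job.getConfiguration();
conf.set(TALEND_EXCEL_FORMAT, "EXCEL97"); // routes to Excel97FileRecordReader
conf.set(TALEND_EXCEL_SHEET_NAME, "Sheet1");
conf.setLong(TALEND_HEADER, 1L); // skip one header row
conf.setLong(TALEND_FOOTER, 0L);
conf.setLong(TALEND_EXCEL_LIMIT, -1L); // no row limit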
@Override
public RecordReader<E, Void> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
// reflective call through Kite's Hadoop compatibility shim; the
// TaskAttemptContext.getConfiguration API differs between Hadoop versions
Configuration conf = Hadoop.TaskAttemptContext
.getConfiguration.invoke(context);
Path path;
if (split instanceof FileSplit) {
path = ((FileSplit) split).getPath();
} else {
throw new DatasetOperationException(
"Split is not a FileSplit: %s:%s",
split.getClass().getCanonicalName(), split);
}
JSONFileReader<E> reader = new JSONFileReader<E>(
path.getFileSystem(conf), path, accessor);
reader.initialize();
return reader.asRecordReader();
}
/**
* Adds a mapper (the first mapper) that reads input from the input
* context and writes to the queue.
*/
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
throws IOException, InterruptedException {
Configuration conf = getConf(index);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(inputContext);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
Mapper.Context mapperContext = createMapContext(rr, rw,
(MapContext) inputContext, getConf(index));
MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
threads.add(runner);
}
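This internal method is what the public ChainMapper API ultimately drives. A sketch of chaining two mappers so the first one's output is piped to the second (TokenizerMapper and UpperCaseMapper are hypothetical mapper classes):
Job job = Job.getInstance(new Configuration(), "chain example");
ChainMapper.addMapper(job, TokenizerMapper.class,
LongWritable.class, Text.class, Text.class, IntWritable.class, new Configuration(false));
ChainMapper.addMapper(job, UpperCaseMapper.class,
Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));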
/** {@inheritDoc} */
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
return new SQLServerDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
} catch (SQLException ex) {
throw new IOException(ex);
}
}
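The DBConfiguration consumed above is normally populated on the driver side. A sketch using the standard org.apache.hadoop.mapreduce.lib.db helpers (connection details, the table, and the MyRecord DBWritable are hypothetical):
Job job = Job.getInstance(new Configuration(), "db import");
DBConfiguration.configureDB(job.getConfiguration(),
"com.microsoft.sqlserver.jdbc.SQLServerDriver",
"jdbc:sqlserver://dbhost;databaseName=test", "user", "secret");
// MyRecord must implement DBWritable; fields are read in the order listed
DBInputFormat.setInput(job, MyRecord.class, "employees", null, "id", "id", "name");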
@Test
public void readEthereumBlockInputFormatGenesisBlock() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="ethgenesis.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for genesis block");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.nextKeyValue(),"Input Split for genesis block contains at least one block");
key=reader.getCurrentKey();
block=reader.getCurrentValue();
assertEquals( 0, block.getEthereumTransactions().size(),"Genesis Block must have 0 transactions");
assertFalse( reader.nextKeyValue(),"No further blocks in genesis Block");
reader.close();
}
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint()
throws IOException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2013encrypt.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
// set the locale to match the test data
conf.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
conf.set("hadoopoffice.read.lowFootprint", "true");
conf.set("hadoopoffice.read.lowFootprint.parser", "sax");
// for decryption simply set the password
conf.set("hadoopoffice.read.security.crypt.password", "test2");
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ExcelFileInputFormat format = new ExcelFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
assertEquals(1, splits.size(), "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.createRecordReader(splits.get(0), context);
assertThrows(InterruptedException.class,
() -> reader.initialize(splits.get(0), context), "An exception is thrown in case of a wrong password");
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
try {
STARTING_TAG = context.getConfiguration().get("start_tag");
ENDING_TAG = context.getConfiguration().get("end_tag");
return new XmlRecordReader((FileSplit) split, context.getConfiguration());
} catch (IOException ioe) {
return null;
}
}
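A sketch of the driver wiring for this reader; the "start_tag"/"end_tag" keys come straight from the method above, while the enclosing XmlFileInputFormat class name is an assumption:
Configuration conf = new Configuration();
conf.set("start_tag", "<record>");
conf.set("end_tag", "</record>");
Job job = Job.getInstance(conf, "xml scan");
job.setInputFormatClass(XmlFileInputFormat.class); // assumed name of the enclosing InputFormat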
@SuppressWarnings("unchecked")
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit inputSplit,
TaskAttemptContext context) throws IOException
{
Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
if (null == readerSchema) {
LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
LOG.info("Using a reader schema equal to the writer schema.");
}
Object c = CombinedAvroKeyRecordReader.class;
return new CombineFileRecordReader<AvroKey<T>, NullWritable>((CombineFileSplit) inputSplit, context, (Class<? extends RecordReader<AvroKey<T>, NullWritable>>)c);
}
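To avoid the warning above, the reader schema can be set on the driver with org.apache.avro.mapreduce.AvroJob; the record schema here is a placeholder:
Job job = Job.getInstance(new Configuration(), "avro combine");
Schema schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
AvroJob.setInputKeySchema(job, schema);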
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
public Type(String name, String outputName, Class<? extends IngestHelperInterface> helperClass, Class<? extends RecordReader<?,?>> readerClass,
String[] defaultDataTypeHandlers, int filterPriority, String[] defaultDataTypeFilters) {
this.name = name;
this.outputName = outputName;
this.helperClass = helperClass;
this.readerClass = readerClass;
this.defaultDataTypeHandlers = defaultDataTypeHandlers;
this.defaultDataTypeFilters = defaultDataTypeFilters;
this.filterPriority = filterPriority;
}
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("ORACLE")) {
// use Oracle-specific db reader.
return new OracleDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader.
return new MySQLDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else {
// Generic reader.
return new DBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
}
} catch (SQLException ex) {
throw new IOException(ex); // keep the SQLException as the cause
}
}
public EdgeReader<Text, Text> createEdgeReader(final RecordReader<LongWritable,Text> rr) throws IOException {
return new DGATextEdgeValueReader(){
@Override
protected RecordReader<LongWritable, Text> createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
return rr;
}
};
}
@Override
public RecordReader<LongWritable, TextArrayWritable>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String csvDelimiter = HadoopCompat.getConfiguration(context).get(
CSV_TOKEN_SEPARATOR_CONFIG);
Character separator = null;
if(csvDelimiter != null && csvDelimiter.length() == 1) {
separator = csvDelimiter.charAt(0);
}
return new CSVRecordReader(separator);
}
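The separator is picked up from a single configuration key; a sketch of setting it, noting the value must be exactly one character to take effect (per the check above):
Configuration conf = new Configuration();
conf.set(CSV_TOKEN_SEPARATOR_CONFIG, ";"); // a multi-character value falls back to the reader's default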
@Override
public RecordReader<K,RawRecordContainer> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
SecureEventSequenceFileRecordReader<K> reader = new SecureEventSequenceFileRecordReader<>();
try {
reader.initialize(split, context);
} catch (InterruptedException e) {
throw new IOException("Error initializing SecureEventSequenceFileRecordReader", e);
}
return reader;
}
private static <T> Iterable<T> readRecords(
IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext context) {
RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split, context);
List<T> records = new ArrayList<>();
try {
recordReader.initialize(split, context);
while (recordReader.nextKeyValue()) {
records.add(recordReader.getCurrentValue());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return records;
}
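Callers can then materialize a split in one call; a hypothetical usage, assuming Iceberg's generic Record type:
Iterable<Record> rows = readRecords(new IcebergInputFormat<Record>(), split, context);
for (Record row : rows) {
System.out.println(row);
}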
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader.
return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} else {
// Generic reader.
return new DataDrivenDBRecordReader<T>(split, inputClass,
conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
}
} catch (SQLException ex) {
throw new IOException(ex); // keep the SQLException as the cause
}
}
/**
* Returns a reader for this split of the distributed cache file list.
*/
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
InputSplit split, final TaskAttemptContext taskContext)
throws IOException, InterruptedException {
return new SequenceFileRecordReader<LongWritable, BytesWritable>();
}
@Test
public void readEthereumBlockInputFormatBlock0to10() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
Configuration conf = new Configuration(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth0to10.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
List<InputSplit> splits = format.getSplits(job);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
assertEquals( 1, splits.size(),"Only one split generated for block 0..10");
RecordReader<BytesWritable, EthereumBlock> reader = format.createRecordReader(splits.get(0), context);
assertNotNull( reader,"Format returned null RecordReader");
reader.initialize(splits.get(0),context);
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
int count=0;
// consume every block in the split; the test file holds blocks 0 through 10
while (reader.nextKeyValue()) {
count++;
}
assertEquals(11,count,"Block 0..10 contains 11 blocks");
assertFalse( reader.nextKeyValue(),"No further blocks in block 0..10");
reader.close();
}
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException {
Class cls = AvroKeyRecordReaderWrapper.class;
return new CombineFileRecordReader<>((CombineFileSplit) split, context,
(Class<? extends RecordReader<AvroKey<T>, NullWritable>>) cls);
}
@Override
public RecordReader<E, Void> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
RecordReader<E, Void> unfilteredRecordReader = createUnfilteredRecordReader
(inputSplit, taskAttemptContext);
if (view != null) {
// use the constraints to filter out entities from the reader
return new FilteredRecordReader<E>(unfilteredRecordReader,
((AbstractRefinableView) view).getConstraints(), view.getAccessor());
}
return unfilteredRecordReader;
}
@Override
public RecordReader<NullWritable, Writable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException
{
return getDelegate(context.getConfiguration()).createRecordReader(split, context);
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
this.reader = reader;
final String resourceSchemaAsStr = getValueFromUDFContext(this.contextSignature,RESOURCE_SCHEMA_SIGNATURE);
if (resourceSchemaAsStr == null) {
throw new IOException("Could not find schema in UDF context");
}
schema = (ResourceSchema)ObjectSerializer.deserialize(resourceSchemaAsStr);
}
@Override
public RecordReader<Writable, Writable> createRecordReader(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
// this method plugs the AllReader into the system; when invoked, the
// AllReader selects the correct LoadFunc
return new AllReader(udfSignature);
}
@Override
public RecordReader<LongWritable, BytesWritable>
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
int recordLength = getRecordLength(context.getConfiguration());
if (recordLength <= 0) {
throw new IOException("Fixed record length " + recordLength
+ " is invalid. It should be set to a value greater than zero");
}
return new FixedLengthRecordReader(recordLength);
}
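The record length checked above is set through FixedLengthInputFormat's own helper (org.apache.hadoop.mapreduce.lib.input); the 64-byte length is just an example:
Configuration conf = new Configuration();
FixedLengthInputFormat.setRecordLength(conf, 64); // every record is exactly 64 bytes
Job job = Job.getInstance(conf, "fixed length read");
job.setInputFormatClass(FixedLengthInputFormat.class);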
@Override
public RecordReader<Text, Row> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException {
FileSplit fileSplit = (FileSplit) split;
Path path = fileSplit.getPath();
return new EmoRecordReader(BaseInputFormat.createRecordReader(context.getConfiguration(), path));
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
RecordReader<LongWritable, Text> recordReader = null;
try
{
recordReader = new JsonFileRecordReader(split, context);
}
catch (IOException ioe)
{
LOGGER.error(ioe);
}
return recordReader;
}
/** {@inheritDoc} */
@Override
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
{
Schema readerSchema = AvroMultipleInputsUtil.getInputKeySchemaForSplit(context.getConfiguration(), split);
if (readerSchema == null)
{
throw new RuntimeException("Could not determine input schema");
}
return new AvroKeyRecordReader<T>(readerSchema);
}
@Override
public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
context.setStatus(split.toString());
return new AvroRecordReader<T>();
}