Listed below are example usages of org.apache.hadoop.mapred.SequenceFileRecordReader#org.apache.hadoop.mapred.RecordReader; you can also follow the links to view the source code on GitHub.
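Before the project-specific examples, here is a minimal, self-contained sketch of the pattern they all share: obtaining a RecordReader from an old-API (org.apache.hadoop.mapred) input format and iterating over its key/value pairs. It uses SequenceFileInputFormat; the class name, input path, and the Text/IntWritable key and value types are illustrative assumptions only, not taken from any project listed below.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceFileReadSketch {
  public static void main(String[] args) throws IOException {
    JobConf job = new JobConf();
    // assumed input path; the SequenceFile's key/value classes must match the generics below
    FileInputFormat.setInputPaths(job, new Path("/tmp/data.seq"));
    SequenceFileInputFormat<Text, IntWritable> format = new SequenceFileInputFormat<>();
    for (InputSplit split : format.getSplits(job, 1)) {
      // getRecordReader hands back a SequenceFileRecordReader behind the RecordReader interface
      RecordReader<Text, IntWritable> reader = format.getRecordReader(split, job, Reporter.NULL);
      try {
        Text key = reader.createKey();
        IntWritable value = reader.createValue();
        while (reader.next(key, value)) {
          System.out.println(key + "\t" + value);
        }
      } finally {
        reader.close();
      }
    }
  }
}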
@Test
public void readExcelInputFormatExcel2003SingleSheetEncryptedNegativeLowFootprint() throws IOException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2003encrypt.xls";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
FileInputFormat.setInputPaths(job, file);
// set locale to that of the test data
job.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
job.set("hadoopoffice.read.lowFootprint", "true");
// for decryption simply set the password
job.set("hadoopoffice.read.security.crypt.password", "test2");
ExcelFileInputFormat format = new ExcelFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNull(reader, "Null record reader implies invalid password");
}
@Override
public Long call()
throws Exception
{
RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
LongWritable key = new LongWritable();
Text value = new Text();
long nrows = 0;
// count rows from the first non-header row
try {
if ( _firstSplit && _hasHeader )
reader.next(key, value);
while ( reader.next(key, value) ) {
String val = value.toString();
nrows += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
return nrows;
}
/**
* Construct an unmerged record reader that consumes both parquet and log records in parallel and buffers them
* for upstream clients to consume.
*
* @param split File split
* @param job Job Configuration
* @param realReader Parquet Reader
*/
public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf job,
RecordReader<NullWritable, ArrayWritable> realReader) {
super(split, job);
this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
// Iterator for consuming records from parquet file
this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
this.executor = new BoundedInMemoryExecutor<>(getMaxCompactionMemoryInBytes(), getParallelProducers(),
Option.empty(), x -> x, new DefaultSizeEstimator<>());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
// convert the Hoodie log record to a Hadoop ArrayWritable and buffer it
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
this.executor.getQueue().insertRecord(aWritable);
});
// Start reading and buffering
this.executor.startProducers();
}
@Override
public Void call() throws Exception {
LongWritable key = new LongWritable();
Text value = new Text();
FastStringTokenizer st = new FastStringTokenizer(' ');
RecordReader<LongWritable,Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
try {
//counting without locking as conflicts unlikely
while( reader.next(key, value) ) {
if( value.toString().charAt(0) == '%' )
continue;
st.reset( value.toString() );
_rNnz[(int)st.nextLong()-1] ++;
if( _isSymmetric )
_rNnz[(int)st.nextLong()-1] ++;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
return null;
}
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
for (InputSplit split : splits) {
RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (recordReader.next(key, writable)) {
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
// Take the commit time and compare with the one we are interested in
if (commit.equals((writable.get()[2]).toString())) {
actualCount++;
}
totalCount++;
}
}
assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
assertEquals(totalExpected, totalCount, msg);
}
@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException {
isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
if (isRealTime) {
List<RecordReader> recordReaders = new LinkedList<>();
ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
+ HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
if (split.getPaths().length == 0) {
continue;
}
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(split.getPath(0).toString(), true, job);
recordReaders.add(inputFormat.getRecordReader(inputSplit, job, reporter));
}
return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
}
return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
}
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth1.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job,1);
assertEquals( 1, inputSplits.length,"Only one split generated for genesis block");
RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull( reader,"Format returned null RecordReader");
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.next(key,block),"Input Split for block 1 contains at least one block");
assertEquals( 0, block.getEthereumTransactions().size(),"Block 1 must have 0 transactions");
assertFalse( reader.next(key,block),"No further blocks in block 1");
reader.close();
}
@Test
public void readExcelInputFormatExcel2013SingleSheetEncryptedNegativeLowFootprint() throws IOException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "excel2013encrypt.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
FileInputFormat.setInputPaths(job, file);
// set locale to that of the test data
job.set("hadoopoffice.read.locale.bcp47", "de");
// low footprint
job.set("hadoopoffice.read.lowFootprint", "true");
job.set("hadoopoffice.read.lowFootprint.parser", "stax");
// for decryption simply set the password
job.set("hadoopoffice.read.security.crypt.password", "test2");
ExcelFileInputFormat format = new ExcelFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNull(reader, "Null record reader implies invalid password");
}
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
// CombineFileSplit indicates the new export format which includes a manifest file
if (inputSplit instanceof CombineFileSplit) {
int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
throw new IOException("Unknown version: " + job.get(DynamoDBConstants
.EXPORT_FORMAT_VERSION));
}
return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
} else if (inputSplit instanceof FileSplit) {
// FileSplit indicates the old data pipeline format which doesn't include a manifest file
Path path = ((FileSplit) inputSplit).getPath();
return new ImportRecordReader(job, path);
} else {
throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
+ " " + inputSplit.getClass());
}
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException {
// TODO enable automatic predicate pushdown after fixing issues
// FileSplit fileSplit = (FileSplit) split;
// HoodieTableMetadata metadata = getTableMetadata(fileSplit.getPath().getParent());
// String tableName = metadata.getTableName();
// String mode = HoodieHiveUtil.readMode(job, tableName);
// if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
// FilterPredicate predicate = constructHoodiePredicate(job, tableName, split);
// LOG.info("Setting parquet predicate push down as " + predicate);
// ParquetInputFormat.setFilterPredicate(job, predicate);
// clearOutExistingPredicate(job);
// }
return super.getRecordReader(split, job, reporter);
}
/**
* Test the DBInputFormat class. The class should split the result into chunks.
* @throws Exception
*/
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
JobConf configuration = new JobConf();
setupDriver(configuration);
DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
format.setConf(configuration);
format.setConf(configuration);
DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
Reporter reporter = mock(Reporter.class);
RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
splitter, configuration, reporter);
configuration.setInt(MRJobConfig.NUM_MAPS, 3);
InputSplit[] lSplits = format.getSplits(configuration, 3);
assertEquals(5, lSplits[0].getLength());
assertEquals(3, lSplits.length);
// test reader. Some simple tests
assertEquals(LongWritable.class, reader.createKey().getClass());
assertEquals(0, reader.getPos());
assertEquals(0, reader.getProgress(), 0.001);
reader.close();
}
public RecordReader getRecordReader(InputSplit split, JobConf job,
    Reporter reporter) throws IOException {
  FileSplit fileSplit = (FileSplit) split;
  FileSystem fs = FileSystem.get(fileSplit.getPath().toUri(), job);
  FSDataInputStream is = fs.open(fileSplit.getPath());
  byte[] header = new byte[3];
  try {
    is.readFully(header);
  } catch (EOFException eof) {
    // file is shorter than the 3-byte magic header, so it cannot be a SequenceFile: read it as text
    return textInputFormat.getRecordReader(split, job, reporter);
  } finally {
    is.close();
  }
  // "SEQ" magic bytes identify a SequenceFile; anything else is treated as plain text
  if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
    return seqFileInputFormat.getRecordReader(split, job, reporter);
  }
  return textInputFormat.getRecordReader(split, job, reporter);
}
@Override
public RecordReader<Text, SpreadSheetCellDAO> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
throws IOException {
/** Create reader **/
try {
// send configuration option to MS Excel. The format of the Excel file (old vs. new) is detected automatically
job.set(HadoopOfficeReadConfiguration.CONF_MIMETYPE,"ms-excel");
return new ExcelCellRecordReader( (FileSplit) split,job,reporter);
} catch (FormatNotUnderstoodException e) {
// log
LOGIF.error(e);
} catch (GeneralSecurityException gse) {
LOGIF.error(gse);
}
return null;
}
@Override
public RecordReader<Text, DynamoDBItemWritable> getRecordReader(InputSplit split, JobConf conf,
Reporter reporter) throws
IOException {
reporter.progress();
Map<String, String> columnMapping =
HiveDynamoDBUtil.fromJsonString(conf.get(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING));
Map<String, String> hiveTypeMapping = HiveDynamoDBUtil.extractHiveTypeMapping(conf);
DynamoDBQueryFilter queryFilter = getQueryFilter(conf, columnMapping, hiveTypeMapping);
DynamoDBSplit bbSplit = (DynamoDBSplit) split;
bbSplit.setDynamoDBFilterPushdown(queryFilter);
Collection<String> attributes = (columnMapping == null ? null : columnMapping.values());
DynamoDBRecordReaderContext context = buildHiveDynamoDBRecordReaderContext(bbSplit, conf,
reporter, attributes);
return new DefaultDynamoDBRecordReader(context);
}
public S3SelectRecordCursor(
Configuration configuration,
Path path,
RecordReader<K, V> recordReader,
long totalBytes,
Properties splitSchema,
List<HiveColumnHandle> columns,
DateTimeZone hiveStorageTimeZone)
{
super(configuration, path, recordReader, totalBytes, updateSplitSchema(splitSchema, columns), columns, hiveStorageTimeZone);
}
public RecordReader<K,V> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) {
return new RecordReader<K,V>() {
public boolean next(K key, V value) throws IOException { return false; }
public K createKey() {
return ReflectionUtils.newInstance(keyclass, null);
}
public V createValue() {
return ReflectionUtils.newInstance(valclass, null);
}
public long getPos() throws IOException { return 0L; }
public void close() throws IOException { }
public float getProgress() throws IOException { return 0.0f; }
};
}
@Override
public RecordReader<BytesWritable,BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
/** Create reader **/
try {
return new BitcoinRawBlockRecordReader( (FileSplit) split,job,reporter);
} catch (HadoopCryptoLedgerConfigurationException|BitcoinBlockReadException e) {
// log
LOGFI.error(e);
}
return null;
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
return new RecordReader<LongWritable, Text>() {
boolean sentOneRecord = false;
public boolean next(LongWritable key, Text value)
throws IOException {
key.set(1);
value.set("dummy");
if (sentOneRecord == false) { // first call
sentOneRecord = true;
return true;
}
return false; // we have sent one record - we are done
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public long getPos() throws IOException {
return 1;
}
public void close() throws IOException {
}
public float getProgress() throws IOException {
return 1;
}
};
}
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
long sum = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
new MneInputFormat<MneDurableInputValue<Long>, Long>();
RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<Long> mdval = null;
NullWritable mdkey = reader.createKey();
while (true) {
mdval = reader.createValue();
if (reader.next(mdkey, mdval)) {
sum += mdval.getValue();
++reccnt;
} else {
break;
}
}
reader.close();
}
}
AssertJUnit.assertEquals(m_sum, sum);
AssertJUnit.assertEquals(m_reccnt, reccnt);
System.out.println(String.format("The checksum of long data is %d", sum));
}
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
//compute number of columns
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
//compute number of rows
int nrow = 0;
for( int i=0; i<splits.length; i++ )
{
RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
LongWritable key = new LongWritable();
Text value = new Text();
try
{
//ignore header of first split
if( i==0 && _props.hasHeader() )
reader.next(key, value);
//count remaining number of rows, ignore meta data
while ( reader.next(key, value) ) {
String val = value.toString();
nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
return new Pair<>(nrow, ncol);
}
@Override
public Object call() throws Exception {
LongWritable key = new LongWritable();
Text value = new Text();
try {
FastStringTokenizer st = new FastStringTokenizer(' ');
int[] ix = new int[_dest.getNumDims()];
RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
try {
while (reader.next(key, value)) {
st.reset(value.toString());
for (int i = 0; i < ix.length; i++) {
ix[i] = st.nextInt() - 1;
}
_dest.set(ix, st.nextToken());
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
catch (Exception ex) {
throw new IOException("Unable to read tensor in text cell format.", ex);
}
return null;
}
@Test
public void readExcelInputFormatExcel2013MultiSheetHeaderRegExLowFootprint() throws IOException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "multisheetheader.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
FileInputFormat.setInputPaths(job, file);
// set locale to that of the test data
job.set("hadoopoffice.read.locale.bcp47", "us");
job.set("hadoopoffice.read.header.read", "true");
job.set("hadoopoffice.read.header.skipheaderinallsheets", "true");
job.set("hadoopoffice.read.header.column.names.regex", "column");
job.set("hadoopoffice.read.header.column.names.replace", "spalte");
job.set("hadoopoffice.read.lowFootprint", "true");
job.set("hadoopoffice.read.lowFootprint.parser", "stax");
ExcelFileInputFormat format = new ExcelFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull(reader, "Format returned null RecordReader");
assertEquals("spalte1", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[0],
" header column 1 correctly read");
assertEquals("spalte2", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[1],
" header column 2 correctly read");
assertEquals("spalte3", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[2],
" header column 3 correctly read");
}
@Test
public void readEthereumBlockInputFormatBlock403419() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="block403419.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job,1);
assertEquals( 1, inputSplits.length,"Only one split generated for block 403419");
RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull( reader,"Format returned null RecordReader");
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.next(key,block),"Input Split for block 403419 contains at least one block");
assertEquals( 2, block.getEthereumTransactions().size(),"Block 403419 must have 2 transactions");
EthereumBlockHeader ethereumBlockHeader = block.getEthereumBlockHeader();
assertEquals(
"f8b483dba2c3b7176a3da549ad41a48bb3121069",
bytesToHex(ethereumBlockHeader.getCoinBase()).toLowerCase(),
"Block 403419 was mined by f8b483dba2c3b7176a3da549ad41a48bb3121069"
);
assertEquals(
"08741fa532c05804d9c1086a311e47cc024bbc43980f561041ad1fbb3c223322",
bytesToHex(ethereumBlockHeader.getParentHash()).toLowerCase(),
"The parent of block 403419 has hash 08741fa532c05804d9c1086a311e47cc024bbc43980f561041ad1fbb3c223322"
);
assertFalse( reader.next(key,block),"No further lock 403419 in genesis Block");
reader.close();
}
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
throws IOException {
PipeMapper pipeMapper = (PipeMapper)getMapper();
pipeMapper.startOutputThreads(output, reporter);
super.run(input, output, reporter);
}
@Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
final Reporter reporter) throws IOException {
// Hive on Spark invokes multiple getRecordReaders from different threads in the same Spark task (and hence the
// same JVM), unlike Hive on MR. Because of this, accesses to the JobConf, which is shared across all threads, are
// at risk of race conditions. Hence, we synchronize on the JobConf object here. The synchronization adds negligible
// latency since getRecordReader is called once per split, before the
// actual heavy lifting of reading the parquet files happens.
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
synchronized (jobConf) {
LOG.info(
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// in this case, the projection fields get removed. Looking at the HiveInputFormat implementation, in some cases
// the additional hoodie projection columns are reset after calling setConf and only the natural projections
// (the ones found in select queries) are set. Things would break because of this:
// e.g. _hoodie_record_key would be missing and the merge step would throw exceptions.
// To fix this, hoodie columns are appended late, at the time the record reader gets built, instead of at
// construction time.
cleanProjectionColumnIds(jobConf);
addRequiredProjectionFields(jobConf);
this.conf = jobConf;
this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
}
}
}
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// sanity check
ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit,
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
super.getRecordReader(split, jobConf, reporter));
}
public RecordReader<Object, Object> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new RecordReader<Object, Object>() {
boolean once = false;
public boolean next(Object key, Object value) throws IOException {
if (!once) {
once = true;
return true;
}
return false;
}
public Object createKey() {
return new Object();
}
public Object createValue() {
return new Object();
}
public long getPos() throws IOException {
return 0L;
}
public void close() throws IOException {
}
public float getProgress() throws IOException {
return 0.0f;
}
};
}
public RecordReader<K,V> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) {
return new RecordReader<K,V>() {
public boolean next(K key, V value) throws IOException { return false; }
public K createKey() {
return ReflectionUtils.newInstance(keyclass, null);
}
public V createValue() {
return ReflectionUtils.newInstance(valclass, null);
}
public long getPos() throws IOException { return 0L; }
public void close() throws IOException { }
public float getProgress() throws IOException { return 0.0f; }
};
}
@Test
public void readExcelInputFormatExcel2013MultiSheetHeaderRegExLowFootprint() throws IOException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName = "multisheetheader.xlsx";
String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
Path file = new Path(fileNameSpreadSheet);
FileInputFormat.setInputPaths(job, file);
// set locale to that of the test data
job.set("hadoopoffice.read.locale.bcp47", "us");
job.set("hadoopoffice.read.header.read", "true");
job.set("hadoopoffice.read.header.skipheaderinallsheets", "true");
job.set("hadoopoffice.read.header.column.names.regex", "column");
job.set("hadoopoffice.read.header.column.names.replace", "spalte");
job.set("hadoopoffice.read.lowFootprint", "true");
job.set("hadoopoffice.read.lowFootprint.parser", "sax");
ExcelFileInputFormat format = new ExcelFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job, 1);
assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull(reader, "Format returned null RecordReader");
assertEquals("spalte1", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[0],
" header column 1 correctly read");
assertEquals("spalte2", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[1],
" header column 2 correctly read");
assertEquals("spalte3", ((ExcelRecordReader) reader).getOfficeReader().getCurrentParser().getHeader()[2],
" header column 3 correctly read");
}
/**
* Create a handler that will handle any records output from the application.
* @param collector the "real" collector that takes the output
* @param reporter the reporter for reporting progress
*/
public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
RecordReader<FloatWritable,NullWritable> recordReader,
String expectedDigest) {
this.reporter = reporter;
this.collector = collector;
this.recordReader = recordReader;
this.expectedDigest = expectedDigest;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked") // Explicit check for value class agreement
public V createValue() {
if (null == valueclass) {
final Class<?> cls = kids[0].createValue().getClass();
for (RecordReader<K,? extends V> rr : kids) {
if (!cls.equals(rr.createValue().getClass())) {
throw new ClassCastException("Child value classes fail to agree");
}
}
valueclass = cls.asSubclass(Writable.class);
ivalue = createInternalValue();
}
return (V) ReflectionUtils.newInstance(valueclass, null);
}