Listed below are example usages of org.apache.hadoop.mapred.FileSplit#getLength(), taken from open-source projects; the original source code can be viewed on GitHub.
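Nearly every example that follows uses the same pattern: take the split's first byte offset from getStart(), derive the exclusive end offset as getStart() + getLength(), open the file returned by getPath(), and seek to the start. The following is a minimal sketch of that shared pattern; it is not taken from any of the projects below, and the class and method names are illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class SplitBoundsSketch {

  /** Returns {start, end}: the split's first byte offset and its exclusive end offset. */
  static long[] splitBounds(FileSplit split) {
    long start = split.getStart();
    long end = start + split.getLength(); // getLength() is the number of bytes in this split
    return new long[] { start, end };
  }

  /** Opens the split's file and positions the stream at the split's first byte. */
  static FSDataInputStream openAtSplitStart(Configuration conf, FileSplit split) throws IOException {
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream in = fs.open(file);
    in.seek(split.getStart());
    return in; // callers stop emitting records once their position passes splitBounds(split)[1]
  }
}

Readers for line- or record-oriented formats additionally skip a partial first record when start != 0, as several of the constructors below do.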
private InputStream getInputStream(JobConf jobConf, FileSplit split) throws IOException, ClassNotFoundException {
FSDataInputStream fsin = null;
// open the file and seek to the start of the split
long splitStart = split.getStart();
long splitEnd = splitStart + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fsin = fs.open(split.getPath());
fsin.seek(splitStart);
Configuration conf = new Configuration();
CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecFactory.getCodec(split.getPath());
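// NOTE: the code below assumes the path maps to a compression codec; for an
// uncompressed file, codec would be null and getDecompressor/createInputStream would fail.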
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
return ((SplittableCompressionCodec) codec).createInputStream(fsin,
decompressor,
splitStart,
splitEnd,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
} else {
return codec.createInputStream(fsin, decompressor);
}
}
@Override
public RecordReader<NullWritable,ColumnAndIndex> getRecordReader( final InputSplit split, final JobConf job, final Reporter reporter ) throws IOException {
FileSplit fileSplit = (FileSplit)split;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem( job );
long fileLength = fs.getLength( path );
long start = fileSplit.getStart();
long length = fileSplit.getLength();
InputStream in = fs.open( path );
IJobReporter jobReporter = new HadoopJobReporter( reporter );
jobReporter.setStatus( String.format( "Read file : %s" , path.toString() ) );
HiveReaderSetting hiveConfig = new HiveReaderSetting( fileSplit , job );
if ( hiveConfig.isVectorMode() ){
IVectorizedReaderSetting vectorizedSetting = new HiveVectorizedReaderSetting( fileSplit , job , hiveConfig );
return (RecordReader)new MDSHiveDirectVectorizedReader( in , fileLength , start , length , vectorizedSetting , jobReporter );
}
else{
return new MDSHiveLineReader( in , fileLength , start , length , hiveConfig , jobReporter , spreadCounter );
}
}
/**
* Constructor
* @param job the job configuration
* @param split the file split to read
* @throws IOException if the split's file cannot be opened
*/
public LineDocRecordReader(Configuration job, FileSplit split)
throws IOException {
long start = split.getStart();
long end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
InputStream in = fileIn;
boolean skipFirstLine = false;
if (start != 0) {
skipFirstLine = true; // wait till BufferedInputStream to skip
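// back up one byte so that, if the split begins exactly after a line terminator,
// the skip below re-reads only that terminator instead of discarding a whole line of this split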
--start;
fileIn.seek(start);
}
this.in = new BufferedInputStream(in);
if (skipFirstLine) { // skip first line and re-establish "start".
start += LineDocRecordReader.readData(this.in, null, EOL);
}
this.start = start;
this.pos = start;
this.end = end;
}
public ParsedRecordReader ( FileSplit split,
Configuration conf,
Class<? extends Parser> parser_class,
Trees args ) throws IOException {
start = split.getStart();
end = start + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(conf);
fsin = fs.open(split.getPath());
try {
parser = parser_class.newInstance();
} catch (Exception ex) {
throw new Error("Unrecognized parser:"+parser_class);
};
parser.initialize(args);
parser.open(fsin,start,end);
result = null;
}
/**
* @param clsName Input split class name.
* @param in Input stream.
* @param hosts Optional hosts.
* @return File block or {@code null} if it is not a {@link FileSplit} instance.
* @throws IgniteCheckedException If failed.
*/
@Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
@Nullable String[] hosts) throws IgniteCheckedException {
if (!FileSplit.class.getName().equals(clsName))
return null;
FileSplit split = U.newInstance(FileSplit.class);
try {
split.readFields(in);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
if (hosts == null)
hosts = EMPTY_HOSTS;
return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
}
public XmlRecordReader(FileSplit input, JobConf jobConf) throws IOException {
Configuration conf = jobConf;
this.startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
this.endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
FileSplit split = (FileSplit) input;
Path file = split.getPath();
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
if (codec != null) {
this.fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
// Data is only read in the first split; all other splits are effectively disabled.
// This avoids reading duplicate data for (non-splittable) compressed files.
this.start = (split.getStart() == 0) ? 0 : Long.MAX_VALUE;
this.end = Long.MAX_VALUE;
} else {
this.start = split.getStart();
this.end = this.start + split.getLength();
FSDataInputStream fileIn = fs.open(file);
fileIn.seek(this.start);
this.fsin = fileIn;
}
this.recordStartPos = this.start;
this.pos = this.start;
}
@Override
public RecordReader<NullWritable, OrcLazyRow>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
ReaderWriterProfiler.setProfilerOptions(conf);
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
reporter.setStatus(fileSplit.toString());
return new OrcRecordReader(
OrcFile.createReader(fs, path, conf),
conf,
fileSplit.getStart(),
fileSplit.getLength()
);
}
/**
* Create new multi-line json object reader.
*
* @param conf Hadoop context
* @param split HDFS split to start the reading from
* @throws IOException IOException when reading the file
*/
public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(conf);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(split.getPath());
if (codec != null) {
is = codec.createInputStream(fileIn);
start = 0;
end = Long.MAX_VALUE;
} else {
if (start != 0) {
fileIn.seek(start);
}
is = fileIn;
}
parser = new PartitionedJsonParser(is);
this.pos = start;
}
public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List<String> deltaLogPaths, String maxCommitTime)
throws IOException {
super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations());
this.deltaLogPaths = deltaLogPaths;
this.maxCommitTime = maxCommitTime;
this.basePath = basePath;
}
/**
* Constructor that sets the configuration and file split.
*
* @param conf The job configuration.
* @param split The file split to read from.
*
* @throws IOException If an IO error occurs while initializing file split.
*/
public ArcRecordReader(Configuration conf, FileSplit split)
throws IOException {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
fileLen = fs.getFileStatus(split.getPath()).getLen();
this.conf = conf;
this.in = fs.open(split.getPath());
this.splitStart = split.getStart();
this.splitEnd = splitStart + split.getLength();
this.splitLen = split.getLength();
in.seek(splitStart);
}
/**
* gets a ParquetInputSplit corresponding to a split given by Hive
*
* @param oldSplit The split given by Hive
* @param conf The JobConf of the Hive job
* @return a ParquetInputSplit corresponding to the oldSplit
* @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
*/
protected ParquetInputSplit getSplit(
final InputSplit oldSplit,
final JobConf conf
) throws IOException {
if (oldSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) oldSplit;
final long splitStart = fileSplit.getStart();
final long splitLength = fileSplit.getLength();
final Path finalPath = fileSplit.getPath();
final JobConf cloneJob = hiveBinding.pushProjectionsAndFilters(conf, finalPath.getParent());
final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath, SKIP_ROW_GROUPS);
final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
final ReadContext readContext =
new DataWritableReadSupport()
.init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
schemaSize = MessageTypeParser.parseMessageType(
readContext.getReadSupportMetadata().get(DataWritableReadSupport.HIVE_SCHEMA_KEY)
).getFieldCount();
return new ParquetInputSplit(
finalPath,
splitStart,
splitStart + splitLength,
splitLength,
fileSplit.getLocations(),
null);
} else {
throw new IllegalArgumentException("Unknown split type: " + oldSplit);
}
}
protected AvroAsTextRecordReaderCopy(FileReader<T> reader, FileSplit split)
throws IOException {
this.reader = reader;
reader.sync(split.getStart()); // sync to start
this.start = reader.tell();
this.end = split.getStart() + split.getLength();
}
@Override
public String getInputSplitSignature(InputSplit inputSplit) {
FileSplit baseSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit();
// file name (for intermediate table) + start pos + length
return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
}
/**
* Constructs a ChunkRecordReader instance.
*
* @param job the job configuration
* @param split contains the file name, the split's starting byte offset, and
* its length in bytes
* @throws IOException if an I/O error occurs when accessing the file or
* creating input stream to read from it
*/
public ChunkRecordReader(Configuration job, FileSplit split)
throws IOException, IncompatibleInputStreamException {
maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
validateLength(maxLineLength);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
fileLength = getInputStream().getFileLength();
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new ChunkReader(cIn);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new ChunkReader(codec.createInputStream(fileIn,
decompressor));
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new ChunkReader(fileIn);
filePosition = fileIn;
}
/*
* If this is not the first split, we always throw away first record
* because we always (except the last split) read one extra line in
* next() method.
*/
if (start != 0) {
start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
}
this.pos = start;
}
/**
* Creates an Abstract Record Reader for Ethereum blocks
* @param split Split to use (assumed to be a file split)
* @param job Configuration:
* io.file.buffer.size: size of the in-memory buffer, specified in the given Configuration; if io.file.buffer.size is not specified, the default buffer size will be used. Furthermore, one may specify hadoopcryptoledger.ethereumblockinputformat.maxblocksize, which defines the maximum size an Ethereum block may have (1M by default). If you want to experiment with performance using DirectByteBuffer instead of HeapByteBuffer you can use "hadoopcryptoledeger.ethereumblockinputformat.usedirectbuffer" (default: false). Note that it might have some unwanted consequences such as circumventing YARN memory management. The option is experimental and might be removed in future versions. A configuration sketch follows the constructor below.
* @param reporter Reporter
*
*
* @throws java.io.IOException in case of errors reading from the file stream provided by Hadoop
*
*/
public AbstractEthereumRecordReader(FileSplit split,JobConf job, Reporter reporter) throws IOException {
LOG.debug("Reading configuration");
// parse configuration
this.reporter=reporter;
this.conf=job;
this.maxSizeEthereumBlock=conf.getInt(AbstractEthereumRecordReader.CONF_MAXBLOCKSIZE,AbstractEthereumRecordReader.DEFAULT_MAXSIZE_ETHEREUMBLOCK);
this.bufferSize=conf.getInt(AbstractEthereumRecordReader.CONF_BUFFERSIZE,AbstractEthereumRecordReader.DEFAULT_BUFFERSIZE);
this.useDirectBuffer=conf.getBoolean(AbstractEthereumRecordReader.CONF_USEDIRECTBUFFER,AbstractEthereumRecordReader.DEFAULT_USEDIRECTBUFFER);
// Initialize start and end of split
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
codec = new CompressionCodecFactory(job).getCodec(file);
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
// open stream
if (isCompressedInput()) { // decompress
LOG.debug("Decompressing file");
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
LOG.debug("SplittableCompressionCodec");
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
ebr = new EthereumBlockReader(cIn, this.maxSizeEthereumBlock,this.bufferSize,this.useDirectBuffer);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
LOG.debug("Not-splitable compression codec");
ebr = new EthereumBlockReader(codec.createInputStream(fileIn,decompressor), this.maxSizeEthereumBlock,this.bufferSize,this.useDirectBuffer);
filePosition = fileIn;
}
} else {
LOG.debug("Processing file without compression");
fileIn.seek(start);
ebr = new EthereumBlockReader(fileIn, this.maxSizeEthereumBlock,this.bufferSize,this.useDirectBuffer);
filePosition = fileIn;
}
// initialize reader
this.reporter.setStatus("Ready to read");
}
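The javadoc above names the configuration keys this reader consults. The following is a rough, hedged sketch of how a job might set them; the key strings are copied verbatim from the javadoc above (including its spelling of the usedirectbuffer key), and exact names and defaults should be verified against the hadoopcryptoledger sources.

import org.apache.hadoop.mapred.JobConf;

public class EthereumReaderConfSketch {
  static JobConf configure() {
    JobConf job = new JobConf();
    job.setInt("io.file.buffer.size", 64 * 1024);                                          // stream buffer size
    job.setInt("hadoopcryptoledger.ethereumblockinputformat.maxblocksize", 1024 * 1024);   // max Ethereum block size (about 1M by default, per the javadoc)
    job.setBoolean("hadoopcryptoledeger.ethereumblockinputformat.usedirectbuffer", false); // experimental DirectByteBuffer switch, key spelled as in the javadoc
    return job;
  }
}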
/**
* Creates an Abstract Record Reader for Bitcoin blocks
* @param split Split to use (assumed to be a file split)
* @param job Configuration:
* io.file.buffer.size: size of the in-memory buffer, specified in the given Configuration; if io.file.buffer.size is not specified, the default buffer size (the maximum size of a Bitcoin block) will be used. The configuration hadoopcryptoledger.bitcoinblockinputformat.filter.magic allows specifying the magic identifier of the block as a comma-separated list of hex values (e.g. F9BEB4D9,FABFB5DA,0B110907,0B110907); the default magic is always F9BEB4D9, and at least one magic must be specified, otherwise it will be difficult to find blocks in splits. Furthermore, one may specify hadoopcryptoledger.bitcoinblockinputformat.maxblocksize, which defines the maximum size a Bitcoin block may have (8M by default). If you want to experiment with performance using DirectByteBuffer instead of HeapByteBuffer you can use "hadoopcryptoledeger.bitcoinblockinputformat.usedirectbuffer" (default: false). Note that it might have some unwanted consequences such as circumventing YARN memory management. The option is experimental and might be removed in future versions. A configuration sketch follows the constructor below.
* @param reporter Reporter
*
*
* @throws java.io.IOException in case of errors reading from the file stream provided by Hadoop
* @throws org.zuinnote.hadoop.bitcoin.format.exception.HadoopCryptoLedgerConfigurationException in case of an invalid HadoopCryptoLedger-specific configuration of the inputformat
* @throws org.zuinnote.hadoop.bitcoin.format.exception.BitcoinBlockReadException in case the Bitcoin data contains invalid blocks (e.g. magic might be different)
*
*/
public AbstractBitcoinRecordReader(FileSplit split,JobConf job, Reporter reporter) throws IOException,HadoopCryptoLedgerConfigurationException,BitcoinBlockReadException {
LOG.debug("Reading configuration");
// parse configuration
this.reporter=reporter;
this.conf=job;
this.maxSizeBitcoinBlock=conf.getInt(AbstractBitcoinRecordReader.CONF_MAXBLOCKSIZE,AbstractBitcoinRecordReader.DEFAULT_MAXSIZE_BITCOINBLOCK);
this.bufferSize=conf.getInt(AbstractBitcoinRecordReader.CONF_BUFFERSIZE,AbstractBitcoinRecordReader.DEFAULT_BUFFERSIZE);
this.specificMagic=conf.get(AbstractBitcoinRecordReader.CONF_FILTERMAGIC);
// we need to provide at least one magic; fall back to the default if none is configured
if ((this.specificMagic==null) || (this.specificMagic.length()==0)) {
this.specificMagic=AbstractBitcoinRecordReader.DEFAULT_MAGIC;
}
if ((this.specificMagic!=null) && (this.specificMagic.length()>0)) {
this.specificMagicStringArray=specificMagic.split(",");
specificMagicByteArray=new byte[specificMagicStringArray.length][4]; // each magic is always 4 byte
for (int i=0;i<specificMagicStringArray.length;i++) {
byte[] currentMagicNo=BitcoinUtil.convertHexStringToByteArray(specificMagicStringArray[i]);
if (currentMagicNo.length!=4) {
throw new HadoopCryptoLedgerConfigurationException("Error: Configuration. Magic number has not a length of 4 bytes. Index: "+i);
}
specificMagicByteArray[i]=currentMagicNo;
}
}
this.useDirectBuffer=conf.getBoolean(AbstractBitcoinRecordReader.CONF_USEDIRECTBUFFER,AbstractBitcoinRecordReader.DEFAULT_USEDIRECTBUFFER);
this.readAuxPOW=conf.getBoolean(AbstractBitcoinRecordReader.CONF_READAUXPOW,AbstractBitcoinRecordReader.DEFAULT_READAUXPOW);
// Initialize start and end of split
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
codec = new CompressionCodecFactory(job).getCodec(file);
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
// open stream
if (isCompressedInput()) { // decompress
LOG.debug("Decompressing file");
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
LOG.debug("SplittableCompressionCodec");
final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
bbr = new BitcoinBlockReader(cIn, this.maxSizeBitcoinBlock,this.bufferSize,this.specificMagicByteArray,this.useDirectBuffer,this.readAuxPOW);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
LOG.debug("Not-splitable compression codec");
bbr = new BitcoinBlockReader(codec.createInputStream(fileIn,decompressor), this.maxSizeBitcoinBlock,this.bufferSize,this.specificMagicByteArray,this.useDirectBuffer,this.readAuxPOW);
filePosition = fileIn;
}
} else {
LOG.debug("Processing file without compression");
fileIn.seek(start);
bbr = new BitcoinBlockReader(fileIn, this.maxSizeBitcoinBlock,this.bufferSize,this.specificMagicByteArray,this.useDirectBuffer,this.readAuxPOW);
filePosition = fileIn;
}
// initialize reader
// seek to block start (for the case a block overlaps a split)
LOG.debug("Seeking to block start");
this.reporter.setStatus("Seeking Block start");
bbr.seekBlockStart();
this.reporter.setStatus("Ready to read");
}
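As with the Ethereum reader, the javadoc above lists the keys this reader consults. A hedged sketch follows; key strings are copied verbatim from the javadoc above, and names and defaults should be verified against the hadoopcryptoledger sources.

import org.apache.hadoop.mapred.JobConf;

public class BitcoinReaderConfSketch {
  static JobConf configure() {
    JobConf job = new JobConf();
    job.set("hadoopcryptoledger.bitcoinblockinputformat.filter.magic", "F9BEB4D9");        // comma-separated hex magics; F9BEB4D9 is the default
    job.setInt("hadoopcryptoledger.bitcoinblockinputformat.maxblocksize", 8 * 1024 * 1024); // max Bitcoin block size (about 8M by default, per the javadoc)
    job.setBoolean("hadoopcryptoledeger.bitcoinblockinputformat.usedirectbuffer", false);   // experimental DirectByteBuffer switch, key spelled as in the javadoc
    return job;
  }
}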
public CopybookRecordReader(FileSplit genericSplit, JobConf job)
throws IOException {
try {
String cblPath = job.get(Const.COPYBOOK_INPUTFORMAT_CBL_HDFS_PATH_CONF);
if (cblPath == null) {
if (job != null) {
MapWork mrwork = Utilities.getMapWork(job);
if (mrwork == null) {
throw new IOException(
"When running a client-side Hive job you have to set \"copybook.inputformat.cbl.hdfs.path\" before executing the query. "
+ "When running an MR job this can be read from the Hive TBLPROPERTIES.");
}
Map<String, PartitionDesc> map = mrwork.getPathToPartitionInfo();
for (Map.Entry<String, PartitionDesc> pathsAndParts : map.entrySet()) {
System.out.println("Hey");
Properties props = pathsAndParts.getValue().getProperties();
cblPath = props
.getProperty(Const.COPYBOOK_INPUTFORMAT_CBL_HDFS_PATH_CONF);
break;
}
}
}
FileSystem fs = FileSystem.get(job);
BufferedInputStream inputStream = new BufferedInputStream(
fs.open(new Path(cblPath)));
CobolCopybookLoader copybookInt = new CobolCopybookLoader();
externalRecord = copybookInt
.loadCopyBook(inputStream, "RR", CopybookLoader.SPLIT_NONE, 0,
"cp037", Convert.FMT_MAINFRAME, 0, null);
int fileStructure = Constants.IO_FIXED_LENGTH;
for (ExternalField field : externalRecord.getRecordFields()) {
recordByteLength += field.getLen();
}
// jump to the point in the split that the first whole record of split
// starts at
FileSplit split = (FileSplit) genericSplit;
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
BufferedInputStream fileIn = new BufferedInputStream(fs.open(split
.getPath()));
if (start != 0) {
pos = start - (start % recordByteLength) + recordByteLength;
fileIn.skip(pos);
}
ret = LineIOProvider.getInstance().getLineReader(
fileStructure,
LineIOProvider.getInstance().getLineProvider(fileStructure));
ret.open(fileIn, externalRecord);
} catch (Exception e) {
// propagate initialization failures instead of swallowing them
throw new IOException("Failed to initialize CopybookRecordReader", e);
}
}
@Override
public org.apache.hadoop.mapreduce.InputSplit apply(final InputSplit input) {
final FileSplit split = (FileSplit) input;
return new org.apache.hadoop.mapreduce.lib.input.FileSplit(split.getPath(), split.getStart(),
split.getLength(), S3_SPLIT_HOST);
}