下面列出了 org.apache.hadoop.mapreduce.lib.input.CombineFileSplit#getPath() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Opens the {@code index}-th file chunk of {@code split} for line-by-line reading.
 *
 * <p>The chunk covers bytes [offset, offset + length) of {@code split.getPath(index)}. When the
 * chunk does not start at byte 0, the stream is positioned one byte before the chunk and the
 * first (partial) line is consumed and discarded, so {@code pos} starts at the first line that
 * begins inside the chunk. Backing up one byte means a chunk that starts exactly at a line
 * boundary only discards the preceding line terminator. (Presumably the reader of the previous
 * chunk finishes the line that straddles the boundary — confirm against nextKeyValue.)
 *
 * @param split the combined split holding several file chunks
 * @param context task context; its configuration is used to resolve the FileSystem
 * @param index position of the chunk inside {@code split}
 * @throws IOException if the file cannot be opened, positioned or read; a stream that was
 *     already opened is closed before the exception propagates
 */
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;
  // open the file
  fileIn = fs.open(path);
  try {
    if (startOffset != 0) {
      skipFirstLine = true;
      --startOffset;
      fileIn.seek(startOffset);
    }
    reader = new LineReader(fileIn);
    if (skipFirstLine) { // skip first line and re-establish "startOffset".
      // readLine returns the number of bytes it consumed, bounded by the chunk end.
      startOffset += reader.readLine(new Text(), 0,
          (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
    }
  } catch (IOException | RuntimeException e) {
    // A throwing constructor never hands the object to a caller, so close() can never be
    // invoked on it; release the stream here instead of leaking it.
    try {
      fileIn.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  this.pos = startOffset;
}
/**
 * Opens the {@code index}-th file chunk of {@code split} for line-by-line reading.
 *
 * <p>The chunk covers bytes [offset, offset + length) of {@code split.getPath(index)}. When the
 * chunk does not start at byte 0, the stream is positioned one byte before the chunk and the
 * first (partial) line is consumed and discarded, so {@code pos} starts at the first line that
 * begins inside the chunk. Backing up one byte means a chunk that starts exactly at a line
 * boundary only discards the preceding line terminator. (Presumably the reader of the previous
 * chunk finishes the line that straddles the boundary — confirm against nextKeyValue.)
 *
 * @param split the combined split holding several file chunks
 * @param context task context; its configuration is used to resolve the FileSystem
 * @param index position of the chunk inside {@code split}
 * @throws IOException if the file cannot be opened, positioned or read; a stream that was
 *     already opened is closed before the exception propagates
 */
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;
  // open the file
  fileIn = fs.open(path);
  try {
    if (startOffset != 0) {
      skipFirstLine = true;
      --startOffset;
      fileIn.seek(startOffset);
    }
    reader = new LineReader(fileIn);
    if (skipFirstLine) { // skip first line and re-establish "startOffset".
      // readLine returns the number of bytes it consumed, bounded by the chunk end.
      startOffset += reader.readLine(new Text(), 0,
          (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
    }
  } catch (IOException | RuntimeException e) {
    // A throwing constructor never hands the object to a caller, so close() can never be
    // invoked on it; release the stream here instead of leaking it.
    try {
      fileIn.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
  this.pos = startOffset;
}
/**
 * Returns the splits computed by the superclass, plus extra single-file splits for oversized
 * file chunks.
 *
 * <p>The threshold is {@code MAX_SINGLE_FILE_MULTIPLIER} times the configured
 * {@code mapreduce.input.fileinputformat.split.maxsize}. When that is not positive, the
 * superclass splits are returned unchanged. Otherwise each file chunk longer than the threshold
 * gets {@code ceil(length / threshold) - 1} additional single-path {@link CombineFileSplit}s,
 * appended after the original splits.
 *
 * @param job the job whose configuration supplies the max split size
 * @return the (possibly extended) split list
 * @throws IOException if the superclass or {@code getLocations()} fails
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = super.getSplits(job);
  long maxSize = MAX_SINGLE_FILE_MULTIPLIER * job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  if (maxSize > 0) {
    List<InputSplit> newSplits = new ArrayList<>();
    for (InputSplit spl : splits) {
      CombineFileSplit cfs = (CombineFileSplit) spl;
      for (int i = 0; i < cfs.getNumPaths(); i++) {
        // Length of this one file chunk. cfs.getLength() would be the total of ALL paths in the
        // combined split, which wrongly replicates small files that share a split with others.
        long length = cfs.getLength(i);
        if (length > maxSize) {
          int replicas = (int) Math.ceil((double) length / (double) maxSize);
          Path path = cfs.getPath(i);
          for (int r = 1; r < replicas; r++) {
            // NOTE(review): the offset is the replica number r, not a byte offset; presumably
            // the matching record reader interprets it that way — confirm.
            newSplits.add(new CombineFileSplit(new Path[]{path}, new long[]{r}, new long[]{length}, cfs.getLocations()));
          }
        }
      }
    }
    splits.addAll(newSplits);
  }
  return splits;
}
/**
 * (Re)initializes this reader for the {@code idx}-th file of a {@link CombineFileSplit}.
 *
 * <p>Any existing delegate is closed first. The selected file chunk is turned into a plain
 * {@link FileSplit}, using the same path, offset and length plus the combined split's locations.
 * A record reader for it is then obtained from {@code getInputFormat()} and initialized.
 *
 * @throws DatasetOperationException if {@code split} is not a {@link CombineFileSplit}
 */
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  if (delegate != null) {
    delegate.close();
  }
  if (!(split instanceof CombineFileSplit)) {
    throw new DatasetOperationException(
        "Split is not a CombineFileSplit: %s:%s",
        split.getClass().getCanonicalName(), split);
  }
  CombineFileSplit combined = (CombineFileSplit) split;
  FileSplit single = new FileSplit(
      combined.getPath(idx),
      combined.getOffset(idx),
      combined.getLength(idx),
      combined.getLocations());
  delegate = getInputFormat().createRecordReader(single, context);
  delegate.initialize(single, context);
}
/**
 * Opens the {@code index}-th file of {@code split} and hands the whole file to a new
 * {@code MDSSpreadReader}.
 *
 * <p>Note: the file's full length is passed as the read range (start 0, end = file length). The
 * chunk offset and length stored in the split are not consulted.
 *
 * @param split the combined split
 * @param context task context; its configuration resolves the FileSystem
 * @param index position of the file inside {@code split}
 * @throws IOException if the file status cannot be read, the file cannot be opened, or reader
 *     setup fails; a stream that was already opened is closed before the exception propagates
 */
public MDSCombineSpreadReader( final CombineFileSplit split , final TaskAttemptContext context , final Integer index ) throws IOException{
  Configuration config = context.getConfiguration();
  Path path = split.getPath( index );
  FileSystem fs = path.getFileSystem( config );
  // FileSystem#getLength(Path) is deprecated; it is defined as getFileStatus(path).getLen().
  long fileLength = fs.getFileStatus( path ).getLen();
  InputStream in = fs.open( path );
  try {
    innerReader = new MDSSpreadReader();
    innerReader.setStream( in , fileLength , 0 , fileLength );
  } catch ( Throwable failure ) {
    // Close-on-failure: a throwing constructor leaves nobody able to close the stream.
    // Precise rethrow keeps the declared exception types unchanged (only IOException/unchecked).
    try {
      in.close();
    } catch ( IOException closeFailure ) {
      failure.addSuppressed( closeFailure );
    }
    throw failure;
  }
}
/**
 * Resolves the Avro schema to read with.
 *
 * <p>The input key schema configured on the job wins. If none is set, the schema is read from
 * the data file at position {@code idx} of {@code split}.
 *
 * @throws IOException if the file system or data file cannot be accessed
 */
private static Schema getSchema(CombineFileSplit split, TaskAttemptContext cx, Integer idx) throws IOException {
  Schema configured = AvroJob.getInputKeySchema(cx.getConfiguration());
  if (configured == null) {
    Path dataFile = split.getPath(idx);
    return AvroUtils.getSchemaFromDataFile(dataFile, dataFile.getFileSystem(cx.getConfiguration()));
  }
  return configured;
}
@SuppressWarnings("unused")
public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException {
fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
split.getLocations());
delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
}
/**
 * Initializes the parent reader on the {@code index}-th file chunk of a {@link CombineFileSplit}.
 *
 * <p>The chunk is converted to a plain {@link FileSplit}, using the same path, start offset and
 * length plus the combined split's locations.
 *
 * @throws ClassCastException if {@code split} is not a {@link CombineFileSplit}
 */
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  CombineFileSplit fSplit = (CombineFileSplit) split;
  // Use the chunk's real start offset. A hard-coded 0 makes any chunk that does not begin at the
  // start of its file read the wrong byte range. For whole-file chunks getOffset() is 0 anyway.
  super.initialize(new FileSplit(fSplit.getPath(index), fSplit.getOffset(index), fSplit.getLength(index),
      fSplit.getLocations()), context);
}
/**
 * Reads the ORC type description from the file at position {@code idx} of {@code split}.
 *
 * @throws IOException if the ORC file cannot be read
 */
private static TypeDescription getSchema(CombineFileSplit split, TaskAttemptContext context, Integer idx)
    throws IOException {
  Path orcFile = split.getPath(idx);
  TypeDescription description = OrcUtils.getTypeDescriptionFromFile(context.getConfiguration(), orcFile);
  return description;
}
/**
 * Opens the ORC file at position {@code idx} of {@code split} and returns a row reader over it.
 *
 * <p>NOTE(review): the intermediate ORC file reader is not closed here. Presumably closing the
 * returned rows reader is sufficient — confirm for the ORC version in use.
 *
 * @throws IOException if the ORC file cannot be opened
 */
private static RecordReader getRecordReaderFromFile(CombineFileSplit split, TaskAttemptContext context, Integer idx)
    throws IOException {
  Path orcFile = split.getPath(idx);
  return OrcUtils
      .getRecordReaderFromFile(context.getConfiguration(), orcFile)
      .rows();
}