The following lists example usages of org.apache.hadoop.mapreduce.lib.input.CombineFileSplit#getLength(), taken from open-source projects; the original source of each snippet can be viewed on GitHub.
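Before the examples, a minimal self-contained sketch may help; the file paths and lengths below are hypothetical and chosen only to illustrate how the per-file getLength(int) values relate to the split-wide getLength():

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitLengthDemo {
  public static void main(String[] args) {
    // Hypothetical paths and per-file lengths, for illustration only.
    Path[] paths = { new Path("/data/a.txt"), new Path("/data/b.txt") };
    long[] lengths = { 128L, 256L };

    // This constructor assigns a start offset of 0 to each file.
    CombineFileSplit split = new CombineFileSplit(paths, lengths);

    // getLength(i) is the number of bytes assigned from the i-th file;
    // getLength() is the total across all files in the split.
    long total = 0;
    for (int i = 0; i < split.getNumPaths(); i++) {
      total += split.getLength(i);
    }
    System.out.println(total == split.getLength());  // true (384 bytes here)
  }
}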
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;

  // open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) {  // skip first line and re-establish "startOffset".
    startOffset += reader.readLine(new Text(), 0,
        (int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = super.getSplits(job);
  long maxSize = MAX_SINGLE_FILE_MULTIPLIER
      * job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  if (maxSize > 0) {
    List<InputSplit> newSplits = new ArrayList<>();
    for (InputSplit spl : splits) {
      CombineFileSplit cfs = (CombineFileSplit) spl;
      for (int i = 0; i < cfs.getNumPaths(); i++) {
        long length = cfs.getLength();
        if (length > maxSize) {
          // for each path in a split whose total length exceeds maxSize,
          // register additional single-path splits covering that path
          int replicas = (int) Math.ceil((double) length / (double) maxSize);
          Path path = cfs.getPath(i);
          for (int r = 1; r < replicas; r++) {
            newSplits.add(new CombineFileSplit(new Path[]{path}, new long[]{r},
                new long[]{length}, cfs.getLocations()));
          }
        }
      }
    }
    splits.addAll(newSplits);
  }
  return splits;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  if (delegate != null) {
    delegate.close();
  }
  if (split instanceof CombineFileSplit) {
    CombineFileSplit combineSplit = (CombineFileSplit) split;
    FileSplit fileSplit = new FileSplit(combineSplit.getPath(idx), combineSplit.getOffset(idx),
        combineSplit.getLength(idx), combineSplit.getLocations());
    delegate = getInputFormat().createRecordReader(fileSplit, context);
    delegate.initialize(fileSplit, context);
  } else {
    throw new DatasetOperationException(
        "Split is not a CombineFileSplit: %s:%s",
        split.getClass().getCanonicalName(), split);
  }
}
void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
    throws Exception {
  long splitBytes = 0L;
  HashSet<Path> uniq = new HashSet<Path>();
  for (int i = 0; i < split.getNumPaths(); ++i) {
    splitBytes += split.getLength(i);
    assertTrue(
        split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
    assertFalse(uniq.contains(split.getPath(i)));
    uniq.add(split.getPath(i));
  }
  assertEquals(bytes, splitBytes);
}
public void testHfileSplitCompleteness() throws Exception {
  cluster = initMiniCluster(CLUSTER_PORT, 1);

  int count = 40;
  HdfsSortedOplogOrganizer bucket1 = new HdfsSortedOplogOrganizer(
      regionManager, 1);
  ArrayList<TestEvent> items = new ArrayList<TestEvent>();
  for (int i = 0; i < count; i++) {
    items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
  }
  bucket1.flush(items.iterator(), count);

  Configuration conf = hdfsStore.getFileSystem().getConf();
  GFInputFormat gfInputFormat = new GFInputFormat();
  Job job = Job.getInstance(conf, "test");

  conf = job.getConfiguration();
  conf.set(GFInputFormat.INPUT_REGION, getName());
  conf.set(GFInputFormat.HOME_DIR, testDataDir.getName());
  conf.setBoolean(GFInputFormat.CHECKPOINT, false);

  List<InputSplit> splits = gfInputFormat.getSplits(job);
  assertTrue(1 < splits.size());

  long lastBytePositionOfPrevious = 0;
  for (InputSplit inputSplit : splits) {
    CombineFileSplit split = (CombineFileSplit) inputSplit;
    assertEquals(1, split.getPaths().length);
    assertEquals(lastBytePositionOfPrevious, split.getOffset(0));
    lastBytePositionOfPrevious += split.getLength();
    assertEquals(1, split.getLocations().length);
  }

  Path bucketPath = new Path(regionPath, "1");
  Path hopPath = new Path(bucketPath, bucket1.getSortedOplogs().iterator()
      .next().get().getFileName());
  FileStatus status = hdfsStore.getFileSystem().getFileStatus(hopPath);
  assertEquals(status.getLen(), lastBytePositionOfPrevious);
}
public ParserPump(CombineFileSplit split, TaskAttemptContext context) {
  this.context = context;
  this.paths = split.getPaths();
  this.sizes = split.getLengths();
  this.offsets = split.getStartOffsets();
  this.size = split.getLength();
  Configuration conf = context.getConfiguration();
  this.skipInvalid = conf.getBoolean(SKIP_INVALID_PROPERTY, false);
  this.verifyDataTypeValues = conf.getBoolean(VERIFY_DATATYPE_VALUES_PROPERTY, false);
  this.overrideRdfContext = conf.getBoolean(OVERRIDE_CONTEXT_PROPERTY, false);
  this.defaultRdfContextPattern = conf.get(DEFAULT_CONTEXT_PROPERTY);
  this.maxSize = MAX_SINGLE_FILE_MULTIPLIER * conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
}
@SuppressWarnings("unused")
public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
    throws IOException {
  fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
      split.getLocations());
  delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  CombineFileSplit fSplit = (CombineFileSplit) split;
  super.initialize(new FileSplit(fSplit.getPath(index), 0, fSplit.getLength(index), fSplit.getLocations()), context);
}