Listed below are example usages of org.apache.hadoop.fs.FSDataInputStream#getPos().
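getPos() returns the stream's current byte position within the file, so the recurring pattern in the examples below is: seek to an offset, read or deserialize some data, then call getPos() and subtract the starting offset to find how many bytes were consumed. The following is a minimal sketch of that pattern, assuming a hypothetical path and offset that are not taken from any of the examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetPosExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.dat");   // hypothetical file, for illustration only
    FileSystem fs = path.getFileSystem(conf);

    long offset = 128;                          // hypothetical starting offset within the file
    try (FSDataInputStream in = fs.open(path)) {
      in.seek(offset);                          // position the stream at the offset
      byte[] buffer = new byte[4096];
      int read = in.read(buffer);               // consume some bytes sequentially

      // getPos() reports the current position in the file, so subtracting the
      // starting offset gives the number of bytes consumed since the seek.
      long consumed = in.getPos() - offset;
      System.out.println("bytes returned by read: " + read + ", bytes consumed: " + consumed);
    }
  }
}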
private long dumpFromOffset(PathData item, long offset) throws IOException {
  long fileSize = item.refreshStatus().getLen();
  if (offset > fileSize) return fileSize;
  // treat a negative offset as relative to end of the file, floor of 0
  if (offset < 0) {
    offset = Math.max(fileSize + offset, 0);
  }
  FSDataInputStream in = item.fs.open(item.path);
  try {
    in.seek(offset);
    // use conf so the system configured io block size is used
    IOUtils.copyBytes(in, System.out, getConf(), false);
    offset = in.getPos();
  } finally {
    in.close();
  }
  return offset;
}
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset)
    throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = StringInterner.weakIntern(Text.readString(inFile));
  Class<T> cls;
  try {
    cls = (Class<T>) conf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className +
        " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(conf);
  Deserializer<T> deserializer =
      (Deserializer<T>) factory.getDeserializer(cls);
  deserializer.open(inFile);
  T split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  getCounters().findCounter(
      TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
  inFile.close();
  return split;
}
@SuppressWarnings("unchecked")
public static InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo,
    JobConf jobConf, TezCounter splitBytesCounter) throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  long offset = splitMetaInfo.getStartOffset();
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapred.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer =
      (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory.getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
/**
* Constructor to map a file containing Succinct data structures via stream.
*
* @param filePath Path of the file.
* @param conf Configuration for the filesystem.
* @throws IOException
*/
public SuccinctIndexedFileStream(Path filePath, Configuration conf) throws IOException {
  super(filePath, conf);
  FSDataInputStream is = getStream(filePath);
  is.seek(endOfFileStream);
  int len = is.readInt();
  offsets = new int[len];
  for (int i = 0; i < len; i++) {
    offsets[i] = is.readInt();
  }
  endOfIndexedFileStream = is.getPos();
  is.close();
}
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
  if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
    this.isDirectRead = true;
  } else {
    /* LocalFileSystem, S3 does not support ByteBufferReadable */
    this.channel = Channels.newChannel(inputStream);
  }
  this.inputStream = inputStream;
  this.size = inputStream.getPos() + inputStream.available();
}
private String initInternal(FSDataInputStream stream, boolean isFirst)
    throws IOException {
  close();
  long expectedPos = PB_WAL_MAGIC.length;
  if (stream == null) {
    stream = fs.open(path);
    stream.seek(expectedPos);
  }
  if (stream.getPos() != expectedPos) {
    throw new IOException("The stream is at invalid position: " + stream.getPos());
  }
  // Initialize metadata or, when we reset, just skip the header.
  WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
  WALHdrContext hdrCtxt = readHeader(builder, stream);
  WALHdrResult walHdrRes = hdrCtxt.getResult();
  if (walHdrRes == WALHdrResult.EOF) {
    throw new EOFException("Couldn't read WAL PB header");
  }
  if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
    throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
  }
  if (isFirst) {
    WALProtos.WALHeader header = builder.build();
    this.hasCompression = header.hasHasCompression() && header.getHasCompression();
    this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
  }
  this.inputStream = stream;
  this.walEditsStopOffset = this.fileLength;
  long currentPosition = stream.getPos();
  trailerPresent = setTrailerIfPresent();
  this.seekOnFs(currentPosition);
  if (LOG.isTraceEnabled()) {
    LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
        + ", fileLength: " + this.fileLength + ", " + "trailerPresent: "
        + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
        + ", currentPosition: " + currentPosition);
  }
  codecClsName = hdrCtxt.getCellCodecClsName();
  return hdrCtxt.getCellCodecClsName();
}
private void loadIndex(Path path) throws IOException {
  FSDataInputStream inputStream = _fileSystem.open(path);
  byte[] buf = new byte[MAGIC.length];
  inputStream.readFully(buf);
  if (!Arrays.equals(MAGIC, buf)) {
    throw new IOException("File [" + path + "] not a " + BLUR_KEY_VALUE + " file.");
  }
  int version = inputStream.readInt();
  if (version == 1) {
    long fileLength = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
    Operation operation = new Operation();
    try {
      while (inputStream.getPos() < fileLength) {
        try {
          operation.readFields(inputStream);
        } catch (IOException e) {
          // End of sync point found
          return;
        }
        loadIndex(path, operation);
      }
    } finally {
      inputStream.close();
    }
  } else {
    throw new IOException("Unknown version [" + version + "]");
  }
}
public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
    throws IOException {
  endOffsetOfSplit = end;
  // initialize retPos to the beginning of the current InputSplit
  // see comments in getPos() to understand how this is used.
  retPos = zStream.getPos();
  ll8 = null;
  tt = null;
  checkComputedCombinedCRC = blockSize == -1;
  bsSetStream(zStream);
  initialize(blockSize);
  initBlock(blockSize != -1);
  setupBlock();
}
@SuppressWarnings("unchecked")
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
    TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter)
    throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  long offset = splitMetaInfo.getStartOffset();
  // Split information read from local filesystem.
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapreduce.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer =
      (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory.getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
public void readSequentialDirect() throws Exception {
  System.out.println("reading sequential file in direct mode " + path);
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus status = fs.getFileStatus(path);
  FSDataInputStream instream = fs.open(path);
  ByteBuffer buf = ByteBuffer.allocateDirect(size);
  buf.clear();
  double sumbytes = 0;
  double ops = 0;
  System.out.println("file capacity " + status.getLen());
  System.out.println("read size " + size);
  System.out.println("operations " + loop);
  long start = System.currentTimeMillis();
  while (ops < loop) {
    buf.clear();
    double ret = (double) instream.read(buf);
    if (ret > 0) {
      sumbytes = sumbytes + ret;
      ops = ops + 1.0;
    } else {
      ops = ops + 1.0;
      if (instream.getPos() == 0) {
        break;
      } else {
        instream.seek(0);
      }
    }
  }
  long end = System.currentTimeMillis();
  double executionTime = ((double) (end - start)) / 1000.0;
  double throughput = 0.0;
  double latency = 0.0;
  double sumbits = sumbytes * 8.0;
  if (executionTime > 0) {
    throughput = sumbits / executionTime / 1024.0 / 1024.0;
    latency = 1000000.0 * executionTime / ops;
  }
  System.out.println("execution time " + executionTime);
  System.out.println("ops " + ops);
  System.out.println("sumbytes " + sumbytes);
  System.out.println("throughput " + throughput);
  System.out.println("latency " + latency);
  System.out.println("closing stream");
  instream.close();
  fs.close();
}
public void readSequentialHeap() throws Exception {
  System.out.println("reading sequential file in heap mode " + path);
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus status = fs.getFileStatus(path);
  FSDataInputStream instream = fs.open(path);
  byte[] buf = new byte[size];
  double sumbytes = 0;
  double ops = 0;
  System.out.println("file capacity " + status.getLen());
  System.out.println("read size " + size);
  System.out.println("operations " + loop);
  long start = System.currentTimeMillis();
  while (ops < loop) {
    double ret = (double) this.read(instream, buf);
    if (ret > 0) {
      sumbytes = sumbytes + ret;
      ops = ops + 1.0;
    } else {
      ops = ops + 1.0;
      if (instream.getPos() == 0) {
        break;
      } else {
        instream.seek(0);
      }
    }
  }
  long end = System.currentTimeMillis();
  double executionTime = ((double) (end - start)) / 1000.0;
  double throughput = 0.0;
  double latency = 0.0;
  double sumbits = sumbytes * 8.0;
  if (executionTime > 0) {
    throughput = sumbits / executionTime / 1024.0 / 1024.0;
    latency = 1000000.0 * executionTime / ops;
  }
  System.out.println("execution time " + executionTime);
  System.out.println("ops " + ops);
  System.out.println("sumbytes " + sumbytes);
  System.out.println("throughput " + throughput);
  System.out.println("latency " + latency);
  System.out.println("closing stream");
  instream.close();
  fs.close();
}
/**
 * Read chunks into the buffer repeatedly until a total of visibleLen bytes have been read.
 * Returns the total number of bytes read.
 */
private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,
    String fname, long pos, long visibleLen, boolean positionReadOption)
    throws IOException {
  if (pos >= visibleLen || visibleLen <= 0)
    return 0;
  int chunkNumber = 0;
  long totalByteRead = 0;
  long currentPosition = pos;
  int byteRead = 0;
  long byteLeftToRead = visibleLen - pos;
  int byteToReadThisRound = 0;
  if (!positionReadOption) {
    in.seek(pos);
    currentPosition = in.getPos();
  }
  if (verboseOption)
    LOG.info("reader begin: position: " + pos + " ; currentOffset = "
        + currentPosition + " ; bufferSize =" + buffer.length
        + " ; Filename = " + fname);
  try {
    while (byteLeftToRead > 0 && currentPosition < visibleLen) {
      byteToReadThisRound = (int) (byteLeftToRead >= buffer.length
          ? buffer.length : byteLeftToRead);
      if (positionReadOption) {
        byteRead = in.read(currentPosition, buffer, 0, byteToReadThisRound);
      } else {
        byteRead = in.read(buffer, 0, byteToReadThisRound);
      }
      if (byteRead <= 0)
        break;
      chunkNumber++;
      totalByteRead += byteRead;
      currentPosition += byteRead;
      byteLeftToRead -= byteRead;
      if (verboseOption) {
        LOG.info("reader: Number of byte read: " + byteRead
            + " ; totalByteRead = " + totalByteRead + " ; currentPosition="
            + currentPosition + " ; chunkNumber =" + chunkNumber
            + "; File name = " + fname);
      }
    }
  } catch (IOException e) {
    throw new IOException(
        "#### Exception caught in readUntilEnd: reader currentOffset = "
            + currentPosition + " ; totalByteRead =" + totalByteRead
            + " ; latest byteRead = " + byteRead + "; visibleLen= "
            + visibleLen + " ; bufferLen = " + buffer.length
            + " ; Filename = " + fname, e);
  }
  if (verboseOption)
    LOG.info("reader end: position: " + pos + " ; currentOffset = "
        + currentPosition + " ; totalByteRead =" + totalByteRead
        + " ; Filename = " + fname);
  return totalByteRead;
}
/**
 * Recovers a file which exists on disk. If the length of the file is not the same as the
 * length the operator remembers, then the file is truncated. <br/>
 * When always writing to a temporary file, a file is restored even when its length matches what the
 * operator remembers; however, this is done only for files which had open streams that weren't closed
 * before the failure.
 *
 * @param filename name of the actual file.
 * @param partFileName name of the part file. When not rolling, this is the same as filename; otherwise it is the
 * latest open part file name.
 * @param filepath path of the file. When always writing to a temp file, this is the path of the temp file;
 * otherwise it is the path of the actual file.
 * @throws IOException
 */
private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
{
  LOG.debug("path exists {}", filepath);
  long offset = endOffsets.get(filename).longValue();
  FSDataInputStream inputStream = fs.open(filepath);
  FileStatus status = fs.getFileStatus(filepath);
  if (status.getLen() != offset) {
    LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
    byte[] buffer = new byte[COPY_BUFFER_SIZE];
    String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
    Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
    FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
    while (inputStream.getPos() < offset) {
      long remainingBytes = offset - inputStream.getPos();
      int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE;
      // write only the bytes actually read, so a short read does not copy stale buffer contents
      int bytesRead = inputStream.read(buffer, 0, bytesToWrite);
      if (bytesRead == -1) {
        break;
      }
      fsOutput.write(buffer, 0, bytesRead);
    }
    flush(fsOutput);
    fsOutput.close();
    inputStream.close();
    LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
    if (alwaysWriteToTmp) {
      // the recovery file is used as the new tmp file and we cannot delete the old tmp file because when the
      // operator is restored to an earlier check-pointed window, it will look for an older tmp.
      fileNameToTmpName.put(partFileName, recoveryFileName);
    } else {
      LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
      rename(recoveryFilePath, status.getPath());
    }
  } else {
    if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
      String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
      FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
      IOUtils.copy(inputStream, outputStream);
      streamsCache.put(filename, new FSFilterStreamContext(outputStream));
      fileNameToTmpName.put(partFileName, currentTmp);
    }
    inputStream.close();
  }
}
/**
 * Generate splits.
 *
 * @param job the JobContext used to read the configuration of the job that ran.
 * @param minSize the minimum file block size.
 * @param maxSize the maximum file block size.
 * @param splits the list of splits being generated.
 * @param file the FileStatus required to determine the block size, length and allocations.
 * @throws IOException Signals that an I/O exception has occurred.
 */
private void generateSplits(JobContext job, long minSize, long maxSize,
    List<InputSplit> splits, FileStatus file) throws IOException {
  Path path = file.getPath();
  int numOfRecordsInCurrentSplit = 0;
  int numOfRecordsInPreviousSplit = 0;
  FileSystem fs = path.getFileSystem(job.getConfiguration());
  long length = file.getLen();
  BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  FSDataInputStream fsin = null;
  if ((length != 0) && isSplitable(job, path)) {
    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
    // checking the occurrences of the record separator in the current split
    recordSeparator = job.getConfiguration()
        .get(DataValidationConstants.RECORD_SEPARATOR)
        .getBytes();
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
      long start = length - bytesRemaining;
      long end = start + splitSize;
      try {
        fsin = fs.open(path);
        fsin.seek(start);
        long pos = start;
        int b = 0;
        int bufferPos = 0;
        while (true) {
          b = fsin.read();
          pos = fsin.getPos();
          if (b == -1) {
            break;
          }
          if (b == recordSeparator[bufferPos]) {
            bufferPos++;
            if (bufferPos == recordSeparator.length) {
              numOfRecordsInCurrentSplit++;
              bufferPos = 0;
              if (pos > end) {
                break;
              }
            }
          } else {
            // reset the value of buffer position to zero
            bufferPos = 0;
          }
        }
      } finally {
        if (fsin != null) {
          fsin.close();
        }
      }
      splits.add(new DataValidationFileSplit(path, start,
          splitSize, numOfRecordsInPreviousSplit,
          blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
      numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
      numOfRecordsInCurrentSplit = 0;
    }
    addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
        length, blkLocations, bytesRemaining);
  } else if (length != 0) {
    splits.add(new DataValidationFileSplit(path, 0, length,
        numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
  } else {
    splits.add(new DataValidationFileSplit(path, 0, length,
        numOfRecordsInPreviousSplit, new String[0]));
  }
}