The following are example usages of org.apache.hadoop.fs.FileStatus#getBlockSize(), drawn from several open-source projects.
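As a quick orientation before the project examples, here is a minimal sketch of the call itself (the path and configuration are made up for illustration):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: look up a file's block size and estimate its block count.
FileSystem fs = FileSystem.get(new Configuration());
FileStatus status = fs.getFileStatus(new Path("/tmp/example.txt")); // illustrative path
long blockSize = status.getBlockSize();
long numBlocks = (status.getLen() + blockSize - 1) / blockSize; // round up to full blocks
System.out.println("blockSize=" + blockSize + ", blocks=" + numBlocks);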
@Override
public FileStatus getFileStatus(Path f) throws IOException {
// it's the RawFS in place which messes things up as it dynamically returns the permissions...
// workaround by doing a copy
FileStatus fs = super.getFileStatus(f);
// work-around for Hive 0.14
if (SCRATCH_DIR.equals(f.toString())) {
System.out.println("Faking scratch dir permissions on Windows...");
return new FileStatus(fs.getLen(), fs.isDir(), fs.getReplication(), fs.getBlockSize(),
fs.getModificationTime(), fs.getAccessTime(), SCRATCH_DIR_PERMS, fs.getOwner(), fs.getGroup(),
fs.getPath());
// this doesn't work since the RawFS impl has its own algo that does the lookup dynamically
//fs.getPermission().fromShort((short) 777);
}
return fs;
}
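The copy-the-fields workaround above recurs in other examples below; the same idea as a small sketch (the helper name withPermission is hypothetical):
// Hypothetical helper: clone a FileStatus, overriding only the permission.
static FileStatus withPermission(FileStatus fs, FsPermission perm) {
return new FileStatus(fs.getLen(), fs.isDir(), fs.getReplication(), fs.getBlockSize(),
fs.getModificationTime(), fs.getAccessTime(), perm, fs.getOwner(), fs.getGroup(),
fs.getPath());
}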
private FSDataOutputStream create(Path f, Reporter reporter,
FileStatus srcstat) throws IOException {
if (destFileSys.exists(f)) {
destFileSys.delete(f, false);
}
if (!preserve_status) {
return destFileSys.create(f, true, sizeBuf, reporter);
}
FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
srcstat.getPermission(): null;
short replication = preseved.contains(FileAttribute.REPLICATION)?
srcstat.getReplication(): destFileSys.getDefaultReplication(f);
long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f);
return destFileSys.create(f, permission, true, sizeBuf, replication,
blockSize, reporter);
}
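The create() above carries the source file's block size over to the destination only when requested; a stripped-down sketch of the same idea (srcFs, dstFs, srcPath, and dstPath are assumed to be given, and the 4096 buffer size is arbitrary):
// Sketch: create the destination with the source's block size so block
// boundaries line up after the copy.
FileStatus srcStat = srcFs.getFileStatus(srcPath);
FSDataOutputStream out = dstFs.create(dstPath, true, 4096,
srcStat.getReplication(), srcStat.getBlockSize());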
long shortestBlockLength(int stripeIndex, FileStatus stat, int stripeLength) {
final long blockSize = stat.getBlockSize();
final long stripeBytes = stripeLength * blockSize;
int numStripes = (int) Math.ceil(stat.getLen() * 1.0 / stripeBytes);
if (stripeIndex == numStripes - 1) {
long remainder = stat.getLen() % blockSize;
return (remainder == 0) ? blockSize : remainder;
} else {
return blockSize;
}
}
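For intuition, a quick check of the arithmetic with made-up numbers:
// Hypothetical numbers: 64 MB blocks, 4 blocks per stripe, 200 MB file.
long blockSize = 64L << 20;
int stripeLength = 4;
long fileLen = 200L << 20;
long stripeBytes = stripeLength * blockSize; // 256 MB per stripe
int numStripes = (int) Math.ceil(fileLen * 1.0 / stripeBytes); // 1
long remainder = fileLen % blockSize; // 8 MB
// The last block of the last stripe is 8 MB; every other block is the full 64 MB.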
public static void concat(String dir) throws IOException {
String directory = NodeConfig.HDFS_PATH + dir;
Configuration conf = new Configuration();
DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(URI.create(directory), conf);
FileStatus[] fileList = fs.listStatus(new Path(directory));
if (fileList.length >= 2) {
ArrayList<Path> srcs = new ArrayList<Path>(fileList.length);
for (FileStatus fileStatus : fileList) {
// Select files whose last block is less than half full.
if (fileStatus.isFile() &&
(fileStatus.getLen() % fileStatus.getBlockSize()) < fileStatus.getBlockSize() / 2) {
srcs.add(fileStatus.getPath());
}
}
if (srcs.size() >= 2) {
Path appended = srcs.get(0);
Path[] sources = new Path[srcs.size() - 1];
for (int i = 0; i < srcs.size() - 1; i++) {
sources[i] = srcs.get(i + 1);
}
// Merge the remaining small files into the first one.
fs.concat(appended, sources);
Logger.println("concat to : " + appended.getName());
Logger.println(Arrays.toString(sources));
}
fs.close();
}
}
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
final List<Path> paths, final int expectedBlocks,
final int expectedReplicas)
throws Exception {
int numCachedBlocks = 0;
int numCachedReplicas = 0;
for (Path p: paths) {
final FileStatus f = dfs.getFileStatus(p);
final long len = f.getLen();
final long blockSize = f.getBlockSize();
// round it up to full blocks
final long numBlocks = (len + blockSize - 1) / blockSize;
BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
assertEquals("Unexpected number of block locations for path " + p,
numBlocks, locs.length);
for (BlockLocation l: locs) {
if (l.getCachedHosts().length > 0) {
numCachedBlocks++;
}
numCachedReplicas += l.getCachedHosts().length;
}
}
LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+ " replicas");
assertEquals("Unexpected number of cached blocks", expectedBlocks,
numCachedBlocks);
assertEquals("Unexpected number of cached replicas", expectedReplicas,
numCachedReplicas);
}
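The (len + blockSize - 1) / blockSize expression above is the standard integer ceiling-division idiom; a standalone check:
long len = 130, blockSize = 64;
long numBlocks = (len + blockSize - 1) / blockSize; // 3 == ceil(130 / 64)
assert numBlocks == (long) Math.ceil(len / (double) blockSize);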
/**
* Return an array containing hostnames, offset and size of
* portions of the given file. For WASB we'll just lie and give
* fake hosts to make sure we get many splits in MR jobs.
*/
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
if (file == null) {
return null;
}
if ((start < 0) || (len < 0)) {
throw new IllegalArgumentException("Invalid start or len parameter");
}
if (file.getLen() < start) {
return new BlockLocation[0];
}
final String blobLocationHost = getConf().get(
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost };
long blockSize = file.getBlockSize();
if (blockSize <= 0) {
throw new IllegalArgumentException(
"The block size for the given file is not a positive number: "
+ blockSize);
}
int numberOfLocations = (int) (len / blockSize)
+ ((len % blockSize == 0) ? 0 : 1);
BlockLocation[] locations = new BlockLocation[numberOfLocations];
for (int i = 0; i < locations.length; i++) {
long currentOffset = start + (i * blockSize);
long currentLength = Math.min(blockSize, start + len - currentOffset);
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
}
return locations;
}
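To see how a consumer (such as MR split planning) would use the result, a hedged usage sketch, assuming fs is a configured WASB FileSystem and path an existing file:
// Sketch: enumerate the synthetic block locations for a whole file.
FileStatus stat = fs.getFileStatus(path);
BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
for (BlockLocation loc : locs) {
System.out.println("offset=" + loc.getOffset() + " length=" + loc.getLength()
+ " hosts=" + Arrays.toString(loc.getHosts()));
}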
@Override
public InputStream buildOneInput(
int locationIndex, long offsetInBlock,
FileSystem srcFs, Path srcFile, FileStatus srcStat,
FileSystem parityFs, Path parityFile, FileStatus parityStat
) throws IOException {
final long blockSize = srcStat.getBlockSize();
LOG.info("buildOneInput srcfile " + srcFile + " srclen " + srcStat.getLen() +
" parityfile " + parityFile + " paritylen " + parityStat.getLen() +
" stripeindex " + stripeStartIdx + " locationindex " + locationIndex +
" offsetinblock " + offsetInBlock);
if (locationIndex < codec.parityLength) {
return this.getParityFileInput(locationIndex, parityFile,
parityFs, parityStat, offsetInBlock);
} else {
// Dealing with a src file here.
int blockIdxInStripe = locationIndex - codec.parityLength;
int blockIdx = (int)(codec.stripeLength * stripeStartIdx + blockIdxInStripe);
long offset = blockSize * blockIdx + offsetInBlock;
if (offset >= srcStat.getLen()) {
LOG.info("Using zeros for " + srcFile + ":" + offset +
" for location " + locationIndex);
return new RaidUtils.ZeroInputStream(blockSize * (blockIdx + 1));
} else {
LOG.info("Opening " + srcFile + ":" + offset +
" for location " + locationIndex);
FSDataInputStream s = fs.open(
srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
s.seek(offset);
return s;
}
}
}
@Override
public HadoopFileStatus getExtendedStatus(URI path) throws IOException {
checkConnected();
FileStatus status = dfs.getFileStatus(new Path(path));
return new HadoopFileStatus(status.getPath().toUri(), status.getLen(), status.isDir(),
status.getModificationTime(), status.getBlockSize(), status.getGroup(), status.getOwner(),
status.getReplication());
}
/**
* Dump given list of files to standard output as typed bytes.
*/
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
JobConf job = new JobConf(getConf());
DataOutputStream dout = new DataOutputStream(System.out);
AutoInputFormat autoInputFormat = new AutoInputFormat();
for (FileStatus fileStatus : files) {
// The split length (len * blockSize) deliberately overshoots the file size
// so the entire file is consumed as a single split.
FileSplit split = new FileSplit(fileStatus.getPath(), 0,
fileStatus.getLen() * fileStatus.getBlockSize(),
(String[]) null);
RecordReader recReader = null;
try {
recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
Object key = recReader.createKey();
Object value = recReader.createValue();
while (recReader.next(key, value)) {
if (key instanceof Writable) {
TypedBytesWritableOutput.get(dout).write((Writable) key);
} else {
TypedBytesOutput.get(dout).write(key);
}
if (value instanceof Writable) {
TypedBytesWritableOutput.get(dout).write((Writable) value);
} else {
TypedBytesOutput.get(dout).write(value);
}
}
} finally {
if (recReader != null) {
recReader.close();
}
}
}
dout.flush();
return 0;
}
@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FileStatus status = outerFs.getFileStatus(outerPath);
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
status.getModificationTime(), status.getAccessTime(), status.getPermission(), status.getOwner(),
status.getGroup(), inlinePath);
return toReturn;
}
/**
* Builds (codec.stripeLength + codec.parityLength) inputs given some erased locations.
* Outputs:
* - the array of input streams @param inputs
* - the list of erased locations @param erasedLocations.
* - the list of locations that are not read @param locationsToNotRead.
*/
public InputStream[] buildInputs(
FileSystem srcFs, Path srcFile, FileStatus srcStat,
FileSystem parityFs, Path parityFile, FileStatus parityStat,
int stripeIdx, long offsetInBlock, List<Integer> erasedLocations,
List<Integer> locationsToRead, ErasureCode code)
throws IOException {
InputStream[] inputs = new InputStream[codec.stripeLength +
codec.parityLength];
boolean redo = false;
do {
/*
* In the first iteration locationsToRead is empty.
* It is populated according to locationsToReadForDecode.
* In consecutive iterations (if a stream failed to open)
* the list is cleared and re-populated.
*/
locationsToRead.clear();
locationsToRead.addAll(code.locationsToReadForDecode(erasedLocations));
for (int i = 0; i < inputs.length; i++) {
boolean isErased = (erasedLocations.indexOf(i) != -1);
boolean shouldRead = (locationsToRead.indexOf(i) != -1);
try {
InputStream stm = null;
if (isErased || !shouldRead) {
if (isErased) {
LOG.info("Location " + i + " is erased, using zeros");
} else {
LOG.info("Location " + i + " need not be read, using zeros");
}
stm = new RaidUtils.ZeroInputStream(srcStat.getBlockSize() * (
(i < codec.parityLength) ?
stripeIdx * codec.parityLength + i :
stripeIdx * codec.stripeLength + i - codec.parityLength));
} else {
stm = buildOneInput(i, offsetInBlock,
srcFs, srcFile, srcStat,
parityFs, parityFile, parityStat);
}
inputs[i] = stm;
} catch (IOException e) {
if (e instanceof BlockMissingException || e instanceof ChecksumException) {
erasedLocations.add(i);
redo = true;
RaidUtils.closeStreams(inputs);
break;
} else {
throw e;
}
}
}
} while (redo);
return inputs;
}
boolean checkAgainstParity(FileSystem fs, FileStatus stat) throws IOException {
Codec code = null;
ParityFilePair ppair = null;
for (Codec codec: Codec.getCodecs()) {
ppair = ParityFilePair.getParityFile(
codec, stat.getPath(), conf);
if (ppair != null) {
code = codec;
break;
}
}
if (code == null) {
LOG.info("No parity for " + stat.getPath());
return false;
}
int parityLength = code.parityLength;
LOG.info("Checking file parity " + stat.getPath() +
" against parity " + ppair.getPath());
final long blockSize = stat.getBlockSize();
int stripeLength = code.stripeLength;
long stripeBytes = stripeLength * blockSize;
int numStripes = (int)Math.ceil(stat.getLen() * 1.0 / stripeBytes);
// Look at all stripes.
for (int stripeIndex = 0; stripeIndex < numStripes; stripeIndex++) {
for (boolean lastKB : new boolean[]{true, false}) {
long shortest = shortestBlockLength(stripeIndex, stat, stripeLength);
// Optimization - if all blocks are the same size, one check is enough.
if (!lastKB) {
if (shortest == blockSize) {
continue;
}
}
long lastOffsetInBlock = lastKB ? blockSize : shortest;
if (lastOffsetInBlock < CHECKSIZE) {
lastOffsetInBlock = CHECKSIZE;
}
byte[][] stripeBufs = new byte[stripeLength][];
for (int i = 0; i < stripeLength; i++) {
stripeBufs[i] = new byte[CHECKSIZE];
}
byte[] parityBuf = new byte[CHECKSIZE];
byte[] actualParityBuf = new byte[CHECKSIZE];
// Read CHECKSIZE bytes from all blocks in a stripe and parity.
computeParity(conf, fs, stat, code, stripeIndex, stripeBufs,
parityBuf, lastOffsetInBlock);
readActualParity(ppair, actualParityBuf,
stripeIndex, parityLength, blockSize, lastOffsetInBlock);
if (!Arrays.equals(parityBuf, actualParityBuf)) {
return true;
}
}
}
// All stripes are good.
LOG.info("Checking file parity " + stat.getPath() +
" against parity " + ppair.getPath() + " was OK");
return false;
}
public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.fromFileName(location);
return new GenericDataFile(
location, format, partition, stat.getLen(), stat.getBlockSize(), metrics);
}
public static DataFile fromParquetStat(FileStatus stat, PartitionData partition, Metrics metrics) {
String location = stat.getPath().toString();
FileFormat format = FileFormat.PARQUET;
return new GenericDataFile(
location, format, partition, stat.getLen(), stat.getBlockSize(), metrics);
}
void computeParity(Configuration conf, FileSystem fs, FileStatus stat,
Codec code,
final int stripeIndex, byte[][] stripeBufs, byte[] parityBuf,
long lastOffsetInBlock) throws IOException {
final long blockSize = stat.getBlockSize();
final long stripeBytes = stripeBufs.length * blockSize;
final long stripeStartOffset = stripeIndex * stripeBytes;
final long stripeEndOffset = stripeStartOffset + stripeBytes;
LOG.info("Checking parity " + stat.getPath() + " with last offset " + lastOffsetInBlock);
FSDataInputStream[] inputs = new FSDataInputStream[stripeBufs.length];
try {
int idx = 0;
// Loop through the blocks in the stripe
for (long blockStart = stripeStartOffset;
blockStart < stripeEndOffset;
blockStart += blockSize) {
// First zero out the buffer.
Arrays.fill(stripeBufs[idx], (byte)0);
if (blockStart < stat.getLen()) {
// Block is real, read some bytes from it.
long readEndOffset = blockStart + lastOffsetInBlock; // readEndOffset > blockStart.
long readStartOffset = readEndOffset - CHECKSIZE; // readEndOffset > readStartOffset.
// readStartOffset = blockStart + lastOffsetInBlock - CHECKSIZE, readStartOffset >= blockStartOffset
// blockStartOffset <= readStartOffset < readEndOffset
// Check for the case that the readEndOffset is beyond eof.
long blockEndOffset = Math.min((blockStart + blockSize), stat.getLen());
if (readStartOffset < blockEndOffset) {
// blockStart <= readStartOffset < blockEndOffset
inputs[idx] = fs.open(stat.getPath());
inputs[idx].seek(readStartOffset);
int bytesToRead = (int)Math.min(CHECKSIZE, blockEndOffset - readStartOffset);
IOUtils.readFully(inputs[idx], stripeBufs[idx], 0, bytesToRead);
// Rest is zeros
}
}
idx++;
}
if (code.id.equals("xor")) {
for (int i = 0; i < CHECKSIZE; i++) {
parityBuf[i] = 0;
// For XOR, each byte is XOR of all the stripe bytes.
for (int j = 0; j < stripeBufs.length; j++) {
parityBuf[i] = (byte)(parityBuf[i] ^ stripeBufs[j][i]);
}
}
} else if (code.id.equals("rs")) {
int parityLength = code.parityLength;
int[] msgbuf = new int[stripeBufs.length];
int[] codebuf = new int[parityLength];
ErasureCode rsCode = new ReedSolomonCode(stripeBufs.length, parityLength);
for (int i = 0; i < CHECKSIZE; i++) {
for (int j = 0; j < stripeBufs.length; j++) {
msgbuf[j] = stripeBufs[j][i] & 0x000000FF;
}
rsCode.encode(msgbuf, codebuf);
// Take the first parity byte.
parityBuf[i] = (byte)codebuf[0];
}
}
} finally {
for (InputStream stm: inputs) {
if (stm != null) stm.close();
}
}
}
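As a sanity check on the "xor" branch above: the parity byte is the byte-wise XOR across the stripe. A toy example with made-up buffers:
byte[][] stripeBufs = { {1, 2}, {4, 6} };
byte[] parityBuf = new byte[2];
for (int i = 0; i < parityBuf.length; i++) {
for (byte[] block : stripeBufs) {
parityBuf[i] ^= block[i];
}
}
// parityBuf is {5, 4}: 1 ^ 4 = 5 and 2 ^ 6 = 4.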
@Override
public InputStream buildOneInput(
int locationIndex, long offsetInBlock,
FileSystem srcFs, Path srcFile, FileStatus srcStat,
FileSystem parityFs, Path parityFile, FileStatus parityStat
) throws IOException {
final long blockSize = srcStat.getBlockSize();
LOG.info("buildOneInput srcfile " + srcFile + " srclen " + srcStat.getLen() +
" parityfile " + parityFile + " paritylen " + parityStat.getLen() +
" stripeindex " + stripeStartIdx + " locationindex " + locationIndex +
" offsetinblock " + offsetInBlock);
if (locationIndex < codec.parityLength) {
return this.getParityFileInput(locationIndex, parityFile,
parityFs, parityStat, offsetInBlock);
} else {
// Dealing with a src file here.
int blockIdxInStripe = locationIndex - codec.parityLength;
int curBlockIdx = (int)curStripeIdx * codec.stripeLength + blockIdxInStripe;
if (curBlockIdx >= this.stripeBlocks.size()) {
LOG.info("Using zeros because we reach the end of the stripe");
return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
}
BlockInfo bi = this.stripeBlocks.get(curBlockIdx);
FileStatus fstat = lfs.get(bi.fileIdx);
long offset = fstat.getBlockSize() * bi.blockId +
offsetInBlock;
if (offset >= fstat.getLen()) {
LOG.info("Using zeros for " + fstat.getPath() + ":" + offset +
" for location " + locationIndex);
return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
} else {
LOG.info("Opening " + fstat.getPath() + ":" + offset +
" for location " + locationIndex);
FSDataInputStream s = fs.open(
fstat.getPath(), conf.getInt("io.file.buffer.size", 64 * 1024));
s.seek(offset);
return s;
}
}
}
/**
* removes a parity block in the specified stripe
*/
private void removeParityBlock(Path filePath, int stripe) throws IOException {
// find parity file
ParityFilePair ppair =
ParityFilePair.getParityFile(Codec.getCodec("xor"), filePath, conf);
String parityPathStr = ppair.getPath().toUri().getPath();
LOG.info("parity path: " + parityPathStr);
FileSystem parityFS = ppair.getFileSystem();
if (!(parityFS instanceof DistributedFileSystem)) {
throw new IOException("parity file is not on distributed file system");
}
DistributedFileSystem parityDFS = (DistributedFileSystem) parityFS;
// now corrupt the block corresponding to the stripe selected
FileStatus parityFileStatus =
parityDFS.getFileStatus(new Path(parityPathStr));
long parityBlockSize = parityFileStatus.getBlockSize();
long parityFileLength = parityFileStatus.getLen();
long parityFileLengthInBlocks = (parityFileLength / parityBlockSize) +
(((parityFileLength % parityBlockSize) == 0) ? 0L : 1L);
if (parityFileLengthInBlocks <= stripe) {
throw new IOException("selected stripe " + stripe +
" but parity file only has " +
parityFileLengthInBlocks + " blocks");
}
if (parityBlockSize != BLOCK_SIZE) {
throw new IOException("file block size is " + BLOCK_SIZE +
" but parity file block size is " +
parityBlockSize);
}
LocatedBlocks parityFileBlocks = parityDFS.getClient().namenode.
getBlockLocations(parityPathStr, 0, parityFileLength);
if (parityFileBlocks.locatedBlockCount() != parityFileLengthInBlocks) {
throw new IOException("expected " + parityFileLengthInBlocks +
" parity file blocks but got " +
parityFileBlocks.locatedBlockCount() +
" blocks");
}
LocatedBlock parityFileBlock = parityFileBlocks.get(stripe);
removeAndReportBlock(parityDFS, new Path(parityPathStr), parityFileBlock);
LOG.info("removed parity file block/stripe " + stripe +
" for " + filePath.toString());
}
/**
 * Gets the parity blocks corresponding to a file:
 * for DFS, returns the parity blocks themselves;
 * for HAR FS, returns the part-file blocks that contain the parity blocks.
 */
private BlockLocation[] getParityBlocks(final Path filePath,
final long blockSize,
final long numStripes,
final RaidInfo raidInfo)
throws IOException {
final String parityPathStr = raidInfo.parityPair.getPath().toUri().
getPath();
FileSystem parityFS = raidInfo.parityPair.getFileSystem();
// get parity file metadata
FileStatus parityFileStatus = parityFS.
getFileStatus(new Path(parityPathStr));
long parityFileLength = parityFileStatus.getLen();
if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe *
blockSize) {
throw new IOException("expected parity file of length" +
(numStripes * raidInfo.parityBlocksPerStripe *
blockSize) +
" but got parity file of length " +
parityFileLength);
}
BlockLocation[] parityBlocks =
parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);
if (parityFS instanceof DistributedFileSystem ||
parityFS instanceof DistributedRaidFileSystem) {
long parityBlockSize = parityFileStatus.getBlockSize();
if (parityBlockSize != blockSize) {
throw new IOException("file block size is " + blockSize +
" but parity file block size is " +
parityBlockSize);
}
} else if (parityFS instanceof HarFileSystem) {
LOG.debug("HAR FS found");
} else {
LOG.warn("parity file system is not of a supported type");
}
return parityBlocks;
}
/**
 * Generate splits.
 *
 * @param job the JobContext used to read the configuration of the running job
 * @param minSize the minimum split size.
 * @param maxSize the maximum split size.
 * @param splits the list of splits being generated.
 * @param file the FileStatus used to determine block size, length, and locations.
 * @throws IOException Signals that an I/O exception has occurred.
 */
private void generateSplits(JobContext job, long minSize, long maxSize,
List<InputSplit> splits, FileStatus file) throws IOException {
Path path = file.getPath();
int numOfRecordsInCurrentSplit = 0;
int numOfRecordsInPreviousSplit = 0;
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
FSDataInputStream fsin = null;
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// checking the occurrences of the record separator in current
// split
recordSeparator = job.getConfiguration()
.get(DataValidationConstants.RECORD_SEPARATOR)
.getBytes();
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
long start = length - bytesRemaining;
long end = start + splitSize;
try {
fsin = fs.open(path);
fsin.seek(start);
long pos = start;
int b = 0;
int bufferPos = 0;
while (true) {
b = fsin.read();
pos = fsin.getPos();
if (b == -1) {
break;
}
if (b == recordSeparator[bufferPos]) {
bufferPos++;
if (bufferPos == recordSeparator.length) {
numOfRecordsInCurrentSplit++;
bufferPos = 0;
if (pos > end) {
break;
}
}
} else {
// reset the value of buffer position to zero
bufferPos = 0;
}
}
} finally {
if (fsin != null) {
fsin.close();
}
}
splits.add(new DataValidationFileSplit(path, start,
splitSize, numOfRecordsInPreviousSplit,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
numOfRecordsInCurrentSplit = 0;
}
addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
length, blkLocations, bytesRemaining);
} else if (length != 0) {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
} else {
splits.add(new DataValidationFileSplit(path, 0, length,
numOfRecordsInPreviousSplit, new String[0]));
}
}
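computeSplitSize() is not shown in this snippet; in Hadoop's FileInputFormat it simply clamps the block size between the configured bounds, roughly:
// The usual FileInputFormat formula: clamp the block size into [minSize, maxSize].
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}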