org.apache.hadoop.fs.FileStatus#getBlockSize ( )源码实例Demo

下面列出了 org.apache.hadoop.fs.FileStatus#getBlockSize() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Returns the status reported by the superclass, except for the scratch
 * directory, for which a copy carrying {@code SCRATCH_DIR_PERMS} is returned.
 *
 * @param f path to look up
 * @return the superclass status, or a copy with replaced permissions when
 *         {@code f.toString()} equals {@code SCRATCH_DIR}
 * @throws IOException if the superclass lookup fails
 */
@Override
public FileStatus getFileStatus(Path f) throws IOException {
    // The RawFS in place returns permissions dynamically, so they cannot be
    // patched on the returned object (e.g. fs.getPermission().fromShort(...)
    // has no effect); the workaround is to build a copy instead.
    final FileStatus original = super.getFileStatus(f);

    if (!SCRATCH_DIR.equals(f.toString())) {
        return original;
    }

    // work-around for Hive 0.14
    System.out.println("Faking scratch dir permissions on Windows...");

    return new FileStatus(
            original.getLen(),
            original.isDir(),
            original.getReplication(),
            original.getBlockSize(),
            original.getModificationTime(),
            original.getAccessTime(),
            SCRATCH_DIR_PERMS,
            original.getOwner(),
            original.getGroup(),
            original.getPath());
}
 
源代码2 项目: big-c   文件: DistCpV1.java
/**
 * Opens the destination file for writing, deleting any existing file first.
 * When status preservation is enabled, permission, replication and block size
 * are each taken from the source status if the corresponding attribute is in
 * the preserved set; otherwise permission is {@code null} and the other two
 * fall back to the destination file system defaults.
 *
 * @param f        destination path
 * @param reporter progress reporter passed through to the file system
 * @param srcstat  status of the source file
 * @return the output stream for the newly created destination file
 * @throws IOException if the delete or create fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  if (destFileSys.exists(f)) {
    // non-recursive delete of the stale destination
    destFileSys.delete(f, false);
  }

  if (preserve_status) {
    FsPermission perm = null;
    if (preseved.contains(FileAttribute.PERMISSION)) {
      perm = srcstat.getPermission();
    }

    final short repl;
    if (preseved.contains(FileAttribute.REPLICATION)) {
      repl = srcstat.getReplication();
    } else {
      repl = destFileSys.getDefaultReplication(f);
    }

    final long blkSize;
    if (preseved.contains(FileAttribute.BLOCK_SIZE)) {
      blkSize = srcstat.getBlockSize();
    } else {
      blkSize = destFileSys.getDefaultBlockSize(f);
    }

    return destFileSys.create(f, perm, true, sizeBuf, repl, blkSize, reporter);
  }

  return destFileSys.create(f, true, sizeBuf, reporter);
}
 
源代码3 项目: RDFS   文件: DataFsck.java
/**
 * Returns the length of the shortest block in the given stripe. Every stripe
 * except the last yields the full block size; the last stripe yields the
 * file length modulo the block size, or the full block size when that
 * remainder is zero.
 *
 * @param stripeIndex  zero-based index of the stripe
 * @param stat         status of the file being examined
 * @param stripeLength number of blocks per stripe
 * @return the shortest block length in bytes for that stripe
 */
long shortestBlockLength(int stripeIndex, FileStatus stat, int stripeLength) {
  final long fileLen = stat.getLen();
  final long blockSize = stat.getBlockSize();
  final long bytesPerStripe = stripeLength * blockSize;
  final int stripeCount = (int) Math.ceil(fileLen * 1.0 / bytesPerStripe);

  if (stripeIndex != stripeCount - 1) {
    return blockSize;
  }

  final long tail = fileLen % blockSize;
  return (tail != 0) ? tail : blockSize;
}
 
源代码4 项目: WIFIProbe   文件: HDFSTool.java
/**
 * Concatenates the selected files found directly under
 * {@code NodeConfig.HDFS_PATH + dir} into the first selected file.
 *
 * <p>A directory entry is selected when it is a regular file and
 * {@code (len & ~blockSize) < blockSize / 2}. Nothing is concatenated unless
 * the directory has at least two entries and at least two of them are
 * selected.
 *
 * <p>The file system handle is closed on every path, including early exits
 * and failures (previously it was only closed when the directory held at
 * least two entries and no exception was thrown).
 *
 * @param dir directory path appended to {@code NodeConfig.HDFS_PATH}
 * @throws IOException if listing, concatenating or closing fails
 */
public static void concat(String dir) throws IOException {
    String directory = NodeConfig.HDFS_PATH + dir;
    Configuration conf = new Configuration();
    DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(URI.create(directory), conf);
    try {
        FileStatus[] fileList = fs.listStatus(new Path(directory));
        if (fileList.length < 2) {
            return;
        }

        ArrayList<Path> srcs = new ArrayList<Path>(fileList.length);
        for (FileStatus fileStatus : fileList) {
            // NOTE(review): "len & ~blockSize" only clears the block-size bit(s) of
            // the length; it is not "len % blockSize". Kept as-is because the
            // intended selection rule cannot be confirmed from here.
            if ( fileStatus.isFile() &&
                    (fileStatus.getLen()&~fileStatus.getBlockSize())<fileStatus.getBlockSize()/2 ) {
                srcs.add(fileStatus.getPath());
            }
        }

        if (srcs.size() >= 2) {
            Logger.println("come to here");
            // First selected file is the target; all remaining ones are appended to it.
            Path appended = srcs.get(0);
            Path[] sources = srcs.subList(1, srcs.size()).toArray(new Path[srcs.size() - 1]);
            Logger.println(fs==null);
            Logger.println(appended==null);
            Logger.println(sources==null);
            fs.concat(appended, sources);
            Logger.println("concat to : " + appended.getName());
            Logger.println(Arrays.toString(sources));
        }
    } finally {
        fs.close();
    }
}
 
源代码5 项目: hadoop   文件: TestCacheDirectives.java
/**
 * Asserts that the given paths together have the expected number of cached
 * blocks and cached replicas. For every path it also asserts that the number
 * of block locations equals the file length rounded up to whole blocks.
 *
 * @param dfs              file system to query
 * @param paths            files to inspect
 * @param expectedBlocks   expected count of blocks with at least one cached host
 * @param expectedReplicas expected total count of cached hosts over all blocks
 * @throws Exception if a lookup fails
 */
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
    final List<Path> paths, final int expectedBlocks,
    final int expectedReplicas)
    throws Exception {
  int cachedBlockCount = 0;
  int cachedReplicaCount = 0;

  for (Path path : paths) {
    final FileStatus status = dfs.getFileStatus(path);
    final long fileLen = status.getLen();
    final long blockSize = status.getBlockSize();
    // ceiling division: partial trailing block counts as one block
    final long expectedLocations = (fileLen + blockSize - 1) / blockSize;

    final BlockLocation[] locations = dfs.getFileBlockLocations(path, 0, fileLen);
    assertEquals("Unexpected number of block locations for path " + path,
        expectedLocations, locations.length);

    for (BlockLocation location : locations) {
      final int cachedHere = location.getCachedHosts().length;
      if (cachedHere > 0) {
        cachedBlockCount++;
      }
      cachedReplicaCount += cachedHere;
    }
  }

  LOG.info("Found " + cachedBlockCount + " of " + expectedBlocks + " blocks");
  LOG.info("Found " + cachedReplicaCount + " of " + expectedReplicas
      + " replicas");
  assertEquals("Unexpected number of cached blocks", expectedBlocks,
      cachedBlockCount);
  assertEquals("Unexpected number of cached replicas", expectedReplicas,
      cachedReplicaCount);
}
 
源代码6 项目: hadoop   文件: NativeAzureFileSystem.java
/**
 * Returns synthetic block locations covering {@code [start, start + len)} of
 * the given file. For WASB the hosts are fake: every location reports the
 * single configured host name, so that MR jobs still get many splits.
 *
 * @param file  file to describe; {@code null} yields {@code null}
 * @param start starting offset, must be non-negative
 * @param len   number of bytes to cover, must be non-negative
 * @return one location per block-size chunk of the range, or an empty array
 *         when {@code start} is past the file length
 * @throws IllegalArgumentException if {@code start} or {@code len} is
 *         negative, or the file's block size is not positive
 */
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file,
    long start, long len) throws IOException {
  if (file == null) {
    return null;
  }
  if (start < 0 || len < 0) {
    throw new IllegalArgumentException("Invalid start or len parameter");
  }
  if (start > file.getLen()) {
    return new BlockLocation[0];
  }

  final String fakeHost = getConf().get(
      AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
      AZURE_BLOCK_LOCATION_HOST_DEFAULT);
  final String[] names = { fakeHost };
  final String[] hosts = { fakeHost };

  final long blockSize = file.getBlockSize();
  if (blockSize <= 0) {
    throw new IllegalArgumentException(
        "The block size for the given file is not a positive number: "
            + blockSize);
  }

  // ceil(len / blockSize)
  int count = (int) (len / blockSize);
  if (len % blockSize != 0) {
    count++;
  }

  final long rangeEnd = start + len;
  final BlockLocation[] result = new BlockLocation[count];
  long offset = start;
  for (int i = 0; i < result.length; i++, offset += blockSize) {
    // the last chunk may be shorter than a full block
    final long chunkLen = Math.min(blockSize, rangeEnd - offset);
    result[i] = new BlockLocation(names, hosts, offset, chunkLen);
  }
  return result;
}
 
源代码7 项目: RDFS   文件: FileStripeReader.java
/**
 * Builds the input stream for one location of the current stripe. Locations
 * below {@code codec.parityLength} are served from the parity file; the rest
 * map to source-file blocks. A source offset at or beyond the end of the file
 * is served by a zero-filled stream.
 *
 * @param locationIndex index of the location within the stripe
 * @param offsetInBlock offset inside the block at which reading starts
 * @return a stream positioned for reading that location
 * @throws IOException if a stream cannot be opened or positioned
 */
@Override
public InputStream buildOneInput(
    int locationIndex, long offsetInBlock,
    FileSystem srcFs, Path srcFile, FileStatus srcStat,
    FileSystem parityFs, Path parityFile, FileStatus parityStat
    ) throws IOException {
  final long blockSize = srcStat.getBlockSize();

  LOG.info("buildOneInput srcfile " + srcFile
      + " srclen " + srcStat.getLen()
      + " parityfile " + parityFile
      + " paritylen " + parityStat.getLen()
      + " stripeindex " + stripeStartIdx
      + " locationindex " + locationIndex
      + " offsetinblock " + offsetInBlock);

  if (locationIndex < codec.parityLength) {
    return this.getParityFileInput(locationIndex, parityFile,
        parityFs, parityStat, offsetInBlock);
  }

  // Dealing with a src file here.
  final int idxInStripe = locationIndex - codec.parityLength;
  final int srcBlockIdx = (int)(codec.stripeLength * stripeStartIdx + idxInStripe);
  final long srcOffset = blockSize * srcBlockIdx + offsetInBlock;

  if (srcOffset < srcStat.getLen()) {
    LOG.info("Opening " + srcFile + ":" + srcOffset +
             " for location " + locationIndex);
    final FSDataInputStream in = fs.open(
        srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
    in.seek(srcOffset);
    return in;
  }

  LOG.info("Using zeros for " + srcFile + ":" + srcOffset +
    " for location " + locationIndex);
  return new RaidUtils.ZeroInputStream(blockSize * (srcBlockIdx + 1));
}
 
/**
 * Looks up the file status for {@code path} and repackages it as a
 * {@code HadoopFileStatus}.
 *
 * @param path URI of the file or directory to query
 * @return the status fields copied from the underlying {@code FileStatus}
 * @throws IOException if the connection check or the lookup fails
 */
@Override
public HadoopFileStatus getExtendedStatus(URI path) throws IOException {
	checkConnected();
	final FileStatus st = dfs.getFileStatus(new Path(path));
	final URI resolved = st.getPath().toUri();
	return new HadoopFileStatus(
			resolved,
			st.getLen(),
			st.isDir(),
			st.getModificationTime(),
			st.getBlockSize(),
			st.getGroup(),
			st.getOwner(),
			st.getReplication());
}
 
源代码9 项目: hadoop   文件: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 *
 * <p>Each file is read as a single split starting at offset 0. Keys and
 * values that are {@code Writable} are written through
 * {@code TypedBytesWritableOutput}; anything else goes through
 * {@code TypedBytesOutput}.
 *
 * @param files the files to dump
 * @return always 0
 * @throws IOException if reading a file or writing to standard output fails
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf());
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    // One split spanning the whole file. The length used to be
    // getLen() * getBlockSize(), which overflows long for large files and
    // can produce a zero or negative split length.
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
 
源代码10 项目: big-c   文件: NativeAzureFileSystem.java
/**
 * Returns synthetic block locations covering {@code [start, start + len)} of
 * the given file. For WASB the hosts are fake: every location reports the
 * single configured host name, so that MR jobs still get many splits.
 *
 * @param file  file to describe; {@code null} yields {@code null}
 * @param start starting offset, must be non-negative
 * @param len   number of bytes to cover, must be non-negative
 * @return one location per block-size chunk of the range, or an empty array
 *         when {@code start} is past the file length
 * @throws IllegalArgumentException if {@code start} or {@code len} is
 *         negative, or the file's block size is not positive
 */
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file,
    long start, long len) throws IOException {
  if (file == null) {
    return null;
  }
  if (start < 0 || len < 0) {
    throw new IllegalArgumentException("Invalid start or len parameter");
  }
  if (start > file.getLen()) {
    return new BlockLocation[0];
  }

  final String configuredHost = getConf().get(
      AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
      AZURE_BLOCK_LOCATION_HOST_DEFAULT);
  final String[] nameList = { configuredHost };
  final String[] hostList = { configuredHost };

  final long blockSize = file.getBlockSize();
  if (blockSize <= 0) {
    throw new IllegalArgumentException(
        "The block size for the given file is not a positive number: "
            + blockSize);
  }

  // whole blocks, plus one more for a partial trailing block
  int total = (int) (len / blockSize);
  if (len % blockSize != 0) {
    total++;
  }

  final BlockLocation[] out = new BlockLocation[total];
  final long stop = start + len;
  int index = 0;
  long cursor = start;
  while (index < out.length) {
    final long span = Math.min(blockSize, stop - cursor);
    out[index] = new BlockLocation(nameList, hostList, cursor, span);
    index++;
    cursor += blockSize;
  }
  return out;
}
 
源代码11 项目: hudi   文件: InLineFileSystem.java
/**
 * Returns a status for an inline path. All attributes are copied from the
 * status of the outer file, except the length (taken from
 * {@code InLineFSUtils.length(inlinePath)}) and the path (the inline path
 * itself).
 *
 * @param inlinePath the inline path to describe
 * @return a new status combining outer-file attributes with the inline length
 * @throws IOException if the outer file system or status cannot be obtained
 */
@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
  final Path outer = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
  final FileStatus outerStatus = outer.getFileSystem(conf).getFileStatus(outer);
  return new FileStatus(
      InLineFSUtils.length(inlinePath),
      outerStatus.isDirectory(),
      outerStatus.getReplication(),
      outerStatus.getBlockSize(),
      outerStatus.getModificationTime(),
      outerStatus.getAccessTime(),
      outerStatus.getPermission(),
      outerStatus.getOwner(),
      outerStatus.getGroup(),
      inlinePath);
}
 
源代码12 项目: RDFS   文件: StripeReader.java
/**
 * Builds (codec.stripeLength + codec.parityLength) inputs given some erased locations.
 *
 * <p>Locations that are erased, or that the code does not need for decoding,
 * get a zero-filled stream; all others are opened through
 * {@code buildOneInput}. If opening a location fails with a
 * {@code BlockMissingException} or {@code ChecksumException}, that location
 * is added to {@code erasedLocations}, all streams opened so far are closed,
 * and the whole set is rebuilt.
 *
 * @param erasedLocations in/out: the erased locations; may grow when a
 *        stream fails to open
 * @param locationsToRead out: cleared and repopulated on every attempt with
 *        the locations that must be read for decoding
 * @param code erasure code used to decide which locations must be read
 * @return the array of input streams, one per location
 * @throws IOException for any failure other than a missing block or a
 *         checksum error
 */
public InputStream[] buildInputs(
  FileSystem srcFs, Path srcFile, FileStatus srcStat,
  FileSystem parityFs, Path parityFile, FileStatus parityStat,
  int stripeIdx, long offsetInBlock, List<Integer> erasedLocations,
  List<Integer> locationsToRead, ErasureCode code)
    throws IOException {
  InputStream[] inputs = new InputStream[codec.stripeLength +
                                         codec.parityLength]; 
  boolean redo;
  do {
    // Must be reset on every attempt: otherwise a single recoverable failure
    // leaves the flag set forever and the loop never terminates normally.
    redo = false;
    /*
     * In the first iteration locationsToRead is empty.
     * It is populated according to locationsToReadForDecode.
     * In consecutive iterations (if a stream failed to open)
     * the list is cleared and re-populated.
     */
    locationsToRead.clear();
    locationsToRead.addAll(code.locationsToReadForDecode(erasedLocations));

    for (int i = 0; i < inputs.length; i++) {
      boolean isErased = (erasedLocations.indexOf(i) != -1);
      boolean shouldRead = (locationsToRead.indexOf(i) != -1);
      try {
        InputStream stm = null;
        if (isErased || !shouldRead) {
          if (isErased) {
            LOG.info("Location " + i + " is erased, using zeros");
          } else {
            LOG.info("Location " + i + " need not be read, using zeros");
          }

          stm = new RaidUtils.ZeroInputStream(srcStat.getBlockSize() * (
            (i < codec.parityLength) ?
            stripeIdx * codec.parityLength + i :
            stripeIdx * codec.stripeLength + i - codec.parityLength));
        } else {
          stm = buildOneInput(i, offsetInBlock,
                              srcFs, srcFile, srcStat,
                              parityFs, parityFile, parityStat);
        }
        inputs[i] = stm;
      } catch (IOException e) {
        if (e instanceof BlockMissingException || e instanceof ChecksumException) {
          // Treat the unreadable location as erased and rebuild everything.
          erasedLocations.add(i);
          redo = true;
          RaidUtils.closeStreams(inputs);
          break;
        } else {
          throw e;
        }
      }
    }
  } while (redo);
  return inputs;
}
 
源代码13 项目: RDFS   文件: DataFsck.java
/**
 * Compares parity recomputed from the file's data against the stored parity
 * file, sampling {@code CHECKSIZE} bytes per stripe at up to two offsets.
 *
 * @param fs   file system holding the data file
 * @param stat status of the data file
 * @return {@code true} if any sampled parity differs from the stored parity;
 *         {@code false} if all samples match or no parity file exists
 * @throws IOException if reading data or parity fails
 */
boolean checkAgainstParity(FileSystem fs, FileStatus stat) throws IOException {
  // Use the first codec for which a parity file exists.
  Codec code = null;
  ParityFilePair ppair = null;
  for (Codec candidate : Codec.getCodecs()) {
    ppair = ParityFilePair.getParityFile(
      candidate, stat.getPath(), conf);
    if (ppair != null) {
      code = candidate;
      break;
    }
  }
  if (code == null) {
    LOG.info("No parity for " + stat.getPath());
    return false;
  }

  final int parityLength = code.parityLength;
  LOG.info("Checking file parity " + stat.getPath() +
    " against parity " + ppair.getPath());

  final long blockSize = stat.getBlockSize();
  final int stripeLength = code.stripeLength;
  final long stripeBytes = stripeLength * blockSize;
  final int numStripes = (int)Math.ceil(stat.getLen() * 1.0 / stripeBytes);

  // First pass samples the end of a full block, second pass the end of the
  // shortest block in the stripe.
  final boolean[] passes = {true, false};

  // Look at all stripes.
  for (int stripeIndex = 0; stripeIndex < numStripes; stripeIndex++) {
    for (boolean lastKB : passes) {
      final long shortest = shortestBlockLength(stripeIndex, stat, stripeLength);
      // Optimization - if all blocks are the same size, one check is enough.
      if (!lastKB && shortest == blockSize) {
        continue;
      }

      // Never sample closer than CHECKSIZE to the start of the block.
      final long lastOffsetInBlock =
        Math.max(lastKB ? blockSize : shortest, CHECKSIZE);

      final byte[][] stripeBufs = new byte[stripeLength][CHECKSIZE];
      final byte[] expectedParity = new byte[CHECKSIZE];
      final byte[] storedParity = new byte[CHECKSIZE];

      // Read CHECKSIZE bytes from all blocks in a stripe and parity.
      computeParity(conf, fs, stat, code, stripeIndex, stripeBufs,
        expectedParity, lastOffsetInBlock);

      readActualParity(ppair, storedParity,
        stripeIndex, parityLength, blockSize, lastOffsetInBlock);

      if (!Arrays.equals(expectedParity, storedParity)) {
        return true;
      }
    }
  }

  // All stripes are good.
  LOG.info("Checking file parity " + stat.getPath() +
    " against parity " + ppair.getPath() + " was OK");
  return false;
}
 
源代码14 项目: iceberg   文件: DataFiles.java
/**
 * Creates a data file entry from a file status, deriving the file format
 * from the file's location string.
 *
 * @param stat      status supplying location, length and block size
 * @param partition partition data for the file
 * @param metrics   metrics for the file
 * @return a new {@code GenericDataFile} describing the file
 */
public static DataFile fromStat(FileStatus stat, PartitionData partition, Metrics metrics) {
  final String path = stat.getPath().toString();
  return new GenericDataFile(
      path,
      FileFormat.fromFileName(path),
      partition,
      stat.getLen(),
      stat.getBlockSize(),
      metrics);
}
 
源代码15 项目: iceberg   文件: DataFiles.java
/**
 * Creates a data file entry from a file status, always using the Parquet
 * format regardless of the file name.
 *
 * @param stat      status supplying location, length and block size
 * @param partition partition data for the file
 * @param metrics   metrics for the file
 * @return a new {@code GenericDataFile} describing the file
 */
public static DataFile fromParquetStat(FileStatus stat, PartitionData partition, Metrics metrics) {
  final String path = stat.getPath().toString();
  return new GenericDataFile(
      path,
      FileFormat.PARQUET,
      partition,
      stat.getLen(),
      stat.getBlockSize(),
      metrics);
}
 
源代码16 项目: RDFS   文件: DataFsck.java
/**
 * Reads up to {@code CHECKSIZE} bytes from each block of one stripe, ending
 * at {@code lastOffsetInBlock} within the block, and computes the matching
 * parity bytes into {@code parityBuf}.
 *
 * <p>Blocks that start beyond the end of the file, and the unread tail of a
 * short read, contribute zeros. For codec id "xor" the parity is the XOR of
 * the stripe bytes; for "rs" it is the first Reed-Solomon parity byte. Other
 * codec ids leave {@code parityBuf} untouched.
 *
 * @param stripeBufs        one buffer per block in the stripe; overwritten
 * @param parityBuf         receives the computed parity
 * @param lastOffsetInBlock offset within each block at which the sample ends
 * @throws IOException if opening, seeking or reading the file fails
 */
void computeParity(Configuration conf, FileSystem fs, FileStatus stat,
      Codec code,
      final int stripeIndex, byte[][] stripeBufs, byte[] parityBuf,
      long lastOffsetInBlock) throws IOException {
  final long blockSize = stat.getBlockSize();
  final long stripeBytes = stripeBufs.length * blockSize;
  final long stripeStartOffset = stripeIndex * stripeBytes;
  final long stripeEndOffset = stripeStartOffset + stripeBytes;
  LOG.info("Checking parity " + stat.getPath() + " with last offset " + lastOffsetInBlock);

  final FSDataInputStream[] inputs = new FSDataInputStream[stripeBufs.length];
  try {
    int idx = 0;
    // Loop through the blocks in the stripe
    for (long blockStart = stripeStartOffset;
         blockStart < stripeEndOffset;
         blockStart += blockSize, idx++) {
      // First zero out the buffer.
      Arrays.fill(stripeBufs[idx], (byte)0);
      if (blockStart >= stat.getLen()) {
        // Block lies past the end of the file; it stays all zeros.
        continue;
      }

      // Block is real: the sample is the CHECKSIZE bytes ending at
      // blockStart + lastOffsetInBlock.
      final long sampleEnd = blockStart + lastOffsetInBlock;
      final long sampleStart = sampleEnd - CHECKSIZE;
      // The block may be cut short by the end of the file.
      final long blockEnd = Math.min((blockStart + blockSize), stat.getLen());
      if (sampleStart >= blockEnd) {
        // Sample window starts past the data in this block.
        continue;
      }

      final FSDataInputStream in = fs.open(stat.getPath());
      inputs[idx] = in;
      in.seek(sampleStart);
      final int bytesToRead = (int)Math.min(CHECKSIZE, blockEnd - sampleStart);
      IOUtils.readFully(in, stripeBufs[idx], 0, bytesToRead);
      // Rest is zeros
    }

    if (code.id.equals("xor")) {
      for (int i = 0; i < CHECKSIZE; i++) {
        parityBuf[i] = 0;
        // For XOR, each byte is XOR of all the stripe bytes.
        for (int j = 0; j < stripeBufs.length; j++) {
          parityBuf[i] ^= stripeBufs[j][i];
        }
      }
    } else if (code.id.equals("rs")) {
      final int parityLength = code.parityLength;
      final int[] message = new int[stripeBufs.length];
      final int[] encoded = new int[parityLength];
      final ErasureCode rsCode = new ReedSolomonCode(stripeBufs.length, parityLength);
      for (int i = 0; i < CHECKSIZE; i++) {
        for (int j = 0; j < stripeBufs.length; j++) {
          // widen each byte to its unsigned value
          message[j] = stripeBufs[j][i] & 0x000000FF;
        }
        rsCode.encode(message, encoded);
        // Take the first parity byte.
        parityBuf[i] = (byte)encoded[0];
      }
    }
  } finally {
    for (InputStream stm: inputs) {
      if (stm != null) stm.close();
    }
  }
}
 
源代码17 项目: RDFS   文件: DirectoryStripeReader.java
/**
 * Builds the input stream for one location of the current stripe in a
 * directory-level stripe. Locations below {@code codec.parityLength} are
 * served from the parity file; the rest map to entries of
 * {@code stripeBlocks}. A block index past the end of {@code stripeBlocks},
 * or an offset at or beyond the end of the owning file, is served by a
 * zero-filled stream.
 *
 * @param locationIndex index of the location within the stripe
 * @param offsetInBlock offset inside the block at which reading starts
 * @return a stream positioned for reading that location
 * @throws IOException if a stream cannot be opened or positioned
 */
@Override
public InputStream buildOneInput(
    int locationIndex, long offsetInBlock,
    FileSystem srcFs, Path srcFile, FileStatus srcStat,
    FileSystem parityFs, Path parityFile, FileStatus parityStat
    ) throws IOException {
  final long blockSize = srcStat.getBlockSize();

  LOG.info("buildOneInput srcfile " + srcFile
      + " srclen " + srcStat.getLen()
      + " parityfile " + parityFile
      + " paritylen " + parityStat.getLen()
      + " stripeindex " + stripeStartIdx
      + " locationindex " + locationIndex
      + " offsetinblock " + offsetInBlock);

  if (locationIndex < codec.parityLength) {
    return this.getParityFileInput(locationIndex, parityFile,
        parityFs, parityStat, offsetInBlock);
  }

  // Dealing with a src file here.
  final int idxInStripe = locationIndex - codec.parityLength;
  final int curBlockIdx = (int)curStripeIdx * codec.stripeLength + idxInStripe;
  if (curBlockIdx >= this.stripeBlocks.size()) {
    LOG.info("Using zeros because we reach the end of the stripe");
    return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
  }

  // Resolve the block to its owning file and the offset inside that file.
  final BlockInfo block = this.stripeBlocks.get(curBlockIdx);
  final FileStatus owner = lfs.get(block.fileIdx);
  final long offset = owner.getBlockSize() * block.blockId +
      offsetInBlock;

  if (offset < owner.getLen()) {
    LOG.info("Opening " + owner.getPath() + ":" + offset +
             " for location " + locationIndex);
    final FSDataInputStream in = fs.open(
        owner.getPath(), conf.getInt("io.file.buffer.size", 64 * 1024));
    in.seek(offset);
    return in;
  }

  LOG.info("Using zeros for " + owner.getPath() + ":" + offset +
    " for location " + locationIndex);
  return new RaidUtils.ZeroInputStream(blockSize * (curBlockIdx + 1));
}
 
源代码18 项目: RDFS   文件: TestRaidShellFsck.java
/**
 * Removes the parity block of the given stripe from the XOR parity file of
 * {@code filePath}.
 *
 * @param filePath data file whose parity file is to be damaged
 * @param stripe   zero-based index of the parity block to remove
 * @throws IOException if the parity file is not on a distributed file
 *         system, has too few blocks for {@code stripe}, has a block size
 *         other than {@code BLOCK_SIZE}, or reports an unexpected number of
 *         located blocks
 */
private void removeParityBlock(Path filePath, int stripe) throws IOException {
  // find parity file
  final ParityFilePair pair =
      ParityFilePair.getParityFile(Codec.getCodec("xor"), filePath, conf);
  final String parityPathStr = pair.getPath().toUri().getPath();
  LOG.info("parity path: " + parityPathStr);

  final FileSystem parityFS = pair.getFileSystem();
  if (!(parityFS instanceof DistributedFileSystem)) {
    throw new IOException("parity file is not on distributed file system");
  }
  final DistributedFileSystem parityDFS = (DistributedFileSystem) parityFS;

  // now corrupt the block corresponding to the stripe selected
  final FileStatus parityStatus =
    parityDFS.getFileStatus(new Path(parityPathStr));
  final long parityBlockSize = parityStatus.getBlockSize();
  final long parityFileLength = parityStatus.getLen();

  // number of blocks, counting a partial trailing block as one
  long parityFileLengthInBlocks = parityFileLength / parityBlockSize;
  if ((parityFileLength % parityBlockSize) != 0) {
    parityFileLengthInBlocks += 1L;
  }

  if (stripe >= parityFileLengthInBlocks) {
    throw new IOException("selected stripe " + stripe + 
                          " but parity file only has " + 
                          parityFileLengthInBlocks + " blocks");
  }
  if (parityBlockSize != BLOCK_SIZE) {
    throw new IOException("file block size is " + BLOCK_SIZE + 
                          " but parity file block size is " + 
                          parityBlockSize);
  }

  final LocatedBlocks located = parityDFS.getClient().namenode.
    getBlockLocations(parityPathStr, 0, parityFileLength);
  if (located.locatedBlockCount() != parityFileLengthInBlocks) {
    throw new IOException("expected " + parityFileLengthInBlocks + 
                          " parity file blocks but got " + 
                          located.locatedBlockCount() + 
                          " blocks");
  }

  final LocatedBlock victim = located.get(stripe);
  removeAndReportBlock(parityDFS, new Path(parityPathStr), victim);
  LOG.info("removed parity file block/stripe " + stripe +
           " for " + filePath.toString());
}
 
源代码19 项目: RDFS   文件: RaidShell.java
/**
 * Gets the parity blocks corresponding to a file: the parity blocks
 * themselves in case of DFS, and the part blocks containing parity blocks
 * in case of HAR FS.
 *
 * @param filePath   path of the data file (not read by this method)
 * @param blockSize  block size of the data file
 * @param numStripes number of stripes in the data file
 * @param raidInfo   supplies the parity file pair and the number of parity
 *                   blocks per stripe
 * @return the block locations covering the whole parity file
 * @throws IOException if the parity file length is not
 *         {@code numStripes * parityBlocksPerStripe * blockSize}, or if, on
 *         a distributed (raid) file system, the parity file's block size
 *         differs from {@code blockSize}
 */
private BlockLocation[] getParityBlocks(final Path filePath,
                                        final long blockSize,
                                        final long numStripes,
                                        final RaidInfo raidInfo) 
  throws IOException {

  final String parityPathStr = raidInfo.parityPair.getPath().toUri().
    getPath();
  FileSystem parityFS = raidInfo.parityPair.getFileSystem();
  
  // get parity file metadata
  FileStatus parityFileStatus = parityFS.
    getFileStatus(new Path(parityPathStr));
  long parityFileLength = parityFileStatus.getLen();

  // Computed once and reused in both the check and the message.
  final long expectedParityLength =
    numStripes * raidInfo.parityBlocksPerStripe * blockSize;
  if (parityFileLength != expectedParityLength) {
    // The message previously lacked the space after "length", fusing the
    // number onto the word.
    throw new IOException("expected parity file of length " + 
                          expectedParityLength +
                          " but got parity file of length " + 
                          parityFileLength);
  }

  BlockLocation[] parityBlocks = 
    parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);
  
  if (parityFS instanceof DistributedFileSystem ||
      parityFS instanceof DistributedRaidFileSystem) {
    // Only these file systems are checked for a matching block size.
    long parityBlockSize = parityFileStatus.getBlockSize();
    if (parityBlockSize != blockSize) {
      throw new IOException("file block size is " + blockSize + 
                            " but parity file block size is " + 
                            parityBlockSize);
    }
  } else if (parityFS instanceof HarFileSystem) {
    LOG.debug("HAR FS found");
  } else {
    LOG.warn("parity file system is not of a supported type");
  }
  
  return parityBlocks;
}
 
源代码20 项目: jumbune   文件: DataValidationInputFormat.java
/**
 * Generates the splits for one file and appends them to {@code splits}.
 *
 * <p>For a non-empty, splittable file the file is cut into chunks of the
 * computed split size. For each chunk the file is scanned from the chunk
 * start, counting occurrences of the configured record separator until the
 * first separator that ends past the chunk end (or end of file). Each split
 * is created with the count gathered while scanning the PREVIOUS chunk. A
 * non-splittable file yields a single split over the whole file, and an
 * empty file yields a single zero-length split with no hosts.
 *
 * <p>NOTE(review): separator matching restarts from scratch on a mismatch
 * without re-examining the mismatching byte, so overlapping partial matches
 * (e.g. separator "ab" in input "aab") are not detected.
 *
 * @param job refers to JobContext that is being used to read the configurations of the job that ran
 * @param minSize refers to the minimum file block size.
 * @param maxSize refers to the maximum file block size.
 * @param splits refers to the list that receives the generated splits.
 * @param file refers to the FileStatus required to determine block size, length and block locations.
 * @throws IOException Signals that an I/O exception has occurred.
 */
private void generateSplits(JobContext job, long minSize, long maxSize,
		List<InputSplit> splits, FileStatus file) throws IOException {
	Path path = file.getPath();
	int numOfRecordsInCurrentSplit = 0;
	int numOfRecordsInPreviousSplit = 0;
	FileSystem fs = path.getFileSystem(job.getConfiguration());
	long length = file.getLen();
	BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
			length);
	if ((length != 0) && isSplitable(job, path)) {
		long blockSize = file.getBlockSize();
		long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		long bytesRemaining = length;
		
		// checking the occurrences of the record separator in current
		// split
		recordSeparator = job.getConfiguration()
				.get(DataValidationConstants.RECORD_SEPARATOR)
				.getBytes();
		while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
			int blkIndex = getBlockIndex(blkLocations, length
					- bytesRemaining);
			long start = length - bytesRemaining;
			long end = start + splitSize;
			// Scoped to this iteration so that a failed open never re-closes
			// the stream left over from the previous split.
			FSDataInputStream fsin = null;
			try {
				fsin = fs.open(path);
				fsin.seek(start);
				long pos = start;
				int b = 0;
				int bufferPos = 0;
				while (true) {
					b = fsin.read();
					pos = fsin.getPos();
					if (b == -1) {
						break;
					}
					// read() returns 0..255 while the separator holds signed
					// bytes; narrow before comparing so that separator bytes
					// >= 0x80 can match.
					if ((byte) b == recordSeparator[bufferPos]) {
						bufferPos++;
						if (bufferPos == recordSeparator.length) {
							numOfRecordsInCurrentSplit++;
							bufferPos = 0;
							// Stop only on a record boundary past the split end.
							if (pos > end) {
								break;
							}
						}
					} else {
						// reset the value of buffer position to zero
						bufferPos = 0;
					}
				}
			} finally {
				if (fsin != null) {
					fsin.close();
				}
			}

			splits.add(new DataValidationFileSplit(path, start,
					splitSize, numOfRecordsInPreviousSplit,
					blkLocations[blkIndex].getHosts()));
			bytesRemaining -= splitSize;
			numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
			numOfRecordsInCurrentSplit = 0;
		}

		addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
				length, blkLocations, bytesRemaining);
	} else if (length != 0) {
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
	} else {
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, new String[0]));
	}
}