Source code examples for the class org.apache.hadoop.fs.HdfsBlockLocation

The examples below show how the org.apache.hadoop.fs.HdfsBlockLocation API is used in real projects; the full sources are available on GitHub in the projects named above each snippet.
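Before the individual examples, here is a minimal self-contained sketch of the pattern most of the snippets below rely on: on HDFS, the BlockLocation array returned by FileSystem#getFileBlockLocations can be downcast to HdfsBlockLocation to reach the underlying LocatedBlock. The path /tmp/example is a placeholder, and a reachable HDFS cluster (fs.defaultFS pointing at it) is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class HdfsBlockLocationDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder path; assumes fs.defaultFS points at a running HDFS cluster.
    Path path = new Path("/tmp/example");
    FileSystem fs = FileSystem.get(conf);
    FileStatus status = fs.getFileStatus(path);
    BlockLocation[] locations =
        fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation loc : locations) {
      // On HDFS the returned locations are HdfsBlockLocation instances,
      // which additionally carry the LocatedBlock (block id, block pool id, ...).
      if (loc instanceof HdfsBlockLocation) {
        LocatedBlock lb = ((HdfsBlockLocation) loc).getLocatedBlock();
        System.out.println("block " + lb.getBlock().getBlockId()
            + " offset=" + loc.getOffset()
            + " hosts=" + String.join(",", loc.getHosts()));
      }
    }
    fs.close();
  }
}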


Example 1

@Override
public BlockLocation[] getFileBlockLocations(
    FileStatus stat, long start, long len) throws IOException {
  if (stat.isDir()) {
    return null;
  }
  System.out.println("File " + stat.getPath());
  String name = stat.getPath().toUri().getPath();
  BlockLocation[] locs =
    super.getFileBlockLocations(stat, start, len);
  if (name.equals(fileWithMissingBlocks)) {
    System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
    locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
        new String[0], locs[0].getOffset(), locs[0].getLength()), null);
  }
  return locs;
}
 
Example 2 (Project: hadoop, File: DFSClient.java)

/**
 * Get block location info about file
 * 
 * getBlockLocations() returns a list of hostnames that store 
 * data for a specific file region.  It returns a set of hostnames
 * for every block within the indicated region.
 *
 * This function is very useful when writing code that considers
 * data-placement when performing operations.  For example, the
 * MapReduce system tries to schedule tasks on the same machines
 * as the data-block the task processes. 
 */
public BlockLocation[] getBlockLocations(String src, long start, 
      long length) throws IOException, UnresolvedLinkException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
    BlockLocation[] locations =  DFSUtil.locatedBlocks2Locations(blocks);
    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
    }
    return hdfsLocations;
  } finally {
    scope.close();
  }
}
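As a caller-side illustration of the data-placement use case described in the Javadoc above, the following sketch (not taken from either project) groups block offsets by the hosts that store a replica, which is essentially the index a placement-aware scheduler needs. It only assumes an array such as the one returned by getBlockLocations() or FileSystem#getFileBlockLocations().

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.BlockLocation;

public class BlockPlacementIndex {
  /** Groups the block offsets of a file by the hosts holding a replica of each block. */
  public static Map<String, List<Long>> offsetsByHost(BlockLocation[] locs)
      throws IOException {
    Map<String, List<Long>> index = new HashMap<>();
    for (BlockLocation loc : locs) {
      // getHosts() lists the datanodes storing a replica of this block.
      for (String host : loc.getHosts()) {
        index.computeIfAbsent(host, h -> new ArrayList<>()).add(loc.getOffset());
      }
    }
    return index;
  }
}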
 
Example 3 (Project: hadoop, File: TestFsDatasetCache.java)

@Test(timeout=60000)
public void testUncacheUnknownBlock() throws Exception {
  // Create a file
  Path fileName = new Path("/testUncacheUnknownBlock");
  int fileLen = 4096;
  DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);

  // Try to uncache it without caching it first
  setHeartbeatResponse(uncacheBlocks(locs));

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return fsd.getNumBlocksFailedToUncache() > 0;
    }
  }, 100, 10000);
}
 
Example 4 (Project: hadoop, File: TestFsDatasetCache.java)

@Test(timeout=60000)
public void testPageRounder() throws Exception {
  // Write a small file
  Path fileName = new Path("/testPageRounder");
  final int smallBlocks = 512; // This should be smaller than the page size
  assertTrue("Page size should be greater than smallBlocks!",
      PAGE_SIZE > smallBlocks);
  final int numBlocks = 5;
  final int fileLen = smallBlocks * numBlocks;
  FSDataOutputStream out =
      fs.create(fileName, false, 4096, (short)1, smallBlocks);
  out.write(new byte[fileLen]);
  out.close();
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);
  // Cache the file and check the sizes match the page size
  setHeartbeatResponse(cacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);
  // Uncache and check that it decrements by the page size too
  setHeartbeatResponse(uncacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
}
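The invariant checked above is that the datanode cache accounts for mmapped blocks in whole OS pages, so a 512-byte block still occupies one full page of locked memory. The helper below is a hypothetical stand-alone illustration of that round-up arithmetic, not the rounder class used inside FsDatasetCache.

public class PageRoundingSketch {
  /** Rounds len up to the next multiple of pageSize (pageSize must be positive). */
  static long roundUpToPage(long len, long pageSize) {
    return ((len + pageSize - 1) / pageSize) * pageSize;
  }

  public static void main(String[] args) {
    long pageSize = 4096;   // a typical OS page size; assumed for illustration
    long smallBlock = 512;  // matches smallBlocks in the test above
    // Five 512-byte blocks are accounted for as 5 * 4096 bytes of cache.
    System.out.println(5 * roundUpToPage(smallBlock, pageSize)); // prints 20480
  }
}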
 

Example 5

@Override
public BlockLocation[] getFileBlockLocations(
    FileStatus stat, long start, long len) throws IOException {
  if (stat.isDir()) {
    return null;
  }
  System.out.println("File " + stat.getPath());
  String name = stat.getPath().toUri().getPath();
  BlockLocation[] locs =
    super.getFileBlockLocations(stat, start, len);
  if (name.equals(fileWithMissingBlocks)) {
    System.out.println("Returning missing blocks for " + fileWithMissingBlocks);
    locs[0] = new HdfsBlockLocation(new BlockLocation(new String[0],
        new String[0], locs[0].getOffset(), locs[0].getLength()), null);
  }
  return locs;
}
 
Example 6 (Project: big-c, File: DFSClient.java)

/**
 * Get block location info about file
 * 
 * getBlockLocations() returns a list of hostnames that store 
 * data for a specific file region.  It returns a set of hostnames
 * for every block within the indicated region.
 *
 * This function is very useful when writing code that considers
 * data-placement when performing operations.  For example, the
 * MapReduce system tries to schedule tasks on the same machines
 * as the data-block the task processes. 
 */
public BlockLocation[] getBlockLocations(String src, long start, 
      long length) throws IOException, UnresolvedLinkException {
  TraceScope scope = getPathTraceScope("getBlockLocations", src);
  try {
    LocatedBlocks blocks = getLocatedBlocks(src, start, length);
    BlockLocation[] locations =  DFSUtil.locatedBlocks2Locations(blocks);
    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
    }
    return hdfsLocations;
  } finally {
    scope.close();
  }
}
 
Example 7 (Project: big-c, File: TestFsDatasetCache.java)

@Test(timeout=60000)
public void testUncacheUnknownBlock() throws Exception {
  // Create a file
  Path fileName = new Path("/testUncacheUnknownBlock");
  int fileLen = 4096;
  DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);

  // Try to uncache it without caching it first
  setHeartbeatResponse(uncacheBlocks(locs));

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return fsd.getNumBlocksFailedToUncache() > 0;
    }
  }, 100, 10000);
}
 
Example 8 (Project: big-c, File: TestFsDatasetCache.java)

@Test(timeout=60000)
public void testPageRounder() throws Exception {
  // Write a small file
  Path fileName = new Path("/testPageRounder");
  final int smallBlocks = 512; // This should be smaller than the page size
  assertTrue("Page size should be greater than smallBlocks!",
      PAGE_SIZE > smallBlocks);
  final int numBlocks = 5;
  final int fileLen = smallBlocks * numBlocks;
  FSDataOutputStream out =
      fs.create(fileName, false, 4096, (short)1, smallBlocks);
  out.write(new byte[fileLen]);
  out.close();
  HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
      fileName, 0, fileLen);
  // Cache the file and check the sizes match the page size
  setHeartbeatResponse(cacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);
  // Uncache and check that it decrements by the page size too
  setHeartbeatResponse(uncacheBlocks(locs));
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
}
 
Example 9 (Project: hadoop, File: TestFsDatasetCache.java)

/**
 * Creates a cache or uncache DatanodeCommand from an array of locations
 */
private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
    int action) {
  String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
  long[] blocks = new long[locs.length];
  for (int i=0; i<locs.length; i++) {
    blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
  }
  return new BlockIdCommand(action, bpid, blocks);
}
 
Example 10 (Project: big-c, File: TestFsDatasetCache.java)

/**
 * Creates a cache or uncache DatanodeCommand from an array of locations
 */
private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
    int action) {
  String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
  long[] blocks = new long[locs.length];
  for (int i=0; i<locs.length; i++) {
    blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
  }
  return new BlockIdCommand(action, bpid, blocks);
}
 

Example 11

private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
  FileStatus fileStatus = fileSystem.getFileStatus(realFile);
  BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
  HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
  LocatedBlock locatedBlock = location.getLocatedBlock();
  ExtendedBlock block = locatedBlock.getBlock();
  return toNiceString(block.getBlockId());
}
 
Example 12 (Project: hadoop, File: DFSClient.java)

/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
 * .
 * 
 * This is done by making a round of RPCs to the associated datanodes, asking
 * the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} expose this information as a
 * {@link VolumeId}.
 * 
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }
  
  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }
      
  // Make RPCs to the datanodes to get volume locations for its replicas
  TraceScope scope =
    Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            getConf().getFileBlockStorageLocationsNumThreads,
            getConf().getFileBlockStorageLocationsTimeoutMs,
            getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: "
          + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }
  
  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);
  
  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
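A hedged caller-side sketch for the method above, written against the Hadoop 2.x API in which DistributedFileSystem#getFileBlockStorageLocations is still present (it was removed in later releases). The path is a placeholder and the configuration key name is an assumption of the sketch; the feature also has to be enabled on the datanodes.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class VolumeLocationDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed Hadoop 2.x key; must be true in the client configuration and on the datanodes.
    conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);

    Path path = new Path("/tmp/example");        // placeholder path
    DistributedFileSystem dfs = (DistributedFileSystem) path.getFileSystem(conf);
    FileStatus status = dfs.getFileStatus(path);
    BlockLocation[] blockLocations =
        dfs.getFileBlockLocations(status, 0, status.getLen());

    // Ask the datanodes which disk (VolumeId) holds each replica.
    BlockStorageLocation[] storageLocations =
        dfs.getFileBlockStorageLocations(Arrays.asList(blockLocations));
    for (BlockStorageLocation loc : storageLocations) {
      System.out.println("offset " + loc.getOffset()
          + " volumes " + Arrays.toString(loc.getVolumeIds()));
    }
    dfs.close();
  }
}

If the feature is disabled, the check at the top of the implementation above makes the call fail with an UnsupportedOperationException.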
 
Example 13 (Project: hadoop, File: TestFsDatasetCache.java)

private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
  return cacheBlocks(new HdfsBlockLocation[] {loc});
}
 
Example 14 (Project: hadoop, File: TestFsDatasetCache.java)

private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_CACHE)
  };
}
 
Example 15 (Project: hadoop, File: TestFsDatasetCache.java)

private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
  return uncacheBlocks(new HdfsBlockLocation[] {loc});
}
 
Example 16 (Project: hadoop, File: TestFsDatasetCache.java)

private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
  };
}
 
Example 17 (Project: hadoop, File: TestFsDatasetCache.java)

@Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception {
  LOG.info("beginning testFilesExceedMaxLockedMemory");

  // Create some test files that will exceed total cache capacity
  final int numFiles = 5;
  final long fileSize = CACHE_CAPACITY / (numFiles-1);

  final Path[] testFiles = new Path[numFiles];
  final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
  final long[] fileSizes = new long[numFiles];
  for (int i=0; i<numFiles; i++) {
    testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
    DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAl);
    fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(
        testFiles[i], 0, fileSize);
    // Get the file size (sum of blocks)
    long[] sizes = getBlockSizes(fileLocs[i]);
    for (int j=0; j<sizes.length; j++) {
      fileSizes[i] += sizes[j];
    }
  }

  // Cache the first n-1 files
  long total = 0;
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
  for (int i=0; i<numFiles-1; i++) {
    setHeartbeatResponse(cacheBlocks(fileLocs[i]));
    total = DFSTestUtil.verifyExpectedCacheUsage(
        rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
  }

  // nth file should hit a capacity exception
  final LogVerificationAppender appender = new LogVerificationAppender();
  final Logger logger = Logger.getRootLogger();
  logger.addAppender(appender);
  setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      int lines = appender.countLinesWithMessage(
          "more bytes in the cache: " +
          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
      return lines > 0;
    }
  }, 500, 30000);
  // Also check the metrics for the failure
  assertTrue("Expected more than 0 failed cache attempts",
      fsd.getNumBlocksFailedToCache() > 0);

  // Uncache the n-1 files
  int curCachedBlocks = 16;
  for (int i=0; i<numFiles-1; i++) {
    setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
    long uncachedBytes = rounder.round(fileSizes[i]);
    total -= uncachedBytes;
    curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
    DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
  }
  LOG.info("finishing testFilesExceedMaxLockedMemory");
}
 
Example 18 (Project: big-c, File: DFSClient.java)

/**
 * Get block location information about a list of {@link HdfsBlockLocation}.
 * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
 * get {@link BlockStorageLocation}s for blocks returned by
 * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
 * .
 * 
 * This is done by making a round of RPCs to the associated datanodes, asking
 * the volume of each block replica. The returned array of
 * {@link BlockStorageLocation} expose this information as a
 * {@link VolumeId}.
 * 
 * @param blockLocations
 *          target blocks on which to query volume location information
 * @return volumeBlockLocations original block array augmented with additional
 *         volume location information for each replica.
 */
public BlockStorageLocation[] getBlockStorageLocations(
    List<BlockLocation> blockLocations) throws IOException,
    UnsupportedOperationException, InvalidBlockTokenException {
  if (!getConf().getHdfsBlocksMetadataEnabled) {
    throw new UnsupportedOperationException("Datanode-side support for " +
        "getVolumeBlockLocations() must also be enabled in the client " +
        "configuration.");
  }
  // Downcast blockLocations and fetch out required LocatedBlock(s)
  List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
  for (BlockLocation loc : blockLocations) {
    if (!(loc instanceof HdfsBlockLocation)) {
      throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
          "expected to be passed HdfsBlockLocations");
    }
    HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
    blocks.add(hdfsLoc.getLocatedBlock());
  }
  
  // Re-group the LocatedBlocks to be grouped by datanodes, with the values
  // a list of the LocatedBlocks on the datanode.
  Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
      new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
  for (LocatedBlock b : blocks) {
    for (DatanodeInfo info : b.getLocations()) {
      if (!datanodeBlocks.containsKey(info)) {
        datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
      }
      List<LocatedBlock> l = datanodeBlocks.get(info);
      l.add(b);
    }
  }
      
  // Make RPCs to the datanodes to get volume locations for its replicas
  TraceScope scope =
    Trace.startSpan("getBlockStorageLocations", traceSampler);
  Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
  try {
    metadatas = BlockStorageLocationUtil.
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            getConf().getFileBlockStorageLocationsNumThreads,
            getConf().getFileBlockStorageLocationsTimeoutMs,
            getConf().connectToDnViaHostname);
    if (LOG.isTraceEnabled()) {
      LOG.trace("metadata returned: "
          + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
    }
  } finally {
    scope.close();
  }
  
  // Regroup the returned VolumeId metadata to again be grouped by
  // LocatedBlock rather than by datanode
  Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
      .associateVolumeIdsWithBlocks(blocks, metadatas);
  
  // Combine original BlockLocations with new VolumeId information
  BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
      .convertToVolumeBlockLocations(blocks, blockVolumeIds);

  return volumeBlockLocations;
}
 
Example 19 (Project: big-c, File: TestFsDatasetCache.java)

private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
  return cacheBlocks(new HdfsBlockLocation[] {loc});
}
 
Example 20 (Project: big-c, File: TestFsDatasetCache.java)

private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_CACHE)
  };
}
 
Example 21 (Project: big-c, File: TestFsDatasetCache.java)

private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
  return uncacheBlocks(new HdfsBlockLocation[] {loc});
}
 
Example 22 (Project: big-c, File: TestFsDatasetCache.java)

private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
  return new DatanodeCommand[] {
      getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
  };
}
 
Example 23 (Project: big-c, File: TestFsDatasetCache.java)

@Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception {
  LOG.info("beginning testFilesExceedMaxLockedMemory");

  // Create some test files that will exceed total cache capacity
  final int numFiles = 5;
  final long fileSize = CACHE_CAPACITY / (numFiles-1);

  final Path[] testFiles = new Path[numFiles];
  final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
  final long[] fileSizes = new long[numFiles];
  for (int i=0; i<numFiles; i++) {
    testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
    DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAl);
    fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(
        testFiles[i], 0, fileSize);
    // Get the file size (sum of blocks)
    long[] sizes = getBlockSizes(fileLocs[i]);
    for (int j=0; j<sizes.length; j++) {
      fileSizes[i] += sizes[j];
    }
  }

  // Cache the first n-1 files
  long total = 0;
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
  for (int i=0; i<numFiles-1; i++) {
    setHeartbeatResponse(cacheBlocks(fileLocs[i]));
    total = DFSTestUtil.verifyExpectedCacheUsage(
        rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
  }

  // nth file should hit a capacity exception
  final LogVerificationAppender appender = new LogVerificationAppender();
  final Logger logger = Logger.getRootLogger();
  logger.addAppender(appender);
  setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));

  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      int lines = appender.countLinesWithMessage(
          "more bytes in the cache: " +
          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
      return lines > 0;
    }
  }, 500, 30000);
  // Also check the metrics for the failure
  assertTrue("Expected more than 0 failed cache attempts",
      fsd.getNumBlocksFailedToCache() > 0);

  // Uncache the n-1 files
  int curCachedBlocks = 16;
  for (int i=0; i<numFiles-1; i++) {
    setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
    long uncachedBytes = rounder.round(fileSizes[i]);
    total -= uncachedBytes;
    curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
    DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
  }
  LOG.info("finishing testFilesExceedMaxLockedMemory");
}
 