org.apache.hadoop.hdfs.protocol.ExtendedBlock Source Code Examples

Listed below are example usages of org.apache.hadoop.hdfs.protocol.ExtendedBlock, collected from open-source projects. Each example names the project and file it was taken from.
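Before the examples: an ExtendedBlock simply pairs a block-pool ID with a Block (block ID, byte length, generation stamp). A minimal sketch of constructing and inspecting one follows; the field values are made up for illustration, but the constructor and getters are the ones used throughout the examples below.

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

// Illustrative values only; the constructor/getters match those used below.
ExtendedBlock eb = new ExtendedBlock("BP-1073741825-127.0.0.1-1", 42L, 1024L, 1001L);
String bpid = eb.getBlockPoolId();     // block pool the block belongs to
long id     = eb.getBlockId();         // per-pool block ID
long len    = eb.getNumBytes();        // length in bytes
long gs     = eb.getGenerationStamp();
Block local = eb.getLocalBlock();      // the same block without the pool ID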

Example 1 Project: big-c File: DFSInputStream.java
private void fetchBlockByteRange(LocatedBlock block, long start, long end,
    byte[] buf, int offset,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
    throws IOException {
  block = getBlockAt(block.getStartOffset());
  while (true) {
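    // Note: chooseDataNode itself throws (e.g. BlockMissingException) once
    // every datanode has been tried, which is what eventually exits this
    // otherwise unbounded retry loop.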
    DNAddrPair addressPair = chooseDataNode(block, null);
    try {
      actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
          corruptedBlockMap);
      return;
    } catch (IOException e) {
      // Ignore. Already processed inside the function.
      // Loop through to try the next node.
    }
  }
}
 
Example 2 Project: hadoop File: TestSimulatedFSDataset.java
@Test
public void testInvalidate() throws IOException {
  final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
  int bytesAdded = addSomeBlocks(fsdataset);
  Block[] deleteBlocks = new Block[2];
  deleteBlocks[0] = new Block(1, 0, 0);
  deleteBlocks[1] = new Block(2, 0, 0);
  fsdataset.invalidate(bpid, deleteBlocks);
  checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[0]));
  checkInvalidBlock(new ExtendedBlock(bpid, deleteBlocks[1]));
  long sizeDeleted = blockIdToLen(1) + blockIdToLen(2);
  assertEquals(bytesAdded - sizeDeleted, fsdataset.getDfsUsed());
  assertEquals(fsdataset.getCapacity() - bytesAdded + sizeDeleted, fsdataset.getRemaining());
  
  // Now make sure the rest of the blocks are valid
  for (int i=3; i <= NUMBLOCKS; ++i) {
    Block b = new Block(i, 0, 0);
    assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
  }
}
 
Example 3 Project: big-c File: VolumeScanner.java
public synchronized void markSuspectBlock(ExtendedBlock block) {
  if (stopping) {
    LOG.info("{}: Not scheduling suspect block {} for " +
        "rescanning, because this volume scanner is stopping.", this, block);
    return;
  }
  Boolean recent = recentSuspectBlocks.getIfPresent(block);
  if (recent != null) {
    LOG.info("{}: Not scheduling suspect block {} for " +
        "rescanning, because we rescanned it recently.", this, block);
    return;
  }
  if (suspectBlocks.contains(block)) {
    LOG.info("{}: suspect block {} is already queued for " +
        "rescanning.", this, block);
    return;
  }
  suspectBlocks.add(block);
  recentSuspectBlocks.put(block, true);
  LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
  notify(); // wake scanner thread.
}
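The recentSuspectBlocks field above is queried with getIfPresent and populated with put, i.e. a Guava-style Cache that remembers recently rescanned blocks for a bounded time. A minimal sketch of how such a cache can be built; the size and expiry values here are illustrative, not VolumeScanner's actual configuration:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

// Illustrative bounds; supports the getIfPresent()/put() calls seen above.
Cache<ExtendedBlock, Boolean> recentSuspectBlocks = CacheBuilder.newBuilder()
    .maximumSize(1000)                        // cap memory use
    .expireAfterAccess(10, TimeUnit.MINUTES)  // forget blocks rescanned long ago
    .build();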
 
Example 4 Project: hadoop File: Sender.java
@Override
public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {

  OpReadBlockProto proto = OpReadBlockProto.newBuilder()
    .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
    .setOffset(blockOffset)
    .setLen(length)
    .setSendChecksums(sendChecksum)
    .setCachingStrategy(getCachingStrategy(cachingStrategy))
    .build();

  send(out, Op.READ_BLOCK, proto);
}
 
Example 5 Project: big-c File: TestBalancerWithMultipleNameNodes.java
private static ExtendedBlock[][] generateBlocks(Suite s, long size
    ) throws IOException, InterruptedException, TimeoutException {
  final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
  for(int n = 0; n < s.clients.length; n++) {
    final long fileLen = size/s.replication;
    createFile(s, n, fileLen);

    final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
        FILE_NAME, 0, fileLen).getLocatedBlocks();

    final int numOfBlocks = locatedBlocks.size();
    blocks[n] = new ExtendedBlock[numOfBlocks];
    for(int i = 0; i < numOfBlocks; i++) {
      final ExtendedBlock b = locatedBlocks.get(i).getBlock();
      blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
          b.getNumBytes(), b.getGenerationStamp());
    }
  }
  return blocks;
}
 
Example 7 Project: big-c File: FsDatasetImpl.java
@Override  // FsDatasetSpi
public synchronized ReplicaHandler recoverAppend(
    ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
  LOG.info("Recover failed append to " + b);

  ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);

  FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
  ReplicaBeingWritten replica;
  try {
    // change the replica's state/gs etc.
    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
      replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
                       newGS, b.getNumBytes());
    } else { //RBW
      bumpReplicaGS(replicaInfo, newGS);
      replica = (ReplicaBeingWritten) replicaInfo;
    }
  } catch (IOException e) {
    IOUtils.cleanup(null, ref);
    throw e;
  }
  return new ReplicaHandler(replica, ref);
}
 
Example 8 Project: big-c File: BlockReaderLocalLegacy.java
LocalDatanodeInfo() {
  final int cacheSize = 10000;
  final float hashTableLoadFactor = 0.75f;
  int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
  cache = Collections
      .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
          hashTableCapacity, hashTableLoadFactor, true) {
        private static final long serialVersionUID = 1;

        @Override
        protected boolean removeEldestEntry(
            Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
          return size() > cacheSize;
        }
      });
}
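The idiom in Example 8 — an access-ordered LinkedHashMap whose removeEldestEntry override evicts once a fixed size is exceeded — is plain JDK. A small self-contained sketch of the same eviction behavior, with made-up keys and a tiny capacity so the eviction is visible:

import java.util.LinkedHashMap;
import java.util.Map;

public class LruSketch {
  public static void main(String[] args) {
    final int cacheSize = 3;
    Map<String, String> cache =
        new LinkedHashMap<String, String>(16, 0.75f, true) { // true = access order
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > cacheSize; // evict the least-recently-used entry
          }
        };
    cache.put("a", "1"); cache.put("b", "2"); cache.put("c", "3");
    cache.get("a");        // touch "a" so "b" becomes the eldest
    cache.put("d", "4");   // evicts "b"
    System.out.println(cache.keySet()); // prints [c, a, d]
  }
}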
 
Example 9 Project: big-c File: Sender.java
@Override
public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {

  OpReadBlockProto proto = OpReadBlockProto.newBuilder()
    .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
    .setOffset(blockOffset)
    .setLen(length)
    .setSendChecksums(sendChecksum)
    .setCachingStrategy(getCachingStrategy(cachingStrategy))
    .build();

  send(out, Op.READ_BLOCK, proto);
}
 
Example 10 Project: big-c File: FsDatasetImpl.java
/**
 * Remove the temporary block file (if any)
 */
@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
  ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
      b.getLocalBlock());
  if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
    // remove from volumeMap
    volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
    
    // delete the on-disk temp file
    if (delBlockFromDisk(replicaInfo.getBlockFile(), 
        replicaInfo.getMetaFile(), b.getLocalBlock())) {
      LOG.warn("Block " + b + " unfinalized and removed. " );
    }
    if (replicaInfo.getVolume().isTransientStorage()) {
      ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
    }
  }
}
 
Example 11 Project: big-c File: Sender.java
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
      throws IOException {
  OpRequestShortCircuitAccessProto.Builder builder =
      OpRequestShortCircuitAccessProto.newBuilder()
        .setHeader(DataTransferProtoUtil.buildBaseHeader(
          blk, blockToken)).setMaxVersion(maxVersion);
  if (slotId != null) {
    builder.setSlotId(PBHelper.convert(slotId));
  }
  builder.setSupportsReceiptVerification(supportsReceiptVerification);
  OpRequestShortCircuitAccessProto proto = builder.build();
  send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
}
 
Example 12 Project: big-c File: DatanodeUtil.java
/**
 * @return the FileInputStream for the meta data of the given block.
 * @throws FileNotFoundException
 *           if the file is not found.
 * @throws ClassCastException
 *           if the underlying input stream is not a FileInputStream.
 */
public static FileInputStream getMetaDataInputStream(
    ExtendedBlock b, FsDatasetSpi<?> data) throws IOException {
  final LengthInputStream lin = data.getMetaDataInputStream(b);
  if (lin == null) {
    throw new FileNotFoundException("Meta file for " + b + " not found.");
  }
  return (FileInputStream)lin.getWrappedStream();
}
 
Example 13 Project: hadoop File: TestClientReportBadBlock.java
/**
 * Corrupt a block on a data node. Replace the block file content with the
 * bytes 0, 1, ..., BLOCK_SIZE - 1.
 * 
 * @param block
 *          the ExtendedBlock to be corrupted
 * @param dn
 *          the data node where the block needs to be corrupted
 * @throws FileNotFoundException
 * @throws IOException
 */
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
    throws FileNotFoundException, IOException {
  final File f = DataNodeTestUtils.getBlockFile(
      dn, block.getBlockPoolId(), block.getLocalBlock());
  final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
  final byte[] bytes = new byte[(int) BLOCK_SIZE];
  for (int i = 0; i < BLOCK_SIZE; i++) {
    bytes[i] = (byte) (i);
  }
  raFile.write(bytes);
  raFile.close();
}
 
Example 14 Project: big-c File: TestUnderReplicatedBlocks.java
@Test(timeout=60000) // 1 min timeout
public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
  Configuration conf = new HdfsConfiguration();
  final short REPLICATION_FACTOR = 2;
  final String FILE_NAME = "/testFile";
  final Path FILE_PATH = new Path(FILE_NAME);
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR + 1).build();
  try {
    // create a file with one block with a replication factor of 2
    final FileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
    DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
    
    // remove one replica from the blocksMap so block becomes under-replicated
    // but the block does not get put into the under-replicated blocks queue
    final BlockManager bm = cluster.getNamesystem().getBlockManager();
    ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
    DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
        .iterator().next().getDatanodeDescriptor();
    bm.addToInvalidates(b.getLocalBlock(), dn);
    Thread.sleep(5000);
    bm.blocksMap.removeNode(b.getLocalBlock(), dn);
    
    // increment this file's replication factor
    FsShell shell = new FsShell(conf);
    assertEquals(0, shell.run(new String[]{
        "-setrep", "-w", Integer.toString(1+REPLICATION_FACTOR), FILE_NAME}));
  } finally {
    cluster.shutdown();
  }
}
 
Example 15 Project: big-c File: TestClientReportBadBlock.java
/**
 * Corrupt a block on a data node. Replace the block file content with the
 * bytes 0, 1, ..., BLOCK_SIZE - 1.
 * 
 * @param block
 *          the ExtendedBlock to be corrupted
 * @param dn
 *          the data node where the block needs to be corrupted
 * @throws FileNotFoundException
 * @throws IOException
 */
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
    throws FileNotFoundException, IOException {
  final File f = DataNodeTestUtils.getBlockFile(
      dn, block.getBlockPoolId(), block.getLocalBlock());
  final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
  final byte[] bytes = new byte[(int) BLOCK_SIZE];
  for (int i = 0; i < BLOCK_SIZE; i++) {
    bytes[i] = (byte) (i);
  }
  raFile.write(bytes);
  raFile.close();
}
 
Example 16 Project: hadoop File: TestProcessCorruptBlocks.java
/**
 * The corrupt block has to be removed when the number of valid replicas
 * matches replication factor for the file. In this test, the above 
 * condition is achieved by increasing the number of good replicas by 
 * replicating on a new Datanode. 
 * The test strategy : 
 *   Bring up Cluster with 3 DataNodes
 *   Create a file  of replication factor 3
 *   Corrupt one replica of a block of the file 
 *   Verify that there are still 2 good replicas and 1 corrupt replica
 *     (the corrupt replica is not removed since the number of good replicas
 *      (2) is less than the replication factor (3))
 *   Start a new data node
 *   Verify that a new replica is created and the corrupt replica is
 *   removed.
 * 
 */
@Test
public void testByAddingAnExtraDataNode() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
  conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
  FileSystem fs = cluster.getFileSystem();
  final FSNamesystem namesystem = cluster.getNamesystem();
  DataNodeProperties dnPropsFourth = cluster.stopDataNode(3);

  try {
    final Path fileName = new Path("/foo1");
    DFSTestUtil.createFile(fs, fileName, 2, (short) 3, 0L);
    DFSTestUtil.waitReplication(fs, fileName, (short) 3);

    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
    corruptBlock(cluster, fs, fileName, 0, block);

    DFSTestUtil.waitReplication(fs, fileName, (short) 2);

    assertEquals(2, countReplicas(namesystem, block).liveReplicas());
    assertEquals(1, countReplicas(namesystem, block).corruptReplicas());

    cluster.restartDataNode(dnPropsFourth);

    DFSTestUtil.waitReplication(fs, fileName, (short) 3);

    assertEquals(3, countReplicas(namesystem, block).liveReplicas());
    assertEquals(0, countReplicas(namesystem, block).corruptReplicas());
  } finally {
    cluster.shutdown();
  }
}
 
Example 17
@Override
public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
  GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
      .newBuilder().setBlock(PBHelper.convert(b)).build();
  try {
    return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
 
Example 18 Project: hadoop File: DataNode.java
/** Notify the corresponding namenode to delete the block. */
public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
  BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
  if (bpos != null) {
    bpos.notifyNamenodeDeletedBlock(block, storageUuid);
  } else {
    LOG.error("Cannot find BPOfferService for reporting block deleted for bpid="
        + block.getBlockPoolId());
  }
}
 
Example 19 Project: big-c File: NameNodeRpcServer.java
@Override // ClientProtocol
public LocatedBlock getAdditionalDatanode(final String src,
    final long fileId, final ExtendedBlock blk,
    final DatanodeInfo[] existings, final String[] existingStorageIDs,
    final DatanodeInfo[] excludes,
    final int numAdditionalNodes, final String clientName
    ) throws IOException {
  checkNNStartup();
  if (LOG.isDebugEnabled()) {
    LOG.debug("getAdditionalDatanode: src=" + src
        + ", fileId=" + fileId
        + ", blk=" + blk
        + ", existings=" + Arrays.asList(existings)
        + ", excludes=" + Arrays.asList(excludes)
        + ", numAdditionalNodes=" + numAdditionalNodes
        + ", clientName=" + clientName);
  }

  metrics.incrGetAdditionalDatanodeOps();

  Set<Node> excludeSet = null;
  if (excludes != null) {
    excludeSet = new HashSet<Node>(excludes.length);
    for (Node node : excludes) {
      excludeSet.add(node);
    }
  }
  return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
      existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
}
 
Example 20 Project: big-c File: PBHelper.java
public static ExtendedBlockProto convert(final ExtendedBlock b) {
  if (b == null) return null;
  return ExtendedBlockProto.newBuilder().
      setPoolId(b.getBlockPoolId()).
      setBlockId(b.getBlockId()).
      setNumBytes(b.getNumBytes()).
      setGenerationStamp(b.getGenerationStamp()).
      build();
}
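Protobuf generates a getter for every setter used above, so converting back is symmetrical. A sketch of the round trip, assuming the generated getPoolId()/getBlockId()/getNumBytes()/getGenerationStamp() accessors that correspond to the setters in convert():

// Round-trip sketch; the proto accessors are the generated counterparts of
// the setters used in convert() above.
ExtendedBlock b = new ExtendedBlock("bpid", 1L, 1L, 1L);
ExtendedBlockProto proto = PBHelper.convert(b);
ExtendedBlock back = new ExtendedBlock(proto.getPoolId(), proto.getBlockId(),
    proto.getNumBytes(), proto.getGenerationStamp());
assert b.equals(back);  // equality is by pool ID and block ID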
 
Example 21 Project: hadoop File: DataNode.java
/**
 * Report a bad block which is hosted on the local DN.
 */
public void reportBadBlocks(ExtendedBlock block) throws IOException{
  BPOfferService bpos = getBPOSForBlock(block);
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  bpos.reportBadBlocks(
      block, volume.getStorageID(), volume.getStorageType());
}
 
Example 22 Project: hadoop File: SimulatedFSDataset.java
@Override // FsDatasetSpi
public synchronized boolean isValidBlock(ExtendedBlock b) {
  try {
    checkBlock(b, 0, ReplicaState.FINALIZED);
  } catch (IOException e) {
    return false;
  }
  return true;
}
 
Example 23
private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
  FileStatus fileStatus = fileSystem.getFileStatus(realFile);
  BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
  HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
  LocatedBlock locatedBlock = location.getLocatedBlock();
  ExtendedBlock block = locatedBlock.getBlock();
  return toNiceString(block.getBlockId());
}
 
Example 24 Project: hadoop File: DataNode.java
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
    AccessMode accessMode) throws IOException {
  if (isBlockTokenEnabled) {
    BlockTokenIdentifier id = new BlockTokenIdentifier();
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    id.readFields(in);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got: " + id.toString());
    }
    blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
  }
}
 
Example 25 Project: big-c File: SimulatedFSDataset.java
/**
 * Check if a block is valid.
 *
 * @param b           The block to check.
 * @param minLength   The minimum length that the block must have.  May be 0.
 * @param state       If this is null, it is ignored.  If it is non-null, we
 *                        will check that the replica has this state.
 *
 * @throws ReplicaNotFoundException          If the replica is not found
 *
 * @throws UnexpectedReplicaStateException   If the replica is not in the 
 *                                             expected state.
 */
@Override // {@link FsDatasetSpi}
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
    throws ReplicaNotFoundException, UnexpectedReplicaStateException {
  final BInfo binfo = getBInfo(b);
  
  if (binfo == null) {
    throw new ReplicaNotFoundException(b);
  }
  if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) ||
      (state != ReplicaState.FINALIZED && binfo.isFinalized())) {
    throw new UnexpectedReplicaStateException(b, state);
  }
}
 
Example 26 Project: hadoop File: DataNode.java
protected void notifyNamenodeReceivingBlock(
    ExtendedBlock block, String storageUuid) {
  BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
  if(bpos != null) {
    bpos.notifyNamenodeReceivingBlock(block, storageUuid);
  } else {
    LOG.error("Cannot find BPOfferService for reporting block receiving for bpid="
        + block.getBlockPoolId());
  }
}
 
Example 27 Project: big-c File: DataNode.java
/**
 * Transfer a replica to the datanode targets.
 * @param b the block to transfer.
 *          The corresponding replica must be an RBW or a Finalized.
 *          Its GS and numBytes will be set to
 *          the stored GS and the visible length. 
 * @param targets targets to transfer the block to
 * @param client client name
 */
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
    final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
    final String client) throws IOException {
  final long storedGS;
  final long visible;
  final BlockConstructionStage stage;

  //get replica information
  synchronized(data) {
    Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
        b.getBlockId());
    if (null == storedBlock) {
      throw new IOException(b + " not found in datanode.");
    }
    storedGS = storedBlock.getGenerationStamp();
    if (storedGS < b.getGenerationStamp()) {
      throw new IOException(storedGS
          + " = storedGS < b.getGenerationStamp(), b=" + b);
    }
    // Update the genstamp with storedGS
    b.setGenerationStamp(storedGS);
    if (data.isValidRbw(b)) {
      stage = BlockConstructionStage.TRANSFER_RBW;
    } else if (data.isValidBlock(b)) {
      stage = BlockConstructionStage.TRANSFER_FINALIZED;
    } else {
      final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
      throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
    }
    visible = data.getReplicaVisibleLength(b);
  }
  //set visible length
  b.setNumBytes(visible);

  if (targets.length > 0) {
    new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
  }
}
 
Example 28 Project: big-c File: TestWriteBlockGetsBlockLengthHint.java
/**
 * Override createRbw to verify that the block length that is passed
 * is correct. This requires both DFSOutputStream and BlockReceiver to
 * correctly propagate the hint to FsDatasetSpi.
 */
@Override
public synchronized ReplicaHandler createRbw(
    StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
    throws IOException {
  assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
  return super.createRbw(storageType, b, allowLazyPersist);
}
 
Example 29 Project: hadoop File: TestDFSUtil.java
/**
 * Test conversion of LocatedBlock to BlockLocation
 */
@Test
public void testLocatedBlocks2Locations() {
  DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
  DatanodeInfo[] ds = new DatanodeInfo[1];
  ds[0] = d;

  // ok
  ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 1, 1);
  LocatedBlock l1 = new LocatedBlock(b1, ds, 0, false);

  // corrupt
  ExtendedBlock b2 = new ExtendedBlock("bpid", 2, 1, 1);
  LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true);

  List<LocatedBlock> ls = Arrays.asList(l1, l2);
  LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null);

  BlockLocation[] bs = DFSUtil.locatedBlocks2Locations(lbs);

  assertTrue("expected 2 blocks but got " + bs.length,
             bs.length == 2);

  int corruptCount = 0;
  for (BlockLocation b: bs) {
    if (b.isCorrupt()) {
      corruptCount++;
    }
  }

  assertTrue("expected 1 corrupt files but got " + corruptCount,
      corruptCount == 1);

  // test an empty location
  bs = DFSUtil.locatedBlocks2Locations(new LocatedBlocks());
  assertEquals(0, bs.length);
}
 
Example 30 Project: hadoop File: BlockSender.java
private static Replica getReplica(ExtendedBlock block, DataNode datanode)
    throws ReplicaNotFoundException {
  Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
      block.getBlockId());
  if (replica == null) {
    throw new ReplicaNotFoundException(block);
  }
  return replica;
}