org.apache.hadoop.io.IOUtils#skipFully ( )源码实例Demo

下面列出了org.apache.hadoop.io.IOUtils#skipFully ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: InputStreamEntity.java
@Override
public void write(OutputStream os) throws IOException {
  IOUtils.skipFully(is, offset);
  if (len == -1) {
    IOUtils.copyBytes(is, os, 4096, true);
  } else {
    IOUtils.copyBytes(is, os, len, true);
  }
}
 
源代码2 项目: hadoop   文件: FSEditLogOp.java
private void verifyTerminator() throws IOException {
  /** The end of the edit log should contain only 0x00 or 0xff bytes.
   * If it contains other bytes, the log itself may be corrupt.
   * It is important to check this; if we don't, a stray OP_INVALID byte 
   * could make us stop reading the edit log halfway through, and we'd never
   * know that we had lost data.
   */
  byte[] buf = new byte[4096];
  limiter.clearLimit();
  int numRead = -1, idx = 0;
  while (true) {
    try {
      numRead = -1;
      idx = 0;
      numRead = in.read(buf);
      if (numRead == -1) {
        return;
      }
      while (idx < numRead) {
        if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
          throw new IOException("Read extra bytes after " +
            "the terminator!");
        }
        idx++;
      }
    } finally {
      // After reading each group of bytes, we reposition the mark one
      // byte before the next group.  Similarly, if there is an error, we
      // want to reposition the mark one byte before the error
      if (numRead != -1) { 
        in.reset();
        IOUtils.skipFully(in, idx);
        in.mark(buf.length + 1);
        IOUtils.skipFully(in, 1);
      }
    }
  }
}
 
源代码3 项目: hadoop   文件: FSEditLogOp.java
/**
 * Similar with decodeOp(), but instead of doing the real decoding, we skip
 * the content of the op if the length of the editlog is supported.
 * @return the last txid of the segment, or INVALID_TXID on exception
 */
public long scanOp() throws IOException {
  if (supportEditLogLength) {
    limiter.setLimit(maxOpSize);
    in.mark(maxOpSize);

    final byte opCodeByte;
    try {
      opCodeByte = in.readByte(); // op code
    } catch (EOFException e) {
      return HdfsConstants.INVALID_TXID;
    }

    FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
    if (opCode == OP_INVALID) {
      verifyTerminator();
      return HdfsConstants.INVALID_TXID;
    }

    int length = in.readInt(); // read the length of the op
    long txid = in.readLong(); // read the txid

    // skip the remaining content
    IOUtils.skipFully(in, length - 8); 
    // TODO: do we want to verify checksum for JN? For now we don't.
    return txid;
  } else {
    FSEditLogOp op = decodeOp();
    return op == null ? HdfsConstants.INVALID_TXID : op.getTransactionId();
  }
}
 
源代码4 项目: hadoop   文件: TestShortCircuitLocalRead.java
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
                                         byte[] expected, int readOffset) throws IOException {
  HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
  ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
  IOUtils.skipFully(stm, readOffset);
  try {
    stm.read(actual);
  } catch(UnsupportedOperationException unex) {
    return true;
  }
  return false;
}
 
源代码5 项目: hadoop   文件: TestFSInputChecker.java
private void testSkip1(int skippedBytes) 
throws Exception {
  long oldPos = stm.getPos();
  IOUtils.skipFully(stm, skippedBytes);
  long newPos = oldPos + skippedBytes;
  assertEquals(stm.getPos(), newPos);
  stm.readFully(actual);
  checkAndEraseData(actual, (int)newPos, expected, "Read Sanity Test");
}
 
源代码6 项目: big-c   文件: TestFSInputChecker.java
private void testSkip1(int skippedBytes) 
throws Exception {
  long oldPos = stm.getPos();
  IOUtils.skipFully(stm, skippedBytes);
  long newPos = oldPos + skippedBytes;
  assertEquals(stm.getPos(), newPos);
  stm.readFully(actual);
  checkAndEraseData(actual, (int)newPos, expected, "Read Sanity Test");
}
 
源代码7 项目: big-c   文件: FSEditLogOp.java
private void verifyTerminator() throws IOException {
  /** The end of the edit log should contain only 0x00 or 0xff bytes.
   * If it contains other bytes, the log itself may be corrupt.
   * It is important to check this; if we don't, a stray OP_INVALID byte 
   * could make us stop reading the edit log halfway through, and we'd never
   * know that we had lost data.
   */
  byte[] buf = new byte[4096];
  limiter.clearLimit();
  int numRead = -1, idx = 0;
  while (true) {
    try {
      numRead = -1;
      idx = 0;
      numRead = in.read(buf);
      if (numRead == -1) {
        return;
      }
      while (idx < numRead) {
        if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
          throw new IOException("Read extra bytes after " +
            "the terminator!");
        }
        idx++;
      }
    } finally {
      // After reading each group of bytes, we reposition the mark one
      // byte before the next group.  Similarly, if there is an error, we
      // want to reposition the mark one byte before the error
      if (numRead != -1) { 
        in.reset();
        IOUtils.skipFully(in, idx);
        in.mark(buf.length + 1);
        IOUtils.skipFully(in, 1);
      }
    }
  }
}
 
源代码8 项目: big-c   文件: FSEditLogOp.java
/**
 * Similar with decodeOp(), but instead of doing the real decoding, we skip
 * the content of the op if the length of the editlog is supported.
 * @return the last txid of the segment, or INVALID_TXID on exception
 */
public long scanOp() throws IOException {
  if (supportEditLogLength) {
    limiter.setLimit(maxOpSize);
    in.mark(maxOpSize);

    final byte opCodeByte;
    try {
      opCodeByte = in.readByte(); // op code
    } catch (EOFException e) {
      return HdfsConstants.INVALID_TXID;
    }

    FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
    if (opCode == OP_INVALID) {
      verifyTerminator();
      return HdfsConstants.INVALID_TXID;
    }

    int length = in.readInt(); // read the length of the op
    long txid = in.readLong(); // read the txid

    // skip the remaining content
    IOUtils.skipFully(in, length - 8); 
    // TODO: do we want to verify checksum for JN? For now we don't.
    return txid;
  } else {
    FSEditLogOp op = decodeOp();
    return op == null ? HdfsConstants.INVALID_TXID : op.getTransactionId();
  }
}
 
源代码9 项目: big-c   文件: SimulatedFSDataset.java
@Override // FsDatasetSpi
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
    long seekOffset) throws IOException {
  InputStream result = getBlockInputStream(b);
  IOUtils.skipFully(result, seekOffset);
  return result;
}
 
源代码10 项目: big-c   文件: TestShortCircuitLocalRead.java
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
                                         byte[] expected, int readOffset) throws IOException {
  HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
  ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
  IOUtils.skipFully(stm, readOffset);
  try {
    stm.read(actual);
  } catch(UnsupportedOperationException unex) {
    return true;
  }
  return false;
}
 
源代码11 项目: hadoop   文件: BlockReaderLocalLegacy.java
private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
    ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
    long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
    boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
    FileInputStream checksumIn) throws IOException {
  this.filename = hdfsfile;
  this.checksum = checksum;
  this.verifyChecksum = verifyChecksum;
  this.startOffset = Math.max(startOffset, 0);
  this.blockId = block.getBlockId();

  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();

  this.dataIn = dataIn;
  this.checksumIn = checksumIn;
  this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);

  int chunksPerChecksumRead = getSlowReadBufferNumChunks(
      conf.shortCircuitBufferSize, bytesPerChecksum);
  slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
  checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
  // Initially the buffers have nothing to read.
  slowReadBuff.flip();
  checksumBuff.flip();
  boolean success = false;
  try {
    // Skip both input streams to beginning of the chunk containing startOffset
    IOUtils.skipFully(dataIn, firstChunkOffset);
    if (checksumIn != null) {
      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
      IOUtils.skipFully(checksumIn, checkSumOffset);
    }
    success = true;
  } finally {
    if (!success) {
      bufferPool.returnBuffer(slowReadBuff);
      bufferPool.returnBuffer(checksumBuff);
    }
  }
}
 
源代码12 项目: hadoop   文件: EditLogFileInputStream.java
private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
  FSEditLogOp op = null;
  switch (state) {
  case UNINIT:
    try {
      init(true);
    } catch (Throwable e) {
      LOG.error("caught exception initializing " + this, e);
      if (skipBrokenEdits) {
        return null;
      }
      Throwables.propagateIfPossible(e, IOException.class);
    }
    Preconditions.checkState(state != State.UNINIT);
    return nextOpImpl(skipBrokenEdits);
  case OPEN:
    op = reader.readOp(skipBrokenEdits);
    if ((op != null) && (op.hasTransactionId())) {
      long txId = op.getTransactionId();
      if ((txId >= lastTxId) &&
          (lastTxId != HdfsConstants.INVALID_TXID)) {
        //
        // Sometimes, the NameNode crashes while it's writing to the
        // edit log.  In that case, you can end up with an unfinalized edit log
        // which has some garbage at the end.
        // JournalManager#recoverUnfinalizedSegments will finalize these
        // unfinished edit logs, giving them a defined final transaction 
        // ID.  Then they will be renamed, so that any subsequent
        // readers will have this information.
        //
        // Since there may be garbage at the end of these "cleaned up"
        // logs, we want to be sure to skip it here if we've read everything
        // we were supposed to read out of the stream.
        // So we force an EOF on all subsequent reads.
        //
        long skipAmt = log.length() - tracker.getPos();
        if (skipAmt > 0) {
          if (LOG.isDebugEnabled()) {
              LOG.debug("skipping " + skipAmt + " bytes at the end " +
                "of edit log  '" + getName() + "': reached txid " + txId +
                " out of " + lastTxId);
          }
          tracker.clearLimit();
          IOUtils.skipFully(tracker, skipAmt);
        }
      }
    }
    break;
    case CLOSED:
      break; // return null
  }
  return op;
}
 
源代码13 项目: hadoop   文件: TestEnhancedByteBufferAccess.java
@Test
public void testClientMmapDisable() throws Exception {
  HdfsConfiguration conf = initZeroCopyTest();
  conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false);
  MiniDFSCluster cluster = null;
  final Path TEST_PATH = new Path("/a");
  final int TEST_FILE_LENGTH = 16385;
  final int RANDOM_SEED = 23453;
  final String CONTEXT = "testClientMmapDisable";
  FSDataInputStream fsIn = null;
  DistributedFileSystem fs = null;
  conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);

  try {
    // With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory
    // mapped reads.
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    fsIn = fs.open(TEST_PATH);
    try {
      fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.fail("expected zero-copy read to fail when client mmaps " +
          "were disabled.");
    } catch (UnsupportedOperationException e) {
    }
  } finally {
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
  }

  fsIn = null;
  fs = null;
  cluster = null;
  try {
    // Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0.  It should work.
    conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true);
    conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0);
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    fsIn = fs.open(TEST_PATH);
    ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    fsIn.releaseBuffer(buf);
    // Test EOF behavior
    IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
    buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    Assert.assertEquals(null, buf);
  } finally {
    if (fsIn != null) fsIn.close();
    if (fs != null) fs.close();
    if (cluster != null) cluster.shutdown();
  }
}
 
源代码14 项目: hadoop   文件: TestShortCircuitLocalRead.java
/** Check file content, reading as user {@code readingUser} */
static void checkFileContent(URI uri, Path name, byte[] expected,
    int readOffset, String readingUser, Configuration conf,
    boolean legacyShortCircuitFails)
    throws IOException, InterruptedException {
  // Ensure short circuit is enabled
  DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
  ClientContext getClientContext = ClientContext.getFromConf(conf);
  if (legacyShortCircuitFails) {
    assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
  }
  
  FSDataInputStream stm = fs.open(name);
  byte[] actual = new byte[expected.length-readOffset];
  stm.readFully(readOffset, actual);
  checkData(actual, readOffset, expected, "Read 2");
  stm.close();
  // Now read using a different API.
  actual = new byte[expected.length-readOffset];
  stm = fs.open(name);
  IOUtils.skipFully(stm, readOffset);
  //Read a small number of bytes first.
  int nread = stm.read(actual, 0, 3);
  nread += stm.read(actual, nread, 2);
  //Read across chunk boundary
  nread += stm.read(actual, nread, 517);
  checkData(actual, readOffset, expected, nread, "A few bytes");
  //Now read rest of it
  while (nread < actual.length) {
    int nbytes = stm.read(actual, nread, actual.length - nread);
    if (nbytes < 0) {
      throw new EOFException("End of file reached before reading fully.");
    }
    nread += nbytes;
  }
  checkData(actual, readOffset, expected, "Read 3");
  
  if (legacyShortCircuitFails) {
    assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
  }
  stm.close();
}
 
源代码15 项目: hadoop   文件: TestShortCircuitLocalRead.java
/** Check the file content, reading as user {@code readingUser} */
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
    int readOffset, String readingUser, Configuration conf,
    boolean legacyShortCircuitFails)
    throws IOException, InterruptedException {
  // Ensure short circuit is enabled
  DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
  ClientContext clientContext = ClientContext.getFromConf(conf);
  if (legacyShortCircuitFails) {
    assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
  }
  
  HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);

  ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);

  IOUtils.skipFully(stm, readOffset);

  actual.limit(3);

  //Read a small number of bytes first.
  int nread = stm.read(actual);
  actual.limit(nread + 2);
  nread += stm.read(actual);

  // Read across chunk boundary
  actual.limit(Math.min(actual.capacity(), nread + 517));
  nread += stm.read(actual);
  checkData(arrayFromByteBuffer(actual), readOffset, expected, nread,
      "A few bytes");
  //Now read rest of it
  actual.limit(actual.capacity());
  while (actual.hasRemaining()) {
    int nbytes = stm.read(actual);

    if (nbytes < 0) {
      throw new EOFException("End of file reached before reading fully.");
    }
    nread += nbytes;
  }
  checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
  if (legacyShortCircuitFails) {
    assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
  }
  stm.close();
}
 
源代码16 项目: hbase   文件: BlockingRpcConnection.java
private void readResponse() {
  Call call = null;
  boolean expectedCall = false;
  try {
    // See HBaseServer.Call.setResponse for where we write out the response.
    // Total size of the response. Unused. But have to read it in anyways.
    int totalSize = in.readInt();

    // Read the header
    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    call = calls.remove(id); // call.done have to be set before leaving this method
    expectedCall = (call != null && !call.isDone());
    if (!expectedCall) {
      // So we got a response for which we have no corresponding 'call' here on the client-side.
      // We probably timed out waiting, cleaned up all references, and now the server decides
      // to return a response. There is nothing we can do w/ the response at this stage. Clean
      // out the wire of the response so its out of the way and we can get other responses on
      // this connection.
      int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
      int whatIsLeftToRead = totalSize - readSoFar;
      IOUtils.skipFully(in, whatIsLeftToRead);
      if (call != null) {
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats
            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      }
      return;
    }
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      RemoteException re = createRemoteException(exceptionResponse);
      call.setException(re);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats
          .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      if (isFatalConnectionException(exceptionResponse)) {
        synchronized (this) {
          closeConn(re);
        }
      }
    } else {
      Message value = null;
      if (call.responseDefaultType != null) {
        Builder builder = call.responseDefaultType.newBuilderForType();
        ProtobufUtil.mergeDelimitedFrom(builder, in);
        value = builder.build();
      }
      CellScanner cellBlockScanner = null;
      if (responseHeader.hasCellBlockMeta()) {
        int size = responseHeader.getCellBlockMeta().getLength();
        byte[] cellBlock = new byte[size];
        IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
        cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
          this.compressor, cellBlock);
      }
      call.setResponse(value, cellBlockScanner);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats
          .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
    }
  } catch (IOException e) {
    if (expectedCall) {
      call.setException(e);
    }
    if (e instanceof SocketTimeoutException) {
      // Clean up open calls but don't treat this as a fatal condition,
      // since we expect certain responses to not make it by the specified
      // {@link ConnectionId#rpcTimeout}.
      if (LOG.isTraceEnabled()) {
        LOG.trace("ignored", e);
      }
    } else {
      synchronized (this) {
        closeConn(e);
      }
    }
  }
}
 
源代码17 项目: big-c   文件: BlockReaderLocalLegacy.java
private BlockReaderLocalLegacy(DFSClient.Conf conf, String hdfsfile,
    ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
    long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
    boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
    FileInputStream checksumIn) throws IOException {
  this.filename = hdfsfile;
  this.checksum = checksum;
  this.verifyChecksum = verifyChecksum;
  this.startOffset = Math.max(startOffset, 0);
  this.blockId = block.getBlockId();

  bytesPerChecksum = this.checksum.getBytesPerChecksum();
  checksumSize = this.checksum.getChecksumSize();

  this.dataIn = dataIn;
  this.checksumIn = checksumIn;
  this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);

  int chunksPerChecksumRead = getSlowReadBufferNumChunks(
      conf.shortCircuitBufferSize, bytesPerChecksum);
  slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
  checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
  // Initially the buffers have nothing to read.
  slowReadBuff.flip();
  checksumBuff.flip();
  boolean success = false;
  try {
    // Skip both input streams to beginning of the chunk containing startOffset
    IOUtils.skipFully(dataIn, firstChunkOffset);
    if (checksumIn != null) {
      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
      IOUtils.skipFully(checksumIn, checkSumOffset);
    }
    success = true;
  } finally {
    if (!success) {
      bufferPool.returnBuffer(slowReadBuff);
      bufferPool.returnBuffer(checksumBuff);
    }
  }
}
 
源代码18 项目: big-c   文件: EditLogFileInputStream.java
private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
  FSEditLogOp op = null;
  switch (state) {
  case UNINIT:
    try {
      init(true);
    } catch (Throwable e) {
      LOG.error("caught exception initializing " + this, e);
      if (skipBrokenEdits) {
        return null;
      }
      Throwables.propagateIfPossible(e, IOException.class);
    }
    Preconditions.checkState(state != State.UNINIT);
    return nextOpImpl(skipBrokenEdits);
  case OPEN:
    op = reader.readOp(skipBrokenEdits);
    if ((op != null) && (op.hasTransactionId())) {
      long txId = op.getTransactionId();
      if ((txId >= lastTxId) &&
          (lastTxId != HdfsConstants.INVALID_TXID)) {
        //
        // Sometimes, the NameNode crashes while it's writing to the
        // edit log.  In that case, you can end up with an unfinalized edit log
        // which has some garbage at the end.
        // JournalManager#recoverUnfinalizedSegments will finalize these
        // unfinished edit logs, giving them a defined final transaction 
        // ID.  Then they will be renamed, so that any subsequent
        // readers will have this information.
        //
        // Since there may be garbage at the end of these "cleaned up"
        // logs, we want to be sure to skip it here if we've read everything
        // we were supposed to read out of the stream.
        // So we force an EOF on all subsequent reads.
        //
        long skipAmt = log.length() - tracker.getPos();
        if (skipAmt > 0) {
          if (LOG.isDebugEnabled()) {
              LOG.debug("skipping " + skipAmt + " bytes at the end " +
                "of edit log  '" + getName() + "': reached txid " + txId +
                " out of " + lastTxId);
          }
          tracker.clearLimit();
          IOUtils.skipFully(tracker, skipAmt);
        }
      }
    }
    break;
    case CLOSED:
      break; // return null
  }
  return op;
}
 
源代码19 项目: big-c   文件: TestShortCircuitLocalRead.java
/** Check file content, reading as user {@code readingUser} */
static void checkFileContent(URI uri, Path name, byte[] expected,
    int readOffset, String readingUser, Configuration conf,
    boolean legacyShortCircuitFails)
    throws IOException, InterruptedException {
  // Ensure short circuit is enabled
  DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
  ClientContext getClientContext = ClientContext.getFromConf(conf);
  if (legacyShortCircuitFails) {
    assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
  }
  
  FSDataInputStream stm = fs.open(name);
  byte[] actual = new byte[expected.length-readOffset];
  stm.readFully(readOffset, actual);
  checkData(actual, readOffset, expected, "Read 2");
  stm.close();
  // Now read using a different API.
  actual = new byte[expected.length-readOffset];
  stm = fs.open(name);
  IOUtils.skipFully(stm, readOffset);
  //Read a small number of bytes first.
  int nread = stm.read(actual, 0, 3);
  nread += stm.read(actual, nread, 2);
  //Read across chunk boundary
  nread += stm.read(actual, nread, 517);
  checkData(actual, readOffset, expected, nread, "A few bytes");
  //Now read rest of it
  while (nread < actual.length) {
    int nbytes = stm.read(actual, nread, actual.length - nread);
    if (nbytes < 0) {
      throw new EOFException("End of file reached before reading fully.");
    }
    nread += nbytes;
  }
  checkData(actual, readOffset, expected, "Read 3");
  
  if (legacyShortCircuitFails) {
    assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
  }
  stm.close();
}
 
源代码20 项目: RDFS   文件: BlockXCodingSender.java
private void initialize(int namespaceId, Block block, long blockLength,
		long startOffset, long length, boolean corruptChecksumOk, 
		boolean verifyChecksum, DataInputStream metadataIn,
		InputStreamFactory streamFactory)
		throws IOException {
	try {
		this.block = block;
		this.corruptChecksumOk = corruptChecksumOk;
		this.verifyChecksum = verifyChecksum;
		this.blockLength = blockLength;
		
		this.conf = new Configuration();
		this.packetSize = conf.getInt("raid.blockreconstruct.packetsize", 4096);
		if (!corruptChecksumOk || metadataIn != null) {
			this.checksumIn = metadataIn;

			// read and handle the common header here. For now just a
			// version
			BlockMetadataHeader header = BlockMetadataHeader
					.readHeader(checksumIn);
			short version = header.getVersion();

			if (version != FSDataset.METADATA_VERSION) {
				LOG.warn("NTar:Wrong version (" + version
						+ ") for metadata file for " + block
						+ " ignoring ...");
			}
			checksum = header.getChecksum();
		} else {
			if (!ignoreChecksum) {
				LOG.warn("NTar:Could not find metadata file for " + block);
			}
			// This only decides the buffer size. Use BUFFER_SIZE?
			checksum = DataChecksum.newDataChecksum(
					DataChecksum.CHECKSUM_CRC32, 512);
		}

		/*
		 * If bytesPerChecksum is very large, then the metadata file is
		 * mostly corrupted. For now just truncate bytesPerchecksum to
		 * blockLength.
		 */
		bytesPerChecksum = checksum.getBytesPerChecksum();
		if (bytesPerChecksum > 10 * 1024 * 1024
				&& bytesPerChecksum > blockLength) {
			checksum = DataChecksum.newDataChecksum(
					checksum.getChecksumType(),
					Math.max((int) blockLength, 10 * 1024 * 1024));
			bytesPerChecksum = checksum.getBytesPerChecksum();
		}
		checksumSize = checksum.getChecksumSize();

		if (length < 0  || length > blockLength) {
			length = blockLength;
		}

		endOffset = blockLength;
		if (startOffset < 0 || startOffset >= endOffset) {
			//String msg = " Offset " + startOffset + " and length " + length
			//		+ " don't match block " + block + " ( blockLen "
			//		+ endOffset + " )";
			//LOG.error("NTar : BlockXCodingSender: " + msg);
			noData = true;
			return;
		}

		offset = (startOffset - (startOffset % bytesPerChecksum));
		if (length >= 0) {
			// Make sure endOffset points to end of a checksumed chunk.
			long tmpLen = startOffset + length;
			if (tmpLen % bytesPerChecksum != 0) {
				tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
			}
			if (tmpLen < endOffset) {
				endOffset = tmpLen;
			}
		}

		// seek to the right offsets
		if (offset > 0) {
			long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
			// note blockInStream is seeked when created below
			if (checksumSkip > 0 && checksumIn != null) {
				// Should we use seek() for checksum file as well?
				IOUtils.skipFully(checksumIn, checksumSkip);
			}
		}
		seqno = 0;

		blockIn = streamFactory.createStream(offset);
	} catch (IOException ioe) {
		IOUtils.closeStream(this);
		IOUtils.closeStream(blockIn);
		throw ioe;
	}
}