Code examples for the class org.apache.hadoop.fs.ReadOption

The examples below show how the org.apache.hadoop.fs.ReadOption API class is used in practice. Each snippet is taken from the indicated project and file; the full sources are available on GitHub.
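
Before the project sources, here is a minimal end-to-end usage sketch. It is not taken from any of the projects below; the class name, file path, and buffer size are placeholders. A ReadOption set is passed to the enhanced byte-buffer read, and the returned buffer must be handed back with releaseBuffer().

import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ReadOptionDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo.dat"))) {
      // SKIP_CHECKSUMS lets the stream take the zero-copy (mmap) path even when
      // the block is not mlocked; pass EnumSet.noneOf(ReadOption.class) to keep
      // checksum verification instead.
      ByteBuffer buf = in.read(pool, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      if (buf != null) {                // null signals end of stream
        try {
          System.out.println("read " + buf.remaining() + " bytes");
        } finally {
          in.releaseBuffer(buf);        // always return the buffer
        }
      }
    }
  }
}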

Example 1  Project: hadoop  File: BlockReaderLocal.java
/**
 * Get or create a memory map for this replica.
 * 
 * There are two kinds of ClientMmap objects we could fetch here: one that 
 * will always read pre-checksummed data, and one that may read data that
 * hasn't been checksummed.
 *
 * If we fetch the former, "safe" kind of ClientMmap, we have to increment
 * the anchor count on the shared memory slot.  This will tell the DataNode
 * not to munlock the block until this ClientMmap is closed.
 * If we fetch the latter, we don't bother with anchoring.
 *
 * @param opts     The options to use, such as SKIP_CHECKSUMS.
 * 
 * @return         null on failure; the ClientMmap otherwise.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  boolean anchor = verifyChecksum &&
      (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
  if (anchor) {
    if (!createNoChecksumContext()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("can't get an mmap for " + block + " of " + filename + 
            " since SKIP_CHECKSUMS was not given, " +
            "we aren't skipping checksums, and the block is not mlocked.");
      }
      return null;
    }
  }
  ClientMmap clientMmap = null;
  try {
    clientMmap = replica.getOrCreateClientMmap(anchor);
  } finally {
    if ((clientMmap == null) && anchor) {
      releaseNoChecksumContext();
    }
  }
  return clientMmap;
}
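
The SKIP_CHECKSUMS flag above is what decides whether the shared-memory slot gets anchored. A caller-side illustration (the reader variable is hypothetical, standing in for a BlockReaderLocal instance):

// No SKIP_CHECKSUMS (with checksum verification enabled): only pre-checksummed
// data may be mapped, so the slot is anchored and the DataNode keeps the block
// mlocked until this mmap is closed; if the block is not mlocked, null is returned.
ClientMmap safeMmap = reader.getClientMmap(EnumSet.noneOf(ReadOption.class));

// SKIP_CHECKSUMS: the caller accepts possibly unchecksummed data, so no anchoring.
ClientMmap fastMmap = reader.getClientMmap(EnumSet.of(ReadOption.SKIP_CHECKSUMS));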
 
Example 2  Project: hadoop  File: TestCryptoStreams.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  int pos = buffer.position();
  int n = read(buffer);
  if (n >= 0) {
    buffer.position(pos);
    return buffer;
  }
  
  return null;
}
 
Example 3  Project: big-c  File: BlockReaderLocal.java
/**
 * Get or create a memory map for this replica.
 * 
 * There are two kinds of ClientMmap objects we could fetch here: one that 
 * will always read pre-checksummed data, and one that may read data that
 * hasn't been checksummed.
 *
 * If we fetch the former, "safe" kind of ClientMmap, we have to increment
 * the anchor count on the shared memory slot.  This will tell the DataNode
 * not to munlock the block until this ClientMmap is closed.
 * If we fetch the latter, we don't bother with anchoring.
 *
 * @param opts     The options to use, such as SKIP_CHECKSUMS.
 * 
 * @return         null on failure; the ClientMmap otherwise.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  boolean anchor = verifyChecksum &&
      (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
  if (anchor) {
    if (!createNoChecksumContext()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("can't get an mmap for " + block + " of " + filename + 
            " since SKIP_CHECKSUMS was not given, " +
            "we aren't skipping checksums, and the block is not mlocked.");
      }
      return null;
    }
  }
  ClientMmap clientMmap = null;
  try {
    clientMmap = replica.getOrCreateClientMmap(anchor);
  } finally {
    if ((clientMmap == null) && anchor) {
      releaseNoChecksumContext();
    }
  }
  return clientMmap;
}
 
Example 4  Project: big-c  File: TestCryptoStreams.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  int pos = buffer.position();
  int n = read(buffer);
  if (n >= 0) {
    buffer.position(pos);
    return buffer;
  }
  
  return null;
}
 
Example 5  Project: Bats  File: DrillFSDataInputStream.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  operatorStats.startWait();
  try {
    return underlyingIs.read(bufferPool, maxLength, opts);
  } finally {
    operatorStats.stopWait();
  }
}
 
Example 6  Project: hadoop  File: DFSInputStream.java
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts) 
        throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  } else if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null;
    }
    /*
     * If we don't have a blockReader, or the one we have has no more bytes
     * left to read, we call seekToBlockSource to get a new blockReader and
     * recalculate blockEnd.  Note that we assume we're not at EOF here
     * (we check this above).
     */
    if ((!seekToBlockSource(pos)) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  ByteBuffer buffer = null;
  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    buffer = tryReadZeroCopy(maxLength, opts);
  }
  if (buffer != null) {
    return buffer;
  }
  buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (buffer != null) {
    getExtendedReadBuffers().put(buffer, bufferPool);
  }
  return buffer;
}
 
Example 7  Project: hadoop  File: CryptoInputStream.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += buffer.remaining(); // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.");
  }
}
 
Example 8  Project: hadoop  File: CryptoStreamsTestBase.java
@Test(timeout=120000)
public void testHasEnhancedByteBufferAccess() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);
  
  InputStream in = getInputStream(defaultBufferSize);
  final int len1 = dataLen / 8;
  // ByteBuffer size is len1
  ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
      getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
  int n1 = buffer.remaining();
  byte[] readData = new byte[n1];
  buffer.get(readData);
  byte[] expectedData = new byte[n1];
  System.arraycopy(data, 0, expectedData, 0, n1);
  Assert.assertArrayEquals(readData, expectedData);
  ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  
  // Read len1 bytes
  readData = new byte[len1];
  readAll(in, readData, 0, len1);
  expectedData = new byte[len1];
  System.arraycopy(data, n1, expectedData, 0, len1);
  Assert.assertArrayEquals(readData, expectedData);
  
  // ByteBuffer size is len1
  buffer = ((HasEnhancedByteBufferAccess) in).read(
      getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
  int n2 = buffer.remaining();
  readData = new byte[n2];
  buffer.get(readData);
  expectedData = new byte[n2];
  System.arraycopy(data, n1 + len1, expectedData, 0, n2);
  Assert.assertArrayEquals(readData, expectedData);
  ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  
  in.close();
}
 
Example 9  Project: dremio-oss  File: FSDataInputStreamWrapper.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  try {
    return underlyingIs.read(bufferPool, maxLength, opts);
  } catch(FSError e) {
    throw HadoopFileSystemWrapper.propagateFSError(e);
  }
}
 
Example 10  Project: big-c  File: DFSInputStream.java
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts) 
        throws IOException, UnsupportedOperationException {
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  } else if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if ((blockReader == null) || (blockEnd == -1)) {
    if (pos >= getFileLength()) {
      return null;
    }
    /*
     * If we don't have a blockReader, or the one we have has no more bytes
     * left to read, we call seekToBlockSource to get a new blockReader and
     * recalculate blockEnd.  Note that we assume we're not at EOF here
     * (we check this above).
     */
    if ((!seekToBlockSource(pos)) || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }
  ByteBuffer buffer = null;
  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    buffer = tryReadZeroCopy(maxLength, opts);
  }
  if (buffer != null) {
    return buffer;
  }
  buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (buffer != null) {
    getExtendedReadBuffers().put(buffer, bufferPool);
  }
  return buffer;
}
 
Example 11  Project: big-c  File: CryptoInputStream.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += buffer.remaining(); // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    throw new UnsupportedOperationException("This stream does not support " + 
        "enhanced byte buffer access.");
  }
}
 
Example 12  Project: big-c  File: CryptoStreamsTestBase.java
@Test(timeout=120000)
public void testHasEnhancedByteBufferAccess() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);
  
  InputStream in = getInputStream(defaultBufferSize);
  final int len1 = dataLen / 8;
  // ByteBuffer size is len1
  ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
      getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
  int n1 = buffer.remaining();
  byte[] readData = new byte[n1];
  buffer.get(readData);
  byte[] expectedData = new byte[n1];
  System.arraycopy(data, 0, expectedData, 0, n1);
  Assert.assertArrayEquals(readData, expectedData);
  ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  
  // Read len1 bytes
  readData = new byte[len1];
  readAll(in, readData, 0, len1);
  expectedData = new byte[len1];
  System.arraycopy(data, n1, expectedData, 0, len1);
  Assert.assertArrayEquals(readData, expectedData);
  
  // ByteBuffer size is len1
  buffer = ((HasEnhancedByteBufferAccess) in).read(
      getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
  int n2 = buffer.remaining();
  readData = new byte[n2];
  buffer.get(readData);
  expectedData = new byte[n2];
  System.arraycopy(data, n1 + len1, expectedData, 0, n2);
  Assert.assertArrayEquals(readData, expectedData);
  ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  
  in.close();
}
 
Example 13  Project: hudi  File: InLineFsDataInputStream.java
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength > this.length) {
    throw new IOException("Attempting to read max length beyond inline content");
  }
  return outerStream.read(bufferPool, maxLength, opts);
}
 
Example 14  Project: tajo  File: ZeroCopyAdapter.java
public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
    throws IOException {
  EnumSet<ReadOption> options = NO_CHECK_SUM;
  if (verifyChecksums) {
    options = CHECK_SUM;
  }
  return this.in.read(this.pool, maxLength, options);
}
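
NO_CHECK_SUM and CHECK_SUM are constants that are not part of this excerpt; presumably they are EnumSet values along the following lines (an assumption, not copied from the tajo source):

// Assumed definitions: skip checksum verification vs. verify (empty option set).
private static final EnumSet<ReadOption> NO_CHECK_SUM =
    EnumSet.of(ReadOption.SKIP_CHECKSUMS);
private static final EnumSet<ReadOption> CHECK_SUM =
    EnumSet.noneOf(ReadOption.class);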
 
Example 15  Project: hadoop  File: RemoteBlockReader.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 16  Project: hadoop  File: DFSInputStream.java
private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
    EnumSet<ReadOption> opts) throws IOException {
  // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
  // JVM to optimize this function.
  final long curPos = pos;
  final long curEnd = blockEnd;
  final long blockStartInFile = currentLocatedBlock.getStartOffset();
  final long blockPos = curPos - blockStartInFile;

  // Shorten this read if the end of the block is nearby.
  long length63;
  if ((curPos + maxLength) <= (curEnd + 1)) {
    length63 = maxLength;
  } else {
    length63 = 1 + curEnd - curPos;
    if (length63 <= 0) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
          curPos + " of " + src + "; " + length63 + " bytes left in block.  " +
          "blockPos=" + blockPos + "; curPos=" + curPos +
          "; curEnd=" + curEnd);
      }
      return null;
    }
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Reducing read length from " + maxLength +
          " to " + length63 + " to avoid going more than one byte " +
          "past the end of the block.  blockPos=" + blockPos +
          "; curPos=" + curPos + "; curEnd=" + curEnd);
    }
  }
  // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
  int length;
  if (blockPos + length63 <= Integer.MAX_VALUE) {
    length = (int)length63;
  } else {
    long length31 = Integer.MAX_VALUE - blockPos;
    if (length31 <= 0) {
      // Java ByteBuffers can't be longer than 2 GB, because they use
      // 4-byte signed integers to represent capacity, etc.
      // So we can't mmap the parts of the block higher than the 2 GB offset.
      // FIXME: we could work around this with multiple memory maps.
      // See HDFS-5101.
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
          curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
          "exceeded.  blockPos=" + blockPos + ", curEnd=" + curEnd);
      }
      return null;
    }
    length = (int)length31;
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Reducing read length from " + maxLength +
          " to " + length + " to avoid 31-bit limit.  " +
          "blockPos=" + blockPos + "; curPos=" + curPos +
          "; curEnd=" + curEnd);
    }
  }
  final ClientMmap clientMmap = blockReader.getClientMmap(opts);
  if (clientMmap == null) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
        curPos + " of " + src + "; BlockReader#getClientMmap returned " +
        "null.");
    }
    return null;
  }
  boolean success = false;
  ByteBuffer buffer;
  try {
    seek(curPos + length);
    buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
    buffer.position((int)blockPos);
    buffer.limit((int)(blockPos + length));
    getExtendedReadBuffers().put(buffer, clientMmap);
    synchronized (infoLock) {
      readStatistics.addZeroCopyBytes(length);
    }
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("readZeroCopy read " + length + 
          " bytes from offset " + curPos + " via the zero-copy read " +
          "path.  blockEnd = " + blockEnd);
    }
    success = true;
  } finally {
    if (!success) {
      IOUtils.closeQuietly(clientMmap);
    }
  }
  return buffer;
}
 
Example 17  Project: hadoop  File: RemoteBlockReader2.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 18  Project: hadoop  File: BlockReaderLocalLegacy.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 19  Project: hadoop  File: TestFsDatasetCacheRevocation.java
/**
 * Test that when a client has a replica mmapped, we will not un-mlock that
 * replica for a reasonable amount of time, even if an uncache request
 * occurs.
 */
@Test(timeout=120000)
public void testPinning() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  Configuration conf = getDefaultConf();
  // Set a really long revocation timeout, so that we won't reach it during
  // this test.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
      1800000L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster = null;
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  DistributedFileSystem dfs = cluster.getFileSystem();

  // Create and cache a file.
  final String TEST_FILE = "/test_file";
  DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
      BLOCK_SIZE, (short)1, 0xcafe);
  dfs.addCachePool(new CachePoolInfo("pool"));
  long cacheDirectiveId =
    dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
      setPool("pool").setPath(new Path(TEST_FILE)).
        setReplication((short) 1).build());
  FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Mmap the file.
  FSDataInputStream in = dfs.open(new Path(TEST_FILE));
  ByteBuffer buf =
      in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

  // Attempt to uncache file.  The file should still be cached.
  dfs.removeCacheDirective(cacheDirectiveId);
  Thread.sleep(500);
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Un-mmap the file.  The file should be uncached after this.
  in.releaseBuffer(buf);
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

  // Cleanup
  in.close();
  cluster.shutdown();
}
 
Example 20  Project: hadoop  File: TestFsDatasetCacheRevocation.java
/**
 * Test that when we have an uncache request, and the client refuses to release
 * the replica for a long time, we will un-mlock it.
 */
@Test(timeout=120000)
public void testRevocation() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  BlockReaderTestUtil.enableHdfsCachingTracing();
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  Configuration conf = getDefaultConf();
  // Set a really short revocation timeout.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster = null;
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  DistributedFileSystem dfs = cluster.getFileSystem();

  // Create and cache a file.
  final String TEST_FILE = "/test_file2";
  DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
      BLOCK_SIZE, (short)1, 0xcafe);
  dfs.addCachePool(new CachePoolInfo("pool"));
  long cacheDirectiveId =
      dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
          setPool("pool").setPath(new Path(TEST_FILE)).
          setReplication((short) 1).build());
  FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Mmap the file.
  FSDataInputStream in = dfs.open(new Path(TEST_FILE));
  ByteBuffer buf =
      in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

  // Attempt to uncache file.  The file should get uncached.
  LOG.info("removing cache directive {}", cacheDirectiveId);
  dfs.removeCacheDirective(cacheDirectiveId);
  LOG.info("finished removing cache directive {}", cacheDirectiveId);
  Thread.sleep(1000);
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

  // Cleanup
  in.releaseBuffer(buf);
  in.close();
  cluster.shutdown();
}
 
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  try (WaitRecorder recorder = OperatorStats.getWaitRecorder(operatorStats)) {
    return super.read(bufferPool, maxLength, opts);
  }
}
 
Example 23  Project: big-c  File: RemoteBlockReader.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 24  Project: big-c  File: DFSInputStream.java
private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
    EnumSet<ReadOption> opts) throws IOException {
  // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
  // JVM to optimize this function.
  final long curPos = pos;
  final long curEnd = blockEnd;
  final long blockStartInFile = currentLocatedBlock.getStartOffset();
  final long blockPos = curPos - blockStartInFile;

  // Shorten this read if the end of the block is nearby.
  long length63;
  if ((curPos + maxLength) <= (curEnd + 1)) {
    length63 = maxLength;
  } else {
    length63 = 1 + curEnd - curPos;
    if (length63 <= 0) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
          curPos + " of " + src + "; " + length63 + " bytes left in block.  " +
          "blockPos=" + blockPos + "; curPos=" + curPos +
          "; curEnd=" + curEnd);
      }
      return null;
    }
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Reducing read length from " + maxLength +
          " to " + length63 + " to avoid going more than one byte " +
          "past the end of the block.  blockPos=" + blockPos +
          "; curPos=" + curPos + "; curEnd=" + curEnd);
    }
  }
  // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
  int length;
  if (blockPos + length63 <= Integer.MAX_VALUE) {
    length = (int)length63;
  } else {
    long length31 = Integer.MAX_VALUE - blockPos;
    if (length31 <= 0) {
      // Java ByteBuffers can't be longer than 2 GB, because they use
      // 4-byte signed integers to represent capacity, etc.
      // So we can't mmap the parts of the block higher than the 2 GB offset.
      // FIXME: we could work around this with multiple memory maps.
      // See HDFS-5101.
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
          curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
          "exceeded.  blockPos=" + blockPos + ", curEnd=" + curEnd);
      }
      return null;
    }
    length = (int)length31;
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Reducing read length from " + maxLength +
          " to " + length + " to avoid 31-bit limit.  " +
          "blockPos=" + blockPos + "; curPos=" + curPos +
          "; curEnd=" + curEnd);
    }
  }
  final ClientMmap clientMmap = blockReader.getClientMmap(opts);
  if (clientMmap == null) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
        curPos + " of " + src + "; BlockReader#getClientMmap returned " +
        "null.");
    }
    return null;
  }
  boolean success = false;
  ByteBuffer buffer;
  try {
    seek(curPos + length);
    buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
    buffer.position((int)blockPos);
    buffer.limit((int)(blockPos + length));
    getExtendedReadBuffers().put(buffer, clientMmap);
    synchronized (infoLock) {
      readStatistics.addZeroCopyBytes(length);
    }
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("readZeroCopy read " + length + 
          " bytes from offset " + curPos + " via the zero-copy read " +
          "path.  blockEnd = " + blockEnd);
    }
    success = true;
  } finally {
    if (!success) {
      IOUtils.closeQuietly(clientMmap);
    }
  }
  return buffer;
}
 
Example 25  Project: big-c  File: RemoteBlockReader2.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 26  Project: big-c  File: BlockReaderLocalLegacy.java
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  return null;
}
 
Example 27  Project: big-c  File: TestFsDatasetCacheRevocation.java
/**
 * Test that when a client has a replica mmapped, we will not un-mlock that
 * replica for a reasonable amount of time, even if an uncache request
 * occurs.
 */
@Test(timeout=120000)
public void testPinning() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  Configuration conf = getDefaultConf();
  // Set a really long revocation timeout, so that we won't reach it during
  // this test.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
      1800000L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster = null;
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  DistributedFileSystem dfs = cluster.getFileSystem();

  // Create and cache a file.
  final String TEST_FILE = "/test_file";
  DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
      BLOCK_SIZE, (short)1, 0xcafe);
  dfs.addCachePool(new CachePoolInfo("pool"));
  long cacheDirectiveId =
    dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
      setPool("pool").setPath(new Path(TEST_FILE)).
        setReplication((short) 1).build());
  FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Mmap the file.
  FSDataInputStream in = dfs.open(new Path(TEST_FILE));
  ByteBuffer buf =
      in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

  // Attempt to uncache file.  The file should still be cached.
  dfs.removeCacheDirective(cacheDirectiveId);
  Thread.sleep(500);
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Un-mmap the file.  The file should be uncached after this.
  in.releaseBuffer(buf);
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

  // Cleanup
  in.close();
  cluster.shutdown();
}
 
Example 28  Project: big-c  File: TestFsDatasetCacheRevocation.java
/**
 * Test that when we have an uncache request, and the client refuses to release
 * the replica for a long time, we will un-mlock it.
 */
@Test(timeout=120000)
public void testRevocation() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  BlockReaderTestUtil.enableHdfsCachingTracing();
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  Configuration conf = getDefaultConf();
  // Set a really short revocation timeout.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster = null;
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  cluster.waitActive();
  DistributedFileSystem dfs = cluster.getFileSystem();

  // Create and cache a file.
  final String TEST_FILE = "/test_file2";
  DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
      BLOCK_SIZE, (short)1, 0xcafe);
  dfs.addCachePool(new CachePoolInfo("pool"));
  long cacheDirectiveId =
      dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
          setPool("pool").setPath(new Path(TEST_FILE)).
          setReplication((short) 1).build());
  FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
  DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

  // Mmap the file.
  FSDataInputStream in = dfs.open(new Path(TEST_FILE));
  ByteBuffer buf =
      in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

  // Attempt to uncache file.  The file should get uncached.
  LOG.info("removing cache directive {}", cacheDirectiveId);
  dfs.removeCacheDirective(cacheDirectiveId);
  LOG.info("finished removing cache directive {}", cacheDirectiveId);
  Thread.sleep(1000);
  DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

  // Cleanup
  in.releaseBuffer(buf);
  in.close();
  cluster.shutdown();
}
 
Example 29  Project: hbase  File: TestFSDataInputStreamWrapper.java
@Override
public ByteBuffer read(ByteBufferPool paramByteBufferPool,
    int paramInt, EnumSet<ReadOption> paramEnumSet)
        throws IOException, UnsupportedOperationException {
  return null;
}
 
Example 30  Project: hadoop  File: BlockReader.java
/**
 * Get a ClientMmap object for this BlockReader.
 *
 * @param opts          The read options to use.
 * @return              The ClientMmap object, or null if mmap is not
 *                      supported.
 */
ClientMmap getClientMmap(EnumSet<ReadOption> opts);
 