下面列出了怎么用org.apache.hadoop.fs.ReadOption的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Fetches (creating it if necessary) the memory map of this replica.
 *
 * Two flavours of ClientMmap exist: a "safe" one that only ever exposes
 * data that has already been checksummed, and one that may expose data that
 * has not been checksummed. The safe flavour is requested when this reader
 * verifies checksums and the caller did not pass SKIP_CHECKSUMS; in that case
 * a no-checksum context has to be created first (this anchors the shared
 * memory slot so the DataNode will not munlock the block while the ClientMmap
 * is open). The other flavour needs no anchoring.
 *
 * @param opts The read options, e.g. SKIP_CHECKSUMS.
 *
 * @return the ClientMmap, or null if it could not be obtained.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  final boolean needsAnchor =
      verifyChecksum && !opts.contains(ReadOption.SKIP_CHECKSUMS);
  if (needsAnchor && !createNoChecksumContext()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("can't get an mmap for " + block + " of " + filename +
          " since SKIP_CHECKSUMS was not given, " +
          "we aren't skipping checksums, and the block is not mlocked.");
    }
    return null;
  }
  ClientMmap mmap = null;
  try {
    mmap = replica.getOrCreateClientMmap(needsAnchor);
    return mmap;
  } finally {
    // Undo the anchoring if no mmap was produced (null result or exception).
    if (needsAnchor && (mmap == null)) {
      releaseNoChecksumContext();
    }
  }
}
/**
 * Reads up to {@code maxLength} bytes into a direct buffer taken from
 * {@code bufferPool}.
 *
 * On success the returned buffer is positioned at the first byte read and
 * limited to the last one, so {@code remaining()} equals the number of bytes
 * read. If nothing is returned to the caller (end of stream or an exception)
 * the buffer is handed back to the pool instead of being leaked.
 *
 * @param bufferPool pool to allocate the buffer from; must not be null.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options; not consulted by this implementation.
 * @return the filled buffer, or null if {@code read(ByteBuffer)} reported
 *         a negative count.
 * @throws IOException if {@code bufferPool} is null or the underlying read
 *         fails.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  boolean handedOut = false;
  try {
    final int start = buffer.position();
    // The pool may return a larger buffer than requested; never read more
    // than the caller asked for.
    if (buffer.remaining() > maxLength) {
      buffer.limit(start + maxLength);
    }
    final int n = read(buffer);
    if (n < 0) {
      return null;
    }
    // Expose exactly the bytes that were read.
    buffer.limit(start + n);
    buffer.position(start);
    handedOut = true;
    return buffer;
  } finally {
    if (!handedOut) {
      bufferPool.putBuffer(buffer);
    }
  }
}
/**
 * Fetches (creating it if necessary) the memory map of this replica.
 *
 * Two flavours of ClientMmap exist: a "safe" one that only ever exposes
 * data that has already been checksummed, and one that may expose data that
 * has not been checksummed. The safe flavour is requested when this reader
 * verifies checksums and the caller did not pass SKIP_CHECKSUMS; in that case
 * a no-checksum context has to be created first (this anchors the shared
 * memory slot so the DataNode will not munlock the block while the ClientMmap
 * is open). The other flavour needs no anchoring.
 *
 * @param opts The read options, e.g. SKIP_CHECKSUMS.
 *
 * @return the ClientMmap, or null if it could not be obtained.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
  final boolean needsAnchor =
      verifyChecksum && !opts.contains(ReadOption.SKIP_CHECKSUMS);
  if (needsAnchor && !createNoChecksumContext()) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("can't get an mmap for " + block + " of " + filename +
          " since SKIP_CHECKSUMS was not given, " +
          "we aren't skipping checksums, and the block is not mlocked.");
    }
    return null;
  }
  ClientMmap mmap = null;
  try {
    mmap = replica.getOrCreateClientMmap(needsAnchor);
    return mmap;
  } finally {
    // Undo the anchoring if no mmap was produced (null result or exception).
    if (needsAnchor && (mmap == null)) {
      releaseNoChecksumContext();
    }
  }
}
/**
 * Reads up to {@code maxLength} bytes into a direct buffer taken from
 * {@code bufferPool}.
 *
 * On success the returned buffer is positioned at the first byte read and
 * limited to the last one, so {@code remaining()} equals the number of bytes
 * read. If nothing is returned to the caller (end of stream or an exception)
 * the buffer is handed back to the pool instead of being leaked.
 *
 * @param bufferPool pool to allocate the buffer from; must not be null.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options; not consulted by this implementation.
 * @return the filled buffer, or null if {@code read(ByteBuffer)} reported
 *         a negative count.
 * @throws IOException if {@code bufferPool} is null or the underlying read
 *         fails.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  if (bufferPool == null) {
    throw new IOException("Please specify buffer pool.");
  }
  ByteBuffer buffer = bufferPool.getBuffer(true, maxLength);
  boolean handedOut = false;
  try {
    final int start = buffer.position();
    // The pool may return a larger buffer than requested; never read more
    // than the caller asked for.
    if (buffer.remaining() > maxLength) {
      buffer.limit(start + maxLength);
    }
    final int n = read(buffer);
    if (n < 0) {
      return null;
    }
    // Expose exactly the bytes that were read.
    buffer.limit(start + n);
    buffer.position(start);
    handedOut = true;
    return buffer;
  } finally {
    if (!handedOut) {
      bufferPool.putBuffer(buffer);
    }
  }
}
/**
 * Delegates the enhanced byte-buffer read to the underlying stream while
 * bracketing the call with {@code operatorStats.startWait()} and
 * {@code operatorStats.stopWait()}.
 *
 * @param bufferPool pool passed through to the underlying stream.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options passed through to the underlying stream.
 * @return whatever the underlying stream returns.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  final ByteBuffer result;
  operatorStats.startWait();
  try {
    result = underlyingIs.read(bufferPool, maxLength, opts);
  } finally {
    // Always stop the wait timer, even if the read throws.
    operatorStats.stopWait();
  }
  return result;
}
/**
 * Enhanced byte-buffer read: tries a zero-copy (mmap) read first when
 * short-circuit mmap is enabled in the client configuration, and otherwise
 * falls back to a copying read through {@code ByteBufferUtil.fallbackRead}.
 *
 * @param bufferPool pool used by the fallback path; the returned fallback
 *                   buffer is recorded against it in the extended read
 *                   buffers map.
 * @param maxLength  maximum number of bytes to read; 0 yields EMPTY_BUFFER.
 * @param opts       read options forwarded to the zero-copy path.
 * @return the buffer holding the data, or null at end of file or when the
 *         fallback read returns null.
 * @throws IllegalArgumentException if {@code maxLength} is negative.
 * @throws IOException if a new block reader cannot be obtained.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }

  final boolean needNewReader = (blockReader == null) || (blockEnd == -1);
  if (needNewReader) {
    if (pos >= getFileLength()) {
      return null;
    }
    // Not at EOF (checked just above), so there is either no blockReader or
    // the current one is exhausted: seek to obtain a new one and to
    // recalculate blockEnd.
    final boolean sought = seekToBlockSource(pos);
    if (!sought || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }

  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    final ByteBuffer zeroCopy = tryReadZeroCopy(maxLength, opts);
    if (zeroCopy != null) {
      return zeroCopy;
    }
  }

  final ByteBuffer copied =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (copied != null) {
    getExtendedReadBuffers().put(copied, bufferPool);
  }
  return copied;
}
/**
 * Enhanced byte-buffer read on the wrapped stream followed by in-place
 * decryption of the returned bytes.
 *
 * If decrypted-but-unread data is still pending in {@code outBuffer}, the
 * wrapped stream is first re-positioned to {@code getPos()} and the stream
 * offset is reset accordingly.
 *
 * @param bufferPool pool forwarded to the wrapped stream.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options forwarded to the wrapped stream.
 * @return the buffer returned by the wrapped stream (decrypted when it has
 *         bytes remaining); may be null.
 * @throws UnsupportedOperationException if a required cast of the wrapped
 *         stream fails; the original ClassCastException is attached as the
 *         cause.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += n; // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    // Keep the cause so the failing cast can be diagnosed.
    throw new UnsupportedOperationException("This stream does not support " +
        "enhanced byte buffer access.", e);
  }
}
/**
 * Interleaves enhanced byte-buffer reads with plain reads on the same stream
 * and checks that each returns the next expected slice of the written data.
 * The stream is closed even when an assertion fails.
 */
@Test(timeout=120000)
public void testHasEnhancedByteBufferAccess() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  InputStream in = getInputStream(defaultBufferSize);
  try {
    final int len1 = dataLen / 8;
    // ByteBuffer size is len1
    ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
        getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    int n1 = buffer.remaining();
    byte[] readData = new byte[n1];
    buffer.get(readData);
    byte[] expectedData = new byte[n1];
    System.arraycopy(data, 0, expectedData, 0, n1);
    // JUnit expects (expected, actual).
    Assert.assertArrayEquals(expectedData, readData);
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);

    // Read len1 bytes
    readData = new byte[len1];
    readAll(in, readData, 0, len1);
    expectedData = new byte[len1];
    System.arraycopy(data, n1, expectedData, 0, len1);
    Assert.assertArrayEquals(expectedData, readData);

    // ByteBuffer size is len1
    buffer = ((HasEnhancedByteBufferAccess) in).read(
        getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    int n2 = buffer.remaining();
    readData = new byte[n2];
    buffer.get(readData);
    expectedData = new byte[n2];
    System.arraycopy(data, n1 + len1, expectedData, 0, n2);
    Assert.assertArrayEquals(expectedData, readData);
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  } finally {
    in.close();
  }
}
/**
 * Delegates the enhanced byte-buffer read to the underlying stream,
 * converting any {@code FSError} through
 * {@code HadoopFileSystemWrapper.propagateFSError}.
 *
 * @param bufferPool pool passed through to the underlying stream.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options passed through to the underlying stream.
 * @return whatever the underlying stream returns.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  final ByteBuffer result;
  try {
    result = underlyingIs.read(bufferPool, maxLength, opts);
  } catch (FSError fsError) {
    throw HadoopFileSystemWrapper.propagateFSError(fsError);
  }
  return result;
}
/**
 * Enhanced byte-buffer read: tries a zero-copy (mmap) read first when
 * short-circuit mmap is enabled in the client configuration, and otherwise
 * falls back to a copying read through {@code ByteBufferUtil.fallbackRead}.
 *
 * @param bufferPool pool used by the fallback path; the returned fallback
 *                   buffer is recorded against it in the extended read
 *                   buffers map.
 * @param maxLength  maximum number of bytes to read; 0 yields EMPTY_BUFFER.
 * @param opts       read options forwarded to the zero-copy path.
 * @return the buffer holding the data, or null at end of file or when the
 *         fallback read returns null.
 * @throws IllegalArgumentException if {@code maxLength} is negative.
 * @throws IOException if a new block reader cannot be obtained.
 */
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
    int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  if (maxLength < 0) {
    throw new IllegalArgumentException("can't read a negative " +
        "number of bytes.");
  }
  if (maxLength == 0) {
    return EMPTY_BUFFER;
  }

  final boolean needNewReader = (blockReader == null) || (blockEnd == -1);
  if (needNewReader) {
    if (pos >= getFileLength()) {
      return null;
    }
    // Not at EOF (checked just above), so there is either no blockReader or
    // the current one is exhausted: seek to obtain a new one and to
    // recalculate blockEnd.
    final boolean sought = seekToBlockSource(pos);
    if (!sought || (blockReader == null)) {
      throw new IOException("failed to allocate new BlockReader " +
          "at position " + pos);
    }
  }

  if (dfsClient.getConf().shortCircuitMmapEnabled) {
    final ByteBuffer zeroCopy = tryReadZeroCopy(maxLength, opts);
    if (zeroCopy != null) {
      return zeroCopy;
    }
  }

  final ByteBuffer copied =
      ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
  if (copied != null) {
    getExtendedReadBuffers().put(copied, bufferPool);
  }
  return copied;
}
/**
 * Enhanced byte-buffer read on the wrapped stream followed by in-place
 * decryption of the returned bytes.
 *
 * If decrypted-but-unread data is still pending in {@code outBuffer}, the
 * wrapped stream is first re-positioned to {@code getPos()} and the stream
 * offset is reset accordingly.
 *
 * @param bufferPool pool forwarded to the wrapped stream.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options forwarded to the wrapped stream.
 * @return the buffer returned by the wrapped stream (decrypted when it has
 *         bytes remaining); may be null.
 * @throws UnsupportedOperationException if a required cast of the wrapped
 *         stream fails; the original ClassCastException is attached as the
 *         cause.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
    EnumSet<ReadOption> opts) throws IOException,
    UnsupportedOperationException {
  checkStream();
  try {
    if (outBuffer.remaining() > 0) {
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += n; // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  } catch (ClassCastException e) {
    // Keep the cause so the failing cast can be diagnosed.
    throw new UnsupportedOperationException("This stream does not support " +
        "enhanced byte buffer access.", e);
  }
}
/**
 * Interleaves enhanced byte-buffer reads with plain reads on the same stream
 * and checks that each returns the next expected slice of the written data.
 * The stream is closed even when an assertion fails.
 */
@Test(timeout=120000)
public void testHasEnhancedByteBufferAccess() throws Exception {
  OutputStream out = getOutputStream(defaultBufferSize);
  writeData(out);

  InputStream in = getInputStream(defaultBufferSize);
  try {
    final int len1 = dataLen / 8;
    // ByteBuffer size is len1
    ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).read(
        getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    int n1 = buffer.remaining();
    byte[] readData = new byte[n1];
    buffer.get(readData);
    byte[] expectedData = new byte[n1];
    System.arraycopy(data, 0, expectedData, 0, n1);
    // JUnit expects (expected, actual).
    Assert.assertArrayEquals(expectedData, readData);
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);

    // Read len1 bytes
    readData = new byte[len1];
    readAll(in, readData, 0, len1);
    expectedData = new byte[len1];
    System.arraycopy(data, n1, expectedData, 0, len1);
    Assert.assertArrayEquals(expectedData, readData);

    // ByteBuffer size is len1
    buffer = ((HasEnhancedByteBufferAccess) in).read(
        getBufferPool(), len1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    int n2 = buffer.remaining();
    readData = new byte[n2];
    buffer.get(readData);
    expectedData = new byte[n2];
    System.arraycopy(data, n1 + len1, expectedData, 0, n2);
    Assert.assertArrayEquals(expectedData, readData);
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  } finally {
    in.close();
  }
}
/**
 * Enhanced byte-buffer read that is rejected when the requested maximum
 * length exceeds {@code this.length}; otherwise it is forwarded unchanged to
 * the outer stream.
 *
 * @param bufferPool pool forwarded to the outer stream.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options forwarded to the outer stream.
 * @return whatever the outer stream returns.
 * @throws IOException if {@code maxLength} is larger than {@code this.length}.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
    throws IOException, UnsupportedOperationException {
  final boolean withinBounds = maxLength <= this.length;
  if (withinBounds) {
    return outerStream.read(bufferPool, maxLength, opts);
  }
  throw new IOException("Attempting to read max length beyond inline content");
}
/**
 * Reads up to {@code maxLength} bytes from {@code in} using {@code pool},
 * selecting the read options from the checksum flag.
 *
 * @param maxLength       maximum number of bytes to read.
 * @param verifyChecksums when true the CHECK_SUM option set is used,
 *                        otherwise NO_CHECK_SUM.
 * @return the buffer returned by the underlying stream.
 * @throws IOException if the underlying read fails.
 */
public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
    throws IOException {
  final EnumSet<ReadOption> readOptions =
      verifyChecksums ? CHECK_SUM : NO_CHECK_SUM;
  return this.in.read(this.pool, maxLength, readOptions);
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Attempt a zero-copy read of up to maxLength bytes from the current
 * position by slicing the block reader's memory map.
 *
 * The read is shortened so that it does not run past the end of the current
 * block, nor past offset Integer.MAX_VALUE within the block. On success the
 * stream is advanced with seek(), the buffer is recorded in the extended read
 * buffers map against its ClientMmap, and the zero-copy statistics are
 * updated.
 *
 * @param maxLength The maximum number of bytes to read.
 * @param opts      The read options passed to BlockReader#getClientMmap.
 * @return a read-only buffer over the mapped bytes, or null if no bytes
 *         are left in the block, the block position is at or beyond the
 *         31-bit limit, or BlockReader#getClientMmap returned null.
 * @throws IOException declared by this method; presumably raised by seek()
 *         (not visible here).
 */
private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
EnumSet<ReadOption> opts) throws IOException {
// Copy 'pos' and 'blockEnd' to local variables to make it easier for the
// JVM to optimize this function.
final long curPos = pos;
final long curEnd = blockEnd;
final long blockStartInFile = currentLocatedBlock.getStartOffset();
// Offset of the current position relative to the start of the block.
final long blockPos = curPos - blockStartInFile;
// Shorten this read if the end of the block is nearby.
long length63;
if ((curPos + maxLength) <= (curEnd + 1)) {
length63 = maxLength;
} else {
// curEnd is treated as inclusive here, hence the "1 +".
length63 = 1 + curEnd - curPos;
if (length63 <= 0) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; " + length63 + " bytes left in block. " +
"blockPos=" + blockPos + "; curPos=" + curPos +
"; curEnd=" + curEnd);
}
return null;
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Reducing read length from " + maxLength +
" to " + length63 + " to avoid going more than one byte " +
"past the end of the block. blockPos=" + blockPos +
"; curPos=" + curPos + "; curEnd=" + curEnd);
}
}
// Make sure that we don't go beyond 31-bit offsets in the MappedByteBuffer.
int length;
if (blockPos + length63 <= Integer.MAX_VALUE) {
length = (int)length63;
} else {
long length31 = Integer.MAX_VALUE - blockPos;
if (length31 <= 0) {
// Java ByteBuffers can't be longer than 2 GB, because they use
// 4-byte signed integers to represent capacity, etc.
// So we can't mmap the parts of the block higher than the 2 GB offset.
// FIXME: we could work around this with multiple memory maps.
// See HDFS-5101.
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
"exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
}
return null;
}
length = (int)length31;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Reducing read length from " + maxLength +
" to " + length + " to avoid 31-bit limit. " +
"blockPos=" + blockPos + "; curPos=" + curPos +
"; curEnd=" + curEnd);
}
}
final ClientMmap clientMmap = blockReader.getClientMmap(opts);
if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; BlockReader#getClientMmap returned " +
"null.");
}
return null;
}
// 'success' stays false if anything below throws, so that the finally
// block closes the ClientMmap instead of leaking it.
boolean success = false;
ByteBuffer buffer;
try {
// Advance the stream past the bytes being handed out.
seek(curPos + length);
buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
// Restrict the view to [blockPos, blockPos + length).
buffer.position((int)blockPos);
buffer.limit((int)(blockPos + length));
getExtendedReadBuffers().put(buffer, clientMmap);
synchronized (infoLock) {
readStatistics.addZeroCopyBytes(length);
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("readZeroCopy read " + length +
" bytes from offset " + curPos + " via the zero-copy read " +
"path. blockEnd = " + blockEnd);
}
success = true;
} finally {
if (!success) {
IOUtils.closeQuietly(clientMmap);
}
}
return buffer;
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Test that when a client has a replica mmapped, we will not un-mlock that
 * replica for a reasonable amount of time, even if an uncache request
 * occurs.
 *
 * The mini cluster and the input stream are released in finally blocks so
 * that a failing assertion does not leak them.
 */
@Test(timeout=120000)
public void testPinning() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  Configuration conf = getDefaultConf();
  // Set a really long revocation timeout, so that we won't reach it during
  // this test.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
      1800000L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem();

    // Create and cache a file.
    final String TEST_FILE = "/test_file";
    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
        BLOCK_SIZE, (short)1, 0xcafe);
    dfs.addCachePool(new CachePoolInfo("pool"));
    long cacheDirectiveId =
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
            setPool("pool").setPath(new Path(TEST_FILE)).
            setReplication((short) 1).build());
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

    // Mmap the file.
    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
    try {
      ByteBuffer buf =
          in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

      // Attempt to uncache file. The file should still be cached.
      dfs.removeCacheDirective(cacheDirectiveId);
      Thread.sleep(500);
      DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

      // Un-mmap the file. The file should be uncached after this.
      in.releaseBuffer(buf);
      DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
    } finally {
      // Cleanup
      in.close();
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Test that when we have an uncache request, and the client refuses to release
 * the replica for a long time, we will un-mlock it.
 *
 * The mini cluster and the input stream are released in finally blocks so
 * that a failing assertion does not leak them.
 */
@Test(timeout=120000)
public void testRevocation() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  BlockReaderTestUtil.enableHdfsCachingTracing();
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  Configuration conf = getDefaultConf();
  // Set a really short revocation timeout.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem();

    // Create and cache a file.
    final String TEST_FILE = "/test_file2";
    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
        BLOCK_SIZE, (short)1, 0xcafe);
    dfs.addCachePool(new CachePoolInfo("pool"));
    long cacheDirectiveId =
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
            setPool("pool").setPath(new Path(TEST_FILE)).
            setReplication((short) 1).build());
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

    // Mmap the file.
    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
    try {
      ByteBuffer buf =
          in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

      // Attempt to uncache file. The file should get uncached.
      LOG.info("removing cache directive {}", cacheDirectiveId);
      dfs.removeCacheDirective(cacheDirectiveId);
      LOG.info("finished removing cache directive {}", cacheDirectiveId);
      Thread.sleep(1000);
      DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

      // Cleanup
      in.releaseBuffer(buf);
    } finally {
      in.close();
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Performs the superclass enhanced byte-buffer read inside the scope of a
 * {@code WaitRecorder} obtained from {@code OperatorStats}; the recorder is
 * closed when the read finishes or throws.
 *
 * @param bufferPool pool forwarded to the superclass.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options forwarded to the superclass.
 * @return whatever the superclass read returns.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  try (WaitRecorder ignored = OperatorStats.getWaitRecorder(operatorStats)) {
    final ByteBuffer result = super.read(bufferPool, maxLength, opts);
    return result;
  }
}
/**
 * Performs the superclass enhanced byte-buffer read inside the scope of a
 * {@code WaitRecorder} obtained from {@code OperatorStats}; the recorder is
 * closed when the read finishes or throws.
 *
 * @param bufferPool pool forwarded to the superclass.
 * @param maxLength  maximum number of bytes to read.
 * @param opts       read options forwarded to the superclass.
 * @return whatever the superclass read returns.
 */
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
  try (WaitRecorder ignored = OperatorStats.getWaitRecorder(operatorStats)) {
    final ByteBuffer result = super.read(bufferPool, maxLength, opts);
    return result;
  }
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Attempt a zero-copy read of up to maxLength bytes from the current
 * position by slicing the block reader's memory map.
 *
 * The read is shortened so that it does not run past the end of the current
 * block, nor past offset Integer.MAX_VALUE within the block. On success the
 * stream is advanced with seek(), the buffer is recorded in the extended read
 * buffers map against its ClientMmap, and the zero-copy statistics are
 * updated.
 *
 * @param maxLength The maximum number of bytes to read.
 * @param opts      The read options passed to BlockReader#getClientMmap.
 * @return a read-only buffer over the mapped bytes, or null if no bytes
 *         are left in the block, the block position is at or beyond the
 *         31-bit limit, or BlockReader#getClientMmap returned null.
 * @throws IOException declared by this method; presumably raised by seek()
 *         (not visible here).
 */
private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
EnumSet<ReadOption> opts) throws IOException {
// Copy 'pos' and 'blockEnd' to local variables to make it easier for the
// JVM to optimize this function.
final long curPos = pos;
final long curEnd = blockEnd;
final long blockStartInFile = currentLocatedBlock.getStartOffset();
// Offset of the current position relative to the start of the block.
final long blockPos = curPos - blockStartInFile;
// Shorten this read if the end of the block is nearby.
long length63;
if ((curPos + maxLength) <= (curEnd + 1)) {
length63 = maxLength;
} else {
// curEnd is treated as inclusive here, hence the "1 +".
length63 = 1 + curEnd - curPos;
if (length63 <= 0) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; " + length63 + " bytes left in block. " +
"blockPos=" + blockPos + "; curPos=" + curPos +
"; curEnd=" + curEnd);
}
return null;
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Reducing read length from " + maxLength +
" to " + length63 + " to avoid going more than one byte " +
"past the end of the block. blockPos=" + blockPos +
"; curPos=" + curPos + "; curEnd=" + curEnd);
}
}
// Make sure that we don't go beyond 31-bit offsets in the MappedByteBuffer.
int length;
if (blockPos + length63 <= Integer.MAX_VALUE) {
length = (int)length63;
} else {
long length31 = Integer.MAX_VALUE - blockPos;
if (length31 <= 0) {
// Java ByteBuffers can't be longer than 2 GB, because they use
// 4-byte signed integers to represent capacity, etc.
// So we can't mmap the parts of the block higher than the 2 GB offset.
// FIXME: we could work around this with multiple memory maps.
// See HDFS-5101.
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
"exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
}
return null;
}
length = (int)length31;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Reducing read length from " + maxLength +
" to " + length + " to avoid 31-bit limit. " +
"blockPos=" + blockPos + "; curPos=" + curPos +
"; curEnd=" + curEnd);
}
}
final ClientMmap clientMmap = blockReader.getClientMmap(opts);
if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; BlockReader#getClientMmap returned " +
"null.");
}
return null;
}
// 'success' stays false if anything below throws, so that the finally
// block closes the ClientMmap instead of leaking it.
boolean success = false;
ByteBuffer buffer;
try {
// Advance the stream past the bytes being handed out.
seek(curPos + length);
buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
// Restrict the view to [blockPos, blockPos + length).
buffer.position((int)blockPos);
buffer.limit((int)(blockPos + length));
getExtendedReadBuffers().put(buffer, clientMmap);
synchronized (infoLock) {
readStatistics.addZeroCopyBytes(length);
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("readZeroCopy read " + length +
" bytes from offset " + curPos + " via the zero-copy read " +
"path. blockEnd = " + blockEnd);
}
success = true;
} finally {
if (!success) {
IOUtils.closeQuietly(clientMmap);
}
}
return buffer;
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Always returns {@code null}, regardless of the options given; this
 * implementation never hands out a ClientMmap.
 *
 * @param opts The read options; ignored.
 * @return always null.
 */
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}
/**
 * Test that when a client has a replica mmapped, we will not un-mlock that
 * replica for a reasonable amount of time, even if an uncache request
 * occurs.
 *
 * The mini cluster and the input stream are released in finally blocks so
 * that a failing assertion does not leak them.
 */
@Test(timeout=120000)
public void testPinning() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  Configuration conf = getDefaultConf();
  // Set a really long revocation timeout, so that we won't reach it during
  // this test.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS,
      1800000L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem();

    // Create and cache a file.
    final String TEST_FILE = "/test_file";
    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
        BLOCK_SIZE, (short)1, 0xcafe);
    dfs.addCachePool(new CachePoolInfo("pool"));
    long cacheDirectiveId =
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
            setPool("pool").setPath(new Path(TEST_FILE)).
            setReplication((short) 1).build());
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

    // Mmap the file.
    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
    try {
      ByteBuffer buf =
          in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

      // Attempt to uncache file. The file should still be cached.
      dfs.removeCacheDirective(cacheDirectiveId);
      Thread.sleep(500);
      DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

      // Un-mmap the file. The file should be uncached after this.
      in.releaseBuffer(buf);
      DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
    } finally {
      // Cleanup
      in.close();
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Test that when we have an uncache request, and the client refuses to release
 * the replica for a long time, we will un-mlock it.
 *
 * The mini cluster and the input stream are released in finally blocks so
 * that a failing assertion does not leak them.
 */
@Test(timeout=120000)
public void testRevocation() throws Exception {
  assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
  BlockReaderTestUtil.enableHdfsCachingTracing();
  BlockReaderTestUtil.enableShortCircuitShmTracing();
  Configuration conf = getDefaultConf();
  // Set a really short revocation timeout.
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_TIMEOUT_MS, 250L);
  // Poll very often
  conf.setLong(DFSConfigKeys.DFS_DATANODE_CACHE_REVOCATION_POLLING_MS, 2L);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem();

    // Create and cache a file.
    final String TEST_FILE = "/test_file2";
    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
        BLOCK_SIZE, (short)1, 0xcafe);
    dfs.addCachePool(new CachePoolInfo("pool"));
    long cacheDirectiveId =
        dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
            setPool("pool").setPath(new Path(TEST_FILE)).
            setReplication((short) 1).build());
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);

    // Mmap the file.
    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
    try {
      ByteBuffer buf =
          in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));

      // Attempt to uncache file. The file should get uncached.
      LOG.info("removing cache directive {}", cacheDirectiveId);
      dfs.removeCacheDirective(cacheDirectiveId);
      LOG.info("finished removing cache directive {}", cacheDirectiveId);
      Thread.sleep(1000);
      DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

      // Cleanup
      in.releaseBuffer(buf);
    } finally {
      in.close();
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Stub implementation of the enhanced byte-buffer read: ignores all of its
 * arguments and always returns {@code null}.
 *
 * @param paramByteBufferPool buffer pool; ignored.
 * @param paramInt            maximum length to read; ignored.
 * @param paramEnumSet        read options; ignored.
 * @return always null.
 */
@Override
public ByteBuffer read(ByteBufferPool paramByteBufferPool,
int paramInt, EnumSet<ReadOption> paramEnumSet)
throws IOException, UnsupportedOperationException {
return null;
}
/**
 * Get a ClientMmap object for this BlockReader.
 *
 * @param opts The read options to use, such as
 *             {@code ReadOption.SKIP_CHECKSUMS}.
 * @return The ClientMmap object, or null if mmap is not
 *         supported.
 */
ClientMmap getClientMmap(EnumSet<ReadOption> opts);