下面列出了org.apache.hadoop.fs.FSDataInputStream#setDropBehind ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
static long readHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataInputStream fis = null;
long totalRead = 0;
try {
fis = fs.open(p);
if (dropBehind != null) {
fis.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? buf.length : (int)length;
int ret = fis.read(buf, 0, amt);
if (ret == -1) {
return totalRead;
}
totalRead += ret;
length -= ret;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fis != null) {
fis.close();
}
}
throw new RuntimeException("unreachable");
}
@Test(timeout=120000)
public void testSeekAfterSetDropBehind() throws Exception {
// start a cluster
LOG.info("testSeekAfterSetDropBehind");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we can seek after setDropBehind
FSDataInputStream fis = fs.open(new Path(TEST_PATH));
try {
Assert.assertTrue(fis.read() != -1); // create BlockReader
fis.setDropBehind(false); // clear BlockReader
fis.seek(2); // seek
} finally {
fis.close();
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
static long readHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataInputStream fis = null;
long totalRead = 0;
try {
fis = fs.open(p);
if (dropBehind != null) {
fis.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? buf.length : (int)length;
int ret = fis.read(buf, 0, amt);
if (ret == -1) {
return totalRead;
}
totalRead += ret;
length -= ret;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fis != null) {
fis.close();
}
}
throw new RuntimeException("unreachable");
}
@Test(timeout=120000)
public void testSeekAfterSetDropBehind() throws Exception {
// start a cluster
LOG.info("testSeekAfterSetDropBehind");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we can seek after setDropBehind
FSDataInputStream fis = fs.open(new Path(TEST_PATH));
try {
Assert.assertTrue(fis.read() != -1); // create BlockReader
fis.setDropBehind(false); // clear BlockReader
fis.seek(2); // seek
} finally {
fis.close();
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}