The following are example usages of org.apache.hadoop.fs.FSDataInputStream#readFully(), collected from open-source Hadoop-ecosystem projects.
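Before the collected examples, here is a minimal, self-contained sketch of the two readFully flavors they exercise: the stream-relative form inherited from DataInputStream, which fills the buffer from the current offset and advances it, and the positioned form from PositionedReadable, which reads at an absolute offset and leaves the stream position untouched. The path below is a placeholder, not taken from any of the snippets.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFullyDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example");  // placeholder path
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
      byte[] head = new byte[16];
      in.readFully(head);        // stream-relative: advances getPos() by 16
      byte[] window = new byte[16];
      in.readFully(64L, window); // positioned: getPos() is still 16 afterwards
      // both forms either fill the buffer completely or throw EOFException
    }
  }
}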
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  bytesTotal = inSplit.getLength();
  Path file = ((FileSplit) inSplit).getPath();
  FileSystem fs = file.getFileSystem(context.getConfiguration());
  FSDataInputStream fileIn = fs.open(file);
  key.set(file.toString());
  byte[] buf = new byte[(int) inSplit.getLength()];
  try {
    fileIn.readFully(buf);
    value.set(buf);
    hasNext = true;
  } catch (Exception e) {
    hasNext = false;
  } finally {
    fileIn.close();
  }
}
/** check if the files have been copied correctly. */
public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
  //Configuration conf = new Configuration();
  Path root = new Path(topdir);
  for (int idx = 0; idx < nFiles; idx++) {
    Path fPath = new Path(root, files[idx].getName());
    FSDataInputStream in = fs.open(fPath);
    byte[] toRead = new byte[files[idx].getSize()];
    byte[] toCompare = new byte[files[idx].getSize()];
    Random rb = new Random(files[idx].getSeed());
    rb.nextBytes(toCompare);
    in.readFully(0, toRead);
    in.close();
    for (int i = 0; i < toRead.length; i++) {
      if (toRead[i] != toCompare[i]) {
        return false;
      }
    }
    toRead = null;
    toCompare = null;
  }
  return true;
}
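The verification above only works if the files were originally written from the same seeded Random. For context, a hypothetical writer-side counterpart might look like the following; the files/nFiles fields and the getName()/getSize()/getSeed() accessors mirror the snippet above but are assumptions, as is the FSDataOutputStream import.

// Hypothetical counterpart to checkFiles: writes each file from its seed so
// that checkFiles can regenerate and compare the same byte sequence.
void writeFiles(FileSystem fs, String topdir) throws IOException {
  Path root = new Path(topdir);
  for (int idx = 0; idx < nFiles; idx++) {
    byte[] data = new byte[files[idx].getSize()];
    new Random(files[idx].getSeed()).nextBytes(data);
    FSDataOutputStream out = fs.create(new Path(root, files[idx].getName()));
    try {
      out.write(data);
    } finally {
      out.close();
    }
  }
}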
public static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
    long blockSize) throws IOException {
  FSDataInputStream in = fs.open(file);
  try {
    for (int idx : idxs) {
      long offset = idx * blockSize;
      LOG.info("Reporting corrupt block " + file + ":" + offset);
      in.seek(offset);
      try {
        in.readFully(new byte[(int) blockSize]);
        fail("Expected exception not thrown for " + file + ":" + offset);
      } catch (org.apache.hadoop.fs.ChecksumException e) {
        // expected: the replica's checksum no longer matches
      } catch (org.apache.hadoop.fs.BlockMissingException bme) {
        // expected: all replicas of the block are missing
      }
    }
  } finally {
    in.close();
  }
}
private static TableDescriptor readTableDescriptor(FileSystem fs, FileStatus status)
    throws IOException {
  int len = Ints.checkedCast(status.getLen());
  byte[] content = new byte[len];
  FSDataInputStream fsDataInputStream = fs.open(status.getPath());
  try {
    fsDataInputStream.readFully(content);
  } finally {
    fsDataInputStream.close();
  }
  TableDescriptor htd = null;
  try {
    htd = TableDescriptorBuilder.parseFrom(content);
  } catch (DeserializationException e) {
    throw new IOException("content=" + Bytes.toShort(content), e);
  }
  return htd;
}
/**
 * Test DistributedRaidFileSystem with relative path
 */
public void testRelativePath() throws Exception {
  stripeLength = 3;
  mySetup("xor", 1);
  try {
    DistributedRaidFileSystem raidfs = getRaidFS();
    Path file = new Path(raidfs.getHomeDirectory(), "raidtest/file1");
    Path file1 = new Path("raidtest/file1");
    long crc = createTestFile(raidfs.getFileSystem(), file, 1, 8, 8192L);
    FileStatus stat = fileSys.getFileStatus(file);
    LOG.info("Created " + file + ", crc=" + crc + ", len=" + stat.getLen());
    byte[] filebytes = new byte[(int) stat.getLen()];
    // Test that readFully returns the correct CRC when there are no errors.
    FSDataInputStream stm = raidfs.open(file);
    stm.readFully(0, filebytes);
    assertEquals(crc, bufferCRC(filebytes));
    stm.close();
    stm = raidfs.open(file1);
    stm.readFully(0, filebytes);
    assertEquals(crc, bufferCRC(filebytes));
    stm.close();
  } finally {
    myTearDown();
  }
}
private static byte[] readAllWithReadFully(int totalLength, FSDataInputStream fsdis, boolean close)
    throws IOException {
  final ByteArrayOutputStream baos = new ByteArrayOutputStream();
  // Simulate reading of some data structures of known length:
  final byte[] buffer = new byte[17];
  final int times = totalLength / buffer.length;
  final int remainder = totalLength % buffer.length;
  // it would be simpler to leave the position tracking to the
  // InputStream, but we need to check the methods #readFully(2)
  // and #readFully(4) that receive the position as a parameter:
  int position = 0;
  try {
    // read "data structures":
    for (int i = 0; i < times; i++) {
      fsdis.readFully(position, buffer);
      position += buffer.length;
      baos.write(buffer);
    }
    if (remainder > 0) {
      // read the remainder:
      fsdis.readFully(position, buffer, 0, remainder);
      position += remainder;
      baos.write(buffer, 0, remainder);
    }
    try {
      fsdis.readFully(position, buffer, 0, 1);
      assertTrue(false);
    } catch (IOException ioe) {
      // okay
    }
    assertEquals(totalLength, position);
    final byte[] result = baos.toByteArray();
    assertEquals(totalLength, result.length);
    return result;
  } finally {
    if (close) {
      fsdis.close();
    }
  }
}
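A hypothetical call site for this helper (the path is a placeholder) would pass the file length up front and let the helper close the stream:

Path p = new Path("/tmp/data");  // placeholder path
FileSystem fs = p.getFileSystem(new Configuration());
int len = (int) fs.getFileStatus(p).getLen();
byte[] all = readAllWithReadFully(len, fs.open(p), true);  // close == true: helper closes the stream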
/**
 * Verify that the read at a specific offset in a stream
 * matches that expected
 * @param stm stream
 * @param fileContents original file contents
 * @param seekOff seek offset
 * @param toRead number of bytes to read
 * @throws IOException IO problems
 */
public static void verifyRead(FSDataInputStream stm, byte[] fileContents,
    int seekOff, int toRead) throws IOException {
  byte[] out = new byte[toRead];
  stm.seek(seekOff);
  stm.readFully(out);
  byte[] expected = Arrays.copyOfRange(fileContents, seekOff,
      seekOff + toRead);
  compareByteArrays(expected, out, toRead);
}
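verifyRead drives the stream-relative overload via seek(). An equivalent check can use the positioned overload instead, which does not move the stream's current offset; this variant is a sketch, not part of the original utility:

public static void verifyPread(FSDataInputStream stm, byte[] fileContents,
    int seekOff, int toRead) throws IOException {
  byte[] out = new byte[toRead];
  // positioned read: fills out[] from seekOff without changing getPos()
  stm.readFully(seekOff, out);
  byte[] expected = Arrays.copyOfRange(fileContents, seekOff,
      seekOff + toRead);
  compareByteArrays(expected, out, toRead);
}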
private void checkFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
  boolean done = false;
  // wait till all full blocks are confirmed by the datanodes.
  while (!done) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {}
    done = true;
    BlockLocation[] locations = fileSys.getFileBlockLocations(
        fileSys.getFileStatus(name), 0, fileSize);
    if (locations.length < numBlocks) {
      done = false;
      continue;
    }
    for (int idx = 0; idx < locations.length; idx++) {
      if (locations[idx].getHosts().length < repl) {
        done = false;
        break;
      }
    }
  }
  FSDataInputStream stm = fileSys.open(name);
  final byte[] expected;
  if (simulatedStorage) {
    expected = new byte[numBlocks * blockSize];
    for (int i = 0; i < expected.length; i++) {
      expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
    }
  } else {
    expected = AppendTestUtil.randomBytes(seed, numBlocks * blockSize);
  }
  // do a sanity check. Read the file
  byte[] actual = new byte[numBlocks * blockSize];
  stm.readFully(0, actual);
  stm.close();
  checkData(actual, 0, expected, "Read 1");
}
private void checkFile(Path name) throws Exception {
  FSDataInputStream stm = fileSys.open(name);
  // do a sanity check. Read the file
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  stm.close();
}
private void checkFile(Path name) throws Exception {
  FSDataInputStream stm = fileSys.open(name);
  // do a sanity check. Read the file
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  stm.close();
  // do a sanity check. Get the file checksum
  fileSys.getFileChecksum(name);
}
public static Path readRealPathDataFromSymlinkPath(FileSystem fileSystem, Path linkPath)
    throws IOException, UnsupportedEncodingException {
  FileStatus fileStatus = fileSystem.getFileStatus(linkPath);
  FSDataInputStream inputStream = fileSystem.open(linkPath);
  byte[] buf = new byte[(int) fileStatus.getLen()];
  inputStream.readFully(buf);
  inputStream.close();
  Path path = new Path(new String(buf, UTF_8));
  return path;
}
private byte[] readFile(Path file, long numBytes) throws IOException {
  byte[] data = new byte[(int) numBytes];
  FSDataInputStream in = fs.open(file);
  try {
    in.readFully(data);
  } finally {
    IOUtils.cleanup(LOG, in);
  }
  return data;
}
static String slurpHadoop(Path p, FileSystem fs) throws IOException {
  int len = (int) fs.getFileStatus(p).getLen();
  byte[] buf = new byte[len];
  FSDataInputStream in = fs.open(p);
  String contents = null;
  try {
    // getPos() is 0 on a freshly opened stream, so this positioned
    // readFully reads the whole file from the beginning.
    in.readFully(in.getPos(), buf);
    contents = new String(buf, "UTF-8");
  } finally {
    in.close();
  }
  return contents;
}
private void datanodeRestartTest(MiniDFSCluster cluster, FileSystem fileSys,
    Path name) throws IOException {
  // skip this test if using simulated storage since simulated blocks
  // don't survive datanode restarts.
  if (simulatedStorage) {
    return;
  }
  int numBlocks = 1;
  assertTrue(numBlocks <= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
  byte[] expected = new byte[numBlocks * blockSize];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  byte[] actual = new byte[numBlocks * blockSize];
  FSDataInputStream stm = fileSys.open(name);
  // read a block and get block locations cached as a result
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Setup");
  // restart all datanodes. it is expected that they will
  // restart on different ports, hence, cached block locations
  // will no longer work.
  assertTrue(cluster.restartDataNodes());
  cluster.waitActive();
  // verify the block can be read again using the same InputStream
  // (via re-fetching of block locations from namenode). there is a
  // 3 sec sleep in chooseDataNode(), which can be shortened for
  // this test if configurable.
  stm.readFully(0, actual);
  checkAndEraseData(actual, 0, expected, "Pread Datanode Restart Test");
}
public void testMissingBlocksAlert() throws IOException,
    InterruptedException {
  MiniDFSCluster cluster = null;
  try {
    Configuration conf = new Configuration();
    // minimize test delay
    conf.setInt("dfs.replication.interval", 0);
    int fileLen = 10 * 1024;
    // start a cluster with a single datanode
    cluster = new MiniDFSCluster(conf, 1, true, null);
    cluster.waitActive();
    DistributedFileSystem dfs =
        (DistributedFileSystem) cluster.getFileSystem();
    // create a normal file
    DFSTestUtil.createFile(dfs, new Path("/testMissingBlocksAlert/file1"),
        fileLen, (short)3, 0);
    Path corruptFile = new Path("/testMissingBlocks/corruptFile");
    DFSTestUtil.createFile(dfs, corruptFile, fileLen, (short)3, 0);
    // Corrupt the block
    String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
    TestDatanodeBlockScanner.corruptReplica(block, 0);
    // read the file so that the corrupt block is reported to the NN
    FSDataInputStream in = dfs.open(corruptFile);
    try {
      in.readFully(new byte[fileLen]);
    } catch (ChecksumException ignored) { // checksum error is expected.
    }
    in.close();
    LOG.info("Waiting for missing blocks count to increase...");
    while (dfs.getMissingBlocksCount() <= 0) {
      Thread.sleep(100);
    }
    assertTrue(dfs.getMissingBlocksCount() == 1);
    // Now verify that the warning shows up on the web UI
    URL url = new URL("http://" + conf.get("dfs.http.address") +
        "/dfshealth.jsp");
    String dfsFrontPage = DFSTestUtil.urlGet(url);
    String warnStr = "WARNING : There are about ";
    assertTrue("HDFS Front page does not contain expected warning",
        dfsFrontPage.contains(warnStr + "1 missing blocks"));
    // now do the reverse: remove the file and expect the number of missing
    // blocks to go to zero
    dfs.delete(corruptFile, true);
    LOG.info("Waiting for missing blocks count to be zero...");
    while (dfs.getMissingBlocksCount() > 0) {
      Thread.sleep(100);
    }
    // and make sure the WARNING disappears from the web UI
    dfsFrontPage = DFSTestUtil.urlGet(url);
    assertFalse("HDFS Front page contains unexpected warning",
        dfsFrontPage.contains(warnStr));
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
public void testCacheFileSystem() throws IOException {
  // configure a cached filesystem; cache size is 10 KB.
  mySetup(10 * 1024L);
  try {
    // create a 5K file using the LookasideCache. This write
    // should be stored in the cache.
    Path file = new Path("/hdfs/testRead");
    long crc = createTestFile(lfs, file, 1, 5, 1024L);
    FileStatus stat = lfs.getFileStatus(file);
    LOG.info("Created " + file + ", crc=" + crc + ", len=" + stat.getLen());
    assertTrue(lfs.lookasideCache.getCacheSize() == 5 * 1024);
    // Test that readFully via the LookasideCache fetches correct data
    // from the cache.
    FSDataInputStream stm = lfs.open(file);
    byte[] filebytes = new byte[(int) stat.getLen()];
    stm.readFully(0, filebytes);
    assertEquals(crc, bufferCRC(filebytes));
    stm.close();
    // assert that there is one element of size 5K in the cache
    assertEquals(5 * 1024, lfs.lookasideCache.getCacheSize());
    // create a 6K file using the LookasideCache. This is an
    // overwrite of the earlier file, so the cache should reflect
    // the new size of the file.
    crc = createTestFile(lfs, file, 1, 6, 1024L);
    stat = lfs.getFileStatus(file);
    LOG.info("Created " + file + ", crc=" + crc + ", len=" + stat.getLen());
    // assert that there is one element of size 6K in the cache
    assertEquals(6 * 1024, lfs.lookasideCache.getCacheSize());
    // verify reading file2 from the cache
    stm = lfs.open(file);
    filebytes = new byte[(int) stat.getLen()];
    stm.readFully(0, filebytes);
    assertEquals(crc, bufferCRC(filebytes));
    stm.close();
    // add a 5 KB file to the cache. This should start eviction of
    // the earlier file.
    Path file2 = new Path("/hdfs/testRead2");
    crc = createTestFile(lfs, file2, 1, 5, 1024L);
    stat = lfs.getFileStatus(file2);
    LOG.info("Created " + file2 + ", crc=" + crc + ", len=" + stat.getLen());
    assertEquals(5 * 1024, lfs.lookasideCache.getCacheSize());
    // move file2 to file3
    Path file3 = new Path("/hdfs/testRead3");
    assertTrue(lfs.rename(file2, file3));
    // delete file3. This should clear out the cache.
    lfs.delete(file3, false);
    assertEquals(0, lfs.lookasideCache.getCacheSize());
  } finally {
    myTearDown();
  }
}
/**
 * Read the list of ranges from the file.
 * @param file the file to read
 * @param base the base of the stripe
 * @param range the disk ranges within the stripe to read
 * @return the bytes read for each disk range, which is the same length as
 *    ranges
 * @throws IOException
 */
private DiskRangeList readDiskRanges(FSDataInputStream file,
                                     ZeroCopyAdapter zcr,
                                     long base,
                                     DiskRangeList range,
                                     boolean doForceDirect) throws IOException {
  if (range == null) return null;
  DiskRangeList prev = range.prev;
  if (prev == null) {
    prev = new DiskRangeList.MutateHelper(range);
  }
  while (range != null) {
    if (range.hasData()) {
      range = range.next;
      continue;
    }
    int len = (int) (range.getEnd() - range.getOffset());
    long off = range.getOffset();
    if (zcr != null) {
      file.seek(base + off);
      boolean hasReplaced = false;
      while (len > 0) {
        ByteBuffer partial = zcr.readBuffer(len, false);
        readBytes += partial.remaining();
        BufferChunk bc = new BufferChunk(partial, off);
        if (!hasReplaced) {
          range.replaceSelfWith(bc);
          hasReplaced = true;
        } else {
          range.insertAfter(bc);
        }
        range = bc;
        int read = partial.remaining();
        len -= read;
        off += read;
      }
    } else {
      // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
      byte[] buffer = new byte[len];
      file.readFully((base + off), buffer, 0, buffer.length);
      readBytes += buffer.length;
      ByteBuffer bb = null;
      if (doForceDirect) {
        bb = ByteBuffer.allocateDirect(len);
        bb.put(buffer);
        bb.position(0);
        bb.limit(len);
      } else {
        bb = ByteBuffer.wrap(buffer);
      }
      range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
    }
    range = range.next;
  }
  return prev.next;
}
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
  FSDataInputStream stm = fileSys.open(name);
  byte[] expected = new byte[12 * blockSize];
  if (simulatedStorage) {
    for (int i = 0; i < expected.length; i++) {
      expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
    }
  } else {
    Random rand = new Random(seed);
    rand.nextBytes(expected);
  }
  // do a sanity check. Read first 4K bytes
  byte[] actual = new byte[4096];
  stm.readFully(actual);
  checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  // now do a pread for the first 8K bytes
  actual = new byte[8192];
  doPread(stm, 0L, actual, 0, 8192);
  checkAndEraseData(actual, 0, expected, "Pread Test 1");
  // Now check to see if the normal read returns 4K-8K byte range
  actual = new byte[4096];
  stm.readFully(actual);
  checkAndEraseData(actual, 4096, expected, "Pread Test 2");
  // Now see if we can cross a single block boundary successfully
  // read 4K bytes from blockSize - 2K offset
  stm.readFully(blockSize - 2048, actual, 0, 4096);
  checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
  // now see if we can cross two block boundaries successfully
  // read blockSize + 4K bytes from blockSize - 2K offset
  actual = new byte[blockSize + 4096];
  stm.readFully(blockSize - 2048, actual);
  checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
  // now see if we can cross two block boundaries that are not cached
  // read blockSize + 4K bytes from 10*blockSize - 2K offset
  actual = new byte[blockSize + 4096];
  stm.readFully(10 * blockSize - 2048, actual);
  checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
  // now check that even after all these preads, we can still read
  // bytes 8K-12K
  actual = new byte[4096];
  stm.readFully(actual);
  checkAndEraseData(actual, 8192, expected, "Pread Test 6");
  // done
  stm.close();
  // check block location caching
  stm = fileSys.open(name);
  stm.readFully(1, actual, 0, 4096);
  stm.readFully(4 * blockSize, actual, 0, 4096);
  stm.readFully(7 * blockSize, actual, 0, 4096);
  actual = new byte[3 * 4096];
  stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
  checkAndEraseData(actual, 0, expected, "Pread Test 7");
  actual = new byte[8 * 4096];
  stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
  checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
  // read the tail
  stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
  IOException res = null;
  try { // read beyond the end of the file
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
  } catch (IOException e) {
    // should throw an exception
    res = e;
  }
  assertTrue("Error reading beyond file boundary.", res != null);
  stm.close();
}
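The doPread helper called in Pread Test 1 is not shown in this listing. Judging from its arguments (stream, absolute position, buffer, buffer offset, length), a plausible implementation loops the positioned read(long, byte[], int, int) until the requested length arrives; this stand-in is an assumption, not the original helper (java.io.EOFException import assumed).

// Hypothetical stand-in for the doPread helper used above: loops the
// positioned read until 'length' bytes have been read.
static void doPread(FSDataInputStream stm, long position, byte[] buffer,
    int offset, int length) throws IOException {
  int nread = 0;
  while (nread < length) {
    int n = stm.read(position + nread, buffer, offset + nread, length - nread);
    if (n < 0) {
      throw new EOFException("unexpected end of file at offset " + (position + nread));
    }
    nread += n;
  }
}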