下面列出了 org.apache.hadoop.fs.ChecksumException 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Reads from {@code blockReader} into {@code buf} and records the result in
 * the read statistics.
 *
 * <p>If the read itself throws, {@code buf}'s position and limit are put back
 * to the values they had on entry so that a retry starts from the same state.
 *
 * @param blockReader reader to pull bytes from
 * @param off not used by this implementation
 * @param len not used by this implementation
 * @return the value returned by {@code blockReader.read(buf)}
 * @throws ChecksumException propagated from the underlying read
 * @throws IOException propagated from the underlying read
 */
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  final int savedPosition = buf.position();
  final int savedLimit = buf.limit();
  boolean readCompleted = false;
  final int bytesRead;
  try {
    bytesRead = blockReader.read(buf);
    readCompleted = true;
  } finally {
    if (!readCompleted) {
      // The read failed part-way: rewind so a retry sees the original state.
      buf.position(savedPosition);
      buf.limit(savedLimit);
    }
  }
  // Statistics are updated only after a successful read; a failure here does
  // not rewind the buffer.
  updateReadStatistics(readStatistics, bytesRead, blockReader);
  return bytesRead;
}
/**
 * Verify multiple CRC chunks.
 *
 * <p>Delegates to {@code clientChecksum.verifyChunkedSums}. On a checksum
 * mismatch the error is logged, the block is reported through
 * {@code datanode.reportRemoteBadBlock} when {@code srcDataNode} is set and
 * {@code isDatanode} is true, and an {@link IOException} carrying the
 * original {@link ChecksumException} as its cause is thrown.
 *
 * @param dataBuf buffer holding the data chunks
 * @param checksumBuf buffer holding the checksums for {@code dataBuf}
 * @throws IOException if the checksums do not match the data
 */
private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
    throws IOException {
  try {
    clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
  } catch (ChecksumException ce) {
    LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
    // No need to report to namenode when client is writing.
    if (srcDataNode != null && isDatanode) {
      try {
        LOG.info("report corrupt " + block + " from datanode " +
            srcDataNode + " to namenode");
        datanode.reportRemoteBadBlock(srcDataNode, block);
      } catch (IOException e) {
        // Reporting is best effort; keep the failure details in the log and
        // still surface the checksum mismatch below.
        LOG.warn("Failed to report bad " + block +
            " from datanode " + srcDataNode + " to namenode", e);
      }
    }
    // Keep the ChecksumException as the cause so its position information
    // and stack trace are not lost to callers.
    throw new IOException("Unexpected checksum mismatch while writing "
        + block + " from " + inAddr, ce);
  }
}
/**
 * Recomputes the checksum of each data chunk in {@code buf} and compares it
 * with the checksum stored in the same buffer.
 *
 * @param buf buffer that has checksum and data
 * @param dataOffset position where data is written in the buf
 * @param datalen length of data
 * @param numChunks number of chunks corresponding to data
 * @param checksumOffset offset where checksum is written in the buf
 * @throws ChecksumException on failed checksum verification; its position is
 *         {@code offset} plus the number of data bytes already verified
 */
public void verifyChecksum(final byte[] buf, final int dataOffset,
    final int datalen, final int numChunks, final int checksumOffset)
    throws ChecksumException {
  int remaining = datalen;
  int dataPos = dataOffset;
  int sumPos = checksumOffset;
  int chunk = 0;
  while (chunk < numChunks) {
    checksum.reset();
    // The final chunk may be shorter than chunkSize.
    final int chunkLen = Math.min(remaining, chunkSize);
    checksum.update(buf, dataPos, chunkLen);
    if (!checksum.compare(buf, sumPos)) {
      final long failedPos = offset + datalen - remaining;
      throw new ChecksumException("Checksum failed at " + failedPos,
          failedPos);
    }
    remaining -= chunkLen;
    dataPos += chunkLen;
    sumPos += checksumSize;
    chunk++;
  }
}
/**
 * Reads {@code file} from {@code startPos} to end of stream, validating every
 * buffer that is read with {@code validateSequentialBytes}.
 *
 * <p>The stream is closed on every exit path, including validation failure.
 *
 * @param file file to read
 * @param startPos offset at which reading starts
 * @return {@code startPos} plus the number of bytes read, minus one
 * @throws ChecksumException if a buffer fails validation
 * @throws IOException if opening, seeking, reading or closing fails
 */
private long tailFile(Path file, long startPos) throws IOException {
  long numRead = 0;
  FSDataInputStream inputStream = fileSystem.open(file);
  try {
    inputStream.seek(startPos);
    int len = 4 * 1024;
    byte[] buf = new byte[len];
    int read;
    while ((read = inputStream.read(buf)) > -1) {
      LOG.info(String.format("read %d bytes", read));
      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
        throw new ChecksumException("unable to validate bytes", startPos);
      }
      numRead += read;
    }
  } finally {
    // Previously the stream leaked whenever validation or a read failed.
    inputStream.close();
  }
  return numRead + startPos - 1;
}
/**
 * Ask dfs client to read the file.
 *
 * <p>Reads until a read call returns zero or less. A
 * {@link ChecksumException} or {@link BlockMissingException} raised while
 * reading is logged at debug level and otherwise ignored. The input stream is
 * always closed before returning.
 *
 * @param corruptedFile file to read through the DFS client
 * @throws IOException on any other read failure, or if closing fails
 */
private void dfsClientReadFile(Path corruptedFile) throws IOException,
    UnresolvedLinkException {
  DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
  try {
    byte[] buf = new byte[buffersize];
    int nRead; // bytes returned by the most recent read call
    do {
      nRead = in.read(buf, 0, buf.length);
    } while (nRead > 0);
  } catch (ChecksumException ce) {
    // caught ChecksumException if all replicas are bad, ignore and continue.
    LOG.debug("DfsClientReadFile caught ChecksumException.");
  } catch (BlockMissingException bme) {
    // caught BlockMissingException, ignore.
    LOG.debug("DfsClientReadFile caught BlockMissingException.");
  } finally {
    // The stream used to be left open on every path.
    in.close();
  }
}
/**
 * Writes a file to RAM_DISK, evicts it by writing a second one, corrupts the
 * persisted block file of the first, and expects reading it back to raise a
 * {@link ChecksumException}.
 */
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
    InterruptedException {
  final String methodName = GenericTestUtils.getMethodName();
  final Path evictedFile = new Path("/" + methodName + ".01.dat");
  final Path evictingFile = new Path("/" + methodName + ".02.dat");
  final int seed = 0xFADED;

  makeRandomTestFile(evictedFile, BLOCK_SIZE, true, seed);
  ensureFileReplicasOnStorageType(evictedFile, RAM_DISK);

  // A second file with a replica on RAM_DISK pushes the first one out.
  makeRandomTestFile(evictingFile, BLOCK_SIZE, true, seed);

  // Give the lazy writer thread a few intervals to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Damage the lazy-persisted block file; checksum verification on read
  // must notice.
  ensureFileReplicasOnStorageType(evictedFile, DEFAULT);
  cluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, evictedFile));
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, evictedFile);
}
/**
 * Writes a file to RAM_DISK, evicts it by writing a second one, corrupts the
 * persisted block metadata file of the first, and expects reading it back to
 * raise a {@link ChecksumException}.
 */
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
    InterruptedException {
  final String methodName = GenericTestUtils.getMethodName();
  final Path evictedFile = new Path("/" + methodName + ".01.dat");
  final Path evictingFile = new Path("/" + methodName + ".02.dat");
  final int seed = 0xFADED;

  makeRandomTestFile(evictedFile, BLOCK_SIZE, true, seed);
  ensureFileReplicasOnStorageType(evictedFile, RAM_DISK);

  // A second file with a replica on RAM_DISK pushes the first one out.
  makeRandomTestFile(evictingFile, BLOCK_SIZE, true, seed);

  // Give the lazy writer thread a few intervals to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Damage the lazy-persisted checksum file; checksum verification on read
  // must notice.
  ensureFileReplicasOnStorageType(evictedFile, DEFAULT);
  final File blockMetaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, evictedFile));
  MiniDFSCluster.corruptBlock(blockMetaFile);
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, evictedFile);
}
/**
 * Reads several regions through {@code reader}, comparing each against
 * {@code original}, and checks that a {@link ChecksumException} is raised if
 * and only if {@code usingChecksums} is set.
 *
 * @param reader reader under test
 * @param original expected content for the regions that are compared
 * @throws IOException if a read fails for a reason other than a checksum error
 */
public void doTest(BlockReaderLocal reader, byte original[])
    throws IOException {
  final byte[] actual = new byte[TEST_LENGTH];
  boolean corruptionDetected = false;
  try {
    reader.readFully(actual, 0, 10);
    assertArrayRegionsEqual(original, 0, actual, 0, 10);
    reader.readFully(actual, 10, 100);
    assertArrayRegionsEqual(original, 10, actual, 10, 100);
    reader.readFully(actual, 110, 700);
    assertArrayRegionsEqual(original, 110, actual, 110, 700);
    reader.skip(1); // skip from offset 810 to offset 811
    reader.readFully(actual, 811, 5);
    assertArrayRegionsEqual(original, 811, actual, 811, 5);
    reader.readFully(actual, 816, 900);
  } catch (ChecksumException e) {
    corruptionDetected = true;
  }
  if (usingChecksums && !corruptionDetected) {
    // We should detect the corruption when using a checksum file.
    Assert.fail("did not detect corruption");
  }
  if (!usingChecksums && corruptionDetected) {
    Assert.fail("didn't expect to get ChecksumException: not " +
        "using checksums.");
  }
}
/**
 * Reads from {@code blockReader} into {@code buf} and records the result in
 * the read statistics.
 *
 * <p>If the read itself throws, {@code buf}'s position and limit are put back
 * to the values they had on entry so that a retry starts from the same state.
 *
 * @param blockReader reader to pull bytes from
 * @param off not used by this implementation
 * @param len not used by this implementation
 * @return the value returned by {@code blockReader.read(buf)}
 * @throws ChecksumException propagated from the underlying read
 * @throws IOException propagated from the underlying read
 */
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  final int savedPosition = buf.position();
  final int savedLimit = buf.limit();
  boolean readCompleted = false;
  final int bytesRead;
  try {
    bytesRead = blockReader.read(buf);
    readCompleted = true;
  } finally {
    if (!readCompleted) {
      // The read failed part-way: rewind so a retry sees the original state.
      buf.position(savedPosition);
      buf.limit(savedLimit);
    }
  }
  // Statistics are updated only after a successful read; a failure here does
  // not rewind the buffer.
  updateReadStatistics(readStatistics, bytesRead, blockReader);
  return bytesRead;
}
/**
 * Verify multiple CRC chunks.
 *
 * <p>Delegates to {@code clientChecksum.verifyChunkedSums}. On a checksum
 * mismatch the error is logged, the block is reported through
 * {@code datanode.reportRemoteBadBlock} when {@code srcDataNode} is set and
 * {@code isDatanode} is true, and an {@link IOException} carrying the
 * original {@link ChecksumException} as its cause is thrown.
 *
 * @param dataBuf buffer holding the data chunks
 * @param checksumBuf buffer holding the checksums for {@code dataBuf}
 * @throws IOException if the checksums do not match the data
 */
private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
    throws IOException {
  try {
    clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
  } catch (ChecksumException ce) {
    LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
    // No need to report to namenode when client is writing.
    if (srcDataNode != null && isDatanode) {
      try {
        LOG.info("report corrupt " + block + " from datanode " +
            srcDataNode + " to namenode");
        datanode.reportRemoteBadBlock(srcDataNode, block);
      } catch (IOException e) {
        // Reporting is best effort; keep the failure details in the log and
        // still surface the checksum mismatch below.
        LOG.warn("Failed to report bad " + block +
            " from datanode " + srcDataNode + " to namenode", e);
      }
    }
    // Keep the ChecksumException as the cause so its position information
    // and stack trace are not lost to callers.
    throw new IOException("Unexpected checksum mismatch while writing "
        + block + " from " + inAddr, ce);
  }
}
/**
 * Recomputes the checksum of each data chunk in {@code buf} and compares it
 * with the checksum stored in the same buffer.
 *
 * @param buf buffer that has checksum and data
 * @param dataOffset position where data is written in the buf
 * @param datalen length of data
 * @param numChunks number of chunks corresponding to data
 * @param checksumOffset offset where checksum is written in the buf
 * @throws ChecksumException on failed checksum verification; its position is
 *         {@code offset} plus the number of data bytes already verified
 */
public void verifyChecksum(final byte[] buf, final int dataOffset,
    final int datalen, final int numChunks, final int checksumOffset)
    throws ChecksumException {
  int remaining = datalen;
  int dataPos = dataOffset;
  int sumPos = checksumOffset;
  int chunk = 0;
  while (chunk < numChunks) {
    checksum.reset();
    // The final chunk may be shorter than chunkSize.
    final int chunkLen = Math.min(remaining, chunkSize);
    checksum.update(buf, dataPos, chunkLen);
    if (!checksum.compare(buf, sumPos)) {
      final long failedPos = offset + datalen - remaining;
      throw new ChecksumException("Checksum failed at " + failedPos,
          failedPos);
    }
    remaining -= chunkLen;
    dataPos += chunkLen;
    sumPos += checksumSize;
    chunk++;
  }
}
/**
 * Reads {@code file} from {@code startPos} to end of stream, validating every
 * buffer that is read with {@code validateSequentialBytes}.
 *
 * <p>The stream is closed on every exit path, including validation failure.
 *
 * @param file file to read
 * @param startPos offset at which reading starts
 * @return {@code startPos} plus the number of bytes read, minus one
 * @throws ChecksumException if a buffer fails validation
 * @throws IOException if opening, seeking, reading or closing fails
 */
private long tailFile(Path file, long startPos) throws IOException {
  long numRead = 0;
  FSDataInputStream inputStream = fileSystem.open(file);
  try {
    inputStream.seek(startPos);
    int len = 4 * 1024;
    byte[] buf = new byte[len];
    int read;
    while ((read = inputStream.read(buf)) > -1) {
      LOG.info(String.format("read %d bytes", read));
      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
        throw new ChecksumException("unable to validate bytes", startPos);
      }
      numRead += read;
    }
  } finally {
    // Previously the stream leaked whenever validation or a read failed.
    inputStream.close();
  }
  return numRead + startPos - 1;
}
/**
 * Ask dfs client to read the file.
 *
 * <p>Reads until a read call returns zero or less. A
 * {@link ChecksumException} or {@link BlockMissingException} raised while
 * reading is logged at debug level and otherwise ignored. The input stream is
 * always closed before returning.
 *
 * @param corruptedFile file to read through the DFS client
 * @throws IOException on any other read failure, or if closing fails
 */
private void dfsClientReadFile(Path corruptedFile) throws IOException,
    UnresolvedLinkException {
  DFSInputStream in = dfs.dfs.open(corruptedFile.toUri().getPath());
  try {
    byte[] buf = new byte[buffersize];
    int nRead; // bytes returned by the most recent read call
    do {
      nRead = in.read(buf, 0, buf.length);
    } while (nRead > 0);
  } catch (ChecksumException ce) {
    // caught ChecksumException if all replicas are bad, ignore and continue.
    LOG.debug("DfsClientReadFile caught ChecksumException.");
  } catch (BlockMissingException bme) {
    // caught BlockMissingException, ignore.
    LOG.debug("DfsClientReadFile caught BlockMissingException.");
  } finally {
    // The stream used to be left open on every path.
    in.close();
  }
}
/**
 * Writes a file to RAM_DISK, evicts it by writing a second one, corrupts the
 * persisted block file of the first, and expects reading it back to raise a
 * {@link ChecksumException}.
 */
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
    InterruptedException {
  final String methodName = GenericTestUtils.getMethodName();
  final Path evictedFile = new Path("/" + methodName + ".01.dat");
  final Path evictingFile = new Path("/" + methodName + ".02.dat");
  final int seed = 0xFADED;

  makeRandomTestFile(evictedFile, BLOCK_SIZE, true, seed);
  ensureFileReplicasOnStorageType(evictedFile, RAM_DISK);

  // A second file with a replica on RAM_DISK pushes the first one out.
  makeRandomTestFile(evictingFile, BLOCK_SIZE, true, seed);

  // Give the lazy writer thread a few intervals to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Damage the lazy-persisted block file; checksum verification on read
  // must notice.
  ensureFileReplicasOnStorageType(evictedFile, DEFAULT);
  cluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, evictedFile));
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, evictedFile);
}
/**
 * Writes a file to RAM_DISK, evicts it by writing a second one, corrupts the
 * persisted block metadata file of the first, and expects reading it back to
 * raise a {@link ChecksumException}.
 */
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
    InterruptedException {
  final String methodName = GenericTestUtils.getMethodName();
  final Path evictedFile = new Path("/" + methodName + ".01.dat");
  final Path evictingFile = new Path("/" + methodName + ".02.dat");
  final int seed = 0xFADED;

  makeRandomTestFile(evictedFile, BLOCK_SIZE, true, seed);
  ensureFileReplicasOnStorageType(evictedFile, RAM_DISK);

  // A second file with a replica on RAM_DISK pushes the first one out.
  makeRandomTestFile(evictingFile, BLOCK_SIZE, true, seed);

  // Give the lazy writer thread a few intervals to do its job.
  Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
  triggerBlockReport();

  // Damage the lazy-persisted checksum file; checksum verification on read
  // must notice.
  ensureFileReplicasOnStorageType(evictedFile, DEFAULT);
  final File blockMetaFile = cluster.getBlockMetadataFile(0,
      DFSTestUtil.getFirstBlock(fs, evictedFile));
  MiniDFSCluster.corruptBlock(blockMetaFile);
  exception.expect(ChecksumException.class);
  DFSTestUtil.readFileBuffer(fs, evictedFile);
}
/**
 * Reads several regions through {@code reader}, comparing each against
 * {@code original}, and checks that a {@link ChecksumException} is raised if
 * and only if {@code usingChecksums} is set.
 *
 * @param reader reader under test
 * @param original expected content for the regions that are compared
 * @throws IOException if a read fails for a reason other than a checksum error
 */
public void doTest(BlockReaderLocal reader, byte original[])
    throws IOException {
  final byte[] actual = new byte[TEST_LENGTH];
  boolean corruptionDetected = false;
  try {
    reader.readFully(actual, 0, 10);
    assertArrayRegionsEqual(original, 0, actual, 0, 10);
    reader.readFully(actual, 10, 100);
    assertArrayRegionsEqual(original, 10, actual, 10, 100);
    reader.readFully(actual, 110, 700);
    assertArrayRegionsEqual(original, 110, actual, 110, 700);
    reader.skip(1); // skip from offset 810 to offset 811
    reader.readFully(actual, 811, 5);
    assertArrayRegionsEqual(original, 811, actual, 811, 5);
    reader.readFully(actual, 816, 900);
  } catch (ChecksumException e) {
    corruptionDetected = true;
  }
  if (usingChecksums && !corruptionDetected) {
    // We should detect the corruption when using a checksum file.
    Assert.fail("did not detect corruption");
  }
  if (!usingChecksums && corruptionDetected) {
    Assert.fail("didn't expect to get ChecksumException: not " +
        "using checksums.");
  }
}
/**
 * Reads {@code file} from {@code startPos} to end of stream, validating every
 * buffer that is read with {@code validateSequentialBytes}.
 *
 * <p>The stream is closed on every exit path, including validation failure.
 *
 * @param file file to read
 * @param startPos offset at which reading starts
 * @return {@code startPos} plus the number of bytes read, minus one
 * @throws ChecksumException if a buffer fails validation
 * @throws IOException if opening, seeking, reading or closing fails
 */
private long tailFile(Path file, long startPos) throws IOException {
  long numRead = 0;
  FSDataInputStream inputStream = fileSystem.open(file);
  try {
    inputStream.seek(startPos);
    int len = 4 * 1024;
    byte[] buf = new byte[len];
    int read;
    while ((read = inputStream.read(buf)) > -1) {
      LOG.info(String.format("read %d bytes", read));
      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
        throw new ChecksumException("unable to validate bytes", startPos);
      }
      numRead += read;
    }
  } finally {
    // Previously the stream leaked whenever validation or a read failed.
    inputStream.close();
  }
  return numRead + startPos - 1;
}
/**
 * Simulates read failures at the configured synchronization point.
 *
 * <p>Does nothing unless {@code event} is the configured
 * {@code synchronizationPoint} and the handler is not {@code disabled}.
 * Otherwise the failure counter is incremented and, for
 * {@code INGEST_READ_OP} events, a {@link ChecksumException} is thrown when
 * the counter modulo 3 is 1, or else an {@link IOException} when the counter
 * modulo 7 is 1.
 *
 * @param event the injection event being processed
 * @param args not used by this implementation
 * @throws IOException the simulated failure, if one is due
 */
@Override
protected void _processEventIO(InjectionEvent event, Object... args)
    throws IOException {
  if (synchronizationPoint != event || disabled) {
    return;
  }
  LOG.info("PROCESSING EVENT: " + synchronizationPoint + " counter: " + simulatedFailure);
  simulatedFailure++;
  if (event != InjectionEvent.INGEST_READ_OP) {
    return;
  }
  // The checksum failure is checked first, so it wins when both conditions hold.
  if (simulatedFailure % 3 == 1) {
    LOG.info("Throwing checksum exception");
    throw new ChecksumException("Testing checksum exception...", 0);
  }
  if (simulatedFailure % 7 == 1) {
    LOG.info("Throwing IO exception");
    throw new IOException("Testing IO exception...");
  }
}
/**
 * Validate a transaction's checksum.
 *
 * <p>When {@code checksum} is non-null, reads one int from {@code in} and
 * compares it with the current value of {@code checksum} truncated to an int.
 * When {@code checksum} is null nothing is read.
 *
 * @param in stream positioned at the stored checksum
 * @param checksum running checksum, or null to skip validation
 * @param txid transaction id, passed to the exception on mismatch
 * @throws ChecksumException if the stored and calculated checksums differ
 * @throws IOException if reading the stored checksum fails
 */
private void validateChecksum(DataInputStream in,
    Checksum checksum,
    long txid)
    throws IOException {
  if (checksum == null) {
    return;
  }
  final int calculatedChecksum = (int) checksum.getValue();
  final int readChecksum = in.readInt(); // read in checksum
  if (readChecksum == calculatedChecksum) {
    return;
  }
  throw new ChecksumException(
      "Transaction is corrupt. Calculated checksum is " +
      calculatedChecksum + " but read checksum " + readChecksum, txid);
}
/**
 * Reads up to {@code len} bytes into {@code b}, feeding them into the running
 * checksum, and verifies the stored checksum once the end of the data has
 * been reached (unless validation is disabled).
 *
 * @param b destination buffer
 * @param off offset in {@code b} at which to store bytes
 * @param len maximum number of bytes to read; clamped to the remaining data
 * @return number of bytes read
 * @throws ChecksumException if the underlying read returns a negative count
 *         or the stored checksum does not match
 * @throws IOException if the underlying stream fails
 */
private int doRead(byte[]b, int off, int len) throws IOException {
  // Never read past the end of the data: shrink the request to what is left.
  if (currentOffset + len > dataLength) {
    len = (int) (dataLength - currentOffset);
  }

  final int count = in.read(b, off, len);
  if (count < 0) {
    throw new ChecksumException("Checksum Error", 0);
  }

  sum.update(b, off, count);
  currentOffset += count;

  final boolean reachedEndOfData = currentOffset == dataLength;
  if (!disableChecksumValidation && reachedEndOfData) {
    // The checksumSize bytes that follow the data hold the checksum:
    // consume them and compare against the running sum.
    csum = new byte[checksumSize];
    IOUtils.readFully(in, csum, 0, checksumSize);
    if (!sum.compare(csum, 0)) {
      throw new ChecksumException("Checksum Error", 0);
    }
  }
  return count;
}
/**
 * Writes an index file in which only every third partition's records pass
 * through the checksumming stream, then expects the index cache to reject it
 * with an {@link IOException} caused by a {@link ChecksumException}.
 */
public void testBadIndex() throws Exception {
  final int parts = 30;
  fs.delete(p, true);
  conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
  IndexCache cache = new IndexCache(conf);

  Path indexFile = new Path(p, "badindex");
  FSDataOutputStream out = fs.create(indexFile, false);
  CheckedOutputStream checkedOut = new CheckedOutputStream(out, new CRC32());
  DataOutputStream dout = new DataOutputStream(checkedOut);
  final int longsPerRecord = MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
  for (int part = 0; part < parts; ++part) {
    // Writes that go straight to `out` bypass the CRC, so the trailing
    // checksum will not match the file content.
    final boolean checksummed = (part % 3) == 0;
    for (int k = 0; k < longsPerRecord; ++k) {
      if (checksummed) {
        dout.writeLong(part);
      } else {
        out.writeLong(part);
      }
    }
  }
  out.writeLong(checkedOut.getChecksum().getValue());
  dout.close();

  try {
    cache.getIndexInformation("badindex", 7, indexFile,
        UserGroupInformation.getCurrentUser().getShortUserName());
    fail("Did not detect bad checksum");
  } catch (IOException e) {
    final boolean causedByChecksum = e.getCause() instanceof ChecksumException;
    if (!causedByChecksum) {
      throw e;
    }
  }
}
/**
 * Reads up to {@code len} bytes from {@code blockReader} into {@code buf}
 * starting at {@code off}, and records the result in the read statistics.
 *
 * @param blockReader reader to pull bytes from
 * @param off offset in {@code buf} at which to store bytes
 * @param len maximum number of bytes to read
 * @return the value returned by {@code blockReader.read(buf, off, len)}
 * @throws ChecksumException propagated from the underlying read
 * @throws IOException propagated from the underlying read
 */
@Override
public int doRead(BlockReader blockReader, int off, int len)
    throws ChecksumException, IOException {
  final int bytesRead = blockReader.read(buf, off, len);
  updateReadStatistics(readStatistics, bytesRead, blockReader);
  return bytesRead;
}
/**
 * Validate a transaction's checksum.
 *
 * <p>When {@code supportChecksum} is true, reads one int from
 * {@code rawStream} and compares it with the current value of
 * {@code checksum} truncated to an int. Otherwise nothing is read.
 *
 * @param supportChecksum whether a stored checksum is present and checked
 * @param rawStream stream positioned at the stored checksum
 * @param checksum running checksum over the transaction
 * @param tid transaction id, used in the error message and exception
 * @throws ChecksumException if the stored and calculated checksums differ
 * @throws IOException if reading the stored checksum fails
 */
static void validateChecksum(boolean supportChecksum,
    DataInputStream rawStream, Checksum checksum, int tid)
    throws IOException {
  if (!supportChecksum) {
    return;
  }
  final int expectedChecksum = rawStream.readInt(); // read in checksum
  final int calculatedChecksum = (int) checksum.getValue();
  if (expectedChecksum == calculatedChecksum) {
    return;
  }
  throw new ChecksumException(
      "Transaction " + tid + " is corrupt.", tid);
}
/**
 * Validate a transaction's checksum.
 *
 * <p>When {@code checksum} is non-null, reads one int from {@code in} and
 * compares it with the current value of {@code checksum} truncated to an int.
 * When {@code checksum} is null nothing is read.
 *
 * @param in stream positioned at the stored checksum
 * @param checksum running checksum, or null to skip validation
 * @param txid transaction id, passed to the exception on mismatch
 * @throws ChecksumException if the stored and calculated checksums differ
 * @throws IOException if reading the stored checksum fails
 */
private void validateChecksum(DataInputStream in,
    Checksum checksum,
    long txid)
    throws IOException {
  if (checksum == null) {
    return;
  }
  final int calculatedChecksum = (int) checksum.getValue();
  final int readChecksum = in.readInt(); // read in checksum
  if (readChecksum == calculatedChecksum) {
    return;
  }
  throw new ChecksumException(
      "Transaction is corrupt. Calculated checksum is " +
      calculatedChecksum + " but read checksum " + readChecksum, txid);
}
/**
 * Overwrites part of {@code fileToCorrupt} with random bytes and asserts that
 * reading {@code file} through {@code fileSys} raises a
 * {@link ChecksumException}.
 *
 * <p>Both the file used for corruption and the stream used for reading are
 * closed on every path, including assertion failures.
 *
 * @param fileSys file system used to stat and read the files
 * @param file file that is read back after the corruption
 * @param fileToCorrupt local file whose content is overwritten
 * @throws IOException if corrupting or reading fails for another reason
 */
private void checkFileCorruption(LocalFileSystem fileSys, Path file,
    Path fileToCorrupt) throws IOException {
  // corrupt the file
  final byte[] buf;
  RandomAccessFile out =
      new RandomAccessFile(new File(fileToCorrupt.toString()), "rw");
  try {
    buf = new byte[(int) fileSys.getFileStatus(file).getLen()];
    int corruptFileLen = (int) fileSys.getFileStatus(fileToCorrupt).getLen();
    assertTrue(buf.length >= corruptFileLen);

    rand.nextBytes(buf);
    // Overwrite a quarter of the file, starting at its midpoint.
    out.seek(corruptFileLen / 2);
    out.write(buf, 0, corruptFileLen / 4);
  } finally {
    out.close();
  }

  boolean gotException = false;
  InputStream in = fileSys.open(file);
  try {
    IOUtils.readFully(in, buf, 0, buf.length);
  } catch (ChecksumException e) {
    gotException = true;
  } finally {
    in.close();
  }
  assertTrue(gotException);
}
/**
 * Implementation of chunked verification specifically on byte arrays. This
 * is to avoid the copy when dealing with ByteBuffers that have array backing.
 *
 * <p>Returns immediately when {@code type.size} is zero. Uses
 * {@code NativeCrc32} when it reports itself available; otherwise each chunk
 * of up to {@code bytesPerChecksum} bytes is summed and compared with the
 * 4-byte big-endian value stored in {@code checksums}.
 *
 * @param data array holding the data
 * @param dataOff offset of the first data byte
 * @param dataLen number of data bytes to verify
 * @param checksums array holding the stored checksums
 * @param checksumsOff offset of the first stored checksum
 * @param fileName name used in the error message
 * @param basePos position of the first data byte, used to report the
 *        position of a failing chunk
 * @throws ChecksumException if a chunk's checksum does not match
 */
private void verifyChunkedSums(
    byte[] data, int dataOff, int dataLen,
    byte[] checksums, int checksumsOff, String fileName,
    long basePos) throws ChecksumException {
  if (type.size == 0) {
    return;
  }

  if (NativeCrc32.isAvailable()) {
    NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
        checksums, checksumsOff, data, dataOff, dataLen, fileName, basePos);
    return;
  }

  int sumPos = checksumsOff;
  int chunkStart = 0;
  while (chunkStart < dataLen) {
    final int chunkLen = Math.min(dataLen - chunkStart, bytesPerChecksum);
    summer.reset();
    summer.update(data, dataOff + chunkStart, chunkLen);
    final int calculated = (int) summer.getValue();

    // Assemble the stored checksum from four bytes, most significant first.
    final int stored = ((checksums[sumPos] & 0xff) << 24)
        | ((checksums[sumPos + 1] & 0xff) << 16)
        | ((checksums[sumPos + 2] & 0xff) << 8)
        | (checksums[sumPos + 3] & 0xff);

    if (calculated != stored) {
      // Report the position of the first byte of the failing chunk.
      final long errPos = basePos + chunkStart;
      throw new ChecksumException(
          "Checksum error: "+ fileName + " at "+ errPos +
          " exp: " + stored + " got: " + calculated, errPos);
    }
    sumPos += 4;
    chunkStart += chunkLen;
  }
}
/**
 * Verifies chunked checksums held in byte arrays by forwarding every argument
 * to {@code nativeComputeChunkedSumsByteArray}.
 *
 * @param bytesPerSum forwarded unchanged
 * @param checksumType forwarded unchanged
 * @param sums array holding the checksums
 * @param sumsOffset offset into {@code sums}
 * @param data array holding the data
 * @param dataOffset offset into {@code data}
 * @param dataLength number of data bytes
 * @param fileName forwarded unchanged
 * @param basePos forwarded unchanged
 * @throws ChecksumException declared for callers; raised by the callee
 */
public static void verifyChunkedSumsByteArray(int bytesPerSum,
    int checksumType, byte[] sums, int sumsOffset, byte[] data,
    int dataOffset, int dataLength, String fileName, long basePos)
    throws ChecksumException {
  // NOTE(review): the trailing flag presumably selects verify (rather than
  // compute) mode — confirm against the native method's declaration.
  final boolean trailingFlag = true;
  nativeComputeChunkedSumsByteArray(
      bytesPerSum, checksumType,
      sums, sumsOffset,
      data, dataOffset, dataLength,
      fileName, basePos,
      trailingFlag);
}
/**
 * With direct buffers holding data and valid checksums,
 * {@code NativeCrc32.verifyChunkedSums} must complete without throwing.
 */
@Test
public void testVerifyChunkedSumsSuccess() throws ChecksumException {
  // Arrange: direct buffers with matching checksums.
  allocateDirectByteBuffers();
  fillDataAndValidChecksums();

  // Act: any ChecksumException propagates and fails the test.
  NativeCrc32.verifyChunkedSums(
      bytesPerChecksum, checksumType.id, checksums, data, fileName,
      BASE_POSITION);
}
/**
 * With direct buffers holding data and invalid checksums,
 * {@code NativeCrc32.verifyChunkedSums} must throw a
 * {@link ChecksumException}.
 */
@Test
public void testVerifyChunkedSumsFail() throws ChecksumException {
  // Arrange: direct buffers with mismatching checksums.
  allocateDirectByteBuffers();
  fillDataAndInvalidChecksums();

  // Expect the verification call below to fail.
  exception.expect(ChecksumException.class);
  NativeCrc32.verifyChunkedSums(
      bytesPerChecksum, checksumType.id, checksums, data, fileName,
      BASE_POSITION);
}
/**
 * With array-backed buffers holding data and valid checksums,
 * {@code NativeCrc32.verifyChunkedSumsByteArray} must complete without
 * throwing.
 */
@Test
public void testVerifyChunkedSumsByteArraySuccess() throws ChecksumException {
  // Arrange: array-backed buffers with matching checksums.
  allocateArrayByteBuffers();
  fillDataAndValidChecksums();

  // Unwrap the backing arrays and current positions for the array-based API.
  final byte[] sumBytes = checksums.array();
  final int sumStart = checksums.position();
  final byte[] dataBytes = data.array();
  final int dataStart = data.position();
  final int dataLength = data.remaining();

  NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, checksumType.id,
      sumBytes, sumStart, dataBytes, dataStart, dataLength,
      fileName, BASE_POSITION);
}