下面列出了 org.apache.hadoop.io.IOUtils#closeStream() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Read checksum into given buffer.
 * <p>
 * Returns without reading when {@code checksumSize <= 0} and no checksum
 * stream is open. If the read fails, the checksum stream is closed and
 * cleared; the failure is then either rethrown or, when
 * {@code corruptChecksumOk} is set, masked by zero-filling the region of
 * {@code buf} that the checksum would have occupied.
 *
 * @param buf buffer to read the checksum into
 * @param checksumOffset offset at which to write the checksum into buf
 * @param checksumLen length of checksum to write
 * @throws IOException on error, unless {@code corruptChecksumOk} is set
 */
private void readChecksum(byte[] buf, final int checksumOffset,
    final int checksumLen) throws IOException {
  if (checksumSize <= 0 && checksumIn == null) {
    return;
  }
  try {
    checksumIn.readFully(buf, checksumOffset, checksumLen);
  } catch (IOException e) {
    LOG.warn(" Could not read or failed to veirfy checksum for data"
        + " at offset " + offset + " for block " + block, e);
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
    if (corruptChecksumOk) {
      if (checksumLen > 0) {
        // Just fill the checksum region with zeros. Arrays.fill takes an
        // exclusive end index, not a length, so the end of the region is
        // checksumOffset + checksumLen.
        Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen,
            (byte) 0);
      }
    } else {
      throw e;
    }
  }
}
/**
 * Convert from a JSON file that is loaded through
 * {@code getClass().getResourceAsStream(resource)}.
 * The resource stream is always closed before returning.
 *
 * @param resource input file
 * @return the parsed JSON
 * @throws IOException IO problems; a {@link FileNotFoundException} is
 * raised when the resource cannot be located
 * @throws JsonParseException declared by the signature; presumably raised
 * by the mapper when the content is not valid JSON
 * @throws JsonMappingException failure to map from the JSON to this class
 */
@SuppressWarnings({"IOResourceOpenedButNotSafelyClosed"})
public synchronized T fromResource(String resource)
    throws IOException, JsonParseException, JsonMappingException {
  InputStream resStream = null;
  try {
    resStream = this.getClass().getResourceAsStream(resource);
    if (resStream == null) {
      throw new FileNotFoundException(resource);
    }
    return mapper.readValue(resStream, classType);
  } catch (IOException e) {
    // Pass the exception as the trailing throwable argument instead of
    // binding it to a "{}" placeholder, so that the stack trace is logged
    // (assumes an SLF4J-style parameterized logger).
    LOG.error("Exception while parsing json resource {}", resource, e);
    throw e;
  } finally {
    IOUtils.closeStream(resStream);
  }
}
/**
 * Walks the dimensions of the segment's cube descriptor and, for each
 * dimension table that the model reports as a lookup table and that is not
 * an ext snapshot table, fetches its lookup table from the cube manager and
 * closes it again.
 *
 * @param cubeManager manager used to obtain the lookup table
 * @param cubeSegment segment whose lookup snapshots are checked
 * @throws RuntimeException wrapping any failure raised while obtaining or
 * closing a lookup table
 */
private void checkSnapshot(CubeManager cubeManager, CubeSegment cubeSegment) {
    for (DimensionDesc dimension : cubeSegment.getCubeDesc().getDimensions()) {
        TableRef lookup = dimension.getTableRef();
        String tableIdentity = lookup.getTableIdentity();

        // Skip everything that is not a plain (non-ext) lookup table.
        if (!cubeSegment.getModel().isLookupTable(tableIdentity)
                || cubeSegment.getCubeDesc().isExtSnapshotTable(tableIdentity)) {
            continue;
        }

        logger.info("Checking snapshot of {}", lookup);
        try {
            JoinDesc joinOnPk = cubeSegment.getModel().getJoinsTree().getJoinByPKSide(lookup);
            ILookupTable snapshotTable = cubeManager.getLookupTable(cubeSegment, joinOnPk);
            if (snapshotTable != null) {
                IOUtils.closeStream(snapshotTable);
            }
        } catch (Throwable th) {
            throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
        }
    }
}
/**
 * Reads the entire file at {@code hdfsPath} from {@code fileSystem} into
 * memory and exposes it as a stream.
 * <p>
 * A failure while copying is propagated to the caller instead of being
 * printed and swallowed; otherwise a truncated or empty stream would be
 * returned as if it were the complete file content.
 *
 * @param hdfsPath path of the file to read
 * @return an in-memory stream holding the complete file content
 * @throws IOException if the file cannot be opened or fully read
 */
public ByteArrayInputStream downFile(String hdfsPath) throws IOException
{
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    InputStream in = fileSystem.open(new Path(hdfsPath));
    try
    {
        IOUtils.copyBytes(in, bos, conf);
    } finally
    {
        // Always release the source stream, whether or not the copy succeeded.
        IOUtils.closeStream(in);
    }
    return new ByteArrayInputStream(bos.toByteArray());
}
/**
 * Copies everything from {@code in} into a newly created output stream for
 * {@code target}.
 *
 * @param in source of the bytes to write
 * @param target destination to create
 * @param lazyPersist forwarded unchanged to {@code create}
 * @throws IOException if the target cannot be created or the copy fails
 */
void writeStreamToFile(InputStream in, PathData target,
    boolean lazyPersist) throws IOException {
  // If create() throws there is no output stream to clean up, so it can
  // safely sit in front of the try block.
  final FSDataOutputStream sink = create(target, lazyPersist);
  try {
    final boolean closeWhenDone = true;
    IOUtils.copyBytes(in, sink, getConf(), closeWhenDone);
  } finally {
    // just in case copyBytes didn't
    IOUtils.closeStream(sink);
  }
}
/**
 * Copies {@code tmpFile} to {@code outFile} through a
 * {@code ThrottledInputStream} and asserts on the byte count and throughput
 * the stream reports.
 *
 * @param tmpFile file to read from
 * @param outFile file to write to
 * @param maxBandwidth nominal bandwidth; {@code 0} creates the throttled
 * stream without an explicit limit
 * @param factor divisor applied to {@code maxBandwidth} to obtain the limit
 * actually passed to the stream
 * @param sleepTime total sleep time the stream is expected to exceed,
 * unless its measured rate stays within the limit
 * @param flag selects which copy routine is exercised
 * @return the bytes-per-second value reported by the stream right after
 * the copy
 * @throws IOException if opening or copying the files fails
 */
private long copyAndAssert(File tmpFile, File outFile,
    long maxBandwidth, float factor,
    int sleepTime, CB flag) throws IOException {
  long bandwidth;
  final long maxBPS = (long) (maxBandwidth / factor);
  ThrottledInputStream in = null;
  OutputStream out = null;
  try {
    // Both streams are opened inside the try block so that the input is
    // released even when the output file cannot be created.
    if (maxBandwidth == 0) {
      in = new ThrottledInputStream(new FileInputStream(tmpFile));
    } else {
      in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
    }
    out = new FileOutputStream(outFile);

    if (flag == CB.BUFFER) {
      copyBytes(in, out, BUFF_SIZE);
    } else if (flag == CB.BUFF_OFFSET) {
      copyBytesWithOffset(in, out, BUFF_SIZE);
    } else {
      copyByteByByte(in, out);
    }

    LOG.info(in);
    bandwidth = in.getBytesPerSec();
    // Expected value first, so a failure message reads the right way round.
    Assert.assertEquals(tmpFile.length(), in.getTotalBytesRead());
    Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
    Assert.assertTrue(in.getTotalSleepTime() > sleepTime || in.getBytesPerSec() <= maxBPS);
  } finally {
    IOUtils.closeStream(in);
    IOUtils.closeStream(out);
  }
  return bandwidth;
}
/**
 * Closes the writer and then the underlying output stream.
 * An {@link IOException} from the writer is logged and not propagated.
 * The output stream is closed in a {@code finally} clause so that it is
 * released even if closing the writer fails with an unchecked exception.
 */
public void close() {
  try {
    this.writer.close();
  } catch (IOException e) {
    LOG.warn("Exception closing writer", e);
  } finally {
    IOUtils.closeStream(fsDataOStream);
  }
}
/**
 * Randomized test: for {@code RANDOM_TEST_RUNTIME} keeps appending chunks
 * of random length to one file, picking for every append either the
 * CRC32 or the CRC32C file system at random. Afterwards the complete
 * content is checked through both file systems.
 */
@Test(timeout=RANDOM_TEST_RUNTIME*2)
public void testAlgoSwitchRandomized() throws IOException {
  final FileSystem crc32Fs = createFsWithChecksum("CRC32", 512);
  final FileSystem crc32cFs = createFsWithChecksum("CRC32C", 512);

  final Path file = new Path("/testAlgoSwitchRandomized");
  final long seed = Time.now();
  // Printed so that a failing run can be identified by its seed.
  System.out.println("seed: " + seed);
  final Random rng = new Random(seed);

  // Start from an empty file.
  IOUtils.closeStream(crc32Fs.create(file));

  final long startedAt = Time.now();
  int written = 0;
  while (Time.now() - startedAt < RANDOM_TEST_RUNTIME) {
    final int chunkLen = rng.nextInt(500);
    final FileSystem chosenFs;
    if (rng.nextBoolean()) {
      chosenFs = crc32Fs;
    } else {
      chosenFs = crc32cFs;
    }
    final FSDataOutputStream appender = chosenFs.append(file);
    try {
      AppendTestUtil.write(appender, written, chunkLen);
    } finally {
      appender.close();
    }
    written += chunkLen;
  }

  AppendTestUtil.check(crc32Fs, file, written);
  AppendTestUtil.check(crc32cFs, file, written);
}
/**
* Regression test for HDFS-2742. The issue in this bug was:
* - DN does a block report while file is open. This BR contains
* the block in RBW state.
* - Standby queues the RBW state in PendingDatanodeMessages
* - Standby processes edit logs during failover. Before fixing
* this bug, it was mistakenly applying the RBW reported state
* after the block had been completed, causing the block to get
* marked corrupt. Instead, we should now be applying the RBW
* message on OP_ADD, and then the FINALIZED message on OP_CLOSE.
*
* The test writes and hflushes a few bytes, triggers block reports while
* the stream is still open, closes the stream, fails over from node 0 to
* node 1 and finally asserts that neither namenode counts any corrupt
* replica and that the file can be read.
*/
@Test
public void testBlockReportsWhileFileBeingWritten() throws Exception {
FSDataOutputStream out = fs.create(TEST_FILE_PATH);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
// Block report will include the RBW replica, but will be
// queued on the StandbyNode.
cluster.triggerBlockReports();
} finally {
// Close the file only after the block reports were triggered.
IOUtils.closeStream(out);
}
// Fail over: node 0 becomes standby, node 1 becomes active.
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// Verify that no replicas are marked corrupt, and that the
// file is readable from the failed-over standby.
BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
DFSTestUtil.readFile(fs, TEST_FILE_PATH);
}
/**
 * Verifies the block access token for the given block and mode. Does
 * nothing when block tokens are disabled on the datanode.
 * <p>
 * When the token is rejected, an {@code ERROR_ACCESS_TOKEN} response is
 * optionally written to {@code out}, a warning is logged, {@code out} is
 * closed and the {@code InvalidToken} is rethrown.
 *
 * @param out stream the error response is written to; closed on failure
 * @param reply whether an error response should be sent on failure
 * @param blk block being accessed
 * @param t token presented for the block
 * @param op operation being performed, used for logging only
 * @param mode access mode to check the token against
 * @throws IOException if the token is invalid or the response cannot be
 * written
 */
private void checkAccess(OutputStream out, final boolean reply,
    final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> t,
    final Op op,
    final BlockTokenSecretManager.AccessMode mode) throws IOException {
  if (!datanode.isBlockTokenEnabled) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Checking block access token for block '" + blk.getBlockId()
        + "' with mode '" + mode + "'");
  }

  try {
    datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
  } catch (InvalidToken invalid) {
    try {
      if (reply) {
        final BlockOpResponseProto.Builder errorReply =
            BlockOpResponseProto.newBuilder().setStatus(ERROR_ACCESS_TOKEN);
        final boolean isWrite =
            (mode == BlockTokenSecretManager.AccessMode.WRITE);
        if (isWrite) {
          final DatanodeRegistration registration =
              datanode.getDNRegistrationForBP(blk.getBlockPoolId());
          // NB: Unconditionally using the xfer addr w/o hostname
          errorReply.setFirstBadLink(registration.getXferAddr());
        }
        errorReply.build().writeDelimitedTo(out);
        out.flush();
      }
      LOG.warn("Block token verification failed: op=" + op
          + ", remoteAddress=" + remoteAddress
          + ", message=" + invalid.getLocalizedMessage());
      throw invalid;
    } finally {
      // The stream is closed whether or not the reply could be sent.
      IOUtils.closeStream(out);
    }
  }
}
/**
 * Test complete(..) - verifies that the fileId in the request
 * has to match that of the inode.
 * The call is issued with a fileId of -1 and the test expects it to be
 * rejected with a LeaseExpiredException; a successful call fails the test.
 */
@Test
public void testFileIdMismatch() throws IOException {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  DistributedFileSystem dfs = null;
  try {
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    final DFSClient client = dfs.dfs;

    final Path target = new Path("/testFileIdMismatch.txt");
    createFile(dfs, target, 3);

    // An id that does not belong to the file created above.
    final long mismatchedFileId = -1;
    try {
      cluster.getNameNodeRpc().complete(
          target.toString(), client.clientName, null, mismatchedFileId);
      fail();
    } catch (LeaseExpiredException expected) {
      FileSystem.LOG.info("Caught Expected LeaseExpiredException: ", expected);
    }
  } finally {
    IOUtils.closeStream(dfs);
    cluster.shutdown();
  }
}
/**
 * Appends {@code length} random bytes to the given file, using the file
 * system of the shared cluster.
 *
 * @param p file to append to
 * @param length number of random bytes to append
 * @throws IOException if opening the file for append or writing fails
 */
private static void appendFile(Path p, int length) throws IOException {
  final byte[] payload = new byte[length];
  new Random().nextBytes(payload);

  final FSDataOutputStream appender = cluster.getFileSystem().append(p);
  try {
    appender.write(payload);
  } finally {
    IOUtils.closeStream(appender);
  }
}
/**
 * Old replica of the block should not be accepted as valid for append/read.
 * <p>
 * One datanode is stopped, the file is appended twice without it, and the
 * datanode is restarted while the file is open for a further append. The
 * restarted datanode must not show up in the reported block locations.
 */
@Test
public void testFailedAppendBlockRejection() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
      "false");
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  DistributedFileSystem fs = null;
  try {
    fs = cluster.getFileSystem();
    final Path path = new Path("/test");

    final FSDataOutputStream initial = fs.create(path);
    initial.writeBytes("hello\n");
    initial.close();

    // stop one datanode and remember its address without a leading '/'
    final DataNodeProperties dnProp = cluster.stopDataNode(0);
    final String rawAddress = dnProp.datanode.getXferAddress().toString();
    final String dnAddress =
        rawAddress.startsWith("/") ? rawAddress.substring(1) : rawAddress;

    // append again to bump genstamps
    int appendsLeft = 2;
    while (appendsLeft > 0) {
      final FSDataOutputStream appender = fs.append(path);
      appender.writeBytes("helloagain\n");
      appender.close();
      appendsLeft--;
    }

    // re-open and make the block state as underconstruction
    final FSDataOutputStream reopened = fs.append(path);
    cluster.restartDataNode(dnProp, true);
    // wait till the block report comes
    Thread.sleep(2000);

    // check the block locations, this should not contain restarted datanode
    final BlockLocation[] locations =
        fs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
    final String[] names = locations[0].getNames();
    for (int idx = 0; idx < names.length; idx++) {
      if (names[idx].equals(dnAddress)) {
        fail("Failed append should not be present in latest block locations.");
      }
    }
    reopened.close();
  } finally {
    IOUtils.closeStream(fs);
    cluster.shutdown();
  }
}
/**
* Test for the case where one of the DNs in the pipeline is in the
* process of doing a block report exactly when the block is closed.
* In this case, the block report becomes delayed until after the
* block is marked completed on the NN, and hence it reports an RBW
* replica for a COMPLETE block. Such a report should not be marked
* corrupt.
* This is a regression test for HDFS-2791.
*
* The delay is produced by a Mockito answer installed on a spy of the
* datanode-to-namenode translator: the block report call is held back
* until the output stream has been closed, and a latch signals when the
* held-back call has finally gone through.
*/
@Test(timeout=300000)
public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
// Released once the delayed block report call has passed through.
final CountDownLatch brFinished = new CountDownLatch(1);
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
@Override
protected Object passThrough(InvocationOnMock invocation)
throws Throwable {
try {
return super.passThrough(invocation);
} finally {
// inform the test that our block report went through.
brFinished.countDown();
}
}
};
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
// Start a second DN for this test -- we're checking
// what happens when one of the DNs is slowed for some reason.
REPL_FACTOR = 2;
startDNandWait(null, false);
NameNode nn = cluster.getNameNode();
FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
try {
AppendTestUtil.write(out, 0, 10);
out.hflush();
// Set up a spy so that we can delay the block report coming
// from this node.
DataNode dn = cluster.getDataNodes().get(0);
DatanodeProtocolClientSideTranslatorPB spy =
DataNodeTestUtils.spyOnBposToNN(dn, nn);
// Route every blockReport call on the spy through the delaying answer.
Mockito.doAnswer(delayer)
.when(spy).blockReport(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageBlockReport[]>anyObject(),
Mockito.<BlockReportContext>anyObject());
// Force a block report to be generated. The block report will have
// an RBW replica in it. Wait for the RPC to be sent, but block
// it before it gets to the NN.
dn.scheduleAllBlockReport(0);
delayer.waitForCall();
} finally {
// Closing the stream while the report is still held back is the point
// of the test.
IOUtils.closeStream(out);
}
// Now that the stream is closed, the NN will have the block in COMPLETE
// state.
delayer.proceed();
brFinished.await();
// Verify that no replicas are marked corrupt, and that the
// file is still readable.
BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
DFSTestUtil.readFile(fs, filePath);
// Ensure that the file is readable even from the DN that we futzed with.
cluster.stopDataNode(1);
DFSTestUtil.readFile(fs, filePath);
}
/**
 * Verifies a block by sending it through a {@code BlockSender} into a
 * null output stream and records the outcome via
 * {@code updateScanStatus}.
 * <p>
 * On failure the read is attempted a second time to reduce transient
 * errors. A block that is no longer in the dataset is deleted from the
 * scanner's bookkeeping and not treated as an error. A failure on the
 * second attempt is reported through the metrics and
 * {@code handleScanFailure}.
 *
 * @param block the block to verify
 */
private void verifyBlock(Block block) {
  /* In case of failure, attempt to read second time to reduce
   * transient errors. How do we flush block data from kernel
   * buffers before the second read?
   */
  for (int i = 0; i < 2; i++) {
    boolean second = (i > 0);
    // Scoped per attempt: if constructing the sender fails on the retry,
    // the finally clause must not close the first attempt's sender again.
    BlockSender blockSender = null;
    try {
      adjustThrottler();

      blockSender = new BlockSender(block, 0, -1, false,
          false, true, datanode);

      // The data itself is discarded; only the verification matters.
      DataOutputStream out =
          new DataOutputStream(new IOUtils.NullOutputStream());

      blockSender.sendBlock(out, null, throttler);

      LOG.info((second ? "Second " : "") +
          "Verification succeeded for " + block);

      if ( second ) {
        totalTransientErrors++;
      }

      updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);

      return;
    } catch (IOException e) {
      totalScanErrors++;
      updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);

      // If the block does not exists anymore, then its not an error
      if ( dataset.getFile(block) == null ) {
        LOG.info("Verification failed for " + block + ". Its ok since " +
            "it not in datanode dataset anymore.");
        deleteBlock(block);
        return;
      }

      LOG.warn((second ? "Second " : "First ") +
          "Verification failed for " + block + ". Exception : " +
          StringUtils.stringifyException(e));

      if (second) {
        datanode.getMetrics().blockVerificationFailures.inc();
        handleScanFailure(block);
        return;
      }
    } finally {
      // Runs for every attempt, including the ones that return early.
      IOUtils.closeStream(blockSender);
      datanode.getMetrics().blocksVerified.inc();
      totalScans++;
      totalVerifications++;
    }
  }
}
/**
* Closes the data output stream and then the checksum output stream,
* both through {@code IOUtils.closeStream}.
*/
@Override
public void close() {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
/**
 * Handles a block checksum request: checks the block token for read
 * access, computes an MD5 over the block's stored checksums (or over a
 * partial range when the requested length is shorter than the visible
 * length) and writes the result back on the output stream.
 *
 * @param block block to checksum; its byte count is the requested length
 * @param blockToken access token presented for the block
 * @throws IOException if access is denied, or reading the metadata or
 * writing the reply fails
 */
@Override
public void blockChecksum(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken) throws IOException {
  final DataOutputStream out = new DataOutputStream(getOutputStream());
  checkAccess(out, true, block, blockToken,
      Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);

  // client side now can specify a range of the block for checksum
  final long requestLength = block.getNumBytes();
  Preconditions.checkArgument(requestLength >= 0);
  final long visibleLength = datanode.data.getReplicaVisibleLength(block);
  final boolean partialBlk = requestLength < visibleLength;

  updateCurrentThreadName("Reading metadata for block " + block);
  final LengthInputStream metadataIn =
      datanode.data.getMetaDataInputStream(block);
  final DataInputStream checksumIn = new DataInputStream(
      new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));

  updateCurrentThreadName("Getting checksum for block " + block);
  try {
    // read metadata file
    final BlockMetadataHeader header =
        BlockMetadataHeader.readHeader(checksumIn);
    final DataChecksum checksum = header.getChecksum();
    final int csize = checksum.getChecksumSize();
    final int bytesPerCRC = checksum.getBytesPerChecksum();

    // Number of stored checksums: metadata length minus header, divided
    // by the size of a single checksum.
    final long crcPerBlock;
    if (csize <= 0) {
      crcPerBlock = 0;
    } else {
      crcPerBlock =
          (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
    }

    final MD5Hash md5;
    if (partialBlk && crcPerBlock > 0) {
      md5 = calcPartialBlockChecksum(block, requestLength, checksum, checksumIn);
    } else {
      md5 = MD5Hash.digest(checksumIn);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
          + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
    }

    // write reply
    final BlockOpResponseProto.Builder reply =
        BlockOpResponseProto.newBuilder().setStatus(SUCCESS);
    reply.setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
        .setBytesPerCrc(bytesPerCRC)
        .setCrcPerBlock(crcPerBlock)
        .setMd5(ByteString.copyFrom(md5.getDigest()))
        .setCrcType(PBHelper.convert(checksum.getChecksumType())));
    reply.build().writeDelimitedTo(out);
    out.flush();
  } catch (IOException ioe) {
    LOG.info("blockChecksum " + block + " received exception " + ioe);
    incrDatanodeNetworkErrors();
    throw ioe;
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(metadataIn);
  }

  // update metrics
  datanode.metrics.addBlockChecksumOp(elapsed());
}
/**
 * Runs an import for the given query and validates the resulting
 * SequenceFile: the first record must equal {@code firstValStr}, the
 * first integer of every record must add up to {@code expectedSum}, and
 * the file must hold exactly {@code numExpectedResults} records.
 *
 * @param query the query to import
 * @param firstValStr expected string form of the first record
 * @param numExpectedResults expected number of records
 * @param expectedSum expected sum of the first integer of all records
 * @param targetDir directory the import writes to
 * @throws IOException if reading the imported file fails
 */
public void runQueryTest(String query, String firstValStr,
    int numExpectedResults, int expectedSum, String targetDir)
    throws IOException {

  ClassLoader previousLoader = null;
  SequenceFile.Reader seqReader = null;

  runImport(getArgv(true, query, targetDir, false));
  try {
    SqoopOptions opts = new ImportTool().parseArguments(
        getArgv(false, query, targetDir, false),
        null, null, true);

    String jarFileName = new CompilationManager(opts).getJarFilename();
    previousLoader = ClassLoaderStack.addJarFile(jarFileName, getTableName());

    seqReader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());

    // here we can actually instantiate (k, v) pairs.
    Configuration conf = new Configuration();
    Object key = ReflectionUtils.newInstance(seqReader.getKeyClass(), conf);
    Object val = ReflectionUtils.newInstance(seqReader.getValueClass(), conf);

    if (seqReader.next(key) == null) {
      fail("Empty SequenceFile during import");
    }

    // make sure that the value we think should be at the top, is.
    seqReader.getCurrentValue(val);
    assertEquals("Invalid ordering within sorted SeqFile", firstValStr,
        val.toString());

    // The values are two ints separated by a ',' character. Since the
    // record class is generated dynamically we do not link against it;
    // the first int is parsed back out of the string form instead and
    // summed up to verify that every row from the db reached the file.
    int runningSum = getFirstInt(val.toString());
    int rowCount = 1;

    // now sum up everything else in the file.
    for (; seqReader.next(key) != null; rowCount++) {
      seqReader.getCurrentValue(val);
      runningSum += getFirstInt(val.toString());
    }

    assertEquals("Total sum of first db column mismatch", expectedSum,
        runningSum);
    assertEquals("Incorrect number of results for query", numExpectedResults,
        rowCount);
  } catch (InvalidOptionsException ioe) {
    fail(ioe.toString());
  } catch (ParseException pe) {
    fail(pe.toString());
  } finally {
    IOUtils.closeStream(seqReader);

    // Restore the class loader only if the jar was actually added.
    if (previousLoader != null) {
      ClassLoaderStack.setCurrentClassLoader(previousLoader);
    }
  }
}
/**
* Closes the data input stream, then the checksum input stream, and
* finally passes the volume reference to {@code IOUtils.cleanup}.
*/
@Override
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
// NOTE(review): the first argument is presumably the logger to report
// close failures to (null = none) -- confirm against IOUtils.cleanup.
IOUtils.cleanup(null, volumeRef);
}
/**
* Close the inner stream if not null. Even if an exception
* is raised during the close, the field is set to null.
* NOTE(review): the null-safety and exception handling come from
* {@code IOUtils.closeStream}, not from this method -- confirm against
* its documentation.
*/
private void closeInnerStream() {
IOUtils.closeStream(in);
// Drop the reference so the closed stream is not used again.
in = null;
}