下面列出了org.apache.hadoop.io.IOUtils#cleanup ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void close() throws IOException {
IOUtils.cleanup(null, reader, fileScanner);
if (fileScanner != null) {
try {
TableStats stats = fileScanner.getInputStats();
if (stats != null) {
inputStats = (TableStats) stats.clone();
}
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
}
reader = null;
fileScanner = null;
plan = null;
qual = null;
projector = null;
indexLookupKey = null;
}
private static void testFailoverAfterCrashDuringLogRoll(boolean writeHeader)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, Integer.MAX_VALUE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
try {
cluster.transitionToActive(0);
NameNode nn0 = cluster.getNameNode(0);
nn0.getRpcServer().rollEditLog();
cluster.shutdownNameNode(0);
createEmptyInProgressEditLog(cluster, nn0, writeHeader);
cluster.transitionToActive(1);
} finally {
IOUtils.cleanup(LOG, fs);
cluster.shutdown();
}
}
private void writeFile(Path file, byte[] data) throws IOException {
final int WRITE_BUFFER_SIZE = 4096;
FSDataOutputStream out = fs.create(file, FILE_PERMISSIONS, true,
WRITE_BUFFER_SIZE, fs.getDefaultReplication(file),
fs.getDefaultBlockSize(file), null);
try {
try {
out.write(data);
out.close();
out = null;
} finally {
IOUtils.cleanup(LOG, out);
}
} catch (IOException e) {
fs.delete(file, false);
throw e;
}
}
/**
* Convenience method for reading a token storage file, and loading the Tokens
* therein in the passed UGI
* @param filename
* @param conf
* @throws IOException
*/
public static Credentials readTokenStorageFile(Path filename, Configuration conf)
throws IOException {
FSDataInputStream in = null;
Credentials credentials = new Credentials();
try {
in = filename.getFileSystem(conf).open(filename);
credentials.readTokenStorageStream(in);
in.close();
return credentials;
} catch(IOException ioe) {
throw new IOException("Exception reading " + filename, ioe);
} finally {
IOUtils.cleanup(LOG, in);
}
}
/**
* Make sure that we starting reading the correct op when we request a stream
* with a txid in the middle of an edit log file.
*/
@Test
public void testReadFromMiddleOfEditLog() throws CorruptionException,
IOException {
File f = new File(TestEditLog.TEST_DIR + "/readfrommiddleofeditlog");
NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
10);
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(conf, sd, storage);
EditLogInputStream elis = getJournalInputStream(jm, 5, true);
try {
FSEditLogOp op = elis.readOp();
assertEquals("read unexpected op", op.getTransactionId(), 5);
} finally {
IOUtils.cleanup(LOG, elis);
}
}
/**
* Ensure that even if a file is in a directory with the sticky bit on,
* another user can write to that file (assuming correct permissions).
*/
private void confirmCanAppend(Configuration conf, Path p) throws Exception {
// Write a file to the new tmp directory as a regular user
Path file = new Path(p, "foo");
writeFile(hdfsAsUser1, file);
hdfsAsUser1.setPermission(file, new FsPermission((short) 0777));
// Log onto cluster as another user and attempt to append to file
Path file2 = new Path(p, "foo");
FSDataOutputStream h = null;
try {
h = hdfsAsUser2.append(file2);
h.write("Some more data".getBytes());
h.close();
h = null;
} finally {
IOUtils.cleanup(null, h);
}
}
@Test
public void testMainMethodMapFile() {
String inFile = "mainMethodMapFile.mapfile";
String path = new Path(TEST_DIR, inFile).toString();
String[] args = { path, path };
MapFile.Writer writer = null;
try {
writer = createWriter(inFile, IntWritable.class, Text.class);
writer.append(new IntWritable(1), new Text("test_text1"));
writer.append(new IntWritable(2), new Text("test_text2"));
writer.close();
MapFile.main(args);
} catch (Exception ex) {
fail("testMainMethodMapFile error !!!");
} finally {
IOUtils.cleanup(null, writer);
}
}
@Override // Storage
public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
if (disablePreUpgradableLayoutCheck) {
return false;
}
File oldImageDir = new File(sd.getRoot(), "image");
if (!oldImageDir.exists()) {
return false;
}
// check the layout version inside the image file
File oldF = new File(oldImageDir, "fsimage");
RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
try {
oldFile.seek(0);
int oldVersion = oldFile.readInt();
oldFile.close();
oldFile = null;
if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
return false;
} finally {
IOUtils.cleanup(LOG, oldFile);
}
return true;
}
private static void testFailoverAfterCrashDuringLogRoll(boolean writeHeader)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, Integer.MAX_VALUE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
try {
cluster.transitionToActive(0);
NameNode nn0 = cluster.getNameNode(0);
nn0.getRpcServer().rollEditLog();
cluster.shutdownNameNode(0);
createEmptyInProgressEditLog(cluster, nn0, writeHeader);
cluster.transitionToActive(1);
} finally {
IOUtils.cleanup(LOG, fs);
cluster.shutdown();
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if(getState() != FetcherState.FETCH_DATA_FINISHED){
//channel is closed, but cannot complete fetcher
endFetch(FetcherState.FETCH_FAILED);
LOG.error("Channel closed by peer: " + ctx.channel());
}
IOUtils.cleanup(LOG, fc, raf);
super.channelUnregistered(ctx);
}
@Override
public void close() throws IOException {
_isClosed.set(true);
IOUtils.cleanup(LOG, makeCloseable(_bulkIndexingTimer, _watchForIdleBulkWriters),
makeCloseable(_indexWriterTimer, _watchForIdleWriter), _indexImporter, _mutationQueueProcessor,
makeCloseable(_writer.get()), _indexReader.get(), _directory);
}
@After
public void shutdown() {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
public static void shuffleToDisk(OutputStream output, String hostIdentifier,
InputStream input, long compressedLength, Log LOG, String identifier)
throws IOException {
// Copy data to local-disk
long bytesLeft = compressedLength;
try {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading "
+ identifier);
}
output.write(buf, 0, n);
bytesLeft -= n;
// metrics.inputBytes(n);
}
LOG.info("Read " + (compressedLength - bytesLeft)
+ " bytes from input for " + identifier);
output.close();
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input, output);
// Re-throw
throw ioe;
}
// Sanity check
if (bytesLeft != 0) {
throw new IOException("Incomplete map output received for " +
identifier + " from " +
hostIdentifier + " (" +
bytesLeft + " bytes missing of " +
compressedLength + ")");
}
}
@AfterClass
public static void shutdown() {
IOUtils.cleanup(null, fs);
if (cluster != null) {
cluster.shutdown();
}
}
@After
public void shutdown() throws IOException {
IOUtils.cleanup(LOG, toClose.toArray(new Closeable[0]));
// Should not leak clients between tests -- this can cause flaky tests.
// (See HDFS-4643)
GenericTestUtils.assertNoThreadsMatching(".*IPC Client.*");
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test getting all the storage policies from the namenode
*/
@Test
public void testGetAllStoragePolicies() throws Exception {
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
BlockStoragePolicy[] policies = fs.getStoragePolicies();
Assert.assertEquals(6, policies.length);
Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
policies[0].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
policies[1].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
policies[2].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(ONESSD).toString(),
policies[3].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(ALLSSD).toString(),
policies[4].toString());
Assert.assertEquals(POLICY_SUITE.getPolicy(LAZY_PERSIST).toString(),
policies[5].toString());
} finally {
IOUtils.cleanup(null, fs);
cluster.shutdown();
}
}
private void createDatabaseAndTable(ServiceTracker tracker) throws Exception {
TajoClient client = null;
try {
client = new TajoClientImpl(tracker);
client.executeQuery("CREATE TABLE default.ha_test1 (age int);");
client.executeQuery("CREATE TABLE default.ha_test2 (age int);");
} finally {
IOUtils.cleanup(null, client);
}
}
@VisibleForTesting
List<String> getEntityTypes() throws IOException {
LeveldbIterator iterator = null;
try {
iterator = getDbIterator(false);
List<String> entityTypes = new ArrayList<String>();
iterator.seek(ENTITY_ENTRY_PREFIX);
while (iterator.hasNext()) {
byte[] key = iterator.peekNext().getKey();
if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
break;
}
KeyParser kp = new KeyParser(key,
ENTITY_ENTRY_PREFIX.length);
String entityType = kp.getNextString();
entityTypes.add(entityType);
byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
.add(entityType).getBytesForLookup();
if (lookupKey[lookupKey.length - 1] != 0x0) {
throw new IOException("Found unexpected end byte in lookup key");
}
lookupKey[lookupKey.length - 1] = 0x1;
iterator.seek(lookupKey);
}
return entityTypes;
} catch(DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
}
/**
* Test hsync with END_BLOCK flag.
*/
@Test
public void hSyncEndBlock_00() throws IOException {
final int preferredBlockSize = 1024;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataOutputStream stm = null;
try {
Path path = new Path("/" + fName);
stm = fileSystem.create(path, true, 4096, (short) 2,
AppendTestUtil.BLOCK_SIZE);
System.out.println("Created file " + path.toString());
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(0L, currentFileLength);
LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(0, blocks.getLocatedBlocks().size());
// write a block and call hsync(end_block) at the block boundary
stm.write(new byte[preferredBlockSize]);
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(1, blocks.getLocatedBlocks().size());
// call hsync then call hsync(end_block) immediately
stm.write(new byte[preferredBlockSize / 2]);
stm.hsync();
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.END_BLOCK));
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2,
currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(2, blocks.getLocatedBlocks().size());
stm.write(new byte[preferredBlockSize / 4]);
stm.hsync();
currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(preferredBlockSize + preferredBlockSize / 2
+ preferredBlockSize / 4, currentFileLength);
blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
assertEquals(3, blocks.getLocatedBlocks().size());
} finally {
IOUtils.cleanup(null, stm, fileSystem);
if (cluster != null) {
cluster.shutdown();
}
}
}
public void close() throws IOException {
IOUtils.cleanup(LOG, fis);
dir.close();
}