下面列出了怎么用org.apache.hadoop.fs.FileUtil的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Add quotes to each of the command strings and
* return as a single string
* @param cmd The command to be quoted
* @param isExecutable makes shell path if the first
* argument is executable
* @return returns The quoted string.
* @throws IOException
*/
public static String addCommand(List<String> cmd, boolean isExecutable)
throws IOException {
StringBuffer command = new StringBuffer();
for(String s: cmd) {
command.append('\'');
if (isExecutable) {
// the executable name needs to be expressed as a shell path for the
// shell to find it.
command.append(FileUtil.makeShellPath(new File(s)));
isExecutable = false;
} else {
command.append(s);
}
command.append('\'');
command.append(" ");
}
return command.toString();
}
/**
* Starts an instance of DataNode
* @throws IOException
*/
@Before
public void startUp() throws IOException, URISyntaxException {
tearDownDone = false;
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf,
"hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
StorageLocation location = StorageLocation.parse(dataDir.getPath());
locations.add(location);
}
/**
* Checks that the current running process can read, write, and execute the
* given directory by using methods of the File object.
*
* @param dir File to check
* @throws DiskErrorException if dir is not readable, not writable, or not
* executable
*/
private static void checkAccessByFileMethods(File dir)
throws DiskErrorException {
if (!FileUtil.canRead(dir)) {
throw new DiskErrorException("Directory is not readable: "
+ dir.toString());
}
if (!FileUtil.canWrite(dir)) {
throw new DiskErrorException("Directory is not writable: "
+ dir.toString());
}
if (!FileUtil.canExecute(dir)) {
throw new DiskErrorException("Directory is not executable: "
+ dir.toString());
}
}
@Test
public void testFailedOpen() throws Exception {
File logDir = new File(TEST_DIR, "testFailedOpen");
logDir.mkdirs();
FSEditLog log = FSImageTestUtil.createStandaloneEditLog(logDir);
try {
FileUtil.setWritable(logDir, false);
log.openForWrite();
fail("Did no throw exception on only having a bad dir");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"too few journals successfully started", ioe);
} finally {
FileUtil.setWritable(logDir, true);
log.close();
}
}
public static String readOutput(Path outDir, Configuration conf)
throws IOException {
FileSystem fs = outDir.getFileSystem(conf);
StringBuffer result = new StringBuffer();
Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outputFile : fileList) {
LOG.info("Path" + ": "+ outputFile);
BufferedReader file =
new BufferedReader(new InputStreamReader(fs.open(outputFile)));
String line = file.readLine();
while (line != null) {
result.append(line);
result.append("\n");
line = file.readLine();
}
file.close();
}
return result.toString();
}
public void testCommandLine() {
try {
createInput();
job = new StreamJob(genArgs(), true);
if(job.go() != 0) {
throw new Exception("Job Failed");
}
StringBuffer output = new StringBuffer(256);
Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
new Path(OUTPUT_DIR)));
for (int i = 0; i < fileList.length; i++){
BufferedReader bread =
new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
output.append(bread.readLine());
output.append("\n");
output.append(bread.readLine());
output.append("\n");
}
assertEquals(expectedOutput, output.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
CACHE_FILE_1.delete();
CACHE_FILE_2.delete();
}
}
/** Test randomly mixing append, snapshot and truncate operations. */
@Test(timeout=TEST_TIMEOUT_SECOND*1000)
public void testAST() throws Exception {
final String dirPathString = "/dir";
final Path dir = new Path(dirPathString);
dfs.mkdirs(dir);
dfs.allowSnapshot(dir);
final File localDir = new File(
System.getProperty("test.build.data", "target/test/data")
+ dirPathString);
if (localDir.exists()) {
FileUtil.fullyDelete(localDir);
}
localDir.mkdirs();
final DirWorker w = new DirWorker(dir, localDir, FILE_WORKER_NUM);
w.startAllFiles();
w.start();
Worker.sleep(TEST_TIME_SECOND * 1000);
w.stop();
w.stopAllFiles();
w.checkEverything();
}
/**
* Test that a volume that is considered failed on startup is seen as
* a failed volume by the NN.
*/
@Test
public void testFailedVolumeOnStartupIsCounted() throws Exception {
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
File dir = new File(cluster.getInstanceStorageDir(0, 0), "current");
try {
prepareDirToFail(dir);
restartDatanodes(1, false);
// The cluster is up..
assertEquals(true, cluster.getDataNodes().get(0)
.isBPServiceAlive(cluster.getNamesystem().getBlockPoolId()));
// but there has been a single volume failure
DFSTestUtil.waitForDatanodeStatus(dm, 1, 0, 1,
origCapacity / 2, WAIT_FOR_HEARTBEATS);
} finally {
FileUtil.chmod(dir.toString(), "755");
}
}
public void copy(ElementDescriptor dstName,
Properties dstConfiguration,
boolean removeSrc)
throws IOException {
FileSystem srcFS = this.fs.getHFS();
FileSystem dstFS = ((HPath)dstName).fs.getHFS();
Path srcPath = this.path;
Path dstPath = ((HPath)dstName).path;
boolean result = FileUtil.copy(srcFS,
srcPath,
dstFS,
dstPath,
false,
new Configuration());
if (!result) {
int errCode = 2097;
String msg = "Failed to copy from: " + this.toString() +
" to: " + dstName.toString();
throw new ExecException(msg, errCode, PigException.BUG);
}
}
@Test(timeout=60000)
public void testDirectoryFallbacks() throws Exception {
File nonExistentPath = new File(TEST_BASE, "nonexistent");
File permissionDeniedPath = new File("/");
File goodPath = new File(TEST_BASE, "testDirectoryFallbacks");
goodPath.mkdirs();
try {
SharedFileDescriptorFactory.create("shm_",
new String[] { nonExistentPath.getAbsolutePath(),
permissionDeniedPath.getAbsolutePath() });
Assert.fail();
} catch (IOException e) {
}
SharedFileDescriptorFactory factory =
SharedFileDescriptorFactory.create("shm_",
new String[] { nonExistentPath.getAbsolutePath(),
permissionDeniedPath.getAbsolutePath(),
goodPath.getAbsolutePath() } );
Assert.assertEquals(goodPath.getAbsolutePath(), factory.getPath());
FileUtil.fullyDelete(goodPath);
}
/**
* (Mock)Test JobTracker.removeJobTasks() which is called only when the job
* retires.
*/
public void testJobRemoval() throws Exception {
LOG.info("Starting testJobRemoval");
MiniMRCluster mr = null;
try {
JobConf conf = new JobConf();
mr = startCluster(conf, 0);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
// test map task removal
testRemoveJobTasks(jobtracker, conf, TaskType.MAP);
// test reduce task removal
testRemoveJobTasks(jobtracker, conf, TaskType.REDUCE);
// test job setup removal
testRemoveJobTasks(jobtracker, conf, TaskType.JOB_SETUP);
// test job cleanup removal
testRemoveJobTasks(jobtracker, conf, TaskType.JOB_CLEANUP);
} finally {
if (mr != null) { mr.shutdown();}
// cleanup
FileUtil.fullyDelete(new File(testDir.toString()));
}
}
@BeforeClass
public static void createHDFS() {
try {
baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
localFsURI = "file:///" + baseDir + "/";
localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
@Before
public void createHDFS() {
if (failoverStrategy.equals(FailoverStrategy.RestartPipelinedRegionStrategy)) {
// TODO the 'NO_OF_RETRIES' is useless for current RestartPipelinedRegionStrategy,
// for this ContinuousFileProcessingCheckpointITCase, using RestartPipelinedRegionStrategy would result in endless running.
throw new AssumptionViolatedException("ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy");
}
try {
baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
localFsURI = "file:///" + baseDir + "/";
localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
/**
* List all of the files in 'dir' that match the regex 'pattern'.
* Then check that this list is identical to 'expectedMatches'.
* @throws IOException if the dir is inaccessible
*/
public static void assertGlobEquals(File dir, String pattern,
String ... expectedMatches) throws IOException {
Set<String> found = Sets.newTreeSet();
for (File f : FileUtil.listFiles(dir)) {
if (f.getName().matches(pattern)) {
found.add(f.getName());
}
}
Set<String> expectedSet = Sets.newTreeSet(
Arrays.asList(expectedMatches));
Assert.assertEquals("Bad files matching " + pattern + " in " + dir,
Joiner.on(",").join(expectedSet),
Joiner.on(",").join(found));
}
private MiniJournalCluster(Builder b) throws IOException {
LOG.info("Starting MiniJournalCluster with " +
b.numJournalNodes + " journal nodes");
if (b.baseDir != null) {
this.baseDir = new File(b.baseDir);
} else {
this.baseDir = new File(MiniDFSCluster.getBaseDirectory());
}
nodes = new JNInfo[b.numJournalNodes];
for (int i = 0; i < b.numJournalNodes; i++) {
if (b.format) {
File dir = getStorageDir(i);
LOG.debug("Fully deleting JN directory " + dir);
FileUtil.fullyDelete(dir);
}
JournalNode jn = new JournalNode();
jn.setConf(createConfForNode(b, i));
jn.start();
nodes[i] = new JNInfo(jn);
}
}
/**
* Checks that the current running process can read, write, and execute the
* given directory by using methods of the File object.
*
* @param dir File to check
* @throws DiskErrorException if dir is not readable, not writable, or not
* executable
*/
private static void checkAccessByFileMethods(File dir)
throws DiskErrorException {
if (!FileUtil.canRead(dir)) {
throw new DiskErrorException("Directory is not readable: "
+ dir.toString());
}
if (!FileUtil.canWrite(dir)) {
throw new DiskErrorException("Directory is not writable: "
+ dir.toString());
}
if (!FileUtil.canExecute(dir)) {
throw new DiskErrorException("Directory is not executable: "
+ dir.toString());
}
}
@Before
public void setUp() {
assertTrue(FileUtil.fullyDelete(TEST_DIR));
assertTrue(TEST_DIR.mkdirs());
oldStdout = System.out;
oldStderr = System.err;
stdout = new ByteArrayOutputStream();
printStdout = new PrintStream(stdout);
System.setOut(printStdout);
stderr = new ByteArrayOutputStream();
printStderr = new PrintStream(stderr);
System.setErr(printStderr);
}
/**
* Recovers the specified path from the parity file
*/
public Path[] recover(String cmd, String argv[], int startindex)
throws IOException {
Path[] paths = new Path[(argv.length - startindex) / 2];
int j = 0;
for (int i = startindex; i < argv.length; i = i + 2) {
String path = argv[i];
long corruptOffset = Long.parseLong(argv[i+1]);
LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
Path recovered = new Path("/tmp/recovered." + System.currentTimeMillis());
FileSystem fs = recovered.getFileSystem(conf);
DistributedFileSystem dfs = (DistributedFileSystem)fs;
Configuration raidConf = new Configuration(conf);
raidConf.set("fs.hdfs.impl",
"org.apache.hadoop.hdfs.DistributedRaidFileSystem");
raidConf.set("fs.raid.underlyingfs.impl",
"org.apache.hadoop.hdfs.DistributedFileSystem");
raidConf.setBoolean("fs.hdfs.impl.disable.cache", true);
java.net.URI dfsUri = dfs.getUri();
FileSystem raidFs = FileSystem.get(dfsUri, raidConf);
FileUtil.copy(raidFs, new Path(path), fs, recovered, false, conf);
paths[j] = recovered;
LOG.info("Raidshell created recovery file " + paths[j]);
j++;
}
return paths;
}
/**
* Deletes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - Chunk Info
* @throws StorageContainerException
*/
@Override
public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
throws StorageContainerException {
checkLayoutVersion(container);
Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
KeyValueContainer kvContainer = (KeyValueContainer) container;
// In version1, we have only chunk file.
File chunkFile = getChunkFile(kvContainer, blockID, info);
// if the chunk file does not exist, it might have already been deleted.
// The call might be because of reapply of transactions on datanode
// restart.
if (!chunkFile.exists()) {
LOG.warn("Chunk file not found for chunk {}", info);
return;
}
long chunkFileSize = chunkFile.length();
boolean allowed = info.getLen() == chunkFileSize
// chunk written by new client to old datanode, expected
// file length is offset + real chunk length; see HDDS-3644
|| info.getLen() + info.getOffset() == chunkFileSize;
if (allowed) {
FileUtil.fullyDelete(chunkFile);
LOG.info("Deleted chunk file {} (size {}) for chunk {}",
chunkFile, chunkFileSize, info);
} else {
LOG.error("Not Supported Operation. Trying to delete a " +
"chunk that is in shared file. chunk info : {}", info);
throw new StorageContainerException("Not Supported Operation. " +
"Trying to delete a chunk that is in shared file. chunk info : "
+ info, UNSUPPORTED_REQUEST);
}
}
public void runStreamJob(final String outputExpect, boolean ignoreKey)
throws Exception
{
String outFileName = "part-00000";
File outFile = null;
try {
try {
FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
} catch (Exception e) {
}
createInput();
boolean mayExit = false;
// During tests, the default Configuration will use a local mapred
// So don't specify -config or -cluster
job = new StreamJob(genArgs(ignoreKey), mayExit);
job.go();
outFile = new File(OUTPUT_DIR, outFileName).getAbsoluteFile();
String output = StreamUtil.slurp(outFile);
System.err.println("outEx1=" + outputExpect);
System.err.println(" out1=" + output);
assertEquals(outputExpect, output);
} finally {
INPUT_FILE.delete();
FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
}
}
/**
* Earlier versions of HDFS had a bug (HDFS-2991) which caused
* append(), when called exactly at a block boundary,
* to not log an OP_ADD. This ensures that we can read from
* such buggy versions correctly, by loading an image created
* using a namesystem image created with 0.23.1-rc2 exhibiting
* the issue.
*/
@Test
public void testLoadLogsFromBuggyEarlierVersions() throws IOException {
final Configuration conf = new HdfsConfiguration();
String tarFile = System.getProperty("test.cache.data", "build/test/cache")
+ "/" + HADOOP_23_BROKEN_APPEND_TGZ;
String testDir = PathUtils.getTestDirName(getClass());
File dfsDir = new File(testDir, "image-with-buggy-append");
if (dfsDir.exists() && !FileUtil.fullyDelete(dfsDir)) {
throw new IOException("Could not delete dfs directory '" + dfsDir + "'");
}
FileUtil.unTar(new File(tarFile), new File(testDir));
File nameDir = new File(dfsDir, "name");
GenericTestUtils.assertExists(nameDir);
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.numDataNodes(0)
.waitSafeMode(false)
.startupOption(StartupOption.UPGRADE)
.build();
try {
FileSystem fs = cluster.getFileSystem();
Path testPath = new Path("/tmp/io_data/test_io_0");
assertEquals(2*1024*1024, fs.getFileStatus(testPath).getLen());
} finally {
cluster.shutdown();
}
}
/**
* Tests FTPFileSystem, create(), open(), delete(), mkdirs(), rename(),
* listStatus(), getStatus() APIs. *
*
* @throws Exception
*/
public void testReadWrite() throws Exception {
DFSTestUtil util = new DFSTestUtil("TestFTPFileSystem", 20, 3, 1024 * 1024);
localFs.setWorkingDirectory(workDir);
Path localData = new Path(workDir, "srcData");
Path remoteData = new Path("srcData");
util.createFiles(localFs, localData.toUri().getPath());
boolean dataConsistency = util.checkFiles(localFs, localData.getName());
assertTrue("Test data corrupted", dataConsistency);
// Copy files and directories recursively to FTP file system.
boolean filesCopied = FileUtil.copy(localFs, localData, ftpFs, remoteData,
false, defaultConf);
assertTrue("Copying to FTPFileSystem failed", filesCopied);
// Rename the remote copy
Path renamedData = new Path("Renamed");
boolean renamed = ftpFs.rename(remoteData, renamedData);
assertTrue("Rename failed", renamed);
// Copy files and directories from FTP file system and delete remote copy.
filesCopied = FileUtil.copy(ftpFs, renamedData, localFs, workDir, true,
defaultConf);
assertTrue("Copying from FTPFileSystem fails", filesCopied);
// Check if the data was received completely without any corruption.
dataConsistency = util.checkFiles(localFs, renamedData.getName());
assertTrue("Invalid or corrupted data recieved from FTP Server!",
dataConsistency);
// Delete local copies
boolean deleteSuccess = localFs.delete(renamedData, true)
& localFs.delete(localData, true);
assertTrue("Local test data deletion failed", deleteSuccess);
}
private static MiniDFSCluster startMini(String testName) throws IOException {
File baseDir = new File("./target/hdfs/" + testName).getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
Configuration conf = new Configuration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
MiniDFSCluster hdfsCluster = builder.clusterId(testName).build();
hdfsCluster.waitActive();
return hdfsCluster;
}
@After
public void cleanup() {
if (scm != null) {
scm.stop();
scm.join();
}
FileUtil.fullyDelete(testDir);
}
@AfterClass
public static void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
FileUtil.fullyDelete(new File(BASEDIR));
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
}
@Override
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
Path[] paths = FileUtil.stat2Paths(FileInputFormat.listStatus(job));
List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
if (paths.length != 0) {
// HADOOP-1818: Manage splits only if there are paths
long[] lengths = new long[paths.length];
long totLength = 0;
for(int i=0; i<paths.length; i++) {
FileSystem fs = paths[i].getFileSystem(job);
lengths[i] = fs.getContentSummary(paths[i]).getLength();
totLength += lengths[i];
}
double avgLengthPerSplit = ((double)totLength) / numSplits;
long cumulativeLength = 0;
int startIndex = 0;
for(int i=0; i<numSplits; i++) {
int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
, startIndex, lengths);
if (splitSize != 0) {
// HADOOP-1818: Manage split only if split size is not equals to 0
Path[] splitPaths = new Path[splitSize];
long[] splitLengths = new long[splitSize];
System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
startIndex += splitSize;
for(long l: splitLengths) {
cumulativeLength += l;
}
}
}
}
return splits.toArray(new MultiFileSplit[splits.size()]);
}
public void unpackStorage() throws IOException {
String tarFile = System.getProperty("test.cache.data", "build/test/cache") +
"/hadoop-26-dfs-dir.tgz";
String dataDir = System.getProperty("test.build.data", "build/test/data");
File dfsDir = new File(dataDir, "dfs");
if ( dfsDir.exists() && !FileUtil.fullyDelete(dfsDir) ) {
throw new IOException("Could not delete dfs directory '" + dfsDir + "'");
}
FileUtil.unTar(new File(tarFile), new File(dataDir));
//Now read the reference info
BufferedReader reader = new BufferedReader(
new FileReader(System.getProperty("test.cache.data", "build/test/cache") +
"/hadoop-dfs-dir.txt"));
String line;
while ( (line = reader.readLine()) != null ) {
line = line.trim();
if (line.length() <= 0 || line.startsWith("#")) {
continue;
}
String[] arr = line.split("\\s+\t\\s+");
if (arr.length < 1) {
continue;
}
if (arr[0].equals("printChecksums")) {
printChecksum = true;
break;
}
if (arr.length < 2) {
continue;
}
ReferenceFileInfo info = new ReferenceFileInfo();
info.path = arr[0];
info.checksum = Long.parseLong(arr[1]);
refList.add(info);
}
reader.close();
}
@Before
public void setup() throws Exception {
conf = new OzoneConfiguration();
String ozoneMetaPath =
GenericTestUtils.getTempPath("ozoneMeta");
File ozoneMetaFile = new File(ozoneMetaPath);
conf.set(OZONE_METADATA_DIRS, ozoneMetaPath);
FileUtil.fullyDelete(ozoneMetaFile);
String keyDirName = conf.get(HDDS_KEY_DIR_NAME,
HDDS_KEY_DIR_NAME_DEFAULT);
File ozoneKeyDir = new File(ozoneMetaFile, keyDirName);
ozoneKeyDir.mkdirs();
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_ENABLED, true);
conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true);
long expiryTime = conf.getTimeDuration(
HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
TimeUnit.MILLISECONDS);
caClient = new CertificateClientTestImpl(conf);
secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
expiryTime, caClient.getCertificate().
getSerialNumber().toString());
}
/**
* Delete all files and directories in the trash directories.
*/
public void restoreTrash() {
for (StorageDirectory sd : storageDirs) {
File trashRoot = getTrashRootDir(sd);
try {
Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
restoreBlockFilesFromTrash(trashRoot);
FileUtil.fullyDelete(getTrashRootDir(sd));
} catch (IOException ioe) {
LOG.warn("Restoring trash failed for storage directory " + sd);
}
}
}
private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}