下面列出了怎么用org.apache.hadoop.fs.FilterFileSystem的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testDeleteOnExitPathHandling() throws IOException {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI chrootUri = URI.create("mockfs://foo/a/b");
ChRootedFileSystem chrootFs = new ChRootedFileSystem(chrootUri, conf);
FileSystem mockFs = ((FilterFileSystem)chrootFs.getRawFileSystem())
.getRawFileSystem();
// ensure delete propagates the correct path
Path chrootPath = new Path("/c");
Path rawPath = new Path("/a/b/c");
chrootFs.delete(chrootPath, false);
verify(mockFs).delete(eq(rawPath), eq(false));
reset(mockFs);
// fake that the path exists for deleteOnExit
FileStatus stat = mock(FileStatus.class);
when(mockFs.getFileStatus(eq(rawPath))).thenReturn(stat);
// ensure deleteOnExit propagates the correct path
chrootFs.deleteOnExit(chrootPath);
chrootFs.close();
verify(mockFs).delete(eq(rawPath), eq(true));
}
@Test
public void testDeleteOnExitPathHandling() throws IOException {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI chrootUri = URI.create("mockfs://foo/a/b");
ChRootedFileSystem chrootFs = new ChRootedFileSystem(chrootUri, conf);
FileSystem mockFs = ((FilterFileSystem)chrootFs.getRawFileSystem())
.getRawFileSystem();
// ensure delete propagates the correct path
Path chrootPath = new Path("/c");
Path rawPath = new Path("/a/b/c");
chrootFs.delete(chrootPath, false);
verify(mockFs).delete(eq(rawPath), eq(false));
reset(mockFs);
// fake that the path exists for deleteOnExit
FileStatus stat = mock(FileStatus.class);
when(mockFs.getFileStatus(eq(rawPath))).thenReturn(stat);
// ensure deleteOnExit propagates the correct path
chrootFs.deleteOnExit(chrootPath);
chrootFs.close();
verify(mockFs).delete(eq(rawPath), eq(true));
}
private static FileSystem getRawFileSystem(FileSystem fileSystem)
{
if (fileSystem instanceof FilterFileSystem) {
return getRawFileSystem(((FilterFileSystem) fileSystem).getRawFileSystem());
}
return fileSystem;
}
private static CachingFileSystem<?> unwrapCachingFileSystem(FileSystem fileSystem)
{
if (fileSystem instanceof CachingFileSystem) {
return (CachingFileSystem<?>) fileSystem;
}
if (fileSystem instanceof FilterFileSystem) {
return unwrapCachingFileSystem(((FilterFileSystem) fileSystem).getRawFileSystem());
}
throw new IllegalStateException();
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip#something");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//Archive wins
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflictsFiles() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI file = new URI("mockfs://mock/tmp/something.zip#something");
Path filePath = new Path(file);
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
Path file2Path = new Path(file2);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file2, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//First one wins
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
/**
* Tests that ChRootedFileSystem delegates calls for every ACL method to the
* underlying FileSystem with all Path arguments translated as required to
* enforce chroot.
*/
@Test
public void testAclMethodsPathTranslation() throws IOException {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI chrootUri = URI.create("mockfs://foo/a/b");
ChRootedFileSystem chrootFs = new ChRootedFileSystem(chrootUri, conf);
FileSystem mockFs = ((FilterFileSystem)chrootFs.getRawFileSystem())
.getRawFileSystem();
Path chrootPath = new Path("/c");
Path rawPath = new Path("/a/b/c");
List<AclEntry> entries = Collections.emptyList();
chrootFs.modifyAclEntries(chrootPath, entries);
verify(mockFs).modifyAclEntries(rawPath, entries);
chrootFs.removeAclEntries(chrootPath, entries);
verify(mockFs).removeAclEntries(rawPath, entries);
chrootFs.removeDefaultAcl(chrootPath);
verify(mockFs).removeDefaultAcl(rawPath);
chrootFs.removeAcl(chrootPath);
verify(mockFs).removeAcl(rawPath);
chrootFs.setAcl(chrootPath, entries);
verify(mockFs).setAcl(rawPath, entries);
chrootFs.getAclStatus(chrootPath);
verify(mockFs).getAclStatus(rawPath);
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflicts() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip#something");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//Archive wins
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
}
@SuppressWarnings("deprecation")
public void testSetupDistributedCacheConflictsFiles() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI file = new URI("mockfs://mock/tmp/something.zip#something");
Path filePath = new Path(file);
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
Path file2Path = new Path(file2);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file2, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(1, localResources.size());
LocalResource lr = localResources.get("something");
//First one wins
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
/**
* Tests that ChRootedFileSystem delegates calls for every ACL method to the
* underlying FileSystem with all Path arguments translated as required to
* enforce chroot.
*/
@Test
public void testAclMethodsPathTranslation() throws IOException {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI chrootUri = URI.create("mockfs://foo/a/b");
ChRootedFileSystem chrootFs = new ChRootedFileSystem(chrootUri, conf);
FileSystem mockFs = ((FilterFileSystem)chrootFs.getRawFileSystem())
.getRawFileSystem();
Path chrootPath = new Path("/c");
Path rawPath = new Path("/a/b/c");
List<AclEntry> entries = Collections.emptyList();
chrootFs.modifyAclEntries(chrootPath, entries);
verify(mockFs).modifyAclEntries(rawPath, entries);
chrootFs.removeAclEntries(chrootPath, entries);
verify(mockFs).removeAclEntries(rawPath, entries);
chrootFs.removeDefaultAcl(chrootPath);
verify(mockFs).removeDefaultAcl(rawPath);
chrootFs.removeAcl(chrootPath);
verify(mockFs).removeAcl(rawPath);
chrootFs.setAcl(chrootPath, entries);
verify(mockFs).setAcl(rawPath, entries);
chrootFs.getAclStatus(chrootPath);
verify(mockFs).getAclStatus(rawPath);
}
/**
* Recover the lease from HDFS, retrying multiple times.
*/
public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
CancelableProgressable reporter) throws IOException {
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}
// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}
public static DistributedFileSystem convertToDFS(FileSystem fs) {
// for RaidDFS
if (fs instanceof FilterFileSystem) {
fs = ((FilterFileSystem) fs).getRawFileSystem();
}
if (fs instanceof DistributedFileSystem)
return (DistributedFileSystem) fs;
else
return null;
}
private void assertRawFileSystemInstanceOf(FileSystem actual, Class<? extends FileSystem> expectedType)
{
assertInstanceOf(actual, FilterFileSystem.class);
FileSystem rawFileSystem = ((FilterFileSystem) actual).getRawFileSystem();
assertInstanceOf(rawFileSystem, expectedType);
}
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(2, localResources.size());
LocalResource lr = localResources.get("something.zip");
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
lr = localResources.get("something");
assertNotNull(lr);
assertEquals(11l, lr.getSize());
assertEquals(11l, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSetupDistributedCache() throws Exception {
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
URI mockUri = URI.create("mockfs://mock/");
FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
.getRawFileSystem();
URI archive = new URI("mockfs://mock/tmp/something.zip");
Path archivePath = new Path(archive);
URI file = new URI("mockfs://mock/tmp/something.txt#something");
Path filePath = new Path(file);
when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
DistributedCache.addCacheArchive(archive, conf);
conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources);
assertEquals(2, localResources.size());
LocalResource lr = localResources.get("something.zip");
assertNotNull(lr);
assertEquals(10l, lr.getSize());
assertEquals(10l, lr.getTimestamp());
assertEquals(LocalResourceType.ARCHIVE, lr.getType());
lr = localResources.get("something");
assertNotNull(lr);
assertEquals(11l, lr.getSize());
assertEquals(11l, lr.getTimestamp());
assertEquals(LocalResourceType.FILE, lr.getType());
}
@Test
public void retriesIOExceptionInStatus() throws Exception {
Stoppable stop = new StoppableImplementation();
Configuration conf = UTIL.getConfiguration();
Path testDir = UTIL.getDataTestDir();
FileSystem fs = UTIL.getTestFileSystem();
String confKey = "hbase.test.cleaner.delegates";
Path child = new Path(testDir, "child");
Path file = new Path(child, "file");
fs.mkdirs(child);
fs.create(file).close();
assertTrue("test file didn't get created.", fs.exists(file));
final AtomicBoolean fails = new AtomicBoolean(true);
FilterFileSystem filtered = new FilterFileSystem(fs) {
public FileStatus[] listStatus(Path f) throws IOException {
if (fails.get()) {
throw new IOException("whomp whomp.");
}
return fs.listStatus(f);
}
};
AllValidPaths chore =
new AllValidPaths("test-retry-ioe", stop, conf, filtered, testDir, confKey, POOL);
// trouble talking to the filesystem
Boolean result = chore.runCleaner();
// verify that it couldn't clean the files.
assertTrue("test rig failed to inject failure.", fs.exists(file));
assertTrue("test rig failed to inject failure.", fs.exists(child));
// and verify that it accurately reported the failure.
assertFalse("chore should report that it failed.", result);
// filesystem is back
fails.set(false);
result = chore.runCleaner();
// verify everything is gone.
assertFalse("file should have been destroyed.", fs.exists(file));
assertFalse("directory should have been destroyed.", fs.exists(child));
// and verify that it accurately reported success.
assertTrue("chore should claim it succeeded.", result);
}