下面列出了怎么用org.apache.hadoop.fs.FileContext的API类实例代码及写法,或者点击链接到github查看源代码。
private static void initDirs(Configuration conf, String user, String appId,
FileContext lfs, List<Path> localDirs) throws IOException {
if (null == localDirs || 0 == localDirs.size()) {
throw new IOException("Cannot initialize without local dirs");
}
String[] appsFileCacheDirs = new String[localDirs.size()];
String[] usersFileCacheDirs = new String[localDirs.size()];
for (int i = 0, n = localDirs.size(); i < n; ++i) {
// $x/usercache/$user
Path base = lfs.makeQualified(
new Path(new Path(localDirs.get(i), USERCACHE), user));
// $x/usercache/$user/filecache
Path userFileCacheDir = new Path(base, FILECACHE);
usersFileCacheDirs[i] = userFileCacheDir.toString();
createDir(lfs, userFileCacheDir, FILECACHE_PERMS, false);
// $x/usercache/$user/appcache/$appId
Path appBase = new Path(base, new Path(APPCACHE, appId));
// $x/usercache/$user/appcache/$appId/filecache
Path appFileCacheDir = new Path(appBase, FILECACHE);
appsFileCacheDirs[i] = appFileCacheDir.toString();
createDir(lfs, appFileCacheDir, FILECACHE_PERMS, false);
}
conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
}
public AvroFileInputStream(FileStatus status) throws IOException {
pos = 0;
buffer = new byte[0];
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
FileContext fc = FileContext.getFileContext(new Configuration());
fileReader =
DataFileReader.openReader(new AvroFSInput(fc, status.getPath()),reader);
Schema schema = fileReader.getSchema();
writer = new GenericDatumWriter<Object>(schema);
output = new ByteArrayOutputStream();
JsonGenerator generator =
new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
generator.setPrettyPrinter(prettyPrinter);
encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
static LocalResource createZipFile(FileContext files, Path p, int len,
Random r, LocalResourceVisibility vis) throws IOException,
URISyntaxException {
byte[] bytes = new byte[len];
r.nextBytes(bytes);
File archiveFile = new File(p.toUri().getPath() + ".ZIP");
archiveFile.createNewFile();
ZipOutputStream out = new ZipOutputStream(
new FileOutputStream(archiveFile));
out.putNextEntry(new ZipEntry(p.getName()));
out.write(bytes);
out.closeEntry();
out.close();
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+ ".ZIP")));
ret.setSize(len);
ret.setType(LocalResourceType.ARCHIVE);
ret.setVisibility(vis);
ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".ZIP"))
.getModificationTime());
return ret;
}
/** Parse the command line arguments and initialize the data */
private int init(String[] args) {
try { // initialize file system handle
fc = FileContext.getFileContext(getConf());
} catch (IOException ioe) {
System.err.println("Can not initialize the file system: " +
ioe.getLocalizedMessage());
return -1;
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equals("-root")) {
root = new Path(args[++i]);
} else if (args[i].equals("-inDir")) {
inDir = new File(args[++i]);
} else {
System.err.println(USAGE);
ToolRunner.printGenericCommandUsage(System.err);
System.exit(-1);
}
}
return 0;
}
private void mkdir(FileContext fc, Path path, FsPermission fsp)
throws IOException {
if (!fc.util().exists(path)) {
try {
fc.mkdir(path, fsp, true);
FileStatus fsStatus = fc.getFileStatus(path);
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ ", Expected: " + fsp.toShort());
if (fsStatus.getPermission().toShort() != fsp.toShort()) {
LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ ", " + fsp);
fc.setPermission(path, fsp);
}
} catch (FileAlreadyExistsException e) {
LOG.info("Directory: [" + path + "] already exists.");
}
}
}
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
PathFilter pathFilter) throws IOException {
path = fc.makeQualified(path);
List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
try {
RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
while (fileStatusIter.hasNext()) {
FileStatus fileStatus = fileStatusIter.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && pathFilter.accept(filePath)) {
jhStatusList.add(fileStatus);
}
}
} catch (FileNotFoundException fe) {
LOG.error("Error while scanning directory " + path, fe);
}
return jhStatusList;
}
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
fsHelper = new FileSystemTestHelper();
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);
fs = cluster.getFileSystem();
fsWrapper = new FileSystemTestWrapper(fs);
fcWrapper = new FileContextTestWrapper(
FileContext.getFileContext(cluster.getURI(), conf));
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
setProvider();
// Create a test key
DFSTestUtil.createKey(TEST_KEY, cluster, conf);
}
private void cleanUpLocalDir(FileContext lfs, DeletionService del,
String localDir) {
long currentTimeStamp = System.currentTimeMillis();
renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
currentTimeStamp);
renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
currentTimeStamp);
renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
currentTimeStamp);
try {
deleteLocalDir(lfs, del, localDir);
} catch (IOException e) {
// Do nothing, just give the warning
LOG.warn("Failed to delete localDir: " + localDir);
}
}
@Test
public void testApplication() throws IOException, Exception
{
try {
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
createAvroInput(cnt - 2);
writeAvroFile(new File(OTHER_FILE));
avroFileInput.setDirectory(testMeta.dir);
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
AvroReaderApplication avroReaderApplication = new AvroReaderApplication();
avroReaderApplication.setAvroFileInputOperator(avroFileInput);
lma.prepareDAG(avroReaderApplication, conf);
LocalMode.Controller lc = lma.getController();
lc.run(10000);// runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
fsHelper = new FileSystemTestHelper();
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);
fs = cluster.getFileSystem();
fsWrapper = new FileSystemTestWrapper(cluster.getFileSystem());
fcWrapper = new FileContextTestWrapper(
FileContext.getFileContext(cluster.getURI(), conf));
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
// Need to set the client's KeyProvider to the NN's for JKS,
// else the updates do not get flushed properly
fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
.getProvider());
DFSTestUtil.createKey(TEST_KEY, cluster, conf);
}
private Configuration readLaunchConfiguration() throws IOException
{
Path appPath = new Path(appContext.getApplicationPath());
Path configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
try {
LOG.debug("Reading launch configuration file ");
URI uri = appPath.toUri();
Configuration config = new YarnConfiguration();
fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
FSDataInputStream is = fileContext.open(configFilePath);
config.addResource(is);
LOG.debug("Read launch configuration");
return config;
} catch (FileNotFoundException ex) {
LOG.warn("Configuration file not found {}", configFilePath);
return new Configuration();
}
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
// verify filesystems are closed when localizer doesn't fail
FileContext fs = FileContext.getLocalFSFileContext();
spylfs = spy(fs.getDefaultFileSystem());
ContainerLocalizer localizer = setupContainerLocalizerForTest();
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
any(CompletionService.class), any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
spylfs = spy(fs.getDefaultFileSystem());
// verify filesystems are closed when localizer fails
localizer = setupContainerLocalizerForTest();
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
any(LocalizationProtocol.class), any(CompletionService.class),
any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
private void doReadTargetNotReadable() throws Exception {
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
FileContext myfc = FileContext.getFileContext(conf);
myfc.open(link).read();
return null;
}
});
fail("Read link target even though target does not have"
+ " read permissions!");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
}
@After
public void tearDown() throws IOException {
if (yarnCluster != null) {
try {
yarnCluster.stop();
} finally {
yarnCluster = null;
}
}
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext
.delete(
new Path(conf
.get("yarn.timeline-service.leveldb-timeline-store.path")),
true);
}
private void doDeleteLinkParentNotWritable() throws Exception {
// Try to delete where the symlink's parent dir is not writable
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
FileContext myfc = FileContext.getFileContext(conf);
myfc.delete(link, false);
return null;
}
});
fail("Deleted symlink without write permissions on parent!");
} catch (AccessControlException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
}
private void doDeleteLinkParentNotWritable() throws Exception {
// Try to delete where the symlink's parent dir is not writable
try {
user.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
FileContext myfc = FileContext.getFileContext(conf);
myfc.delete(link, false);
return null;
}
});
fail("Deleted symlink without write permissions on parent!");
} catch (AccessControlException e) {
GenericTestUtils.assertExceptionContains("Permission denied", e);
}
}
/** Parse the command line arguments and initialize the data */
private int init(String[] args) {
try { // initialize file system handle
fc = FileContext.getFileContext(getConf());
} catch (IOException ioe) {
System.err.println("Can not initialize the file system: " +
ioe.getLocalizedMessage());
return -1;
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equals("-root")) {
root = new Path(args[++i]);
} else if (args[i].equals("-inDir")) {
inDir = new File(args[++i]);
} else {
System.err.println(USAGE);
ToolRunner.printGenericCommandUsage(System.err);
System.exit(-1);
}
}
return 0;
}
@Override
protected synchronized void serviceStop() throws Exception {
if (resourceManagers[index] != null) {
waitForAppMastersToFinish(5000);
resourceManagers[index].stop();
}
if (Shell.WINDOWS) {
// On Windows, clean up the short temporary symlink that was created to
// work around path length limitation.
String testWorkDirPath = testWorkDir.getAbsolutePath();
try {
FileContext.getLocalFSFileContext().delete(new Path(testWorkDirPath),
true);
} catch (IOException e) {
LOG.warn("could not cleanup symlink: " +
testWorkDir.getAbsolutePath());
}
}
super.serviceStop();
}
@Test
public void testContainerLaunch() throws Exception {
Assume.assumeTrue(shouldRun());
String expectedRunAsUser =
conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
File touchFile = new File(workSpace, "touch-file");
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
assertEquals(0, ret);
FileStatus fileStatus =
FileContext.getLocalFSFileContext().getFileStatus(
new Path(touchFile.getAbsolutePath()));
assertEquals(expectedRunAsUser, fileStatus.getOwner());
cleanupAppFiles(expectedRunAsUser);
}
@Override
protected void finished(Description description)
{
try {
FileContext.getLocalFSFileContext()
.delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void testList() throws IOException {
FileStatus fs = fc.getFileStatus(new Path("/"));
Assert.assertTrue(fs.isDirectory());
// should return the full path not the chrooted path
Assert.assertEquals(fs.getPath(), chrootedTo);
// list on Slash
FileStatus[] dirPaths = fc.util().listStatus(new Path("/"));
Assert.assertEquals(0, dirPaths.length);
fileContextTestHelper.createFileNonRecursive(fc, "/foo");
fileContextTestHelper.createFileNonRecursive(fc, "/bar");
fc.mkdir(new Path("/dirX"), FileContext.DEFAULT_PERM, false);
fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "/dirY"),
FileContext.DEFAULT_PERM, false);
fc.mkdir(new Path("/dirX/dirXX"), FileContext.DEFAULT_PERM, false);
dirPaths = fc.util().listStatus(new Path("/"));
Assert.assertEquals(4, dirPaths.length);
// Note the the file status paths are the full paths on target
fs = fileContextTestHelper.containsPath(fcTarget, "foo", dirPaths);
Assert.assertNotNull(fs);
Assert.assertTrue(fs.isFile());
fs = fileContextTestHelper.containsPath(fcTarget, "bar", dirPaths);
Assert.assertNotNull(fs);
Assert.assertTrue(fs.isFile());
fs = fileContextTestHelper.containsPath(fcTarget, "dirX", dirPaths);
Assert.assertNotNull(fs);
Assert.assertTrue(fs.isDirectory());
fs = fileContextTestHelper.containsPath(fcTarget, "dirY", dirPaths);
Assert.assertNotNull(fs);
Assert.assertTrue(fs.isDirectory());
}
@After
public void tearDown() throws Exception {
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
}
private static void writeFile(FileContext fc, Path name, int fileSize)
throws IOException {
// Create and write a file that contains three blocks of data
FSDataOutputStream stm = fc.create(name, EnumSet.of(CreateFlag.CREATE),
Options.CreateOpts.createParent());
byte[] buffer = new byte[fileSize];
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
stm.close();
}
public AvroFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
super(fs, filePath, new GenericRecordToStruct(), config);
AvroFSInput input = new AvroFSInput(FileContext.getFileContext(filePath.toUri()), filePath);
if (this.schema == null) {
this.reader = new DataFileReader<>(input, new SpecificDatumReader<>());
} else {
this.reader = new DataFileReader<>(input, new SpecificDatumReader<>(this.schema));
}
this.closed = false;
}
@Test
public void testMultipleFileAvroReads() throws Exception
{
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
int cnt = 7;
createAvroInput(cnt);
writeAvroFile(new File(FILENAME));
writeAvroFile(new File(OTHER_FILE));
avroFileInput.output.setSink(output);
avroFileInput.completedFilesPort.setSink(completedFilesPort);
avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
avroFileInput.setDirectory(testMeta.dir);
avroFileInput.setup(testMeta.context);
avroFileInput.beginWindow(0);
avroFileInput.emitTuples();
avroFileInput.beginWindow(1);
avroFileInput.emitTuples();
Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size());
avroFileInput.emitTuples();
avroFileInput.endWindow();
Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size());
Assert.assertEquals("Completed File", 2, completedFilesPort.collectedTuples.size());
avroFileInput.teardown();
}
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
/** Main function called by tool runner.
* It first initializes data by parsing the command line arguments.
* It then calls the loadGenerator
*/
@Override
public int run(String[] args) throws Exception {
int exitCode = parseArgsMR(args);
if (exitCode != 0) {
return exitCode;
}
System.out.println("Running LoadGeneratorMR against fileSystem: " +
FileContext.getFileContext().getDefaultFileSystem().getUri());
return submitAsMapReduce(); // reducer will print the results
}
@BeforeClass
public static void clusterSetupAtBegining() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
cluster.waitClusterUp();
fc = FileContext.getFileContext(cluster.getURI(0), CONF);
Path defaultWorkingDirectory = fc.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));
fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
}
@Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/[email protected]");
clientCache = new ClientCache(conf, resourceMgrDelegate);
clientCache = spy(clientCache);
yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
yarnRunner = spy(yarnRunner);
submissionContext = mock(ApplicationSubmissionContext.class);
doAnswer(
new Answer<ApplicationSubmissionContext>() {
@Override
public ApplicationSubmissionContext answer(InvocationOnMock invocation)
throws Throwable {
return submissionContext;
}
}
).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
any(String.class), any(Credentials.class));
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
jobId = TypeConverter.fromYarn(appId);
if (testWorkDir.exists()) {
FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
}
testWorkDir.mkdirs();
}
@Test
public void testFinalizeWithDelete() throws IOException
{
testMeta.fsWAL.setMaxLength(2 * 1024);
testMeta.fsWAL.setup();
FileSystemWAL.FileSystemWALWriter fsWALWriter = testMeta.fsWAL.getWriter();
write1KRecords(fsWALWriter, 2);
testMeta.fsWAL.beforeCheckpoint(0);
write1KRecords(fsWALWriter, 2);
testMeta.fsWAL.beforeCheckpoint(1);
write1KRecords(fsWALWriter, 2);
testMeta.fsWAL.beforeCheckpoint(2);
FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader();
assertNumTuplesRead(fsWALReader, 6);
testMeta.fsWAL.committed(0);
fsWALWriter.delete(new FileSystemWAL.FileSystemWALPointer(2, 0));
FileContext fileContext = FileContextUtils.getFileContext(testMeta.fsWAL.getFilePath());
Assert.assertTrue("part 0 exists ", !fileContext.util().exists(new Path(testMeta.fsWAL.getPartFilePath(0))));
testMeta.fsWAL.committed(1);
Assert.assertTrue("part 1 exists ", !fileContext.util().exists(new Path(testMeta.fsWAL.getPartFilePath(1))));
fsWALReader.seek(fsWALReader.getStartPointer());
assertNumTuplesRead(fsWALReader, 2);
}