下面列出了org.apache.hadoop.fs.Path#WINDOWS 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected List<PathData> expandArgument(String arg) throws IOException {
List<PathData> items = new LinkedList<PathData>();
if (arg.equals("-")) {
readStdin = true;
} else {
try {
items.add(new PathData(new URI(arg), getConf()));
} catch (URISyntaxException e) {
if (Path.WINDOWS) {
// Unlike URI, PathData knows how to parse Windows drive-letter paths.
items.add(new PathData(arg, getConf()));
} else {
throw new IOException("Unexpected URISyntaxException: " + e.toString());
}
}
}
return items;
}
@Override
protected List<PathData> expandArgument(String arg) throws IOException {
List<PathData> items = new LinkedList<PathData>();
if (arg.equals("-")) {
readStdin = true;
} else {
try {
items.add(new PathData(new URI(arg), getConf()));
} catch (URISyntaxException e) {
if (Path.WINDOWS) {
// Unlike URI, PathData knows how to parse Windows drive-letter paths.
items.add(new PathData(arg, getConf()));
} else {
throw new IOException("Unexpected URISyntaxException: " + e.toString());
}
}
}
return items;
}
private static void checkStat(File f, String owner, String group,
String expectedOwner,
String expectedGroup) throws IOException {
boolean success = true;
if (expectedOwner != null &&
!expectedOwner.equals(owner)) {
if (Path.WINDOWS) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(expectedOwner);
final String adminsGroupString = "Administrators";
success = owner.equals(adminsGroupString)
&& Arrays.asList(ugi.getGroupNames()).contains(adminsGroupString);
} else {
success = false;
}
}
if (!success) {
throw new IOException(
"Owner '" + owner + "' for path " + f + " did not match " +
"expected owner '" + expectedOwner + "'");
}
}
@Test (timeout = 5000)
public void testInvalidWindowsPath() throws Exception {
if (!Path.WINDOWS) {
return;
}
// Verify that the following invalid paths are rejected.
String [] winPaths = {
"N:\\foo/bar"
};
for (String path : winPaths) {
try {
PathData item = new PathData(path, conf);
fail("Did not throw for invalid path " + path);
} catch (IOException ioe) {
}
}
}
@Test (timeout = 30000)
public void testFstat() throws Exception {
FileOutputStream fos = new FileOutputStream(
new File(TEST_DIR, "testfstat"));
NativeIO.POSIX.Stat stat = NativeIO.POSIX.getFstat(fos.getFD());
fos.close();
LOG.info("Stat: " + String.valueOf(stat));
String owner = stat.getOwner();
String expectedOwner = System.getProperty("user.name");
if (Path.WINDOWS) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(expectedOwner);
final String adminsGroupString = "Administrators";
if (Arrays.asList(ugi.getGroupNames()).contains(adminsGroupString)) {
expectedOwner = adminsGroupString;
}
}
assertEquals(expectedOwner, owner);
assertNotNull(stat.getGroup());
assertTrue(!stat.getGroup().isEmpty());
assertEquals("Stat mode field should indicate a regular file",
NativeIO.POSIX.Stat.S_IFREG,
stat.getMode() & NativeIO.POSIX.Stat.S_IFMT);
}
@Test (timeout = 30000)
public void testOpenMissingWithoutCreate() throws Exception {
if (Path.WINDOWS) {
return;
}
LOG.info("Open a missing file without O_CREAT and it should fail");
try {
FileDescriptor fd = NativeIO.POSIX.open(
new File(TEST_DIR, "doesntexist").getAbsolutePath(),
NativeIO.POSIX.O_WRONLY, 0700);
fail("Able to open a new file without O_CREAT");
} catch (NativeIOException nioe) {
LOG.info("Got expected exception", nioe);
assertEquals(Errno.ENOENT, nioe.getErrno());
}
}
private static void checkStat(File f, String owner, String group,
String expectedOwner,
String expectedGroup) throws IOException {
boolean success = true;
if (expectedOwner != null &&
!expectedOwner.equals(owner)) {
if (Path.WINDOWS) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(expectedOwner);
final String adminsGroupString = "Administrators";
success = owner.equals(adminsGroupString)
&& Arrays.asList(ugi.getGroupNames()).contains(adminsGroupString);
} else {
success = false;
}
}
if (!success) {
throw new IOException(
"Owner '" + owner + "' for path " + f + " did not match " +
"expected owner '" + expectedOwner + "'");
}
}
/** Normalize the given Windows path string. This does the following:
* 1. Adds "file:" scheme for absolute paths.
* 2. Ensures the scheme-specific part starts with '/' per RFC2396.
* 3. Replaces backslash path separators with forward slashes.
* @param pathString Path string supplied by the user.
* @return normalized absolute path string. Returns the input string
* if it is not a Windows absolute path.
*/
private static String normalizeWindowsPath(String pathString)
throws IOException
{
if (!Path.WINDOWS) {
return pathString;
}
boolean slashed =
((pathString.length() >= 1) && (pathString.charAt(0) == '/'));
// Is it a backslash-separated absolute path?
if (windowsNonUriAbsolutePath1.matcher(pathString).find()) {
// Forward slashes disallowed in a backslash-separated path.
if (pathString.indexOf('/') != -1) {
throw new IOException("Invalid path string " + pathString);
}
pathString = pathString.replace('\\', '/');
return "file:" + (slashed ? "" : "/") + pathString;
}
// Is it a forward slash-separated absolute path?
if (windowsNonUriAbsolutePath2.matcher(pathString).find()) {
return "file:" + (slashed ? "" : "/") + pathString;
}
// Is it a backslash-separated relative file path (no scheme and
// no drive-letter specifier)?
if ((pathString.indexOf(':') == -1) && (pathString.indexOf('\\') != -1)) {
pathString = pathString.replace('\\', '/');
}
return pathString;
}
/**
* The last arg is expected to be a local path, if only one argument is
* given then the destination will be the current directory
* @param args is the list of arguments
*/
protected void getLocalDestination(LinkedList<String> args)
throws IOException {
String pathString = (args.size() < 2) ? Path.CUR_DIR : args.removeLast();
try {
dst = new PathData(new URI(pathString), getConf());
} catch (URISyntaxException e) {
if (Path.WINDOWS) {
// Unlike URI, PathData knows how to parse Windows drive-letter paths.
dst = new PathData(pathString, getConf());
} else {
throw new IOException("unexpected URISyntaxException", e);
}
}
}
@Test (timeout = 30000)
public void testGetGroupName() throws IOException {
if (Path.WINDOWS) {
return;
}
assertFalse(NativeIO.POSIX.getGroupName(0).isEmpty());
}
@Override
protected List<PathData> expandArgument(String arg) throws IOException {
List<PathData> items = new LinkedList<PathData>();
try {
items.add(new PathData(new URI(arg), getConf()));
} catch (URISyntaxException e) {
if (Path.WINDOWS) {
// Unlike URI, PathData knows how to parse Windows drive-letter paths.
items.add(new PathData(arg, getConf()));
} else {
throw new IOException("unexpected URISyntaxException", e);
}
}
return items;
}
@Test (timeout = 5000)
public void testToFileRawWindowsPaths() throws Exception {
if (!Path.WINDOWS) {
return;
}
// Can we handle raw Windows paths? The files need not exist for
// these tests to succeed.
String[] winPaths = {
"n:\\",
"N:\\",
"N:\\foo",
"N:\\foo\\bar",
"N:/",
"N:/foo",
"N:/foo/bar"
};
PathData item;
for (String path : winPaths) {
item = new PathData(path, conf);
assertEquals(new File(path), item.toFile());
}
item = new PathData("foo\\bar", conf);
assertEquals(new File(testDir + "\\foo\\bar"), item.toFile());
}
@Test (timeout = 30000)
public void testOpenWithCreate() throws Exception {
if (Path.WINDOWS) {
return;
}
LOG.info("Test creating a file with O_CREAT");
FileDescriptor fd = NativeIO.POSIX.open(
new File(TEST_DIR, "testWorkingOpen").getAbsolutePath(),
NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT, 0700);
assertNotNull(true);
assertTrue(fd.valid());
FileOutputStream fos = new FileOutputStream(fd);
fos.write("foo".getBytes());
fos.close();
assertFalse(fd.valid());
LOG.info("Test exclusive create");
try {
fd = NativeIO.POSIX.open(
new File(TEST_DIR, "testWorkingOpen").getAbsolutePath(),
NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT | NativeIO.POSIX.O_EXCL, 0700);
fail("Was able to create existing file with O_EXCL");
} catch (NativeIOException nioe) {
LOG.info("Got expected exception for failed exclusive create", nioe);
assertEquals(Errno.EEXIST, nioe.getErrno());
}
}
@Test(timeout=10000)
public void testContainerLogsFileAccess() throws IOException {
// This test will run only if NativeIO is enabled as SecureIOUtils
// require it to be enabled.
Assume.assumeTrue(NativeIO.isAvailable());
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
File workDir = new File(testWorkDir, "testContainerLogsFileAccess1");
Path remoteAppLogFile =
new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
String data = "Log File content for container : ";
// Creating files for container1. Log aggregator will try to read log files
// with illegal user.
ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 1);
ContainerId testContainerId1 =
ContainerId.newContainerId(applicationAttemptId, 1);
Path appDir =
new Path(srcFileRoot, testContainerId1.getApplicationAttemptId()
.getApplicationId().toString());
Path srcFilePath1 = new Path(appDir, testContainerId1.toString());
String stdout = "stdout";
String stderr = "stderr";
writeSrcFile(srcFilePath1, stdout, data + testContainerId1.toString()
+ stdout);
writeSrcFile(srcFilePath1, stderr, data + testContainerId1.toString()
+ stderr);
UserGroupInformation ugi =
UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId1);
String randomUser = "randomUser";
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId1, randomUser));
// It is trying simulate a situation where first log file is owned by
// different user (probably symlink) and second one by the user itself.
// The first file should not be aggregated. Because this log file has the invalid
// user name.
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
ugi.getShortUserName());
logWriter.append(logKey, logValue);
logWriter.close();
BufferedReader in =
new BufferedReader(new FileReader(new File(remoteAppLogFile
.toUri().getRawPath())));
String line;
StringBuffer sb = new StringBuffer("");
while ((line = in.readLine()) != null) {
LOG.info(line);
sb.append(line);
}
line = sb.toString();
String expectedOwner = ugi.getShortUserName();
if (Path.WINDOWS) {
final String adminsGroupString = "Administrators";
if (Arrays.asList(ugi.getGroupNames()).contains(adminsGroupString)) {
expectedOwner = adminsGroupString;
}
}
// This file: stderr should not be aggregated.
// And we will not aggregate the log message.
String stdoutFile1 =
StringUtils.join(
File.separator,
Arrays.asList(new String[] {
workDir.getAbsolutePath(), "srcFiles",
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stderr }));
// The file: stdout is expected to be aggregated.
String stdoutFile2 =
StringUtils.join(
File.separator,
Arrays.asList(new String[] {
workDir.getAbsolutePath(), "srcFiles",
testContainerId1.getApplicationAttemptId().getApplicationId()
.toString(), testContainerId1.toString(), stdout }));
String message2 =
"Owner '" + expectedOwner + "' for path "
+ stdoutFile2 + " did not match expected owner '"
+ ugi.getShortUserName() + "'";
Assert.assertFalse(line.contains(message2));
Assert.assertFalse(line.contains(data + testContainerId1.toString()
+ stderr));
Assert.assertTrue(line.contains(data + testContainerId1.toString()
+ stdout));
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir/").getAbsolutePath());
}
// By default, VMEM monitoring disabled, PMEM monitoring enabled.
if (!conf.getBoolean(
MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
}
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
/*
* Re-configure the staging path on Windows if the file system is localFs.
* We need to use a absolute path that contains the drive letter. The unit
* test could run on a different drive than the AM. We can run into the
* issue that job files are localized to the drive where the test runs on,
* while the AM starts on a different drive and fails to find the job
* metafiles. Using absolute path can avoid this ambiguity.
*/
if (Path.WINDOWS) {
if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR,
new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
.getAbsolutePath());
}
}
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
//mkdir the staging directory so that right permissions are set while running as proxy user
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new YarnRuntimeException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
// which shuffle doesn't happen
//configure the shuffle service in NM
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
super.serviceInit(conf);
}
/**
* This method starts the data node with the specified conf.
*
* @param conf - the configuration
* if conf's CONFIG_PROPERTY_SIMULATED property is set
* then a simulated storage based data node is created.
*
* @param dataDirs - only for a non-simulated storage data node
* @throws IOException
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
SecureResources resources
) throws IOException {
// settings global for all BPs in the Data Node
this.secureResources = resources;
synchronized (this) {
this.dataDirs = dataDirs;
}
this.conf = conf;
this.dnConf = new DNConf(conf);
checkSecureConfig(dnConf, conf, resources);
this.spanReceiverHost =
SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
if (dnConf.maxLockedMemory > 0) {
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) is greater than zero and native code is not available.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
}
if (Path.WINDOWS) {
NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
} else {
long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
if (dnConf.maxLockedMemory > ulimit) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) of %d bytes is more than the datanode's available" +
" RLIMIT_MEMLOCK ulimit of %d bytes.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
dnConf.maxLockedMemory,
ulimit));
}
}
}
LOG.info("Starting DataNode with maxLockedMemory = " +
dnConf.maxLockedMemory);
storage = new DataStorage();
// global DN settings
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
// Login is done by now. Set the DN user name.
dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.info("dnUserName = " + dnUserName);
LOG.info("supergroup = " + supergroup);
initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
}
/**
* Attempts to acquire an exclusive lock on the storage.
*
* @return A lock object representing the newly-acquired lock or
* <code>null</code> if storage is already locked.
* @throws IOException if locking fails.
*/
@SuppressWarnings("resource")
FileLock tryLock() throws IOException {
boolean deletionHookAdded = false;
File lockF = new File(root, STORAGE_FILE_LOCK);
if (!lockF.exists()) {
lockF.deleteOnExit();
deletionHookAdded = true;
}
RandomAccessFile file = new RandomAccessFile(lockF, "rws");
String jvmName = ManagementFactory.getRuntimeMXBean().getName();
FileLock res = null;
try {
res = file.getChannel().tryLock();
if (null == res) {
throw new OverlappingFileLockException();
}
file.write(jvmName.getBytes(Charsets.UTF_8));
LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
} catch(OverlappingFileLockException oe) {
// Cannot read from the locked file on Windows.
String lockingJvmName = Path.WINDOWS ? "" : (" " + file.readLine());
LOG.error("It appears that another node " + lockingJvmName
+ " has already locked the storage directory: " + root, oe);
file.close();
return null;
} catch(IOException e) {
LOG.error("Failed to acquire lock on " + lockF
+ ". If this storage directory is mounted via NFS, "
+ "ensure that the appropriate nfs lock services are running.", e);
file.close();
throw e;
}
if (!deletionHookAdded) {
// If the file existed prior to our startup, we didn't
// call deleteOnExit above. But since we successfully locked
// the dir, we can take care of cleaning it up.
lockF.deleteOnExit();
}
return res;
}
private static void initKdc() throws Exception {
Properties kdcConf = MiniKdc.createConf();
kdc = new MiniKdc(kdcConf, baseDir);
kdc.start();
configuration = new HdfsConfiguration();
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, configuration);
UserGroupInformation.setConfiguration(configuration);
assertTrue("Expected configuration to enable security", UserGroupInformation.isSecurityEnabled());
userName = UserGroupInformation.createUserForTesting("guest", new String[]{"users"}).getUserName();
File keytabFile = new File(baseDir, userName + ".keytab");
String keytab = keytabFile.getAbsolutePath();
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
kdc.createPrincipal(keytabFile, userName + "/" + krbInstance, "HTTP/" + krbInstance);
String hdfsPrincipal = userName + "/" + krbInstance + "@" + kdc.getRealm();
String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
configuration.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
configuration.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
configuration.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
configuration.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
configuration.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
configuration.set(DFS_JOURNALNODE_KEYTAB_FILE_KEY, keytab);
configuration.set(DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
configuration.set(DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, spnegoPrincipal);
configuration.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
configuration.set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "authentication");
configuration.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_AND_HTTPS.name());
configuration.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
configuration.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
configuration.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, "localhost:0");
configuration.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 10);
configuration.set("hadoop.proxyuser." + userName + ".hosts", "*");
configuration.set("hadoop.proxyuser." + userName + ".groups", "*");
configuration.setBoolean("dfs.permissions", true);
String keystoresDir = baseDir.getAbsolutePath();
File sslClientConfFile = new File(keystoresDir + "/ssl-client.xml");
File sslServerConfFile = new File(keystoresDir + "/ssl-server.xml");
KeyStoreTestUtil.setupSSLConfig(keystoresDir, keystoresDir, configuration, false);
configuration.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
sslClientConfFile.getName());
configuration.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
sslServerConfFile.getName());
setupKnox(keytab, hdfsPrincipal);
}
@BeforeClass
public static void init() throws Exception {
baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
TestSecureNNWithQJM.class.getSimpleName());
FileUtil.fullyDelete(baseDir);
assertTrue(baseDir.mkdirs());
Properties kdcConf = MiniKdc.createConf();
kdc = new MiniKdc(kdcConf, baseDir);
kdc.start();
baseConf = new HdfsConfiguration();
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
baseConf);
UserGroupInformation.setConfiguration(baseConf);
assertTrue("Expected configuration to enable security",
UserGroupInformation.isSecurityEnabled());
String userName = UserGroupInformation.getLoginUser().getShortUserName();
File keytabFile = new File(baseDir, userName + ".keytab");
String keytab = keytabFile.getAbsolutePath();
// Windows will not reverse name lookup "127.0.0.1" to "localhost".
String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
kdc.createPrincipal(keytabFile,
userName + "/" + krbInstance,
"HTTP/" + krbInstance);
String hdfsPrincipal = userName + "/" + krbInstance + "@" + kdc.getRealm();
String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
baseConf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
baseConf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
baseConf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
baseConf.set(DFS_JOURNALNODE_KEYTAB_FILE_KEY, keytab);
baseConf.set(DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
baseConf.set(DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
spnegoPrincipal);
baseConf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
baseConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
baseConf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
baseConf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.set(DFS_JOURNALNODE_HTTPS_ADDRESS_KEY, "localhost:0");
baseConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
String keystoresDir = baseDir.getAbsolutePath();
String sslConfDir = KeyStoreTestUtil.getClasspathDir(
TestSecureNNWithQJM.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, baseConf, false);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
// blacklisting disabled to prevent scheduling issues
conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
"apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
}
if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
// nothing defined. set quick delete value
conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
}
File appJarLocalFile = new File(MiniTezCluster.APPJAR);
if (!appJarLocalFile.exists()) {
String message = "TezAppJar " + MiniTezCluster.APPJAR
+ " not found. Exiting.";
LOG.info(message);
throw new TezUncheckedException(message);
}
FileSystem fs = FileSystem.get(conf);
Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
// Copy AppJar and make it public.
Path appMasterJar = new Path(MiniTezCluster.APPJAR);
fs.copyFromLocalFile(appMasterJar, appRemoteJar);
fs.setPermission(appRemoteJar, new FsPermission("777"));
conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
// VMEM monitoring disabled, PMEM monitoring enabled.
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
try {
Path stagingPath = FileContext.getFileContext(conf).makeQualified(
new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
/*
* Re-configure the staging path on Windows if the file system is localFs.
* We need to use a absolute path that contains the drive letter. The unit
* test could run on a different drive than the AM. We can run into the
* issue that job files are localized to the drive where the test runs on,
* while the AM starts on a different drive and fails to find the job
* metafiles. Using absolute path can avoid this ambiguity.
*/
if (Path.WINDOWS) {
if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR,
new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
.getAbsolutePath());
}
}
FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
if (fc.util().exists(stagingPath)) {
LOG.info(stagingPath + " exists! deleting...");
fc.delete(stagingPath, true);
}
LOG.info("mkdir: " + stagingPath);
fc.mkdir(stagingPath, null, true);
//mkdir done directory as well
String doneDir =
JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
Path doneDirPath = fc.makeQualified(new Path(doneDir));
fc.mkdir(doneDirPath, null, true);
} catch (IOException e) {
throw new TezUncheckedException("Could not create staging directory. ", e);
}
conf.set(MRConfig.MASTER_ADDRESS, "test");
//configure the shuffle service in NM
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
// Non-standard shuffle port
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class);
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
super.serviceInit(conf);
}