下面列出了 org.apache.hadoop.mapred.FileAlreadyExistsException#org.apache.hadoop.yarn.api.ApplicationConstants.Environment 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Populates the logging {@code MDC} with Apex context entries.
 *
 * <p>{@code apex.service} and {@code apex.node} are always written; the node
 * entry is {@code "unknown"} when no host name is available. The entries
 * {@code apex.user}, {@code apex.containerId}, {@code apex.applicationId} and
 * {@code apex.application} are written only when the environment variable or
 * system property they are read from is present.
 *
 * @param service value stored under the {@code apex.service} key
 */
public static void setupMDC(String service)
{
  MDC.put("apex.service", service);

  final String hostName = StramClientUtils.getHostName();
  if (hostName == null) {
    MDC.put("apex.node", "unknown");
  } else {
    MDC.put("apex.node", hostName);
  }

  final String user = System.getenv(Environment.USER.key());
  if (user != null) {
    MDC.put("apex.user", user);
  }

  final String rawContainerId = System.getenv(Environment.CONTAINER_ID.name());
  if (rawContainerId != null) {
    // Resolve both ids before touching the MDC so a parse failure leaves it untouched.
    final ContainerId parsedContainerId = ConverterUtils.toContainerId(rawContainerId);
    final ApplicationId owningApplication = parsedContainerId.getApplicationAttemptId().getApplicationId();
    MDC.put("apex.containerId", parsedContainerId.toString());
    MDC.put("apex.applicationId", owningApplication.toString());
  }

  final String applicationName = System.getProperty(APPLICATION_NAME.getLongName());
  if (applicationName != null) {
    MDC.put("apex.application", applicationName);
  }
}
/**
 * Builds the job classloader when it has been enabled for the job.
 *
 * <p>A classloader is created only if
 * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is {@code true} in the given
 * configuration and the APP_CLASSPATH environment variable is set. When the
 * option is on but the variable is missing, a warning is logged.
 *
 * @param conf configuration consulted for the classloader flag and the
 *     system classes
 * @return the created job classloader, or {@code null} if the job classloader
 *     is not enabled or the APP_CLASSPATH environment variable is not set
 * @throws IOException propagated from the helpers that build the classloader
 */
public static ClassLoader createJobClassLoader(Configuration conf)
    throws IOException {
  if (!conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
    return null;
  }

  final String classpathFromEnv = System.getenv(Environment.APP_CLASSPATH.key());
  if (classpathFromEnv == null) {
    LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
    return null;
  }

  LOG.info("Creating job classloader");
  if (LOG.isDebugEnabled()) {
    LOG.debug("APP_CLASSPATH=" + classpathFromEnv);
  }
  return createJobClassLoader(classpathFromEnv, getSystemClasses(conf));
}
/**
 * Builds the job classloader when it has been enabled for the job.
 *
 * <p>A classloader is created only if
 * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is {@code true} in the given
 * configuration and the APP_CLASSPATH environment variable is set. When the
 * option is on but the variable is missing, a warning is logged.
 *
 * @param conf configuration consulted for the classloader flag and the
 *     system classes
 * @return the created job classloader, or {@code null} if the job classloader
 *     is not enabled or the APP_CLASSPATH environment variable is not set
 * @throws IOException propagated from the helpers that build the classloader
 */
public static ClassLoader createJobClassLoader(Configuration conf)
    throws IOException {
  final boolean classLoaderEnabled =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false);
  if (!classLoaderEnabled) {
    return null;
  }

  final String envClasspath = System.getenv(Environment.APP_CLASSPATH.key());
  if (envClasspath == null) {
    LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
    return null;
  }

  LOG.info("Creating job classloader");
  if (LOG.isDebugEnabled()) {
    LOG.debug("APP_CLASSPATH=" + envClasspath);
  }
  final String[] systemClassPatterns = getSystemClasses(conf);
  return createJobClassLoader(envClasspath, systemClassPatterns);
}
/**
 * Populates the logging {@code MDC} with Apex context entries.
 *
 * <p>{@code apex.service} and {@code apex.node} are always written; the node
 * entry is {@code "unknown"} when no host name is available. The entries
 * {@code apex.user}, {@code apex.containerId}, {@code apex.applicationId} and
 * {@code apex.application} are written only when the environment variable or
 * system property they are read from is present.
 *
 * @param service value stored under the {@code apex.service} key
 */
public static void setupMDC(String service)
{
  MDC.put("apex.service", service);

  final String nodeName = StramClientUtils.getHostName();
  if (nodeName != null) {
    MDC.put("apex.node", nodeName);
  } else {
    MDC.put("apex.node", "unknown");
  }

  final String userName = System.getenv(Environment.USER.key());
  if (userName != null) {
    MDC.put("apex.user", userName);
  }

  final String containerIdText = System.getenv(Environment.CONTAINER_ID.name());
  if (containerIdText != null) {
    // Resolve both ids before touching the MDC so a parse failure leaves it untouched.
    final ContainerId container = ConverterUtils.toContainerId(containerIdText);
    final ApplicationId application = container.getApplicationAttemptId().getApplicationId();
    MDC.put("apex.containerId", container.toString());
    MDC.put("apex.applicationId", application.toString());
  }

  final String appName = System.getProperty(APPLICATION_NAME.getLongName());
  if (appName != null) {
    MDC.put("apex.application", appName);
  }
}
/**
 * Asserts the environment entries expected for every MR task: the keys
 * {@code foo}, {@code bar}, LD_LIBRARY_PATH, SHELL and HADOOP_ROOT_LOGGER
 * must be present, and LD_LIBRARY_PATH must have the exact expected value.
 *
 * @param env the task environment under test
 */
private void testCommonEnvSettingsForMRTasks(Map<String, String> env) {
  final String ldLibraryPathKey = Environment.LD_LIBRARY_PATH.name();
  final String[] requiredKeys = {
      "foo",
      "bar",
      ldLibraryPathKey,
      Environment.SHELL.name(),
      "HADOOP_ROOT_LOGGER",
  };
  for (String requiredKey : requiredKeys) {
    Assert.assertTrue(env.containsKey(requiredKey));
  }

  Assert.assertEquals("$PWD:$TEZ_ADMIN_ENV_TEST/lib/native", env.get(ldLibraryPathKey));

  // TEZ-273 will reinstate this or similar.
  // for (String val : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
  // Assert.assertTrue(env.get(Environment.CLASSPATH.name()).contains(val));
  // }
  // Assert.assertTrue(0 ==
  // env.get(Environment.CLASSPATH.name()).indexOf(Environment.PWD.$()));
}
/**
 * Chooses the UGI to run a newly received task with.
 *
 * <p>The supplied UGI is reused unless the task reports that its credentials
 * have changed and supplies a non-null credentials object. In that case a
 * fresh remote-user UGI is created for the user named by the USER
 * environment variable and the task's credentials are added to it.
 *
 * @param containerTask
 *          the new task specification. Must not be a "should die" task and
 *          must carry a task spec, otherwise an
 *          {@link IllegalStateException} is raised by the precondition checks
 * @param childUGI
 *          the UGI instance currently in use
 * @return {@code childUGI} when no refresh is needed, otherwise the newly
 *         created UGI
 */
UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
    UserGroupInformation childUGI) {
  Preconditions.checkState(!containerTask.shouldDie());
  Preconditions.checkState(containerTask.getTaskSpec() != null);

  // Re-use the UGI only if the Credentials have not changed.
  if (!containerTask.haveCredentialsChanged()) {
    return childUGI;
  }

  LOG.info("Refreshing UGI since Credentials have changed");
  final Credentials updatedCredentials = containerTask.getCredentials();
  if (updatedCredentials == null) {
    LOG.info("Not loading any credentials, since no credentials provided");
    return childUGI;
  }

  LOG.info("Credentials : #Tokens=" + updatedCredentials.numberOfTokens() + ", #SecretKeys="
      + updatedCredentials.numberOfSecretKeys());
  final String userName = System.getenv(ApplicationConstants.Environment.USER.toString());
  final UserGroupInformation refreshedUgi = UserGroupInformation.createRemoteUser(userName);
  refreshedUgi.addCredentials(containerTask.getCredentials());
  return refreshedUgi;
}
/**
 * Assembles the framework CLASSPATH value.
 *
 * <p>Entries are added in this order: the PWD expansion, the PWD expansion
 * followed by {@code /*} (using the platform file separator), then every
 * trimmed entry of the YARN application classpath from the configuration
 * (or the YARN default when unset).
 *
 * @param conf configuration supplying the YARN application classpath
 * @return the interned CLASSPATH string
 */
public static String getFrameworkClasspath(Configuration conf) {
  final String classpathKey = Environment.CLASSPATH.name();
  final String workingDir = Environment.PWD.$();
  final Map<String, String> scratchEnv = new HashMap<String, String>();

  TezYARNUtils.addToEnvironment(scratchEnv, classpathKey, workingDir, File.pathSeparator);
  TezYARNUtils.addToEnvironment(scratchEnv, classpathKey,
      workingDir + File.separator + "*", File.pathSeparator);

  // Add YARN/COMMON/HDFS jars and conf locations to path
  final String[] yarnEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
  for (String yarnEntry : yarnEntries) {
    TezYARNUtils.addToEnvironment(scratchEnv, classpathKey, yarnEntry.trim(),
        File.pathSeparator);
  }

  return StringInterner.weakIntern(scratchEnv.get(classpathKey));
}
/**
 * Fills in the default container environment.
 *
 * <p>Performs three steps in order: adds the framework classpath to
 * CLASSPATH, applies the environment string read from the configuration via
 * {@code setEnvIfAbsentFromInputString}, and adds the PWD expansion to
 * LD_LIBRARY_PATH.
 *
 * @param env environment map to update in place
 * @param conf configuration to read from
 * @param confEnvKey configuration key holding the environment string
 * @param confEnvKeyDefault value used when {@code confEnvKey} is unset
 */
public static void setupDefaultEnv(Map<String, String> env,
    Configuration conf, String confEnvKey, String confEnvKeyDefault) {
  // Setup the CLASSPATH in environment
  // i.e. add { Hadoop jars, job jar, CWD } to classpath.
  final String frameworkClasspath = getFrameworkClasspath(conf);
  TezYARNUtils.addToEnvironment(
      env,
      ApplicationConstants.Environment.CLASSPATH.name(),
      frameworkClasspath,
      File.pathSeparator);

  // set any env from config if it is not set already
  final String configuredEnv = conf.get(confEnvKey, confEnvKeyDefault);
  TezYARNUtils.setEnvIfAbsentFromInputString(env, configuredEnv);

  // Append pwd to LD_LIBRARY_PATH
  // Done separately here because this is known to work platform independent
  TezYARNUtils.addToEnvironment(
      env,
      Environment.LD_LIBRARY_PATH.name(),
      Environment.PWD.$(),
      File.pathSeparator);
}
/**
 * Stores the application master CLASSPATH in the given environment map.
 *
 * <p>The value is the CLASSPATH expansion, then {@code ./*}, then every
 * trimmed YARN application classpath entry from {@code conf}, joined with the
 * platform path separator. The resulting string is logged at info level.
 *
 * @param appMasterEnv environment map that receives the CLASSPATH entry
 */
private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
  final StringBuilder classpath = new StringBuilder()
      .append(Environment.CLASSPATH.$())
      .append(File.pathSeparatorChar)
      .append("./*");

  final String[] yarnEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
  for (String yarnEntry : yarnEntries) {
    classpath.append(File.pathSeparatorChar).append(yarnEntry.trim());
  }

  final String classpathValue = classpath.toString();
  LOG.info("env: " + classpathValue);
  appMasterEnv.put(Environment.CLASSPATH.name(), classpathValue);
}
/**
 * Launches the given container with the command built from {@code appVMConfig}.
 *
 * <p>When this instance's VM config has resources, they are merged into
 * {@code appVMConfig} and set as the container's local resources. The launch
 * command redirects stdout and stderr into the container log directory.
 *
 * @param container the allocated container to start
 * @param appVMConfig configuration providing the launch command; its resource
 *     map may be modified by this call
 * @throws YarnException declared by the node manager client call
 * @throws IOException declared by the node manager client call
 */
public void startContainer(Container container, VMConfig appVMConfig) throws YarnException, IOException {
  // The command is built before any resources are merged into appVMConfig.
  final String launchCommand = appVMConfig.buildCommand();
  final ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);

  if (vmConfig.getVmResources().size() > 0) {
    appVMConfig.getVmResources().putAll(vmConfig.getVmResources());
    final VMResources localResources = new VMResources(conf, appVMConfig);
    launchContext.setLocalResources(localResources);
  }

  final Map<String, String> containerEnv = new HashMap<String, String>();
  final boolean runningInMiniCluster =
      vmConfig.getEnvironment() == VMConfig.Environment.YARN_MINICLUSTER;
  setupAppClasspath(runningInMiniCluster, conf, containerEnv);
  launchContext.setEnvironment(containerEnv);

  final String redirectedCommand = launchCommand
      + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
      + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
  launchContext.setCommands(Collections.singletonList(redirectedCommand));

  nmClient.startContainer(container, launchContext);
  //TODO: update vm descriptor status
}
/**
 * Populates the CLASSPATH entry of the given environment map.
 *
 * <p>In a mini-cluster environment every entry of this JVM's
 * {@code java.class.path} is added individually. Otherwise the value is set
 * to the CLASSPATH expansion, {@code ./*} and the trimmed YARN application
 * classpath entries from {@code conf}, and the result is echoed to stderr.
 * In both cases the PWD expansion followed by {@code /*} is added last.
 *
 * <p>The platform path separator is used throughout. The previous version
 * split and joined on a hard-coded {@code ":"}, which breaks drive-letter
 * paths on Windows and disagreed with the separator used in the
 * non-mini-cluster branch.
 *
 * @param miniClusterEnv whether the local JVM classpath should be propagated
 * @param conf configuration supplying the YARN application classpath
 * @param appMasterEnv environment map to update in place
 */
void setupAppClasspath(boolean miniClusterEnv, Configuration conf, Map<String, String> appMasterEnv) {
  final String classpathKey = Environment.CLASSPATH.name();
  if (miniClusterEnv) {
    // java.class.path is delimited by the platform path separator (":" or ";"),
    // neither of which is a regex metacharacter, so it is safe to pass to split().
    final String localClasspath = System.getProperty("java.class.path");
    for (String entry : localClasspath.split(File.pathSeparator)) {
      Apps.addToEnvironment(appMasterEnv, classpathKey, entry, File.pathSeparator);
    }
  } else {
    final StringBuilder classPathEnv = new StringBuilder();
    classPathEnv.append(Environment.CLASSPATH.$()).append(File.pathSeparatorChar);
    classPathEnv.append("./*");
    final String[] yarnEntries = conf.getStrings(
        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
    for (String yarnEntry : yarnEntries) {
      classPathEnv.append(File.pathSeparatorChar);
      classPathEnv.append(yarnEntry.trim());
    }
    appMasterEnv.put(classpathKey, classPathEnv.toString());
    System.err.println("CLASSPATH: " + classPathEnv);
  }
  Apps.addToEnvironment(appMasterEnv, classpathKey,
      Environment.PWD.$() + File.separator + "*", File.pathSeparator);
}
/**
 * Asserts the environment entries expected for every MR task.
 *
 * <p>The keys {@code foo}, {@code bar}, LD_LIBRARY_PATH, SHELL and
 * HADOOP_ROOT_LOGGER must be present. On Windows the PATH value must contain
 * the user-supplied bin directory; on other platforms LD_LIBRARY_PATH must
 * have the exact expected value.
 *
 * @param env the task environment under test
 */
private void testCommonEnvSettingsForMRTasks(Map<String, String> env) {
  final String ldLibraryPathKey = Environment.LD_LIBRARY_PATH.name();
  final String[] requiredKeys = {
      "foo",
      "bar",
      ldLibraryPathKey,
      Environment.SHELL.name(),
      "HADOOP_ROOT_LOGGER",
  };
  for (String requiredKey : requiredKeys) {
    Assert.assertTrue(env.containsKey(requiredKey));
  }

  if (Shell.WINDOWS) {
    // LD_LIBRARY_PATH is not applicable on Windows; check that PATH was
    // appended with the user setting (e.g. the user may set HADOOP_HOME\bin).
    final String pathValue = env.get(Environment.PATH.name());
    Assert.assertTrue(pathValue.contains(";%TEZ_ADMIN_ENV%\\bin"));
  } else {
    // On non-Windows platforms LD_LIBRARY_PATH must be set with PWD present.
    Assert.assertEquals("$PWD:$TEZ_ADMIN_ENV_TEST/lib/native", env.get(ldLibraryPathKey));
  }

  // TEZ-273 will reinstate this or similar.
  // for (String val : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
  // Assert.assertTrue(env.get(Environment.CLASSPATH.name()).contains(val));
  // }
  // Assert.assertTrue(0 ==
  // env.get(Environment.CLASSPATH.name()).indexOf(Environment.PWD.$()));
}
/**
 * Creates a {@link TezChild} for the given container.
 *
 * <p>The child's environment is a copy of {@code localEnv} with the USER
 * entry overwritten: in local mode the value of the USER environment variable
 * is used, otherwise the user reported by {@code context}. The memory handed
 * to the child is the JVM's maximum memory divided by {@code numExecutors}.
 *
 * @param defaultConf configuration passed through to the child
 * @param containerId id of the container the child runs in
 * @param tokenIdentifier token identifier passed through to the child
 * @param attemptNumber attempt number passed through to the child
 * @param localDirs local directories passed through to the child
 * @param tezTaskUmbilicalProtocol umbilical passed through to the child
 * @param credentials credentials passed through to the child
 * @return the newly created child
 */
private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
    String tokenIdentifier, int attemptNumber, String[] localDirs,
    TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol,
    Credentials credentials) throws
    InterruptedException, TezException, IOException {
  final Map<String, String> childEnv = new HashMap<String, String>();
  childEnv.putAll(localEnv);

  // Use the user from env if it's available.
  final String userKey = Environment.USER.name();
  final String effectiveUser;
  if (isLocalMode) {
    effectiveUser = System.getenv(userKey);
  } else {
    effectiveUser = context.getUser();
  }
  childEnv.put(userKey, effectiveUser);

  final long memoryPerExecutor;
  synchronized (this) { // needed to fix findbugs Inconsistent synchronization warning
    memoryPerExecutor = Runtime.getRuntime().maxMemory() / numExecutors;
  }

  return TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
      attemptNumber, localDirs, workingDirectory, childEnv, "", executionContext, credentials,
      memoryPerExecutor, context.getUser(), tezTaskUmbilicalProtocol, false,
      context.getHadoopShim());
}
/**
 * Builds the JVM option string used to launch the application master.
 *
 * <p>The string starts with a {@code -Djava.io.tmpdir} setting pointing at
 * the container temp directory under PWD, followed by the cluster-default
 * options (when non-empty) and the configured AM launch options. The result
 * is then passed through {@code maybeAddDefaultMemoryJavaOpts} together with
 * the capability and the configured max-heap fraction.
 *
 * @param tezConf configuration supplying the option strings and heap fraction
 * @param capability resource capability of the AM container
 * @return the final option string
 */
@Private
@VisibleForTesting
static String constructAMLaunchOpts(TezConfiguration tezConf, Resource capability) {
  final String clusterDefaultOpts = tezConf.get(
      TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS,
      TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT);
  final Path containerTmpDir = new Path(Environment.PWD.$(),
      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);

  final StringBuilder opts = new StringBuilder();
  opts.append("-Djava.io.tmpdir=").append(containerTmpDir).append(" ");
  if (clusterDefaultOpts != null && !clusterDefaultOpts.isEmpty()) {
    opts.append(clusterDefaultOpts).append(" ");
  }
  opts.append(tezConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
      TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));

  return maybeAddDefaultMemoryJavaOpts(opts.toString(), capability,
      tezConf.getDouble(TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION,
          TezConfiguration.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT));
}
/**
 * Fills in the default container environment.
 *
 * <p>Performs four steps in order: adds the framework classpath to CLASSPATH,
 * adds the PWD expansion to LD_LIBRARY_PATH, appends the user environment
 * string from the configuration, then appends the cluster-default environment
 * string from the configuration.
 *
 * @param env environment map to update in place
 * @param conf configuration to read from
 * @param userEnvKey configuration key holding the user environment string
 * @param userEnvDefault value used when {@code userEnvKey} is unset
 * @param clusterDefaultEnvKey configuration key holding the cluster-default
 *     environment string
 * @param clusterDefaultEnvDefault value used when
 *     {@code clusterDefaultEnvKey} is unset
 * @param usingArchive forwarded to {@code getFrameworkClasspath}
 */
public static void setupDefaultEnv(Map<String, String> env, Configuration conf, String userEnvKey, String userEnvDefault,
    String clusterDefaultEnvKey, String clusterDefaultEnvDefault, boolean usingArchive) {
  // Setup the CLASSPATH in environment
  // i.e. add { Hadoop jars, job jar, CWD } to classpath.
  final String frameworkClasspath = getFrameworkClasspath(conf, usingArchive);
  TezYARNUtils.addToEnvironment(
      env,
      ApplicationConstants.Environment.CLASSPATH.name(),
      frameworkClasspath,
      File.pathSeparator);

  // Put pwd into LD_LIBRARY_PATH before the configured values are appended.
  // Done separately here because this is known to work platform independent.
  TezYARNUtils.addToEnvironment(
      env,
      Environment.LD_LIBRARY_PATH.name(),
      Environment.PWD.$(),
      File.pathSeparator);

  // Append the user-supplied env string first ...
  final String userEnv = conf.get(userEnvKey, userEnvDefault);
  TezYARNUtils.appendToEnvFromInputString(env, userEnv, File.pathSeparator);

  // ... then the cluster-default env string.
  final String clusterDefaultEnv = conf.get(clusterDefaultEnvKey, clusterDefaultEnvDefault);
  TezYARNUtils.appendToEnvFromInputString(env, clusterDefaultEnv, File.pathSeparator);
}
/**
 * Checks the framework classpath built from an empty configuration with
 * {@code TEZ_CLASSPATH_ADD_HADOOP_CONF} enabled and the archive flag set:
 * it must contain the PWD entries, the Tez tarball entries and the
 * HADOOP_CONF_DIR expansion, with PWD before the tarball and the tarball
 * before HADOOP_CONF_DIR.
 */
// NOTE(review): the method name says "NoHadoopConf" but the assertions
// require HADOOP_CONF_DIR to be present - confirm the intended name.
@Test(timeout = 5000)
public void testNoHadoopConfInClasspath() {
  final Configuration conf = new Configuration(false);
  conf.setBoolean(TezConfiguration.TEZ_CLASSPATH_ADD_HADOOP_CONF, true);
  final String frameworkClasspath = TezYARNUtils.getFrameworkClasspath(conf, true);

  final String pwd = Environment.PWD.$();
  final String tezTarball = TezConstants.TEZ_TAR_LR_NAME;
  final String hadoopConfDir = Environment.HADOOP_CONF_DIR.$();

  // Presence checks.
  Assert.assertTrue(frameworkClasspath.contains(pwd));
  Assert.assertTrue(frameworkClasspath.contains(pwd + File.separator + "*"));
  Assert.assertTrue(frameworkClasspath.contains(tezTarball + File.separator + "*"));
  Assert.assertTrue(frameworkClasspath.contains(tezTarball + File.separator
      + "lib" + File.separator + "*"));
  Assert.assertTrue(frameworkClasspath.contains(hadoopConfDir));

  // Ordering checks.
  final int pwdIndex = frameworkClasspath.indexOf(pwd);
  final int tarballIndex = frameworkClasspath.indexOf(tezTarball);
  final int hadoopConfIndex = frameworkClasspath.indexOf(hadoopConfDir);
  Assert.assertTrue(pwdIndex < tarballIndex);
  Assert.assertTrue(tarballIndex < hadoopConfIndex);
}
/**
 * Checks that {@code setupDefaultEnv} merges the user and cluster-default
 * environment strings: keys unique to either side are kept, and
 * LD_LIBRARY_PATH ends up as PWD, then the user value, then the default
 * value, joined with the platform path separator.
 */
@Test(timeout = 5000)
public void testSetupDefaultEnvironment() {
  final Configuration conf = new Configuration(false);
  conf.set(TezConfiguration.TEZ_AM_LAUNCH_ENV, "LD_LIBRARY_PATH=USER_PATH,USER_KEY=USER_VALUE");
  conf.set(TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV, "LD_LIBRARY_PATH=DEFAULT_PATH,DEFAULT_KEY=DEFAULT_VALUE");

  final Map<String, String> launchEnv = new TreeMap<String, String>();
  TezYARNUtils.setupDefaultEnv(launchEnv, conf,
      TezConfiguration.TEZ_AM_LAUNCH_ENV,
      TezConfiguration.TEZ_AM_LAUNCH_ENV_DEFAULT,
      TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV,
      TezConfiguration.TEZ_AM_LAUNCH_CLUSTER_DEFAULT_ENV_DEFAULT, false);

  Assert.assertEquals("User env should merge with default env",
      "USER_VALUE", launchEnv.get("USER_KEY"));
  Assert.assertEquals("User env should merge with default env",
      "DEFAULT_VALUE", launchEnv.get("DEFAULT_KEY"));

  final String expectedLibraryPath =
      Environment.PWD.$() + File.pathSeparator + "USER_PATH" + File.pathSeparator + "DEFAULT_PATH";
  Assert.assertEquals("User env should append default env",
      expectedLibraryPath, launchEnv.get("LD_LIBRARY_PATH"));
}
/**
 * The instance entry point for the YARN task executor.
 *
 * <p>Loads the configuration from the working directory named by the PWD
 * environment variable, initializes the file system, installs the security
 * context and then calls
 * {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} inside
 * that security context. Any failure is logged and the JVM exits with
 * {@code INIT_ERROR_EXIT_CODE}.
 *
 * @param args The command line arguments.
 */
private static void run(String[] args) {
  try {
    LOG.debug("All environment variables: {}", ENV);

    final String workingDir = ENV.get(Environment.PWD.key());
    LOG.info("Current working Directory: {}", workingDir);

    final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(workingDir);
    FileSystem.initialize(flinkConfig);
    setupConfigurationAndInstallSecurityContext(flinkConfig, workingDir, ENV);

    final String flinkContainerId = ENV.get(YarnResourceManager.ENV_FLINK_CONTAINER_ID);
    Preconditions.checkArgument(flinkContainerId != null,
        "ContainerId variable %s not set", YarnResourceManager.ENV_FLINK_CONTAINER_ID);

    final Callable<Void> taskManagerLauncher = () -> {
      TaskManagerRunner.runTaskManager(flinkConfig, new ResourceID(flinkContainerId));
      return null;
    };
    SecurityUtils.getInstalledContext().runSecured(taskManagerLauncher);
  } catch (Throwable failure) {
    final Throwable rootFailure =
        ExceptionUtils.stripException(failure, UndeclaredThrowableException.class);
    // make sure that everything whatever ends up in the log
    LOG.error("YARN TaskManager initialization failed.", rootFailure);
    System.exit(INIT_ERROR_EXIT_CODE);
  }
}
/**
 * Prepares the configuration from the container environment and installs
 * the security context.
 *
 * <p>Reads the LOCAL_DIRS variable, uses it to update the temp directories
 * in the configuration, applies the remaining environment-derived settings
 * and finally installs the security context.
 *
 * @param configuration configuration to update in place
 * @param currDir current working directory, forwarded to
 *     {@code setupConfigurationFromVariables}
 * @param variables environment variables to read from
 * @throws Exception propagated from the setup helpers
 */
@VisibleForTesting
static void setupConfigurationAndInstallSecurityContext(Configuration configuration, String currDir, Map<String, String> variables) throws Exception {
  final String yarnLocalDirs = variables.get(Environment.LOCAL_DIRS.key());
  LOG.info("Current working/local Directory: {}", yarnLocalDirs);

  BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, yarnLocalDirs);
  setupConfigurationFromVariables(configuration, currDir, variables);

  installSecurityContext(configuration);
}
/**
 * Adds the Flink classpath and the YARN application classpath to the
 * CLASSPATH entry of the given environment map.
 *
 * <p>The value stored under {@code ENV_FLINK_CLASSPATH} in the same map is
 * added first, followed by each trimmed YARN application classpath entry
 * from the configuration (or the YARN default when unset).
 *
 * @param conf configuration supplying the YARN application classpath
 * @param appMasterEnv environment map to update in place
 */
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
  final String classpathKey = Environment.CLASSPATH.name();
  final String flinkClasspath = appMasterEnv.get(ENV_FLINK_CLASSPATH);
  addToEnvironment(appMasterEnv, classpathKey, flinkClasspath);

  final String[] yarnEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
  for (String yarnEntry : yarnEntries) {
    addToEnvironment(appMasterEnv, classpathKey, yarnEntry.trim());
  }
}
/**
 * The instance entry point for the YARN task executor.
 *
 * <p>Loads the configuration from the working directory named by the PWD
 * environment variable, initializes the file system with a plugin manager
 * created from that configuration, installs the security context and then
 * calls {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)}
 * inside that security context. Any failure is logged and the JVM exits with
 * {@code INIT_ERROR_EXIT_CODE}.
 *
 * @param args The command line arguments.
 */
private static void run(String[] args) {
  try {
    LOG.debug("All environment variables: {}", ENV);

    final String workingDir = ENV.get(Environment.PWD.key());
    LOG.info("Current working Directory: {}", workingDir);

    final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(workingDir);
    FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
    setupConfigurationAndInstallSecurityContext(flinkConfig, workingDir, ENV);

    final String flinkContainerId = ENV.get(YarnResourceManager.ENV_FLINK_CONTAINER_ID);
    Preconditions.checkArgument(flinkContainerId != null,
        "ContainerId variable %s not set", YarnResourceManager.ENV_FLINK_CONTAINER_ID);

    final Callable<Void> taskManagerLauncher = () -> {
      TaskManagerRunner.runTaskManager(flinkConfig, new ResourceID(flinkContainerId));
      return null;
    };
    SecurityUtils.getInstalledContext().runSecured(taskManagerLauncher);
  } catch (Throwable failure) {
    final Throwable rootFailure =
        ExceptionUtils.stripException(failure, UndeclaredThrowableException.class);
    // make sure that everything whatever ends up in the log
    LOG.error("YARN TaskManager initialization failed.", rootFailure);
    System.exit(INIT_ERROR_EXIT_CODE);
  }
}
/**
 * Prepares the configuration from the container environment and installs
 * the security context.
 *
 * <p>Reads the LOCAL_DIRS variable, uses it to update the temp directories
 * in the configuration, applies the remaining environment-derived settings
 * and finally installs the security context.
 *
 * @param configuration configuration to update in place
 * @param currDir current working directory, forwarded to
 *     {@code setupConfigurationFromVariables}
 * @param variables environment variables to read from
 * @throws Exception propagated from the setup helpers
 */
@VisibleForTesting
static void setupConfigurationAndInstallSecurityContext(Configuration configuration, String currDir, Map<String, String> variables) throws Exception {
  final String containerLocalDirs = variables.get(Environment.LOCAL_DIRS.key());
  LOG.info("Current working/local Directory: {}", containerLocalDirs);

  BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, containerLocalDirs);
  setupConfigurationFromVariables(configuration, currDir, variables);

  installSecurityContext(configuration);
}
/**
 * Adds the Flink classpath and the YARN application classpath to the
 * CLASSPATH entry of the given environment map.
 *
 * <p>The value stored under {@code ENV_FLINK_CLASSPATH} in the same map is
 * added first, followed by each trimmed YARN application classpath entry
 * from the configuration (or the YARN default when unset).
 *
 * @param conf configuration supplying the YARN application classpath
 * @param appMasterEnv environment map to update in place
 */
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
  final String classpathVariable = Environment.CLASSPATH.name();
  addToEnvironment(appMasterEnv, classpathVariable, appMasterEnv.get(ENV_FLINK_CLASSPATH));

  final String[] yarnClasspath = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
  for (int i = 0; i < yarnClasspath.length; i++) {
    addToEnvironment(appMasterEnv, classpathVariable, yarnClasspath[i].trim());
  }
}
/**
 * Initializes the application master from the parsed command line.
 *
 * <p>Parses the arguments, starts with an empty cluster spec, then sets up
 * the RPC service on the host named by the NM_HOST environment variable,
 * registers with the resource manager, and finishes by processing the
 * registration response and setting up the node manager connection.
 *
 * @param cliParser parsed command line
 * @throws Exception propagated from any of the setup steps
 */
@Override
public void init(CommandLine cliParser) throws Exception {
  LOG.info("Starting ApplicationMaster");

  // Args have to be initialized first
  this.args = new ApplicationMasterArgs(cliParser);
  this.clusterSpec = ClusterSpec.empty();

  final String nodeManagerHost = System.getenv(Environment.NM_HOST.name());
  final int boundRpcPort = setupRPCService(nodeManagerHost);
  final RegisterApplicationMasterResponse registration =
      setupRMConnection(nodeManagerHost, boundRpcPort);

  setupPreviousRunningContainers(registration);
  setupContainerResource(registration);
  setupNMConnection();
}
/**
 * Records the containers still running from previous application attempts.
 *
 * <p>Derives the current application attempt id from the CONTAINER_ID
 * environment variable, adds the id of every container reported by the
 * registration response to {@code launchedContainers} and increases
 * {@code allocatedContainerNum} by the number of such containers.
 *
 * @param response the response received when registering with the resource
 *     manager
 */
private void setupPreviousRunningContainers(RegisterApplicationMasterResponse response) {
  final String amContainerId = System.getenv(Environment.CONTAINER_ID.name());
  appAttemptId = ContainerId.fromString(amContainerId).getApplicationAttemptId();

  final List<Container> survivors = response.getContainersFromPreviousAttempts();
  final int survivorCount = survivors.size();
  LOG.info(appAttemptId + " received " + survivorCount
      + " previous attempts' running containers on AM registration.");

  for (Container survivor : survivors) {
    launchedContainers.add(survivor.getId());
  }
  allocatedContainerNum.addAndGet(survivorCount);
}
/**
 * Returns the initial classpath, computing it on first use.
 *
 * <p>The computation runs under {@code classpathLock} so that there is only
 * one fork in the AM for getting the initial class-path. The first call also
 * caches the APP_CLASSPATH value in {@code initialAppClasspath}.
 *
 * <p>TODO: We already construct a parent CLC and use it for all the
 * containers, so this should go away once the mr-generated-classpath stuff
 * is gone.
 *
 * @param conf configuration handed to {@code MRApps.setClasspath}
 * @return the cached CLASSPATH value
 * @throws IOException propagated from the classpath computation
 */
private static String getInitialClasspath(Configuration conf) throws IOException {
  synchronized (classpathLock) {
    if (!initialClasspathFlag.get()) {
      final Map<String, String> computedEnv = new HashMap<String, String>();
      MRApps.setClasspath(computedEnv, conf);
      initialClasspath = computedEnv.get(Environment.CLASSPATH.name());
      initialAppClasspath = computedEnv.get(Environment.APP_CLASSPATH.name());
      initialClasspathFlag.set(true);
    }
    return initialClasspath;
  }
}
/**
 * Checks the standard environment of the AM container built by
 * {@code YARNRunner}: LD_LIBRARY_PATH must be PWD, then the admin value,
 * then the user value, and SHELL must equal the configured admin shell.
 */
@Test
public void testAMStandardEnv() throws Exception {
  final String adminLibPath = "foo";
  final String userLibPath = "bar";
  final String userShell = "shell";

  final JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" + adminLibPath);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH=" + userLibPath);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, userShell);

  final YARNRunner yarnRunner = new YARNRunner(jobConf);
  final ApplicationSubmissionContext submission = buildSubmitContext(yarnRunner, jobConf);
  final Map<String, String> amEnv = submission.getAMContainerSpec().getEnvironment();

  // make sure PWD is first in the lib path
  final String actualLibPath = amEnv.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", actualLibPath);

  final boolean crossPlatform = jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
  final String separator;
  if (crossPlatform) {
    separator = ApplicationConstants.CLASS_PATH_SEPARATOR;
  } else {
    separator = File.pathSeparator;
  }
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
          + separator + adminLibPath + separator + userLibPath, actualLibPath);

  // make sure SHELL is set
  final String actualShell = amEnv.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", actualShell);
  assertEquals("Bad SHELL setting", userShell, actualShell);
}
/**
 * Returns the initial classpath, computing it on first use.
 *
 * <p>The computation runs under {@code classpathLock} so that there is only
 * one fork in the AM for getting the initial class-path. The first call also
 * caches the APP_CLASSPATH value in {@code initialAppClasspath}.
 *
 * <p>TODO: We already construct a parent CLC and use it for all the
 * containers, so this should go away once the mr-generated-classpath stuff
 * is gone.
 *
 * @param conf configuration handed to {@code MRApps.setClasspath}
 * @return the cached CLASSPATH value
 * @throws IOException propagated from the classpath computation
 */
private static String getInitialClasspath(Configuration conf) throws IOException {
  synchronized (classpathLock) {
    final boolean alreadyComputed = initialClasspathFlag.get();
    if (!alreadyComputed) {
      final Map<String, String> classpathEnv = new HashMap<String, String>();
      MRApps.setClasspath(classpathEnv, conf);
      initialClasspath = classpathEnv.get(Environment.CLASSPATH.name());
      initialAppClasspath = classpathEnv.get(Environment.APP_CLASSPATH.name());
      initialClasspathFlag.set(true);
    }
    return initialClasspath;
  }
}
/**
 * Checks the standard environment of the AM container built by
 * {@code YARNRunner}: LD_LIBRARY_PATH must be PWD, then the admin value,
 * then the user value, and SHELL must equal the configured admin shell.
 */
@Test
public void testAMStandardEnv() throws Exception {
  final String adminLibraryPath = "foo";
  final String userLibraryPath = "bar";
  final String configuredShell = "shell";

  final JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" + adminLibraryPath);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH=" + userLibraryPath);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, configuredShell);

  final YARNRunner yarnRunner = new YARNRunner(jobConf);
  final ApplicationSubmissionContext appContext = buildSubmitContext(yarnRunner, jobConf);
  final ContainerLaunchContext amLaunchContext = appContext.getAMContainerSpec();
  final Map<String, String> launchEnv = amLaunchContext.getEnvironment();

  // make sure PWD is first in the lib path
  final String libraryPath = launchEnv.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", libraryPath);

  String pathSeparator = File.pathSeparator;
  if (jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)) {
    pathSeparator = ApplicationConstants.CLASS_PATH_SEPARATOR;
  }
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
          + pathSeparator + adminLibraryPath + pathSeparator + userLibraryPath, libraryPath);

  // make sure SHELL is set
  final String shellValue = launchEnv.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", shellValue);
  assertEquals("Bad SHELL setting", configuredShell, shellValue);
}
/**
 * Returns the paths of the resources localized for the given
 * {@link Container}.
 *
 * <p>The paths are the keys of the container's private
 * {@code localizedResources} field, which is read reflectively. As a side
 * effect, any path ending in {@code tez-conf.pb} or {@code tez-dag.pb} is
 * copied into the directory named by the PWD environment variable (needed
 * for Tez).
 *
 * @param container the container whose localized resources are inspected
 * @return the key set of the container's localized-resources map
 * @throws RuntimeException wrapping any exception raised by the reflective
 *     access or the file copy
 */
@SuppressWarnings("unchecked")
private Set<Path> extractUserProvidedClassPathEntries(Container container) {
  try {
    final Field resourcesField = container.getClass().getDeclaredField("localizedResources");
    resourcesField.setAccessible(true);
    final Map<Path, List<String>> localized =
        (Map<Path, List<String>>) resourcesField.get(container);
    final Set<Path> resourcePaths = localized.keySet();

    // Needed for Tez
    for (Path resourcePath : resourcePaths) {
      final String location = resourcePath.toString();
      final boolean isTezDescriptor =
          location.endsWith("tez-conf.pb") || location.endsWith("tez-dag.pb");
      if (!isTezDescriptor) {
        continue;
      }
      final File source = new File(resourcePath.toUri());
      final File target = new File(System.getenv(Environment.PWD.name()) + "/" + source.getName());
      FileUtils.copyFile(source, target);
    }
    return resourcePaths;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}