下面列出了org.apache.commons.cli.MissingArgumentException#org.apache.hadoop.yarn.conf.YarnConfiguration 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Checks that {@code verifyAccess()} returns true for a resource whose
 * visibility is PUBLIC.
 */
@Test
public void testVerifyAccessPublicResource() throws Exception {
  Configuration sharedCacheConf = new Configuration();
  sharedCacheConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);

  // The resource under test advertises public visibility.
  LocalResource publicResource = mock(LocalResource.class);
  when(publicResource.getVisibility())
      .thenReturn(LocalResourceVisibility.PUBLIC);

  Path jarPath = mock(Path.class);
  when(jarPath.getName()).thenReturn("foo.jar");

  String uploadingUser = "joe";
  SCMUploaderProtocol scm = mock(SCMUploaderProtocol.class);
  FileSystem remoteFs = mock(FileSystem.class);
  FileSystem localFileSystem = FileSystem.getLocal(sharedCacheConf);

  SharedCacheUploader uploader = createSpiedUploader(publicResource, jarPath,
      uploadingUser, sharedCacheConf, scm, remoteFs, localFileSystem);

  assertTrue(uploader.verifyAccess());
}
/**
 * Refreshes the user-to-groups mapping service.
 *
 * The caller is first checked with {@code checkAcls} and the RM status with
 * {@code checkRMStatus}; the mapping service is then rebuilt from a fresh
 * configuration loaded from the core-site file and the success is audited.
 *
 * @param request the refresh request (not inspected by this method)
 * @return a newly created, empty response record
 * @throws YarnException declared by the interface
 * @throws IOException declared by the interface
 */
@Override
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
    RefreshUserToGroupsMappingsRequest request)
    throws YarnException, IOException {
  final String operation = "refreshUserToGroupsMappings";
  final UserGroupInformation caller = checkAcls(operation);
  checkRMStatus(caller.getShortUserName(), operation, "refresh user-groups.");

  // Start from an empty Configuration so only the core-site file contributes.
  Configuration coreSiteConf = getConfiguration(new Configuration(false),
      YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
  Groups.getUserToGroupsMappingService(coreSiteConf).refresh();

  RMAuditLogger.logSuccess(caller.getShortUserName(), operation,
      "AdminService");
  return recordFactory.newRecordInstance(
      RefreshUserToGroupsMappingsResponse.class);
}
/**
 * Asserts that the given FIFO scheduler fields hold the values this test
 * suite expects: scheduler type "fifoScheduler", a RUNNING queue, capacity
 * 1.0 with nothing used, the default minimum/maximum allocation sizes, and
 * zero for every node and container counter.
 */
public void verifyClusterSchedulerFifoGeneric(String type, String state,
    float capacity, float usedCapacity, int minQueueCapacity,
    int maxQueueCapacity, int numNodes, int usedNodeCapacity,
    int availNodeCapacity, int totalNodeCapacity, int numContainers)
    throws JSONException, Exception {
  final String runningState = QueueState.RUNNING.toString();

  // Scheduler identity and queue state.
  assertEquals("type doesn't match", "fifoScheduler", type);
  assertEquals("qstate doesn't match", runningState, state);

  // Capacity figures are compared exactly (delta of 0.0).
  assertEquals("capacity doesn't match", 1.0, capacity, 0.0);
  assertEquals("usedCapacity doesn't match", 0.0, usedCapacity, 0.0);

  // Allocation bounds must equal the YARN defaults.
  assertEquals("minQueueMemoryCapacity doesn't match",
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      minQueueCapacity);
  assertEquals("maxQueueMemoryCapacity doesn't match",
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      maxQueueCapacity);

  // Node and container counters are all expected to be zero.
  assertEquals("numNodes doesn't match", 0, numNodes);
  assertEquals("usedNodeCapacity doesn't match", 0, usedNodeCapacity);
  assertEquals("availNodeCapacity doesn't match", 0, availNodeCapacity);
  assertEquals("totalNodeCapacity doesn't match", 0, totalNodeCapacity);
  assertEquals("numContainers doesn't match", 0, numContainers);
}
/**
 * Configures minimum allocations of zero (memory, vcores, gcores) together
 * with non-zero increment allocations and checks that the scheduler reports
 * exactly those values after init and reinitialize.
 */
@Test
public void testMinZeroResourcesSettings() throws IOException {
  scheduler = new FairScheduler();
  YarnConfiguration zeroMinConf = new YarnConfiguration();

  // Minimum allocation: zero for every resource dimension.
  zeroMinConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
  zeroMinConf.setInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);
  zeroMinConf.setInt(
      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_GCORES, 0);

  // Increment allocation: 512 MB, 2 vcores, 1 gcore.
  zeroMinConf.setInt(
      FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
  zeroMinConf.setInt(
      FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
  zeroMinConf.setInt(
      FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_GCORES, 1);

  scheduler.init(zeroMinConf);
  scheduler.reinitialize(zeroMinConf, null);

  Assert.assertEquals(0,
      scheduler.getMinimumResourceCapability().getMemory());
  Assert.assertEquals(0,
      scheduler.getMinimumResourceCapability().getVirtualCores());
  Assert.assertEquals(0,
      scheduler.getMinimumResourceCapability().getGpuCores());
  Assert.assertEquals(512,
      scheduler.getIncrementResourceCapability().getMemory());
  Assert.assertEquals(2,
      scheduler.getIncrementResourceCapability().getVirtualCores());
  Assert.assertEquals(1,
      scheduler.getIncrementResourceCapability().getGpuCores());
}
/**
 * Starts a MockRM with the given configuration, submits an application
 * asking for {@code testAlloc} of AM memory, and asserts that the memory
 * reported as used on the single registered node equals the configured
 * scheduler minimum allocation (or its default).
 *
 * The RM is stopped in a {@code finally} block so that a failing step or
 * assertion does not leave it running for subsequent tests.
 *
 * @param conf configuration used to build the RM and to read the expected
 *             minimum allocation
 * @param testAlloc memory value passed to {@code submitApp}
 * @throws Exception if any RM/NM/AM interaction fails
 */
private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
    throws Exception {
  MockRM rm = new MockRM(conf);
  try {
    rm.start();
    // Register node1
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
    // Submit an application
    RMApp app1 = rm.submitApp(testAlloc);
    // kick the scheduling
    nm1.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
    am1.registerAppAttempt();
    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
        nm1.getNodeId());
    int checkAlloc =
        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    Assert.assertEquals(checkAlloc, report_nm1.getUsedResource().getMemory());
  } finally {
    // Always release the RM, even when an earlier step threw.
    rm.stop();
  }
}
/**
 * Configures NM local dirs built from "file:///" and "hdfs:///" prefixed
 * paths and expects {@code LocalDirsHandlerService.init} to fail with a
 * {@code YarnRuntimeException}, leaving the service in the STOPPED state.
 */
@Test
public void testValidPathsDirHandlerService() throws Exception {
  Configuration nmConf = new YarnConfiguration();

  String firstLocalDir =
      new File("file:///" + testDir, "localDir1").getPath();
  String secondLocalDir =
      new File("hdfs:///" + testDir, "localDir2").getPath();
  nmConf.set(YarnConfiguration.NM_LOCAL_DIRS,
      firstLocalDir + "," + secondLocalDir);

  String onlyLogDir = new File("file:///" + testDir, "logDir1").getPath();
  nmConf.set(YarnConfiguration.NM_LOG_DIRS, onlyLogDir);

  LocalDirsHandlerService handler = new LocalDirsHandlerService();
  try {
    handler.init(nmConf);
    Assert.fail("Service should have thrown an exception due to wrong URI");
  } catch (YarnRuntimeException expected) {
    // Reaching this handler is the outcome the test is looking for.
  }

  Assert.assertEquals("Service should not be inited", STATE.STOPPED,
      handler.getServiceState());
  handler.close();
}
/**
 * If the local file system reports the localPath as a directory,
 * getActualPath should descend one level: the returned path keeps the same
 * name as localPath and its parent is also named like localPath.
 */
@Test
public void testGetActualPath() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
  LocalResource resource = mock(LocalResource.class);
  // give public visibility
  when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC);
  Path localPath = new Path("foo.jar");
  String user = "joe";
  SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class);
  FileSystem fs = mock(FileSystem.class);
  FileSystem localFs = mock(FileSystem.class);
  // stub it to return a status that indicates a directory
  FileStatus status = mock(FileStatus.class);
  when(status.isDirectory()).thenReturn(true);
  when(localFs.getFileStatus(localPath)).thenReturn(status);
  SharedCacheUploader spied =
      createSpiedUploader(resource, localPath, user, conf, scmClient, fs,
          localFs);
  Path actualPath = spied.getActualPath();
  // assertEquals takes (expected, actual); keeping that order makes failure
  // messages report the values the right way round.
  assertEquals(localPath.getName(), actualPath.getName());
  assertEquals(localPath.getName(), actualPath.getParent().getName());
}
/**
 * Configures an NM expiry interval (1000 ms) smaller than the NM heartbeat
 * interval (1001 ms) and requires {@code ResourceManager.init} to reject it
 * with a {@code YarnRuntimeException} whose message starts with
 * "Nodemanager expiry interval should be no less than heartbeat interval".
 *
 * The test fails if init succeeds, and rethrows any
 * {@code YarnRuntimeException} carrying a different message.
 */
@Test
public void testNMExpiryAndHeartbeatIntervalsValidation() throws Exception {
  Configuration conf = new YarnConfiguration();
  conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
  conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1001);
  resourceManager = new ResourceManager();
  try {
    resourceManager.init(conf);
    // Without this the test would pass even if validation were removed.
    // AssertionError is not a YarnRuntimeException, so it is not swallowed
    // by the handler below.
    throw new AssertionError("Exception is expected because the NM expiry"
        + " interval is less than the heartbeat interval.");
  } catch (YarnRuntimeException e) {
    // Exception is expected.
    if (!e.getMessage().startsWith("Nodemanager expiry interval should be no"
        + " less than heartbeat interval")) {
      throw e;
    }
  }
}
/**
 * With an HDP, secure cluster whose sub-properties explicitly set the spill
 * path and the Java login config, checks that updateYarnConfiguration keeps
 * those explicit values and leaves the three ZK SASL keys unset.
 */
@Test
public void testDistroDefaultsOverwrite() throws Exception {
  assumeNonMaprProfile();

  YarnController mockedController = Mockito.mock(YarnController.class);
  YarnService service = new YarnService(new TestListener(), mockedController,
      Mockito.mock(NodeProvider.class));

  Cluster cluster = createCluster();
  List<Property> subProperties =
      cluster.getClusterConfig().getSubPropertyList();
  subProperties.add(
      new Property(YarnDefaultsConfigurator.SPILL_PATH, "/abc/bcd"));
  subProperties.add(
      new Property(YarnDefaultsConfigurator.JAVA_LOGIN, "/abc/bcd/login.conf"));
  cluster.getClusterConfig().setDistroType(DistroType.HDP).setIsSecure(true);

  YarnConfiguration yarnConfig = new YarnConfiguration();
  service.updateYarnConfiguration(cluster, yarnConfig);

  // Explicitly supplied login config is kept.
  assertEquals("/abc/bcd/login.conf",
      yarnConfig.get(YarnDefaultsConfigurator.JAVA_LOGIN));
  // None of the ZK SASL keys are populated.
  assertNull(yarnConfig.get(YarnDefaultsConfigurator.ZK_SASL_CLIENT));
  assertNull(yarnConfig.get(YarnDefaultsConfigurator.ZK_SASL_CLIENT_CONFIG));
  assertNull(yarnConfig.get(YarnDefaultsConfigurator.ZK_SASL_PROVIDER));
  // Explicitly supplied spill path is kept.
  assertEquals("/abc/bcd",
      yarnConfig.get(YarnDefaultsConfigurator.SPILL_PATH));
}
/**
 * Recovery scenario in which validateAndCreateResourceRequest is expected to
 * fail: an app is submitted with 200 of AM memory, then the scheduler
 * min/max allocation is changed to 50/100 before a second RM is started on
 * the same state store. The test contains no assertions; it passes as long
 * as starting the second RM does not throw.
 */
@Test (timeout = 30000)
public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{
  MemoryRMStateStore sharedStore = new MemoryRMStateStore();
  sharedStore.init(conf);

  // First RM: register a node, submit the app and bring its AM up.
  rm1 = new MockRM(conf, sharedStore);
  rm1.start();
  MockNM node =
      new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
  node.registerNode();
  RMApp submittedApp = rm1.submitApp(200);
  MockRM.launchAndRegisterAM(submittedApp, rm1, node);

  // Change the config so that validateAndCreateResourceRequest throws
  // exception on recovery
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100);

  // Second RM recovers from the same store with the new limits.
  rm2 = new MockRM(conf, sharedStore);
  node.setResourceTrackerService(rm2.getResourceTrackerService());
  rm2.start();
}
/**
 * Creates the file-system handle for the state store and makes sure the
 * three root directories exist.
 *
 * The handle is built from a copy of the service configuration in which the
 * DFS client retry policy is enabled and its spec is taken from the
 * FS_RM_STATE_STORE_RETRY_POLICY_SPEC setting (or its default).
 *
 * @throws Exception if the file system cannot be obtained or a directory
 *                   cannot be created
 */
@Override
protected synchronized void startInternal() throws Exception {
  // The file system is created here, at service start, rather than earlier:
  // by now the RM has authenticated with kerberos, so obtaining a
  // file-system handle is safe.
  Configuration fsConf = new Configuration(getConfig());
  fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
  fsConf.set("dfs.client.retry.policy.spec",
      fsConf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
          YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC));

  fs = fsWorkingPath.getFileSystem(fsConf);

  mkdirsWithRetries(rmDTSecretManagerRoot);
  mkdirsWithRetries(rmAppRoot);
  mkdirsWithRetries(amrmTokenSecretManagerRoot);
}
/**
 * With automatic failover disabled, transitions RM 0 to active, then
 * performs two explicit failovers, verifying connections before the first
 * and after each one.
 */
@Test
public void testExplicitFailover()
    throws YarnException, InterruptedException, IOException {
  conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
  cluster.init(conf);
  cluster.start();

  getAdminService(0).transitionToActive(req);
  assertFalse("RM never turned active", cluster.getActiveRMIndex() == -1);
  verifyConnections();

  // Fail over twice, re-checking connectivity after each switch.
  for (int round = 0; round < 2; round++) {
    explicitFailover();
    verifyConnections();
  }
}
/**
 * Per-test fixture: starts an in-memory SCM store and a
 * SharedCacheUploaderService on top of it, then creates an RPC proxy to the
 * uploader server address taken from the configuration.
 */
@Before
public void startUp() {
  Configuration scmConf = new Configuration();
  scmConf.set(YarnConfiguration.SCM_STORE_CLASS,
      InMemorySCMStore.class.getName());
  scmConf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());

  // Backing store, wrapped around a spied dummy app checker.
  AppChecker spiedChecker = spy(new DummyAppChecker());
  store = new InMemorySCMStore(spiedChecker);
  store.init(scmConf);
  store.start();

  // Service under test.
  service = new SharedCacheUploaderService(store);
  service.init(scmConf);
  service.start();

  // Client-side proxy pointing at the uploader server.
  YarnRPC yarnRpc = YarnRPC.create(new Configuration());
  InetSocketAddress uploaderAddress = scmConf.getSocketAddr(
      YarnConfiguration.SCM_UPLOADER_SERVER_ADDRESS,
      YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_ADDRESS,
      YarnConfiguration.DEFAULT_SCM_UPLOADER_SERVER_PORT);
  proxy = (SCMUploaderProtocol) yarnRpc.getProxy(
      SCMUploaderProtocol.class, uploaderAddress, scmConf);
}
/**
 * With YARN ACLs enabled and "admin" as the admin ACL, checks domain access
 * for three users: the domain owner and the admin are allowed, an unrelated
 * user is not.
 */
@Test
public void testYarnACLsEnabledForDomain() throws Exception {
  Configuration aclConf = new YarnConfiguration();
  aclConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
  aclConf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
  TimelineACLsManager aclsManager = new TimelineACLsManager(aclConf);

  TimelineDomain ownedDomain = new TimelineDomain();
  ownedDomain.setOwner("owner");

  UserGroupInformation ownerUgi =
      UserGroupInformation.createRemoteUser("owner");
  Assert.assertTrue("Owner should be allowed to access",
      aclsManager.checkAccess(ownerUgi, ownedDomain));

  UserGroupInformation otherUgi =
      UserGroupInformation.createRemoteUser("other");
  Assert.assertFalse("Other shouldn't be allowed to access",
      aclsManager.checkAccess(otherUgi, ownedDomain));

  UserGroupInformation adminUgi =
      UserGroupInformation.createRemoteUser("admin");
  Assert.assertTrue("Admin should be allowed to access",
      aclsManager.checkAccess(adminUgi, ownedDomain));
}
/**
 * Writes the Kerberos-related settings into {@code conf}: authentication is
 * set to "kerberos", authorization to "true", the RM/NM keytab, principal
 * and web-app SPNEGO keys are filled from the arguments, and a fixed
 * auth_to_local rule string is installed.
 *
 * @param conf configuration to modify in place
 * @param principal value stored under every principal / SPNEGO user key
 * @param keytab value stored under every keytab key
 */
public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
  // Key/value pairs, applied in exactly this order.
  final String[][] secureSettings = {
    {CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"},
    {CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"},
    {YarnConfiguration.RM_KEYTAB, keytab},
    {YarnConfiguration.RM_PRINCIPAL, principal},
    {YarnConfiguration.NM_KEYTAB, keytab},
    {YarnConfiguration.NM_PRINCIPAL, principal},
    {YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal},
    {YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab},
    {YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal},
    {YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab},
    {"hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]"},
  };
  for (String[] setting : secureSettings) {
    conf.set(setting[0], setting[1]);
  }
}
/**
 * Initializes this provider: stores the proxy factory and protocol, checks
 * the protocol is allowed, copies the configuration, records the RM HA ids,
 * selects the id at {@code currentProxyIndex} as RM_HA_ID, and maps the two
 * client-failover retry settings onto the corresponding IPC connect-retry
 * keys.
 *
 * @param configuration source configuration; it is copied, not modified
 * @param rmProxy proxy factory, also used to validate {@code protocol}
 * @param protocol protocol class this provider creates proxies for
 */
@Override
public void init(Configuration configuration, RMProxy<T> rmProxy,
    Class<T> protocol) {
  this.rmProxy = rmProxy;
  this.protocol = protocol;
  this.rmProxy.checkAllowedProtocols(this.protocol);
  this.conf = new YarnConfiguration(configuration);

  Collection<String> haIds = HAUtil.getRMHAIds(conf);
  this.rmServiceIds = haIds.toArray(new String[0]);
  conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);

  // IPC connect retries follow the client failover retry count.
  int failoverRetries = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES);
  conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      failoverRetries);

  // Same mapping for retries on socket timeouts.
  int failoverRetriesOnTimeout = conf.getInt(
      YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
      YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS);
  conf.setInt(
      CommonConfigurationKeysPublic
          .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      failoverRetriesOnTimeout);
}
/**
 * Builds the CLASSPATH value for the application master and stores it in
 * the given environment map.
 *
 * The value is the CLASSPATH environment reference, followed by "./*",
 * followed by every (trimmed) entry of the configured YARN application
 * classpath, all joined with the platform path separator.
 *
 * @param appMasterEnv environment map that receives the CLASSPATH entry
 */
private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
  StringBuilder classpath = new StringBuilder();
  classpath.append(Environment.CLASSPATH.$());
  classpath.append(File.pathSeparatorChar);
  classpath.append("./*");

  String[] yarnEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
  for (String entry : yarnEntries) {
    classpath.append(File.pathSeparatorChar).append(entry.trim());
  }

  String classpathValue = classpath.toString();
  LOG.info("env: " + classpathValue);
  appMasterEnv.put(Environment.CLASSPATH.name(), classpathValue);
}
/**
 * Builds the NodeManager configuration used by these tests: 5 GB of physical
 * memory, fixed loopback addresses for the NM and its localizer, the test's
 * log / remote-log / local directories, and a log retention of one second.
 *
 * @return a freshly created configuration
 */
private YarnConfiguration createNMConfig() {
  YarnConfiguration nmConf = new YarnConfiguration();

  // 5GB of physical memory.
  nmConf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024);

  // Fixed loopback endpoints.
  nmConf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
  nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");

  // Directories owned by the test.
  nmConf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
  nmConf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      remoteLogsDir.getAbsolutePath());
  nmConf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());

  nmConf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
  return nmConf;
}
/**
 * Registers a node reporting NM version "1.9.9" against an RM whose minimum
 * NM version is configured as "EqualToRM", and expects a SHUTDOWN action
 * with a diagnostic naming the disallowed version.
 */
@Test
public void testNodeRegistrationVersionLessThanRM() throws Exception {
  writeToHostsFile("host2");

  Configuration rmConf = new Configuration();
  rmConf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  rmConf.set(YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, "EqualToRM");
  rm = new MockRM(rmConf);
  rm.start();

  String nmVersion = "1.9.9";
  ResourceTrackerService tracker = rm.getResourceTrackerService();

  // Build the registration request for host2.
  RegisterNodeManagerRequest registration =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId host2 = NodeId.newInstance("host2", 1234);
  Resource nodeCapability = BuilderUtils.newResource(1024, 1, 1);
  registration.setResource(nodeCapability);
  registration.setNodeId(host2);
  registration.setHttpPort(1234);
  registration.setNMVersion(nmVersion);

  // trying to register a invalid node.
  RegisterNodeManagerResponse reply =
      tracker.registerNodeManager(registration);

  Assert.assertEquals(NodeAction.SHUTDOWN, reply.getNodeAction());
  String failureMessage =
      "Diagnostic message did not contain: 'Disallowed NodeManager "
          + "Version " + nmVersion + ", is less than the minimum version'";
  String expectedFragment = "Disallowed NodeManager Version " + nmVersion
      + ", is less than the minimum version ";
  Assert.assertTrue(failureMessage,
      reply.getDiagnosticsMessage().contains(expectedFragment));
}
/**
 * Builds a canned report for application (1234, 5), attempt 0, in the
 * FINISHED state with final status SUCCEEDED and the default application
 * type.
 *
 * @return the fixed application report
 */
private ApplicationReport getFinishedApplicationReport() {
  final ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
  final ApplicationAttemptId firstAttempt =
      ApplicationAttemptId.newInstance(applicationId, 0);

  return ApplicationReport.newInstance(
      applicationId, firstAttempt,
      "user", "queue", "appname", "host", 124, null,
      YarnApplicationState.FINISHED,
      "diagnostics", "url", 0, 0,
      FinalApplicationStatus.SUCCEEDED, null,
      "N/A", 0.0f,
      YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
}
/**
 * Alternative to start() that explicitly sets sessionAppId and doesn't start a new AM.
 * The caller of getClient is responsible for initializing the new TezClient with a
 * Configuration compatible with the existing AM. It is expected the caller has cached the
 * original Configuration (e.g. in Zookeeper).
 *
 * In contrast to "start", no resources are localized. It is the responsibility of the caller to
 * ensure that existing localized resources and staging dirs are still valid.
 *
 * Note that the session-mode check happens only after sessionAppId has been assigned, the YARN
 * configuration has been replaced, and startFrameworkClient() / setupJavaOptsChecker() have
 * been called; a non-session client therefore fails with those steps already performed.
 *
 * @param appId id of the already running application (session) to bind this client to
 * @return 'this' just as a convenience for fluent style chaining
 * @throws IllegalStateException if this client is not in session mode
 * @throws TezException if a YarnException is raised while setting up the application context
 *         or fetching the application report
 * @throws IOException declared by the method; NOTE(review): presumably from the framework
 *         client or FileSystem.get - confirm against the helpers
 */
public synchronized TezClient getClient(ApplicationId appId) throws TezException, IOException {
sessionAppId = appId;
// Derive a fresh YarnConfiguration from the Tez configuration held by amConfig.
amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration()));
startFrameworkClient();
setupJavaOptsChecker();
// Binding to an existing AM is only supported for session clients.
if (!isSession) {
String msg = "Must be in session mode to bind TezClient to existing AM";
LOG.error(msg);
throw new IllegalStateException(msg);
}
LOG.info("Session mode. Reconnecting to session: " + sessionAppId.toString());
clientTimeout = amConfig.getTezConfiguration().getInt(
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
try {
setupApplicationContext();
// Fetch the report of the existing application; only its tracking URL is used (for logging).
ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl());
sessionStarted.set(true);
} catch (YarnException e) {
// Surface YARN failures as TezException, keeping the original as the cause.
throw new TezException(e);
}
startClientHeartbeat();
this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
return this;
}
/**
 * Get the RM webapp address. The configuration that is passed in should not be used by other threads while this
 * method is executing.
 *
 * When {@code rmId} is non-null the id is temporarily stored under {@code ConfigUtils.RM_HA_ID} on the
 * configuration returned by {@code getYarnConfiguration(conf)} for the duration of the lookup and is removed
 * again afterwards, even if the lookup throws.
 *
 * If the configured address does not resolve, or resolves to a wildcard or loopback address, the local host's
 * canonical host name (with the configured port) is returned instead, unless the local host cannot be determined.
 *
 * @param conf The configuration
 * @param sslEnabled Whether SSL is enabled or not
 * @param rmId If HA is enabled the resource manager id
 * @return The webapp socket address
 */
public static InetSocketAddress getRMWebAddress(Configuration conf, boolean sslEnabled, String rmId)
{
  boolean isHA = (rmId != null);
  if (isHA) {
    conf = getYarnConfiguration(conf);
    conf.set(ConfigUtils.RM_HA_ID, rmId);
  }
  InetSocketAddress address;
  try {
    if (sslEnabled) {
      address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT);
    } else {
      address = conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_PORT);
    }
  } finally {
    // Remove the temporary HA id even when the lookup throws, so a failed
    // lookup does not leave the configuration pinned to rmId.
    if (isHA) {
      conf.unset(ConfigUtils.RM_HA_ID);
    }
  }
  LOG.info("rm webapp address setting {}", address);
  LOG.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS));
  InetSocketAddress resolvedSocketAddress = NetUtils.getConnectAddress(address);
  InetAddress resolved = resolvedSocketAddress.getAddress();
  if (resolved == null || resolved.isAnyLocalAddress() || resolved.isLoopbackAddress()) {
    try {
      resolvedSocketAddress = InetSocketAddress.createUnresolved(InetAddress.getLocalHost().getCanonicalHostName(), address.getPort());
    } catch (UnknownHostException e) {
      // Deliberately ignored: fall back to the address obtained above.
    }
  }
  return resolvedSocketAddress;
}
/**
 * Creates one gauge per configured runtime bucket plus a leading
 * "running_0" gauge, and initializes {@code runBuckets} with the bucket
 * boundaries converted from minutes to milliseconds.
 *
 * @param conf configuration holding the runtime-bucket list
 * @return the gauges; index 0 is "running_0", index i+1 belongs to the i-th
 *         configured bucket
 */
private MutableGaugeInt[] buildBuckets(Configuration conf) {
  String bucketSpec = conf.get(YarnConfiguration.RM_METRICS_RUNTIME_BUCKETS,
      YarnConfiguration.DEFAULT_RM_METRICS_RUNTIME_BUCKETS);
  ArrayList<Integer> minuteMarks = parseInts(bucketSpec);
  int bucketCount = minuteMarks.size();

  MutableGaugeInt[] gauges = new MutableGaugeInt[bucketCount + 1];
  gauges[0] = registry.newGauge("running_0", "", 0);

  long[] cutoffsMs = new long[bucketCount];
  int slot = 0;
  for (Integer minutes : minuteMarks) {
    gauges[slot + 1] = registry.newGauge("running_" + minutes, "", 0);
    // convert from min to ms
    cutoffsMs[slot] = minutes * 1000L * 60;
    slot++;
  }

  this.runBuckets = new TimeBucketMetrics<ApplicationId>(cutoffsMs);
  return gauges;
}
/**
 * Creates, initializes and starts the ResourceManager.
 *
 * The scheduler class found in the YARN configuration is saved under
 * {@code SLSConfiguration.RM_SCHEDULER} and replaced by
 * {@code ResourceSchedulerWrapper}; the metrics output directory is passed
 * along in the same configuration.
 */
private void startRM() throws IOException, ClassNotFoundException {
  Configuration conf = new YarnConfiguration();

  // Remember the real scheduler, then put the wrapper in its place.
  String wrappedScheduler = conf.get(YarnConfiguration.RM_SCHEDULER);
  conf.set(SLSConfiguration.RM_SCHEDULER, wrappedScheduler);
  conf.set(YarnConfiguration.RM_SCHEDULER,
      ResourceSchedulerWrapper.class.getName());

  conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);

  rm = new ResourceManager();
  rm.init(conf);
  rm.start();
}
/**
 * Opens the history file for writing, appending when it already exists and
 * creating it otherwise, applies {@code HISTORY_FILE_UMASK} as its
 * permission and wraps the stream in a {@code TFile.Writer} using the
 * configured compression type.
 *
 * If setting the permission or creating the TFile writer fails, the stream
 * opened at the start is closed before the exception is rethrown, so a
 * failed construction does not leak an open file handle.
 *
 * @param historyFile path of the history file
 * @throws IOException if the file cannot be opened, its permission cannot be
 *                     set, or the TFile writer cannot be created
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer =
        new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
            YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
            YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
            getConfig());
  } catch (IOException e) {
    // No caller can close the stream once the constructor fails.
    try {
      fsdos.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Starts a mini DFS cluster and a mini MR cluster for the test.
 *
 * Sets "hadoop.log.dir" if it is not already defined, appends
 * {@code classpathDir} to the YARN application classpath, creates /tmp,
 * /user and /hadoop/mapred/system with the permissions given below, and
 * finally writes the MR cluster's configuration to
 * {@code classpathDir + "/core-site.xml"}.
 *
 * The XML writer is closed in a {@code finally} block so the file handle is
 * released even when writing the configuration fails.
 *
 * @param conf configuration to adjust and to build the clusters from
 * @throws Exception if a cluster cannot be started or the file cannot be
 *                   written
 */
private void startCluster(Configuration conf) throws Exception {
  if (System.getProperty("hadoop.log.dir") == null) {
    System.setProperty("hadoop.log.dir", "target/test-dir");
  }
  conf.set("dfs.block.access.token.enable", "false");
  conf.set("dfs.permissions", "true");
  conf.set("hadoop.security.authentication", "simple");
  String cp = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      StringUtils.join(",",
          YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH))
      + File.pathSeparator + classpathDir;
  conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, cp);
  dfsCluster = new MiniDFSCluster.Builder(conf).build();
  FileSystem fileSystem = dfsCluster.getFileSystem();
  fileSystem.mkdirs(new Path("/tmp"));
  fileSystem.mkdirs(new Path("/user"));
  fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
  fileSystem.setPermission(
      new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
  fileSystem.setPermission(
      new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
  fileSystem.setPermission(
      new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
  FileSystem.setDefaultUri(conf, fileSystem.getUri());
  mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
  // so the minicluster conf is avail to the containers.
  Writer writer = new FileWriter(classpathDir + "/core-site.xml");
  try {
    mrCluster.getConfig().writeXml(writer);
  } finally {
    writer.close();
  }
}
/**
 * Publishes the timeline domain identified by {@code domainId}.
 *
 * When the timeline service is disabled a warning is logged and nothing
 * else happens. Otherwise a timeline client is started, a domain with the
 * configured readers and writers is put, and the client is stopped again.
 * A null or empty ACL is sent as a single space. Failures while putting the
 * domain are logged and not propagated.
 */
private void prepareTimelineDomain() {
  boolean timelineEnabled = conf.getBoolean(
      YarnConfiguration.TIMELINE_SERVICE_ENABLED,
      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
  if (!timelineEnabled) {
    LOG.warn("Cannot put the domain " + domainId +
        " because the timeline service is not enabled");
    return;
  }

  TimelineClient client = TimelineClient.createTimelineClient();
  client.init(conf);
  client.start();
  try {
    //TODO: we need to check and combine the existing timeline domain ACLs,
    //but let's do it once we have client java library to query domains.
    TimelineDomain domain = new TimelineDomain();
    domain.setId(domainId);

    String readers = " ";
    if (viewACLs != null && viewACLs.length() > 0) {
      readers = viewACLs;
    }
    domain.setReaders(readers);

    String writers = " ";
    if (modifyACLs != null && modifyACLs.length() > 0) {
      writers = modifyACLs;
    }
    domain.setWriters(writers);

    client.putDomain(domain);
    LOG.info("Put the timeline domain: " +
        TimelineUtils.dumpTimelineRecordtoJSON(domain));
  } catch (Exception e) {
    LOG.error("Error when putting the timeline domain", e);
  } finally {
    client.stop();
  }
}
/**
 * Builds the URL under which a container's raw logs are served by the node
 * manager web server.
 *
 * If the configured NM log dirs value starts with the literal
 * "${yarn.log.dir}", the remainder is appended to ".../logs". Otherwise the
 * canonical log dir path must lie below the canonical YARN log dir, and the
 * relative remainder is used. When neither applies, or an error occurs, a
 * warning is logged (once, guarded by {@code rawContainerLogWarningPrinted})
 * and null is returned.
 *
 * @param conf YARN configuration
 * @param nodeHttpAddress host:port of the node manager web server
 * @param appId application id path segment
 * @param containerId container id path segment
 * @return the URL, or null if it cannot be determined
 */
public static String getRawContainerLogsUrl(YarnConfiguration conf, String nodeHttpAddress, String appId, String containerId)
{
  final String yarnLogDirVariable = "${yarn.log.dir}";
  final String configuredLogDirs = conf.get(YarnConfiguration.NM_LOG_DIRS);

  // Unresolved variable prefix: everything after it is the relative path.
  if (configuredLogDirs.startsWith(yarnLogDirVariable)) {
    return ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/logs"
        + configuredLogDirs.substring(yarnLogDirVariable.length())
        + "/" + appId + "/" + containerId;
  }

  try {
    String canonicalLogDirs = new File(configuredLogDirs).getCanonicalPath();
    String canonicalYarnLogDir = new File(getYarnLogDir()).getCanonicalPath();
    if (canonicalLogDirs.startsWith(canonicalYarnLogDir)) {
      return ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/logs"
          + canonicalLogDirs.substring(canonicalYarnLogDir.length())
          + "/" + appId + "/" + containerId;
    }
    if (!rawContainerLogWarningPrinted) {
      LOG.warn("Cannot determine the location of container logs because of incompatible node manager log location ({}) and yarn log location ({})",
          canonicalLogDirs, canonicalYarnLogDir);
      rawContainerLogWarningPrinted = true;
    }
  } catch (Exception ex) {
    if (!rawContainerLogWarningPrinted) {
      LOG.warn("Cannot determine the location of container logs because of error: ", ex);
      rawContainerLogWarningPrinted = true;
    }
  }
  return null;
}
/**
 * Convenience overload: submits an application with the given AM memory and
 * log aggregation context, using an empty name, the current user's short
 * name, the configured maximum AM attempts (or its default) and fixed values
 * for the remaining arguments of the full {@code submitApp} overload.
 *
 * @param masterMemory memory value forwarded as the first argument
 * @param logAggregationContext log aggregation context to forward
 * @return the submitted application
 * @throws Exception if the current user cannot be determined or submission
 *                   fails
 */
public RMApp submitApp(int masterMemory,
    LogAggregationContext logAggregationContext) throws Exception {
  String currentUser =
      UserGroupInformation.getCurrentUser().getShortUserName();
  int maxAmAttempts = super.getConfig().getInt(
      YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);

  return submitApp(masterMemory, "", currentUser, null, false, null,
      maxAmAttempts, null, null, true, false, false, null, 0,
      logAggregationContext, true);
}
/**
 * Delegates to the superclass initializer, then caches the scheduler's
 * maximum allocation limits (memory in MB and virtual cores) read from
 * {@code conf}, falling back to the YARN defaults.
 */
@Override
public void init(HiWayConfiguration conf_, FileSystem hdfs_, int containerMemory_, Map<String, Integer> customMemoryMap_, int containerCores_,
    int requestPriority_) {
  super.init(conf_, hdfs_, containerMemory_, customMemoryMap_, containerCores_, requestPriority_);

  int schedulerMaxMb = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
  maxMem = schedulerMaxMb;

  int schedulerMaxVcores = conf.getInt(
      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
  maxCores = schedulerMaxVcores;
}