类org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig源码实例Demo

下面列出了如何使用 org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig 类的 API 示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: spydra   文件: HistoryLogUtilsTest.java
/**
 * Checks that the configuration produced by
 * {@code HistoryLogUtils.generateHadoopConfig} carries the expected
 * {@code gs://} locations for the remote log and job history directories, and
 * that it is exactly three properties larger than the un-substituted baseline.
 */
@Test
public void testGenerateHadoopConfig() throws Exception {
  Configuration cfg = HistoryLogUtils.generateHadoopConfig(DUMMY_CLIENT_ID,
      DUMMY_USER_NAME, DUMMY_BUCKET);

  // We are asserting that the properties involving substitution have been changed
  checkPropertySubstitution(this.configWithoutSubstitute, cfg,
      YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      "gs://" + DUMMY_BUCKET + "/logs/such-client");

  checkPropertySubstitution(this.configWithoutSubstitute, cfg,
      JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "gs://" + DUMMY_BUCKET + "/history/such-client/done-intermediate");

  checkPropertySubstitution(this.configWithoutSubstitute, cfg,
      JHAdminConfig.MR_HISTORY_DONE_DIR,
      "gs://" + DUMMY_BUCKET + "/history/such-client/done");

  // Some additional guards to check whether we accidentally load additional config.
  // With a leading message the argument order is (message, expected, actual):
  // the baseline size plus three is the expectation, cfg.size() is the
  // observed value, so a failure report names the two sides correctly.
  assertEquals("Sizes of configuration must not differ. Except for the user, client-id and bucket properties",
      this.configWithoutSubstitute.size() + 3, cfg.size());
}
 
源代码2 项目: hadoop   文件: TestJobHistoryEventHandler.java
/**
 * Exercises {@code JobHistoryUtils.getHistoryIntermediateDoneDirForUser} twice:
 * first with only the intermediate done dir and user name set, then after a
 * core-site file pointing at the mini DFS cluster has been written while the
 * in-memory default file system is switched to {@code file:///}.
 *
 * @throws IOException if the core-site file cannot be written
 */
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test relative path
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Release the file handle even when writing the XML fails.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
 
源代码3 项目: hadoop   文件: ClientCache.java
/**
 * Creates a history server client proxy as the current user.
 *
 * @return the proxy, or {@code null} when the value configured under
 *         {@code JHAdminConfig.MR_HISTORY_ADDRESS} is empty
 * @throws IOException declared by the signature; presumably raised when the
 *         current user cannot be obtained - confirm against
 *         {@code UserGroupInformation}
 */
protected MRClientProtocol instantiateHistoryProxy()
    throws IOException {
  final String historyAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(historyAddress)) {
    // Nothing configured, so there is no server to talk to.
    return null;
  }

  LOG.debug("Connecting to HistoryServer at: " + historyAddress);
  final YarnRPC yarnRpc = YarnRPC.create(conf);
  LOG.debug("Connected to HistoryServer at: " + historyAddress);

  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  // The proxy itself is only created inside the privileged action.
  PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          return (MRClientProtocol) yarnRpc.getProxy(HSClientProtocol.class,
              NetUtils.createSocketAddr(historyAddress), conf);
        }
      };
  return ugi.doAs(openProxy);
}
 
源代码4 项目: hadoop   文件: TestJobCleanup.java
/**
 * One-time fixture: clears {@code TEST_ROOT_DIR}, starts a
 * {@code MiniMRCluster} on {@code file:///}, writes a small text input file
 * and creates an empty input directory.
 *
 * @throws IOException if any file system operation fails
 */
@BeforeClass
public static void setUp() throws IOException {
  JobConf conf = new JobConf();
  fileSys = FileSystem.get(conf);
  fileSys.delete(new Path(TEST_ROOT_DIR), true);
  conf.set("mapred.job.tracker.handler.count", "1");
  conf.set("mapred.job.tracker", "127.0.0.1:0");
  conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
  conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR +
    "/intermediate");
  conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true");

  mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
  inDir = new Path(TEST_ROOT_DIR, "test-input");
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
  try {
    file.writeBytes(input);
  } finally {
    // Close the stream even if the write fails so the handle is not leaked.
    file.close();
  }
  emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
  fileSys.mkdirs(emptyInDir);
}
 
源代码5 项目: hadoop   文件: ClientHSSecurityInfo.java
/**
 * Supplies Kerberos information for {@code HSClientProtocolPB} only.
 *
 * @param protocol the protocol class being queried
 * @param conf     the configuration (not consulted here)
 * @return a {@code KerberosInfo} whose server principal is
 *         {@code JHAdminConfig.MR_HISTORY_PRINCIPAL} and whose client
 *         principal is {@code null}; {@code null} for any other protocol
 */
@Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
  if (protocol.equals(HSClientProtocolPB.class)) {
    return new KerberosInfo() {

      @Override
      public Class<? extends Annotation> annotationType() {
        return null;
      }

      @Override
      public String serverPrincipal() {
        return JHAdminConfig.MR_HISTORY_PRINCIPAL;
      }

      @Override
      public String clientPrincipal() {
        return null;
      }
    };
  }
  // Any other protocol is not handled by this provider.
  return null;
}
 
源代码6 项目: hadoop   文件: HistoryClientService.java
/**
 * Starts the history web application on the configured bind address and
 * then rewrites the configured web app URL so that it keeps its host part
 * but carries the port the listener actually bound to.
 *
 * @param conf configuration read for the bind address and updated with the
 *             resulting host:port
 */
@VisibleForTesting
protected void initializeWebApp(Configuration conf) {
  webApp = new HsWebApp(history);
  InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
  // NOTE: there should be a .at(InetSocketAddress)
  WebApps.$for("jobhistory", HistoryClientService.class, this, "ws")
      .with(conf)
      .withHttpSpnegoKeytabKey(JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
      .withHttpSpnegoPrincipalKey(JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
      .at(NetUtils.getHostPortString(bindAddress))
      .start(webApp);

  // Keep the configured host, swap in the listener's port.
  String configuredUrl = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf);
  String connectHost = configuredUrl.split(":")[0];
  String connectAddress =
      connectHost + ":" + webApp.getListenerAddress().getPort();
  MRWebAppUtil.setJHSWebappURLWithoutScheme(conf, connectAddress);
}
 
/**
 * Creates the state store service selected by the configuration.
 * <p>
 * When recovery is disabled the null state store is instantiated; otherwise
 * the class named by {@code JHAdminConfig.MR_HS_STATE_STORE} is used.
 *
 * @param conf the configuration
 * @return the state storage instance
 * @throws RuntimeException if recovery is enabled but no store class is
 *         configured
 */
public static HistoryServerStateStoreService getStore(Configuration conf) {
  final boolean recoveryEnabled = conf.getBoolean(
      JHAdminConfig.MR_HS_RECOVERY_ENABLE,
      JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
  if (!recoveryEnabled) {
    return ReflectionUtils.newInstance(
        HistoryServerNullStateStoreService.class, conf);
  }

  final Class<? extends HistoryServerStateStoreService> configuredStore =
      conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null,
          HistoryServerStateStoreService.class);
  if (configuredStore == null) {
    throw new RuntimeException("Unable to locate storage class, check "
        + JHAdminConfig.MR_HS_STATE_STORE);
  }
  return ReflectionUtils.newInstance(configuredStore, conf);
}
 
源代码8 项目: hadoop   文件: HSAdmin.java
/**
 * Invokes {@code refreshUserToGroupsMappings} on a proxy created for the
 * address configured under {@code JHAdminConfig.JHS_ADMIN_ADDRESS}.
 *
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int refreshUserToGroupsMappings() throws IOException {
  // Resolve the admin address from the current configuration
  final Configuration conf = getConf();
  final InetSocketAddress adminAddress = conf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  // The proxy is created on behalf of the current user
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final RefreshUserMappingsProtocol proxy = HSProxies.createProxy(conf,
      adminAddress, RefreshUserMappingsProtocol.class, caller);

  // Refresh the user-to-groups mappings
  proxy.refreshUserToGroupsMappings();
  return 0;
}
 
源代码9 项目: hadoop   文件: HSAdmin.java
/**
 * Invokes {@code refreshSuperUserGroupsConfiguration} on a proxy created for
 * the address configured under {@code JHAdminConfig.JHS_ADMIN_ADDRESS}.
 *
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int refreshSuperUserGroupsConfiguration() throws IOException {
  // Resolve the admin address from the current configuration
  final Configuration conf = getConf();
  final InetSocketAddress adminAddress = conf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  // The proxy is created on behalf of the current user
  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final RefreshUserMappingsProtocol proxy = HSProxies.createProxy(conf,
      adminAddress, RefreshUserMappingsProtocol.class, caller);

  // Refresh the super-user group mappings
  proxy.refreshSuperUserGroupsConfiguration();
  return 0;
}
 
源代码10 项目: big-c   文件: HSAdmin.java
/**
 * Calls {@code refreshSuperUserGroupsConfiguration} through a
 * {@code RefreshUserMappingsProtocol} proxy bound to the address read from
 * {@code JHAdminConfig.JHS_ADMIN_ADDRESS}.
 *
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int refreshSuperUserGroupsConfiguration() throws IOException {
  final Configuration currentConf = getConf();
  final InetSocketAddress jhsAdminAddress = currentConf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  final UserGroupInformation currentUser =
      UserGroupInformation.getCurrentUser();
  final RefreshUserMappingsProtocol mappingsProxy = HSProxies.createProxy(
      currentConf, jhsAdminAddress, RefreshUserMappingsProtocol.class,
      currentUser);

  // Refresh the super-user group mappings
  mappingsProxy.refreshSuperUserGroupsConfiguration();
  return 0;
}
 
源代码11 项目: incubator-tajo   文件: MiniTajoYarnCluster.java
/**
 * Starts the cluster. Unless fixed ports are requested through
 * {@code JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS}, the history
 * server RPC and web addresses are first set to port 0 on the mini cluster
 * host name. Any failure is rethrown as {@code YarnRuntimeException}; on
 * success the resource manager and history server addresses are logged.
 */
@Override
public synchronized void start() {
  try {
    final boolean fixedPorts = getConfig().getBoolean(
        JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
        JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS);
    if (!fixedPorts) {
      // pick free random ports.
      getConfig().set(
          JHAdminConfig.MR_HISTORY_ADDRESS,
          MiniYARNCluster.getHostname() + ":0");
      getConfig().set(
          JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
          MiniYARNCluster.getHostname() + ":0");
    }
    super.start();
  } catch (Throwable failure) {
    throw new YarnRuntimeException(failure);
  }

  // Report the addresses now present in the configuration.
  LOG.info("MiniMRYARN ResourceManager address: "
      + getConfig().get(YarnConfiguration.RM_ADDRESS));
  LOG.info("MiniMRYARN ResourceManager web address: "
      + getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
  LOG.info("MiniMRYARN HistoryServer address: "
      + getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
  LOG.info("MiniMRYARN HistoryServer web address: "
      + getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
}
 
源代码12 项目: big-c   文件: TestJobHistoryEventHandler.java
/**
 * Exercises {@code JobHistoryUtils.getHistoryIntermediateDoneDirForUser} twice:
 * first with only the intermediate done dir and user name set, then after a
 * core-site file pointing at the mini DFS cluster has been written while the
 * in-memory default file system is switched to {@code file:///}.
 *
 * @throws IOException if the core-site file cannot be written
 */
@Test
public void testGetHistoryIntermediateDoneDirForUser() throws IOException {
  // Test relative path
  Configuration conf = new Configuration();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/mapred/history/done_intermediate");
  conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
  String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals("/mapred/history/done_intermediate/" +
      System.getProperty("user.name"), pathStr);

  // Test fully qualified path
  // Create default configuration pointing to the minicluster
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
      dfsCluster.getURI().toString());
  FileOutputStream os = new FileOutputStream(coreSitePath);
  try {
    conf.writeXml(os);
  } finally {
    // Release the file handle even when writing the XML fails.
    os.close();
  }
  // Simulate execution under a non-default namenode
  conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
          "file:///");
  pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
  Assert.assertEquals(dfsCluster.getURI().toString() +
      "/mapred/history/done_intermediate/" + System.getProperty("user.name"),
      pathStr);
}
 
源代码13 项目: big-c   文件: ClientCache.java
/**
 * Creates a history server client proxy as the current user.
 *
 * @return the proxy, or {@code null} when the value configured under
 *         {@code JHAdminConfig.MR_HISTORY_ADDRESS} is empty
 * @throws IOException declared by the signature; presumably raised when the
 *         current user cannot be obtained - confirm against
 *         {@code UserGroupInformation}
 */
protected MRClientProtocol instantiateHistoryProxy()
    throws IOException {
  final String jhsAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(jhsAddress)) {
    // Nothing configured, so there is no server to talk to.
    return null;
  }

  LOG.debug("Connecting to HistoryServer at: " + jhsAddress);
  final YarnRPC rpcFactory = YarnRPC.create(conf);
  LOG.debug("Connected to HistoryServer at: " + jhsAddress);

  UserGroupInformation user = UserGroupInformation.getCurrentUser();
  // The proxy itself is only created inside the privileged action.
  PrivilegedAction<MRClientProtocol> proxyAction =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          return (MRClientProtocol) rpcFactory.getProxy(
              HSClientProtocol.class,
              NetUtils.createSocketAddr(jhsAddress), conf);
        }
      };
  return user.doAs(proxyAction);
}
 
/**
 * Creates the state store service selected by the configuration.
 * <p>
 * When recovery is disabled the null state store is instantiated; otherwise
 * the class named by {@code JHAdminConfig.MR_HS_STATE_STORE} is used.
 *
 * @param conf the configuration
 * @return the state storage instance
 * @throws RuntimeException if recovery is enabled but no store class is
 *         configured
 */
public static HistoryServerStateStoreService getStore(Configuration conf) {
  final boolean recovering = conf.getBoolean(
      JHAdminConfig.MR_HS_RECOVERY_ENABLE,
      JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
  if (!recovering) {
    return ReflectionUtils.newInstance(
        HistoryServerNullStateStoreService.class, conf);
  }

  final Class<? extends HistoryServerStateStoreService> chosenStore =
      conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null,
          HistoryServerStateStoreService.class);
  if (chosenStore == null) {
    throw new RuntimeException("Unable to locate storage class, check "
        + JHAdminConfig.MR_HS_STATE_STORE);
  }
  return ReflectionUtils.newInstance(chosenStore, conf);
}
 
源代码15 项目: big-c   文件: HSAdmin.java
/**
 * Calls {@code refreshUserToGroupsMappings} through a
 * {@code RefreshUserMappingsProtocol} proxy bound to the address read from
 * {@code JHAdminConfig.JHS_ADMIN_ADDRESS}.
 *
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int refreshUserToGroupsMappings() throws IOException {
  // Get the current configuration
  final Configuration currentConf = getConf();
  final InetSocketAddress jhsAdminAddress = currentConf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  final UserGroupInformation currentUser =
      UserGroupInformation.getCurrentUser();
  final RefreshUserMappingsProtocol mappingsProxy = HSProxies.createProxy(
      currentConf, jhsAdminAddress, RefreshUserMappingsProtocol.class,
      currentUser);

  // Refresh the user-to-groups mappings
  mappingsProxy.refreshUserToGroupsMappings();
  return 0;
}
 
源代码16 项目: big-c   文件: HistoryClientService.java
/**
 * Starts the history web application on the configured bind address and
 * then rewrites the configured web app URL so that it keeps its host part
 * but carries the port the listener actually bound to.
 *
 * @param conf configuration read for the bind address and updated with the
 *             resulting host:port
 */
@VisibleForTesting
protected void initializeWebApp(Configuration conf) {
  webApp = new HsWebApp(history);
  InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf);
  // NOTE: there should be a .at(InetSocketAddress)
  WebApps.$for("jobhistory", HistoryClientService.class, this, "ws")
      .with(conf)
      .withHttpSpnegoKeytabKey(JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
      .withHttpSpnegoPrincipalKey(JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
      .at(NetUtils.getHostPortString(bindAddress))
      .start(webApp);

  // Keep the configured host, swap in the listener's port.
  String urlWithoutScheme = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf);
  String host = urlWithoutScheme.split(":")[0];
  String hostAndPort = host + ":" + webApp.getListenerAddress().getPort();
  MRWebAppUtil.setJHSWebappURLWithoutScheme(conf, hostAndPort);
}
 
源代码17 项目: spydra   文件: HistoryLogUtils.java
/**
 * Creates, initialises and starts a {@code JobHistoryServer} with the given
 * configuration, logging the configured web app address before starting.
 * Any exception is logged and the JVM exits with status 1.
 *
 * @param cfg configuration passed to the server's {@code init}
 */
public static void startJhs(Configuration cfg) {
  try {
    JobHistoryServer server = new JobHistoryServer();
    server.init(cfg);

    String webAddress = cfg.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
    logger.info(String.format(
        "Starting JobHistoryServer on: http://%s", webAddress));

    server.start();
  } catch (Exception e) {
    logger.error("Error starting JobHistoryServer", e);
    System.exit(1);
  }
}
 
源代码18 项目: hadoop   文件: TestMRJobsWithHistoryService.java
/**
 * Opens an {@code HSClientProtocol} proxy to the history server address taken
 * from the mini cluster's configuration.
 *
 * @return the history client proxy
 */
private HSClientProtocol instantiateHistoryProxy() {
  final String historyAddress =
      mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
  final YarnRPC yarnRpc = YarnRPC.create(conf);
  return (HSClientProtocol) yarnRpc.getProxy(HSClientProtocol.class,
      NetUtils.createSocketAddr(historyAddress), mrCluster.getConfig());
}
 
源代码19 项目: hadoop   文件: MiniMRYarnCluster.java
/**
 * Builds the {@code host:port} of the history server web app.
 * <p>
 * The socket address is read from the HTTPS or the HTTP web app property
 * depending on {@code isSSLEnabled}. If the address is unresolved, a wildcard
 * or a loopback address, the local host's canonical name is used as the host
 * (falling back to the address's own host name when the local host cannot be
 * determined); otherwise the address's host name is used.
 *
 * @param conf         configuration holding the web app address properties
 * @param isSSLEnabled {@code true} to read the HTTPS address property
 * @return the web app location as {@code host:port}, without a scheme
 */
public static String getResolvedMRHistoryWebAppURLWithoutScheme(
    Configuration conf, boolean isSSLEnabled) {
  InetSocketAddress address = null;
  if (isSSLEnabled) {
    address =
        conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
            JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
            JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT);
  } else {
    address =
        conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
            JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
            JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
  }
  address = NetUtils.getConnectAddress(address);
  // The buffer never leaves this method, so the unsynchronized
  // StringBuilder is sufficient.
  StringBuilder sb = new StringBuilder();
  InetAddress resolved = address.getAddress();
  if (resolved == null || resolved.isAnyLocalAddress() ||
      resolved.isLoopbackAddress()) {
    String lh = address.getHostName();
    try {
      lh = InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException ignored) {
      // Deliberately ignored: fall back to the address's own host name.
    }
    sb.append(lh);
  } else {
    sb.append(address.getHostName());
  }
  sb.append(":").append(address.getPort());
  return sb.toString();
}
 
源代码20 项目: big-c   文件: HSAdmin.java
/**
 * Invokes {@code refreshLogRetentionSettings} on an
 * {@code HSAdminRefreshProtocol} proxy created for the address configured
 * under {@code JHAdminConfig.JHS_ADMIN_ADDRESS}.
 *
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int refreshLogRetentionSettings() throws IOException {
  // Resolve the admin address from the current configuration
  final Configuration currentConf = getConf();
  final InetSocketAddress adminAddress = currentConf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final HSAdminRefreshProtocol proxy = HSProxies.createProxy(currentConf,
      adminAddress, HSAdminRefreshProtocol.class, caller);

  // Refresh log retention settings
  proxy.refreshLogRetentionSettings();
  return 0;
}
 
源代码21 项目: hadoop   文件: MRWebAppUtil.java
/**
 * Stores the given host address under the HTTPS web app property when the
 * history server policy is {@code HTTPS_ONLY}, and under the plain web app
 * property otherwise.
 *
 * @param conf        the configuration to update
 * @param hostAddress the value to store
 */
public static void setJHSWebappURLWithoutScheme(Configuration conf,
    String hostAddress) {
  final String addressKey = (httpPolicyInJHS == Policy.HTTPS_ONLY)
      ? JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS
      : JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS;
  conf.set(addressKey, hostAddress);
}
 
源代码22 项目: big-c   文件: HSAdmin.java
/**
 * Prints, one line per user, the groups reported by a
 * {@code GetUserMappingsProtocol} proxy created for the address configured
 * under {@code JHAdminConfig.JHS_ADMIN_ADDRESS}. Each line has the form
 * {@code "<user> : <group> <group> ..."}.
 *
 * @param usernames the users to look up; when empty, the current user's
 *                  name is used instead
 * @return always {@code 0}
 * @throws IOException declared by the signature for the proxy and user lookups
 */
private int getGroups(String[] usernames) throws IOException {
  // Default to the current user when no names were given
  if (usernames.length == 0) {
    usernames = new String[] { UserGroupInformation.getCurrentUser()
        .getUserName() };
  }

  // Resolve the admin address from the current configuration
  final Configuration currentConf = getConf();
  final InetSocketAddress adminAddress = currentConf.getSocketAddr(
      JHAdminConfig.JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
      JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);

  final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  final GetUserMappingsProtocol mappings = HSProxies.createProxy(
      currentConf, adminAddress, GetUserMappingsProtocol.class, caller);

  for (String username : usernames) {
    StringBuilder line = new StringBuilder().append(username).append(" :");
    for (String group : mappings.getGroupsForUser(username)) {
      line.append(" ").append(group);
    }
    System.out.println(line);
  }

  return 0;
}
 
源代码23 项目: big-c   文件: TestHistoryFileManager.java
/**
 * Points the done and intermediate-done directories at fresh random
 * root-level paths and asks a new {@code HistoryFileManager} to create them.
 *
 * @param conf  configuration that receives the two directory settings
 * @param clock clock handed to {@code createHistoryDirs}
 * @throws Exception propagated from {@code createHistoryDirs}
 */
private void testCreateHistoryDirs(Configuration conf, Clock clock)
    throws Exception {
  final String doneDir = "/" + UUID.randomUUID();
  conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDir);

  final String intermediateDoneDir = "/" + UUID.randomUUID();
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      intermediateDoneDir);

  HistoryFileManager manager = new HistoryFileManager();
  manager.conf = conf;
  manager.createHistoryDirs(clock, 500, 2000);
}
 
源代码24 项目: hadoop   文件: MRWebAppUtil.java
/**
 * Builds the scheme-less history server URL for an application's job page,
 * in the form {@code host:port/jobhistory/job/<jobId>}.
 * <p>
 * The port comes from the web app URL and the host from the value of
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS}. A wildcard or loopback host is
 * replaced by the local host's canonical name; an address that could not be
 * resolved keeps its configured host name instead of failing.
 *
 * @param conf  configuration holding the history server addresses
 * @param appId the application whose job page is wanted
 * @return the job history URL without a scheme
 * @throws UnknownHostException if the local host name cannot be determined
 */
public static String getApplicationWebURLOnJHSWithoutScheme(Configuration conf,
    ApplicationId appId)
    throws UnknownHostException {
  //construct the history url for job
  String addr = getJHSWebappURLWithoutScheme(conf);
  Iterator<String> it = ADDR_SPLITTER.split(addr).iterator();
  it.next(); // ignore the bind host
  String port = it.next();
  // Use hs address to figure out the host for webapp
  addr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
  String host = ADDR_SPLITTER.split(addr).iterator().next();
  String hsAddress = JOINER.join(host, ":", port);
  InetSocketAddress address = NetUtils.createSocketAddr(
    hsAddress, getDefaultJHSWebappPort(),
    getDefaultJHSWebappURLWithoutScheme());
  // getAddress() is null when the host name could not be resolved; guard it
  // so an unresolved host falls through to getHostName() rather than
  // raising a NullPointerException.
  InetAddress resolved = address.getAddress();
  // The buffer never leaves this method, so StringBuilder is sufficient.
  StringBuilder sb = new StringBuilder();
  if (resolved != null
      && (resolved.isAnyLocalAddress() || resolved.isLoopbackAddress())) {
    sb.append(InetAddress.getLocalHost().getCanonicalHostName());
  } else {
    sb.append(address.getHostName());
  }
  sb.append(":").append(address.getPort());
  sb.append("/jobhistory/job/");
  JobID jobId = TypeConverter.fromYarn(appId);
  sb.append(jobId.toString());
  return sb.toString();
}
 
/**
 * Recreates the test directory and builds a configuration with recovery
 * enabled, the leveldb state store selected, and the store path set to the
 * test directory's absolute path.
 */
@Before
public void setup() {
  // Start from an empty directory
  FileUtil.fullyDelete(testDir);
  testDir.mkdirs();

  conf = new Configuration();
  conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
  conf.setClass(
      JHAdminConfig.MR_HS_STATE_STORE,
      HistoryServerLeveldbStateStoreService.class,
      HistoryServerStateStoreService.class);

  final String storePath = testDir.getAbsoluteFile().toString();
  conf.set(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH, storePath);
}
 
源代码26 项目: hadoop   文件: CachedHistoryStorage.java
/**
 * Reads the loaded-job cache size from the configuration and installs a
 * synchronized, access-ordered map that drops its eldest entry whenever it
 * holds more than {@code loadedJobCacheSize} entries.
 *
 * @param conf configuration supplying the cache size
 */
@SuppressWarnings("serial")
private void createLoadedJobCache(Configuration conf) {
  loadedJobCacheSize = conf.getInt(
      JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
      JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);

  // accessOrder = true makes iteration order follow most recent access
  Map<JobId, Job> boundedCache = new LinkedHashMap<JobId, Job>(
      loadedJobCacheSize + 1, 0.75f, true) {
    @Override
    public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
      return size() > loadedJobCacheSize;
    }
  };
  loadedJobCache = Collections.synchronizedMap(boundedCache);
}
 
/**
 * Creates the state store directory {@code DB_NAME} beneath the configured
 * leveldb store path on the local file system, with permission 0700.
 *
 * @param conf configuration holding the store path
 * @return the path of the created directory
 * @throws IOException if no store path is configured, or propagated from the
 *         file system calls
 */
private Path createStorageDir(Configuration conf) throws IOException {
  final String storeLocation =
      conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
  if (storeLocation == null) {
    throw new IOException("No store location directory configured in " +
        JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
  }

  final Path storageRoot = new Path(storeLocation, DB_NAME);
  FileSystem.getLocal(conf)
      .mkdirs(storageRoot, new FsPermission((short)0700));
  return storageRoot;
}
 
源代码28 项目: hadoop   文件: HistoryClientService.java
/**
 * Starts the web app, then creates and starts the {@code HSClientProtocol}
 * RPC server, refreshing its service ACL first when authorization is
 * enabled. The connect address is written back to the configuration and
 * remembered in {@code bindAddress} before the superclass is started.
 *
 * @throws Exception propagated from any of the start-up steps
 */
protected void serviceStart() throws Exception {
  final Configuration conf = getConfig();
  final YarnRPC rpc = YarnRPC.create(conf);
  initializeWebApp(conf);

  final InetSocketAddress rpcAddress = conf.getSocketAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_PORT);

  final int clientThreadCount = conf.getInt(
      JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT,
      JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT);
  server = rpc.getServer(HSClientProtocol.class, protocolHandler,
      rpcAddress, conf, jhsDTSecretManager, clientThreadCount);

  // Enable service authorization?
  final boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    server.refreshServiceAcl(conf, new ClientHSPolicyProvider());
  }

  server.start();
  // Record the address the server actually listens on
  this.bindAddress = conf.updateConnectAddr(
      JHAdminConfig.MR_HISTORY_BIND_HOST,
      JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
      server.getListenerAddress());
  LOG.info("Instantiated HistoryClientService at " + this.bindAddress);

  super.serviceStart();
}
 
源代码29 项目: hadoop   文件: JobHistory.java
/**
 * Initialises the job history service: stores the configuration, creates
 * placeholder application ids, reads the move-thread interval, then sets up
 * the history file manager and the history storage before delegating to the
 * superclass.
 *
 * @param conf the configuration used for this service and its components
 * @throws Exception propagated from the component initialisation; a failure
 *         of {@code initExisting} surfaces as {@code YarnRuntimeException}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  LOG.info("JobHistory Init");
  this.conf = conf;
  this.appID = ApplicationId.newInstance(0, 0);
  this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
      .newRecordInstance(ApplicationAttemptId.class);

  // Interval read from the configuration, with a default when it is unset
  moveThreadInterval = conf.getLong(
      JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
      JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);

  // The file manager is initialised before the storage that is handed it below
  hsManager = createHistoryFileManager();
  hsManager.init(conf);
  try {
    hsManager.initExisting();
  } catch (IOException e) {
    // Wrap the checked exception, keeping the original as the cause
    throw new YarnRuntimeException("Failed to intialize existing directories", e);
  }

  storage = createHistoryStorage();
  
  // Only storages that are themselves a Service get an init call
  if (storage instanceof Service) {
    ((Service) storage).init(conf);
  }
  storage.setHistoryFileManager(hsManager);

  super.serviceInit(conf);
}
 
/**
 * Resolves the root state path from the URI configured under
 * {@code JHAdminConfig.MR_HS_FS_STATE_STORE_URI}.
 *
 * @param conf configuration holding the store URI
 * @throws IOException if no store URI is configured
 */
@Override
protected void initStorage(Configuration conf)
    throws IOException {
  final String configuredUri =
      conf.get(JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
  if (configuredUri == null) {
    throw new IOException("No store location URI configured in " +
        JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
  }

  LOG.info("Using " + configuredUri + " for history server state storage");
  rootStatePath = new Path(configuredUri, ROOT_STATE_DIR_NAME);
}
 
 同包方法