org.apache.hadoop.fs.FileContext#makeQualified ( )源码实例Demo

下面列出了org.apache.hadoop.fs.FileContext#makeQualified ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: ContainerLocalizer.java
private static void initDirs(Configuration conf, String user, String appId,
    FileContext lfs, List<Path> localDirs) throws IOException {
  if (null == localDirs || 0 == localDirs.size()) {
    throw new IOException("Cannot initialize without local dirs");
  }
  String[] appsFileCacheDirs = new String[localDirs.size()];
  String[] usersFileCacheDirs = new String[localDirs.size()];
  for (int i = 0, n = localDirs.size(); i < n; ++i) {
    // $x/usercache/$user
    Path base = lfs.makeQualified(
        new Path(new Path(localDirs.get(i), USERCACHE), user));
    // $x/usercache/$user/filecache
    Path userFileCacheDir = new Path(base, FILECACHE);
    usersFileCacheDirs[i] = userFileCacheDir.toString();
    createDir(lfs, userFileCacheDir, FILECACHE_PERMS, false);
    // $x/usercache/$user/appcache/$appId
    Path appBase = new Path(base, new Path(APPCACHE, appId));
    // $x/usercache/$user/appcache/$appId/filecache
    Path appFileCacheDir = new Path(appBase, FILECACHE);
    appsFileCacheDirs[i] = appFileCacheDir.toString();
    createDir(lfs, appFileCacheDir, FILECACHE_PERMS, false);
  }
  conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
  conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
}
 
源代码2 项目: hadoop   文件: TestDefaultContainerExecutor.java
byte[] createTmpFile(Path dst, Random r, int len)
    throws IOException {
  // use unmodified local context
  FileContext lfs = FileContext.getLocalFSFileContext();
  dst = lfs.makeQualified(dst);
  lfs.mkdir(dst.getParent(), null, true);
  byte[] bytes = new byte[len];
  FSDataOutputStream out = null;
  try {
    out = lfs.create(dst, EnumSet.of(CREATE, OVERWRITE));
    r.nextBytes(bytes);
    out.write(bytes);
  } finally {
    if (out != null) out.close();
  }
  return bytes;
}
 
源代码3 项目: hadoop   文件: JobHistoryCopyService.java
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  FSDataInputStream in = null;
  Path historyFile = null;
  String jobId =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String jobhistoryDir =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
  Path histDirPath =
      FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
  FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
  // read the previous history file
  historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
        jobId, (applicationAttemptId.getAttemptId() - 1)));
  LOG.info("History file is at " + historyFile);
  in = fc.open(historyFile);
  return in;
}
 
源代码4 项目: hadoop   文件: HistoryFileManager.java
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
  try {
    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
    while (fileStatusIter.hasNext()) {
      FileStatus fileStatus = fileStatusIter.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
        jhStatusList.add(fileStatus);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return jhStatusList;
}
 
源代码5 项目: big-c   文件: ContainerLocalizer.java
private static void initDirs(Configuration conf, String user, String appId,
    FileContext lfs, List<Path> localDirs) throws IOException {
  if (null == localDirs || 0 == localDirs.size()) {
    throw new IOException("Cannot initialize without local dirs");
  }
  String[] appsFileCacheDirs = new String[localDirs.size()];
  String[] usersFileCacheDirs = new String[localDirs.size()];
  for (int i = 0, n = localDirs.size(); i < n; ++i) {
    // $x/usercache/$user
    Path base = lfs.makeQualified(
        new Path(new Path(localDirs.get(i), USERCACHE), user));
    // $x/usercache/$user/filecache
    Path userFileCacheDir = new Path(base, FILECACHE);
    usersFileCacheDirs[i] = userFileCacheDir.toString();
    createDir(lfs, userFileCacheDir, FILECACHE_PERMS, false);
    // $x/usercache/$user/appcache/$appId
    Path appBase = new Path(base, new Path(APPCACHE, appId));
    // $x/usercache/$user/appcache/$appId/filecache
    Path appFileCacheDir = new Path(appBase, FILECACHE);
    appsFileCacheDirs[i] = appFileCacheDir.toString();
    createDir(lfs, appFileCacheDir, FILECACHE_PERMS, false);
  }
  conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
  conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
}
 
源代码6 项目: big-c   文件: TestDefaultContainerExecutor.java
byte[] createTmpFile(Path dst, Random r, int len)
    throws IOException {
  // use unmodified local context
  FileContext lfs = FileContext.getLocalFSFileContext();
  dst = lfs.makeQualified(dst);
  lfs.mkdir(dst.getParent(), null, true);
  byte[] bytes = new byte[len];
  FSDataOutputStream out = null;
  try {
    out = lfs.create(dst, EnumSet.of(CREATE, OVERWRITE));
    r.nextBytes(bytes);
    out.write(bytes);
  } finally {
    if (out != null) out.close();
  }
  return bytes;
}
 
源代码7 项目: big-c   文件: JobHistoryCopyService.java
public static FSDataInputStream getPreviousJobHistoryFileStream(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  FSDataInputStream in = null;
  Path historyFile = null;
  String jobId =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String jobhistoryDir =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
  Path histDirPath =
      FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
  FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
  // read the previous history file
  historyFile =
      fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
        jobId, (applicationAttemptId.getAttemptId() - 1)));
  LOG.info("History file is at " + historyFile);
  in = fc.open(historyFile);
  return in;
}
 
源代码8 项目: big-c   文件: HistoryFileManager.java
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
  try {
    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
    while (fileStatusIter.hasNext()) {
      FileStatus fileStatus = fileStatusIter.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
        jhStatusList.add(fileStatus);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return jhStatusList;
}
 
源代码9 项目: hadoop   文件: JobHistoryUtils.java
public static Path getPreviousJobHistoryPath(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  String jobId =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String jobhistoryDir =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
  Path histDirPath = FileContext.getFileContext(conf).makeQualified(
          new Path(jobhistoryDir));
  FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
  return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
      histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
}
 
源代码10 项目: hadoop   文件: TestJobHistoryParsing.java
private static String getJobSummary(FileContext fc, Path path)
    throws IOException {
  Path qPath = fc.makeQualified(path);
  FSDataInputStream in = fc.open(qPath);
  String jobSummaryString = in.readUTF();
  in.close();
  return jobSummaryString;
}
 
源代码11 项目: big-c   文件: JobHistoryUtils.java
public static Path getPreviousJobHistoryPath(
    Configuration conf, ApplicationAttemptId applicationAttemptId)
    throws IOException {
  String jobId =
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
        .toString();
  String jobhistoryDir =
      JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
  Path histDirPath = FileContext.getFileContext(conf).makeQualified(
          new Path(jobhistoryDir));
  FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
  return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
      histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
}
 
源代码12 项目: big-c   文件: HistoryFileManager.java
private String getJobSummary(FileContext fc, Path path) throws IOException {
  Path qPath = fc.makeQualified(path);
  FSDataInputStream in = fc.open(qPath);
  String jobSummaryString = in.readUTF();
  in.close();
  return jobSummaryString;
}
 
源代码13 项目: big-c   文件: TestJobHistoryParsing.java
private static String getJobSummary(FileContext fc, Path path)
    throws IOException {
  Path qPath = fc.makeQualified(path);
  FSDataInputStream in = fc.open(qPath);
  String jobSummaryString = in.readUTF();
  in.close();
  return jobSummaryString;
}
 
源代码14 项目: hadoop   文件: TestContainerLocalizer.java
@SuppressWarnings("unchecked") // mocked generics
private ContainerLocalizer setupContainerLocalizerForTest()
    throws Exception {
  // don't actually create dirs
  doNothing().when(spylfs).mkdir(
      isA(Path.class), isA(FsPermission.class), anyBoolean());

  Configuration conf = new Configuration();
  FileContext lfs = FileContext.getFileContext(spylfs, conf);
  localDirs = new ArrayList<Path>();
  for (int i = 0; i < 4; ++i) {
    localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
  }
  RecordFactory mockRF = getMockLocalizerRecordFactory();
  ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
      appId, containerId, localDirs, mockRF);
  ContainerLocalizer localizer = spy(concreteLoc);

  // return credential stream instead of opening local file
  random = new Random();
  long seed = random.nextLong();
  System.out.println("SEED: " + seed);
  random.setSeed(seed);
  DataInputBuffer appTokens = createFakeCredentials(random, 10);
  tokenPath =
    lfs.makeQualified(new Path(
          String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
              containerId)));
  doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
      ).when(spylfs).open(tokenPath);
  nmProxy = mock(LocalizationProtocol.class);
  doReturn(nmProxy).when(localizer).getProxy(nmAddr);
  doNothing().when(localizer).sleep(anyInt());
  

  // return result instantly for deterministic test
  ExecutorService syncExec = mock(ExecutorService.class);
  CompletionService<Path> cs = mock(CompletionService.class);
  when(cs.submit(isA(Callable.class)))
    .thenAnswer(new Answer<Future<Path>>() {
        @Override
        public Future<Path> answer(InvocationOnMock invoc)
            throws Throwable {
          Future<Path> done = mock(Future.class);
          when(done.isDone()).thenReturn(true);
          FakeDownload d = (FakeDownload) invoc.getArguments()[0];
          when(done.get()).thenReturn(d.call());
          return done;
        }
      });
  doReturn(syncExec).when(localizer).createDownloadThreadPool();
  doReturn(cs).when(localizer).createCompletionService(syncExec);

  return localizer;
}
 
源代码15 项目: hadoop   文件: TestLocalResourcesTrackerImpl.java
@Test
@SuppressWarnings("unchecked")
public void testGetPathForLocalization() throws Exception {
  FileContext lfs = FileContext.getLocalFSFileContext();
  Path base_path = new Path("target",
      TestLocalResourcesTrackerImpl.class.getSimpleName());
  final String user = "someuser";
  final ApplicationId appId = ApplicationId.newInstance(1, 1);
  Configuration conf = new YarnConfiguration();
  DrainDispatcher dispatcher = null;
  dispatcher = createDispatcher(conf);
  EventHandler<LocalizerEvent> localizerEventHandler =
      mock(EventHandler.class);
  EventHandler<LocalizerEvent> containerEventHandler =
      mock(EventHandler.class);
  dispatcher.register(LocalizerEventType.class, localizerEventHandler);
  dispatcher.register(ContainerEventType.class, containerEventHandler);
  NMStateStoreService stateStore = mock(NMStateStoreService.class);
  DeletionService delService = mock(DeletionService.class);
  try {
    LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
        LocalResourceVisibility.PUBLIC);
    LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
    ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
    localrsrc.put(req1, lr1);
    LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
        appId, dispatcher, localrsrc, true, conf, stateStore, null);
    Path conflictPath = new Path(base_path, "10");
    Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
    lfs.mkdir(qualifiedConflictPath, null, true);
    Path rPath = tracker.getPathForLocalization(req1, base_path,
        delService);
    Assert.assertFalse(lfs.util().exists(rPath));
    verify(delService, times(1)).delete(eq(user), eq(conflictPath));
  } finally {
    lfs.delete(base_path, true);
    if (dispatcher != null) {
      dispatcher.stop();
    }
  }
}
 
源代码16 项目: hadoop   文件: MiniMRYarnCluster.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
 
源代码17 项目: incubator-tez   文件: MiniTezCluster.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
  // blacklisting disabled to prevent scheduling issues
  conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
  }
  
  if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
    // nothing defined. set quick delete value
    conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
  }
  
  File appJarLocalFile = new File(MiniTezCluster.APPJAR);

  if (!appJarLocalFile.exists()) {
    String message = "TezAppJar " + MiniTezCluster.APPJAR
        + " not found. Exiting.";
    LOG.info(message);
    throw new TezUncheckedException(message);
  }
  
  FileSystem fs = FileSystem.get(conf);
  Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
  Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
  // Copy AppJar and make it public.
  Path appMasterJar = new Path(MiniTezCluster.APPJAR);
  fs.copyFromLocalFile(appMasterJar, appRemoteJar);
  fs.setPermission(appRemoteJar, new FsPermission("777"));

  conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
  LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));

  // VMEM monitoring disabled, PMEM monitoring enabled.
  conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
  conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    fc.mkdir(stagingPath, null, true);

    //mkdir done directory as well
    String doneDir =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new TezUncheckedException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test");

  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
  super.serviceInit(conf);
}
 
源代码18 项目: big-c   文件: TestContainerLocalizer.java
@SuppressWarnings("unchecked") // mocked generics
private ContainerLocalizer setupContainerLocalizerForTest()
    throws Exception {
  // don't actually create dirs
  doNothing().when(spylfs).mkdir(
      isA(Path.class), isA(FsPermission.class), anyBoolean());

  Configuration conf = new Configuration();
  FileContext lfs = FileContext.getFileContext(spylfs, conf);
  localDirs = new ArrayList<Path>();
  for (int i = 0; i < 4; ++i) {
    localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
  }
  RecordFactory mockRF = getMockLocalizerRecordFactory();
  ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, appUser,
      appId, containerId, localDirs, mockRF);
  ContainerLocalizer localizer = spy(concreteLoc);

  // return credential stream instead of opening local file
  random = new Random();
  long seed = random.nextLong();
  System.out.println("SEED: " + seed);
  random.setSeed(seed);
  DataInputBuffer appTokens = createFakeCredentials(random, 10);
  tokenPath =
    lfs.makeQualified(new Path(
          String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
              containerId)));
  doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
      ).when(spylfs).open(tokenPath);
  nmProxy = mock(LocalizationProtocol.class);
  doReturn(nmProxy).when(localizer).getProxy(nmAddr);
  doNothing().when(localizer).sleep(anyInt());
  

  // return result instantly for deterministic test
  ExecutorService syncExec = mock(ExecutorService.class);
  CompletionService<Path> cs = mock(CompletionService.class);
  when(cs.submit(isA(Callable.class)))
    .thenAnswer(new Answer<Future<Path>>() {
        @Override
        public Future<Path> answer(InvocationOnMock invoc)
            throws Throwable {
          Future<Path> done = mock(Future.class);
          when(done.isDone()).thenReturn(true);
          FakeDownload d = (FakeDownload) invoc.getArguments()[0];
          when(done.get()).thenReturn(d.call());
          return done;
        }
      });
  doReturn(syncExec).when(localizer).createDownloadThreadPool();
  doReturn(cs).when(localizer).createCompletionService(syncExec);

  return localizer;
}
 
源代码19 项目: big-c   文件: MiniMRYarnCluster.java
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }

  // By default, VMEM monitoring disabled, PMEM monitoring enabled.
  if (!conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING)) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    /*
     * Re-configure the staging path on Windows if the file system is localFs.
     * We need to use a absolute path that contains the drive letter. The unit
     * test could run on a different drive than the AM. We can run into the
     * issue that job files are localized to the drive where the test runs on,
     * while the AM starts on a different drive and fails to find the job
     * metafiles. Using absolute path can avoid this ambiguity.
     */
    if (Path.WINDOWS) {
      if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
            new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
                .getAbsolutePath());
      }
    }
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well 
    String doneDir = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                           // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
 
源代码20 项目: incubator-tajo   文件: MiniTajoYarnCluster.java
@Override
public void init(Configuration conf) {

  conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
  conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));

  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
        "apps_staging_dir/").getAbsolutePath());
  }
  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");

  try {
    Path stagingPath = FileContext.getFileContext(conf).makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
    FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
    if (fc.util().exists(stagingPath)) {
      LOG.info(stagingPath + " exists! deleting...");
      fc.delete(stagingPath, true);
    }
    LOG.info("mkdir: " + stagingPath);
    //mkdir the staging directory so that right permissions are set while running as proxy user
    fc.mkdir(stagingPath, null, true);
    //mkdir done directory as well
    String doneDir = JobHistoryUtils
        .getConfiguredHistoryServerDoneDirPrefix(conf);
    Path doneDirPath = fc.makeQualified(new Path(doneDir));
    fc.mkdir(doneDirPath, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }
  conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
  // which shuffle doesn't happen
  //configure the shuffle service in NM
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
  conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
      PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
      Service.class);

  // Non-standard shuffle port
  conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);

  // local directory
  conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);

  super.init(conf);
}