类org.apache.hadoop.fs.FileContext源码实例Demo

下面列出了怎么用org.apache.hadoop.fs.FileContext的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: big-c   文件: ContainerLocalizer.java
private static void initDirs(Configuration conf, String user, String appId,
    FileContext lfs, List<Path> localDirs) throws IOException {
  if (null == localDirs || 0 == localDirs.size()) {
    throw new IOException("Cannot initialize without local dirs");
  }
  String[] appsFileCacheDirs = new String[localDirs.size()];
  String[] usersFileCacheDirs = new String[localDirs.size()];
  for (int i = 0, n = localDirs.size(); i < n; ++i) {
    // $x/usercache/$user
    Path base = lfs.makeQualified(
        new Path(new Path(localDirs.get(i), USERCACHE), user));
    // $x/usercache/$user/filecache
    Path userFileCacheDir = new Path(base, FILECACHE);
    usersFileCacheDirs[i] = userFileCacheDir.toString();
    createDir(lfs, userFileCacheDir, FILECACHE_PERMS, false);
    // $x/usercache/$user/appcache/$appId
    Path appBase = new Path(base, new Path(APPCACHE, appId));
    // $x/usercache/$user/appcache/$appId/filecache
    Path appFileCacheDir = new Path(appBase, FILECACHE);
    appsFileCacheDirs[i] = appFileCacheDir.toString();
    createDir(lfs, appFileCacheDir, FILECACHE_PERMS, false);
  }
  conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
  conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
}
 
源代码2 项目: big-c   文件: Display.java
public AvroFileInputStream(FileStatus status) throws IOException {
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  FileContext fc = FileContext.getFileContext(new Configuration());
  fileReader =
    DataFileReader.openReader(new AvroFSInput(fc, status.getPath()),reader);
  Schema schema = fileReader.getSchema();
  writer = new GenericDatumWriter<Object>(schema);
  output = new ByteArrayOutputStream();
  JsonGenerator generator =
    new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
  MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
  prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
  generator.setPrettyPrinter(prettyPrinter);
  encoder = EncoderFactory.get().jsonEncoder(schema, generator);
}
 
源代码3 项目: hadoop   文件: TestFSDownload.java
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis) throws IOException,
    URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".ZIP");
  archiveFile.createNewFile();
  ZipOutputStream out = new ZipOutputStream(
      new FileOutputStream(archiveFile));
  out.putNextEntry(new ZipEntry(p.getName()));
  out.write(bytes);
  out.closeEntry();
  out.close();

  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
      + ".ZIP")));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".ZIP"))
      .getModificationTime());
  return ret;
}
 
源代码4 项目: hadoop   文件: DataGenerator.java
/** Parse the command line arguments and initialize the data */
private int init(String[] args) {
  try { // initialize file system handle
    fc = FileContext.getFileContext(getConf());
  } catch (IOException ioe) {
    System.err.println("Can not initialize the file system: " + 
        ioe.getLocalizedMessage());
    return -1;
  }

  for (int i = 0; i < args.length; i++) { // parse command line
    if (args[i].equals("-root")) {
      root = new Path(args[++i]);
    } else if (args[i].equals("-inDir")) {
      inDir = new File(args[++i]);
    } else {
      System.err.println(USAGE);
      ToolRunner.printGenericCommandUsage(System.err);
      System.exit(-1);
    }
  }
  return 0;
}
 
源代码5 项目: hadoop   文件: HistoryFileManager.java
private void mkdir(FileContext fc, Path path, FsPermission fsp)
    throws IOException {
  if (!fc.util().exists(path)) {
    try {
      fc.mkdir(path, fsp, true);

      FileStatus fsStatus = fc.getFileStatus(path);
      LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
          + ", Expected: " + fsp.toShort());
      if (fsStatus.getPermission().toShort() != fsp.toShort()) {
        LOG.info("Explicitly setting permissions to : " + fsp.toShort()
            + ", " + fsp);
        fc.setPermission(path, fsp);
      }
    } catch (FileAlreadyExistsException e) {
      LOG.info("Directory: [" + path + "] already exists.");
    }
  }
}
 
源代码6 项目: big-c   文件: HistoryFileManager.java
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
  try {
    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
    while (fileStatusIter.hasNext()) {
      FileStatus fileStatus = fileStatusIter.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
        jhStatusList.add(fileStatus);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return jhStatusList;
}
 
源代码7 项目: big-c   文件: TestEncryptionZones.java
@Before
public void setup() throws Exception {
  conf = new HdfsConfiguration();
  fsHelper = new FileSystemTestHelper();
  // Set up java key store
  String testRoot = fsHelper.getTestRootDir();
  testRootDir = new File(testRoot).getAbsoluteFile();
  conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
  // Lower the batch size for testing
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
      2);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);
  fs = cluster.getFileSystem();
  fsWrapper = new FileSystemTestWrapper(fs);
  fcWrapper = new FileContextTestWrapper(
      FileContext.getFileContext(cluster.getURI(), conf));
  dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
  setProvider();
  // Create a test key
  DFSTestUtil.createKey(TEST_KEY, cluster, conf);
}
 
源代码8 项目: big-c   文件: ResourceLocalizationService.java
private void cleanUpLocalDir(FileContext lfs, DeletionService del,
    String localDir) {
  long currentTimeStamp = System.currentTimeMillis();
  renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
    currentTimeStamp);
  renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
    currentTimeStamp);
  renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
    currentTimeStamp);
  try {
    deleteLocalDir(lfs, del, localDir);
  } catch (IOException e) {
    // Do nothing, just give the warning
    LOG.warn("Failed to delete localDir: " + localDir);
  }
}
 
@Test
public void testApplication() throws IOException, Exception
{
  try {
    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
    int cnt = 7;
    createAvroInput(cnt);
    writeAvroFile(new File(FILENAME));
    createAvroInput(cnt - 2);
    writeAvroFile(new File(OTHER_FILE));
    avroFileInput.setDirectory(testMeta.dir);

    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);

    AvroReaderApplication avroReaderApplication = new AvroReaderApplication();
    avroReaderApplication.setAvroFileInputOperator(avroFileInput);
    lma.prepareDAG(avroReaderApplication, conf);

    LocalMode.Controller lc = lma.getController();
    lc.run(10000);// runs for 10 seconds and quits
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
 
源代码10 项目: hadoop   文件: TestReservedRawPaths.java
@Before
public void setup() throws Exception {
  conf = new HdfsConfiguration();
  fsHelper = new FileSystemTestHelper();
  // Set up java key store
  String testRoot = fsHelper.getTestRootDir();
  File testRootDir = new File(testRoot).getAbsoluteFile();
  final Path jksPath = new Path(testRootDir.toString(), "test.jks");
  conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
      JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
  );
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  Logger.getLogger(EncryptionZoneManager.class).setLevel(Level.TRACE);
  fs = cluster.getFileSystem();
  fsWrapper = new FileSystemTestWrapper(cluster.getFileSystem());
  fcWrapper = new FileContextTestWrapper(
      FileContext.getFileContext(cluster.getURI(), conf));
  dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
  // Need to set the client's KeyProvider to the NN's for JKS,
  // else the updates do not get flushed properly
  fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
      .getProvider());
  DFSTestUtil.createKey(TEST_KEY, cluster, conf);
}
 
private Configuration readLaunchConfiguration() throws IOException
{
  Path appPath = new Path(appContext.getApplicationPath());
  Path  configFilePath = new Path(appPath, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
  try {
    LOG.debug("Reading launch configuration file ");
    URI uri = appPath.toUri();
    Configuration config = new YarnConfiguration();
    fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
    FSDataInputStream is = fileContext.open(configFilePath);
    config.addResource(is);
    LOG.debug("Read launch configuration");
    return config;
  } catch (FileNotFoundException ex) {
    LOG.warn("Configuration file not found {}", configFilePath);
    return new Configuration();
  }
}
 
源代码12 项目: hadoop   文件: TestContainerLocalizer.java
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
  // verify filesystems are closed when localizer doesn't fail
  FileContext fs = FileContext.getLocalFSFileContext();
  spylfs = spy(fs.getDefaultFileSystem());
  ContainerLocalizer localizer = setupContainerLocalizerForTest();
  doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
      any(CompletionService.class), any(UserGroupInformation.class));
  verify(localizer, never()).closeFileSystems(
      any(UserGroupInformation.class));
  localizer.runLocalization(nmAddr);
  verify(localizer).closeFileSystems(any(UserGroupInformation.class));

  spylfs = spy(fs.getDefaultFileSystem());
  // verify filesystems are closed when localizer fails
  localizer = setupContainerLocalizerForTest();
  doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
      any(LocalizationProtocol.class), any(CompletionService.class),
      any(UserGroupInformation.class));
  verify(localizer, never()).closeFileSystems(
      any(UserGroupInformation.class));
  localizer.runLocalization(nmAddr);
  verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
 
源代码13 项目: hadoop   文件: TestPermissionSymlinks.java
private void doReadTargetNotReadable() throws Exception {
  try {
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException {
        FileContext myfc = FileContext.getFileContext(conf);
        myfc.open(link).read();
        return null;
      }
    });
    fail("Read link target even though target does not have"
        + " read permissions!");
  } catch (IOException e) {
    GenericTestUtils.assertExceptionContains("Permission denied", e);
  }
}
 
源代码14 项目: big-c   文件: TestDistributedShell.java
@After
public void tearDown() throws IOException {
  if (yarnCluster != null) {
    try {
      yarnCluster.stop();
    } finally {
      yarnCluster = null;
    }
  }
  FileContext fsContext = FileContext.getLocalFSFileContext();
  fsContext
      .delete(
          new Path(conf
              .get("yarn.timeline-service.leveldb-timeline-store.path")),
          true);
}
 
源代码15 项目: big-c   文件: TestPermissionSymlinks.java
private void doDeleteLinkParentNotWritable() throws Exception {
  // Try to delete where the symlink's parent dir is not writable
  try {
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException {
        FileContext myfc = FileContext.getFileContext(conf);
        myfc.delete(link, false);
        return null;
      }
    });
    fail("Deleted symlink without write permissions on parent!");
  } catch (AccessControlException e) {
    GenericTestUtils.assertExceptionContains("Permission denied", e);
  }
}
 
源代码16 项目: hadoop   文件: TestPermissionSymlinks.java
private void doDeleteLinkParentNotWritable() throws Exception {
  // Try to delete where the symlink's parent dir is not writable
  try {
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException {
        FileContext myfc = FileContext.getFileContext(conf);
        myfc.delete(link, false);
        return null;
      }
    });
    fail("Deleted symlink without write permissions on parent!");
  } catch (AccessControlException e) {
    GenericTestUtils.assertExceptionContains("Permission denied", e);
  }
}
 
源代码17 项目: big-c   文件: DataGenerator.java
/** Parse the command line arguments and initialize the data */
private int init(String[] args) {
  try { // initialize file system handle
    fc = FileContext.getFileContext(getConf());
  } catch (IOException ioe) {
    System.err.println("Can not initialize the file system: " + 
        ioe.getLocalizedMessage());
    return -1;
  }

  for (int i = 0; i < args.length; i++) { // parse command line
    if (args[i].equals("-root")) {
      root = new Path(args[++i]);
    } else if (args[i].equals("-inDir")) {
      inDir = new File(args[++i]);
    } else {
      System.err.println(USAGE);
      ToolRunner.printGenericCommandUsage(System.err);
      System.exit(-1);
    }
  }
  return 0;
}
 
源代码18 项目: spliceengine   文件: MiniYARNClusterSplice.java
@Override
protected synchronized void serviceStop() throws Exception {
    if (resourceManagers[index] != null) {
        waitForAppMastersToFinish(5000);
        resourceManagers[index].stop();
    }

    if (Shell.WINDOWS) {
        // On Windows, clean up the short temporary symlink that was created to
        // work around path length limitation.
        String testWorkDirPath = testWorkDir.getAbsolutePath();
        try {
            FileContext.getLocalFSFileContext().delete(new Path(testWorkDirPath),
                                                       true);
        } catch (IOException e) {
            LOG.warn("could not cleanup symlink: " +
                         testWorkDir.getAbsolutePath());
        }
    }
    super.serviceStop();
}
 
源代码19 项目: hadoop   文件: TestLinuxContainerExecutor.java
@Test
public void testContainerLaunch() throws Exception {
  Assume.assumeTrue(shouldRun());
  String expectedRunAsUser =
      conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
        YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);

  File touchFile = new File(workSpace, "touch-file");
  int ret = runAndBlock("touch", touchFile.getAbsolutePath());

  assertEquals(0, ret);
  FileStatus fileStatus =
      FileContext.getLocalFSFileContext().getFileStatus(
        new Path(touchFile.getAbsolutePath()));
  assertEquals(expectedRunAsUser, fileStatus.getOwner());
  cleanupAppFiles(expectedRunAsUser);

}
 
源代码20 项目: attic-apex-malhar   文件: FileSplitterTest.java
@Override
protected void finished(Description description)
{
  try {
    FileContext.getLocalFSFileContext()
        .delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
 
源代码21 项目: hadoop   文件: TestChRootedFs.java
@Test
public void testList() throws IOException {
  
  FileStatus fs = fc.getFileStatus(new Path("/"));
  Assert.assertTrue(fs.isDirectory());
  //  should return the full path not the chrooted path
  Assert.assertEquals(fs.getPath(), chrootedTo);
  
  // list on Slash
  
  FileStatus[] dirPaths = fc.util().listStatus(new Path("/"));

  Assert.assertEquals(0, dirPaths.length);
  
  

  fileContextTestHelper.createFileNonRecursive(fc, "/foo");
  fileContextTestHelper.createFileNonRecursive(fc, "/bar");
  fc.mkdir(new Path("/dirX"), FileContext.DEFAULT_PERM, false);
  fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "/dirY"),
      FileContext.DEFAULT_PERM, false);
  fc.mkdir(new Path("/dirX/dirXX"), FileContext.DEFAULT_PERM, false);
  
  dirPaths = fc.util().listStatus(new Path("/"));
  Assert.assertEquals(4, dirPaths.length);
  
  // Note the the file status paths are the full paths on target
  fs = fileContextTestHelper.containsPath(fcTarget, "foo", dirPaths);
    Assert.assertNotNull(fs);
    Assert.assertTrue(fs.isFile());
  fs = fileContextTestHelper.containsPath(fcTarget, "bar", dirPaths);
    Assert.assertNotNull(fs);
    Assert.assertTrue(fs.isFile());
  fs = fileContextTestHelper.containsPath(fcTarget, "dirX", dirPaths);
    Assert.assertNotNull(fs);
    Assert.assertTrue(fs.isDirectory());
  fs = fileContextTestHelper.containsPath(fcTarget, "dirY", dirPaths);
    Assert.assertNotNull(fs);
    Assert.assertTrue(fs.isDirectory());
}
 
源代码22 项目: hadoop   文件: TestNodeHealthScriptRunner.java
@After
public void tearDown() throws Exception {
  if (testRootDir.exists()) {
    FileContext.getLocalFSFileContext().delete(
        new Path(testRootDir.getAbsolutePath()), true);
  }
}
 
源代码23 项目: hadoop   文件: TestListFilesInFileContext.java
private static void writeFile(FileContext fc, Path name, int fileSize)
throws IOException {
  // Create and write a file that contains three blocks of data
  FSDataOutputStream stm = fc.create(name, EnumSet.of(CreateFlag.CREATE),
      Options.CreateOpts.createParent());
  byte[] buffer = new byte[fileSize];
  Random rand = new Random(seed);
  rand.nextBytes(buffer);
  stm.write(buffer);
  stm.close();
}
 
源代码24 项目: kafka-connect-fs   文件: AvroFileReader.java
public AvroFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
    super(fs, filePath, new GenericRecordToStruct(), config);

    AvroFSInput input = new AvroFSInput(FileContext.getFileContext(filePath.toUri()), filePath);
    if (this.schema == null) {
        this.reader = new DataFileReader<>(input, new SpecificDatumReader<>());
    } else {
        this.reader = new DataFileReader<>(input, new SpecificDatumReader<>(this.schema));
    }
    this.closed = false;
}
 
@Test
public void testMultipleFileAvroReads() throws Exception
{
  FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);

  int cnt = 7;

  createAvroInput(cnt);

  writeAvroFile(new File(FILENAME));
  writeAvroFile(new File(OTHER_FILE));

  avroFileInput.output.setSink(output);
  avroFileInput.completedFilesPort.setSink(completedFilesPort);
  avroFileInput.errorRecordsPort.setSink(errorRecordsPort);
  avroFileInput.setDirectory(testMeta.dir);
  avroFileInput.setup(testMeta.context);

  avroFileInput.beginWindow(0);
  avroFileInput.emitTuples();
  avroFileInput.beginWindow(1);
  avroFileInput.emitTuples();

  Assert.assertEquals("number tuples after window 0", cnt, output.collectedTuples.size());

  avroFileInput.emitTuples();
  avroFileInput.endWindow();

  Assert.assertEquals("Error tuples", 0, errorRecordsPort.collectedTuples.size());
  Assert.assertEquals("number tuples after window 1", 2 * cnt, output.collectedTuples.size());
  Assert.assertEquals("Completed File", 2, completedFilesPort.collectedTuples.size());

  avroFileInput.teardown();

}
 
源代码26 项目: hadoop   文件: AggregatedLogFormat.java
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
 
源代码27 项目: hadoop   文件: LoadGeneratorMR.java
/** Main function called by tool runner.
 * It first initializes data by parsing the command line arguments.
 * It then calls the loadGenerator
 */
@Override
public int run(String[] args) throws Exception {
  int exitCode = parseArgsMR(args);
  if (exitCode != 0) {
    return exitCode;
  }
  System.out.println("Running LoadGeneratorMR against fileSystem: " + 
  FileContext.getFileContext().getDefaultFileSystem().getUri());

  return submitAsMapReduce(); // reducer will print the results
}
 
源代码28 项目: hadoop   文件: TestViewFsHdfs.java
@BeforeClass
public static void clusterSetupAtBegining() throws IOException,
    LoginException, URISyntaxException {
  SupportsBlocks = true;
  CONF.setBoolean(
      DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

  cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
  cluster.waitClusterUp();
  fc = FileContext.getFileContext(cluster.getURI(0), CONF);
  Path defaultWorkingDirectory = fc.makeQualified( new Path("/user/" + 
      UserGroupInformation.getCurrentUser().getShortUserName()));
  fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
}
 
源代码29 项目: hadoop   文件: TestYARNRunner.java
@Before
public void setUp() throws Exception {
  resourceMgrDelegate = mock(ResourceMgrDelegate.class);
  conf = new YarnConfiguration();
  conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/[email protected]");
  clientCache = new ClientCache(conf, resourceMgrDelegate);
  clientCache = spy(clientCache);
  yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
  yarnRunner = spy(yarnRunner);
  submissionContext = mock(ApplicationSubmissionContext.class);
  doAnswer(
      new Answer<ApplicationSubmissionContext>() {
        @Override
        public ApplicationSubmissionContext answer(InvocationOnMock invocation)
            throws Throwable {
          return submissionContext;
        }
      }
      ).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
          any(String.class), any(Credentials.class));

  appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
  jobId = TypeConverter.fromYarn(appId);
  if (testWorkDir.exists()) {
    FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
  }
  testWorkDir.mkdirs();
}
 
源代码30 项目: attic-apex-malhar   文件: FileSystemWALTest.java
@Test
public void testFinalizeWithDelete() throws IOException
{
  testMeta.fsWAL.setMaxLength(2 * 1024);
  testMeta.fsWAL.setup();

  FileSystemWAL.FileSystemWALWriter fsWALWriter = testMeta.fsWAL.getWriter();

  write1KRecords(fsWALWriter, 2);
  testMeta.fsWAL.beforeCheckpoint(0);

  write1KRecords(fsWALWriter, 2);
  testMeta.fsWAL.beforeCheckpoint(1);

  write1KRecords(fsWALWriter, 2);
  testMeta.fsWAL.beforeCheckpoint(2);

  FileSystemWAL.FileSystemWALReader fsWALReader = testMeta.fsWAL.getReader();
  assertNumTuplesRead(fsWALReader, 6);

  testMeta.fsWAL.committed(0);

  fsWALWriter.delete(new FileSystemWAL.FileSystemWALPointer(2, 0));

  FileContext fileContext = FileContextUtils.getFileContext(testMeta.fsWAL.getFilePath());
  Assert.assertTrue("part 0 exists ", !fileContext.util().exists(new Path(testMeta.fsWAL.getPartFilePath(0))));

  testMeta.fsWAL.committed(1);
  Assert.assertTrue("part 1 exists ", !fileContext.util().exists(new Path(testMeta.fsWAL.getPartFilePath(1))));

  fsWALReader.seek(fsWALReader.getStartPointer());
  assertNumTuplesRead(fsWALReader, 2);
}
 
 类所在包
 同包方法