Code examples for the class org.apache.hadoop.fs.RemoteIterator

The examples below show how the org.apache.hadoop.fs.RemoteIterator API is used in real projects; the full source for each snippet can be viewed on GitHub in the project and file indicated.
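
For orientation: RemoteIterator is Hadoop's remote-aware counterpart of java.util.Iterator; both hasNext() and next() are declared to throw IOException, so every loop over it must handle or propagate that exception. Below is a minimal, self-contained sketch of the typical listing loop (the "/tmp" path is only an illustrative placeholder):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // listFiles returns a RemoteIterator rather than a materialized array,
    // so large directories can be streamed without loading every entry at once.
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/tmp"), false);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
    }
    fs.close();
  }
}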

Example 1  Project: Bats  File: FSAgent.java
public List<String> listFiles(String dir) throws IOException
{
  List<String> files = new ArrayList<>();
  Path path = new Path(dir);

  FileStatus fileStatus = fileSystem.getFileStatus(path);
  if (!fileStatus.isDirectory()) {
    throw new FileNotFoundException("Cannot read directory " + dir);
  }
  RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(path, false);
  while (it.hasNext()) {
    LocatedFileStatus lfs = it.next();
    files.add(lfs.getPath().getName());
  }
  return files;
}
 
Example 2  Project: Bats  File: FSAgent.java
public List<LocatedFileStatus> listFilesInfo(String dir) throws IOException
{
  List<LocatedFileStatus> files = new ArrayList<>();
  Path path = new Path(dir);

  FileStatus fileStatus = fileSystem.getFileStatus(path);
  if (!fileStatus.isDirectory()) {
    throw new FileNotFoundException("Cannot read directory " + dir);
  }
  RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(path, false);
  while (it.hasNext()) {
    LocatedFileStatus lfs = it.next();
    files.add(lfs);
  }
  return files;
}
 
Example 3  Project: hadoop  File: TestFileStatus.java
/** Test the FileStatus obtained by calling listStatus on a file. */
@Test
public void testListStatusOnFile() throws IOException {
  FileStatus[] stats = fs.listStatus(file1);
  assertEquals(1, stats.length);
  FileStatus status = stats[0];
  assertFalse(file1 + " should be a file", status.isDirectory());
  assertEquals(blockSize, status.getBlockSize());
  assertEquals(1, status.getReplication());
  assertEquals(fileSize, status.getLen());
  assertEquals(file1.makeQualified(fs.getUri(), 
      fs.getWorkingDirectory()).toString(), 
      status.getPath().toString());
  
  RemoteIterator<FileStatus> itor = fc.listStatus(file1);
  status = itor.next();
  assertEquals(stats[0], status);
  assertFalse(file1 + " should be a file", status.isDirectory());
}
 
Example 4  Project: presto  File: CachingDirectoryLister.java
private static RemoteIterator<LocatedFileStatus> simpleRemoteIterator(List<LocatedFileStatus> files)
{
    return new RemoteIterator<LocatedFileStatus>()
    {
        private final Iterator<LocatedFileStatus> iterator = ImmutableList.copyOf(files).iterator();

        @Override
        public boolean hasNext()
        {
            return iterator.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return iterator.next();
        }
    };
}
 
Example 5  Project: presto  File: TestBackgroundHiveSplitLoader.java
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
{
    return new RemoteIterator<LocatedFileStatus>()
    {
        private final Iterator<LocatedFileStatus> iterator = files.iterator();

        @Override
        public boolean hasNext()
        {
            return iterator.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return iterator.next();
        }
    };
}
 
Example 6  Project: kylin-on-parquet-v2  File: ColumnarFilesReader.java
void checkPath() {
    try {
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(folderPath, false);
        if (files == null) {
            throw new IllegalArgumentException("Invalid path " + folderPath);
        }
        while (files.hasNext()) {
            LocatedFileStatus fileStatus = files.next();
            Path path = fileStatus.getPath();
            String name = path.getName();

            if (name.endsWith(Constants.DATA_FILE_SUFFIX)) {
                dataFilePath = path;
            } else if (name.endsWith(Constants.META_FILE_SUFFIX)) {
                metaFilePath = path;
            } else {
                logger.warn("Contains invalid file {} in path {}", path, folderPath);
            }
        }
        if (dataFilePath == null || metaFilePath == null) {
            throw new IllegalArgumentException("Invalid path " + folderPath);
        }
    } catch (IOException e) {
        throw new RuntimeException("io error", e);
    }
}
 
Example 7  Project: dremio-oss  File: PseudoDistributedFileSystem.java
@Override
protected Callable<RemoteIterator<FileStatus>> newMapTask(final String address) throws IOException {
  return new Callable<RemoteIterator<FileStatus>>() {
    @Override
    public RemoteIterator<FileStatus> call() throws Exception {
      // Only directories should be listed with a fork/join task
      final FileSystem fs = getDelegateFileSystem(address);
      FileStatus status = fs.getFileStatus(path);
      if (status.isFile()) {
        throw new FileNotFoundException("Directory not found: " + path);
      }
      final RemoteIterator<FileStatus> remoteStatusIter = fs.listStatusIterator(path);
      return new RemoteIterator<FileStatus>() {
        @Override
        public boolean hasNext() throws IOException {
          return remoteStatusIter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
          return fixFileStatus(address, remoteStatusIter.next());
        }
      };
    }
  };
}
 
@Test
public void testListStatusIteratorPastLastElement() throws IOException {
  final Path root = new Path("/");
  final RemoteIterator<FileStatus> statusIter = fs.listStatusIterator(root);

  while (statusIter.hasNext()) {
    statusIter.next();
  }

  try {
    statusIter.next();
    fail("NoSuchElementException should be thrown when next() is called and there are no elements remaining.");
  } catch (NoSuchElementException ex) {
    // OK.
  }
}
 
Example 9  Project: incubator-crail  File: HdfsIOBenchmark.java
void enumerateDir() throws Exception {
	System.out.println("enumerate dir, path " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf); 

	int repfactor = 4;
	for (int k = 0; k < repfactor; k++) {
		long start = System.currentTimeMillis();
		for (int i = 0; i < size; i++) {
			// single operation == loop
			RemoteIterator<LocatedFileStatus> iter = fs.listFiles(path, false);
			while (iter.hasNext()) {
				iter.next();
			}
		}
		long end = System.currentTimeMillis();
		double executionTime = ((double) (end - start));
		double latency = executionTime * 1000.0 / ((double) size);
		System.out.println("execution time [ms] " + executionTime);
		System.out.println("latency [us] " + latency);
	}
	fs.close();
}
 
Example 10  Project: hadoop  File: ResourceLocalizationService.java
private void deleteLocalDir(FileContext lfs, DeletionService del,
    String localDir) throws IOException {
  RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
  if (fileStatus != null) {
    while (fileStatus.hasNext()) {
      FileStatus status = fileStatus.next();
      try {
        if (status.getPath().getName().matches(".*" +
            ContainerLocalizer.USERCACHE + "_DEL_.*")) {
          LOG.info("usercache path : " + status.getPath().toString());
          cleanUpFilesPerUserDir(lfs, del, status.getPath());
        } else if (status.getPath().getName()
            .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
            ||
            status.getPath().getName()
                .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
          del.delete(null, status.getPath(), new Path[] {});
        }
      } catch (IOException ex) {
        // Do nothing, just give the warning
        LOG.warn("Failed to delete this local Directory: " +
            status.getPath().getName());
      }
    }
  }
}
 
Example 11  Project: dremio-oss  File: TestRemoteNodeFileSystemDual.java
@Test
public void testClientWriteEmptyFile() throws Exception {
  Path basePath = new Path(temporaryFolder.newFolder().getAbsolutePath());
  Path path = ((PathCanonicalizer) clientFS).canonicalizePath(new Path(basePath, "testfile.bytes"));

  // create a file
  FSDataOutputStream stream = clientFS.create(path, false);
  // close it without writing anything to it
  stream.close();

  // make sure the file was created
  RemoteIterator<LocatedFileStatus> iter = client.fileSystem.listFiles(basePath, false);
  assertEquals(true, iter.hasNext());
  LocatedFileStatus status = iter.next();

  try(FSDataInputStream in = clientFS.open(status.getPath())){
    in.readByte();
    fail("File is expected to be empty");
  } catch (EOFException e) {
    // empty file as expected
  }

  client.fileSystem.delete(status.getPath(), false);
}
 
Example 12  Project: hadoop  File: GenerateData.java
static DataStatistics publishPlainDataStatistics(Configuration conf, 
                                                 Path inputDir) 
throws IOException {
  FileSystem fs = inputDir.getFileSystem(conf);

  // obtain input data file statuses
  long dataSize = 0;
  long fileCount = 0;
  RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
  PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
  while (iter.hasNext()) {
    LocatedFileStatus lStatus = iter.next();
    if (filter.accept(lStatus.getPath())) {
      dataSize += lStatus.getLen();
      ++fileCount;
    }
  }

  // publish the plain data statistics
  LOG.info("Total size of input data : " 
           + StringUtils.humanReadableInt(dataSize));
  LOG.info("Total number of input data files : " + fileCount);
  
  return new DataStatistics(dataSize, fileCount, false);
}
 
Example 13  Project: hadoop  File: FileInputFormat.java
/**
 * Add files in the input path recursively into the results.
 * @param result
 *          The List to store all files.
 * @param fs
 *          The FileSystem.
 * @param path
 *          The input path.
 * @param inputFilter
 *          The input filter that can be used to filter files/dirs. 
 * @throws IOException
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter) 
    throws IOException {
  RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
  while (iter.hasNext()) {
    LocatedFileStatus stat = iter.next();
    if (inputFilter.accept(stat.getPath())) {
      if (stat.isDirectory()) {
        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
      } else {
        result.add(stat);
      }
    }
  }
}
 
Example 14  Project: hadoop  File: LocatedFileStatusFetcher.java
@Override
public Result call() throws Exception {
  Result result = new Result();
  result.fs = fs;

  if (fileStatus.isDirectory()) {
    RemoteIterator<LocatedFileStatus> iter = fs
        .listLocatedStatus(fileStatus.getPath());
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (recursive && stat.isDirectory()) {
          result.dirsNeedingRecursiveCalls.add(stat);
        } else {
          result.locatedFileStatuses.add(stat);
        }
      }
    }
  } else {
    result.locatedFileStatuses.add(fileStatus);
  }
  return result;
}
 
Example 16  Project: hadoop  File: HistoryFileManager.java
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
  path = fc.makeQualified(path);
  List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
  try {
    RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
    while (fileStatusIter.hasNext()) {
      FileStatus fileStatus = fileStatusIter.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && pathFilter.accept(filePath)) {
        jhStatusList.add(fileStatus);
      }
    }
  } catch (FileNotFoundException fe) {
    LOG.error("Error while scanning directory " + path, fe);
  }
  return jhStatusList;
}
 
Example 17  Project: hadoop  File: CryptoAdmin.java
@Override
public int run(Configuration conf, List<String> args) throws IOException {
  if (!args.isEmpty()) {
    System.err.println("Can't understand argument: " + args.get(0));
    return 1;
  }

  final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
  try {
    final TableListing listing = new TableListing.Builder()
      .addField("").addField("", true)
      .wrapWidth(AdminHelper.MAX_LINE_WIDTH).hideHeaders().build();
    final RemoteIterator<EncryptionZone> it = dfs.listEncryptionZones();
    while (it.hasNext()) {
      EncryptionZone ez = it.next();
      listing.addRow(ez.getPath(), ez.getKeyName());
    }
    System.out.println(listing.toString());
  } catch (IOException e) {
    System.err.println(prettifyException(e));
    return 2;
  }

  return 0;
}
 
Example 18  Project: hadoop  File: TestDistributedFileSystem.java
@Test(timeout=60000)
public void testListFiles() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  
  try {
    DistributedFileSystem fs = cluster.getFileSystem();

    final Path relative = new Path("relative");
    fs.create(new Path(relative, "foo")).close();

    final List<LocatedFileStatus> retVal = new ArrayList<LocatedFileStatus>();
    final RemoteIterator<LocatedFileStatus> iter = fs.listFiles(relative, true);
    while (iter.hasNext()) {
      retVal.add(iter.next());
    }
    System.out.println("retVal = " + retVal);
  } finally {
    cluster.shutdown();
  }
}
 
Example 19  Project: hadoop  File: TestRetryCacheWithHA.java
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
  for (int i = 0; i < CHECKTIMES; i++) {
    RemoteIterator<CacheDirectiveEntry> iter =
        dfs.listCacheDirectives(
            new CacheDirectiveInfo.Builder().
                setPool(directive.getPool()).
                setPath(directive.getPath()).
                build());
    if (iter.hasNext()) {
      return true;
    }
    Thread.sleep(1000);
  }
  return false;
}
 
Example 20  Project: hadoop  File: TestRetryCacheWithHA.java
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
  for (int i = 0; i < CHECKTIMES; i++) {
    RemoteIterator<CacheDirectiveEntry> iter =
        dfs.listCacheDirectives(
            new CacheDirectiveInfo.Builder().
                setPool(directive.getPool()).
                setPath(directive.getPath()).
                build());
    while (iter.hasNext()) {
      CacheDirectiveInfo result = iter.next().getInfo();
      if ((result.getId() == id) &&
          (result.getReplication().shortValue() == newReplication)) {
        return true;
      }
    }
    Thread.sleep(1000);
  }
  return false;
}
 
Example 21  Project: hadoop  File: TestRetryCacheWithHA.java
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
  for (int i = 0; i < CHECKTIMES; i++) {
    RemoteIterator<CacheDirectiveEntry> iter =
        dfs.listCacheDirectives(
            new CacheDirectiveInfo.Builder().
              setPool(directive.getPool()).
              setPath(directive.getPath()).
              build());
    if (!iter.hasNext()) {
      return true;
    }
    Thread.sleep(1000);
  }
  return false;
}
 
Example 22  Project: hadoop  File: TestRetryCacheWithHA.java
@SuppressWarnings("unchecked")
private void listCachePools(
    HashSet<String> poolNames, int active) throws Exception {
  HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
  RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
  int poolCount = poolNames.size();
  for (int i=0; i<poolCount; i++) {
    CachePoolEntry pool = pools.next();
    String poolName = pool.getInfo().getPoolName();
    assertTrue("The pool name should be expected", tmpNames.remove(poolName));
    if (i % 2 == 0) {
      int standby = active;
      active = (standby == 0) ? 1 : 0;
      cluster.transitionToStandby(standby);
      cluster.transitionToActive(active);
      cluster.waitActive(active);
    }
  }
  assertTrue("All pools must be found", tmpNames.isEmpty());
}
 
Example 23  Project: hadoop  File: TestINodeFile.java
private static void checkEquals(RemoteIterator<LocatedFileStatus> i1,
    RemoteIterator<LocatedFileStatus> i2) throws IOException {
  while (i1.hasNext()) {
    assertTrue(i2.hasNext());
    
    // Compare all the fields but the path name, which is relative
    // to the original path from listFiles.
    LocatedFileStatus l1 = i1.next();
    LocatedFileStatus l2 = i2.next();
    assertEquals(l1.getAccessTime(), l2.getAccessTime());
    assertEquals(l1.getBlockSize(), l2.getBlockSize());
    assertEquals(l1.getGroup(), l2.getGroup());
    assertEquals(l1.getLen(), l2.getLen());
    assertEquals(l1.getModificationTime(), l2.getModificationTime());
    assertEquals(l1.getOwner(), l2.getOwner());
    assertEquals(l1.getPermission(), l2.getPermission());
    assertEquals(l1.getReplication(), l2.getReplication());
  }
  assertFalse(i2.hasNext());
}
 
Example 24  Project: camel-kafka-connector  File: HDFSEasy.java
public List<LocatedFileStatus> listFiles(Path path) throws IOException {
    RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);

    List<LocatedFileStatus> retList = new ArrayList<>();
    while (i.hasNext()) {
        LocatedFileStatus locatedFileStatus = i.next();
        retList.add(locatedFileStatus);
    }

    return retList;
}
 
Example 25  Project: camel-kafka-connector  File: HDFSEasy.java
public int countFiles(Path path) throws IOException {
    RemoteIterator<LocatedFileStatus> i = dfs.listFiles(path, false);
    int files = 0;
    while (i.hasNext()) {
        files++;
        i.next();
    }

    return files;
}
 
Example 26  Project: Bats  File: FSStorageAgent.java
@Override
public long[] getWindowIds(int operatorId) throws IOException
{
  Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
  try {
    FileStatus status = fileContext.getFileStatus(lPath);
    if (!status.isDirectory()) {
      throw new RuntimeException("Checkpoint location is not a directory");
    }
  } catch (FileNotFoundException ex) {
    // During initialization the checkpoint directory may not exist.
    fileContext.mkdir(lPath, FsPermission.getDirDefault(), true);
  }

  RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
  List<Long> lwindows = new ArrayList<>();
  while (fileStatusRemoteIterator.hasNext()) {
    FileStatus fileStatus = fileStatusRemoteIterator.next();
    String name = fileStatus.getPath().getName();
    if (name.equals(TMP_FILE)) {
      continue;
    }
    lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name, 16));
  }
  long[] windowIds = new long[lwindows.size()];
  for (int i = 0; i < windowIds.length; i++) {
    windowIds[i] = lwindows.get(i);
  }
  return windowIds;
}
 
@Test
public void testListStatusIteratorRoot() throws IOException {
  final Path root = new Path("/");
  final RemoteIterator<FileStatus> statusIterator = fs.listStatusIterator(root);

  assertTrue(statusIterator.hasNext());

  final FileStatus onlyStatus = statusIterator.next();
  assertEquals(new Path("pdfs:/foo"), onlyStatus.getPath());
  assertTrue(onlyStatus.isDirectory());
  assertEquals(0755, onlyStatus.getPermission().toExtendedShort());

  assertTrue(!statusIterator.hasNext());
}
 
Example 28  Project: dremio-oss  File: RemoteIterators.java
/**
 * Filter a RemoteIterator based on a predicate that is allowed to throw an IOException.
 *
 * @param iter The RemoteIterator to filter.
 * @param predicate The predicate to apply.
 * @return the new RemoteIterator
 */
public static RemoteIterator<LocatedFileStatus> filter(RemoteIterator<LocatedFileStatus> iter, PredicateWithIOException<LocatedFileStatus> predicate) {
  return new RemoteIterators.IterToRemote(Iterators.filter(
      new RemoteIterators.RemoteToIter(iter),
      t -> {
        try {
          return predicate.apply(t);
        } catch (IOException ex) {
          throw new CaughtIO(ex);
        }
      }
  ));
}
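
A possible usage sketch for the helper above (a hypothetical caller: it assumes PredicateWithIOException is a functional interface whose single apply method may throw IOException, as the method body suggests, and that the dremio-oss classes are imported; the size filter is illustrative only):

// Hypothetical caller: keep only non-empty files from a recursive listing.
public static void printNonEmptyFiles(FileSystem fs, Path dir) throws IOException {
  RemoteIterator<LocatedFileStatus> nonEmpty = RemoteIterators.filter(
      fs.listFiles(dir, true),
      status -> status.getLen() > 0);   // the predicate itself may throw IOException
  while (nonEmpty.hasNext()) {
    System.out.println(nonEmpty.next().getPath());
  }
}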
 
Example 29  Project: kylin-on-parquet-v2  File: HDFSResourceStore.java
TreeSet<String> getAllFilePath(Path filePath, String resPathPrefix) throws IOException {
    String fsPathPrefix = filePath.toUri().getPath();

    TreeSet<String> fileList = new TreeSet<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(filePath, true);
    while (it.hasNext()) {
        String path = it.next().getPath().toUri().getPath();
        if (!path.startsWith(fsPathPrefix))
            throw new IllegalStateException("File path " + path + " is supposed to start with " + fsPathPrefix);

        String resPath = resPathPrefix + path.substring(fsPathPrefix.length() + 1);
        fileList.add(resPath);
    }
    return fileList;
}
 
Example 30  Project: kylin-on-parquet-v2  File: HDFSResourceStore.java
@Override
protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent,
        Visitor visitor) throws IOException {
    Path p = getRealHDFSPath(folderPath);
    if (!fs.exists(p) || !fs.isDirectory(p)) {
        return;
    }

    String fsPathPrefix = p.toUri().getPath();
    String resPathPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";

    RemoteIterator<LocatedFileStatus> it = fs.listFiles(p, recursive);
    while (it.hasNext()) {
        LocatedFileStatus status = it.next();
        if (status.isDirectory())
            continue;

        String path = status.getPath().toUri().getPath();
        if (!path.startsWith(fsPathPrefix))
            throw new IllegalStateException("File path " + path + " is supposed to start with " + fsPathPrefix);

        String resPath = resPathPrefix + path.substring(fsPathPrefix.length() + 1);

        if (filter.matches(resPath, status.getModificationTime())) {
            RawResource raw;
            if (loadContent)
                raw = new RawResource(resPath, status.getModificationTime(), fs.open(status.getPath()));
            else
                raw = new RawResource(resPath, status.getModificationTime());

            try {
                visitor.visit(raw);
            } finally {
                raw.close();
            }
        }
    }
}
 