org.apache.hadoop.fs.PathFilter 类的源码示例

下面列出了 org.apache.hadoop.fs.PathFilter 类 API 的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hadoop   文件: GenerateData.java
/**
 * Walks {@code inputDir} recursively, totals the length and number of the files
 * accepted by an {@code OutputFilesFilter}, logs both totals and returns them.
 *
 * @param conf configuration used to resolve the file system of {@code inputDir}
 * @param inputDir directory whose files are listed recursively
 * @return a {@code DataStatistics} built from the total size, the file count
 *         and {@code false} as its third constructor argument
 * @throws IOException if resolving the file system or listing the files fails
 */
static DataStatistics publishPlainDataStatistics(Configuration conf, 
                                                 Path inputDir) 
throws IOException {
  FileSystem fileSystem = inputDir.getFileSystem(conf);

  // Recursive listing of everything below the input directory.
  RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(inputDir, true);
  PathFilter acceptedFiles = new Utils.OutputFileUtils.OutputFilesFilter();

  long totalBytes = 0;
  long totalFiles = 0;
  while (files.hasNext()) {
    LocatedFileStatus current = files.next();
    if (!acceptedFiles.accept(current.getPath())) {
      continue;
    }
    totalBytes += current.getLen();
    totalFiles++;
  }

  // Report the totals before handing them back.
  LOG.info("Total size of input data : " 
           + StringUtils.humanReadableInt(totalBytes));
  LOG.info("Total number of input data files : " + totalFiles);

  return new DataStatistics(totalBytes, totalFiles, false);
}
 
源代码2 项目: streamx   文件: FileUtils.java
/**
 * Recursively collects the statuses of all non-directory entries below
 * {@code path} that {@code filter} accepts. Directories are always descended
 * into (the filter is not applied to them); a path that does not exist in
 * {@code storage} yields an empty list.
 */
private static ArrayList<FileStatus> traverseImpl(Storage storage, Path path, PathFilter filter)
    throws IOException {
  ArrayList<FileStatus> accepted = new ArrayList<>();
  if (!storage.exists(path.toString())) {
    return accepted;
  }
  for (FileStatus entry : storage.listStatus(path.toString())) {
    if (entry.isDirectory()) {
      accepted.addAll(traverseImpl(storage, entry.getPath(), filter));
    } else if (filter.accept(entry.getPath())) {
      accepted.add(entry);
    }
  }
  return accepted;
}
 
源代码3 项目: big-c   文件: FileInputFormat.java
/**
 * Recursively appends the files found under the input path to the results.
 * Entries rejected by the filter are skipped; a rejected directory is not
 * descended into.
 *
 * @param result
 *          The List that receives every accepted file.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The input path to list.
 * @param inputFilter
 *          The filter applied to each listed file/dir.
 * @throws IOException
 *           if listing a path fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter) 
    throws IOException {
  for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
      entries.hasNext();) {
    LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (entry.isDirectory()) {
      addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    } else {
      result.add(entry);
    }
  }
}
 
/**
 * Recursively lists the task attempt path and returns the statuses of the
 * files accepted by the filter obtained from {@code HiddenPathFilter.get()}.
 *
 * @param context task attempt context supplying the attempt path and configuration
 * @return the accepted file statuses, in listing order
 * @throws IOException if the file system cannot be resolved or listed
 */
@Override
protected List<FileStatus> getTaskOutput(TaskAttemptContext context)
    throws IOException {
  PathFilter pathFilter = HiddenPathFilter.get();

  // Resolve the attempt directory and the file system that hosts it.
  Path attemptDir = getTaskAttemptPath(context);
  FileSystem attemptFileSystem = attemptDir.getFileSystem(context.getConfiguration());
  RemoteIterator<LocatedFileStatus> files =
      attemptFileSystem.listFiles(attemptDir, true /* recursive */);

  List<FileStatus> accepted = Lists.newArrayList();
  while (files.hasNext()) {
    FileStatus file = files.next();
    if (!pathFilter.accept(file.getPath())) {
      continue;
    }
    accepted.add(file);
  }
  return accepted;
}
 
源代码5 项目: hadoop   文件: FileInputFormat.java
/**
 * Walks the input path depth-first and stores every accepted file in the
 * results. The filter is consulted for files and directories alike, so a
 * directory it rejects is skipped together with its contents.
 *
 * @param result
 *          The List to which accepted files are added.
 * @param fs
 *          The FileSystem to list.
 * @param path
 *          The path whose entries are listed.
 * @param inputFilter
 *          The filter deciding which files/dirs are considered.
 * @throws IOException
 *           if a listing call fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter) 
    throws IOException {
  RemoteIterator<LocatedFileStatus> children = fs.listLocatedStatus(path);
  while (children.hasNext()) {
    LocatedFileStatus child = children.next();
    boolean accepted = inputFilter.accept(child.getPath());
    if (accepted && child.isDirectory()) {
      addInputPathRecursively(result, fs, child.getPath(), inputFilter);
    } else if (accepted) {
      result.add(child);
    }
  }
}
 
源代码6 项目: hadoop-etl-udfs   文件: HCatMetadataServiceTest.java
/**
 * Looks up the entries registered for {@code path} in {@code pathsAndContent}
 * and returns a fake status (built by {@code fakeFileStatus}) for each entry
 * whose path the filter accepts.
 */
@Override
public FileStatus[] listStatus(Path path, PathFilter filter) throws IOException {
    List<FileStatus> registered = pathsAndContent.get(path.toString());
    List<FileStatus> accepted = new ArrayList<>();
    for (FileStatus candidate : registered) {
        if (!filter.accept(candidate.getPath())) {
            continue;
        }
        accepted.add(fakeFileStatus(candidate.getPath().toString()));
    }
    return accepted.toArray(new FileStatus[0]);
}
 
源代码7 项目: dremio-oss   文件: HadoopFileSystemWrapper.java
/**
 * Delegates to the underlying file system's {@code listStatus(Path, PathFilter)}
 * while a wait recorder obtained from {@code OperatorStats} is open; an
 * {@code FSError} is rethrown as whatever {@code propagateFSError} returns.
 */
@Override
public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException {
  try (WaitRecorder waitScope = OperatorStats.getWaitRecorder(operatorStats)) {
    final FileStatus[] listing = underlyingFs.listStatus(f, filter);
    return listing;
  } catch (FSError fsError) {
    throw propagateFSError(fsError);
  }
}
 
源代码8 项目: Bats   文件: FileSystemUtil.java
/**
 * Combines the given filters into a single filter that accepts a path only
 * when every given filter accepts it.
 * When no filters are given, {@link #DUMMY_FILTER} is returned instead.
 *
 * @param filters array of filters
 * @return one filter that combines all given filters
 */
public static PathFilter mergeFilters(PathFilter... filters) {
  if (filters.length > 0) {
    return path -> {
      // Stop at the first filter that rejects the path.
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    };
  }

  return DUMMY_FILTER;
}
 
源代码9 项目: Bats   文件: FileSystemUtil.java
/**
 * Performs a single, non-recursive listing of {@code path} with the given filter and
 * keeps only the statuses for which {@code isStatusApplicable} holds for the {@link Scope}.
 *
 * @param fs file system
 * @param path path to file or directory
 * @param scope file system objects scope
 * @param suppressExceptions when true, any exception is logged at debug level and an empty list is returned
 * @param filter filter to be applied
 * @return list of file statuses
 */
private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException {
  try {
    FileStatus[] listed = fs.listStatus(path, filter);
    Stream<FileStatus> applicable = Stream.of(listed)
      .filter(candidate -> isStatusApplicable(candidate, scope));
    return applicable.collect(Collectors.toList());
  } catch (Exception e) {
    if (!suppressExceptions) {
      throw e;
    }
    logger.debug("Exception during listing file statuses", e);
    return Collections.emptyList();
  }
}
 
源代码10 项目: big-c   文件: JobHistoryUtils.java
/**
 * Lists {@code root} through the file context and returns the entries whose
 * path {@code filter} accepts. A {@code null} filter returns the full listing
 * unchanged.
 */
private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
    PathFilter filter) throws IOException {
  List<FileStatus> everything = remoteIterToList(fc.listStatus(root));
  if (filter == null) {
    return everything;
  }

  List<FileStatus> accepted = new LinkedList<FileStatus>();
  for (FileStatus candidate : everything) {
    if (!filter.accept(candidate.getPath())) {
      continue;
    }
    accepted.add(candidate);
  }
  return accepted;
}
 
源代码11 项目: big-c   文件: DistributedFileSystem.java
/**
 * Stores the arguments in the iterator's fields and eagerly fetches the first
 * batch of entries for the directory.
 *
 * @param p directory to list
 * @param filter path filter kept for later use by the iterator
 * @param needLocation flag forwarded to {@code dfs.listPaths}
 * @throws FileNotFoundException if the first listing call returns {@code null}
 * @throws IOException if the listing call fails
 */
private DirListingIterator(Path p, PathFilter filter,
    boolean needLocation) throws IOException {
  this.p = p;
  this.src = getPathName(p);
  this.filter = filter;
  this.needLocation = needLocation;
  // fetch the first batch of entries in the directory
  thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
      needLocation);
  // the read op is counted only after the listing call has returned
  statistics.incrementReadOps(1);
  if (thisListing == null) { // the directory does not exist
    throw new FileNotFoundException("File " + p + " does not exist.");
  }
  // start at the first entry of the fetched batch
  i = 0;
}
 
/**
 * Accepts the path only if every filter in {@code filters} accepts it;
 * evaluation stops at the first rejection.
 */
public boolean accept(Path path) {
    for (PathFilter candidate : filters) {
        boolean accepted = candidate.accept(path);
        if (accepted) {
            continue;
        }
        return false;
    }
    return true;
}
 
源代码13 项目: stocator   文件: ObjectStoreFileSystem.java
/**
 * Logs the requested path at debug level and then defers to the superclass
 * implementation of the filtered located-status listing.
 */
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
    PathFilter filter)
    throws FileNotFoundException, IOException {
  final String pathText = f.toString();
  LOG.debug("listLocatedStatus with path filter: {}", pathText);
  final RemoteIterator<LocatedFileStatus> listing = super.listLocatedStatus(f, filter);
  return listing;
}
 
源代码14 项目: big-c   文件: FileInputFormat.java
/**
 * Instantiate the PathFilter class configured for the input paths, if any.
 *
 * @param context job context whose configuration is read
 * @return a new PathFilter created via reflection from the class stored under
 *         {@code PATHFILTER_CLASS}, or NULL if no class has been set.
 */
public static PathFilter getInputPathFilter(JobContext context) {
  Configuration jobConf = context.getConfiguration();
  Class<?> configuredClass =
      jobConf.getClass(PATHFILTER_CLASS, null, PathFilter.class);
  if (configuredClass == null) {
    return null;
  }
  return (PathFilter) ReflectionUtils.newInstance(configuredClass, jobConf);
}
 
源代码15 项目: big-c   文件: CombineFileInputFormat.java
/**
 * Accepts the path as soon as any filter in {@code filters} accepts it;
 * returns false when none does (including when there are no filters).
 */
public boolean accept(Path path) {
  boolean anyAccepted = false;
  for (PathFilter candidate : filters) {
    if (candidate.accept(path)) {
      anyAccepted = true;
      break;
    }
  }
  return anyAccepted;
}
 
源代码16 项目: gemfirexd-oss   文件: BaseHoplogTestCase.java
/**
 * Lists the directory {@code regionAndBucket} under {@code testDataDir} and
 * returns the entries whose file name ends with {@code type}.
 */
protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type)
    throws IOException {
  final Path bucketDir = new Path(testDataDir, regionAndBucket);
  final PathFilter bySuffix = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      String fileName = file.getName();
      return fileName.endsWith(type);
    }
  };
  return fs.listStatus(bucketDir, bySuffix);
}
 
源代码17 项目: localization_nifi   文件: ListHDFS.java
/**
 * Builds a filter from the regular expression stored in the
 * {@code FILE_FILTER} property; a path is accepted when its final name
 * component matches the whole pattern.
 */
private PathFilter createPathFilter(final ProcessContext context) {
    final String regex = context.getProperty(FILE_FILTER).getValue();
    final Pattern filePattern = Pattern.compile(regex);
    return new PathFilter() {
        @Override
        public boolean accept(Path path) {
            final String name = path.getName();
            return filePattern.matcher(name).matches();
        }
    };
}
 
/**
 * Get the schema of AVRO files stored in a directory.
 *
 * <p>If {@code path} is a directory, the first listed entry whose name does
 * not start with {@code "_"} or {@code "."} is opened; otherwise {@code path}
 * itself is opened. The reader is always closed before returning, and the raw
 * input is closed if the reader cannot be opened.
 *
 * @param path an AVRO file, or a directory containing AVRO files
 * @param conf configuration used to resolve the file system and open the file
 * @return the schema of the inspected file, or {@code null} if {@code path}
 *         is a directory with no eligible entries
 * @throws IOException if listing, opening or reading the file fails
 */
public static Schema getAvroSchema(Path path, Configuration conf)
    throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  Path fileToTest;
  if (fs.isDirectory(path)) {
    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    });
    if (fileStatuses.length == 0) {
      return null;
    }
    fileToTest = fileStatuses[0].getPath();
  } else {
    fileToTest = path;
  }

  FsInput input = new FsInput(fileToTest, conf);
  FileReader<GenericRecord> fileReader;
  boolean opened = false;
  try {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    fileReader = DataFileReader.openReader(input, reader);
    opened = true;
  } finally {
    // No reader took ownership of the input, so release it here.
    if (!opened) {
      input.close();
    }
  }

  try {
    return fileReader.getSchema();
  } finally {
    // As in the original code, closing the reader is relied upon to release
    // the underlying input as well; now it also happens when getSchema() throws.
    fileReader.close();
  }
}
 
源代码19 项目: dremio-oss   文件: ContainerFileSystem.java
/**
 * Lists {@code f} on the file system that backs its container. The caller's
 * filter is applied to paths after {@code transform(path, container)}, and
 * each returned status is rebuilt from its transformed form plus its original
 * block locations.
 */
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, final PathFilter filter) throws FileNotFoundException, IOException {
  final String container = getContainerName(f);
  // Let the caller's filter see the path as transformed for this container.
  final PathFilter alteredFilter = path -> filter.accept(transform(path, container));

  return RemoteIterators.transform(
      ListAccessor.listLocatedFileStatus(getFileSystemForPath(f).fs(), pathWithoutContainer(f), alteredFilter),
      status -> {
        return new LocatedFileStatus(ContainerFileSystem.transform(status, container), status.getBlockLocations());
      });
}
 
/**
 * Returns true at the first filter in {@code filters} that accepts the path,
 * and false if no filter accepts it.
 */
public boolean accept(Path path) {
  for (PathFilter candidate : filters) {
    if (!candidate.accept(path)) {
      continue;
    }
    return true;
  }
  return false;
}
 
/**
 * Renders the filters as {@code "[f1,f2,]"}: each filter's string form is
 * followed by a comma, and the whole list is wrapped in square brackets.
 */
public String toString() {
  StringBuilder text = new StringBuilder("[");
  for (PathFilter filter : filters) {
    text.append(filter).append(",");
  }
  return text.append("]").toString();
}
 
/**
 * Lists {@code path} with {@code inputFilter} and adds every non-directory
 * entry to {@code result}; directory entries are listed recursively with the
 * same filter.
 */
protected void simpleAddInputPathRecursively(List<FileStatus> result,
        FileSystem fs, Path path, PathFilter inputFilter)
                throws IOException {
    for (FileStatus entry : fs.listStatus(path, inputFilter)) {
        if (!entry.isDirectory()) {
            result.add(entry);
            continue;
        }
        simpleAddInputPathRecursively(result, fs, entry.getPath(),
                inputFilter);
    }
}
 
源代码23 项目: big-c   文件: LocatedFileStatusFetcher.java
/**
 * Captures the arguments in the fields of the same names.
 *
 * @param fs file system stored for later use
 * @param fileStatus status stored for later use
 * @param recursive recursion flag stored for later use
 * @param inputFilter path filter stored for later use
 */
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
    boolean recursive, PathFilter inputFilter) {
  this.inputFilter = inputFilter;
  this.recursive = recursive;
  this.fileStatus = fileStatus;
  this.fs = fs;
}
 
源代码24 项目: big-c   文件: FileInputFormat.java
/**
 * Expands each input path as a glob and gathers the resulting input files.
 * A matched file is added as-is. A matched directory is listed one level deep,
 * keeping entries the filter accepts; accepted sub-directories are expanded
 * via {@code addInputPathRecursively} only when {@code recursive} is set,
 * otherwise they are added like files.
 *
 * @throws InvalidInputException carrying one error per path whose glob
 *         returned {@code null} or matched nothing
 */
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (Path p: dirs) {
    FileSystem fs = p.getFileSystem(job); 
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
      continue;
    }
    if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      continue;
    }
    for (FileStatus globStat: matches) {
      if (!globStat.isDirectory()) {
        result.add(globStat);
        continue;
      }
      RemoteIterator<LocatedFileStatus> children =
          fs.listLocatedStatus(globStat.getPath());
      while (children.hasNext()) {
        LocatedFileStatus child = children.next();
        if (!inputFilter.accept(child.getPath())) {
          continue;
        }
        if (recursive && child.isDirectory()) {
          addInputPathRecursively(result, fs, child.getPath(),
              inputFilter);
        } else {
          result.add(child);
        }
      }
    }
  }
  if (errors.isEmpty()) {
    return result;
  }
  throw new InvalidInputException(errors);
}
 
源代码25 项目: hadoop   文件: FileInputFormat.java
/**
 * Returns true only when no filter in {@code filters} rejects the path;
 * the first rejection ends the check.
 */
public boolean accept(Path path) {
  boolean allAccepted = true;
  for (PathFilter candidate : filters) {
    if (!candidate.accept(path)) {
      allAccepted = false;
      break;
    }
  }
  return allAccepted;
}
 
源代码26 项目: hadoop   文件: FileInputFormat.java
/**
 * Instantiate the PathFilter class configured for the input paths, if any.
 *
 * @param conf job configuration holding the filter class setting
 * @return a new PathFilter created via reflection from the configured class,
 *         NULL if no class has been set.
 */
public static PathFilter getInputPathFilter(JobConf conf) {
  Class<? extends PathFilter> configuredClass = conf.getClass(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
      null, PathFilter.class);
  if (configuredClass == null) {
    return null;
  }
  return ReflectionUtils.newInstance(configuredClass, conf);
}
 
/**
 * Lists the input files of the job: reads the input paths, obtains tokens for
 * them, combines {@code hiddenFileFilter} with the job's own filter (when one
 * is set) into a {@code MultiPathFilter} and delegates the listing to
 * {@code simpleListStatus}.
 *
 * @param job the job whose input paths are listed
 * @return the statuses returned by {@code simpleListStatus}
 * @throws IOException if the job has no input paths, or listing fails
 */
protected List<FileStatus> listStatus(JobContext job
        ) throws IOException {
    Path[] inputDirs = getInputPaths(job);
    if (inputDirs.length == 0) {
        throw new IOException("No input paths specified in job");
    }

    // Tokens are obtained for every input path before anything is listed.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), inputDirs, 
            job.getConfiguration());

    // Should sub-directories be walked as well?
    boolean recursive = getInputDirRecursive(job);

    // hiddenFileFilter always applies; the job's filter is added when present.
    List<PathFilter> combined = new ArrayList<PathFilter>();
    combined.add(hiddenFileFilter);
    PathFilter userFilter = getInputPathFilter(job);
    if (userFilter != null) {
        combined.add(userFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(combined);

    List<FileStatus> listed = simpleListStatus(job, inputDirs, inputFilter, recursive);     

    LOG.info("Total input paths to process : " + listed.size()); 
    return listed;
}
 
源代码28 项目: hadoop   文件: FileInputFormat.java
/**
 * Resolves every input path as a glob pattern and collects the input files.
 * Matched files are added directly. Matched directories are listed one level
 * deep: entries rejected by the filter are dropped, accepted sub-directories
 * are walked with {@code addInputPathRecursively} when {@code recursive} is
 * true, and all other accepted entries are added to the result.
 *
 * @throws InvalidInputException if any path's glob returned {@code null} or
 *         matched no files; all such errors are reported together
 */
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (Path p: dirs) {
    FileSystem fs = p.getFileSystem(job); 
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
      continue;
    }
    if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      continue;
    }
    for (FileStatus globStat: matches) {
      if (globStat.isDirectory()) {
        for (RemoteIterator<LocatedFileStatus> entries =
            fs.listLocatedStatus(globStat.getPath()); entries.hasNext();) {
          LocatedFileStatus entry = entries.next();
          if (!inputFilter.accept(entry.getPath())) {
            continue;
          }
          if (recursive && entry.isDirectory()) {
            addInputPathRecursively(result, fs, entry.getPath(),
                inputFilter);
          } else {
            result.add(entry);
          }
        }
      } else {
        result.add(globStat);
      }
    }
  }
  if (errors.isEmpty()) {
    return result;
  }
  throw new InvalidInputException(errors);
}
 
源代码29 项目: hadoop   文件: LocatedFileStatusFetcher.java
/**
 * Records the file system, file status, recursion flag and input filter in
 * the corresponding fields.
 */
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
    boolean recursive, PathFilter inputFilter) {
  this.fileStatus = fileStatus;
  this.fs = fs;
  this.inputFilter = inputFilter;
  this.recursive = recursive;
}
 
源代码30 项目: dremio-oss   文件: HadoopFileSystemWrapper.java
/**
 * Delegates to the underlying file system's {@code globStatus(Path, PathFilter)}
 * while a wait recorder obtained from {@code OperatorStats} is open; an
 * {@code FSError} is rethrown as whatever {@code propagateFSError} returns.
 */
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
  try (WaitRecorder waitScope = OperatorStats.getWaitRecorder(operatorStats)) {
    final FileStatus[] matched = underlyingFs.globStatus(pathPattern, filter);
    return matched;
  } catch (FSError fsError) {
    throw propagateFSError(fsError);
  }
}
 
 类所在包
 类方法
 同包方法