下面列出了 org.apache.hadoop.fs.PathFilter 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Lists every file below the input directory, adds up the length and count of
 * the files accepted by the output-files filter, logs both totals and returns
 * them wrapped in a {@link DataStatistics}.
 *
 * @param conf configuration used to resolve the file system of {@code inputDir}
 * @param inputDir directory that is listed recursively
 * @return statistics carrying the accumulated size and file count
 * @throws IOException if the file system cannot be resolved or listed
 */
static DataStatistics publishPlainDataStatistics(Configuration conf,
    Path inputDir)
    throws IOException {
  final FileSystem fileSystem = inputDir.getFileSystem(conf);

  // accumulate size and count over the recursive listing
  long totalBytes = 0;
  long totalFiles = 0;
  final RemoteIterator<LocatedFileStatus> listing =
      fileSystem.listFiles(inputDir, true);
  final PathFilter outputFilter = new Utils.OutputFileUtils.OutputFilesFilter();
  while (listing.hasNext()) {
    final LocatedFileStatus current = listing.next();
    if (!outputFilter.accept(current.getPath())) {
      continue;
    }
    totalBytes += current.getLen();
    totalFiles++;
  }

  // report the totals before handing them back
  LOG.info("Total size of input data : "
      + StringUtils.humanReadableInt(totalBytes));
  LOG.info("Total number of input data files : " + totalFiles);
  return new DataStatistics(totalBytes, totalFiles, false);
}
/**
 * Collects, depth-first, the statuses of all non-directory entries below
 * {@code path} that the filter accepts.
 *
 * @param storage storage used for the existence check and the listings
 * @param path root of the traversal
 * @param filter applied to the path of each non-directory entry
 * @return the accepted statuses; empty when {@code path} does not exist
 * @throws IOException if the storage cannot be queried
 */
private static ArrayList<FileStatus> traverseImpl(Storage storage, Path path, PathFilter filter)
    throws IOException {
  final ArrayList<FileStatus> collected = new ArrayList<>();
  if (!storage.exists(path.toString())) {
    return collected;
  }
  for (FileStatus entry : storage.listStatus(path.toString())) {
    if (entry.isDirectory()) {
      // descend; the filter is only consulted for non-directory entries
      collected.addAll(traverseImpl(storage, entry.getPath(), filter));
    } else if (filter.accept(entry.getPath())) {
      collected.add(entry);
    }
  }
  return collected;
}
/**
 * Walks the given path and appends every accepted file to {@code result}.
 * Accepted directories are descended into; rejected entries (files or
 * directories) are skipped entirely.
 *
 * @param result
 *          The list that receives the accepted files.
 * @param fs
 *          The FileSystem used for listing.
 * @param path
 *          The path whose entries are listed.
 * @param inputFilter
 *          The filter applied to the path of every listed entry.
 * @throws IOException
 *           if listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  final RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
  while (entries.hasNext()) {
    final LocatedFileStatus entry = entries.next();
    if (!inputFilter.accept(entry.getPath())) {
      continue;
    }
    if (!entry.isDirectory()) {
      result.add(entry);
      continue;
    }
    addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
  }
}
/**
 * Recursively lists the task attempt path and returns the statuses of the
 * files whose path is accepted by the filter from {@code HiddenPathFilter.get()}.
 *
 * @param context task attempt context supplying the attempt path and configuration
 * @return statuses of the accepted files
 * @throws IOException if the attempt path cannot be listed
 */
@Override
protected List<FileStatus> getTaskOutput(TaskAttemptContext context)
    throws IOException {
  final PathFilter pathFilter = HiddenPathFilter.get();

  // resolve the attempt directory and the file system that holds it
  final Path attemptDir = getTaskAttemptPath(context);
  final FileSystem attemptDirFs =
      attemptDir.getFileSystem(context.getConfiguration());
  final RemoteIterator<LocatedFileStatus> files =
      attemptDirFs.listFiles(attemptDir, true /* recursive */);

  final List<FileStatus> accepted = Lists.newArrayList();
  while (files.hasNext()) {
    final FileStatus candidate = files.next();
    if (!pathFilter.accept(candidate.getPath())) {
      continue;
    }
    accepted.add(candidate);
  }
  return accepted;
}
/**
 * Recursively gathers the files under the input path into {@code result}.
 * Only entries accepted by the filter are considered: accepted directories
 * are traversed, accepted files are added.
 *
 * @param result
 *          Destination list for the files found.
 * @param fs
 *          The FileSystem to list from.
 * @param path
 *          The directory path to list.
 * @param inputFilter
 *          Filter consulted for every listed file or directory.
 * @throws IOException
 *           if a listing cannot be obtained
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  final RemoteIterator<LocatedFileStatus> children = fs.listLocatedStatus(path);
  while (children.hasNext()) {
    final LocatedFileStatus child = children.next();
    final boolean accepted = inputFilter.accept(child.getPath());
    if (accepted && child.isDirectory()) {
      addInputPathRecursively(result, fs, child.getPath(), inputFilter);
    } else if (accepted) {
      result.add(child);
    }
  }
}
/**
 * Looks up the entries registered for {@code path} in {@code pathsAndContent}
 * and returns a freshly built fake status for each one the filter accepts.
 *
 * @param path path whose string form is the lookup key
 * @param filter filter applied to each registered entry's path
 * @return fake statuses of the accepted entries
 * @throws IOException declared by the overridden method
 */
@Override
public FileStatus[] listStatus(Path path, PathFilter filter) throws IOException {
  final List<FileStatus> registered = pathsAndContent.get(path.toString());
  final List<FileStatus> accepted = new ArrayList<>();
  for (FileStatus entry : registered) {
    if (!filter.accept(entry.getPath())) {
      continue;
    }
    accepted.add(fakeFileStatus(entry.getPath().toString()));
  }
  final FileStatus[] asArray = new FileStatus[accepted.size()];
  return accepted.toArray(asArray);
}
/**
 * Delegates the filtered listing to the underlying file system while a wait
 * recorder obtained from the operator stats is open, and routes any
 * {@code FSError} through {@code propagateFSError}.
 *
 * @param f path to list
 * @param filter filter handed to the underlying file system
 * @return whatever the underlying file system returns
 * @throws FileNotFoundException declared by the overridden method
 * @throws IOException declared by the overridden method
 */
@Override
public FileStatus[] listStatus(Path f, PathFilter filter) throws FileNotFoundException, IOException {
  try (WaitRecorder waitRecorder = OperatorStats.getWaitRecorder(operatorStats)) {
    final FileStatus[] listing = underlyingFs.listStatus(f, filter);
    return listing;
  } catch (FSError fsError) {
    throw propagateFSError(fsError);
  }
}
/**
 * Combines the given filters into a single filter that accepts a path only
 * when every one of them accepts it.
 * For an empty array, {@link #DUMMY_FILTER} is returned instead.
 *
 * @param filters array of filters
 * @return one filter that combines all given filters
 */
public static PathFilter mergeFilters(PathFilter... filters) {
  if (filters.length == 0) {
    return DUMMY_FILTER;
  }
  return path -> {
    // stop at the first filter that rejects the path
    for (PathFilter each : filters) {
      if (!each.accept(path)) {
        return false;
      }
    }
    return true;
  };
}
/**
 * Lists the statuses directly under the given path (no recursion), keeping
 * only those that are applicable for the requested {@link Scope}.
 *
 * @param fs file system
 * @param path path to file or directory
 * @param scope file system objects scope
 * @param suppressExceptions when true, a failure is logged at debug level and
 *                           an empty list is returned; when false, it is rethrown
 * @param filter filter passed to the file system listing
 * @return list of file statuses
 */
private static List<FileStatus> listNonRecursive(FileSystem fs, Path path, Scope scope, boolean suppressExceptions, PathFilter filter) throws IOException {
  try {
    final FileStatus[] listed = fs.listStatus(path, filter);
    return Stream.of(listed)
      .filter(candidate -> isStatusApplicable(candidate, scope))
      .collect(Collectors.toList());
  } catch (Exception failure) {
    if (!suppressExceptions) {
      throw failure;
    }
    logger.debug("Exception during listing file statuses", failure);
    return Collections.emptyList();
  }
}
/**
 * Lists the entries under {@code root} and, when a filter is supplied, keeps
 * only those whose path it accepts.
 *
 * @param fc file context used for the listing
 * @param root path to list
 * @param filter filter to apply, or {@code null} to return the full listing
 * @return the full listing when {@code filter} is {@code null}, otherwise a
 *         new list with the accepted entries
 * @throws IOException if the listing fails
 */
private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
    PathFilter filter) throws IOException {
  final List<FileStatus> everything = remoteIterToList(fc.listStatus(root));
  if (filter == null) {
    return everything;
  }
  final List<FileStatus> accepted = new LinkedList<FileStatus>();
  for (FileStatus candidate : everything) {
    if (!filter.accept(candidate.getPath())) {
      continue;
    }
    accepted.add(candidate);
  }
  return accepted;
}
/**
 * Stores the iteration parameters and eagerly fetches the first batch of
 * directory entries.
 *
 * @param p directory to iterate over
 * @param filter filter stored for later use by the iterator
 * @param needLocation forwarded to the listing call
 * @throws IOException if the listing call fails; a
 *         {@link FileNotFoundException} is thrown when it returns {@code null}
 */
private DirListingIterator(Path p, PathFilter filter,
    boolean needLocation) throws IOException {
  this.filter = filter;
  this.needLocation = needLocation;
  this.p = p;
  this.src = getPathName(p);

  // fetch the first batch of entries in the directory
  thisListing = dfs.listPaths(this.src, HdfsFileStatus.EMPTY_NAME,
      this.needLocation);
  statistics.incrementReadOps(1);

  // a null listing means the directory does not exist
  if (null == thisListing) {
    throw new FileNotFoundException("File " + p + " does not exist.");
  }
  i = 0;
}
/**
 * Accepts the path only if every filter in {@code filters} accepts it.
 *
 * @param path path to test
 * @return {@code false} as soon as one filter rejects the path, otherwise {@code true}
 */
public boolean accept(Path path) {
  for (PathFilter candidate : filters) {
    if (candidate.accept(path)) {
      continue;
    }
    return false;
  }
  return true;
}
/**
 * Logs the requested path at debug level and then defers to the superclass
 * implementation.
 *
 * @param f path to list
 * @param filter filter passed through to the superclass
 * @return the iterator produced by the superclass
 * @throws FileNotFoundException declared by the overridden method
 * @throws IOException declared by the overridden method
 */
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
    PathFilter filter)
    throws FileNotFoundException, IOException {
  final String pathText = f.toString();
  LOG.debug("listLocatedStatus with path filter: {}", pathText);
  return super.listLocatedStatus(f, filter);
}
/**
 * Instantiates the PathFilter class configured for the input paths, if any.
 *
 * @param context job context whose configuration is consulted
 * @return a new PathFilter instance of the configured class, or {@code null}
 *         when no filter class has been set
 */
public static PathFilter getInputPathFilter(JobContext context) {
  final Configuration conf = context.getConfiguration();
  final Class<?> filterClass =
      conf.getClass(PATHFILTER_CLASS, null, PathFilter.class);
  if (filterClass == null) {
    return null;
  }
  return (PathFilter) ReflectionUtils.newInstance(filterClass, conf);
}
/**
 * Accepts the path if at least one filter in {@code filters} accepts it.
 *
 * @param path path to test
 * @return {@code true} as soon as one filter accepts the path, otherwise {@code false}
 */
public boolean accept(Path path) {
  for (PathFilter candidate : filters) {
    if (!candidate.accept(path)) {
      continue;
    }
    return true;
  }
  return false;
}
/**
 * Lists the entries under {@code testDataDir/regionAndBucket} whose file name
 * ends with the given suffix.
 *
 * @param fs file system to list from
 * @param regionAndBucket child path resolved against {@code testDataDir}
 * @param type suffix the file name must end with
 * @return the matching statuses as returned by the file system
 * @throws IOException if the listing fails
 */
protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type)
    throws IOException {
  final Path bucketDir = new Path(testDataDir, regionAndBucket);
  final PathFilter bySuffix = new PathFilter() {
    @Override
    public boolean accept(Path candidate) {
      final String fileName = candidate.getName();
      return fileName.endsWith(type);
    }
  };
  return fs.listStatus(bucketDir, bySuffix);
}
/**
 * Builds a filter that accepts a path when its file name fully matches the
 * regular expression configured in the {@code FILE_FILTER} property.
 *
 * @param context process context supplying the property value
 * @return a filter backed by the pattern compiled once here
 */
private PathFilter createPathFilter(final ProcessContext context) {
  final String regex = context.getProperty(FILE_FILTER).getValue();
  final Pattern filePattern = Pattern.compile(regex);
  return new PathFilter() {
    @Override
    public boolean accept(Path path) {
      final String fileName = path.getName();
      return filePattern.matcher(fileName).matches();
    }
  };
}
/**
 * Get the schema of AVRO files stored in a directory.
 *
 * <p>When {@code path} is a directory, the first listed entry whose name does
 * not start with {@code "_"} or {@code "."} is opened; otherwise {@code path}
 * itself is opened.
 *
 * @param path an Avro data file, or a directory to pick one from
 * @param conf configuration used to resolve the file system and open the file
 * @return the schema read from the chosen file, or {@code null} when
 *         {@code path} is a directory without any eligible entry
 * @throws IOException if the listing or reading the file fails
 */
public static Schema getAvroSchema(Path path, Configuration conf)
    throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  Path fileToTest;
  if (fs.isDirectory(path)) {
    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    });
    if (fileStatuses.length == 0) {
      return null;
    }
    fileToTest = fileStatuses[0].getPath();
  } else {
    fileToTest = path;
  }

  SeekableInput input = new FsInput(fileToTest, conf);
  FileReader<GenericRecord> fileReader = null;
  try {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    fileReader = DataFileReader.openReader(input, reader);
    return fileReader.getSchema();
  } finally {
    // Release the underlying stream on every exit path, including failures in
    // openReader/getSchema that previously leaked it.
    if (fileReader != null) {
      // as before, closing the reader is relied on to release the input it wraps
      fileReader.close();
    } else {
      // the reader was never created, so the input has to be closed directly
      input.close();
    }
  }
}
/**
 * Lists located statuses for a path inside a container. The caller's filter
 * is applied to paths transformed with the container name, and every returned
 * status is likewise transformed before being handed back.
 *
 * @param f path from which the container name is derived
 * @param filter caller-supplied filter, evaluated against transformed paths
 * @return iterator over the transformed located statuses
 * @throws FileNotFoundException declared by the overridden method
 * @throws IOException declared by the overridden method
 */
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, final PathFilter filter) throws FileNotFoundException, IOException {
  final String container = getContainerName(f);

  // the wrapped file system lists paths without the container, so the filter
  // sees each path only after it has been transformed with the container name
  final PathFilter alteredFilter =
      candidate -> filter.accept(transform(candidate, container));

  return RemoteIterators.transform(
      ListAccessor.listLocatedFileStatus(
          getFileSystemForPath(f).fs(),
          pathWithoutContainer(f),
          alteredFilter),
      located -> new LocatedFileStatus(
          ContainerFileSystem.transform(located, container),
          located.getBlockLocations()));
}
/**
 * Tests the path against each filter in {@code filters} in turn.
 *
 * @param path path to test
 * @return {@code true} if any filter accepts the path; {@code false} if none does
 */
public boolean accept(Path path) {
  for (PathFilter member : filters) {
    final boolean accepted = member.accept(path);
    if (accepted) {
      return true;
    }
  }
  return false;
}
/**
 * Renders the contained filters as a bracketed list in which each filter's
 * string form is followed by a comma, e.g. {@code [a,b,]}.
 *
 * @return the string form of all filters in {@code filters}
 */
@Override
public String toString() {
  // StringBuilder: the buffer never leaves this method, so the synchronized
  // StringBuffer is unnecessary. The produced text is unchanged.
  final StringBuilder buf = new StringBuilder("[");
  for (PathFilter f : filters) {
    buf.append(f).append(",");
  }
  return buf.append("]").toString();
}
/**
 * Adds the files under {@code path} to {@code result}, descending into every
 * directory returned by the filtered listing.
 *
 * @param result list that receives the non-directory entries
 * @param fs file system used for listing
 * @param path directory to list
 * @param inputFilter filter passed to each listing call
 * @throws IOException if a listing fails
 */
protected void simpleAddInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
  for (FileStatus entry : fs.listStatus(path, inputFilter)) {
    if (!entry.isDirectory()) {
      result.add(entry);
      continue;
    }
    simpleAddInputPathRecursively(result, fs, entry.getPath(), inputFilter);
  }
}
/**
 * Captures the inputs this callable works with.
 *
 * @param fs file system stored for later use
 * @param fileStatus status stored for later use
 * @param recursive recursion flag stored for later use
 * @param inputFilter path filter stored for later use
 */
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
    boolean recursive, PathFilter inputFilter) {
  this.inputFilter = inputFilter;
  this.recursive = recursive;
  this.fileStatus = fileStatus;
  this.fs = fs;
}
/**
 * Expands each input path as a glob and gathers the resulting statuses.
 * Glob matches that are files are added as they are; for matches that are
 * directories, their filter-accepted children are added, descending further
 * only when {@code recursive} is set.
 *
 * @param job job configuration used to resolve each path's file system
 * @param dirs input paths (glob patterns)
 * @param inputFilter filter applied to the glob and to directory children
 * @param recursive whether accepted child directories are traversed
 * @return all statuses collected
 * @throws IOException an {@code InvalidInputException} carrying one entry per
 *         path that does not exist or matches nothing, or any listing failure
 */
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  final List<FileStatus> collected = new ArrayList<FileStatus>();
  final List<IOException> problems = new ArrayList<IOException>();

  for (Path dir : dirs) {
    final FileSystem fs = dir.getFileSystem(job);
    final FileStatus[] globbed = fs.globStatus(dir, inputFilter);

    // record the problem and carry on so that all bad inputs get reported
    if (globbed == null) {
      problems.add(new IOException("Input path does not exist: " + dir));
      continue;
    }
    if (globbed.length == 0) {
      problems.add(new IOException("Input Pattern " + dir + " matches 0 files"));
      continue;
    }

    for (FileStatus match : globbed) {
      if (!match.isDirectory()) {
        collected.add(match);
        continue;
      }
      final RemoteIterator<LocatedFileStatus> children =
          fs.listLocatedStatus(match.getPath());
      while (children.hasNext()) {
        final LocatedFileStatus child = children.next();
        if (!inputFilter.accept(child.getPath())) {
          continue;
        }
        if (recursive && child.isDirectory()) {
          addInputPathRecursively(collected, fs, child.getPath(),
              inputFilter);
        } else {
          collected.add(child);
        }
      }
    }
  }

  if (!problems.isEmpty()) {
    throw new InvalidInputException(problems);
  }
  return collected;
}
/**
 * Tests the path against each filter in {@code filters} in turn.
 *
 * @param path path to test
 * @return {@code true} only if no filter rejects the path
 */
public boolean accept(Path path) {
  for (PathFilter member : filters) {
    final boolean rejected = !member.accept(path);
    if (rejected) {
      return false;
    }
  }
  return true;
}
/**
 * Instantiates the PathFilter class configured for the input paths, if any.
 *
 * @param conf job configuration that may name a filter class
 * @return a new PathFilter instance of the configured class, or {@code null}
 *         when no filter class has been set
 */
public static PathFilter getInputPathFilter(JobConf conf) {
  final Class<? extends PathFilter> filterClass = conf.getClass(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
      null, PathFilter.class);
  if (filterClass == null) {
    return null;
  }
  return ReflectionUtils.newInstance(filterClass, conf);
}
/**
 * Lists the input files of the job.
 *
 * @param job job context supplying input paths, credentials and configuration
 * @return the statuses produced by {@code simpleListStatus} for the input paths
 * @throws IOException if no input paths are configured or listing fails
 */
protected List<FileStatus> listStatus(JobContext job
    ) throws IOException {
  final Path[] inputDirs = getInputPaths(job);
  if (inputDirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), inputDirs,
      job.getConfiguration());

  // whether the directory structure should be searched recursively
  final boolean recursive = getInputDirRecursive(job);

  // combine hiddenFileFilter with the user-provided filter, when there is one
  final List<PathFilter> chain = new ArrayList<PathFilter>();
  chain.add(hiddenFileFilter);
  final PathFilter userFilter = getInputPathFilter(job);
  if (userFilter != null) {
    chain.add(userFilter);
  }
  final PathFilter inputFilter = new MultiPathFilter(chain);

  final List<FileStatus> found =
      simpleListStatus(job, inputDirs, inputFilter, recursive);
  LOG.info("Total input paths to process : " + found.size());
  return found;
}
/**
 * Resolves every input path as a glob pattern and collects the statuses found.
 * A glob match that is a file is added directly. A glob match that is a
 * directory contributes its children that pass the filter; child directories
 * are traversed only when {@code recursive} is true, otherwise they are added
 * like files.
 *
 * @param job job configuration used to obtain the file system of each path
 * @param dirs input paths, each treated as a glob pattern
 * @param inputFilter filter used for globbing and for directory children
 * @param recursive whether to descend into accepted child directories
 * @return the collected statuses
 * @throws IOException an {@code InvalidInputException} listing every path
 *         that is missing or matched no files, or any listing failure
 */
private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  final List<FileStatus> found = new ArrayList<FileStatus>();
  final List<IOException> failures = new ArrayList<IOException>();

  for (Path inputPath : dirs) {
    final FileSystem fs = inputPath.getFileSystem(job);
    final FileStatus[] matches = fs.globStatus(inputPath, inputFilter);

    if (matches == null) {
      failures.add(new IOException("Input path does not exist: " + inputPath));
      continue;
    }
    if (matches.length == 0) {
      failures.add(
          new IOException("Input Pattern " + inputPath + " matches 0 files"));
      continue;
    }

    for (FileStatus globStat : matches) {
      if (!globStat.isDirectory()) {
        found.add(globStat);
        continue;
      }
      // expand one level of the matched directory
      final RemoteIterator<LocatedFileStatus> listing =
          fs.listLocatedStatus(globStat.getPath());
      while (listing.hasNext()) {
        final LocatedFileStatus entry = listing.next();
        if (!inputFilter.accept(entry.getPath())) {
          continue;
        }
        final boolean descend = recursive && entry.isDirectory();
        if (descend) {
          addInputPathRecursively(found, fs, entry.getPath(), inputFilter);
        } else {
          found.add(entry);
        }
      }
    }
  }

  // all problems are reported together once every path has been examined
  if (!failures.isEmpty()) {
    throw new InvalidInputException(failures);
  }
  return found;
}
/**
 * Stores the four constructor arguments in the corresponding fields.
 *
 * @param fs file system to keep
 * @param fileStatus file status to keep
 * @param recursive recursion flag to keep
 * @param inputFilter path filter to keep
 */
ProcessInputDirCallable(FileSystem fs, FileStatus fileStatus,
    boolean recursive, PathFilter inputFilter) {
  this.fileStatus = fileStatus;
  this.fs = fs;
  this.inputFilter = inputFilter;
  this.recursive = recursive;
}
/**
 * Delegates the glob lookup to the underlying file system while a wait
 * recorder obtained from the operator stats is open, and routes any
 * {@code FSError} through {@code propagateFSError}.
 *
 * @param pathPattern glob pattern handed to the underlying file system
 * @param filter filter handed to the underlying file system
 * @return whatever the underlying file system returns
 * @throws IOException declared by the overridden method
 */
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
  try (WaitRecorder waitRecorder = OperatorStats.getWaitRecorder(operatorStats)) {
    final FileStatus[] matches = underlyingFs.globStatus(pathPattern, filter);
    return matches;
  } catch (FSError fsError) {
    throw propagateFSError(fsError);
  }
}