Listed below are example usages of the org.apache.hadoop.fs.LocatedFileStatus API class; you can also follow the links to GitHub to view each example's full source.
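Before the individual examples, here is a minimal, self-contained sketch of the pattern they all share: FileSystem.listFiles returns a RemoteIterator<LocatedFileStatus> that must be drained with hasNext()/next(). The path /tmp/data and the class name ListFilesSketch are placeholders, not part of any project below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesSketch {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // listFiles returns only files, never directories; the boolean controls recursion.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/tmp/data"), true);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            // LocatedFileStatus is a FileStatus plus pre-fetched block locations.
            System.out.println(status.getPath() + " : " + status.getLen() + " bytes, "
                    + status.getBlockLocations().length + " block location(s)");
        }
    }
}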
public List<String> listFiles(String dir) throws IOException
{
    List<String> files = new ArrayList<>();
    Path path = new Path(dir);
    FileStatus fileStatus = fileSystem.getFileStatus(path);
    if (!fileStatus.isDirectory()) {
        throw new FileNotFoundException("Cannot read directory " + dir);
    }
    RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(path, false);
    while (it.hasNext()) {
        LocatedFileStatus lfs = it.next();
        files.add(lfs.getPath().getName());
    }
    return files;
}
public List<LocatedFileStatus> listFilesInfo(String dir) throws IOException
{
    List<LocatedFileStatus> files = new ArrayList<>();
    Path path = new Path(dir);
    FileStatus fileStatus = fileSystem.getFileStatus(path);
    if (!fileStatus.isDirectory()) {
        throw new FileNotFoundException("Cannot read directory " + dir);
    }
    RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(path, false);
    while (it.hasNext()) {
        LocatedFileStatus lfs = it.next();
        files.add(lfs);
    }
    return files;
}
private static RemoteIterator<LocatedFileStatus> simpleRemoteIterator(List<LocatedFileStatus> files)
{
    return new RemoteIterator<LocatedFileStatus>()
    {
        private final Iterator<LocatedFileStatus> iterator = ImmutableList.copyOf(files).iterator();

        @Override
        public boolean hasNext()
        {
            return iterator.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return iterator.next();
        }
    };
}
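This adapter exists because RemoteIterator.hasNext() and next() declare throws IOException, so a plain java.util.Iterator cannot be returned where a RemoteIterator is expected; copying through ImmutableList.copyOf also snapshots the contents against later mutation of the files list.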
static DataStatistics publishPlainDataStatistics(Configuration conf,
                                                 Path inputDir)
        throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);

    // obtain input data file statuses
    long dataSize = 0;
    long fileCount = 0;
    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
    PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
    while (iter.hasNext()) {
        LocatedFileStatus lStatus = iter.next();
        if (filter.accept(lStatus.getPath())) {
            dataSize += lStatus.getLen();
            ++fileCount;
        }
    }

    // publish the plain data statistics
    LOG.info("Total size of input data : "
            + StringUtils.humanReadableInt(dataSize));
    LOG.info("Total number of input data files : " + fileCount);
    return new DataStatistics(dataSize, fileCount, false);
}
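The OutputFilesFilter used above (from org.apache.hadoop.mapred.Utils.OutputFileUtils) is intended to accept only actual output files, screening out job bookkeeping entries such as _SUCCESS and _logs so they do not inflate the size and count statistics.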
@Test
public void basicClientReadWrite() throws Exception {
    Path basePath = new Path(temporaryFolder.newFolder().getAbsolutePath());
    Path path = ((PathCanonicalizer) clientFS).canonicalizePath(new Path(basePath, "testfile.bytes"));
    final byte[] randomBytesMoreThanBuffer = new byte[RemoteNodeFileSystem.REMOTE_WRITE_BUFFER_SIZE_DEFAULT * 3];
    Random r = new Random();
    r.nextBytes(randomBytesMoreThanBuffer);

    try (FSDataOutputStream stream = clientFS.create(path, false)) {
        stream.write(randomBytesMoreThanBuffer);
    }

    RemoteIterator<LocatedFileStatus> iter = client.fileSystem.listFiles(basePath, false);
    assertTrue(iter.hasNext());
    LocatedFileStatus status = iter.next();

    try (FSDataInputStream in = clientFS.open(status.getPath())) {
        byte[] back = new byte[randomBytesMoreThanBuffer.length];
        int dataRead = in.read(back);
        assertEquals(back.length, dataRead);
        assertTrue(Arrays.equals(randomBytesMoreThanBuffer, back));
    }
    client.fileSystem.delete(status.getPath(), false);
}
private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
        List<LocatedFileStatus> files,
        TupleDomain<HiveColumnHandle> compactEffectivePredicate,
        Optional<HiveBucketFilter> hiveBucketFilter,
        Table table,
        Optional<HiveBucketHandle> bucketHandle,
        Optional<ValidWriteIdList> validWriteIds)
{
    return backgroundHiveSplitLoader(
            new TestingHdfsEnvironment(files),
            compactEffectivePredicate,
            hiveBucketFilter,
            table,
            bucketHandle,
            validWriteIds);
}
@Override
public Result call() throws Exception {
    Result result = new Result();
    result.fs = fs;

    if (fileStatus.isDirectory()) {
        RemoteIterator<LocatedFileStatus> iter = fs
                .listLocatedStatus(fileStatus.getPath());
        while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                    result.dirsNeedingRecursiveCalls.add(stat);
                } else {
                    result.locatedFileStatuses.add(stat);
                }
            }
        }
    } else {
        result.locatedFileStatuses.add(fileStatus);
    }
    return result;
}
protected int getExtensioncount(final String hdfsstore, final String extension) throws Exception {
    int counter = 0;
    HDFSStoreImpl hdfsStore = (HDFSStoreImpl) GemFireCacheImpl.getInstance().findHDFSStore(hdfsstore);
    FileSystem fs = hdfsStore.getFileSystem();
    try {
        Path basePath = new Path(hdfsStore.getHomeDir());
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(basePath, true);
        while (files.hasNext()) {
            LocatedFileStatus next = files.next();
            // Count every file whose name ends with the given extension.
            if (next.getPath().getName().endsWith(extension)) {
                counter++;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return counter;
}
public static List<LocatedFileStatus> listDirectoryRaidLocatedFileStatus(
        Configuration conf, FileSystem srcFs, Path p) throws IOException {
    long minFileSize = conf.getLong(MINIMUM_RAIDABLE_FILESIZE_KEY,
            MINIMUM_RAIDABLE_FILESIZE);
    List<LocatedFileStatus> lfs = new ArrayList<LocatedFileStatus>();
    RemoteIterator<LocatedFileStatus> iter = srcFs.listLocatedStatus(p);
    while (iter.hasNext()) {
        LocatedFileStatus stat = iter.next();
        if (stat.isDir()) {
            return null;
        }
        // Files below the minimum size are not worth raiding; skip them.
        if (stat.getLen() < minFileSize) {
            continue;
        }
        lfs.add(stat);
    }
    if (lfs.size() == 0) {
        return null;
    }
    return lfs;
}
/**
 * List the files in the given directory (path), as a {@code JavaRDD<String>}.
 *
 * @param sc                Spark context
 * @param path              Path to list files in
 * @param recursive         Whether to walk the directory tree recursively (i.e., include subdirectories)
 * @param allowedExtensions If null: all files will be accepted. If non-null: only files with one of the
 *                          specified extensions will be allowed. Exclude the extension separator -
 *                          i.e., use "txt" not ".txt" here.
 * @param config            Hadoop configuration to use. Must not be null.
 * @return Paths in the directory
 * @throws IOException If an error occurs getting the directory contents
 */
public static JavaRDD<String> listPaths(@NonNull JavaSparkContext sc, String path, boolean recursive,
                Set<String> allowedExtensions, @NonNull Configuration config) throws IOException {
    List<String> paths = new ArrayList<>();
    FileSystem hdfs = FileSystem.get(URI.create(path), config);
    RemoteIterator<LocatedFileStatus> fileIter = hdfs.listFiles(new org.apache.hadoop.fs.Path(path), recursive);
    while (fileIter.hasNext()) {
        String filePath = fileIter.next().getPath().toString();
        if (allowedExtensions == null) {
            paths.add(filePath);
        } else {
            // Check the extension of the listed file, not of the input directory.
            String ext = FilenameUtils.getExtension(filePath);
            if (allowedExtensions.contains(ext)) {
                paths.add(filePath);
            }
        }
    }
    return sc.parallelize(paths);
}
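A hedged usage sketch for listPaths, in the same fragment style as the rest of this page; the HDFS URI, the "txt" extension, and the printTxtFiles name are illustrative placeholders rather than part of the original API:

// Hypothetical caller: collect every *.txt file under a directory into an RDD.
public static void printTxtFiles(JavaSparkContext sc) throws IOException {
    JavaRDD<String> txtPaths = listPaths(sc, "hdfs://namenode:8020/data", true,
            Collections.singleton("txt"), new Configuration());
    // Fine for small listings; collect() pulls every path back to the driver.
    for (String p : txtPaths.collect()) {
        System.out.println(p);
    }
}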
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
{
    return new RemoteIterator<LocatedFileStatus>()
    {
        private final Iterator<LocatedFileStatus> iterator = files.iterator();

        @Override
        public boolean hasNext()
        {
            return iterator.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return iterator.next();
        }
    };
}
void enumerateDir() throws Exception {
    System.out.println("enumerate dir, path " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    int repfactor = 4;
    for (int k = 0; k < repfactor; k++) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {
            // A single operation is one full enumeration of the directory.
            RemoteIterator<LocatedFileStatus> iter = fs.listFiles(path, false);
            while (iter.hasNext()) {
                iter.next();
            }
        }
        long end = System.currentTimeMillis();
        double executionTime = ((double) (end - start));
        // Total time in ms * 1000 / number of enumerations = microseconds per call.
        double latency = executionTime * 1000.0 / ((double) size);
        System.out.println("execution time [ms] " + executionTime);
        System.out.println("latency [us] " + latency);
    }
    fs.close();
}
@Override
public List<SplitPath> getInputSplits(Configuration config, Path path, int splitSize)
        throws IOException {
    ImmutableList.Builder<SplitPath> splits = ImmutableList.builder();
    RemoteIterator<LocatedFileStatus> files = listFiles(path, false);
    if (!files.hasNext()) {
        // No files found. Rather than returning an empty list, return a single empty split.
        String table = getTableName(_rootPath, path);
        return ImmutableList.of(new SplitPath(getSplitPath(_rootPath, table, getEmptySplitFileName()), 1));
    }
    while (files.hasNext()) {
        LocatedFileStatus file = files.next();
        splits.add(new SplitPath(file.getPath(), file.getLen()));
    }
    return splits.build();
}
@Override
public void perform() throws Exception {
    getLogger().info("Start deleting data files");
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(getConf());
    Path rootDir = CommonFSUtils.getRootDir(getConf());
    Path defaultDir = rootDir.suffix("/data/default");
    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(defaultDir, true);
    while (iterator.hasNext()) {
        LocatedFileStatus status = iterator.next();
        if (!HFile.isHFileFormat(fs, status.getPath())) {
            continue;
        }
        if (RandomUtils.nextFloat(0, 100) > chance) {
            continue;
        }
        fs.delete(status.getPath(), true);
        getLogger().info("Deleting {}", status.getPath());
    }
    getLogger().info("Done deleting data files");
}
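A note on the sampling above: RandomUtils.nextFloat(0, 100) draws uniformly from [0, 100), and the file is skipped whenever the draw exceeds chance, so each HFile is deleted with probability chance/100.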
public int getNumberOfFiles() throws IOException {
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<LocatedFileStatus> iter = dfs.listLocatedStatus(outputPath);
    int fn = 0;
    while (iter.hasNext()) {
        LocatedFileStatus lfs = iter.next();
        if (lfs.isDir()) {
            continue;
        }
        if (lfs.getBlockLocations().length != 1) {
            continue;
        }
        String curHost = rtc.cur_datanode;
        for (String host : lfs.getBlockLocations()[0].getHosts()) {
            if (curHost.equals(host)) {
                fn++;
                break;
            }
        }
    }
    LOG.info(" Found " + fn + " files in " + dfs.getUri());
    return fn;
}
/**
 * Searches the given directory for the file.
 *
 * @param directory the directory to search
 * @param filename  the name of the file we want
 * @return the path if the file exists in this directory; otherwise null.
 */
public Path searchInDirectory(Path directory, String filename) {
    // Walk the files and folders under this Path to find one matching the filename.
    try {
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
        String[] parts;
        Path path;
        while (it.hasNext()) {
            path = it.next().getPath();
            parts = path.toString().split("/");
            if (parts[parts.length - 1].equals(filename)) {
                return path;
            }
        }
    } catch (IOException e) {
        if (LOGGER.isLoggable(Level.SEVERE)) {
            LOGGER.severe(e.getMessage());
        }
    }
    return null;
}
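The string splitting above can be avoided entirely: Path.getName() already returns the final component of a path. A minimal equivalent sketch, assuming the same fs field and propagating the checked exception instead of logging it (searchInDirectorySimpler is a hypothetical name):

public Path searchInDirectorySimpler(Path directory, String filename) throws IOException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
    while (it.hasNext()) {
        Path p = it.next().getPath();
        // Path.getName() is the last path component, so no manual "/" splitting is needed.
        if (p.getName().equals(filename)) {
            return p;
        }
    }
    return null;
}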
@Test(timeout = 60000)
public void testListFiles() throws IOException {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
        DistributedFileSystem fs = cluster.getFileSystem();

        final Path relative = new Path("relative");
        fs.create(new Path(relative, "foo")).close();

        final List<LocatedFileStatus> retVal = new ArrayList<LocatedFileStatus>();
        final RemoteIterator<LocatedFileStatus> iter = fs.listFiles(relative, true);
        while (iter.hasNext()) {
            retVal.add(iter.next());
        }
        System.out.println("retVal = " + retVal);
    } finally {
        cluster.shutdown();
    }
}
/** Test when input path is a file */
@Test
public void testFile() throws IOException {
    fc.mkdir(TEST_DIR, FsPermission.getDefault(), true);
    writeFile(fc, FILE1, FILE_LEN);

    RemoteIterator<LocatedFileStatus> itor = fc.util().listFiles(
            FILE1, true);
    LocatedFileStatus stat = itor.next();
    assertFalse(itor.hasNext());
    assertTrue(stat.isFile());
    assertEquals(FILE_LEN, stat.getLen());
    assertEquals(fc.makeQualified(FILE1), stat.getPath());
    assertEquals(1, stat.getBlockLocations().length);

    itor = fc.util().listFiles(FILE1, false);
    stat = itor.next();
    assertFalse(itor.hasNext());
    assertTrue(stat.isFile());
    assertEquals(FILE_LEN, stat.getLen());
    assertEquals(fc.makeQualified(FILE1), stat.getPath());
    assertEquals(1, stat.getBlockLocations().length);
}
private static void checkEquals(RemoteIterator<LocatedFileStatus> i1,
        RemoteIterator<LocatedFileStatus> i2) throws IOException {
    while (i1.hasNext()) {
        assertTrue(i2.hasNext());

        // Compare all the fields but the path name, which is relative
        // to the original path from listFiles.
        LocatedFileStatus l1 = i1.next();
        LocatedFileStatus l2 = i2.next();
        assertEquals(l1.getAccessTime(), l2.getAccessTime());
        assertEquals(l1.getBlockSize(), l2.getBlockSize());
        assertEquals(l1.getGroup(), l2.getGroup());
        assertEquals(l1.getLen(), l2.getLen());
        assertEquals(l1.getModificationTime(), l2.getModificationTime());
        assertEquals(l1.getOwner(), l2.getOwner());
        assertEquals(l1.getPermission(), l2.getPermission());
        assertEquals(l1.getReplication(), l2.getReplication());
    }
    assertFalse(i2.hasNext());
}
/**
 * To get this project to compile under Hadoop 1, this code needs to be
 * commented out.
 *
 * @param fs filesystem
 * @param dir dir
 * @param subdir subdir
 * @param recursive recurse?
 * @throws IOException IO problems
 */
public static void assertListFilesFinds(FileSystem fs,
        Path dir,
        Path subdir,
        boolean recursive) throws IOException {
    RemoteIterator<LocatedFileStatus> iterator =
            fs.listFiles(dir, recursive);
    boolean found = false;
    int entries = 0;
    StringBuilder builder = new StringBuilder();
    while (iterator.hasNext()) {
        LocatedFileStatus next = iterator.next();
        entries++;
        builder.append(next.toString()).append('\n');
        if (next.getPath().equals(subdir)) {
            found = true;
        }
    }
    assertTrue("Path " + subdir
            + " not found in directory " + dir + " : "
            + " entries=" + entries
            + " content"
            + builder.toString(),
            found);
}
@Test(timeout = 5000)
public void testNewSplitsGen() throws Exception {
    DataSourceDescriptor dataSource = generateDataSourceDescriptorMapReduce(newSplitsDir);

    Assert.assertTrue(dataSource.getAdditionalLocalFiles()
            .containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
    Assert.assertTrue(dataSource.getAdditionalLocalFiles()
            .containsKey(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));

    RemoteIterator<LocatedFileStatus> files =
            remoteFs.listFiles(newSplitsDir, false);
    boolean foundSplitsFile = false;
    boolean foundMetaFile = false;
    int totalFilesFound = 0;
    while (files.hasNext()) {
        LocatedFileStatus status = files.next();
        String fName = status.getPath().getName();
        totalFilesFound++;
        if (fName.equals(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)) {
            foundSplitsFile = true;
        } else if (fName.equals(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
            foundMetaFile = true;
        } else {
            Assert.fail("Found invalid file in splits dir, filename=" + fName);
        }
        Assert.assertTrue(status.getLen() > 0);
    }
    Assert.assertEquals(2, totalFilesFound);
    Assert.assertTrue(foundSplitsFile);
    Assert.assertTrue(foundMetaFile);
    verifyLocationHints(newSplitsDir, dataSource.getLocationHint().getTaskLocationHints());
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus fileStatus,
        long start, long len)
        throws IOException {
    if (fileStatus instanceof LocatedFileStatus) {
        return ((LocatedFileStatus) fileStatus).getBlockLocations();
    } else {
        return super.getFileBlockLocations(fileStatus, start, len);
    }
}
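The instanceof check above is a fast path: a LocatedFileStatus produced by listFiles or listLocatedStatus already carries its block locations, so returning them directly avoids the extra lookup (for HDFS, typically another NameNode round trip) that super.getFileBlockLocations would perform.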
FileStatus convertFileStatus(FileStatusAdapter fileStatusAdapter) {
    Path symLink = null;
    try {
        symLink = fileStatusAdapter.getSymlink();
    } catch (Exception ex) {
        // NOOP: if this is not a symlink, getSymlink() throws and symLink stays null.
    }
    FileStatus fileStatus = new FileStatus(
            fileStatusAdapter.getLength(),
            fileStatusAdapter.isDir(),
            fileStatusAdapter.getBlockReplication(),
            fileStatusAdapter.getBlocksize(),
            fileStatusAdapter.getModificationTime(),
            fileStatusAdapter.getAccessTime(),
            new FsPermission(fileStatusAdapter.getPermission()),
            fileStatusAdapter.getOwner(),
            fileStatusAdapter.getGroup(),
            symLink,
            fileStatusAdapter.getPath()
    );
    BlockLocation[] blockLocations = fileStatusAdapter.getBlockLocations();
    if (blockLocations == null || blockLocations.length == 0) {
        return fileStatus;
    }
    return new LocatedFileStatus(fileStatus, blockLocations);
}
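Design note: the adapter is upgraded to a LocatedFileStatus only when block locations are actually present. LocatedFileStatus is simply a FileStatus bundled with its BlockLocation[], which lets callers read locations without a further filesystem call.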
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException {
    final String container = getContainerName(f);
    return RemoteIterators.transform(
            getFileSystemForPath(f).fs().listFiles(pathWithoutContainer(f), recursive),
            t -> new LocatedFileStatus(ContainerFileSystem.transform(t, container), t.getBlockLocations())
    );
}
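RemoteIterators.transform here appears to be a project-local helper (not a stock Hadoop 2 API) that applies the given function to each element as it is pulled from the underlying iterator, so the container name is re-attached lazily instead of materializing the whole listing first.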
/** Test when the input path has symbolic links as its children. */
@Test
public void testSymbolicLinks() throws IOException {
    writeFile(fc, FILE1, FILE_LEN);
    writeFile(fc, FILE2, FILE_LEN);
    writeFile(fc, FILE3, FILE_LEN);
    Path dir4 = new Path(TEST_DIR, "dir4");
    Path dir5 = new Path(dir4, "dir5");
    Path file4 = new Path(dir4, "file4");
    fc.createSymlink(DIR1, dir5, true);
    fc.createSymlink(FILE1, file4, true);

    // Recursive listing follows the dir5 symlink into DIR1's files as well as file4.
    RemoteIterator<LocatedFileStatus> itor = fc.util().listFiles(dir4, true);
    LocatedFileStatus stat = itor.next();
    assertTrue(stat.isFile());
    assertEquals(fc.makeQualified(FILE2), stat.getPath());
    stat = itor.next();
    assertTrue(stat.isFile());
    assertEquals(fc.makeQualified(FILE3), stat.getPath());
    stat = itor.next();
    assertTrue(stat.isFile());
    assertEquals(fc.makeQualified(FILE1), stat.getPath());
    assertFalse(itor.hasNext());

    // Non-recursive listing resolves only the direct child symlink file4.
    itor = fc.util().listFiles(dir4, false);
    stat = itor.next();
    assertTrue(stat.isFile());
    assertEquals(fc.makeQualified(FILE1), stat.getPath());
    assertFalse(itor.hasNext());
}