下面列出了 org.apache.hadoop.fs.RemoteIterator 类 API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Lists the names of the files found directly inside a directory (non-recursive).
 *
 * @param dir path of the directory to list
 * @return the last path component of every entry returned by the listing
 * @throws FileNotFoundException if {@code dir} resolves to something that is not a directory
 * @throws IOException if the file system cannot be queried
 */
public List<String> listFiles(String dir) throws IOException
{
    Path directory = new Path(dir);
    boolean isDirectory = fileSystem.getFileStatus(directory).isDirectory();
    if (!isDirectory) {
        throw new FileNotFoundException("Cannot read directory " + dir);
    }

    List<String> names = new ArrayList<>();
    for (RemoteIterator<LocatedFileStatus> entries = fileSystem.listFiles(directory, false);
            entries.hasNext(); ) {
        names.add(entries.next().getPath().getName());
    }
    return names;
}
/**
 * Lists the located statuses of the files found directly inside a directory (non-recursive).
 *
 * @param dir path of the directory to list
 * @return every status returned by the listing, in iteration order
 * @throws FileNotFoundException if {@code dir} resolves to something that is not a directory
 * @throws IOException if the file system cannot be queried
 */
public List<LocatedFileStatus> listFilesInfo(String dir) throws IOException
{
    Path directory = new Path(dir);
    if (!fileSystem.getFileStatus(directory).isDirectory()) {
        throw new FileNotFoundException("Cannot read directory " + dir);
    }

    List<LocatedFileStatus> statuses = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> entries = fileSystem.listFiles(directory, false);
    for (; entries.hasNext(); ) {
        statuses.add(entries.next());
    }
    return statuses;
}
/** Verifies the FileStatus reported when listStatus is invoked on a plain file. */
@Test
public void testListStatusOnFile() throws IOException {
    final FileStatus[] listed = fs.listStatus(file1);
    assertEquals(1, listed.length);

    FileStatus current = listed[0];
    assertFalse(file1 + " should be a file", current.isDirectory());
    assertEquals(blockSize, current.getBlockSize());
    assertEquals(1, current.getReplication());
    assertEquals(fileSize, current.getLen());
    final String expectedPath =
        file1.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
    assertEquals(expectedPath, current.getPath().toString());

    // The same file listed through the FileContext iterator must yield an equal status.
    current = fc.listStatus(file1).next();
    assertEquals(listed[0], current);
    assertFalse(file1 + " should be a file", current.isDirectory());
}
/**
 * Adapts a list of statuses to the {@link RemoteIterator} interface.
 *
 * @param files the statuses to iterate over; an immutable copy is taken when this method is called
 * @return a remote iterator that walks the copied list
 */
private static RemoteIterator<LocatedFileStatus> simpleRemoteIterator(List<LocatedFileStatus> files)
{
    // Copy first so the returned iterator walks a snapshot rather than the caller's list.
    final Iterator<LocatedFileStatus> snapshot = ImmutableList.copyOf(files).iterator();

    return new RemoteIterator<LocatedFileStatus>()
    {
        @Override
        public boolean hasNext()
        {
            return snapshot.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return snapshot.next();
        }
    };
}
/**
 * Returns a remote iterator over the enclosing {@code files} collection.
 * The requested path {@code f} is not consulted.
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
{
    final Iterator<LocatedFileStatus> backing = files.iterator();

    return new RemoteIterator<LocatedFileStatus>()
    {
        @Override
        public boolean hasNext()
        {
            return backing.hasNext();
        }

        @Override
        public LocatedFileStatus next()
        {
            return backing.next();
        }
    };
}
/**
 * Scans {@code folderPath} (non-recursively) and records the data file and the meta file
 * found there in {@code dataFilePath} and {@code metaFilePath}.
 *
 * @throws IllegalArgumentException if the listing is {@code null} or either file is missing
 * @throws RuntimeException wrapping any {@link IOException} raised while listing
 */
void checkPath() {
    try {
        RemoteIterator<LocatedFileStatus> listing = fs.listFiles(folderPath, false);
        if (listing == null) {
            throw new IllegalArgumentException("Invalid path " + folderPath);
        }

        while (listing.hasNext()) {
            Path candidate = listing.next().getPath();
            String fileName = candidate.getName();

            if (fileName.endsWith(Constants.DATA_FILE_SUFFIX)) {
                dataFilePath = candidate;
                continue;
            }
            if (fileName.endsWith(Constants.META_FILE_SUFFIX)) {
                metaFilePath = candidate;
                continue;
            }
            // Anything else is unexpected but tolerated.
            logger.warn("Contains invalid file {} in path {}", candidate, folderPath);
        }

        boolean bothFound = dataFilePath != null && metaFilePath != null;
        if (!bothFound) {
            throw new IllegalArgumentException("Invalid path " + folderPath);
        }
    } catch (IOException e) {
        throw new RuntimeException("io error", e);
    }
}
/**
 * Builds the task that lists {@code path} on the delegate file system for one address.
 *
 * @param address the address whose delegate file system is listed
 * @return a callable producing an iterator whose statuses have been passed through
 *         {@code fixFileStatus}
 */
@Override
protected Callable<RemoteIterator<FileStatus>> newMapTask(final String address) throws IOException {
    return new Callable<RemoteIterator<FileStatus>>() {
        @Override
        public RemoteIterator<FileStatus> call() throws Exception {
            final FileSystem delegateFs = getDelegateFileSystem(address);

            // Only directories should be listed with a fork/join task
            if (delegateFs.getFileStatus(path).isFile()) {
                throw new FileNotFoundException("Directory not found: " + path);
            }

            final RemoteIterator<FileStatus> rawStatuses = delegateFs.listStatusIterator(path);
            final RemoteIterator<FileStatus> fixedStatuses = new RemoteIterator<FileStatus>() {
                @Override
                public boolean hasNext() throws IOException {
                    return rawStatuses.hasNext();
                }

                @Override
                public FileStatus next() throws IOException {
                    FileStatus raw = rawStatuses.next();
                    return fixFileStatus(address, raw);
                }
            };
            return fixedStatuses;
        }
    };
}
/** Calling next() on an exhausted status iterator must raise NoSuchElementException. */
@Test
public void testListStatusIteratorPastLastElement() throws IOException {
    final RemoteIterator<FileStatus> exhausted = fs.listStatusIterator(new Path("/"));

    // Drain every element first.
    for (; exhausted.hasNext(); ) {
        exhausted.next();
    }

    boolean threwExpected = false;
    try {
        exhausted.next();
    } catch (NoSuchElementException expected) {
        threwExpected = true;
    }
    if (!threwExpected) {
        fail("NoSuchElementException should be throw when next() is called when there are no elements remaining.");
    }
}
/**
 * Benchmarks directory enumeration: lists {@code path} (non-recursively) {@code size} times
 * per round, for four rounds, printing the total time and per-listing latency of each round.
 *
 * <p>The file system handle is closed in a {@code finally} block so it is released even when
 * a listing fails part-way through the benchmark.
 *
 * @throws Exception if the file system cannot be obtained or a listing fails
 */
void enumerateDir() throws Exception {
    System.out.println("enumarate dir, path " + path);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
        int repfactor = 4;
        for (int k = 0; k < repfactor; k++) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < size; i++) {
                // single operation == one full pass over the listing
                RemoteIterator<LocatedFileStatus> iter = fs.listFiles(path, false);
                while (iter.hasNext()) {
                    iter.next();
                }
            }
            long end = System.currentTimeMillis();
            double executionTime = ((double) (end - start));
            // milliseconds for the whole round -> microseconds per listing
            double latency = executionTime * 1000.0 / ((double) size);
            System.out.println("execution time [ms] " + executionTime);
            System.out.println("latency [us] " + latency);
        }
    } finally {
        fs.close();
    }
}
/**
 * Walks the entries of a local directory and cleans up those whose names carry a
 * {@code _DEL_} marker.
 *
 * <p>Entries matching the user-cache marker are handed to {@code cleanUpFilesPerUserDir};
 * entries matching the NM-private or file-cache marker are passed to the deletion service.
 * A failure on one entry is logged, together with its cause, and does not stop the walk.
 *
 * @param lfs file context used to list {@code localDir}
 * @param del deletion service that receives the paths to remove
 * @param localDir the local directory to scan
 * @throws IOException if the directory cannot be listed
 */
private void deleteLocalDir(FileContext lfs, DeletionService del,
    String localDir) throws IOException {
    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
    if (fileStatus == null) {
        return;
    }
    while (fileStatus.hasNext()) {
        FileStatus status = fileStatus.next();
        Path entryPath = status.getPath();
        String entryName = entryPath.getName();
        try {
            if (entryName.matches(".*" + ContainerLocalizer.USERCACHE + "_DEL_.*")) {
                LOG.info("usercache path : " + entryPath.toString());
                cleanUpFilesPerUserDir(lfs, del, entryPath);
            } else if (entryName.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
                || entryName.matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
                del.delete(null, entryPath, new Path[] {});
            }
        } catch (IOException ex) {
            // Best effort: keep going, but record why this entry could not be cleaned up.
            LOG.warn("Failed to delete this local Directory: " + entryName, ex);
        }
    }
}
/**
 * Creates a file, closes it without writing, and checks that the file shows up in the
 * listing and that reading a byte from it raises {@link EOFException}.
 */
@Test
public void testClientWriteEmptyFile() throws Exception {
    Path basePath = new Path(temporaryFolder.newFolder().getAbsolutePath());
    Path path = ((PathCanonicalizer) clientFS).canonicalizePath(new Path(basePath, "testfile.bytes"));

    // create a file
    FSDataOutputStream stream = clientFS.create(path, false);
    // close it without writing anything to it
    stream.close();

    // make sure the file was created
    RemoteIterator<LocatedFileStatus> iter = client.fileSystem.listFiles(basePath, false);
    assertEquals(true, iter.hasNext());
    LocatedFileStatus status = iter.next();

    try (FSDataInputStream in = clientFS.open(status.getPath())) {
        in.readByte();
        // Reaching this line means a byte was read from a file that should hold none.
        fail("File is expected to be empty");
    } catch (EOFException e) {
        // empty file as expected
    }

    client.fileSystem.delete(status.getPath(), false);
}
/**
 * Recursively scans an input directory, totals the size and count of the files accepted by
 * the output-files filter, logs both figures and returns them.
 *
 * @param conf configuration used to resolve the file system of {@code inputDir}
 * @param inputDir the directory to scan
 * @return the accumulated size and file count, with the third constructor argument {@code false}
 * @throws IOException if the file system cannot be resolved or listed
 */
static DataStatistics publishPlainDataStatistics(Configuration conf,
    Path inputDir)
    throws IOException {
    final FileSystem inputFs = inputDir.getFileSystem(conf);

    // walk every file below the input directory
    final RemoteIterator<LocatedFileStatus> listing = inputFs.listFiles(inputDir, true);
    final PathFilter outputFilter = new Utils.OutputFileUtils.OutputFilesFilter();

    long totalBytes = 0;
    long matchedFiles = 0;
    while (listing.hasNext()) {
        final LocatedFileStatus candidate = listing.next();
        if (!outputFilter.accept(candidate.getPath())) {
            continue;
        }
        totalBytes += candidate.getLen();
        matchedFiles++;
    }

    // report what was found
    LOG.info("Total size of input data : "
        + StringUtils.humanReadableInt(totalBytes));
    LOG.info("Total number of input data files : " + matchedFiles);

    return new DataStatistics(totalBytes, matchedFiles, false);
}
/**
 * Collects, depth-first, every non-directory entry below {@code path} that passes the filter.
 * A directory rejected by the filter is not descended into.
 *
 * @param result
 *          list that receives the accepted file statuses
 * @param fs
 *          file system used for the listing
 * @param path
 *          directory whose entries are examined
 * @param inputFilter
 *          filter applied to the path of every file and directory encountered
 * @throws IOException
 *           if a listing fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
    for (RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(path);
        entries.hasNext(); ) {
        final LocatedFileStatus entry = entries.next();
        final Path entryPath = entry.getPath();
        if (!inputFilter.accept(entryPath)) {
            continue;
        }
        if (!entry.isDirectory()) {
            result.add(entry);
            continue;
        }
        addInputPathRecursively(result, fs, entry.getPath(), inputFilter);
    }
}
/**
 * Processes one status: a non-directory is recorded as-is; a directory is listed and each
 * filter-accepted child is recorded either as a file or, when {@code recursive} is set and
 * the child is a directory, as a directory that still needs its own call.
 *
 * @return the populated result, carrying the file system it was produced from
 */
@Override
public Result call() throws Exception {
    final Result outcome = new Result();
    outcome.fs = fs;

    if (!fileStatus.isDirectory()) {
        outcome.locatedFileStatuses.add(fileStatus);
        return outcome;
    }

    final RemoteIterator<LocatedFileStatus> children =
        fs.listLocatedStatus(fileStatus.getPath());
    while (children.hasNext()) {
        final LocatedFileStatus child = children.next();
        if (!inputFilter.accept(child.getPath())) {
            continue;
        }
        final boolean descendLater = recursive && child.isDirectory();
        if (descendLater) {
            outcome.dirsNeedingRecursiveCalls.add(child);
        } else {
            outcome.locatedFileStatuses.add(child);
        }
    }
    return outcome;
}
/**
 * Gathers into {@code result} every file under {@code path} whose path is accepted by the
 * filter, recursing into accepted sub-directories.
 *
 * @param result
 *          destination list for the accepted file statuses
 * @param fs
 *          the file system to list
 * @param path
 *          the directory to start from
 * @param inputFilter
 *          filter consulted for each file and directory path
 * @throws IOException
 *           if listing a directory fails
 */
protected void addInputPathRecursively(List<FileStatus> result,
    FileSystem fs, Path path, PathFilter inputFilter)
    throws IOException {
    final RemoteIterator<LocatedFileStatus> listing = fs.listLocatedStatus(path);
    while (listing.hasNext()) {
        final LocatedFileStatus current = listing.next();
        final boolean accepted = inputFilter.accept(current.getPath());
        if (accepted && current.isDirectory()) {
            addInputPathRecursively(result, fs, current.getPath(), inputFilter);
        } else if (accepted) {
            result.add(current);
        }
    }
}
/**
 * Lists a directory and returns the plain files accepted by the filter.
 * A {@link FileNotFoundException} raised while listing is logged and the entries gathered
 * so far (possibly none) are returned.
 *
 * @param path directory to scan; it is qualified against {@code fc} before use
 * @param fc file context used for qualification and listing
 * @param pathFilter filter applied to each file's path
 * @return the matching file statuses
 * @throws IOException for listing failures other than a missing directory
 */
@VisibleForTesting
protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
    PathFilter pathFilter) throws IOException {
    final Path qualified = fc.makeQualified(path);
    final List<FileStatus> matches = new ArrayList<FileStatus>();
    try {
        for (RemoteIterator<FileStatus> entries = fc.listStatus(qualified);
            entries.hasNext(); ) {
            final FileStatus candidate = entries.next();
            if (!candidate.isFile()) {
                continue;
            }
            if (pathFilter.accept(candidate.getPath())) {
                matches.add(candidate);
            }
        }
    } catch (FileNotFoundException fe) {
        LOG.error("Error while scanning directory " + qualified, fe);
    }
    return matches;
}
/**
 * Prints a two-column table of every encryption zone: its path and its key name.
 *
 * @param conf configuration used to obtain the distributed file system
 * @param args remaining command-line arguments; must be empty
 * @return 0 on success, 1 if an unexpected argument is present, 2 if listing fails
 */
@Override
public int run(Configuration conf, List<String> args) throws IOException {
    final boolean hasExtraArgs = !args.isEmpty();
    if (hasExtraArgs) {
        System.err.println("Can't understand argument: " + args.get(0));
        return 1;
    }

    final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
    try {
        final TableListing table = new TableListing.Builder()
            .addField("").addField("", true)
            .wrapWidth(AdminHelper.MAX_LINE_WIDTH).hideHeaders().build();
        for (RemoteIterator<EncryptionZone> zones = dfs.listEncryptionZones();
            zones.hasNext(); ) {
            final EncryptionZone zone = zones.next();
            table.addRow(zone.getPath(), zone.getKeyName());
        }
        System.out.println(table.toString());
        return 0;
    } catch (IOException e) {
        System.err.println(prettifyException(e));
        return 2;
    }
}
/**
 * Creates a file under a relative directory on a mini cluster and checks that a recursive
 * listing of that relative path can be iterated to completion.
 */
@Test(timeout=60000)
public void testListFiles() throws IOException {
    final Configuration conf = new HdfsConfiguration();
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
        final DistributedFileSystem dfs = cluster.getFileSystem();
        final Path relativeDir = new Path("relative");
        final Path createdFile = new Path(relativeDir, "foo");
        dfs.create(createdFile).close();

        final List<LocatedFileStatus> retVal = new ArrayList<LocatedFileStatus>();
        for (RemoteIterator<LocatedFileStatus> listing = dfs.listFiles(relativeDir, true);
            listing.hasNext(); ) {
            retVal.add(listing.next());
        }
        System.out.println("retVal = " + retVal);
    } finally {
        cluster.shutdown();
    }
}
/**
 * Polls up to {@code CHECKTIMES} times, one second apart, until listing the cache directives
 * for this directive's pool and path returns at least one entry.
 *
 * @return {@code true} as soon as an entry is listed, {@code false} if none appears in time
 */
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
    int attempt = 0;
    while (attempt < CHECKTIMES) {
        CacheDirectiveInfo query = new CacheDirectiveInfo.Builder().
            setPool(directive.getPool()).
            setPath(directive.getPath()).
            build();
        RemoteIterator<CacheDirectiveEntry> matches = dfs.listCacheDirectives(query);
        if (matches.hasNext()) {
            return true;
        }
        Thread.sleep(1000);
        attempt++;
    }
    return false;
}
/**
 * Polls up to {@code CHECKTIMES} times, one second apart, until the listed cache directives
 * for this directive's pool and path contain one with the expected id and the new
 * replication value.
 *
 * @return {@code true} once such a directive is listed, {@code false} if it never appears
 */
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
    int attempt = 0;
    while (attempt < CHECKTIMES) {
        CacheDirectiveInfo query = new CacheDirectiveInfo.Builder().
            setPool(directive.getPool()).
            setPath(directive.getPath()).
            build();
        for (RemoteIterator<CacheDirectiveEntry> matches = dfs.listCacheDirectives(query);
            matches.hasNext(); ) {
            CacheDirectiveInfo listed = matches.next().getInfo();
            boolean sameId = listed.getId() == id;
            if (sameId && (listed.getReplication().shortValue() == newReplication)) {
                return true;
            }
        }
        Thread.sleep(1000);
        attempt++;
    }
    return false;
}
/**
 * Polls up to {@code CHECKTIMES} times, one second apart, until listing the cache directives
 * for this directive's pool and path returns no entry.
 *
 * @return {@code true} as soon as the listing is empty, {@code false} if entries persist
 */
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
    int attempt = 0;
    while (attempt < CHECKTIMES) {
        CacheDirectiveInfo query = new CacheDirectiveInfo.Builder().
            setPool(directive.getPool()).
            setPath(directive.getPath()).
            build();
        RemoteIterator<CacheDirectiveEntry> remaining = dfs.listCacheDirectives(query);
        boolean gone = !remaining.hasNext();
        if (gone) {
            return true;
        }
        Thread.sleep(1000);
        attempt++;
    }
    return false;
}
/**
 * Iterates the cache pools reported by {@code dfs} while repeatedly swapping which
 * namenode is active, and asserts that exactly the expected pool names are seen.
 *
 * @param poolNames the pool names expected from the listing; a clone is consumed, the
 *                  argument itself is not modified here
 * @param active value passed to the cluster's transition calls; presumably the index of
 *               the currently active namenode (0 or 1) - confirm against the caller
 */
@SuppressWarnings("unchecked")
private void listCachePools(
HashSet<String> poolNames, int active) throws Exception {
// Work on a copy so names can be removed as they are encountered.
HashSet<String> tmpNames = (HashSet<String>)poolNames.clone();
RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
int poolCount = poolNames.size();
// Pull exactly poolCount entries; next() is called without a hasNext() check.
for (int i=0; i<poolCount; i++) {
CachePoolEntry pool = pools.next();
String pollName = pool.getInfo().getPoolName();
assertTrue("The pool name should be expected", tmpNames.remove(pollName));
// On every even index, swap the roles: the current active becomes standby and the
// other index (0 <-> 1) becomes active, while the same iterator keeps being used.
if (i % 2 == 0) {
int standby = active;
active = (standby == 0) ? 1 : 0;
cluster.transitionToStandby(standby);
cluster.transitionToActive(active);
cluster.waitActive(active);
}
}
assertTrue("All pools must be found", tmpNames.isEmpty());
}
/**
 * Asserts that two listings yield the same number of entries and that corresponding entries
 * agree on every compared attribute. The path is deliberately not compared.
 *
 * @param i1 the reference listing
 * @param i2 the listing checked against the reference
 * @throws IOException if advancing either iterator fails
 */
private static void checkEquals(RemoteIterator<LocatedFileStatus> i1,
    RemoteIterator<LocatedFileStatus> i2) throws IOException {
    for (; i1.hasNext(); ) {
        assertTrue(i2.hasNext());

        // Compare all the fields but the path name, which is relative
        // to the original path from listFiles.
        final LocatedFileStatus expected = i1.next();
        final LocatedFileStatus actual = i2.next();
        assertEquals(expected.getAccessTime(), actual.getAccessTime());
        assertEquals(expected.getBlockSize(), actual.getBlockSize());
        assertEquals(expected.getGroup(), actual.getGroup());
        assertEquals(expected.getLen(), actual.getLen());
        assertEquals(expected.getModificationTime(), actual.getModificationTime());
        assertEquals(expected.getOwner(), actual.getOwner());
        assertEquals(expected.getPermission(), actual.getPermission());
        assertEquals(expected.getReplication(), actual.getReplication());
    }
    // The second listing must not have entries left over.
    assertFalse(i2.hasNext());
}
/**
 * Collects the non-recursive file listing of a path into a list.
 *
 * @param path the path to list
 * @return every status produced by the listing, in iteration order
 * @throws IOException if the listing fails
 */
public List<LocatedFileStatus> listFiles(Path path) throws IOException {
    final RemoteIterator<LocatedFileStatus> listing = dfs.listFiles(path, false);
    final List<LocatedFileStatus> collected = new ArrayList<>();
    for (; listing.hasNext(); ) {
        collected.add(listing.next());
    }
    return collected;
}
/**
 * Counts the entries of the non-recursive file listing of a path.
 *
 * @param path the path to list
 * @return the number of statuses produced by the listing
 * @throws IOException if the listing fails
 */
public int countFiles(Path path) throws IOException {
    int count = 0;
    for (RemoteIterator<LocatedFileStatus> listing = dfs.listFiles(path, false);
        listing.hasNext(); listing.next()) {
        count++;
    }
    return count;
}
/**
 * Returns the window ids stored for an operator, derived from the entry names found in the
 * operator's checkpoint directory.
 *
 * <p>The directory is created if it does not exist. The temporary file is skipped, the
 * stateless checkpoint name maps to {@code Stateless.WINDOW_ID}, and every other name is
 * parsed as a hexadecimal number.
 *
 * @param operatorId id of the operator whose directory is inspected
 * @return the window ids in listing order
 * @throws RuntimeException if the checkpoint location exists but is not a directory
 * @throws IOException if the directory cannot be inspected, created or listed
 */
@Override
public long[] getWindowIds(int operatorId) throws IOException
{
    final Path operatorDir = new Path(path + Path.SEPARATOR + operatorId);
    try {
        if (!fileContext.getFileStatus(operatorDir).isDirectory()) {
            throw new RuntimeException("Checkpoint location is not a directory");
        }
    } catch (FileNotFoundException ex) {
        // During initialization checkpoint directory may not exists.
        fileContext.mkdir(operatorDir, FsPermission.getDirDefault(), true);
    }

    final RemoteIterator<FileStatus> children = fileContext.listStatus(operatorDir);
    final List<Long> collected = new ArrayList<>();
    while (children.hasNext()) {
        final String name = children.next().getPath().getName();
        if (!name.equals(TMP_FILE)) {
            final long windowId = STATELESS_CHECKPOINT_WINDOW_ID.equals(name)
                ? Stateless.WINDOW_ID
                : Long.parseLong(name, 16);
            collected.add(windowId);
        }
    }

    final long[] windowIds = new long[collected.size()];
    int slot = 0;
    for (Long id : collected) {
        windowIds[slot++] = id;
    }
    return windowIds;
}
/** The root listing must contain exactly one entry: the directory pdfs:/foo with mode 0755. */
@Test
public void testListStatusIteratorRoot() throws IOException {
    final RemoteIterator<FileStatus> rootListing = fs.listStatusIterator(new Path("/"));

    assertTrue(rootListing.hasNext());
    final FileStatus soleEntry = rootListing.next();

    assertEquals(new Path("pdfs:/foo"), soleEntry.getPath());
    assertTrue(soleEntry.isDirectory());
    assertEquals(0755, soleEntry.getPermission().toExtendedShort());

    // Nothing may follow the single expected entry.
    assertTrue(!rootListing.hasNext());
}
/**
 * Filter a RemoteIterator based on a predicate that is allowed to throw an IOException.
 *
 * <p>The remote iterator is adapted to a plain iterator, filtered, and adapted back.
 * Because the filtering lambda cannot throw a checked exception, an IOException raised by
 * the predicate is rethrown wrapped in {@code CaughtIO}.
 * NOTE(review): presumably {@code RemoteIterators.IterToRemote} unwraps {@code CaughtIO}
 * back into an IOException for the caller - confirm against that class.
 *
 * @param iter The RemoteIterator to filter.
 * @param predicate The predicate to apply; elements for which it returns true are kept.
 * @return the new RemoteIterator
 */
public static RemoteIterator<LocatedFileStatus> filter(RemoteIterator<LocatedFileStatus> iter, PredicateWithIOException<LocatedFileStatus> predicate) {
return new RemoteIterators.IterToRemote(Iterators.filter(
new RemoteIterators.RemoteToIter(iter),
t -> {
try {
return predicate.apply(t);
} catch (IOException ex) {
// Tunnel the checked exception through the unchecked filter callback.
throw new CaughtIO(ex);
}
}
));
}
/**
 * Recursively lists the files under a file-system path and maps each one to a resource
 * path by replacing the file-system prefix with {@code resPathPrefix}.
 *
 * @param filePath the file-system path to list recursively
 * @param resPathPrefix the prefix prepended to each file's path relative to {@code filePath}
 * @return the sorted set of resulting resource paths
 * @throws IllegalStateException if a listed file does not start with the path being listed
 * @throws IOException if the listing fails
 */
TreeSet<String> getAllFilePath(Path filePath, String resPathPrefix) throws IOException {
    String fsPathPrefix = filePath.toUri().getPath();
    // The relative part begins after the separator that follows the prefix. When the prefix
    // itself already ends with a separator (the root "/"), there is no extra character to
    // skip; skipping one would drop the first character of every file name.
    int relativeStart = fsPathPrefix.endsWith("/")
        ? fsPathPrefix.length()
        : fsPathPrefix.length() + 1;

    TreeSet<String> fileList = new TreeSet<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(filePath, true);
    while (it.hasNext()) {
        String path = it.next().getPath().toUri().getPath();
        if (!path.startsWith(fsPathPrefix)) {
            throw new IllegalStateException("File path " + path + " is supposed to start with " + fsPathPrefix);
        }
        fileList.add(resPathPrefix + path.substring(relativeStart));
    }
    return fileList;
}
/**
 * Visits the files under a resource folder that match a filter.
 *
 * <p>Nothing happens if the folder does not exist or is not a directory. Each matching
 * file is wrapped in a {@code RawResource}, opened for reading when {@code loadContent} is
 * set, handed to the visitor, and then closed.
 *
 * @param folderPath resource path of the folder to visit
 * @param recursive whether files in sub-folders are visited as well
 * @param filter decides, from resource path and modification time, which files are visited
 * @param loadContent whether the file content stream is opened for the visitor
 * @param visitor callback invoked for each matching resource
 * @throws IllegalStateException if a listed file does not start with the folder's path
 * @throws IOException if the file system cannot be queried or a file cannot be opened
 */
@Override
protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent,
        Visitor visitor) throws IOException {
    Path p = getRealHDFSPath(folderPath);
    if (!fs.exists(p) || !fs.isDirectory(p)) {
        return;
    }

    String fsPathPrefix = p.toUri().getPath();
    // The relative part begins after the separator that follows the prefix. When the prefix
    // itself already ends with a separator (the root "/"), there is no extra character to
    // skip; skipping one would drop the first character of every file name.
    int relativeStart = fsPathPrefix.endsWith("/")
        ? fsPathPrefix.length()
        : fsPathPrefix.length() + 1;
    String resPathPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";

    RemoteIterator<LocatedFileStatus> it = fs.listFiles(p, recursive);
    while (it.hasNext()) {
        LocatedFileStatus status = it.next();
        if (status.isDirectory()) {
            continue;
        }

        String path = status.getPath().toUri().getPath();
        if (!path.startsWith(fsPathPrefix)) {
            throw new IllegalStateException("File path " + path + " is supposed to start with " + fsPathPrefix);
        }

        String resPath = resPathPrefix + path.substring(relativeStart);
        if (filter.matches(resPath, status.getModificationTime())) {
            RawResource raw;
            if (loadContent) {
                raw = new RawResource(resPath, status.getModificationTime(), fs.open(status.getPath()));
            } else {
                raw = new RawResource(resPath, status.getModificationTime());
            }
            try {
                visitor.visit(raw);
            } finally {
                // Always release the resource (and any opened stream), even if the visitor fails.
                raw.close();
            }
        }
    }
}