下面列出了怎么用org.apache.hadoop.fs.FileStatus的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Checks every HFile found directly inside a mob column family directory.
 *
 * <p>When the directory turns out to be missing, a warning is logged, the directory is
 * recorded in {@code missedMobFiles}, and no file is checked.
 *
 * @param cfDir the mob column family directory to inspect
 * @throws IOException if a file system operation fails
 */
protected void checkMobColFamDir(Path cfDir) throws IOException {
  FileStatus[] listing;
  try {
    // use same filter as scanner.
    listing = fs.listStatus(cfDir);
  } catch (FileNotFoundException notFound) {
    // Hadoop 0.23+ listStatus signals a missing path by throwing.
    LOG.warn("Mob colfam Directory " + cfDir
        + " does not exist. Likely the table is deleted. Skipping.");
    missedMobFiles.add(cfDir);
    return;
  }

  List<FileStatus> hfiles = FSUtils.filterFileStatuses(listing, new HFileFilter(fs));

  // Hadoop 1.0 listStatus does not throw for a missing path, so probe explicitly
  // when nothing was found.
  boolean directoryMissing = hfiles.isEmpty() && !fs.exists(cfDir);
  if (directoryMissing) {
    LOG.warn("Mob colfam Directory " + cfDir
        + " does not exist. Likely the table is deleted. Skipping.");
    missedMobFiles.add(cfDir);
    return;
  }

  for (FileStatus hfileStatus : hfiles) {
    checkMobFile(hfileStatus.getPath());
  }
}
/**
 * Deletes dictionary directories whose version plus {@code versionTTL} lies before the
 * current time.
 *
 * <p>For a global dictionary only the leading {@code versions.length - maxVersions} entries
 * returned by {@code listAllVersions()} are considered. Otherwise every entry under the parent
 * of {@code basePath} is considered, and its version is parsed from the part of the entry name
 * after the first {@code '_'}.
 *
 * @param isAppendDictGlobal whether the global-dictionary version directories are cleaned
 * @throws IOException if listing or deleting fails
 */
private void cleanUp(boolean isAppendDictGlobal) throws IOException {
  final long now = System.currentTimeMillis();

  if (!isAppendDictGlobal) {
    FileStatus[] siblings = fileSystem.listStatus(basePath.getParent());
    for (FileStatus sibling : siblings) {
      String dirName = sibling.getPath().getName();
      // Name is expected to look like "<prefix>_<version>"; anything else throws here.
      long version = Long.parseLong(dirName.split("_")[1]);
      if (version + versionTTL < now) {
        fileSystem.delete(new Path(basePath.getParent() + "/" + dirName), true);
      }
    }
    return;
  }

  Long[] versions = listAllVersions();
  for (int idx = 0; idx < versions.length - maxVersions; idx++) {
    boolean expired = versions[idx] + versionTTL < now;
    if (expired) {
      fileSystem.delete(getVersionDir(versions[idx]), true);
    }
  }
}
/**
 * Kills a region server, waits for its WAL split procedure to get a worker, kills that worker
 * too, and asserts that the split procedure still finishes successfully.
 */
@Test
public void testHandleDeadWorker() throws Exception {
  Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
  for (int round = 0; round < 10; round++) {
    TEST_UTIL.loadTable(table, FAMILY);
  }

  HRegionServer victim = TEST_UTIL.getHBaseCluster().getRegionServer(0);
  ProcedureExecutor<MasterProcedureEnv> executor = master.getMasterProcedureExecutor();
  List<FileStatus> walsToSplit = splitWALManager.getWALsToSplit(victim.getServerName(), false);
  Assert.assertEquals(1, walsToSplit.size());

  // First failure: the server owning the WAL.
  TEST_UTIL.getHBaseCluster().killRegionServer(victim.getServerName());
  TEST_UTIL.waitFor(30000, () -> master.getProcedures().stream()
      .anyMatch(candidate -> candidate instanceof SplitWALProcedure));

  Procedure splitProc = master.getProcedures().stream()
      .filter(candidate -> candidate instanceof SplitWALProcedure)
      .findAny()
      .get();
  Assert.assertNotNull(splitProc);
  final SplitWALProcedure walSplit = (SplitWALProcedure) splitProc;

  // Second failure: the worker that picked up the split.
  TEST_UTIL.waitFor(5000, () -> walSplit.getWorker() != null);
  TEST_UTIL.getHBaseCluster().killRegionServer(walSplit.getWorker());

  ProcedureTestingUtility.waitProcedure(executor, splitProc.getProcId());
  Assert.assertTrue(splitProc.isSuccess());
  ProcedureTestingUtility.waitAllProcedures(executor);
}
/**
 * Exercises {@code listStatus} on an empty folder, a folder holding one sub-folder, and a
 * folder holding one file, then removes the whole tree.
 */
@Test
public void testListDirectory() throws Exception {
  final Path root = new Path("testingList");
  assertTrue(fs.mkdirs(root));

  // Freshly created folder: nothing inside.
  FileStatus[] entries = fs.listStatus(root);
  assertEquals(0, entries.length);

  // One sub-folder shows up as a single directory entry.
  final Path inner = new Path(root, "inner");
  assertTrue(fs.mkdirs(inner));
  entries = fs.listStatus(root);
  assertEquals(1, entries.length);
  assertTrue(entries[0].isDirectory());

  // Adding a file below the sub-folder does not change the root listing.
  final Path innerFile = new Path(inner, "innerFile");
  writeString(innerFile, "testing");
  entries = fs.listStatus(root);
  assertEquals(1, entries.length);
  assertTrue(entries[0].isDirectory());

  // The sub-folder itself lists exactly that one file.
  entries = fs.listStatus(inner);
  assertEquals(1, entries.length);
  assertFalse(entries[0].isDirectory());

  assertTrue(fs.delete(root, true));
}
/**
 * Switches the active Hadoop user.
 *
 * <p>If {@code /user} exists, the new user must have a directory directly beneath it. On
 * success the {@code HADOOP_USER_NAME} system property is set, the login user is
 * re-initialised, and the current directory is reset.
 *
 * @param newUser name of the user to switch to
 * @return an empty string on success, otherwise a message describing why nothing changed
 * @throws IOException if the file system lookup or the re-login fails
 */
@CliCommand(value = "su", help = "Changes current active user [*experimental*]")
public synchronized String su(@CliOption(key = {""}, help = "su [<username>]") String newUser) throws IOException {
  if (StringUtils.isEmpty(newUser)) {
    return "No username is defined! ";
  }

  final FileSystem fileSystem = getFileSystem();
  final Path usersRoot = new Path("/user");
  if (fileSystem.exists(usersRoot)) {
    boolean userKnown = false;
    for (FileStatus entry : fileSystem.listStatus(usersRoot)) {
      if (entry.isDirectory() && entry.getPath().getName().equals(newUser)) {
        userKnown = true;
        break;
      }
    }
    if (!userKnown) {
      return "User " + newUser + " does not exist!";
    }
  }

  System.setProperty("HADOOP_USER_NAME", newUser);
  UserGroupInformation.loginUserFromSubject(null);
  currentDir = null;
  return "";
}
/**
 * Collects the statuses of all non-directory entries under {@code path} that pass
 * {@code filter}.
 *
 * <p>Directories are never part of the result. With {@code recursive} set they are descended
 * into; a sub-directory whose listing fails is logged and skipped so the remaining entries are
 * still returned.
 *
 * @param path directory to list
 * @param recursive whether sub-directories are descended into
 * @param hdfs file system used for listing
 * @param filter filter handed to every {@code listStatus} call
 * @return the collected file statuses
 * @throws IOException if listing {@code path} itself fails
 */
private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException {
  final Set<FileStatus> collected = new HashSet<>();

  getLogger().debug("Fetching listing for {}", new Object[] {path});

  for (final FileStatus entry : hdfs.listStatus(path, filter)) {
    if (!entry.isDirectory()) {
      collected.add(entry);
      continue;
    }
    if (!recursive) {
      continue;
    }
    try {
      collected.addAll(getStatuses(entry.getPath(), recursive, hdfs, filter));
    } catch (final IOException ioe) {
      getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {entry.getPath(), ioe});
    }
  }

  return collected;
}
/**
 * Dispatches every resource requested for a container to the
 * {@link LocalResourcesTracker} responsible for its visibility, wrapping each
 * {@link LocalResourceRequest} in a {@link ResourceRequestEvent}.
 *
 * <p>All events created here share one {@link LocalizerContext}, which carries a
 * file-status cache built for this call.
 *
 * @param rsrcReqs the localization request holding the container and its requested resources
 */
private void handleInitContainerResources(
    ContainerLocalizationRequestEvent rsrcReqs) {
  final Container container = rsrcReqs.getContainer();

  // Loading cache for the file statuses, shared through the context below.
  final LoadingCache<Path,Future<FileStatus>> statusCache =
      CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
  final LocalizerContext context = new LocalizerContext(
      container.getUser(), container.getContainerId(), container.getCredentials(), statusCache);

  final Map<LocalResourceVisibility, Collection<LocalResourceRequest>> requested =
      rsrcReqs.getRequestedResources();
  for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> entry :
      requested.entrySet()) {
    final LocalResourceVisibility visibility = entry.getKey();
    final LocalResourcesTracker tracker =
        getLocalResourcesTracker(visibility, container.getUser(),
            container.getContainerId().getApplicationAttemptId().getApplicationId());
    for (LocalResourceRequest request : entry.getValue()) {
      tracker.handle(new ResourceRequestEvent(request, visibility, context));
    }
  }
}
/**
 * Adds a credential to the first configured provider, flushes it, re-reads it through a fresh
 * provider instance, and asserts that the file at {@code path} still has permissions
 * {@code rwxrwxrwx}.
 *
 * @param conf configuration used to look up the providers and the file system
 * @param ourUrl not used by this method
 * @param path location of the keystore file whose permissions are checked
 * @throws Exception if creating, flushing or reading the credential fails
 */
public void checkPermissionRetention(Configuration conf, String ourUrl,
    Path path) throws Exception {
  CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0);

  // Build a 32-character credential whose i-th character has code i.
  final char[] secret = new char[32];
  int pos = 0;
  while (pos < secret.length) {
    secret[pos] = (char) pos;
    ++pos;
  }

  // Create a new key; surface the stack trace before propagating any failure.
  try {
    provider.createCredentialEntry("key5", secret);
  } catch (Exception e) {
    e.printStackTrace();
    throw e;
  }
  provider.flush();

  // A new provider instance proves the credential was actually persisted.
  provider = CredentialProviderFactory.getProviders(conf).get(0);
  assertArrayEquals(secret, provider.getCredentialEntry("key5").getCredential());

  final FileStatus keystoreStatus = path.getFileSystem(conf).getFileStatus(path);
  assertTrue("Permissions should have been retained from the preexisting " +
      "keystore.", keystoreStatus.getPermission().toString().equals("rwxrwxrwx"));
}
/**
 * Rolls the WAL once and checks that the archived file exists after the roll, is still
 * present two seconds later, and has been removed after a further six seconds.
 */
@Test
public void test() throws IOException, InterruptedException {
  region
    .update(r -> r.put(new Put(Bytes.toBytes(1)).addColumn(CF1, QUALIFIER, Bytes.toBytes(1))));
  region.flush(true);

  FileSystem fileSystem = htu.getDataTestDir().getFileSystem(htu.getConfiguration());

  // Nothing has been archived before the first roll.
  assertFalse(fileSystem.exists(globalWALArchiveDir));

  region.requestRollAll();
  region.waitUntilWalRollFinished();

  // Exactly one archived WAL is expected now.
  FileStatus[] archived = fileSystem.listStatus(globalWALArchiveDir);
  assertEquals(1, archived.length);

  // Still present shortly afterwards.
  Thread.sleep(2000);
  assertTrue(fileSystem.exists(archived[0].getPath()));

  // Gone once the cleaner has had time to run.
  Thread.sleep(6000);
  assertEquals(0, fileSystem.listStatus(globalWALArchiveDir).length);
}
/**
 * Selects the partitions whose extracted timestamp is not older than
 * {@code previousTimestamp}.
 *
 * <p>For each status the partition values are derived from its path and the timestamp is
 * obtained from {@code context.extractTimestamp}, which is also given a supplier based on the
 * status's modification time.
 *
 * @param context source of the partition keys and the timestamp extraction
 * @param previousTimestamp inclusive lower bound for a partition to be selected
 * @param statuses candidate partition directories
 * @return pairs of partition values and extracted timestamp, in input order
 */
@VisibleForTesting
static List<Tuple2<List<String>, Long>> suitablePartitions(
    Context context,
    long previousTimestamp,
    FileStatus[] statuses) {
  List<Tuple2<List<String>, Long>> selected = new ArrayList<>();
  for (FileStatus candidate : statuses) {
    org.apache.flink.core.fs.Path flinkPath =
        new org.apache.flink.core.fs.Path(candidate.getPath().toString());
    List<String> values = extractPartitionValues(flinkPath);
    long extracted = context.extractTimestamp(
        context.partitionKeys(),
        values,
        // to UTC millisecond.
        () -> TimestampData.fromTimestamp(
            new Timestamp(candidate.getModificationTime())).getMillisecond());
    if (extracted < previousTimestamp) {
      continue;
    }
    selected.add(new Tuple2<>(values, extracted));
  }
  return selected;
}
/**
 * Stores the modification and access time for the file inode at {@code src}.
 * The access time is precise up to an hour. The transaction, if needed, is
 * written to the edits log but is not flushed.
 *
 * @param src path of the file whose times are updated
 * @param mtime new modification time
 * @param atime new access time; {@code -1} is accepted even when access times are unsupported
 * @throws IOException if access times are not supported and {@code atime} is not {@code -1},
 *         if the write-access check fails, or (as {@link FileNotFoundException}) if no file
 *         inode exists at {@code src}
 */
public synchronized void setTimes(String src, long mtime, long atime) throws IOException {
  final boolean wantsAccessTime = atime != -1;
  if (wantsAccessTime && !isAccessTimeSupported()) {
    throw new IOException("Access time for hdfs is not configured. " +
        " Please set dfs.support.accessTime configuration parameter.");
  }

  // The caller needs to have write access to set access & modification times.
  if (isPermissionEnabled) {
    checkPathAccess(src, FsAction.WRITE);
  }

  final INodeFile target = dir.getFileINode(src);
  if (target == null) {
    throw new FileNotFoundException("File " + src + " does not exist.");
  }

  dir.setTimes(src, target, mtime, atime, true);
  if (auditLog.isInfoEnabled()) {
    final FileStatus updated = dir.getFileInfo(src);
    logAuditEvent(UserGroupInformation.getCurrentUGI(),
        Server.getRemoteIp(),
        "setTimes", src, null, updated);
  }
}
/**
 * Creates an empty file at {@code filePath}, in the manner of the shell {@code touch}.
 *
 * <p>An existing path is only accepted when it is a zero-length file; the file is then
 * created again through {@code fileSystem.create(path)}.
 *
 * @param filePath path of the file to create
 * @throws IOException if the path is a directory, is a non-empty file, or creation fails
 */
public static void touch(String filePath) throws IOException {
  checkHDFSConf();
  final Path target = new Path(filePath);

  if (fileSystem.exists(target)) {
    final FileStatus existing = fileSystem.getFileStatus(target);
    if (existing.isDirectory()) {
      throw new IOException(filePath + " is a directory");
    }
    if (existing.getLen() != 0) {
      throw new IOException(filePath + " must be a zero-length file");
    }
  }

  // Nothing is written; the stream is opened only to materialise the file.
  fileSystem.create(target).close();
}
/**
 * Runs as {@code user} and checks that the link's own status and target can still be
 * resolved when the target is not readable.
 */
private void doGetFileLinkStatusTargetNotReadable() throws Exception {
  // Kept as an anonymous class: doAs is overloaded, so a lambda would be ambiguous.
  final PrivilegedExceptionAction<Object> check = new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws IOException {
      final FileContext context = FileContext.getFileContext(conf);

      final FileStatus linkStatus = context.getFileLinkStatus(link);
      final Path expectedLinkPath =
          link.makeQualified(fs.getUri(), fs.getWorkingDirectory());
      assertEquals("Expected link's FileStatus path to match link!",
          expectedLinkPath, linkStatus.getPath());

      final Path resolvedTarget = context.getLinkTarget(link);
      assertEquals("Expected link's target to match target!",
          target, resolvedTarget);
      return null;
    }
  };
  user.doAs(check);
}
/**
 * Lists the files directly inside a directory, without recursing.
 *
 * @param dir path of the directory to list
 * @return the located statuses reported by {@code fileSystem.listFiles(path, false)}
 * @throws FileNotFoundException if {@code dir} exists but is not a directory
 * @throws IOException if the status lookup or the listing fails
 */
public List<LocatedFileStatus> listFilesInfo(String dir) throws IOException
{
  final Path directory = new Path(dir);
  if (!fileSystem.getFileStatus(directory).isDirectory()) {
    throw new FileNotFoundException("Cannot read directory " + dir);
  }

  final List<LocatedFileStatus> result = new ArrayList<>();
  for (RemoteIterator<LocatedFileStatus> cursor = fileSystem.listFiles(directory, false);
      cursor.hasNext(); ) {
    result.add(cursor.next());
  }
  return result;
}
/**
 * Tests the FileStatus obtained by calling listStatus on a file, through both the
 * {@code fs} and the {@code fc} handles.
 */
@Test
public void testListStatusOnFile() throws IOException {
  final FileStatus[] listed = fs.listStatus(file1);
  assertEquals(1, listed.length);

  final FileStatus viaFileSystem = listed[0];
  assertFalse(file1 + " should be a file", viaFileSystem.isDirectory());
  assertEquals(blockSize, viaFileSystem.getBlockSize());
  assertEquals(1, viaFileSystem.getReplication());
  assertEquals(fileSize, viaFileSystem.getLen());
  final String expectedPath =
      file1.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
  assertEquals(expectedPath, viaFileSystem.getPath().toString());

  // The iterator-based listing must yield an equal status.
  final RemoteIterator<FileStatus> cursor = fc.listStatus(file1);
  final FileStatus viaFileContext = cursor.next();
  assertEquals(listed[0], viaFileContext);
  assertFalse(file1 + " should be a file", viaFileContext.isDirectory());
}
/**
 * Removes the file slices that correspond to {@code paths} from an already loaded partition.
 *
 * <p>Each path is wrapped in an otherwise empty {@link FileStatus} so that file groups can be
 * built and applied in {@code REMOVE} mode. A partition that is not available in the store is
 * skipped with a warning.
 *
 * @param timeline timeline whose completed and compaction instants are used to build groups
 * @param instant instant being synced; only used in log messages
 * @param partition partition the paths belong to
 * @param paths paths of the files to remove
 */
private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition,
    List<String> paths) {
  if (!isPartitionAvailableInStore(partition)) {
    LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
    return;
  }

  LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")");

  // Only the path is populated on these placeholder statuses.
  final FileStatus[] placeholders = new FileStatus[paths.size()];
  int next = 0;
  for (String rawPath : paths) {
    FileStatus placeholder = new FileStatus();
    placeholder.setPath(new Path(rawPath));
    placeholders[next++] = placeholder;
  }

  final List<HoodieFileGroup> groups =
      buildFileGroups(placeholders, timeline.filterCompletedAndCompactionInstants(), false);
  applyDeltaFileSlicesToPartitionView(partition, groups, DeltaApplyMode.REMOVE);
}
/**
 * Returns the statuses of the non-directory entries directly inside {@code dir}.
 *
 * @param dir path of the directory to list
 * @return the file statuses; empty when {@code dir} is itself a file
 * @throws IOException if the file system lookup or listing fails
 */
public static List<FileStatus> listFileStatus(String dir) throws IOException {
  checkHDFSConf();
  final List<FileStatus> files = new ArrayList<>();
  final Path directory = new Path(dir);

  if (!fileSystem.isFile(directory)) {
    for (FileStatus entry : fileSystem.listStatus(directory)) {
      if (entry.isDirectory()) {
        continue;
      }
      files.add(entry);
    }
  }
  return files;
}
/**
 * Asserts that every snapshot file recorded in {@code statusMap} still has the status that
 * was captured for it, where a missing file corresponds to a {@code null} status.
 *
 * <p>When the string forms differ, a diagnostic message is built and the tree is dumped
 * before the final assertion fails.
 */
@Override
void checkSnapshots() throws Exception {
  for (Path snapshotFile : statusMap.keySet()) {
    final FileStatus currentStatus;
    if (fs.exists(snapshotFile)) {
      currentStatus = fs.getFileStatus(snapshotFile);
    } else {
      currentStatus = null;
    }
    final FileStatus originalStatus = statusMap.get(snapshotFile);
    assertEquals(currentStatus, originalStatus);

    if (currentStatus == null) {
      continue;
    }

    String failure = null;
    final boolean sameText = currentStatus.toString().equals(originalStatus.toString());
    if (!sameText) {
      failure = "FAILED: " + getClass().getSimpleName()
          + ": file=" + file + ", snapshotFile" + snapshotFile
          + "\n\n currentStatus = " + currentStatus
          + "\noriginalStatus = " + originalStatus
          + "\n\nfile : " + fsdir.getINode(file.toString()).toDetailString()
          + "\n\nsnapshotFile: " + fsdir.getINode(snapshotFile.toString()).toDetailString();
      SnapshotTestHelper.dumpTree(failure, cluster);
    }
    assertEquals(failure, currentStatus.toString(), originalStatus.toString());
  }
}
/**
 * Records the known-sites locations in the configuration.
 *
 * <p>The number of entries is stored under {@code numberOfSites}. Entry {@code i} is stored
 * under {@code sitesOnHDFSName + i}: directly when it is a file, otherwise from the
 * non-directory children of the directory.
 *
 * <p>NOTE(review): for a directory every child file is written to the same key, so only the
 * last one listed remains - confirm whether all files were meant to be registered.
 *
 * @param conf configuration to update
 * @param val file or directory URIs of the known sites
 * @throws IOException if a file system cannot be obtained or queried
 * @throws URISyntaxException if an entry of {@code val} is not a valid URI
 */
public static void setKnownSitesOnHDFS(Configuration conf, String[] val) throws IOException, URISyntaxException {
  conf.setInt(numberOfSites, val.length);

  int index = 0;
  for (String location : val) {
    final String key = sitesOnHDFSName + index;
    final FileSystem siteFs = FileSystem.get(new URI(location), conf);

    if (siteFs.isFile(new Path(location))) {
      conf.set(key, location);
    } else {
      // Directory: take the files directly inside it.
      for (FileStatus child : siteFs.listStatus(new Path(location))) {
        if (child.isDir()) {
          continue;
        }
        conf.set(key, child.getPath().toString());
      }
    }
    index++;
  }
}
/**
 * Checks that {@code /user/xxx} reports the default owner, group and permission, while
 * {@code /user/authz} reports the owner, group, permission, ACL entry and extended attribute
 * asserted below.
 */
@Test
public void testCustomProvider() throws Exception {
  final FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));

  // Default attributes on a plain directory.
  final Path plainDir = new Path("/user/xxx");
  fs.mkdirs(plainDir);
  FileStatus status = fs.getFileStatus(plainDir);
  Assert.assertEquals(System.getProperty("user.name"), status.getOwner());
  Assert.assertEquals("supergroup", status.getGroup());
  Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());

  // Attributes expected on the authz directory.
  final Path p = new Path("/user/authz");
  fs.mkdirs(p);
  status = fs.getFileStatus(p);
  Assert.assertEquals("foo", status.getOwner());
  Assert.assertEquals("bar", status.getGroup());
  Assert.assertEquals(new FsPermission((short) 0770), status.getPermission());

  final AclStatus aclStatus = fs.getAclStatus(p);
  Assert.assertEquals(1, aclStatus.getEntries().size());
  Assert.assertEquals(AclEntryType.GROUP, aclStatus.getEntries().get(0).getType());
  Assert.assertEquals("xxx", aclStatus.getEntries().get(0).getName());
  Assert.assertEquals(FsAction.ALL, aclStatus.getEntries().get(0).getPermission());

  final Map<String, byte[]> xAttrs = fs.getXAttrs(p);
  Assert.assertTrue(xAttrs.containsKey("user.test"));
  Assert.assertEquals(2, xAttrs.get("user.test").length);
}
/**
 * Groups together all the data blocks for the same HDFS block and turns each group into an
 * input split.
 *
 * @param rowGroupBlocks data blocks (row groups)
 * @param hdfsBlocksArray hdfs blocks
 * @param fileStatus the containing file
 * @param requestedSchema the schema requested by the user
 * @param readSupportMetadata the metadata provided by the readSupport implementation in init
 * @param minSplitSize the mapred.min.split.size
 * @param maxSplitSize the mapred.max.split.size
 * @return the splits, one per group produced by {@code generateSplitInfo}, in the same order
 * @throws IOException If hosts can't be retrieved for the HDFS block
 */
static <T> List<ParquetInputSplit> generateSplits(
    List<BlockMetaData> rowGroupBlocks,
    BlockLocation[] hdfsBlocksArray,
    FileStatus fileStatus,
    String requestedSchema,
    Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
  // First decide which row groups belong together ...
  final List<SplitInfo> groups =
      generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);

  // ... then materialise one split per group.
  final List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
  for (SplitInfo group : groups) {
    splits.add(group.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata));
  }
  return splits;
}
/**
 * Finds an avro file in the input folder(s) and returns its avro schema.
 *
 * <p>{@code inputPathDir} is split on {@code ThirdEyeConstants.FIELD_SEPARATOR}. In every
 * resulting folder the first listed file with the avro suffix is read; when several folders
 * contain such a file, the schema of the last folder is the one returned.
 *
 * @param inputPathDir one or more input folders joined by the field separator
 * @return the extracted schema, or {@code null} if no avro file was found
 * @throws IOException if listing a folder or reading a schema fails
 */
public static Schema getSchema(String inputPathDir) throws IOException {
  final FileSystem fs = FileSystem.get(new Configuration());
  final String[] inputs = inputPathDir.split(ThirdEyeConstants.FIELD_SEPARATOR);

  Schema schema = null;
  for (String input : inputs) {
    final FileStatus[] entries = fs.listStatus(new Path(input));
    for (FileStatus entry : entries) {
      final boolean isAvroFile =
          entry.isFile() && entry.getPath().getName().endsWith(ThirdEyeConstants.AVRO_SUFFIX);
      if (!isAvroFile) {
        continue;
      }
      LOGGER.info("Extracting schema from {}", entry.getPath());
      schema = extractSchemaFromAvro(entry.getPath());
      // Stop scanning this folder only; remaining folders are still visited.
      break;
    }
  }
  return schema;
}
/**
 * Persists the current name-to-generation mapping as a new snapshot file under {@code _path}.
 *
 * <p>The new file number is one more than the numeric name of the last entry in the sorted
 * directory listing, or 1 when the directory is empty. The file is created without
 * overwriting, every entry of {@code _namesToGenerations} is appended, and after a successful
 * write {@code cleanupOldFiles} is called with the previously existing entries.
 *
 * <p>The writer and the underlying stream are closed even when writing fails, so a failed
 * snapshot does not leak an open file handle.
 *
 * @throws IOException if listing the directory, creating, writing or closing the snapshot
 *         fails
 */
private synchronized void storeGenerations() throws IOException {
  FileSystem fileSystem = _path.getFileSystem(_configuration);
  FileStatus[] listStatus = fileSystem.listStatus(_path);
  SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));

  long currentFile;
  if (!existing.isEmpty()) {
    // Existing file names are expected to be numeric; a non-numeric name throws here.
    FileStatus last = existing.last();
    currentFile = Long.parseLong(last.getPath().getName());
  } else {
    currentFile = 0;
  }

  Path path = new Path(_path, buffer(currentFile + 1));
  LOG.info("Creating new snapshot file [{0}]", path);

  FSDataOutputStream outputStream = fileSystem.create(path, false);
  try {
    Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
        CompressionType.NONE, null);
    try {
      for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
        writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
      }
    } finally {
      // Same close order as on the success path: writer first, then the stream.
      writer.close();
    }
  } finally {
    outputStream.close();
  }

  // Only reached when the new snapshot was written and closed successfully.
  cleanupOldFiles(fileSystem, existing);
}
/**
 * Opens a reader on the transaction log at {@code logPath} after recovering its file lease.
 *
 * @return the reader, or {@code null} when opening the log hits an {@link EOFException}
 * @throws IOException if the status lookup, lease recovery or opening fails for another
 *         reason
 */
@Override
public TransactionLogReader getReader() throws IOException {
  final long lengthBeforeRecovery = fs.getFileStatus(logPath).getLen();
  final boolean looksEmpty = lengthBeforeRecovery <= 0;

  // check if this file needs to be recovered due to failure
  // Check for possibly empty file. With appends, currently Hadoop reports a
  // zero length even if the file has been sync'd. Revisit if HDFS-376 or
  // HDFS-878 is committed.
  if (looksEmpty) {
    LOG.warn("File " + logPath + " might be still open, length is 0");
  }

  HDFSUtil hdfsUtil = new HDFSUtil();
  hdfsUtil.recoverFileLease(fs, logPath, hConf);

  try {
    FileStatus newStatus = fs.getFileStatus(logPath);
    LOG.info("New file size for " + logPath + " is " + newStatus.getLen());
    SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, logPath, hConf);
    return new HDFSTransactionLogReaderSupplier(sequenceReader).get();
  } catch (EOFException e) {
    // TODO should we ignore an empty, not-last log file if skip.errors
    // is false? Either way, the caller should decide what to do. E.g.
    // ignore if this is the last log in sequence.
    // TODO is this scenario still possible if the log has been
    // recovered (i.e. closed)
    if (looksEmpty) {
      LOG.warn("Could not open " + logPath + " for reading. File is empty", e);
    }
    // For a non-empty file the EOFException is ignored without logging.
    return null;
  }
}
/**
 * Creates (or overwrites) a test file at {@code path}, registers it in {@code pathList},
 * increments {@code nFiles}, and prints the resulting block size and replication.
 *
 * <p>The file always receives {@code DEFAULT_FILE_SIZE} zero bytes. With
 * {@code createMultipleBlocks} it is written with {@code NON_DEFAULT_BLOCK_SIZE} and extended
 * until it holds at least two blocks' worth of data; otherwise twice the default block size is
 * used. Replication is twice the file system default.
 *
 * @param path path of the file to create
 * @param createMultipleBlocks whether the file should span several blocks
 * @param checksumOpt checksum options passed to {@code create}
 * @throws Exception if creating, writing or inspecting the file fails
 */
private static void touchFile(String path, boolean createMultipleBlocks,
    ChecksumOpt checksumOpt) throws Exception {
  DataOutputStream out = null;
  try {
    final FileSystem fs = cluster.getFileSystem();
    final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
        fs.getWorkingDirectory());

    final long blockSize;
    if (createMultipleBlocks) {
      blockSize = NON_DEFAULT_BLOCK_SIZE;
    } else {
      blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
    }

    final FsPermission permission = FsPermission.getFileDefault().applyUMask(
        FsPermission.getUMask(fs.getConf()));
    final short replication = (short) (fs.getDefaultReplication(qualifiedPath) * 2);
    out = fs.create(qualifiedPath, permission,
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 0,
        replication, blockSize,
        null, checksumOpt);

    final byte[] chunk = new byte[DEFAULT_FILE_SIZE];
    out.write(chunk);
    long written = DEFAULT_FILE_SIZE;
    if (createMultipleBlocks) {
      // Keep appending chunks until two full blocks have been written.
      while (written < 2*blockSize) {
        out.write(chunk);
        out.flush();
        written += DEFAULT_FILE_SIZE;
      }
    }

    pathList.add(qualifiedPath);
    ++nFiles;

    final FileStatus created = fs.getFileStatus(qualifiedPath);
    System.out.println(created.getBlockSize());
    System.out.println(created.getReplication());
  }
  finally {
    IOUtils.cleanup(null, out);
  }
}
/**
 * Asserts that listing {@code /test/swift/a} yields no entries; the failure message contains
 * the dumped listing.
 */
@Test(timeout = TestConstants.SWIFT_TEST_TIMEOUT)
public void testListStatusEmptyDirectory() throws Exception {
  final FileStatus[] listing =
      sFileSystem.listStatus(path(getBaseURI() + "/test/swift/a"));
  final String failureMessage = dumpStats("/test/swift/a", listing);
  assertEquals(failureMessage, 0, listing.length);
}
/**
 * Builds a {@link LocalResource} record for a remote file, of type {@code FILE} with
 * {@code APPLICATION} visibility.
 *
 * @param fs file system holding the resource
 * @param remoteRsrcPath path of the remote resource
 * @return a record carrying the resource URL plus the file's length and modification time
 * @throws IOException if the file status cannot be read
 */
private LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath)
    throws IOException
{
  final LocalResource resource = Records.newRecord(LocalResource.class);
  final FileStatus remoteStatus = fs.getFileStatus(remoteRsrcPath);

  // Where the resource is located and how it is exposed.
  resource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
  resource.setType(LocalResourceType.FILE);
  resource.setVisibility(LocalResourceVisibility.APPLICATION);

  // Size and timestamp are taken from the remote file's status.
  resource.setSize(remoteStatus.getLen());
  resource.setTimestamp(remoteStatus.getModificationTime());
  return resource;
}
/**
 * Looks up block locations for the file behind {@code m_uri}.
 *
 * <p>The queried byte range starts at offset 0 and is one block size long.
 * NOTE(review): that range presumably covers only the first block - confirm whether the full
 * file length was intended.
 *
 * @return the block locations reported by the file system for that range
 * @throws IOException if the file system or the file status cannot be obtained
 */
@Override public BlockLocation[] getBlockLocation() throws IOException {
  final URI cleanUri = ContentFactories.scrubURI(m_uri);
  final FileSystem fileSystem = FileSystem.get(cleanUri, s_config);
  final FileStatus fileStatus = fileSystem.getFileStatus(new Path(cleanUri));
  final long rangeLength = fileStatus.getBlockSize();
  return fileSystem.getFileBlockLocations(fileStatus, 0, rangeLength);
}
/**
 * Builds the {@link CopyableFile} for copying a single source file to {@code targetPath}.
 *
 * @param singleFile status of the source file
 * @param targetPath destination path of the copy
 * @param timestampFromPath timestamp used for both the origin and the upstream timestamp
 * @param locationToCopy location whose path string, as returned by
 *        {@code PathUtils.getPathWithoutSchemeAndAuthority}, becomes the file set
 * @return the built copyable file
 * @throws IOException declared by this method; the visible code does not show which call raises it
 */
@VisibleForTesting
protected CopyableFile generateCopyableFile(FileStatus singleFile, Path targetPath, long timestampFromPath,
Path locationToCopy) throws IOException {
return CopyableFile.fromOriginAndDestination(srcFs, singleFile, targetPath, configuration)
// The same path-derived timestamp is applied to origin and upstream.
.originTimestamp(timestampFromPath).upstreamTimestamp(timestampFromPath)
.fileSet(PathUtils.getPathWithoutSchemeAndAuthority(locationToCopy).toString()).build();
}
/**
 * Returns the status of {@code file}, using a client connection that is opened for this call
 * and disconnected afterwards, whether or not the lookup succeeds.
 *
 * @param file path to look up
 * @return the status of {@code file}
 * @throws IOException if connecting, the lookup, or disconnecting fails
 */
@Override
public FileStatus getFileStatus(Path file) throws IOException {
  final FTPClient connection = connect();
  try {
    return getFileStatus(connection, file);
  } finally {
    disconnect(connection);
  }
}