下面列出了 org.apache.hadoop.fs.FileStatus#getReplication() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Opens the destination file {@code f} for writing, deleting any file that
 * already exists at that path first.
 *
 * <p>When status preservation is off, the file is created with the
 * destination file system's defaults. Otherwise permission, replication and
 * block size are taken from {@code srcstat} for each attribute that is marked
 * as preserved, and from the destination defaults for the rest.
 *
 * @param f destination path
 * @param reporter progress reporter handed to the file system
 * @param srcstat status of the source file whose attributes may be copied
 * @return an output stream on the newly created destination file
 * @throws IOException if the existence check, delete or create fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  // Non-recursive delete of a pre-existing destination.
  if (destFileSys.exists(f)) {
    destFileSys.delete(f, false);
  }

  if (preserve_status) {
    // A null permission is passed through when PERMISSION is not preserved.
    FsPermission permission = null;
    if (preseved.contains(FileAttribute.PERMISSION)) {
      permission = srcstat.getPermission();
    }

    final short replication;
    if (preseved.contains(FileAttribute.REPLICATION)) {
      replication = srcstat.getReplication();
    } else {
      replication = destFileSys.getDefaultReplication(f);
    }

    final long blockSize;
    if (preseved.contains(FileAttribute.BLOCK_SIZE)) {
      blockSize = srcstat.getBlockSize();
    } else {
      blockSize = destFileSys.getDefaultBlockSize(f);
    }

    return destFileSys.create(f, permission, true, sizeBuf, replication,
        blockSize, reporter);
  }

  return destFileSys.create(f, true, sizeBuf, reporter);
}
/**
 * Increase the replication factor of _distcp_src_files to
 * sqrt(min(maxMapsOnCluster, numMaps)). This is to reduce the chance of
 * failing of distcp because of "not having a replication of _distcp_src_files
 * available for reading for some maps".
 *
 * <p>The replication is only ever raised: if the file already has at least
 * the computed replication, nothing is changed.
 *
 * @param conf configuration used to resolve the file system of
 *        {@code srcfilelist}
 * @param jobConf job configuration used to query the cluster's maximum
 *        number of map tasks
 * @param srcfilelist path of the source file list whose replication is raised
 * @param numMaps number of maps planned for the job
 * @throws IOException if the cluster or file system cannot be queried, or if
 *         the file system reports that the replication could not be changed
 */
private static void setReplication(Configuration conf, JobConf jobConf,
    Path srcfilelist, int numMaps) throws IOException {
  // The JobClient is only needed for this one query; close it afterwards so
  // its connection to the job tracker is not leaked.
  int numMaxMaps;
  JobClient jobClient = new JobClient(jobConf);
  try {
    numMaxMaps = jobClient.getClusterStatus().getMaxMapTasks();
  } finally {
    jobClient.close();
  }

  short replication = (short) Math.ceil(
      Math.sqrt(Math.min(numMaxMaps, numMaps)));

  FileSystem fs = srcfilelist.getFileSystem(conf);
  FileStatus srcStatus = fs.getFileStatus(srcfilelist);

  if (srcStatus.getReplication() < replication) {
    if (!fs.setReplication(srcfilelist, replication)) {
      throw new IOException("Unable to increase the replication of file " +
          srcfilelist);
    }
  }
}
/**
 * Creates the destination file {@code f}, first removing any file already
 * present at that path.
 *
 * <p>If status preservation is disabled the destination defaults are used.
 * Otherwise each of permission, replication and block size comes from
 * {@code srcstat} when that attribute is in the preserved set, and from the
 * destination file system otherwise (permission falls back to {@code null}).
 *
 * @param f destination path
 * @param reporter progress reporter passed to the file system
 * @param srcstat status of the source file
 * @return an output stream on the created file
 * @throws IOException if any file system call fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  final boolean alreadyThere = destFileSys.exists(f);
  if (alreadyThere) {
    destFileSys.delete(f, false);
  }

  if (!preserve_status) {
    return destFileSys.create(f, true, sizeBuf, reporter);
  }

  FsPermission permission;
  if (preseved.contains(FileAttribute.PERMISSION)) {
    permission = srcstat.getPermission();
  } else {
    permission = null;
  }

  short replication;
  if (preseved.contains(FileAttribute.REPLICATION)) {
    replication = srcstat.getReplication();
  } else {
    replication = destFileSys.getDefaultReplication(f);
  }

  long blockSize;
  if (preseved.contains(FileAttribute.BLOCK_SIZE)) {
    blockSize = srcstat.getBlockSize();
  } else {
    blockSize = destFileSys.getDefaultBlockSize(f);
  }

  return destFileSys.create(f, permission, true, sizeBuf, replication,
      blockSize, reporter);
}
/**
 * Increase the replication factor of _distcp_src_files to
 * sqrt(min(maxMapsOnCluster, numMaps)). This is to reduce the chance of
 * failing of distcp because of "not having a replication of _distcp_src_files
 * available for reading for some maps".
 *
 * <p>An existing replication that is already high enough is left untouched.
 *
 * @param conf configuration used to obtain the file system of
 *        {@code srcfilelist}
 * @param jobConf job configuration used to ask the cluster for its maximum
 *        number of map tasks
 * @param srcfilelist path of the source file list
 * @param numMaps number of maps planned for the job
 * @throws IOException if querying the cluster or file system fails, or if
 *         {@code setReplication} on the file system returns {@code false}
 */
private static void setReplication(Configuration conf, JobConf jobConf,
    Path srcfilelist, int numMaps) throws IOException {
  // Close the short-lived JobClient once the cluster status has been read;
  // previously it was created and abandoned, leaking its connection.
  int numMaxMaps;
  JobClient jobClient = new JobClient(jobConf);
  try {
    numMaxMaps = jobClient.getClusterStatus().getMaxMapTasks();
  } finally {
    jobClient.close();
  }

  short replication = (short) Math.ceil(
      Math.sqrt(Math.min(numMaxMaps, numMaps)));

  FileSystem fs = srcfilelist.getFileSystem(conf);
  FileStatus srcStatus = fs.getFileStatus(srcfilelist);

  if (srcStatus.getReplication() < replication) {
    if (!fs.setReplication(srcfilelist, replication)) {
      throw new IOException("Unable to increase the replication of file " +
          srcfilelist);
    }
  }
}
/**
 * Checks block placement of a source file against its parity data.
 *
 * <p>Files whose replication exceeds {@code blockMoveMinRepl} are ignored.
 * The check is only performed when source and parity live on the same file
 * system (equal URIs); otherwise a warning is logged and nothing is done.
 *
 * @param srcFs file system holding the source file
 * @param srcFile status of the source file
 * @param parityFs file system holding the parity data
 * @param partFile path of the parity part file
 * @param entry index entry supplying the start offset and length of the
 *        parity data inside {@code partFile}
 * @param codec codec passed on to the block location check
 * @throws IOException if a file system call fails
 */
public void checkFile(FileSystem srcFs, FileStatus srcFile,
    FileSystem parityFs, Path partFile, HarIndex.IndexEntry entry,
    Codec codec) throws IOException {
  // We only check placement for the file with 0..blockMoveMinRepl replicas.
  if (srcFile.getReplication() > blockMoveMinRepl) {
    return;
  }

  final boolean sameFileSystem = srcFs.getUri().equals(parityFs.getUri());
  if (!sameFileSystem) {
    // TODO: Move blocks in two clusters separately
    LOG.warn("Source and parity are in different file system. " +
        " source:" + srcFs.getUri() + " parity:" + parityFs.getUri() +
        ". Skip.");
    return;
  }

  BlockAndDatanodeResolver resolver = new BlockAndDatanodeResolver(
      srcFile.getPath(), srcFs, partFile, parityFs);
  checkBlockLocations(
      getBlockInfos(srcFs, srcFile),
      getBlockInfos(parityFs, partFile, entry.startOffset, entry.length),
      codec, srcFile, resolver);
}
/**
 * Opens the destination file {@code f} for writing after deleting any file
 * that already exists there.
 *
 * <p>With status preservation off, the destination defaults apply. With it
 * on, permission, replication and block size are copied from {@code srcstat}
 * for every preserved attribute; otherwise the destination file system's
 * default replication and block size are used and the permission is
 * {@code null}.
 *
 * @param f destination path
 * @param reporter progress reporter passed to the file system
 * @param srcstat status of the source file
 * @return an output stream on the created file
 * @throws IOException if any file system call fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  // Non-recursive delete of a pre-existing destination.
  if (destFileSys.exists(f)) {
    destFileSys.delete(f, false);
  }

  if (preserve_status) {
    FsPermission permission = null;
    if (preseved.contains(FileAttribute.PERMISSION)) {
      permission = srcstat.getPermission();
    }

    final short replication;
    if (preseved.contains(FileAttribute.REPLICATION)) {
      replication = srcstat.getReplication();
    } else {
      replication = destFileSys.getDefaultReplication();
    }

    final long blockSize;
    if (preseved.contains(FileAttribute.BLOCK_SIZE)) {
      blockSize = srcstat.getBlockSize();
    } else {
      blockSize = destFileSys.getDefaultBlockSize();
    }

    return destFileSys.create(f, permission, true, sizeBuf, replication,
        blockSize, reporter);
  }

  return destFileSys.create(f, true, sizeBuf, reporter);
}
/**
 * Decides whether the given file status matches this entry.
 *
 * <p>A match requires the normalized path to start with {@code srcPrefix}
 * and the directory flag to agree with {@code codec.isDirRaid}. Given that,
 * the result is {@code true} when the modification time is older than
 * {@code modTimePeriod}, or when the replication equals
 * {@code targetReplication} and either the parity check is skipped or a
 * parity file exists.
 *
 * @param f status to test
 * @param mtime modification time to compare against {@code now}
 * @param now current time, in the same unit as {@code mtime}
 * @param skipParityCheck when {@code true}, the parity lookup is not done
 * @param conf configuration handed to the parity lookup
 * @return whether the status matches
 * @throws IOException if the parity lookup fails
 */
boolean match(FileStatus f, long mtime, long now,
    boolean skipParityCheck, Configuration conf) throws IOException {
  final String normalized = normalizePath(f.getPath());
  if (!normalized.startsWith(srcPrefix)) {
    return false;
  }
  if (f.isDir() != codec.isDirRaid) {
    return false;
  }
  if (now - mtime > modTimePeriod) {
    return true;
  }
  // check if the file is already raided.
  if (f.getReplication() != targetReplication) {
    return false;
  }
  return skipParityCheck || ParityFilePair.parityExists(f, codec, conf);
}
/**
 * Creates the destination file {@code f}, deleting any file already present
 * at that path beforehand.
 *
 * <p>If status preservation is disabled, the destination defaults are used.
 * Otherwise permission, replication and block size are read from
 * {@code srcstat} for each attribute in the preserved set; attributes not in
 * the set fall back to {@code null} (permission) or the destination file
 * system's defaults.
 *
 * @param f destination path
 * @param reporter progress reporter passed to the file system
 * @param srcstat status of the source file
 * @return an output stream on the created file
 * @throws IOException if any file system call fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  final boolean alreadyThere = destFileSys.exists(f);
  if (alreadyThere) {
    destFileSys.delete(f, false);
  }

  if (!preserve_status) {
    return destFileSys.create(f, true, sizeBuf, reporter);
  }

  FsPermission permission;
  if (preserved.contains(FileAttribute.PERMISSION)) {
    permission = srcstat.getPermission();
  } else {
    permission = null;
  }

  short replication;
  if (preserved.contains(FileAttribute.REPLICATION)) {
    replication = srcstat.getReplication();
  } else {
    replication = destFileSys.getDefaultReplication();
  }

  long blockSize;
  if (preserved.contains(FileAttribute.BLOCK_SIZE)) {
    blockSize = srcstat.getBlockSize();
  } else {
    blockSize = destFileSys.getDefaultBlockSize();
  }

  return destFileSys.create(f, permission, true, sizeBuf, replication,
      blockSize, reporter);
}
/**
 * Returns the status of {@code f}, substituting {@code SCRATCH_DIR_PERMS}
 * as the permission when {@code f} is the scratch directory.
 *
 * <p>For the scratch directory a copy of the status is built because the
 * underlying RawFS computes permissions dynamically, so changing the
 * permission on the returned object has no effect.
 *
 * @param f path to look up
 * @return the parent implementation's status, or a copy carrying the faked
 *         permissions for the scratch directory
 * @throws IOException if the parent lookup fails
 */
@Override
public FileStatus getFileStatus(Path f) throws IOException {
  final FileStatus original = super.getFileStatus(f);

  // work-around for Hive 0.14: only the scratch dir gets special treatment
  if (!SCRATCH_DIR.equals(f.toString())) {
    return original;
  }

  System.out.println("Faking scratch dir permissions on Windows...");
  return new FileStatus(
      original.getLen(),
      original.isDir(),
      original.getReplication(),
      original.getBlockSize(),
      original.getModificationTime(),
      original.getAccessTime(),
      SCRATCH_DIR_PERMS,
      original.getOwner(),
      original.getGroup(),
      original.getPath());
}
/**
 * Opens the destination file {@code f} for writing; an existing file at that
 * path is deleted first.
 *
 * <p>When status preservation is off the destination defaults are used.
 * When it is on, each of permission, replication and block size is taken
 * from {@code srcstat} if listed in the preserved set, and otherwise from
 * the destination file system ({@code null} for the permission).
 *
 * @param f destination path
 * @param reporter progress reporter passed to the file system
 * @param srcstat status of the source file
 * @return an output stream on the created file
 * @throws IOException if any file system call fails
 */
private FSDataOutputStream create(Path f, Reporter reporter,
    FileStatus srcstat) throws IOException {
  // Non-recursive delete of a pre-existing destination.
  if (destFileSys.exists(f)) {
    destFileSys.delete(f, false);
  }

  if (preserve_status) {
    FsPermission permission = null;
    if (preserved.contains(FileAttribute.PERMISSION)) {
      permission = srcstat.getPermission();
    }

    final short replication;
    if (preserved.contains(FileAttribute.REPLICATION)) {
      replication = srcstat.getReplication();
    } else {
      replication = destFileSys.getDefaultReplication();
    }

    final long blockSize;
    if (preserved.contains(FileAttribute.BLOCK_SIZE)) {
      blockSize = srcstat.getBlockSize();
    } else {
      blockSize = destFileSys.getDefaultBlockSize();
    }

    return destFileSys.create(f, permission, true, sizeBuf, replication,
        blockSize, reporter);
  }

  return destFileSys.create(f, true, sizeBuf, reporter);
}
/**
 * Computes the number of bytes saved by raiding a file.
 *
 * <p>The saving is the file length times the drop in replication, minus the
 * space taken by the parity blocks (stripes × parity size × parity
 * replication × block size). Files whose current replication is not above
 * the target yield 0.
 *
 * @param stat status of the file
 * @param stripeSize stripe size passed to {@code numStripes}
 * @param paritySize number of parity blocks per stripe
 * @param targetReplication replication the source would be reduced to
 * @param parityReplication replication of the parity blocks
 * @return the computed saving, or 0 when the replication is not above target
 */
static long savingFromRaidingFile(FileStatus stat, int stripeSize,
    int paritySize, int targetReplication, int parityReplication) {
  final long currentReplication = stat.getReplication();
  if (currentReplication <= targetReplication) {
    return 0;
  }

  final long blockCount = numBlocks(stat);
  final long stripeCount = numStripes(blockCount, stripeSize);

  final long replicationDrop = currentReplication - targetReplication;
  final long sourceSaving = stat.getLen() * replicationDrop;

  final long parityBlocks = stripeCount * paritySize;
  final long parityCost = parityBlocks * parityReplication
      * stat.getBlockSize();

  return sourceSaving - parityCost;
}
public DFSFile(DFSPath parent, Path path) {
super(parent, path);
try {
FileStatus fs = getDFS().getFileStatus(path);
this.length = fs.getLen();
this.replication = fs.getReplication();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
 * Creates an extended status that copies every attribute of {@code stat}
 * and additionally records the file's blocks and lease holder.
 *
 * @param stat status whose attributes are copied into the superclass
 * @param blocks blocks of the file, stored as given
 * @param leaseHolder lease holder; {@code null} is stored as the empty string
 */
public FileStatusExtended(FileStatus stat, Block[] blocks, String leaseHolder) {
  super(
      stat.getLen(),
      stat.isDir(),
      stat.getReplication(),
      stat.getBlockSize(),
      stat.getModificationTime(),
      stat.getAccessTime(),
      stat.getPermission(),
      stat.getOwner(),
      stat.getGroup(),
      stat.getPath());
  this.blocks = blocks;
  if (leaseHolder == null) {
    this.leaseHolder = "";
  } else {
    this.leaseHolder = leaseHolder;
  }
}
/**
 * Returns a status for an inline path, derived from its outer file.
 *
 * <p>All attributes are copied from the outer file's status except the
 * length, which comes from {@code InLineFSUtils.length(inlinePath)}, and the
 * path, which is {@code inlinePath} itself.
 *
 * @param inlinePath the inline path to describe
 * @return the synthesized status
 * @throws IOException if the outer file system or its status cannot be read
 */
@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
  final Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
  final FileStatus outerStatus =
      outerPath.getFileSystem(conf).getFileStatus(outerPath);

  return new FileStatus(
      InLineFSUtils.length(inlinePath),
      outerStatus.isDirectory(),
      outerStatus.getReplication(),
      outerStatus.getBlockSize(),
      outerStatus.getModificationTime(),
      outerStatus.getAccessTime(),
      outerStatus.getPermission(),
      outerStatus.getOwner(),
      outerStatus.getGroup(),
      inlinePath);
}
/**
 * Splits the archived files into "current" and "backed up" groups and pairs
 * them with the names of the original files.
 *
 * <p>An archived file counts as backed up when its name contains a dot; it
 * is then represented by a status whose path uses only the part of the name
 * before the first dot.
 *
 * @param previous the original files
 * @param archived the files found in the archive
 * @return &lt;expected, gotten, backup&gt;, in that order; each list is
 *         produced by {@code convertToString} except "gotten", which holds
 *         the raw names in iteration order
 */
private static List<List<String>> getFileLists(FileStatus[] previous, FileStatus[] archived) {
  final List<List<String>> result = new ArrayList<>(3);

  // expected: the original files
  final List<String> expectedNames = convertToString(previous);
  result.add(expectedNames);

  final List<String> gottenNames = new ArrayList<>(previous.length);
  final List<FileStatus> backups = new ArrayList<>(previous.length);

  for (FileStatus archivedFile : archived) {
    final String name = archivedFile.getPath().getName();
    final boolean backedUp = name.indexOf('.') >= 0;

    if (!backedUp) {
      // plain name: compare it against the original store files
      gottenNames.add(name);
      continue;
    }

    // backed-up file: strip everything from the first dot onwards
    final Path parent = archivedFile.getPath().getParent();
    final String shortName = name.split("[.]")[0];
    final Path strippedPath = new Path(parent, shortName);
    backups.add(new FileStatus(archivedFile.getLen(), archivedFile.isDirectory(),
        archivedFile.getReplication(), archivedFile.getBlockSize(),
        archivedFile.getModificationTime(), strippedPath));
  }

  result.add(gottenNames);
  result.add(convertToString(backups));
  return result;
}
/**
 * Get the total physical size in the directory, i.e. the sum over all files
 * of length multiplied by replication.
 *
 * @param lfs the Files under the directory; may be {@code null}
 * @return the summed physical size, or 0 when {@code lfs} is {@code null}
 *         or empty
 */
public static long getDirPhysicalSize(List<FileStatus> lfs) {
  long physicalBytes = 0L;
  if (lfs != null) {
    for (FileStatus file : lfs) {
      final long replicatedLength = file.getLen() * file.getReplication();
      physicalBytes += replicatedLength;
    }
  }
  return physicalBytes;
}
/**
 * Chooses the replication factor for a copied file.
 *
 * @param fileAttributes attributes to preserve
 * @param sourceFile status of the source file
 * @param targetFS target file system
 * @param tmpTargetPath path used to look up the target's default replication
 * @return the source replication when {@code REPLICATION} is preserved,
 *         otherwise the target file system's default for
 *         {@code tmpTargetPath}
 */
private static short getReplicationFactor(
    EnumSet<FileAttribute> fileAttributes,
    FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
  if (fileAttributes.contains(FileAttribute.REPLICATION)) {
    return sourceFile.getReplication();
  }
  return targetFS.getDefaultReplication(tmpTargetPath);
}
/**
 * Reports, for every path argument, the files with missing blocks found
 * under it and whether each can still be read through its parity file.
 *
 * <p>For each corrupt file one line is printed to {@code out} describing the
 * outcome, followed by a per-argument count of corrupt files.
 *
 * @param cmd command name, used when printing usage
 * @param args command-line arguments
 * @param startIndex index in {@code args} of the first path to check
 * @throws IOException if a file system call fails
 * @throws IllegalArgumentException if no path argument is present
 */
public void checkFile(String cmd, String[] args, int startIndex)
    throws IOException {
  if (startIndex >= args.length) {
    printUsage(cmd);
    throw new IllegalArgumentException("Insufficient arguments");
  }
  for (int i = startIndex; i < args.length; i++) {
    Path p = new Path(args[i]);
    FileSystem fs = p.getFileSystem(conf);
    // if we got a raid fs, get the underlying fs
    if (fs instanceof DistributedRaidFileSystem) {
      fs = ((DistributedRaidFileSystem) fs).getFileSystem();
    }
    // We should be able to cast at this point.
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    RemoteIterator<Path> corruptIt = dfs.listCorruptFileBlocks(p);
    int count = 0;
    while (corruptIt.hasNext()) {
      count++;
      Path corruptFile = corruptIt.next();
      // Result of checking.
      final String result;
      // Look at the corrupt file itself. The argument path p may be a
      // directory or a different file, so its replication says nothing
      // about whether this particular file was raided.
      FileStatus stat = fs.getFileStatus(corruptFile);
      if (stat.getReplication() < fs.getDefaultReplication()) {
        RaidInfo raidInfo = getFileRaidInfo(corruptFile);
        if (raidInfo.codec == null) {
          result = "Below default replication but no parity file found";
        } else {
          boolean notRecoverable = isFileCorrupt(dfs, corruptFile);
          if (notRecoverable) {
            result = "Missing too many blocks to be recovered " +
                "using parity file " + raidInfo.parityPair.getPath();
          } else {
            result = "Has missing blocks but can be read using parity file " +
                raidInfo.parityPair.getPath();
          }
        }
      } else {
        result = "At default replication, not raided";
      }
      out.println("Result of checking " + corruptFile + " : " +
          result);
    }
    out.println("Found " + count + " files with missing blocks");
  }
}
/**
 * Adds one file to the running totals: file count, block count, logical
 * bytes (length) and physical bytes (length × replication).
 *
 * @param status status of the file to account for
 */
private void inc(FileStatus status) {
  numFiles++;
  numBlocks += computeNumBlocks(status);

  final long logicalLength = status.getLen();
  numLogical += logicalLength;
  numBytes += logicalLength * status.getReplication();
}
/**
 * Walks the directory tree under {@code dest} and archives every directory
 * that qualifies by calling {@code singleHar} on it.
 *
 * <p>A directory is archived only if all of the following hold: it is not
 * itself a ".har" path and does not already contain a child named after
 * itself plus {@code HAR_SUFFIX}; it has at least one entry and no
 * sub-directories; none of its files was modified after {@code cutoff}; all
 * of its files share one block size and one replication; and every entry of
 * the corresponding source directory has a parity file.
 *
 * @param codec codec used for the parity lookup and passed to
 *        {@code singleHar}
 * @param destFs file system containing {@code dest}
 * @param dest directory to examine; anything that is not a directory is
 *        ignored
 * @param destPrefix prefix removed from the destination path to obtain the
 *        source path
 * @param srcFs file system containing the source files
 * @param cutoff modification-time threshold; a newer file blocks archiving
 * @param tmpHarPath temporary HAR path passed to {@code singleHar}
 * @throws IOException if a file system call fails
 */
void recurseHar(Codec codec, FileSystem destFs, FileStatus dest,
String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
throws IOException {
if (!dest.isDir()) {
return;
}
Path destPath = dest.getPath(); // pathname, no host:port
String destStr = destPath.toUri().getPath();
// If the source directory is a HAR, do nothing.
if (destStr.endsWith(".har")) {
return;
}
// Verify if it already contains a HAR directory
if (destFs.exists(new Path(destPath, destPath.getName() + HAR_SUFFIX))) {
return;
}
boolean shouldHar = false;
FileStatus[] files = destFs.listStatus(destPath);
// -1 means "not seen yet"; both are set from the first eligible file and
// every later file must match them.
long harBlockSize = -1;
short harReplication = -1;
if (files != null) {
// An empty directory is never archived.
shouldHar = files.length > 0;
for (FileStatus one : files) {
if (one.isDir()) {
// Descend first; a directory that has sub-directories is not archived
// itself.
recurseHar(codec, destFs, one, destPrefix, srcFs, cutoff,
tmpHarPath);
shouldHar = false;
} else if (one.getModificationTime() > cutoff) {
// Only log the first reason that disqualifies this directory.
if (shouldHar) {
LOG.debug("Cannot archive " + destPath + " because "
+ one.getPath() + " was modified after cutoff");
shouldHar = false;
}
} else {
if (harBlockSize == -1) {
harBlockSize = one.getBlockSize();
} else if (harBlockSize != one.getBlockSize()) {
LOG.info("Block size of " + one.getPath() + " is "
+ one.getBlockSize()
+ " which is different from " + harBlockSize);
shouldHar = false;
}
if (harReplication == -1) {
harReplication = one.getReplication();
} else if (harReplication != one.getReplication()) {
LOG.info("Replication of " + one.getPath() + " is "
+ one.getReplication()
+ " which is different from " + harReplication);
shouldHar = false;
}
}
}
if (shouldHar) {
// Map the destination path back to the source path by stripping the
// prefix.
// NOTE(review): replaceFirst treats destPrefix as a regular expression,
// so a prefix containing regex metacharacters may not be removed as
// intended - confirm against callers.
String src = destStr.replaceFirst(destPrefix, "");
Path srcPath = new Path(src);
FileStatus[] statuses = srcFs.listStatus(srcPath);
Path destPathPrefix = new Path(destPrefix)
.makeQualified(destFs);
if (statuses != null) {
// Every source entry must have a parity file; stop at the first one
// that does not.
for (FileStatus status : statuses) {
if (ParityFilePair.getParityFile(codec, status
.getPath().makeQualified(srcFs), conf) == null) {
LOG.debug("Cannot archive "
+ destPath
+ " because it doesn't contain parity file for "
+ status.getPath().makeQualified(srcFs)
+ " on destination " + destPathPrefix);
shouldHar = false;
break;
}
}
}
}
}
if (shouldHar) {
LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath);
singleHar(codec, destFs, dest, tmpHarPath, harBlockSize,
harReplication);
}
}