The following are example usages of org.apache.hadoop.fs.FileContext#rename(). You can follow the link to view the source code on GitHub, or leave a comment on the right.
@Override
public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext) throws IOException {
  Directory srcBase = getBaseDir(fromDir);
  Directory dstBase = getBaseDir(toDir);
  // Fast path only applies when both ends are backed by HDFS; otherwise
  // defer to the generic (copy-based) move of the superclass.
  if (!(srcBase instanceof HdfsDirectory) || !(dstBase instanceof HdfsDirectory)) {
    super.move(fromDir, toDir, fileName, ioContext);
    return;
  }
  Path srcDir = ((HdfsDirectory) srcBase).getHdfsDirPath();
  Path dstDir = ((HdfsDirectory) dstBase).getHdfsDirPath();
  // FileContext.rename moves the file server-side on HDFS, avoiding a copy.
  FileContext fileContext = FileContext.getFileContext(getConf(srcDir));
  fileContext.rename(new Path(srcDir, fileName), new Path(dstDir, fileName));
}
/**
 * Verifies that an edits log produced by CreateEditsLog is valid and can be
 * replayed successfully by a namenode.
 */
@Test(timeout=60000)
public void testCanLoadCreatedEditsLog() throws Exception {
  // Format a fresh namenode pointed at the test name directory.
  HdfsConfiguration conf = new HdfsConfiguration();
  File nameDir = new File(HDFS_DIR, "name");
  conf.set(DFS_NAMENODE_NAME_DIR_KEY, Util.fileAsURI(nameDir).toString());
  DFSTestUtil.formatNameNode(conf);
  // Generate synthetic edits, then move each produced file into the
  // namenode's "current" directory so it is replayed on startup.
  CreateEditsLog.main(new String[] { "-f", "1000", "0", "1", "-d",
      TEST_DIR.getAbsolutePath() });
  FileContext localFc = FileContext.getLocalFSFileContext();
  String currentDir = new File(nameDir, "current").getAbsolutePath();
  for (FileStatus edits : localFc.util().globStatus(
      new Path(TEST_DIR.getAbsolutePath(), "*"))) {
    Path src = edits.getPath();
    localFc.rename(src, new Path(currentDir, src.getName()));
  }
  // Booting the namenode replays the edits; coming up without an
  // exception is the success criterion.
  cluster = new MiniDFSCluster.Builder(conf)
      .format(false)
      .manageNameDfsDirs(false)
      .waitSafeMode(false)
      .build();
  cluster.waitClusterUp();
}
/**
 * Tests that an edits log created using CreateEditsLog is valid and can be
 * loaded successfully by a namenode.
 */
@Test(timeout=60000)
public void testCanLoadCreatedEditsLog() throws Exception {
// Format namenode.
HdfsConfiguration conf = new HdfsConfiguration();
File nameDir = new File(HDFS_DIR, "name");
conf.set(DFS_NAMENODE_NAME_DIR_KEY, Util.fileAsURI(nameDir).toString());
DFSTestUtil.formatNameNode(conf);
// Call CreateEditsLog and move the resulting edits to the name dir.
CreateEditsLog.main(new String[] { "-f", "1000", "0", "1", "-d",
TEST_DIR.getAbsolutePath() });
// Glob every file CreateEditsLog wrote into TEST_DIR and move each one
// into the namenode's "current" directory so it is replayed on startup.
Path editsWildcard = new Path(TEST_DIR.getAbsolutePath(), "*");
FileContext localFc = FileContext.getLocalFSFileContext();
for (FileStatus edits: localFc.util().globStatus(editsWildcard)) {
Path src = edits.getPath();
Path dst = new Path(new File(nameDir, "current").getAbsolutePath(),
src.getName());
// FileContext.rename moves the file in place on the local filesystem.
localFc.rename(src, dst);
}
// Start a namenode to try to load the edits.
cluster = new MiniDFSCluster.Builder(conf)
.format(false)
.manageNameDfsDirs(false)
.waitSafeMode(false)
.build();
cluster.waitClusterUp();
// Test successful, because no exception thrown.
}
@Override
public void commit() throws IOException {
  checkIsNotTmp();
  // Use FileContext rather than FileSystem: only the former offers an
  // atomic rename with explicit overwrite semantics.
  FileContext fc = FileContext.getFileContext(_hadoopConf);
  Path dest = new Path(_path.getParent(), BLOBSTORE_DATA_FILE);
  if (!_mustBeNew) {
    // Replacing an existing destination is allowed; clobber it atomically.
    fc.rename(_path, dest, Options.Rename.OVERWRITE);
  } else {
    // Destination must not already exist; rename fails if it does.
    fc.rename(_path, dest);
  }
  // Note, we could add support for setting the replication factor
}
@Override
public Object restore() throws IOException
{
FileContext fc = FileContext.getFileContext(fs.getUri());
// recover from wherever it was left
// Case 1: a snapshot backup exists, meaning a checkpoint was in progress
// when we died. Roll the snapshot back and rebuild the combined log —
// the exact order of the renames/deletes below is the recovery protocol,
// so it must not be reordered.
if (fc.util().exists(snapshotBackupPath)) {
LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
// combine logs (w/o append, create new file)
// Concatenate backup log + current log into a temp file, then atomically
// swap it in as the current log before removing the backup.
Path tmpLogPath = new Path(basedir, "log.combined");
try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
IOUtils.copy(fsIn, fsOut);
}
try (FSDataInputStream fsIn = fc.open(logPath)) {
IOUtils.copy(fsIn, fsOut);
}
}
fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
fc.delete(logBackupPath, false);
} else {
// we have log backup, but no checkpoint backup
// failure between log rotation and writing checkpoint
if (fc.util().exists(logBackupPath)) {
LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
}
}
// No checkpoint at all: nothing to restore.
if (!fc.util().exists(snapshotPath)) {
LOG.debug("No existing checkpoint.");
return null;
}
LOG.debug("Reading checkpoint {}", snapshotPath);
InputStream is = fc.open(snapshotPath);
// indeterministic class loading behavior
// http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
// Resolve classes through the context class loader so deserialization
// works under container class-loading setups.
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
try (ObjectInputStream ois = new ObjectInputStream(is)
{
@Override
protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
throws IOException, ClassNotFoundException
{
return Class.forName(objectStreamClass.getName(), true, loader);
}
}) {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
throw new IOException("Failed to read checkpointed state", cnfe);
}
}
public void renameWithOverwrite(Directory dir, String fileName, String toName) throws IOException {
  // Resolve the HDFS path backing this directory and rename within it,
  // replacing any existing file at the target name.
  Path dirPath = new Path(getPath(dir));
  Path source = new Path(dirPath, fileName);
  Path target = new Path(dirPath, toName);
  FileContext fc = FileContext.getFileContext(getConf(dirPath));
  fc.rename(source, target, Options.Rename.OVERWRITE);
}
@Override
public Object restore() throws IOException
{
FileContext fc = FileContext.getFileContext(fs.getUri());
// recover from wherever it was left
// Case 1: a snapshot backup exists, meaning a checkpoint was in progress
// when we died. Roll the snapshot back and rebuild the combined log —
// the exact order of the renames/deletes below is the recovery protocol,
// so it must not be reordered.
if (fc.util().exists(snapshotBackupPath)) {
LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
// combine logs (w/o append, create new file)
// Concatenate backup log + current log into a temp file, then atomically
// swap it in as the current log before removing the backup.
Path tmpLogPath = new Path(basedir, "log.combined");
try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
IOUtils.copy(fsIn, fsOut);
}
try (FSDataInputStream fsIn = fc.open(logPath)) {
IOUtils.copy(fsIn, fsOut);
}
}
fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
fc.delete(logBackupPath, false);
} else {
// we have log backup, but no checkpoint backup
// failure between log rotation and writing checkpoint
if (fc.util().exists(logBackupPath)) {
LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
}
}
// No checkpoint at all: nothing to restore.
if (!fc.util().exists(snapshotPath)) {
LOG.debug("No existing checkpoint.");
return null;
}
LOG.debug("Reading checkpoint {}", snapshotPath);
InputStream is = fc.open(snapshotPath);
// indeterministic class loading behavior
// http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
// Resolve classes through the context class loader so deserialization
// works under container class-loading setups.
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
try (ObjectInputStream ois = new ObjectInputStream(is)
{
@Override
protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
throws IOException, ClassNotFoundException
{
return Class.forName(objectStreamClass.getName(), true, loader);
}
}) {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
throw new IOException("Failed to read checkpointed state", cnfe);
}
}
/**
 * A wrapper around {@link FileContext#rename(Path, Path, Options.Rename...)}.
 *
 * @param fc file context to perform the rename with
 * @param oldName source path
 * @param newName destination path
 * @param overwrite whether an existing file at {@code newName} may be replaced
 * @throws IOException if the underlying rename fails
 */
public static void renamePath(FileContext fc, Path oldName, Path newName, boolean overwrite)
    throws IOException {
  // Rename.NONE makes the rename fail if the destination already exists.
  Options.Rename renameOptions = overwrite ? Options.Rename.OVERWRITE : Options.Rename.NONE;
  fc.rename(oldName, newName, renameOptions);
}
public static void main(String[] args) throws IOException {
  // Smoke-test FileContext operations: mkdir, overwrite-rename, list, delete.
  FileContext fc = FileContext.getFileContext(new Configuration());
  Path source = new Path("/tmp/garmadon/test");
  Path target = new Path("/tmp/garmadon/test2");
  fc.mkdir(source, FsPermission.getDefault(), true);
  fc.rename(source, target, Options.Rename.OVERWRITE);
  fc.listStatus(target);
  fc.delete(target, false);
}