Listed below are example usages of org.apache.hadoop.fs.FileUtil#copy(), collected from open-source projects on GitHub.
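For orientation, here is a minimal sketch of the most common overload, FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf), which copies a file or directory between two FileSystem instances and returns false if the copy cannot be completed. The class name and paths below are placeholders, not taken from any of the projects listed.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class FileUtilCopyExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Placeholder paths; adjust to your own cluster layout.
    Path src = new Path("/tmp/source-dir");
    Path dst = new Path("/tmp/dest-dir");
    // deleteSource=false keeps the source; overwrite=true replaces an existing destination.
    boolean copied = FileUtil.copy(fs, src, fs, dst, false, true, conf);
    if (!copied) {
      throw new IOException("Copy from " + src + " to " + dst + " failed");
    }
  }
}
Overloads that take a java.io.File on either side also exist and are used by several of the snippets below to copy between the local disk and a Hadoop FileSystem.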
/**
* Commits the snapshot process by moving the working snapshot
* to the finalized filepath
*
* @param snapshotDir The file path of the completed snapshots
* @param workingDir The file path of the in progress snapshots
* @param fs The file system of the completed snapshots
* @param workingDirFs The file system of the in progress snapshots
* @param conf Configuration
*
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException if the filesystem could not be reached
*/
public static void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs,
FileSystem workingDirFs, final Configuration conf)
throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
// If the working and completed snapshot directory are on the same file system, attempt
// to rename the working snapshot directory to the completed location. If that fails,
// or the file systems differ, attempt to copy the directory over, throwing an exception
// if this fails
URI workingURI = workingDirFs.getUri();
URI rootURI = fs.getUri();
if ((!workingURI.getScheme().equals(rootURI.getScheme()) ||
workingURI.getAuthority() == null ||
!workingURI.getAuthority().equals(rootURI.getAuthority()) ||
workingURI.getUserInfo() == null ||
!workingURI.getUserInfo().equals(rootURI.getUserInfo()) ||
!fs.rename(workingDir, snapshotDir)) && !FileUtil.copy(workingDirFs, workingDir, fs,
snapshotDir, true, true, conf)) {
throw new SnapshotCreationException("Failed to copy working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
}
/**
* We use the staging dir as a mock HDFS dir.
* */
@Override
public boolean copyRemoteToLocal(String remoteUri, String localUri)
throws IOException {
// mock the copy from HDFS into a local copy
Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
File old = new File(convertToStagingPath(localUri));
if (old.isDirectory() && old.exists()) {
if (!FileUtil.fullyDelete(old)) {
throw new IOException("Cannot delete temp dir:"
+ old.getAbsolutePath());
}
}
return FileUtil.copy(getDefaultFileSystem(), remoteToLocalDir,
new File(localUri), false,
getDefaultFileSystem().getConf());
}
@Override
void prepareForWrite(String workingDir, boolean isGlobal) throws IOException {
if (!fileSystem.exists(basePath)) {
logger.info("Global dict at {} doesn't exist, create a new one", basePath);
fileSystem.mkdirs(basePath);
}
migrateOldLayout();
logger.trace("Prepare to write Global dict at {}, isGlobal={}", workingDir, isGlobal);
Path working = new Path(workingDir);
if (fileSystem.exists(working)) {
fileSystem.delete(working, true);
logger.trace("Working directory {} exists, delete it first", working);
}
// when building the dict, copy all data into the working dir and operate on that copy, so a sudden server crash cannot corrupt the original data
Long[] versions = listAllVersions();
if (versions.length > 0 && isGlobal) {
Path latestVersion = getVersionDir(versions[versions.length - 1]);
FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf);
} else {
fileSystem.mkdirs(working);
}
}
private String setupTopCustomerQueryBundleJobApp() throws IOException {
String userName = System.getProperty("user.name");
String workFlowRoot = hadoopClusterService.getHDFSUri() + "/usr/"
+ userName + "/oozie/bundle-hive-to-es-topqueries";
// put oozie app in hadoop
DistributedFileSystem fs = hadoopClusterService.getFileSystem();
Path workFlowRootPath = new Path(workFlowRoot);
fs.delete(workFlowRootPath, true);
File wfDir = new ClassPathResource("oozie/bundle-hive-to-es-topqueries")
.getFile();
LOG.debug("wfdir: {}", wfDir.getAbsolutePath());
FileUtil.copy(wfDir, fs, workFlowRootPath, false, new Configuration());
FileUtil.copy(new ClassPathResource("hive/hive-site.xml").getFile(),
fs, new Path(workFlowRoot), false, new Configuration());
return workFlowRoot;
}
/**
* Try to locate and load the JobConf file for this job so as to get more
* details on the job (number of maps and reduces)
*/
private void loadJobFile() {
try {
String jobFile = getJobFile();
FileSystem fs = location.getDFS();
File tmp = File.createTempFile(getJobID().toString(), ".xml");
if (FileUtil.copy(fs, new Path(jobFile), tmp, false, location
.getConfiguration())) {
this.jobConf = new JobConf(tmp.toString());
this.totalMaps = jobConf.getNumMapTasks();
this.totalReduces = jobConf.getNumReduceTasks();
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
private Path copyRemoteFiles(Path parentDir, Path originalPath,
Configuration conf, short replication) throws IOException {
// Check whether we can skip the copy: is the JobTracker using the same
// file system? We only compare URI strings (no DNS lookups), which is not
// optimal but avoids name resolution.
FileSystem remoteFs = originalPath.getFileSystem(conf);
if (compareFs(remoteFs, jtFs)) {
return originalPath;
}
// this might have name collisions. copy will throw an exception
// parse the original path to create new path
Path newPath = new Path(parentDir, originalPath.getName());
FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
jtFs.setReplication(newPath, replication);
return newPath;
}
public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
Configuration dstConf) throws IOException {
URI srcDir = Lists.newArrayList(srcDirs).get(0);
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
for (URI dstDir : dstDirs) {
Preconditions.checkArgument(!dstDir.equals(srcDir),
"src and dst are the same: " + dstDir);
File dstDirF = new File(dstDir);
if (dstDirF.exists()) {
if (!FileUtil.fullyDelete(dstDirF)) {
throw new IOException("Unable to delete: " + dstDirF);
}
}
LOG.info("Copying namedir from primary node dir "
+ srcDir + " to " + dstDir);
FileUtil.copy(
new File(srcDir),
dstFS, new Path(dstDir), false, dstConf);
}
}
private String setupHiveAddPartitionWorkflowApp() throws IOException {
String userName = System.getProperty("user.name");
String workFlowRoot = hadoopClusterService.getHDFSUri() + "/usr/"
+ userName + "/oozie/wf-hive-add-partition";
// put oozie app in hadoop
DistributedFileSystem fs = hadoopClusterService.getFileSystem();
Path workFlowRootPath = new Path(workFlowRoot);
fs.delete(workFlowRootPath, true);
File wfDir = new ClassPathResource("oozie/wf-hive-add-partition")
.getFile();
LOG.debug("wfdir: {}", wfDir.getAbsolutePath());
FileUtil.copy(wfDir, fs, workFlowRootPath, false, new Configuration());
FileUtil.copy(new ClassPathResource("hive/hive-site.xml").getFile(),
fs, new Path(workFlowRoot), false, new Configuration());
return workFlowRoot;
}
@Override
public boolean cp(final String sourceLocation, final String targetLocation) {
try {
return FileUtil.copy(this.fs, new Path(sourceLocation), this.fs, new Path(targetLocation), false, new Configuration());
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
if (pathToFile(src).renameTo(pathToFile(dst))) {
return true;
}
return FileUtil.copy(this, src, this, dst, true, getConf());
}
@Override
public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
try {
LOG.log(Level.INFO, "resourceLaunch. {0}", resourceLaunchEvent.toString());
final File localStagingFolder =
Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile();
final Configuration evaluatorConfiguration = Tang.Factory.getTang()
.newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf())
.build();
final File configurationFile = new File(
localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
this.configurationSerializer.toFile(evaluatorConfiguration, configurationFile);
JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder);
final FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration());
final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + resourceLaunchEvent.getIdentifier() + "/");
FileUtil.copy(localStagingFolder, fileSystem, hdfsFolder, false, new org.apache.hadoop.conf.Configuration());
// TODO[JIRA REEF-102]: Replace REEFExecutor with a simple launch command (we only need to launch REEFExecutor)
final List<String> command =
getLaunchCommand(resourceLaunchEvent, this.executors.getMemory(resourceLaunchEvent.getIdentifier()));
this.executors.launchEvaluator(
new EvaluatorLaunch(resourceLaunchEvent.getIdentifier(), StringUtils.join(command, ' ')));
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
private Path copyFileToRemote(
Path destDir,
Path srcPath,
Short replication) throws IOException {
FileSystem destFs = destDir.getFileSystem(hadoopConf);
FileSystem srcFs = srcPath.getFileSystem(hadoopConf);
Path destPath = new Path(destDir, srcPath.getName());
LOGGER.info("Uploading resource " + srcPath + " to " + destPath);
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf);
destFs.setReplication(destPath, replication);
destFs.setPermission(destPath, APP_FILE_PERMISSION);
return destPath;
}
@Override
public void copy(String srcPath, String destPath) {
try {
FileUtil.copy(new File(srcPath), this.hdfs, new Path(destPath),
false, this.conf);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to copy file '%s' to '%s'", srcPath, destPath), e);
}
}
/**
* Test that findContainingJar works correctly even if the
* path has a "+" sign or spaces in it
*/
@Test
public void testFindContainingJarWithPlus() throws Exception {
new File(TEST_DIR_WITH_SPECIAL_CHARS).mkdirs();
Configuration conf = new Configuration();
FileSystem localfs = FileSystem.getLocal(conf);
FileUtil.copy(localfs, new Path(JAR_RELATIVE_PATH),
localfs, new Path(TEST_DIR_WITH_SPECIAL_CHARS, "test.jar"),
false, true, conf);
testJarAtPath(TEST_DIR_WITH_SPECIAL_CHARS + File.separator + "test.jar");
}
private static void copyPath(FileSystem srcFs, Path src, FileSystem dstFs, Path dst, boolean deleteSource,
boolean overwrite, Configuration conf) throws IOException {
Preconditions.checkArgument(srcFs.exists(src),
String.format("Cannot copy from %s to %s because src does not exist", src, dst));
Preconditions.checkArgument(overwrite || !dstFs.exists(dst),
String.format("Cannot copy from %s to %s because dst exists", src, dst));
try {
boolean isSourceFileSystemLocal = srcFs instanceof LocalFileSystem || srcFs instanceof RawLocalFileSystem;
if (isSourceFileSystemLocal) {
try {
dstFs.copyFromLocalFile(deleteSource, overwrite, src, dst);
} catch (IOException e) {
throw new IOException(String.format("Failed to copy %s to %s", src, dst), e);
}
} else if (!FileUtil.copy(srcFs, src, dstFs, dst, deleteSource, overwrite, conf)) {
throw new IOException(String.format("Failed to copy %s to %s", src, dst));
}
} catch (Throwable t1) {
try {
deleteIfExists(dstFs, dst, true);
} catch (Throwable t2) {
// Do nothing
}
throw t1;
}
}
public static void copyFileOnHDFS(String originalDir, String newDir) throws IOException {
Path originalPath = new Path(originalDir);
Path newPath = new Path(newDir);
boolean deleteSource = false;
boolean overwrite = true;
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
FileSystem fs = IOUtilFunctions.getFileSystem(originalPath, job);
if (fs.exists(originalPath)) {
FileUtil.copy(fs, originalPath, fs, newPath, deleteSource, overwrite, job);
}
}
public void copyInitialState(Path origAppDir) throws IOException
{
// locate previous snapshot
long copyStart = System.currentTimeMillis();
String newAppDir = this.dag.assertAppPath();
FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(origAppDir.toString(), conf);
// read snapshot against new dependencies
Object snapshot = recoveryHandler.restore();
if (snapshot == null) {
throw new IllegalArgumentException("No previous application state found in " + origAppDir);
}
InputStream logIs = recoveryHandler.getLog();
// modify snapshot state to switch app id
((StreamingContainerManager.CheckpointState)snapshot).setApplicationId(this.dag, conf);
Path checkpointPath = new Path(newAppDir, LogicalPlan.SUBDIR_CHECKPOINTS);
FileSystem fs = FileSystem.newInstance(origAppDir.toUri(), conf);
// remove the path that was created by the storage agent during deserialization and replacement
fs.delete(checkpointPath, true);
// write snapshot to new location
recoveryHandler = new FSRecoveryHandler(newAppDir, conf);
recoveryHandler.save(snapshot);
OutputStream logOs = recoveryHandler.rotateLog();
IOUtils.copy(logIs, logOs);
logOs.flush();
logOs.close();
logIs.close();
List<String> excludeDirs = Arrays.asList(LogicalPlan.SUBDIR_CHECKPOINTS, LogicalPlan.SUBDIR_EVENTS, LogicalPlan.SUBDIR_STATS);
// copy sub directories that are not present in target
FileStatus[] lFiles = fs.listStatus(origAppDir);
// In case of MapR/MapR-FS, f.getPath().toString() returns path as maprfs:///<orig app dir>
// whereas origAppDir.toString & newAppDir are in maprfs:/<orig or new app dir> format
// e.g.
// f.getPath().toString -> maprfs:///user/dtadmin/datatorrent/apps/application_1481890072066_0004/checkpoints
// origAppDir -> maprfs:/user/dtadmin/datatorrent/apps/application_1481890072066_0004
// newAppDir -> maprfs:/user/dtadmin/datatorrent/apps/application_1481890072066_0005
String origAppDirPath = Path.getPathWithoutSchemeAndAuthority(origAppDir).toString();
String newAppDirPath = Path.getPathWithoutSchemeAndAuthority(new Path(newAppDir)).toString();
for (FileStatus f : lFiles) {
if (f.isDirectory() && !excludeDirs.contains(f.getPath().getName())) {
String targetPath = f.getPath().toString().replace(origAppDirPath, newAppDirPath);
if (!fs.exists(new Path(targetPath))) {
LOG.debug("Copying {} size {} to {}", f.getPath(), f.getLen(), targetPath);
long start = System.currentTimeMillis();
FileUtil.copy(fs, f.getPath(), fs, new Path(targetPath), false, conf);
LOG.debug("Copying {} (size {}) to {} took {} ms", f.getPath(), f.getLen(), targetPath, System.currentTimeMillis() - start);
} else {
LOG.debug("Ignoring {} as it already exists under {}", f.getPath(), targetPath);
}
}
}
LOG.info("Copying initial state took {} ms", System.currentTimeMillis() - copyStart);
}
@Test
public void testDeleteWhenTempDirIsNotEmpty() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final String FAM = "fam";
final byte[][] splitKeys = new byte[][] {
Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d")
};
// create the table
MasterProcedureTestingUtility.createTable(
getMasterProcedureExecutor(), tableName, splitKeys, FAM);
// get the current store files for the regions
List<HRegion> regions = UTIL.getHBaseCluster().getRegions(tableName);
// make sure we have 4 regions serving this table
assertEquals(4, regions.size());
// load the table
try (Table table = UTIL.getConnection().getTable(tableName)) {
UTIL.loadTable(table, Bytes.toBytes(FAM));
}
// disable the table so that we can manipulate the files
UTIL.getAdmin().disableTable(tableName);
final MasterFileSystem masterFileSystem =
UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(masterFileSystem.getRootDir(), tableName);
final Path tempDir = masterFileSystem.getTempDir();
final Path tempTableDir = CommonFSUtils.getTableDir(tempDir, tableName);
final FileSystem fs = masterFileSystem.getFileSystem();
// copy the table to the temporary directory to make sure the temp directory is not empty
if (!FileUtil.copy(fs, tableDir, fs, tempTableDir, false, UTIL.getConfiguration())) {
fail();
}
// delete the table
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
long procId = ProcedureTestingUtility.submitAndWait(procExec,
new DeleteTableProcedure(procExec.getEnvironment(), tableName));
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
MasterProcedureTestingUtility.validateTableDeletion(getMaster(), tableName);
// check if the temporary directory is deleted
assertFalse(fs.exists(tempTableDir));
// check for the existence of the archive directory
for (HRegion region : regions) {
Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(),
region);
assertTrue(fs.exists(archiveDir));
}
}
/**
* Test replicateEntries with a bulk load entry for 25 HFiles
*/
@Test
public void testReplicateEntriesForHFiles() throws Exception {
Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
int numRows = 10;
List<Path> p = new ArrayList<>(1);
final String hfilePrefix = "hfile-";
// 1. Generate 25 hfile ranges
Random rng = new SecureRandom();
Set<Integer> numbers = new HashSet<>();
while (numbers.size() < 50) {
numbers.add(rng.nextInt(1000));
}
List<Integer> numberList = new ArrayList<>(numbers);
Collections.sort(numberList);
Map<String, Long> storeFilesSize = new HashMap<>(1);
// 2. Create 25 hfiles
Configuration conf = TEST_UTIL.getConfiguration();
FileSystem fs = dir.getFileSystem(conf);
Iterator<Integer> numbersItr = numberList.iterator();
for (int i = 0; i < 25; i++) {
Path hfilePath = new Path(familyDir, hfilePrefix + i);
HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
p.add(hfilePath);
storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
}
// 3. Create a BulkLoadDescriptor and a WALEdit
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
storeFiles.put(FAM_NAME1, p);
org.apache.hadoop.hbase.wal.WALEdit edit = null;
WALProtos.BulkLoadDescriptor loadDescriptor = null;
try (Connection c = ConnectionFactory.createConnection(conf);
RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
RegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegion();
loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()),
storeFiles, storeFilesSize, 1);
edit = org.apache.hadoop.hbase.wal.WALEdit.createBulkLoadEvent(regionInfo,
loadDescriptor);
}
List<WALEntry> entries = new ArrayList<>(1);
// 4. Create a WALEntryBuilder
WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
// 5. Copy the hfiles to the paths they would have in a real bulk load
for (int i = 0; i < 25; i++) {
String pathToHfileFromNS =
new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
.append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
.append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
.append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
.append(hfilePrefix + i).toString();
String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
Path dstPath = new Path(dst);
FileUtil.copy(fs, p.get(0), fs, dstPath, false, conf);
}
entries.add(builder.build());
try (ResultScanner scanner = table1.getScanner(new Scan())) {
// 6. Assert no existing data in table
assertEquals(0, scanner.next(numRows).length);
}
// 7. Replicate the bulk loaded entry
SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
try (ResultScanner scanner = table1.getScanner(new Scan())) {
// 8. Assert data is replicated
assertEquals(numRows, scanner.next(numRows).length);
}
// Clean up the created hfiles or it will mess up subsequent tests
}
@Test (timeout = 30000)
public void testNewNamenodeTakesOverWriter() throws Exception {
File nn1Dir = new File(
MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn1");
File nn2Dir = new File(
MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn2");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
nn1Dir.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
// Start the cluster once to generate the dfs dirs
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.checkExitOnShutdown(false)
.build();
// Shutdown the cluster before making a copy of the namenode dir
// to release all file locks, otherwise, the copy will fail on
// some platforms.
cluster.shutdown();
try {
// Start a second NN pointed to the same quorum.
// We need to copy the image dir from the first NN -- or else
// the new NN will just be rejected because of Namespace mismatch.
FileUtil.fullyDelete(nn2Dir);
FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
new Path(nn2Dir.getAbsolutePath()), false, conf);
// Start the cluster again
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.format(false)
.manageNameDfsDirs(false)
.checkExitOnShutdown(false)
.build();
cluster.getFileSystem().mkdirs(TEST_PATH);
Configuration conf2 = new Configuration();
conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
nn2Dir.getAbsolutePath());
conf2.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2)
.numDataNodes(0)
.format(false)
.manageNameDfsDirs(false)
.build();
// Check that the new cluster sees the edits made on the old cluster
try {
assertTrue(cluster2.getFileSystem().exists(TEST_PATH));
} finally {
cluster2.shutdown();
}
// Check that, if we try to write to the old NN,
// it aborts.
try {
cluster.getFileSystem().mkdirs(new Path("/x"));
fail("Did not abort trying to write to a fenced NN");
} catch (RemoteException re) {
GenericTestUtils.assertExceptionContains(
"Could not sync enough journals to persistent storage", re);
}
} finally {
//cluster.shutdown();
}
}