下面列出了 org.apache.hadoop.fs.FileUtil#symLink() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a symlink named {@code link} inside {@code workDir} pointing at
 * {@code target}, unless something already exists at that location.
 *
 * A non-zero result from {@code FileUtil.symLink} is only logged as a
 * warning; on success the new link is recorded in {@code symlinksCreated}.
 * If {@code link} is null, does nothing.
 */
private void symlink(File workDir, String target, String link)
    throws IOException {
  if (link == null) {
    return;
  }

  // Resolve the link name relative to the working directory.
  String linkPath = workDir.toString() + Path.SEPARATOR + link;
  File linkFile = new File(linkPath);
  if (linkFile.exists()) {
    return;
  }

  LOG.info(String.format("Creating symlink: %s <- %s", target, linkPath));
  boolean created = (0 == FileUtil.symLink(target, linkPath));
  if (created) {
    symlinksCreated.add(new File(linkPath));
  } else {
    LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
        linkPath));
  }
}
/**
 * Links {@code workDir/link} to {@code target} when no file exists under
 * that name yet.
 *
 * Failures reported by {@code FileUtil.symLink} (non-zero return) produce a
 * warning in the log; successful links are added to {@code symlinksCreated}.
 * A null {@code link} makes this method a no-op.
 */
private void symlink(File workDir, String target, String link)
    throws IOException {
  final String fullLink =
      (link == null) ? null : workDir.toString() + Path.SEPARATOR + link;

  if (fullLink != null && !new File(fullLink).exists()) {
    LOG.info(String.format("Creating symlink: %s <- %s", target, fullLink));
    final int exitCode = FileUtil.symLink(target, fullLink);
    if (exitCode != 0) {
      LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
          fullLink));
      return;
    }
    // Remember the link that was just created.
    symlinksCreated.add(new File(fullLink));
  }
}
/**
 * Creates symlinks in the current directory for the files needed by the job
 * and fills in default job settings.
 *
 * <p>Every entry collected from the "tmpfiles" and "tmparchives" properties
 * is linked under its base name in the current working directory. An
 * existing entry with that name is deleted first. Symlinking stays
 * best-effort: a failed delete or a non-zero return from
 * {@code FileUtil.symLink} is reported as a warning and processing continues
 * with the next file.
 *
 * <p>Afterwards, if the job has a jar but an empty job name, the jar's file
 * name becomes the job name, and {@code configureUserName} is invoked.
 *
 * @param job the job configuration that is read and updated
 * @throws IOException propagated from the symlink and user-name helpers
 */
private void symLinkAndConfigureFiles(JobConf job) throws IOException {
  if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
    LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
             "Applications should implement Tool for the same.");
  }

  // get all the command line arguments into the
  // jobconf passed in by the user conf
  String files = job.get("tmpfiles");
  String archives = job.get("tmparchives");
  // "tmpjars" are not needed because its in the classpath

  List<String> filesToSymLink = new ArrayList<String>();
  splitAndAdd(files, filesToSymLink);
  splitAndAdd(archives, filesToSymLink);

  for (String file : filesToSymLink) {
    String target = new Path(file).toUri().getPath();
    String basename = new File(target).getName();
    String linkName = new File(".").getAbsolutePath() + File.separator + basename;
    File toLink = new File(linkName);
    if (toLink.exists()) {
      LOG.info("Symlink " + linkName + " already exists. Delete it.");
      // File.delete() reports failure through its return value only.
      if (!toLink.delete()) {
        LOG.warn("Failed to delete existing " + linkName +
                 "; creating the symlink may fail.");
      }
    }
    int ret = FileUtil.symLink(target, linkName);
    LOG.info("Creating symlink " + linkName + " -> " + target +
             " returns " + ret + ".");
    // A non-zero return code means the link was not created; make that
    // visible at WARN level instead of burying it in the INFO message.
    if (ret != 0) {
      LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
          linkName));
    }
  }

  // Configure job name
  String originalJar = job.getJar();
  if (originalJar != null) {
    // use jar name if job is not named.
    if ("".equals(job.getJobName())) {
      job.setJobName(new Path(originalJar).getName());
    }
  }

  // Configure username
  configureUserName(job);
}
/**
 * Prepares the local directories and the per-task configuration for a task.
 *
 * Creates the task's local directory and its working directory, links
 * "work" next to the task directory to the job's work directory when no
 * such entry exists, updates {@code localJobConf} with task-specific
 * settings, writes it to "job.xml" in the task directory and finally hands
 * the configuration to the task.
 *
 * @param task the task being localized
 * @throws IOException if a directory cannot be created or an I/O call fails
 */
private void localizeTask(Task task) throws IOException {
  final String jobId = task.getJobID().toString();
  final String taskId = task.getTaskID().toString();

  // Per-task local directory.
  String taskDirName =
      TaskTracker.getLocalTaskDir(jobId, taskId, task.isTaskCleanupTask());
  Path taskDir = lDirAlloc.getLocalPathForWrite(taskDirName, defaultJobConf);
  FileSystem fs = FileSystem.getLocal(fConf);
  if (!fs.mkdirs(taskDir)) {
    throw new IOException("Mkdirs failed to create "
                          + taskDir.toString());
  }

  // create symlink for ../work if it already doesnt exist
  String jobWorkName =
      TaskTracker.getLocalJobDir(jobId) + Path.SEPARATOR + "work";
  String jobWorkDir =
      lDirAlloc.getLocalPathToRead(jobWorkName, defaultJobConf).toString();
  String workLink = taskDir.getParent().toString() + Path.SEPARATOR + "work";
  if (!new File(workLink).exists()) {
    FileUtil.symLink(jobWorkDir, workLink);
  }

  // create the working-directory of the task
  String cwdName = getLocalTaskDir(jobId, taskId, task.isTaskCleanupTask())
      + Path.SEPARATOR + MRConstants.WORKDIR;
  Path taskCwd = lDirAlloc.getLocalPathForWrite(cwdName, defaultJobConf);
  if (!fs.mkdirs(taskCwd)) {
    throw new IOException("Mkdirs failed to create "
                          + taskCwd.toString());
  }

  Path jobXml = new Path(taskDir, "job.xml");
  task.setJobFile(jobXml.toString());

  // Copy tracker-level settings into the task's configuration.
  localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir"));
  String slaveHost = fConf.get("slave.host.name");
  if (slaveHost != null) {
    localJobConf.set("slave.host.name", slaveHost);
  }
  localJobConf.set("mapred.task.id", taskId);
  keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();

  task.localizeConfiguration(localJobConf);
  Task.saveStaticResolutions(localJobConf);

  debugCommand = task.isMapTask()
      ? localJobConf.getMapDebugScript()
      : localJobConf.getReduceDebugScript();

  String keepPattern = localJobConf.getKeepTaskFilesPattern();
  alwaysKeepTaskFiles =
      keepPattern != null && Pattern.matches(keepPattern, taskId);

  boolean singleTaskPerJvm = debugCommand != null
      || localJobConf.getProfileEnabled()
      || alwaysKeepTaskFiles
      || keepFailedTaskFiles;
  if (singleTaskPerJvm) {
    //disable jvm reuse
    localJobConf.setNumTasksToExecutePerJvm(1);
  }
  if (isTaskMemoryManagerEnabled()) {
    localJobConf.setBoolean("task.memory.mgmt.enabled", true);
  }

  // Persist the localized configuration as the task's job.xml.
  OutputStream xmlOut = fs.create(jobXml);
  try {
    localJobConf.writeXml(xmlOut);
  } finally {
    xmlOut.close();
  }
  task.setConf(localJobConf);
}
/**
 * Localizes a task: creates its directories, adjusts {@code localJobConf}
 * for this task and stores the result as "job.xml" in the task directory.
 *
 * Steps, in order: create the task's local directory; link "work" beside
 * it to the job's work directory if nothing exists under that name; create
 * the task's working directory; copy tracker settings and the static host
 * resolutions into {@code localJobConf}; decide the debug/keep-files flags;
 * write the configuration out and attach it to the task.
 *
 * @param task the task being localized
 * @throws IOException if a directory cannot be created or an I/O call fails
 */
private void localizeTask(Task task) throws IOException {
  final String jobIdStr = task.getJobID().toString();
  final String taskIdStr = task.getTaskID().toString();

  Path localDir = lDirAlloc.getLocalPathForWrite(
      TaskTracker.getLocalTaskDir(jobIdStr, taskIdStr,
                                  task.isTaskCleanupTask()),
      defaultJobConf);
  FileSystem localFileSys = FileSystem.getLocal(fConf);
  boolean taskDirReady = localFileSys.mkdirs(localDir);
  if (!taskDirReady) {
    throw new IOException("Mkdirs failed to create "
                          + localDir.toString());
  }

  // create symlink for ../work if it already doesnt exist
  String linkTarget = lDirAlloc.getLocalPathToRead(
      TaskTracker.getLocalJobDir(jobIdStr) + Path.SEPARATOR + "work",
      defaultJobConf).toString();
  String linkLocation = localDir.getParent().toString()
      + Path.SEPARATOR + "work";
  File existing = new File(linkLocation);
  if (!existing.exists()) {
    FileUtil.symLink(linkTarget, linkLocation);
  }

  // create the working-directory of the task
  Path workingDir = lDirAlloc.getLocalPathForWrite(
      getLocalTaskDir(jobIdStr, taskIdStr, task.isTaskCleanupTask())
          + Path.SEPARATOR + MRConstants.WORKDIR,
      defaultJobConf);
  boolean cwdReady = localFileSys.mkdirs(workingDir);
  if (!cwdReady) {
    throw new IOException("Mkdirs failed to create "
                          + workingDir.toString());
  }

  Path confFile = new Path(localDir, "job.xml");
  task.setJobFile(confFile.toString());

  localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir"));
  String slaveHostName = fConf.get("slave.host.name");
  if (slaveHostName != null) {
    localJobConf.set("slave.host.name", slaveHostName);
  }
  localJobConf.set("mapred.task.id", taskIdStr);
  keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
  task.localizeConfiguration(localJobConf);

  // Serialize the static host resolutions as "host=resolved" pairs joined
  // by commas so they travel with the task's configuration.
  List<String[]> resolutions = NetUtils.getAllStaticResolutions();
  if (resolutions != null && !resolutions.isEmpty()) {
    StringBuilder joined = new StringBuilder();
    String separator = "";
    for (String[] pair : resolutions) {
      joined.append(separator).append(pair[0]).append("=").append(pair[1]);
      separator = ",";
    }
    localJobConf.set("hadoop.net.static.resolutions", joined.toString());
  }

  debugCommand = task.isMapTask()
      ? localJobConf.getMapDebugScript()
      : localJobConf.getReduceDebugScript();

  String keepPattern = localJobConf.getKeepTaskFilesPattern();
  alwaysKeepTaskFiles =
      (keepPattern == null) ? false : Pattern.matches(keepPattern, taskIdStr);

  if (debugCommand != null || localJobConf.getProfileEnabled()
      || alwaysKeepTaskFiles || keepFailedTaskFiles) {
    //disable jvm reuse
    localJobConf.setNumTasksToExecutePerJvm(1);
  }
  if (isTaskMemoryManagerEnabled()) {
    localJobConf.setBoolean("task.memory.mgmt.enabled", true);
  }

  // Write the localized configuration to the task's job.xml.
  OutputStream confOut = localFileSys.create(confFile);
  try {
    localJobConf.writeXml(confOut);
  } finally {
    confOut.close();
  }
  task.setConf(localJobConf);
}