下面列出了 org.apache.hadoop.fs.FileContext#delete() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Stops {@code yarnCluster} when one is set, then makes a best-effort attempt to
 * recursively delete the directory configured under
 * {@code yarn.timeline-service.leveldb-timeline-store.path} on the local file system.
 *
 * <p>Any failure during the directory cleanup is discarded; an exception thrown by
 * {@code yarnCluster.stop()} still propagates to the caller.
 */
@Override
public void stop() {
  if (yarnCluster != null) {
    try {
      yarnCluster.stop();
    } finally {
      // Drop the reference even when stop() throws.
      yarnCluster = null;
    }
  }

  try {
    final FileContext localFc = FileContext.getLocalFSFileContext();
    final String timelineStoreDir =
        conf.get("yarn.timeline-service.leveldb-timeline-store.path");
    localFc.delete(new Path(timelineStoreDir), true);
  } catch (Exception ignored) {
    // Best-effort cleanup: every failure (including an unset path) is deliberately dropped.
  }
}
/**
 * Deletes the local directories of a DAG when the request asks for it, then
 * answers the request with an HTTP OK response and closes the channel.
 *
 * <p>The deletion is only attempted when {@code jobQ} and {@code dagIdQ} are
 * non-empty and the first element of {@code dagCompletedQ} contains the text
 * {@code "delete"}. Only the first element of each list is used. An
 * {@link IOException} raised while deleting is logged and does not prevent the
 * OK response from being sent.
 *
 * @param evt event whose channel receives the response and is closed afterwards
 * @param dagCompletedQ request values; the first one must contain {@code "delete"}
 * @param jobQ request values; the first one is passed to {@code getDagLocation}
 *     and used as the key into {@code userRsrc}
 * @param dagIdQ request values; the first one is passed to {@code getDagLocation}
 * @return {@code true} if the request was handled as a DAG delete (a response
 *     has been written), {@code false} otherwise
 */
private boolean deleteDagDirectories(MessageEvent evt,
    List<String> dagCompletedQ, List<String> jobQ,
    List<String> dagIdQ) {
  if (jobQ == null || jobQ.isEmpty()) {
    return false;
  }
  if (dagCompletedQ == null || dagCompletedQ.isEmpty()
      || !dagCompletedQ.get(0).contains("delete")
      || dagIdQ == null || dagIdQ.isEmpty()) {
    return false;
  }

  String firstJob = jobQ.get(0);
  String base = getDagLocation(firstJob, dagIdQ.get(0), userRsrc.get(firstJob));
  try {
    FileContext lfc = FileContext.getLocalFSFileContext();
    for (Path dagPath : lDirAlloc.getAllLocalPathsToRead(base, conf)) {
      lfc.delete(dagPath, true);
    }
  } catch (IOException e) {
    // Hand the exception to the logger as a throwable so the stack trace is kept;
    // concatenating it into the message would only record its toString().
    LOG.warn("Encountered exception during dag delete", e);
  }

  // The request is acknowledged even if the deletion above failed.
  evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK));
  evt.getChannel().close();
  return true;
}
/**
 * Test teardown: stops {@code yarnCluster} when one is set, then recursively
 * deletes the directory configured under
 * {@code yarn.timeline-service.leveldb-timeline-store.path} on the local file system.
 *
 * @throws IOException if the directory cannot be deleted
 */
@After
public void tearDown() throws IOException {
  if (yarnCluster != null) {
    try {
      yarnCluster.stop();
    } finally {
      // Drop the reference even when stop() throws.
      yarnCluster = null;
    }
  }

  final FileContext localFc = FileContext.getLocalFSFileContext();
  final String timelineStoreDir =
      conf.get("yarn.timeline-service.leveldb-timeline-store.path");
  localFc.delete(new Path(timelineStoreDir), true);
}
/**
 * Recreates the workspace and writes two small log files into its {@code log}
 * sub-directory.
 *
 * <p>The {@code workSpace} directory is first deleted recursively on the local
 * file system, then {@code log} is created inside it. The files
 * {@code logfile.log} and {@code logfile1.log} each receive nine
 * {@code EL}-terminated records.
 *
 * @return the path of the {@code log} directory that holds both files
 * @throws IOException if deleting, creating or writing on the local file system fails
 */
private Path createLogFile() throws IOException {
  FileContext files = FileContext.getLocalFSFileContext();
  Path ws = new Path(workSpace.getAbsoluteFile().getAbsolutePath());
  files.delete(ws, true);
  Path workSpacePath = new Path(workSpace.getAbsolutePath(), "log");
  files.mkdir(workSpacePath, null, true);

  LOG.info("create logfile.log");
  Path logfile1 = new Path(workSpacePath, "logfile.log");
  // try-with-resources closes the stream even when a write fails part-way.
  try (FSDataOutputStream os = files.create(logfile1,
      EnumSet.of(CreateFlag.CREATE))) {
    os.writeBytes("4 3" + EL + "1 3" + EL + "4 44" + EL);
    os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
    os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
    os.flush();
  }

  LOG.info("create logfile1.log");
  Path logfile2 = new Path(workSpacePath, "logfile1.log");
  try (FSDataOutputStream os = files.create(logfile2,
      EnumSet.of(CreateFlag.CREATE))) {
    os.writeBytes("4 3" + EL + "1 3" + EL + "3 44" + EL);
    os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
    os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
    os.flush();
  }
  return workSpacePath;
}
/**
 * Builds a viewfs {@link FileContext} for tests, backed by the local file system.
 *
 * <p>The test root returned by {@code helper} is wiped and recreated on the local
 * file system. The test directory, the home directory and the working directory
 * are then passed to {@code linkUpFirstComponents} / {@code setUpHomeDir} to
 * populate a fresh {@link Configuration}, from which the viewfs context is created.
 *
 * @param helper supplies the test root path
 * @return a viewfs file context whose working directory matches the local one
 * @throws Exception if any file system or configuration step fails
 */
public static FileContext setupForViewFsLocalFs(FileContextTestHelper helper) throws Exception {
  // Create the test root on the local fs - the mount table will point here.
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path testRoot = helper.getTestRootPath(localFc);
  // A previous test may have been killed before it could clean up.
  localFc.delete(testRoot, true);
  localFc.mkdir(testRoot, FileContext.DEFAULT_PERM, true);

  final Configuration viewFsConf = new Configuration();

  // Link for the test dir.
  final String testDirPath = helper.getTestRootPath(localFc).toUri().getPath();
  linkUpFirstComponents(viewFsConf, testDirPath, localFc, "test dir");

  // Link for the home dir.
  setUpHomeDir(viewFsConf, localFc);

  // The test path may be relative to the working dir, so the working dir gets a link too.
  final String workingDirPath = localFc.getWorkingDirectory().toUri().getPath();
  linkUpFirstComponents(viewFsConf, workingDirPath, localFc, "working dir");

  final FileContext viewFc = FileContext.getFileContext(FsConstants.VIEWFS_URI, viewFsConf);
  // In case the test dir is relative to the working dir.
  viewFc.setWorkingDirectory(new Path(workingDirPath));
  Log.info("Working dir is: " + viewFc.getWorkingDirectory());
  return viewFc;
}
/**
 * Recreates the workspace and writes two small log files into its {@code log}
 * sub-directory.
 *
 * <p>The {@code workSpace} directory is first deleted recursively on the local
 * file system, then {@code log} is created inside it. The files
 * {@code logfile.log} and {@code logfile1.log} each receive nine
 * {@code EL}-terminated records.
 *
 * @return the path of the {@code log} directory that holds both files
 * @throws IOException if deleting, creating or writing on the local file system fails
 */
private Path createLogFile() throws IOException {
  FileContext files = FileContext.getLocalFSFileContext();
  Path ws = new Path(workSpace.getAbsoluteFile().getAbsolutePath());
  files.delete(ws, true);
  Path workSpacePath = new Path(workSpace.getAbsolutePath(), "log");
  files.mkdir(workSpacePath, null, true);

  LOG.info("create logfile.log");
  Path logfile1 = new Path(workSpacePath, "logfile.log");
  // try-with-resources closes the stream even when a write fails part-way.
  try (FSDataOutputStream os = files.create(logfile1,
      EnumSet.of(CreateFlag.CREATE))) {
    os.writeBytes("4 3" + EL + "1 3" + EL + "4 44" + EL);
    os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
    os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
    os.flush();
  }

  LOG.info("create logfile1.log");
  Path logfile2 = new Path(workSpacePath, "logfile1.log");
  try (FSDataOutputStream os = files.create(logfile2,
      EnumSet.of(CreateFlag.CREATE))) {
    os.writeBytes("4 3" + EL + "1 3" + EL + "3 44" + EL);
    os.writeBytes("2 3" + EL + "1 3" + EL + "0 45" + EL);
    os.writeBytes("4 3" + EL + "1 3" + EL + "1 44" + EL);
    os.flush();
  }
  return workSpacePath;
}
/**
 * Builds a viewfs {@link FileContext} for tests, backed by the local file system.
 *
 * <p>The test root returned by {@code helper} is wiped and recreated on the local
 * file system. The test directory, the home directory and the working directory
 * are then passed to {@code linkUpFirstComponents} / {@code setUpHomeDir} to
 * populate a fresh {@link Configuration}, from which the viewfs context is created.
 *
 * @param helper supplies the test root path
 * @return a viewfs file context whose working directory matches the local one
 * @throws Exception if any file system or configuration step fails
 */
public static FileContext setupForViewFsLocalFs(FileContextTestHelper helper) throws Exception {
  // Create the test root on the local fs - the mount table will point here.
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path testRoot = helper.getTestRootPath(localFc);
  // A previous test may have been killed before it could clean up.
  localFc.delete(testRoot, true);
  localFc.mkdir(testRoot, FileContext.DEFAULT_PERM, true);

  final Configuration viewFsConf = new Configuration();

  // Link for the test dir.
  final String testDirPath = helper.getTestRootPath(localFc).toUri().getPath();
  linkUpFirstComponents(viewFsConf, testDirPath, localFc, "test dir");

  // Link for the home dir.
  setUpHomeDir(viewFsConf, localFc);

  // The test path may be relative to the working dir, so the working dir gets a link too.
  final String workingDirPath = localFc.getWorkingDirectory().toUri().getPath();
  linkUpFirstComponents(viewFsConf, workingDirPath, localFc, "working dir");

  final FileContext viewFc = FileContext.getFileContext(FsConstants.VIEWFS_URI, viewFsConf);
  // In case the test dir is relative to the working dir.
  viewFc.setWorkingDirectory(new Path(workingDirPath));
  Log.info("Working dir is: " + viewFc.getWorkingDirectory());
  return viewFc;
}
/**
 * Removes everything this object tracked on local disk: the created symlinks,
 * then the local archives, then the local files.
 *
 * <p>A symlink that cannot be deleted only produces a warning. Archives and files
 * are deleted recursively through the local {@link FileContext}; a failure there
 * propagates and skips the remaining entries.
 *
 * @throws IOException if deleting an archive or file fails
 */
public void close() throws IOException {
  for (File link : symlinksCreated) {
    if (link.delete()) {
      continue;
    }
    LOG.warn("Failed to delete symlink created by the local job runner: " +
        link);
  }

  final FileContext localFc = FileContext.getLocalFSFileContext();
  for (String archivePath : localArchives) {
    localFc.delete(new Path(archivePath), true);
  }
  for (String filePath : localFiles) {
    localFc.delete(new Path(filePath), true);
  }
}
/**
 * Replaces {@code p} with an empty directory: deletes it recursively and then
 * creates it again with the default directory permission.
 *
 * @param fc file context used for both operations
 * @param p directory to reset; it need not exist, but its parent must, because the
 *     directory is created without creating parents
 * @throws IOException if the delete fails for a reason other than the path being
 *     absent, or if the directory cannot be created
 */
private void clearDir(FileContext fc, Path p) throws IOException {
  try {
    fc.delete(p, true);
  } catch (FileNotFoundException ignored) {
    // Nothing to remove: the directory did not exist yet.
  }
  final FsPermission dirPermission = FsPermission.getDirDefault();
  fc.mkdir(p, dirPermission, false);
}
/**
 * Restores the most recent checkpointed object, first repairing whatever an
 * interrupted checkpoint may have left behind.
 *
 * <p>Recovery steps:
 * <ul>
 *   <li>If a snapshot backup exists, it is renamed over the snapshot and the
 *       backup log and current log are concatenated (in that order) into a new
 *       log file, which then replaces the current log.</li>
 *   <li>Otherwise, if only a log backup exists, it is renamed over the current log.</li>
 * </ul>
 * The snapshot, if present, is then deserialized using the thread's context
 * class loader.
 *
 * @return the deserialized checkpoint object, or {@code null} when no snapshot exists
 * @throws IOException if a file system operation fails, or if a class of the
 *     checkpointed state cannot be found (wrapped as the cause)
 */
@Override
public Object restore() throws IOException
{
  FileContext fc = FileContext.getFileContext(fs.getUri());
  // recover from wherever it was left
  if (fc.util().exists(snapshotBackupPath)) {
    LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
    fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
    // combine logs (w/o append, create new file)
    Path tmpLogPath = new Path(basedir, "log.combined");
    try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
      try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
        IOUtils.copy(fsIn, fsOut);
      }
      try (FSDataInputStream fsIn = fc.open(logPath)) {
        IOUtils.copy(fsIn, fsOut);
      }
    }
    fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
    fc.delete(logBackupPath, false);
  } else {
    // we have log backup, but no checkpoint backup
    // failure between log rotation and writing checkpoint
    if (fc.util().exists(logBackupPath)) {
      LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
      fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
    }
  }

  if (!fc.util().exists(snapshotPath)) {
    LOG.debug("No existing checkpoint.");
    return null;
  }

  LOG.debug("Reading checkpoint {}", snapshotPath);
  // Resolve classes through the context class loader; the default resolution is
  // not deterministic here, see
  // http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
  final ClassLoader loader = Thread.currentThread().getContextClassLoader();
  // NOTE(review): native Java deserialization - presumably the snapshot is only
  // ever written by this component; confirm it cannot come from an untrusted source.
  // The raw stream is a managed resource of its own so that it is closed even when
  // the ObjectInputStream constructor fails while reading the stream header.
  try (InputStream is = fc.open(snapshotPath);
      ObjectInputStream ois = new ObjectInputStream(is)
      {
        @Override
        protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
            throws IOException, ClassNotFoundException
        {
          return Class.forName(objectStreamClass.getName(), true, loader);
        }
      }) {
    return ois.readObject();
  } catch (ClassNotFoundException cnfe) {
    throw new IOException("Failed to read checkpointed state", cnfe);
  }
}
/**
 * Test teardown: recursively deletes {@code basedir} from the local file system.
 *
 * @throws IOException if the delete fails
 */
@After
public void deleteBaseDir() throws IOException {
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path basePath = new Path(basedir.getPath());
  localFc.delete(basePath, true);
}
/**
 * Recursively deletes the test root directory on the local file system.
 *
 * @param helper supplies the test root path for the local file context
 * @throws Exception if the path cannot be resolved or deleted
 */
public static void tearDownForViewFsLocalFs(FileContextTestHelper helper) throws Exception {
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path testRoot = helper.getTestRootPath(localFc);
  localFc.delete(testRoot, true);
}
/**
 * Adjusts {@code conf} for a mini MapReduce-on-YARN cluster and then delegates to
 * {@code super.serviceInit(conf)}.
 *
 * <p>In order: selects the YARN framework, defaults the AM staging directory,
 * optionally disables the node manager memory checks, opens up the umask,
 * recreates the staging directory and creates the history "done" directory,
 * and finally registers the shuffle handler as a node manager auxiliary service.
 *
 * @param conf configuration to modify in place and pass to the superclass
 * @throws Exception if the superclass initialisation fails; a failure to prepare
 *     the staging directories is rethrown as {@code YarnRuntimeException}
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);

  // Fall back to a staging directory below the test work dir when none is configured.
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    final File defaultStagingDir = new File(getTestWorkDir(), "apps_staging_dir/");
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, defaultStagingDir.getAbsolutePath());
  }

  // Unless the mini cluster is told to control resource monitoring, switch off
  // both the physical and the virtual memory check of the node manager.
  final boolean controlsResourceMonitoring = conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING);
  if (!controlsResourceMonitoring) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");

  try {
    final FileContext defaultFc = FileContext.getFileContext(conf);
    final Path stagingDir = defaultFc.makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));

    // On Windows with a local file system, store the staging dir as an absolute
    // path that includes the drive letter. The unit test may run on a different
    // drive than the AM; without the drive letter the job files could be
    // localized on the test's drive while the AM looks for them on its own.
    if (Path.WINDOWS && stagingDir.getFileSystem(conf) instanceof LocalFileSystem) {
      final String configuredStagingDir = conf.get(MRJobConfig.MR_AM_STAGING_DIR);
      conf.set(MRJobConfig.MR_AM_STAGING_DIR,
          new File(configuredStagingDir).getAbsolutePath());
    }

    final FileContext stagingFc = FileContext.getFileContext(stagingDir.toUri(), conf);
    if (stagingFc.util().exists(stagingDir)) {
      LOG.info(stagingDir + " exists! deleting...");
      stagingFc.delete(stagingDir, true);
    }
    LOG.info("mkdir: " + stagingDir);
    // Create the staging directory here so that the right permissions are set
    // while running as proxy user.
    stagingFc.mkdir(stagingDir, null, true);

    // The history "done" directory is created as well.
    final String doneDirPrefix =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    final Path doneDir = stagingFc.makeQualified(new Path(doneDirPrefix));
    stagingFc.mkdir(doneDir, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }

  // The default is local, because of which shuffle doesn't happen.
  conf.set(MRConfig.MASTER_ADDRESS, "test");

  // Register the shuffle service with the node manager.
  final String shuffleServiceId = ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID;
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceId);
  conf.setClass(
      String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, shuffleServiceId),
      ShuffleHandler.class, Service.class);

  // Non-standard shuffle port.
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
/**
 * Adjusts {@code conf} for a mini YARN cluster that runs the pull server as a
 * node manager auxiliary service, then delegates to {@code super.init(conf)}.
 *
 * <p>In order: binds the resource manager and scheduler addresses to
 * {@code 127.0.0.1} with port 0, selects the YARN framework, defaults the AM
 * staging directory, opens up the umask, recreates the staging directory and
 * creates the history "done" directory, and registers the pull server service.
 *
 * @param conf configuration to modify in place and pass to the superclass; a
 *     failure to prepare the staging directories is rethrown as
 *     {@code YarnRuntimeException}
 */
@Override
public void init(Configuration conf) {
  final String loopback = "127.0.0.1";
  conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress(loopback, 0));
  conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress(loopback, 0));

  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);

  // Fall back to a staging directory below the test work dir when none is configured.
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    final File defaultStagingDir = new File(getTestWorkDir(), "apps_staging_dir/");
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, defaultStagingDir.getAbsolutePath());
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");

  try {
    final FileContext defaultFc = FileContext.getFileContext(conf);
    final Path stagingDir = defaultFc.makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));

    final FileContext stagingFc = FileContext.getFileContext(stagingDir.toUri(), conf);
    if (stagingFc.util().exists(stagingDir)) {
      LOG.info(stagingDir + " exists! deleting...");
      stagingFc.delete(stagingDir, true);
    }
    LOG.info("mkdir: " + stagingDir);
    // Create the staging directory here so that the right permissions are set
    // while running as proxy user.
    stagingFc.mkdir(stagingDir, null, true);

    // The history "done" directory is created as well.
    final String doneDirPrefix =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    final Path doneDir = stagingFc.makeQualified(new Path(doneDirPrefix));
    stagingFc.mkdir(doneDir, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }

  // The default is local, because of which shuffle doesn't happen.
  conf.set(MRConfig.MASTER_ADDRESS, "test");

  // Register the pull server as the auxiliary service of the node manager.
  final String pullServerId = PullServerAuxService.PULLSERVER_SERVICEID;
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, pullServerId);
  conf.setClass(
      String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, pullServerId),
      PullServerAuxService.class, Service.class);

  // Non-standard pull server port.
  conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);

  // Local directory.
  conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);

  super.init(conf);
}
/**
 * Recursively deletes the test root directory on the local file system.
 *
 * @param helper supplies the test root path for the local file context
 * @throws Exception if the path cannot be resolved or deleted
 */
public static void tearDownForViewFsLocalFs(FileContextTestHelper helper) throws Exception {
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path testRoot = helper.getTestRootPath(localFc);
  localFc.delete(testRoot, true);
}
/**
 * Class-level teardown: recursively deletes the {@code target/TestFSDownload}
 * directory from the local file system.
 *
 * @throws IOException if the delete fails
 */
@AfterClass
public static void deleteTestDir() throws IOException {
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path testDir = new Path("target", TestFSDownload.class.getSimpleName());
  localFc.delete(testDir, true);
}
/**
 * Test teardown: recursively deletes {@code basedir} from the local file system.
 *
 * @throws IOException if the delete fails
 */
@After
public void deleteBaseDir() throws IOException {
  final FileContext localFc = FileContext.getLocalFSFileContext();
  final Path basePath = new Path(basedir.getPath());
  localFc.delete(basePath, true);
}
/**
 * Restores the most recent checkpointed object, first repairing whatever an
 * interrupted checkpoint may have left behind.
 *
 * <p>Recovery steps:
 * <ul>
 *   <li>If a snapshot backup exists, it is renamed over the snapshot and the
 *       backup log and current log are concatenated (in that order) into a new
 *       log file, which then replaces the current log.</li>
 *   <li>Otherwise, if only a log backup exists, it is renamed over the current log.</li>
 * </ul>
 * The snapshot, if present, is then deserialized using the thread's context
 * class loader.
 *
 * @return the deserialized checkpoint object, or {@code null} when no snapshot exists
 * @throws IOException if a file system operation fails, or if a class of the
 *     checkpointed state cannot be found (wrapped as the cause)
 */
@Override
public Object restore() throws IOException
{
  FileContext fc = FileContext.getFileContext(fs.getUri());
  // recover from wherever it was left
  if (fc.util().exists(snapshotBackupPath)) {
    LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
    fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
    // combine logs (w/o append, create new file)
    Path tmpLogPath = new Path(basedir, "log.combined");
    try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
      try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
        IOUtils.copy(fsIn, fsOut);
      }
      try (FSDataInputStream fsIn = fc.open(logPath)) {
        IOUtils.copy(fsIn, fsOut);
      }
    }
    fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
    fc.delete(logBackupPath, false);
  } else {
    // we have log backup, but no checkpoint backup
    // failure between log rotation and writing checkpoint
    if (fc.util().exists(logBackupPath)) {
      LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
      fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
    }
  }

  if (!fc.util().exists(snapshotPath)) {
    LOG.debug("No existing checkpoint.");
    return null;
  }

  LOG.debug("Reading checkpoint {}", snapshotPath);
  // Resolve classes through the context class loader; the default resolution is
  // not deterministic here, see
  // http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
  final ClassLoader loader = Thread.currentThread().getContextClassLoader();
  // NOTE(review): native Java deserialization - presumably the snapshot is only
  // ever written by this component; confirm it cannot come from an untrusted source.
  // The raw stream is a managed resource of its own so that it is closed even when
  // the ObjectInputStream constructor fails while reading the stream header.
  try (InputStream is = fc.open(snapshotPath);
      ObjectInputStream ois = new ObjectInputStream(is)
      {
        @Override
        protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
            throws IOException, ClassNotFoundException
        {
          return Class.forName(objectStreamClass.getName(), true, loader);
        }
      }) {
    return ois.readObject();
  } catch (ClassNotFoundException cnfe) {
    throw new IOException("Failed to read checkpointed state", cnfe);
  }
}
/**
 * Adjusts {@code conf} for a mini Tez cluster and then delegates to
 * {@code super.serviceInit(conf)}.
 *
 * <p>In order: selects the YARN-Tez framework, disables AM node blacklisting,
 * defaults the AM staging directory and the node manager delete delay, uploads
 * the Tez application jar and publishes it through {@code TEZ_LIB_URIS},
 * disables the node manager memory checks, opens up the umask, recreates the
 * staging directory and creates the history "done" directory, and finally
 * registers the shuffle handler as a node manager auxiliary service.
 *
 * @param conf configuration to modify in place and pass to the superclass
 * @throws Exception if copying the jar or the superclass initialisation fails;
 *     a missing application jar or a failure to prepare the staging directories
 *     is raised as {@code TezUncheckedException}
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);

  // Blacklisting is disabled to prevent scheduling issues.
  conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);

  // Fall back to a staging directory below the test work dir when none is configured.
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    final File defaultStagingDir =
        new File(getTestWorkDir(), "apps_staging_dir" + Path.SEPARATOR);
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, defaultStagingDir.getAbsolutePath());
  }

  // Nothing defined: use a quick delete value.
  if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
    conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
  }

  // The application jar must have been built before the cluster can start.
  final File localAppJar = new File(MiniTezCluster.APPJAR);
  if (!localAppJar.exists()) {
    final String message = "TezAppJar " + MiniTezCluster.APPJAR
        + " not found. Exiting.";
    LOG.info(message);
    throw new TezUncheckedException(message);
  }

  // Copy the application jar to the file system and make it public.
  final FileSystem fs = FileSystem.get(conf);
  final Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
  final Path remoteAppJar = new Path(testRootDir, "TezAppJar.jar");
  final Path localAppJarPath = new Path(MiniTezCluster.APPJAR);
  fs.copyFromLocalFile(localAppJarPath, remoteAppJar);
  fs.setPermission(remoteAppJar, new FsPermission("777"));

  conf.set(TezConfiguration.TEZ_LIB_URIS, remoteAppJar.toUri().toString());
  LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));

  // Both the physical and the virtual memory check of the node manager are switched off.
  conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
  conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");

  try {
    final FileContext defaultFc = FileContext.getFileContext(conf);
    final Path stagingDir = defaultFc.makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));

    // On Windows with a local file system, store the staging dir as an absolute
    // path that includes the drive letter. The unit test may run on a different
    // drive than the AM; without the drive letter the job files could be
    // localized on the test's drive while the AM looks for them on its own.
    if (Path.WINDOWS && stagingDir.getFileSystem(conf) instanceof LocalFileSystem) {
      final String configuredStagingDir = conf.get(MRJobConfig.MR_AM_STAGING_DIR);
      conf.set(MRJobConfig.MR_AM_STAGING_DIR,
          new File(configuredStagingDir).getAbsolutePath());
    }

    final FileContext stagingFc = FileContext.getFileContext(stagingDir.toUri(), conf);
    if (stagingFc.util().exists(stagingDir)) {
      LOG.info(stagingDir + " exists! deleting...");
      stagingFc.delete(stagingDir, true);
    }
    LOG.info("mkdir: " + stagingDir);
    stagingFc.mkdir(stagingDir, null, true);

    // The history "done" directory is created as well.
    final String doneDirPrefix =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    final Path doneDir = stagingFc.makeQualified(new Path(doneDirPrefix));
    stagingFc.mkdir(doneDir, null, true);
  } catch (IOException e) {
    throw new TezUncheckedException("Could not create staging directory. ", e);
  }

  conf.set(MRConfig.MASTER_ADDRESS, "test");

  // Register the shuffle service with the node manager.
  final String shuffleServiceId = ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID;
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceId);
  conf.setClass(
      String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, shuffleServiceId),
      ShuffleHandler.class, Service.class);

  // Non-standard shuffle port.
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}
/**
 * Adjusts {@code conf} for a mini MapReduce-on-YARN cluster and then delegates to
 * {@code super.serviceInit(conf)}.
 *
 * <p>In order: selects the YARN framework, defaults the AM staging directory,
 * optionally disables the node manager memory checks, opens up the umask,
 * recreates the staging directory and creates the history "done" directory,
 * and finally registers the shuffle handler as a node manager auxiliary service.
 *
 * @param conf configuration to modify in place and pass to the superclass
 * @throws Exception if the superclass initialisation fails; a failure to prepare
 *     the staging directories is rethrown as {@code YarnRuntimeException}
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);

  // Fall back to a staging directory below the test work dir when none is configured.
  if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
    final File defaultStagingDir = new File(getTestWorkDir(), "apps_staging_dir/");
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, defaultStagingDir.getAbsolutePath());
  }

  // Unless the mini cluster is told to control resource monitoring, switch off
  // both the physical and the virtual memory check of the node manager.
  final boolean controlsResourceMonitoring = conf.getBoolean(
      MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
      MRConfig.DEFAULT_MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING);
  if (!controlsResourceMonitoring) {
    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
  }

  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");

  try {
    final FileContext defaultFc = FileContext.getFileContext(conf);
    final Path stagingDir = defaultFc.makeQualified(
        new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));

    // On Windows with a local file system, store the staging dir as an absolute
    // path that includes the drive letter. The unit test may run on a different
    // drive than the AM; without the drive letter the job files could be
    // localized on the test's drive while the AM looks for them on its own.
    if (Path.WINDOWS && stagingDir.getFileSystem(conf) instanceof LocalFileSystem) {
      final String configuredStagingDir = conf.get(MRJobConfig.MR_AM_STAGING_DIR);
      conf.set(MRJobConfig.MR_AM_STAGING_DIR,
          new File(configuredStagingDir).getAbsolutePath());
    }

    final FileContext stagingFc = FileContext.getFileContext(stagingDir.toUri(), conf);
    if (stagingFc.util().exists(stagingDir)) {
      LOG.info(stagingDir + " exists! deleting...");
      stagingFc.delete(stagingDir, true);
    }
    LOG.info("mkdir: " + stagingDir);
    // Create the staging directory here so that the right permissions are set
    // while running as proxy user.
    stagingFc.mkdir(stagingDir, null, true);

    // The history "done" directory is created as well.
    final String doneDirPrefix =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    final Path doneDir = stagingFc.makeQualified(new Path(doneDirPrefix));
    stagingFc.mkdir(doneDir, null, true);
  } catch (IOException e) {
    throw new YarnRuntimeException("Could not create staging directory. ", e);
  }

  // The default is local, because of which shuffle doesn't happen.
  conf.set(MRConfig.MASTER_ADDRESS, "test");

  // Register the shuffle service with the node manager.
  final String shuffleServiceId = ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID;
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, shuffleServiceId);
  conf.setClass(
      String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, shuffleServiceId),
      ShuffleHandler.class, Service.class);

  // Non-standard shuffle port.
  conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);

  conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
      DefaultContainerExecutor.class, ContainerExecutor.class);

  // TestMRJobs is for testing non-uberized operation only; see TestUberAM
  // for corresponding uberized tests.
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);

  super.serviceInit(conf);
}