下面列出了org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo#org.apache.hadoop.mapreduce.MRConfig 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Sets a 1000-character task status and then checks that the status read
 * back from the context is no longer than the configured progress-status
 * length limit.
 *
 * @throws IOException if the status read back exceeds the limit
 */
@Override
public void map(LongWritable key, Text value, Context context)
    throws IOException {
  // Build a 1000-character status string.
  StringBuilder longStatus = new StringBuilder(512);
  int remaining = 1000;
  while (remaining-- > 0) {
    longStatus.append('a');
  }
  context.setStatus(longStatus.toString());

  int limit = context.getConfiguration().getInt(
      MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
      MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
  boolean withinLimit = context.getStatus().length() <= limit;
  if (!withinLimit) {
    throw new IOException("Status is not truncated");
  }
}
/**
 * With the job classloader enabled, job.jar and PWD must stay out of the
 * container CLASSPATH. The job entries must appear, in order, in
 * APP_CLASSPATH instead.
 */
@Test (timeout = 120000)
public void testSetClasspathWithJobClassloader() throws IOException {
  Configuration jobConf = new Configuration();
  jobConf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  jobConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);

  Map<String, String> environment = new HashMap<String, String>();
  MRApps.setClasspath(environment, jobConf);
  String classpath = environment.get("CLASSPATH");
  String appClasspath = environment.get("APP_CLASSPATH");

  String separator = ApplicationConstants.CLASS_PATH_SEPARATOR;
  boolean jobJarListed = classpath.contains("jar" + separator + "job");
  assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
      + " classpath!", jobJarListed);
  boolean pwdListed = classpath.contains("PWD");
  assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
      pwdListed);

  String pwd = ApplicationConstants.Environment.PWD.$$();
  String expectedAppClasspath = StringUtils.join(separator,
      Arrays.asList(pwd, "job.jar/job.jar", "job.jar/classes/",
          "job.jar/lib/*", pwd + "/*"));
  assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
      + " classpath!", expectedAppClasspath, appClasspath);
}
/**
 * A JobClient configured for the local framework reports a cluster status
 * with no active trackers and no blacklisted trackers.
 */
@Test
public void testGetClusterStatusWithLocalJobRunner() throws Exception {
  Configuration localConf = new Configuration();
  localConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
  localConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);

  ClusterStatus status = new JobClient(localConf).getClusterStatus(true);

  Assert.assertEquals(0, status.getActiveTrackerNames().size());
  Assert.assertEquals(0, status.getBlacklistedTrackers());
  Assert.assertEquals(0, status.getBlackListedTrackersInfo().size());
}
/**
 * StreamUtil.isLocalJobTracker follows MRConfig.FRAMEWORK_NAME. A "local"
 * job-tracker IPC address alone does not make the job local.
 */
@Test
public void testFramework() {
  JobConf cfg = new JobConf();

  // "local" IPC address, yarn framework: not local.
  cfg.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
  cfg.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  boolean isLocal = StreamUtil.isLocalJobTracker(cfg);
  assertFalse("Expected 'isLocal' to be false", isLocal);

  // "local" IPC address, classic framework: not local.
  cfg.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
  cfg.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
  isLocal = StreamUtil.isLocalJobTracker(cfg);
  assertFalse("Expected 'isLocal' to be false", isLocal);

  // Remote IPC address, local framework: local.
  cfg.set(JTConfig.JT_IPC_ADDRESS, "jthost:9090");
  cfg.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
  isLocal = StreamUtil.isLocalJobTracker(cfg);
  assertTrue("Expected 'isLocal' to be true", isLocal);
}
/**
 * Class-level fixture. Starts a MiniDFSCluster and a MiniMRYarnCluster
 * (yarn framework, RM principal set), starts the NameNode's delegation-token
 * secret manager threads, and stores a DFS-qualified test path in {@code p1}.
 */
@BeforeClass
public static void setUp() throws Exception {
  final Configuration clusterConf = new Configuration();
  clusterConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  clusterConf.set(YarnConfiguration.RM_PRINCIPAL,
      "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG");

  // HDFS mini cluster: formatted, numSlaves datanodes, no rack list.
  final MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(clusterConf);
  dfsBuilder.checkExitOnShutdown(true);
  dfsBuilder.numDataNodes(numSlaves);
  dfsBuilder.format(true);
  dfsBuilder.racks(null);
  dfsCluster = dfsBuilder.build();

  // YARN/MR mini cluster sharing the same configuration.
  mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
  mrCluster.init(clusterConf);
  mrCluster.start();

  NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();

  FileSystem dfs = dfsCluster.getFileSystem();
  p1 = new Path("file1");
  p1 = dfs.makeQualified(p1);
}
/**
 * With MAPREDUCE_JOB_USER_CLASSPATH_FIRST enabled, the job's own entries
 * (PWD, job.jar contents, PWD/*) must form the prefix of the container
 * CLASSPATH.
 */
@Test (timeout = 120000)
public void testSetClasspathWithUserPrecendence() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
  Map<String, String> env = new HashMap<String, String>();
  try {
    MRApps.setClasspath(env, conf);
  } catch (Exception e) {
    // Include the exception so the root cause shows up in the test report.
    fail("Got exception while setting classpath: " + e);
  }
  String env_str = env.get("CLASSPATH");
  // Report a clear failure rather than an NPE if CLASSPATH was never set.
  assertTrue("CLASSPATH was not set in the environment", env_str != null);
  String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
      Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
          "job.jar/classes/", "job.jar/lib/*",
          ApplicationConstants.Environment.PWD.$$() + "/*"));
  assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
      env_str.startsWith(expectedClasspath));
}
/**
 * With MAPREDUCE_JOB_USER_CLASSPATH_FIRST disabled, the job.jar entries must
 * appear in the container CLASSPATH but must not be its prefix.
 */
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
  Map<String, String> env = new HashMap<String, String>();
  try {
    MRApps.setClasspath(env, conf);
  } catch (Exception e) {
    // Include the exception so the root cause shows up in the test report.
    fail("Got exception while setting classpath: " + e);
  }
  String env_str = env.get("CLASSPATH");
  // Report a clear failure rather than an NPE if CLASSPATH was never set.
  assertTrue("CLASSPATH was not set in the environment", env_str != null);
  String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
          ApplicationConstants.Environment.PWD.$$() + "/*"));
  assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
      + " the classpath!", env_str.contains(expectedClasspath));
  assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
      env_str.startsWith(expectedClasspath));
}
/**
 * Installs the task configuration and derives per-task helpers from it.
 * <p>
 * The method performs four steps:
 * <ul>
 *   <li>Stores {@code conf}, wrapping it in a {@code JobConf} if it is not
 *       one already.</li>
 *   <li>Instantiates the {@code MapOutputFile} implementation named by
 *       {@code MRConfig.TASK_LOCAL_OUTPUT_CLASS}, defaulting to
 *       {@code MROutputFiles}.</li>
 *   <li>Creates a {@code LocalDirAllocator} for {@code MRConfig.LOCAL_DIR}.</li>
 *   <li>Registers every {@code host=resolvedHost} entry listed under
 *       {@code MRConfig.STATIC_RESOLUTIONS} with
 *       {@code NetUtils.addStaticResolution}.</li>
 * </ul>
 *
 * @param conf the configuration to use
 * @throws IllegalArgumentException if a static-resolution entry contains no '='
 */
public void setConf(Configuration conf) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.mapOutputFile = ReflectionUtils.newInstance(
      conf.getClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
          MROutputFiles.class, MapOutputFile.class), conf);
  this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
  // Add the static resolutions. This is required for JUnit test cases that
  // simulate multiple nodes on a single physical node.
  String[] hostToResolved = conf.getStrings(MRConfig.STATIC_RESOLUTIONS);
  if (hostToResolved != null) {
    for (String str : hostToResolved) {
      int sep = str.indexOf('=');
      if (sep < 0) {
        // Without this check substring(0, -1) fails with an opaque
        // StringIndexOutOfBoundsException.
        throw new IllegalArgumentException("Malformed entry '" + str
            + "' in " + MRConfig.STATIC_RESOLUTIONS
            + ": expected host=resolvedHost");
      }
      String name = str.substring(0, sep);
      String resolvedName = str.substring(sep + 1);
      NetUtils.addStaticResolution(name, resolvedName);
    }
  }
}
/**
* Create a checksum input stream that reads
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
*/
public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in;
this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
length = len;
dataLength = length - checksumSize;
conf = (conf != null) ? conf : new Configuration();
readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
doReadahead();
}
/**
 * A user listed in MR_ADMINS passes checkAccess for both VIEW_JOB and
 * MODIFY_JOB, even though the job ACLs only name the job owner.
 */
@Test
public void testClusterAdmins() {
  final String jobOwner = "testuser";
  final String clusterAdmin = "testuser2";

  Configuration conf = new Configuration();
  conf.set(JobACL.VIEW_JOB.getAclName(), jobOwner);
  conf.set(JobACL.MODIFY_JOB.getAclName(), jobOwner);
  conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
  conf.set(MRConfig.MR_ADMINS, clusterAdmin);

  JobACLsManager aclsManager = new JobACLsManager(conf);
  final Map<JobACL, AccessControlList> jobACLs = aclsManager.constructJobACLs(conf);
  UserGroupInformation admin = UserGroupInformation.createUserForTesting(
      clusterAdmin, new String[] {});

  // The cluster admin must get both kinds of access.
  assertTrue("cluster admin should have view access",
      aclsManager.checkAccess(admin, JobACL.VIEW_JOB, jobOwner,
          jobACLs.get(JobACL.VIEW_JOB)));
  assertTrue("cluster admin should have modify access",
      aclsManager.checkAccess(admin, JobACL.MODIFY_JOB, jobOwner,
          jobACLs.get(JobACL.MODIFY_JOB)));
}
/**
 * Master.getMasterUserName returns the RM principal under the yarn
 * framework (the default) and MASTER_USER_NAME under the classic framework.
 */
@Test
public void testGetMasterUser() {
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(MRConfig.MASTER_USER_NAME, "foo");
  conf.set(YarnConfiguration.RM_PRINCIPAL, "bar");
  // default is yarn framework
  // JUnit expects (expected, actual); the original order produced
  // misleading failure messages.
  assertEquals("bar", Master.getMasterUserName(conf));
  // set framework name to classic
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
  assertEquals("foo", Master.getMasterUserName(conf));
  // change framework to yarn
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  assertEquals("bar", Master.getMasterUserName(conf));
}
/**
 * A user listed in MR_ADMINS is granted VIEW_JOB and MODIFY_JOB access to a
 * job whose ACLs contain only the owner.
 */
@Test
public void testClusterAdmins() {
  String owner = "testuser";
  String admin = "testuser2";

  Configuration aclConf = new Configuration();
  aclConf.set(JobACL.VIEW_JOB.getAclName(), owner);
  aclConf.set(JobACL.MODIFY_JOB.getAclName(), owner);
  aclConf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
  aclConf.set(MRConfig.MR_ADMINS, admin);

  JobACLsManager manager = new JobACLsManager(aclConf);
  Map<JobACL, AccessControlList> acls = manager.constructJobACLs(aclConf);
  UserGroupInformation caller =
      UserGroupInformation.createUserForTesting(admin, new String[] {});

  // The cluster admin must be allowed to view and to modify.
  boolean canView = manager.checkAccess(caller, JobACL.VIEW_JOB, owner,
      acls.get(JobACL.VIEW_JOB));
  assertTrue("cluster admin should have view access", canView);
  boolean canModify = manager.checkAccess(caller, JobACL.MODIFY_JOB, owner,
      acls.get(JobACL.MODIFY_JOB));
  assertTrue("cluster admin should have modify access", canModify);
}
/**
 * With job ACLs enabled, a user who is a member of a group configured in
 * MR_ADMINS is granted VIEW_JOB access. The job's view ACL names only the
 * owner.
 */
@Test
public void testGroups() {
  Configuration conf = new Configuration();
  String jobOwner = "testuser";
  conf.set(JobACL.VIEW_JOB.getAclName(), jobOwner);
  conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
  String user = "testuser2";
  String adminGroup = "adminGroup";
  // NOTE(review): the leading space appears to leave the user part of the
  // ACL empty, so only adminGroup is listed as a group. This relies on the
  // "users groups" AccessControlList format; confirm.
  conf.set(MRConfig.MR_ADMINS, " " + adminGroup);
  JobACLsManager aclsManager = new JobACLsManager(conf);
  final Map<JobACL, AccessControlList> jobACLs = aclsManager.constructJobACLs(conf);
  UserGroupInformation callerUGI = UserGroupInformation.createUserForTesting(
      user, new String[] {adminGroup});
  // ACLs are enabled; access must come from membership in the admin group.
  boolean val = aclsManager.checkAccess(callerUGI, JobACL.VIEW_JOB, jobOwner,
      jobACLs.get(JobACL.VIEW_JOB));
  assertTrue("user in admin group should have access", val);
}
/**
 * Writes split files for a single new-API FileSplit that reports five hosts,
 * with MAX_BLOCK_LOCATIONS_KEY set to 4. Reading the split meta-info back
 * must yield one split with exactly 4 locations. TEST_DIR is removed
 * afterwards.
 */
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    final int maxLocations = 4;
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, maxLocations);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem localFs = FileSystem.getLocal(conf);

    String[] hosts = { "loc1", "loc2", "loc3", "loc4", "loc5" };
    FileSplit[] splits = { new FileSplit(new Path("/some/path"), 0, 1, hosts) };
    JobSplitWriter.createSplitFiles(submitDir, conf, localFs, splits);

    JobSplit.TaskSplitMetaInfo[] metaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), localFs, conf, submitDir);
    assertEquals("unexpected number of splits", 1, metaInfos.length);
    assertEquals("unexpected number of split locations",
        maxLocations, metaInfos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
/**
 * Same check as for new-API splits, but for the old
 * {@code org.apache.hadoop.mapred} API. One FileSplit with five hosts and
 * MAX_BLOCK_LOCATIONS_KEY set to 4 must read back as one split with
 * 4 locations.
 */
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    final int maxLocations = 4;
    Configuration conf = new Configuration();
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, maxLocations);
    Path submitDir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem localFs = FileSystem.getLocal(conf);

    String[] hosts = { "loc1", "loc2", "loc3", "loc4", "loc5" };
    org.apache.hadoop.mapred.InputSplit[] splits = {
        new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1, hosts)
    };
    JobSplitWriter.createSplitFiles(submitDir, conf, localFs, splits);

    JobSplit.TaskSplitMetaInfo[] metaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), localFs, conf, submitDir);
    assertEquals("unexpected number of splits", 1, metaInfos.length);
    assertEquals("unexpected number of split locations",
        maxLocations, metaInfos[0].getLocations().length);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
/**
 * Old-API ({@code org.apache.hadoop.mapred}) variant: a single FileSplit
 * listing five hosts must be capped at MAX_BLOCK_LOCATIONS_KEY (4) locations
 * when its meta-info is written and read back.
 */
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
  TEST_DIR.mkdirs();
  try {
    Configuration splitConf = new Configuration();
    splitConf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
    Path dir = new Path(TEST_DIR.getAbsolutePath());
    FileSystem local = FileSystem.getLocal(splitConf);

    org.apache.hadoop.mapred.FileSplit oldSplit =
        new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
            new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
    org.apache.hadoop.mapred.InputSplit[] toWrite = { oldSplit };
    JobSplitWriter.createSplitFiles(dir, splitConf, local, toWrite);

    JobSplit.TaskSplitMetaInfo[] readBack =
        SplitMetaInfoReader.readSplitMetaInfo(new JobID(), local, splitConf, dir);
    assertEquals("unexpected number of splits", 1, readBack.length);
    int locationCount = readBack[0].getLocations().length;
    assertEquals("unexpected number of split locations", 4, locationCount);
  } finally {
    FileUtil.fullyDelete(TEST_DIR);
  }
}
/**
 * Sets the high-ram (per-task memory) properties in the simulated job's
 * configuration.
 * <p>
 * For maps first, then reduces, the memory value is derived from
 * {@code sourceConf} into {@code destConf} via {@code scaleConfigParameter}.
 * Each result is then checked against the corresponding JobTracker maximum
 * via {@code validateTaskMemoryLimits}, so a bad value fails early.
 *
 * @param sourceConf configuration of the original job
 * @param destConf configuration of the simulated job, updated in place
 */
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf,
                                       Configuration destConf) {
  // Map side: derive the per-map memory, then validate it immediately.
  scaleConfigParameter(sourceConf, destConf, MRConfig.MAPMEMORY_MB,
      MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
  validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB,
      JTConfig.JT_MAX_MAPMEMORY_MB);

  // Reduce side: same two steps with the reduce-specific keys.
  scaleConfigParameter(sourceConf, destConf, MRConfig.REDUCEMEMORY_MB,
      MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
  validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB,
      JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
/**
 * Verifies that StreamUtil.isLocalJobTracker is decided by the configured
 * framework name and not by the job-tracker IPC address.
 */
@Test
public void testFramework() {
  JobConf streamConf = new JobConf();

  // yarn framework with a "local" IPC address -> not local
  streamConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
  streamConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  boolean yarnIsLocal = StreamUtil.isLocalJobTracker(streamConf);
  assertFalse("Expected 'isLocal' to be false", yarnIsLocal);

  // classic framework with a "local" IPC address -> not local
  streamConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
  streamConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
  boolean classicIsLocal = StreamUtil.isLocalJobTracker(streamConf);
  assertFalse("Expected 'isLocal' to be false", classicIsLocal);

  // local framework with a remote IPC address -> local
  streamConf.set(JTConfig.JT_IPC_ADDRESS, "jthost:9090");
  streamConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
  boolean localIsLocal = StreamUtil.isLocalJobTracker(streamConf);
  assertTrue("Expected 'isLocal' to be true", localIsLocal);
}
/**
 * With MAPREDUCE_JOB_USER_CLASSPATH_FIRST disabled, the job.jar entries must
 * be present in the container CLASSPATH but must not lead it.
 */
@Test (timeout = 120000)
public void testSetClasspathWithNoUserPrecendence() {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
  Map<String, String> env = new HashMap<String, String>();
  try {
    MRApps.setClasspath(env, conf);
  } catch (Exception e) {
    // Include the exception so the root cause shows up in the test report.
    fail("Got exception while setting classpath: " + e);
  }
  String env_str = env.get("CLASSPATH");
  // Report a clear failure rather than an NPE if CLASSPATH was never set.
  assertTrue("CLASSPATH was not set in the environment", env_str != null);
  String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
          ApplicationConstants.Environment.PWD.$$() + "/*"));
  assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
      + " the classpath!", env_str.contains(expectedClasspath));
  assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
      env_str.startsWith(expectedClasspath));
}
/**
 * Runs a one-map job through MyMRApp with cross-platform submission enabled.
 * Checks the exact task launch command line and the HADOOP_ROOT_LOGGER and
 * HADOOP_CLIENT_OPTS values in the launch environment.
 */
@Test (timeout = 30000)
public void testCommandLine() throws Exception {
  MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
  Configuration submitConf = new Configuration();
  submitConf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
  Job job = app.submit(submitConf);
  app.waitForState(job, JobState.SUCCEEDED);
  app.verifyCompleted();

  String javaHome = MRApps.crossPlatformify("JAVA_HOME");
  String pwd = MRApps.crossPlatformify("PWD");
  String expectedCommand = "[" + javaHome + "/bin/java"
      + " -Djava.net.preferIPv4Stack=true"
      + " -Dhadoop.metrics.log.level=WARN"
      + " -Xmx200m -Djava.io.tmpdir=" + pwd + "/tmp"
      + " -Dlog4j.configuration=container-log4j.properties"
      + " -Dyarn.app.container.log.dir=<LOG_DIR>"
      + " -Dyarn.app.container.log.filesize=0"
      + " -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog"
      + " org.apache.hadoop.mapred.YarnChild 127.0.0.1"
      + " 54321"
      + " attempt_0_0000_m_000000_0"
      + " 0"
      + " 1><LOG_DIR>/stdout"
      + " 2><LOG_DIR>/stderr ]";
  Assert.assertEquals(expectedCommand, app.myCommandLine);

  // Logger and client options exported to the child environment.
  Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
      app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
  Assert.assertEquals("INFO,console",
      app.cmdEnvironment.get("HADOOP_ROOT_LOGGER"));
  Assert.assertTrue("HADOOP_CLIENT_OPTS not set for job",
      app.cmdEnvironment.containsKey("HADOOP_CLIENT_OPTS"));
  Assert.assertEquals("", app.cmdEnvironment.get("HADOOP_CLIENT_OPTS"));
}
/**
 * The AM container's LD_LIBRARY_PATH must be PWD followed by the admin and
 * then the user library paths. SHELL must be the configured admin user
 * shell.
 */
@Test
public void testAMStandardEnv() throws Exception {
  final String ADMIN_LIB_PATH = "foo";
  final String USER_LIB_PATH = "bar";
  final String USER_SHELL = "shell";
  JobConf jobConf = new JobConf();
  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" +
      ADMIN_LIB_PATH);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH="
      + USER_LIB_PATH);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);
  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);
  // make sure PWD is first in the lib path
  ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
  Map<String, String> env = clc.getEnvironment();
  String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", libPath);
  String cps = jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
      ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
  // Expand PWD from the same JobConf the YARNRunner was built with, so the
  // expectation agrees with the separator chosen above.
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(jobConf, Environment.PWD)
      + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);
  // make sure SHELL is set
  String shell = env.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", shell);
  assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
/**
 * Creates a {@link YARNRunner} when {@code MRConfig.FRAMEWORK_NAME} is the
 * yarn framework name.
 *
 * @param conf the job configuration
 * @return a new YARNRunner, or {@code null} for any other (or unset)
 *     framework name
 */
@Override
public ClientProtocol create(Configuration conf) throws IOException {
  String framework = conf.get(MRConfig.FRAMEWORK_NAME);
  boolean isYarn = MRConfig.YARN_FRAMEWORK_NAME.equals(framework);
  return isYarn ? new YARNRunner(conf) : null;
}
/**
 * Per-test fixture.
 * <ol>
 *   <li>When {@code enableSSLInCluster} is set, turns on javax.net SSL
 *       debugging and sets up key stores.</li>
 *   <li>Applies the shuffle SSL flag, short shuffle timeouts and a low
 *       fetch-failure limit.</li>
 *   <li>Loads the SSL client resource.</li>
 *   <li>Starts a MiniTezCluster and creates the sample input file.</li>
 * </ol>
 */
@Before
public void setupTezCluster() throws Exception {
  if (enableSSLInCluster) {
    // Enable SSL debugging
    System.setProperty("javax.net.debug", "all");
    setupKeyStores();
  }
  conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, enableSSLInCluster);

  // 3 seconds should be good enough on a local machine.
  final int shuffleTimeoutMs = 3 * 1000;
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, shuffleTimeoutMs);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, shuffleTimeoutMs);
  // Low limit so that fetch failures are detected quickly.
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
  conf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 500);

  conf.addResource(conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml"));

  String clusterSuffix = enableSSLInCluster ? "withssl" : "withoutssl";
  miniTezCluster = new MiniTezCluster(
      TestSecureShuffle.class.getName() + "-" + clusterSuffix, 1, 1, 1);
  miniTezCluster.init(conf);
  miniTezCluster.start();
  createSampleFile(inputLoc);
}
/**
 * Master.getMasterAddress behaves as follows:
 * <ul>
 *   <li>Yarn framework (the default), nothing set: uses the default RM
 *       address.</li>
 *   <li>Classic framework: uses MASTER_ADDRESS and rejects an invalid
 *       value.</li>
 *   <li>Yarn framework with RM_ADDRESS set: uses RM_ADDRESS.</li>
 * </ul>
 */
@Test
public void testGetMasterAddress() {
  YarnConfiguration conf = new YarnConfiguration();
  // Default is yarn framework
  String masterHostname = Master.getMasterAddress(conf).getHostName();
  // no address set so should default to default rm address
  InetSocketAddress rmAddr = NetUtils.createSocketAddr(YarnConfiguration.DEFAULT_RM_ADDRESS);
  // JUnit expects (expected, actual); the original order was reversed.
  assertEquals(rmAddr.getHostName(), masterHostname);
  // Trying invalid master address for classic
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
  conf.set(MRConfig.MASTER_ADDRESS, "local:invalid");
  // should throw an exception for invalid value
  try {
    Master.getMasterAddress(conf);
    fail("Should not reach here as there is a bad master address");
  }
  catch (Exception e) {
    // Expected
  }
  // Change master address to a valid value
  conf.set(MRConfig.MASTER_ADDRESS, "bar.com:8042");
  masterHostname = Master.getMasterAddress(conf).getHostName();
  assertEquals("bar.com", masterHostname);
  // change framework to yarn
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  conf.set(YarnConfiguration.RM_ADDRESS, "foo1.com:8192");
  masterHostname = Master.getMasterAddress(conf).getHostName();
  assertEquals("foo1.com", masterHostname);
}
/**
 * Per-test fixture. The steps are:
 * <ol>
 *   <li>Deletes the test directory.</li>
 *   <li>Configures a local-framework job that reads SequenceFile input from
 *       {@code in} and writes SequenceFile output to {@code out}.</li>
 *   <li>Recreates the input directory and writes a one-record input
 *       file.</li>
 *   <li>Creates the JobClient.</li>
 * </ol>
 *
 * @throws IOException if a test directory cannot be created
 */
@Before
public void configure() throws Exception {
  Path testdir = new Path(TEST_DIR.getAbsolutePath());
  Path inDir = new Path(testdir, "in");
  Path outDir = new Path(testdir, "out");
  FileSystem fs = FileSystem.get(conf);
  fs.delete(testdir, true);
  conf.setInt(JobContext.IO_SORT_MB, 1);
  conf.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setMapperClass(TextGen.class);
  conf.setReducerClass(TextReduce.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
  conf.setOutputFormat(SequenceFileOutputFormat.class);
  if (!fs.mkdirs(testdir)) {
    throw new IOException("Mkdirs failed to create " + testdir.toString());
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Mkdirs failed to create " + inDir.toString());
  }
  Path inFile = new Path(inDir, "part0");
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
      Text.class, Text.class);
  try {
    writer.append(new Text("rec: 1"), new Text("Hello"));
  } finally {
    // Close even if append fails, so the writer's stream is not leaked.
    writer.close();
  }
  jc = new JobClient(conf);
}
/**
 * This test enables ACLs, sets an empty VIEW_JOB ACL and does not set
 * MR_ADMINS. An unrelated user must be denied view and modify access. The
 * job owner must be granted both.
 */
@Test
public void testClusterNoAdmins() {
  final String jobOwner = "testuser";
  final String noAdminUser = "testuser2";

  Configuration conf = new Configuration();
  conf.set(JobACL.VIEW_JOB.getAclName(), "");
  conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);

  JobACLsManager aclsManager = new JobACLsManager(conf);
  final Map<JobACL, AccessControlList> jobACLs = aclsManager.constructJobACLs(conf);
  AccessControlList viewAcl = jobACLs.get(JobACL.VIEW_JOB);
  AccessControlList modifyAcl = jobACLs.get(JobACL.MODIFY_JOB);

  // An unrelated user gets no access.
  UserGroupInformation stranger = UserGroupInformation.createUserForTesting(
      noAdminUser, new String[] {});
  assertFalse("random user should not have view access",
      aclsManager.checkAccess(stranger, JobACL.VIEW_JOB, jobOwner, viewAcl));
  assertFalse("random user should not have modify access",
      aclsManager.checkAccess(stranger, JobACL.MODIFY_JOB, jobOwner, modifyAcl));

  // The owner gets full access.
  UserGroupInformation owner = UserGroupInformation.createUserForTesting(
      jobOwner, new String[] {});
  assertTrue("owner should have view access",
      aclsManager.checkAccess(owner, JobACL.VIEW_JOB, jobOwner, viewAcl));
  assertTrue("owner should have modify access",
      aclsManager.checkAccess(owner, JobACL.MODIFY_JOB, jobOwner, modifyAcl));
}
/**
 * Builds a simulated Gridmix job and checks the per-task memory values that
 * end up in its configuration.
 * <p>
 * The source job is configured with the given job-level and cluster-level
 * memory settings. The simulated job is configured with the given
 * simulated-cluster memory settings.
 *
 * @param jobMapMB original job's MAP_MEMORY_MB
 * @param jobReduceMB original job's REDUCE_MEMORY_MB
 * @param clusterMapMB original cluster's MAPMEMORY_MB
 * @param clusterReduceMB original cluster's REDUCEMEMORY_MB
 * @param simulatedClusterMapMB simulated cluster's MAPMEMORY_MB
 * @param simulatedClusterReduceMB simulated cluster's REDUCEMEMORY_MB
 * @param expectedMapMB expected MAP_MEMORY_MB in the simulated job
 * @param expectedReduceMB expected REDUCE_MEMORY_MB in the simulated job
 * @param gConf base configuration for the simulated job
 * @throws IOException if creating the simulated job fails
 */
private static void testHighRamConfig(long jobMapMB, long jobReduceMB,
    long clusterMapMB, long clusterReduceMB, long simulatedClusterMapMB,
    long simulatedClusterReduceMB, long expectedMapMB, long expectedReduceMB,
    Configuration gConf)
    throws IOException {
  Configuration simulatedJobConf = new Configuration(gConf);
  simulatedJobConf.setLong(MRConfig.MAPMEMORY_MB, simulatedClusterMapMB);
  simulatedJobConf.setLong(MRConfig.REDUCEMEMORY_MB,
      simulatedClusterReduceMB);
  // define a source conf
  Configuration sourceConf = new Configuration();
  // configure the original job
  sourceConf.setLong(MRJobConfig.MAP_MEMORY_MB, jobMapMB);
  sourceConf.setLong(MRConfig.MAPMEMORY_MB, clusterMapMB);
  sourceConf.setLong(MRJobConfig.REDUCE_MEMORY_MB, jobReduceMB);
  sourceConf.setLong(MRConfig.REDUCEMEMORY_MB, clusterReduceMB);
  // define a mock job
  MockJob story = new MockJob(sourceConf);
  GridmixJob job = new DummyGridmixJob(simulatedJobConf, story);
  Job simulatedJob = job.getJob();
  Configuration simulatedConf = simulatedJob.getConfiguration();
  // check the per-task memory values configured for the simulated job
  assertEquals(expectedMapMB,
      simulatedConf.getLong(MRJobConfig.MAP_MEMORY_MB,
          MRJobConfig.DEFAULT_MAP_MEMORY_MB));
  // The reduce lookup must fall back to the reduce default, not the map one.
  assertEquals(expectedReduceMB,
      simulatedConf.getLong(MRJobConfig.REDUCE_MEMORY_MB,
          MRJobConfig.DEFAULT_REDUCE_MEMORY_MB));
}
/**
 * Serializes each new-API split to {@code out} and returns one
 * {@link SplitMetaInfo} per split.
 * <p>
 * Each record is the split's class name followed by the split's
 * serialized form. Each meta-info entry holds:
 * <ul>
 *   <li>the split's locations, truncated to
 *       {@code MRConfig.MAX_BLOCK_LOCATIONS_KEY} entries with a warning if
 *       longer;</li>
 *   <li>the starting offset of the split's record;</li>
 *   <li>the split's length.</li>
 * </ul>
 * For an empty array nothing is written and an empty array is returned.
 *
 * @param conf configuration used for serialization and the location cap
 * @param array the splits to write
 * @param out destination stream; its position marks each record's offset
 * @return meta-info entries, in the same order as {@code array}
 */
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
    T[] array, FSDataOutputStream out)
    throws IOException, InterruptedException {
  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length == 0) {
    return info;
  }
  SerializationFactory factory = new SerializationFactory(conf);
  int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
      MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
  long recordOffset = out.getPos();
  for (int idx = 0; idx < array.length; idx++) {
    T split = array[idx];
    long recordStart = out.getPos();
    // Record layout: class name, then the split's serialized form.
    Text.writeString(out, split.getClass().getName());
    Serializer<T> serializer =
        factory.getSerializer((Class<T>) split.getClass());
    serializer.open(out);
    serializer.serialize(split);
    long recordEnd = out.getPos();

    String[] hosts = split.getLocations();
    if (hosts.length > maxBlockLocations) {
      LOG.warn("Max block location exceeded for split: "
          + split + " splitsize: " + hosts.length +
          " maxsize: " + maxBlockLocations);
      hosts = Arrays.copyOf(hosts, maxBlockLocations);
    }
    info[idx] = new JobSplit.SplitMetaInfo(hosts, recordOffset, split.getLength());
    recordOffset += recordEnd - recordStart;
  }
  return info;
}
/**
 * Creates a {@link LocalJobRunner} when the framework name (defaulting to
 * local when unset) is the local framework.
 * <p>
 * Side effect: sets {@code JobContext.NUM_MAPS} to 1 on the caller's
 * {@code conf} before creating the runner.
 *
 * @param conf the job configuration; mutated when a runner is returned
 * @return a new LocalJobRunner, or {@code null} for any other framework
 */
@Override
public ClientProtocol create(Configuration conf) throws IOException {
  String framework =
      conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
  if (MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
  }
  return null;
}
/**
 * Prepares {@code job} for a local Tez runtime test.
 * <ul>
 *   <li>Both the Tez and MR local directories point at {@code workDir}.</li>
 *   <li>Tez task output files and MRPartitioner are used.</li>
 *   <li>There is a single reducer.</li>
 *   <li>FileOutputCommitter algorithm version 1 is selected.</li>
 * </ul>
 *
 * @param job the job configuration to update in place
 */
public void setUpJobConf(JobConf job) {
  String localDir = workDir.toString();
  job.set(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir);
  job.set(MRConfig.LOCAL_DIR, localDir);

  job.setClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
      TezTaskOutputFiles.class, TezTaskOutput.class);
  String partitioner = MRPartitioner.class.getName();
  job.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitioner);

  job.setNumReduceTasks(1);
  job.setInt(MRJobConfig.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
}