下面列出了怎么用org.apache.hadoop.mapreduce.server.jobtracker.JTConfig的API类实例代码及写法,或者点击链接到github查看源代码。
@Before
public void setUp() throws Exception {
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fs = DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return dfs.getFileSystem();
}
});
// Home directories for users
mkdir(fs, "/user", "nobody", "nogroup", (short)01777);
mkdir(fs, "/user/alice", "alice", "nogroup", (short)0755);
mkdir(fs, "/user/bob", "bob", "nogroup", (short)0755);
// staging directory root with sticky bit
UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
mkdir(fs, "/staging", MR_UGI.getShortUserName(), "nogroup", (short)01777);
JobConf mrConf = new JobConf();
mrConf.set(JTConfig.JT_STAGING_AREA_ROOT, "/staging");
mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI, mrConf);
}
public void testWithDFS() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
JobConf conf = new JobConf();
conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system");
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
}
}
@Test
public void testGetClusterStatusWithLocalJobRunner() throws Exception {
Configuration conf = new Configuration();
conf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
JobClient client = new JobClient(conf);
ClusterStatus clusterStatus = client.getClusterStatus(true);
Collection<String> activeTrackerNames = clusterStatus
.getActiveTrackerNames();
Assert.assertEquals(0, activeTrackerNames.size());
int blacklistedTrackers = clusterStatus.getBlacklistedTrackers();
Assert.assertEquals(0, blacklistedTrackers);
Collection<BlackListInfo> blackListedTrackersInfo = clusterStatus
.getBlackListedTrackersInfo();
Assert.assertEquals(0, blackListedTrackersInfo.size());
}
/**
* Sets the high ram job properties in the simulated job's configuration.
*/
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf,
Configuration destConf) {
// set the memory per map task
scaleConfigParameter(sourceConf, destConf,
MRConfig.MAPMEMORY_MB, MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB,
JTConfig.JT_MAX_MAPMEMORY_MB);
// set the memory per reduce task
scaleConfigParameter(sourceConf, destConf,
MRConfig.REDUCEMEMORY_MB, MRJobConfig.REDUCE_MEMORY_MB,
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB,
JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
@Test
public void testFramework() {
JobConf jobConf = new JobConf();
jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
assertFalse("Expected 'isLocal' to be false",
StreamUtil.isLocalJobTracker(jobConf));
jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
assertFalse("Expected 'isLocal' to be false",
StreamUtil.isLocalJobTracker(jobConf));
jobConf.set(JTConfig.JT_IPC_ADDRESS, "jthost:9090");
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
assertTrue("Expected 'isLocal' to be true",
StreamUtil.isLocalJobTracker(jobConf));
}
protected String[] genArgs(String jobtracker, String mapper, String reducer)
{
return new String[] {
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
"-mapper", mapper,
"-reducer", reducer,
"-jobconf", MRJobConfig.NUM_MAPS + "=1",
"-jobconf", MRJobConfig.NUM_REDUCES + "=1",
"-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
"-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
"-jobconf", JTConfig.JT_IPC_ADDRESS + "="+jobtracker,
"-jobconf", "fs.default.name=file:///",
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
};
}
void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
boolean mayExit = false;
StreamJob job = new StreamJob(genArgs(
mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit);
int returnValue = job.go();
assertEquals(0, returnValue);
// If input to reducer is empty, dummy reporter(which ignores all
// reporting lines) is set for MRErrorThread in waitOutputThreads(). So
// expectedCounterValue is 0 for empty-input-to-reducer case.
// Output of reducer is also empty for empty-input-to-reducer case.
int expectedCounterValue = 0;
if (type == TaskType.MAP || !isEmptyInput) {
validateTaskStatus(job, type);
// output is from "print STDOUT" statements in perl script
validateJobOutput(job.getConf());
expectedCounterValue = 2;
}
validateUserCounter(job, expectedCounterValue);
validateTaskStderr(job, type);
deleteOutDir(fs);
}
@Before
public void setUp() throws Exception {
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fs = DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return dfs.getFileSystem();
}
});
// Home directories for users
mkdir(fs, "/user", "nobody", "nogroup", (short)01777);
mkdir(fs, "/user/alice", "alice", "nogroup", (short)0755);
mkdir(fs, "/user/bob", "bob", "nogroup", (short)0755);
// staging directory root with sticky bit
UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
mkdir(fs, "/staging", MR_UGI.getShortUserName(), "nogroup", (short)01777);
JobConf mrConf = new JobConf();
mrConf.set(JTConfig.JT_STAGING_AREA_ROOT, "/staging");
mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI, mrConf);
}
public void testWithDFS() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
final int taskTrackers = 4;
JobConf conf = new JobConf();
conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system");
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
}
}
@Test
public void testGetClusterStatusWithLocalJobRunner() throws Exception {
Configuration conf = new Configuration();
conf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
JobClient client = new JobClient(conf);
ClusterStatus clusterStatus = client.getClusterStatus(true);
Collection<String> activeTrackerNames = clusterStatus
.getActiveTrackerNames();
Assert.assertEquals(0, activeTrackerNames.size());
int blacklistedTrackers = clusterStatus.getBlacklistedTrackers();
Assert.assertEquals(0, blacklistedTrackers);
Collection<BlackListInfo> blackListedTrackersInfo = clusterStatus
.getBlackListedTrackersInfo();
Assert.assertEquals(0, blackListedTrackersInfo.size());
}
/**
* Sets the high ram job properties in the simulated job's configuration.
*/
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf,
Configuration destConf) {
// set the memory per map task
scaleConfigParameter(sourceConf, destConf,
MRConfig.MAPMEMORY_MB, MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB,
JTConfig.JT_MAX_MAPMEMORY_MB);
// set the memory per reduce task
scaleConfigParameter(sourceConf, destConf,
MRConfig.REDUCEMEMORY_MB, MRJobConfig.REDUCE_MEMORY_MB,
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB,
JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
@Test
public void testFramework() {
JobConf jobConf = new JobConf();
jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
assertFalse("Expected 'isLocal' to be false",
StreamUtil.isLocalJobTracker(jobConf));
jobConf.set(JTConfig.JT_IPC_ADDRESS, MRConfig.LOCAL_FRAMEWORK_NAME);
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
assertFalse("Expected 'isLocal' to be false",
StreamUtil.isLocalJobTracker(jobConf));
jobConf.set(JTConfig.JT_IPC_ADDRESS, "jthost:9090");
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
assertTrue("Expected 'isLocal' to be true",
StreamUtil.isLocalJobTracker(jobConf));
}
protected String[] genArgs(String jobtracker, String mapper, String reducer)
{
return new String[] {
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
"-mapper", mapper,
"-reducer", reducer,
"-jobconf", MRJobConfig.NUM_MAPS + "=1",
"-jobconf", MRJobConfig.NUM_REDUCES + "=1",
"-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
"-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
"-jobconf", JTConfig.JT_IPC_ADDRESS + "="+jobtracker,
"-jobconf", "fs.default.name=file:///",
"-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
"-jobconf", "mapreduce.framework.name=yarn"
};
}
void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
boolean mayExit = false;
StreamJob job = new StreamJob(genArgs(
mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit);
int returnValue = job.go();
assertEquals(0, returnValue);
// If input to reducer is empty, dummy reporter(which ignores all
// reporting lines) is set for MRErrorThread in waitOutputThreads(). So
// expectedCounterValue is 0 for empty-input-to-reducer case.
// Output of reducer is also empty for empty-input-to-reducer case.
int expectedCounterValue = 0;
if (type == TaskType.MAP || !isEmptyInput) {
validateTaskStatus(job, type);
// output is from "print STDOUT" statements in perl script
validateJobOutput(job.getConf());
expectedCounterValue = 2;
}
validateUserCounter(job, expectedCounterValue);
validateTaskStderr(job, type);
deleteOutDir(fs);
}
static void configureWordCount(FileSystem fs, JobConf conf, String input,
int numMaps, int numReduces, Path inDir, Path outDir) throws IOException {
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
FileSystem.setDefaultUri(conf, fs.getUri());
conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");
conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
//set the tests jar file
conf.setJarByClass(TestMiniMRClasspath.class);
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
displayUsage();
}
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length == 2) {
if (otherArgs[0].equals("-scratchdir")) {
dir = otherArgs[1];
} else {
displayUsage();
}
}
else if (otherArgs.length == 0) {
dir = System.getProperty("user.dir");
} else {
displayUsage();
}
//to protect against the case of jobs failing even when multiple attempts
//fail, set some high values for the max attempts
conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 10);
conf.setInt(JobContext.REDUCE_MAX_ATTEMPTS, 10);
runSleepJobTest(new JobClient(new JobConf(conf)), conf);
runSortJobTests(new JobClient(new JobConf(conf)), conf);
return 0;
}
@Test
public void testClusterWithJTClientProvider() throws Exception {
Configuration conf = new Configuration();
try {
conf.set(MRConfig.FRAMEWORK_NAME, "classic");
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
new Cluster(conf);
fail("Cluster with classic Framework name should not use "
+ "local JT address");
} catch (IOException e) {
assertTrue(e.getMessage().contains(
"Cannot initialize Cluster. Please check"));
}
}
/**
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
"/tmp/hadoop/mapred/staging"));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user;
randid = rand.nextInt(Integer.MAX_VALUE);
if (ugi != null) {
user = ugi.getShortUserName() + randid;
} else {
user = "dummy" + randid;
}
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}
/** Tests using the local job runner. */
public void testLocalJobRunner() throws Exception {
symlinkFile.delete(); // ensure symlink is not present (e.g. if test is
// killed part way through)
Configuration c = new Configuration();
c.set(JTConfig.JT_IPC_ADDRESS, "local");
c.set("fs.defaultFS", "file:///");
testWithConf(c);
assertFalse("Symlink not removed by local job runner",
// Symlink target will have gone so can't use File.exists()
Arrays.asList(new File(".").list()).contains(symlinkFile.getName()));
}
public static void initCluster(Class<?> caller) throws IOException {
Configuration conf = new Configuration();
// conf.set("mapred.queue.names", "default,q1,q2");
conf.set("mapred.queue.names", "default");
conf.set("yarn.scheduler.capacity.root.queues", "default");
conf.set("yarn.scheduler.capacity.root.default.capacity", "100.0");
conf.setBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false);
conf.set(GRIDMIX_DEFAULT_QUEUE, "default");
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
.build();// MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
conf.set(JTConfig.JT_RETIREJOBS, "false");
mrvl = MiniMRClientClusterFactory.create(caller, 2, conf);
conf = mrvl.getConfig();
String[] files = conf.getStrings(MRJobConfig.CACHE_FILES);
if (files != null) {
String[] timestamps = new String[files.length];
for (int i = 0; i < files.length; i++) {
timestamps[i] = Long.toString(System.currentTimeMillis());
}
conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps);
}
}
/**
* Test {@link ClusterSummarizer}.
*/
@Test (timeout=20000)
public void testClusterSummarizer() throws IOException {
ClusterSummarizer cs = new ClusterSummarizer();
Configuration conf = new Configuration();
String jt = "test-jt:1234";
String nn = "test-nn:5678";
conf.set(JTConfig.JT_IPC_ADDRESS, jt);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn);
cs.start(conf);
assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
ClusterStats cStats = ClusterStats.getClusterStats();
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
JobClient jc = new JobClient(conf);
cStats.setClusterMetric(jc.getClusterStatus());
cs.update(cStats);
// test
assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks());
assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers());
assertEquals("Cluster summary test failed!", 0,
cs.getNumBlacklistedTrackers());
}
/**
* Start the cluster and create input file before running the actual test.
*
* @throws IOException
*/
@Before
public void setUp() throws IOException {
conf = new JobConf();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
Path inFile = new Path(INPUT_FILE);
fs = inFile.getFileSystem(mr.createJobConf());
clean(fs);
buildExpectedJobOutput();
}
static void configureWordCount(FileSystem fs, JobConf conf, String input,
int numMaps, int numReduces, Path inDir, Path outDir) throws IOException {
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
DataOutputStream file = fs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
FileSystem.setDefaultUri(conf, fs.getUri());
conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");
conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setNumMapTasks(numMaps);
conf.setNumReduceTasks(numReduces);
//set the tests jar file
conf.setJarByClass(TestMiniMRClasspath.class);
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
displayUsage();
}
String[] otherArgs =
new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length == 2) {
if (otherArgs[0].equals("-scratchdir")) {
dir = otherArgs[1];
} else {
displayUsage();
}
}
else if (otherArgs.length == 0) {
dir = System.getProperty("user.dir");
} else {
displayUsage();
}
//to protect against the case of jobs failing even when multiple attempts
//fail, set some high values for the max attempts
conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 10);
conf.setInt(JobContext.REDUCE_MAX_ATTEMPTS, 10);
runSleepJobTest(new JobClient(new JobConf(conf)), conf);
runSortJobTests(new JobClient(new JobConf(conf)), conf);
return 0;
}
@Test
public void testClusterWithJTClientProvider() throws Exception {
Configuration conf = new Configuration();
try {
conf.set(MRConfig.FRAMEWORK_NAME, "classic");
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
new Cluster(conf);
fail("Cluster with classic Framework name should not use "
+ "local JT address");
} catch (IOException e) {
assertTrue(e.getMessage().contains(
"Cannot initialize Cluster. Please check"));
}
}
/**
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
"/tmp/hadoop/mapred/staging"));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user;
randid = rand.nextInt(Integer.MAX_VALUE);
if (ugi != null) {
user = ugi.getShortUserName() + randid;
} else {
user = "dummy" + randid;
}
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
}
/** Tests using the local job runner. */
public void testLocalJobRunner() throws Exception {
symlinkFile.delete(); // ensure symlink is not present (e.g. if test is
// killed part way through)
Configuration c = new Configuration();
c.set(JTConfig.JT_IPC_ADDRESS, "local");
c.set("fs.defaultFS", "file:///");
testWithConf(c);
assertFalse("Symlink not removed by local job runner",
// Symlink target will have gone so can't use File.exists()
Arrays.asList(new File(".").list()).contains(symlinkFile.getName()));
}
public static void initCluster(Class<?> caller) throws IOException {
Configuration conf = new Configuration();
// conf.set("mapred.queue.names", "default,q1,q2");
conf.set("mapred.queue.names", "default");
conf.set("yarn.scheduler.capacity.root.queues", "default");
conf.set("yarn.scheduler.capacity.root.default.capacity", "100.0");
conf.setBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false);
conf.set(GRIDMIX_DEFAULT_QUEUE, "default");
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
.build();// MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
conf.set(JTConfig.JT_RETIREJOBS, "false");
mrvl = MiniMRClientClusterFactory.create(caller, 2, conf);
conf = mrvl.getConfig();
String[] files = conf.getStrings(MRJobConfig.CACHE_FILES);
if (files != null) {
String[] timestamps = new String[files.length];
for (int i = 0; i < files.length; i++) {
timestamps[i] = Long.toString(System.currentTimeMillis());
}
conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps);
}
}
/**
* Test {@link ClusterSummarizer}.
*/
@Test (timeout=20000)
public void testClusterSummarizer() throws IOException {
ClusterSummarizer cs = new ClusterSummarizer();
Configuration conf = new Configuration();
String jt = "test-jt:1234";
String nn = "test-nn:5678";
conf.set(JTConfig.JT_IPC_ADDRESS, jt);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, nn);
cs.start(conf);
assertEquals("JT name mismatch", jt, cs.getJobTrackerInfo());
assertEquals("NN name mismatch", nn, cs.getNamenodeInfo());
ClusterStats cStats = ClusterStats.getClusterStats();
conf.set(JTConfig.JT_IPC_ADDRESS, "local");
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "local");
JobClient jc = new JobClient(conf);
cStats.setClusterMetric(jc.getClusterStatus());
cs.update(cStats);
// test
assertEquals("Cluster summary test failed!", 1, cs.getMaxMapTasks());
assertEquals("Cluster summary test failed!", 1, cs.getMaxReduceTasks());
assertEquals("Cluster summary test failed!", 1, cs.getNumActiveTrackers());
assertEquals("Cluster summary test failed!", 0,
cs.getNumBlacklistedTrackers());
}
/**
* Start the cluster and create input file before running the actual test.
*
* @throws IOException
*/
@Before
public void setUp() throws IOException {
conf = new JobConf();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
Path inFile = new Path(INPUT_FILE);
fs = inFile.getFileSystem(mr.createJobConf());
clean(fs);
buildExpectedJobOutput();
}