下面列出了怎么用org.apache.hadoop.mapred.MiniMRCluster的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("deprecation")
@Override
public void start() throws Exception {
System.setProperty("test.build.data", getDataDir());
LOG.info("test.build.data set to: " + getDataDir());
System.setProperty("hadoop.log.dir", getLogDir());
LOG.info("log dir set to: " + getLogDir());
// Start DFS server
LOG.info("Starting DFS cluster...");
dfsCluster = new MiniDFSCluster(config, 1, true, null);
if (dfsCluster.isClusterUp()) {
LOG.info("Started DFS cluster on port: " + dfsCluster.getNameNodePort());
} else {
LOG.error("Could not start DFS cluster");
}
// Start MR server
LOG.info("Starting MR cluster");
mrCluster = new MiniMRCluster(0, 0, 1, dfsCluster.getFileSystem().getUri()
.toString(), 1, null, null, null, new JobConf(config));
LOG.info("Started MR cluster");
config = prepareConfiguration(mrCluster.createJobConf());
}
@Override
public JobConf obtainJobConf(MiniMRCluster cluster) {
if (cluster == null) return null;
try {
Object runner = cluster.getJobTrackerRunner();
Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
Object tracker = meth.invoke(runner, new Object []{});
Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
return (JobConf) m.invoke(tracker, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
@Override
public JobConf obtainJobConf(MiniMRCluster cluster) {
if (cluster == null) return null;
try {
Object runner = cluster.getJobTrackerRunner();
Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
Object tracker = meth.invoke(runner, new Object []{});
Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
return (JobConf) m.invoke(tracker, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
/**
* This tests the setting of memory limit for streaming processes.
* This will launch a streaming app which will allocate 10MB memory.
* First, program is launched with sufficient memory. And test expects
* it to succeed. Then program is launched with insufficient memory and
* is expected to be a failure.
*/
public void testCommandLine() {
if (StreamUtil.isCygwin()) {
return;
}
try {
final int numSlaves = 2;
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
fs = dfs.getFileSystem();
mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
writeInputFile(fs, inputPath);
map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});
runProgram(SET_MEMORY_LIMIT);
fs.delete(outputPath, true);
assertFalse("output not cleaned up", fs.exists(outputPath));
mr.waitUntilIdle();
} catch(IOException e) {
fail(e.toString());
} finally {
mr.shutdown();
dfs.shutdown();
}
}
/**
* This tests the setting of memory limit for streaming processes.
* This will launch a streaming app which will allocate 10MB memory.
* First, program is launched with sufficient memory. And test expects
* it to succeed. Then program is launched with insufficient memory and
* is expected to be a failure.
*/
public void testCommandLine() {
if (StreamUtil.isCygwin()) {
return;
}
try {
final int numSlaves = 2;
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
fs = dfs.getFileSystem();
mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
writeInputFile(fs, inputPath);
map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});
runProgram(SET_MEMORY_LIMIT);
fs.delete(outputPath, true);
assertFalse("output not cleaned up", fs.exists(outputPath));
mr.waitUntilIdle();
} catch(IOException e) {
fail(e.toString());
} finally {
mr.shutdown();
dfs.shutdown();
}
}
@Before
public void setup() throws Exception {
user1 = UserGroupInformation.createUserForTesting("alice",
new String[]{"users"});
user2 = UserGroupInformation.createUserForTesting("bob",
new String[]{"users"});
cluster = new MiniMRCluster(0,0,1,"file:///",1);
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNodes)
.build();
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
public TestFileArgs() throws IOException
{
// Set up mini cluster
conf = new Configuration();
dfs = new MiniDFSCluster.Builder(conf).build();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1);
map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
setTestDir(new File("/tmp/TestFileArgs"));
}
/**
* Start the cluster and create input file before running the actual test.
*
* @throws IOException
*/
@Before
public void setUp() throws IOException {
conf = new JobConf();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
Path inFile = new Path(INPUT_FILE);
fs = inFile.getFileSystem(mr.createJobConf());
clean(fs);
buildExpectedJobOutput();
}
@Before
public void setup() throws Exception {
user1 = UserGroupInformation.createUserForTesting("alice",
new String[]{"users"});
user2 = UserGroupInformation.createUserForTesting("bob",
new String[]{"users"});
cluster = new MiniMRCluster(0,0,1,"file:///",1);
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNodes)
.build();
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
public TestFileArgs() throws IOException
{
// Set up mini cluster
conf = new Configuration();
dfs = new MiniDFSCluster.Builder(conf).build();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().getAuthority();
mr = new MiniMRCluster(1, namenode, 1);
map = LS_PATH;
FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
setTestDir(new File("/tmp/TestFileArgs"));
}
/**
* Start the cluster and create input file before running the actual test.
*
* @throws IOException
*/
@Before
public void setUp() throws IOException {
conf = new JobConf();
conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
mr = new MiniMRCluster(1, "file:///", 3, null , null, conf);
Path inFile = new Path(INPUT_FILE);
fs = inFile.getFileSystem(mr.createJobConf());
clean(fs);
buildExpectedJobOutput();
}
public void start() throws IOException {
File testCluster = new File(WORKING_DIRECTORY);
if (testCluster.exists()) {
FileUtil.deleteDirectory(testCluster);
}
testCluster.mkdirs();
File testClusterData = new File(WORKING_DIRECTORY + "/data");
File testClusterLog = new File(WORKING_DIRECTORY + "/logs");
if (cluster == null) {
conf = new HdfsConfiguration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
testClusterData.getAbsolutePath());
cluster = new MiniDFSCluster.Builder(conf).build();
fs = cluster.getFileSystem();
// set mincluster as default config
HdfsUtil.setDefaultConfiguration(conf);
System.setProperty("hadoop.log.dir", testClusterLog.getAbsolutePath());
MiniMRCluster mrCluster = new MiniMRCluster(1, fs.getUri()
.toString(), 1, null, null, new JobConf(conf));
JobConf mrClusterConf = mrCluster.createJobConf();
HdfsUtil.setDefaultConfiguration(new Configuration(mrClusterConf));
System.out.println("------");
JobClient client = new JobClient(mrClusterConf);
ClusterStatus status = client.getClusterStatus(true);
System.out.println(status.getActiveTrackerNames());
}
}
private void startMRCluster() throws IOException {
FileSystem fs = HadoopTestUtils.getTestDFS();
m_mrCluster = new MiniMRCluster(
1, // # of task trackers
fs.getUri().toString(), // name node
1 // # of directories
);
}
@Override
public JobConf obtainJobConf(MiniMRCluster cluster) {
try {
Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
return (JobConf) meth.invoke(cluster, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
/**
* Starts a <code>MiniMRCluster</code> with a default number of
* <code>TaskTracker</code>'s.
*
* @throws IOException When starting the cluster fails.
*/
public MiniMRCluster startMiniMapReduceCluster() throws IOException {
// Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing.
conf.setIfUnset(
"yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
"99.0");
startMiniMapReduceCluster(2);
return mrCluster;
}
@Override
public JobConf obtainJobConf(MiniMRCluster cluster) {
try {
Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
return (JobConf) meth.invoke(cluster, new Object []{});
} catch (NoSuchMethodException nsme) {
return null;
} catch (InvocationTargetException ite) {
return null;
} catch (IllegalAccessException iae) {
return null;
}
}
@Override
protected void setupMiniDfsAndMrClusters() {
try {
System.setProperty("hadoop.log.dir", "build/test/logs");
final int dataNodes = 4; // There will be 4 data nodes
final int taskTrackers = 4; // There will be 4 task tracker nodes
// Create the dir that holds hadoop-site.xml file
// Delete if hadoop-site.xml exists already
CONF_DIR.mkdirs();
if(CONF_FILE.exists()) {
CONF_FILE.delete();
}
// Builds and starts the mini dfs and mapreduce clusters
Configuration config = new Configuration();
m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
m_fileSys = m_dfs.getFileSystem();
m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
// Write the necessary config info to hadoop-site.xml
m_conf = m_mr.createJobConf();
m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
m_conf.set("dfs.datanode.address", "0.0.0.0:0");
m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
m_conf.set("pig.jobcontrol.sleep", "100");
m_conf.writeXml(new FileOutputStream(CONF_FILE));
// Set the system properties needed by Pig
System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
System.setProperty("namenode", m_conf.get("fs.default.name"));
System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void setUp() throws Exception {
// Read the testConfig.xml file
readTestConfigFile();
// Start up the mini dfs cluster
boolean success = false;
conf = new Configuration();
conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
HadoopPolicyProvider.class, PolicyProvider.class);
conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
true);
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "file:///");
clitestDataDir = new File(TEST_CACHE_DATA_DIR).
toURI().toString().replace(' ', '+');
username = System.getProperty("user.name");
FileSystem fs = dfsCluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
dfs = (DistributedFileSystem) fs;
// Start up mini mr cluster
JobConf mrConf = new JobConf(conf);
mrCluster = new MiniMRCluster(1, dfsCluster.getFileSystem().getUri().toString(), 1,
null, null, mrConf);
jobtracker = mrCluster.createJobConf().get("mapred.job.tracker", "local");
success = true;
assertTrue("Error setting up Mini DFS & MR clusters", success);
}
/**
* create mapreduce and dfs clusters
*/
private void createClusters(boolean local) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
conf.set("raid.config.file", CONFIG_FILE);
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
// the RaidNode does the raiding inline (instead of submitting to map/reduce)
if (local) {
conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
} else {
conf.set("raid.classname", "org.apache.hadoop.raid.DistRaidNode");
}
// use local block fixer
conf.set("raid.blockfix.classname",
"org.apache.hadoop.raid.LocalBlockIntegrityMonitor");
conf.set("raid.server.address", "localhost:0");
// create a dfs and map-reduce cluster
final int taskTrackers = 4;
dfs = new MiniDFSCluster(conf, 3, true, null);
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(taskTrackers, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
Utils.loadTestCodecs(conf);
}
private void mySetup(int stripeLength, int timeBeforeHar,
String xorCode, String rsCode, String code) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
conf.set("raid.config.file", CONFIG_FILE);
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
// do not use map-reduce cluster for Raiding
conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
conf.set("raid.server.address", "localhost:0");
Utils.loadTestCodecs(conf, stripeLength, stripeLength, 1, 3, "/destraid",
"/destraidrs", true, xorCode, rsCode,
false);
conf.setBoolean("dfs.permissions", false);
dfsCluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
dfsCluster.waitActive();
fileSys = dfsCluster.getFileSystem();
namenode = fileSys.getUri().toString();
FileSystem.setDefaultUri(conf, namenode);
mr = new MiniMRCluster(4, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
hftp = "hftp://localhost.localdomain:" + dfsCluster.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", jobTrackerName);
ConfigBuilder cb = new ConfigBuilder(CONFIG_FILE);
cb.addPolicy("RaidTest1", "/user/dhruba/raidtest", 1, 1, code);
cb.persist();
}
/**
* Test distRaid command
* @throws Exception
*/
public void testDistRaid() throws Exception {
LOG.info("TestDist started.");
// create a dfs and map-reduce cluster
mySetup(3, -1);
MiniMRCluster mr = new MiniMRCluster(4, namenode, 3);
String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
conf.set("mapred.job.tracker", jobTrackerName);
try {
// Create files to be raided
TestRaidNode.createTestFiles(fileSys, RAID_SRC_PATH,
"/raid" + RAID_SRC_PATH, 1, 3, (short)3);
String subDir = RAID_SRC_PATH + "/subdir";
TestRaidNode.createTestFiles(
fileSys, subDir, "/raid" + subDir, 1, 3, (short)3);
// Create RaidShell and raid the files.
RaidShell shell = new RaidShell(conf);
String[] args = new String[3];
args[0] = "-distRaid";
args[1] = RAID_POLICY_NAME;
args[2] = RAID_SRC_PATH;
assertEquals(0, ToolRunner.run(shell, args));
// Check files are raided
checkIfFileRaided(new Path(RAID_SRC_PATH, "file0"));
checkIfFileRaided(new Path(subDir, "file0"));
} finally {
mr.shutdown();
myTearDown();
}
}
@BeforeClass
public static void initCluster() throws IOException {
Configuration conf = new Configuration();
dfsCluster = new MiniDFSCluster(conf, 3, true, null);
dfs = dfsCluster.getFileSystem();
mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
new JobConf(conf));
}
protected void setUp() throws Exception {
super.setUp();
dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null);
fs = dfscluster.getFileSystem();
mapred = new MiniMRCluster(2, fs.getUri().toString(), 1);
inputPath = new Path(fs.getHomeDirectory(), "test");
filea = new Path(inputPath,"a");
fileb = new Path(inputPath,"b");
filec = new Path(inputPath,"c");
archivePath = new Path(fs.getHomeDirectory(), "tmp");
}
public void setUp() throws Exception {
// Read the testConfig.xml file
readTestConfigFile();
// Start up the mini dfs cluster
boolean success = false;
conf = new Configuration();
conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
HadoopPolicyProvider.class, PolicyProvider.class);
conf.setBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
true);
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "file:///");
clitestDataDir = new File(TEST_CACHE_DATA_DIR).
toURI().toString().replace(' ', '+');
username = System.getProperty("user.name");
FileSystem fs = dfsCluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(),
fs instanceof DistributedFileSystem);
dfs = (DistributedFileSystem) fs;
// Start up mini mr cluster
JobConf mrConf = new JobConf(conf);
mrCluster = new MiniMRCluster(1, dfsCluster.getFileSystem().getUri().toString(), 1,
null, null, mrConf);
jobtracker = mrCluster.createJobConf().get("mapred.job.tracker", "local");
success = true;
assertTrue("Error setting up Mini DFS & MR clusters", success);
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
String[] userGroups = new String[]{proxyGroup};
UserGroupInformation.createUserForTesting(proxyUser, userGroups);
UserGroupInformation.createUserForTesting("u1", userGroups);
UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNodes)
.build();
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
@Before
public void setUpJobTracker() throws IOException, InterruptedException {
cluster = new MiniMRCluster(0, "file:///", 1);
conf = cluster.createJobConf();
}
protected void setUp() throws Exception {
super.setUp();
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "/tmp");
}
int taskTrackers = 2;
int dataNodes = 2;
String proxyUser = System.getProperty("user.name");
String proxyGroup = "g";
StringBuilder sb = new StringBuilder();
sb.append("127.0.0.1,localhost");
for (InetAddress i : InetAddress.getAllByName(InetAddress.getLocalHost().getHostName())) {
sb.append(",").append(i.getCanonicalHostName());
}
JobConf conf = new JobConf();
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
conf.set("hadoop.proxyuser." + proxyUser + ".hosts", sb.toString());
conf.set("hadoop.proxyuser." + proxyUser + ".groups", proxyGroup);
String[] userGroups = new String[]{proxyGroup};
UserGroupInformation.createUserForTesting(proxyUser, userGroups);
UserGroupInformation.createUserForTesting("u1", userGroups);
UserGroupInformation.createUserForTesting("u2", new String[]{"gg"});
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNodes)
.build();
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
String nnURI = fileSystem.getUri().toString();
int numDirs = 1;
String[] racks = null;
String[] hosts = null;
mrCluster = new MiniMRCluster(0, 0, taskTrackers, nnURI, numDirs, racks, hosts, null, conf);
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
@Before
public void setUpJobTracker() throws IOException, InterruptedException {
cluster = new MiniMRCluster(0, "file:///", 1);
conf = cluster.createJobConf();
}