下面列出了 org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Brings up the mini DFS cluster and the mini MR-on-YARN cluster used by the
 * tests of this class, starts the threads of the NameNode's secret manager,
 * and resolves the test path {@code file1} against the DFS file system.
 *
 * @throws Exception if building, initialising or starting a cluster fails
 */
@BeforeClass
public static void setUp() throws Exception {
  final Configuration conf = new Configuration();
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  final String rmPrincipal = "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG";
  conf.set(YarnConfiguration.RM_PRINCIPAL, rmPrincipal);

  // DFS side: freshly formatted cluster with numSlaves data nodes and no rack mapping.
  dfsCluster = new MiniDFSCluster.Builder(conf)
      .checkExitOnShutdown(true)
      .numDataNodes(numSlaves)
      .format(true)
      .racks(null)
      .build();

  // MR side: a YARN mini cluster with noOfNMs node managers.
  mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
  mrCluster.init(conf);
  mrCluster.start();

  NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();

  final FileSystem dfs = dfsCluster.getFileSystem();
  p1 = dfs.makeQualified(new Path("file1"));
}
/**
 * Runs a sleep job whose map and reduce memory requests (700 MB and 1500 MB)
 * are deliberately not multiples of the scheduler allocation increment, to
 * ensure nothing broke after normalization was removed from the MRAM side.
 *
 * <p>The test is skipped (returns without asserting) when the MR application
 * jar referenced by {@code MiniMRYarnCluster.APPJAR} is not present.
 *
 * @throws Exception if the job cannot be created, submitted or awaited
 */
@Test
public void testJobWithNonNormalizedCapabilities() throws Exception {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  JobConf jobConf = new JobConf(mrCluster.getConfig());
  jobConf.setInt("mapreduce.map.memory.mb", 700);
  // Use the real reducer memory key. The previous "mapred.reduce.memory.mb"
  // is neither the current property nor its deprecated alias
  // ("mapred.job.reduce.memory.mb"), so the 1500 MB request was ignored and
  // reducers ran with the default size.
  jobConf.setInt("mapreduce.reduce.memory.mb", 1500);

  SleepJob sleepJob = new SleepJob();
  sleepJob.setConf(jobConf);
  Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
  job.setJarByClass(SleepJob.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.submit();

  boolean completed = job.waitForCompletion(true);
  Assert.assertTrue("Job should be completed", completed);
  Assert.assertEquals("Job should be finished successfully",
      JobStatus.State.SUCCEEDED, job.getJobState());
}
/**
 * Brings up the mini DFS cluster and the mini MR-on-YARN cluster used by the
 * tests of this class, starts the threads of the NameNode's secret manager,
 * and resolves the test path {@code file1} against the DFS file system.
 *
 * @throws Exception if building, initialising or starting a cluster fails
 */
@BeforeClass
public static void setUp() throws Exception {
  final Configuration conf = new Configuration();
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  final String rmPrincipal = "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG";
  conf.set(YarnConfiguration.RM_PRINCIPAL, rmPrincipal);

  // DFS side: freshly formatted cluster with numSlaves data nodes and no rack mapping.
  dfsCluster = new MiniDFSCluster.Builder(conf)
      .checkExitOnShutdown(true)
      .numDataNodes(numSlaves)
      .format(true)
      .racks(null)
      .build();

  // MR side: a YARN mini cluster with noOfNMs node managers.
  mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
  mrCluster.init(conf);
  mrCluster.start();

  NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();

  final FileSystem dfs = dfsCluster.getFileSystem();
  p1 = dfs.makeQualified(new Path("file1"));
}
/**
 * Runs a sleep job whose map and reduce memory requests (700 MB and 1500 MB)
 * are deliberately not multiples of the scheduler allocation increment, to
 * ensure nothing broke after normalization was removed from the MRAM side.
 *
 * <p>The test is skipped (returns without asserting) when the MR application
 * jar referenced by {@code MiniMRYarnCluster.APPJAR} is not present.
 *
 * @throws Exception if the job cannot be created, submitted or awaited
 */
@Test
public void testJobWithNonNormalizedCapabilities() throws Exception {
  if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return;
  }

  JobConf jobConf = new JobConf(mrCluster.getConfig());
  jobConf.setInt("mapreduce.map.memory.mb", 700);
  // Use the real reducer memory key. The previous "mapred.reduce.memory.mb"
  // is neither the current property nor its deprecated alias
  // ("mapred.job.reduce.memory.mb"), so the 1500 MB request was ignored and
  // reducers ran with the default size.
  jobConf.setInt("mapreduce.reduce.memory.mb", 1500);

  SleepJob sleepJob = new SleepJob();
  sleepJob.setConf(jobConf);
  Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
  job.setJarByClass(SleepJob.class);
  job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
  job.submit();

  boolean completed = job.waitForCompletion(true);
  Assert.assertTrue("Job should be completed", completed);
  Assert.assertEquals("Job should be finished successfully",
      JobStatus.State.SUCCEEDED, job.getJobState());
}
/**
 * Stops the mini MR cluster after each test, provided the MR application jar
 * exists and a cluster instance is present. When the jar is missing, only a
 * log line is written and nothing is stopped.
 */
@After
public void tearDown() {
  final boolean appJarPresent = new File(MiniMRYarnCluster.APPJAR).exists();
  if (appJarPresent) {
    if (mrCluster != null) {
      mrCluster.stop();
    }
    return;
  }
  LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
      + " not found. Not running test.");
}
/**
 * Stops the given mini cluster on a best-effort basis: a {@code null}
 * argument is ignored, and any exception raised while stopping is printed
 * to standard error instead of being propagated.
 *
 * @param miniMRYarnCluster the cluster to stop; may be {@code null}
 */
private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
  if (miniMRYarnCluster == null) {
    return;
  }
  try {
    miniMRYarnCluster.stop();
  } catch (Exception e) {
    // Nothing more can be done at this point; just report the failure.
    e.printStackTrace();
  }
}
/**
 * Registers {@code MockS3FileSystem} as the implementation for the
 * {@code fs.s3.impl} key, sets the shared S3 output path, and then creates,
 * initialises and starts the mini MR cluster stored in {@code MR_CLUSTER}.
 */
@BeforeClass
public static void setupMiniMRCluster() {
  final String mockFsClassName = MockS3FileSystem.class.getName();
  getConfiguration().set("fs.s3.impl", mockFsClassName);

  S3_OUTPUT_PATH = new Path("s3://bucket-name/output/path");

  final int nodeManagerCount = 2;
  MR_CLUSTER = new MiniMRYarnCluster("test-s3-multipart-output-committer", nodeManagerCount);
  MR_CLUSTER.init(getConfiguration());
  MR_CLUSTER.start();
}
/**
 * Stops the mini MR cluster after each test, provided the MR application jar
 * exists and a cluster instance is present. When the jar is missing, only a
 * log line is written and nothing is stopped.
 */
@After
public void tearDown() {
  final boolean appJarPresent = new File(MiniMRYarnCluster.APPJAR).exists();
  if (appJarPresent) {
    if (mrCluster != null) {
      mrCluster.stop();
    }
    return;
  }
  LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
      + " not found. Not running test.");
}
/**
 * Stops the given mini cluster on a best-effort basis: a {@code null}
 * argument is ignored, and any exception raised while stopping is printed
 * to standard error instead of being propagated.
 *
 * @param miniMRYarnCluster the cluster to stop; may be {@code null}
 */
private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
  if (miniMRYarnCluster == null) {
    return;
  }
  try {
    miniMRYarnCluster.stop();
  } catch (Exception e) {
    // Nothing more can be done at this point; just report the failure.
    e.printStackTrace();
  }
}
/**
 * Logs the start, runs {@code configure()}, then creates a new
 * {@code MiniMRYarnCluster} from {@code testName} and {@code numNodeManagers},
 * initialises it with {@code configuration} and starts it.
 *
 * @throws Exception if configuring, initialising or starting the cluster fails
 */
@Override
public void start() throws Exception {
LOG.info("MR: Starting MiniMRYarnCluster");
configure();
miniMRYarnCluster = new MiniMRYarnCluster(testName, numNodeManagers);
// NOTE(review): serviceInit(...) is called directly and init(...) is then
// called with the same configuration. In Hadoop's service lifecycle init()
// normally drives serviceInit() itself, so this may initialise the cluster
// twice — confirm whether the explicit serviceInit call is required.
miniMRYarnCluster.serviceInit(configuration);
miniMRYarnCluster.init(configuration);
miniMRYarnCluster.start();
}
/**
 * Creates an adapter around the given mini MR cluster.
 *
 * @param miniMRYarnCluster the cluster instance this adapter stores; it is
 *        kept as-is, with no null check or copy
 */
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
this.miniMRYarnCluster = miniMRYarnCluster;
}
/**
 * Creates an adapter around the given mini MR cluster.
 *
 * @param miniMRYarnCluster the cluster instance this adapter stores; it is
 *        kept as-is, with no null check or copy
 */
public MiniMRYarnClusterAdapter(MiniMRYarnCluster miniMRYarnCluster) {
this.miniMRYarnCluster = miniMRYarnCluster;
}
/**
 * Returns a new {@code Configuration} copied from a shared configuration that
 * is built on the first call.
 *
 * <p>While the shared {@code conf} field is still {@code null}, this method
 * starts a ZooKeeper server, a {@code MiniMRYarnCluster} and a
 * {@code LocalHBaseCluster} and stores the resulting HBase configuration in
 * {@code conf}. Later calls skip the startup and only return a copy. The
 * method is {@code static synchronized}, so the check and the startup run
 * under the class lock.
 *
 * @return a copy of the shared configuration
 * @throws Exception if any of the embedded services fails to start
 */
public static synchronized Configuration getInstanceConfig() throws Exception {
if (conf == null) {
// Obtain a unique temp path, then delete the file so that only the path
// remains; it is handed to ZooKeeperServer twice below.
File zooRoot = File.createTempFile("hbase-zookeeper", "");
zooRoot.delete();
ZooKeeperServer zookeper = new ZooKeeperServer(zooRoot, zooRoot, 2000);
// Bound to localhost with port 0; the port actually used is read back via
// factory.getLocalPort() further down.
ServerCnxnFactory factory = ServerCnxnFactory.createFactory(new InetSocketAddress("localhost", 0), 5000);
factory.startup(zookeper);
YarnConfiguration yconf = new YarnConfiguration();
// Forward the "argLine" system property (if set) as the MR AM command
// options, with "jacoco.exec" replaced by "jacocoMR.exec".
String argLine = System.getProperty("argLine");
if (argLine != null) {
yconf.set("yarn.app.mapreduce.am.command-opts", argLine.replace("jacoco.exec", "jacocoMR.exec"));
}
yconf.setBoolean(MRConfig.MAPREDUCE_MINICLUSTER_CONTROL_RESOURCE_MONITORING, false);
yconf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
MiniMRYarnCluster miniCluster = new MiniMRYarnCluster("testCluster");
miniCluster.init(yconf);
String resourceManagerLink = yconf.get(YarnConfiguration.RM_ADDRESS);
// NOTE(review): this is set on yconf after miniCluster.init(yconf) has
// already run — confirm the cluster still picks it up.
yconf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
miniCluster.start();
miniCluster.waitForNodeManagersToConnect(10000);
// Poll the RM address in yconf until it no longer ends with ":0"
// (original note: the condition is set in MiniYarnCluster:273).
// NOTE(review): the loop has no timeout and would spin forever if the
// address is never updated.
while (resourceManagerLink.endsWith(":0")) {
Thread.sleep(100);
resourceManagerLink = yconf.get(YarnConfiguration.RM_ADDRESS);
}
// Same temp-path trick as for ZooKeeper: create, then delete, and use the
// path as the HBase root directory URL.
File hbaseRoot = File.createTempFile("hbase-root", "");
hbaseRoot.delete();
conf = HBaseConfiguration.create(miniCluster.getConfig());
conf.set(HConstants.HBASE_DIR, hbaseRoot.toURI().toURL().toString());
// Point HBase at the port the ZooKeeper connection factory is bound to.
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, factory.getLocalPort());
conf.set("hbase.master.hostname", "localhost");
conf.set("hbase.regionserver.hostname", "localhost");
conf.setInt("hbase.master.info.port", -1);
conf.set("hbase.fs.tmp.dir", new File(System.getProperty("java.io.tmpdir")).toURI().toURL().toString());
LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
cluster.startup();
}
// Hand out a copy so callers cannot modify the shared instance.
return new Configuration(conf);
}
/**
 * Builds and starts the mini DFS and mini MR (YARN) clusters, writes the
 * resulting configuration to the local {@code CONF_FILE}, copies that file
 * into DFS at {@code /pigtest/conf/hadoop-site.xml}, and sets the system
 * properties ({@code cluster}, {@code namenode}, {@code junit.hadoop.conf})
 * read by Pig.
 *
 * @throws RuntimeException wrapping any {@link IOException} raised during setup
 */
@Override
protected void setupMiniDfsAndMrClusters() {
  try {
    final int dataNodes = 4;     // There will be 4 data nodes
    final int taskTrackers = 4;  // There will be 4 task tracker nodes

    System.setProperty("hadoop.log.dir", "build/test/logs");

    // Create the dir that holds hadoop-site.xml file
    // Delete if hadoop-site.xml exists already
    CONF_DIR.mkdirs();
    if (CONF_FILE.exists()) {
      CONF_FILE.delete();
    }

    // Builds and starts the mini dfs and mapreduce clusters
    Configuration config = new Configuration();
    config.set("yarn.scheduler.capacity.root.queues", "default");
    config.set("yarn.scheduler.capacity.root.default.capacity", "100");
    m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
    m_fileSys = m_dfs.getFileSystem();
    m_dfs_conf = m_dfs.getConfiguration(0);

    // Create user home directory
    m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());

    m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers);
    m_mr.init(m_dfs_conf);
    m_mr.start();

    // Write the necessary config info to hadoop-site.xml
    m_mr_conf = new Configuration(m_mr.getConfig());

    m_conf = m_mr_conf;
    m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name"));
    m_conf.unset(MRConfiguration.JOB_CACHE_FILES);

    m_conf.setInt(MRConfiguration.IO_SORT_MB, 200);
    m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m");

    m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
    m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
    m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
    m_conf.set("dfs.datanode.address", "0.0.0.0:0");
    m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
    m_conf.set("pig.jobcontrol.sleep", "100");

    // Close the stream explicitly: the previous code passed an anonymous
    // FileOutputStream to writeXml and never closed it, leaking a file
    // descriptor. Closing it also guarantees the file is complete before
    // it is copied into DFS below.
    FileOutputStream confOut = new FileOutputStream(CONF_FILE);
    try {
      m_conf.writeXml(confOut);
    } finally {
      confOut.close();
    }
    m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()),
        new Path("/pigtest/conf/hadoop-site.xml"));
    DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf);

    System.err.println("XXX: Setting fs.default.name to: " + m_dfs_conf.get("fs.default.name"));

    // Set the system properties needed by Pig
    System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
    System.setProperty("namenode", m_conf.get("fs.default.name"));
    System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}