The following code examples show how to use org.apache.hadoop.mapred.MiniMRCluster#createJobConf(). They are drawn from open-source test suites that stand up in-process Hadoop mini clusters.
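Before the examples, here is a minimal sketch of the common pattern: start a MiniMRCluster, obtain a JobConf from createJobConf(), and use it to talk to the in-process cluster. The cluster size and file system URI below are illustrative assumptions, not taken from the examples that follow.

// Minimal sketch (assumptions: one tracker, local file system URI).
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
JobConf conf = mr.createJobConf();       // JobConf wired to the mini cluster
JobClient client = new JobClient(conf);  // can now submit and inspect jobs
System.out.println(client.getClusterStatus().getTaskTrackers());
mr.shutdown();                           // always tear the cluster down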
public void start() throws IOException {
    File testCluster = new File(WORKING_DIRECTORY);
    if (testCluster.exists()) {
        FileUtil.deleteDirectory(testCluster);
    }
    testCluster.mkdirs();
    File testClusterData = new File(WORKING_DIRECTORY + "/data");
    File testClusterLog = new File(WORKING_DIRECTORY + "/logs");
    if (cluster == null) {
        conf = new HdfsConfiguration();
        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
                testClusterData.getAbsolutePath());
        cluster = new MiniDFSCluster.Builder(conf).build();
        fs = cluster.getFileSystem();
        // Set the mini cluster's configuration as the default
        HdfsUtil.setDefaultConfiguration(conf);
        System.setProperty("hadoop.log.dir", testClusterLog.getAbsolutePath());
        MiniMRCluster mrCluster = new MiniMRCluster(1, fs.getUri()
                .toString(), 1, null, null, new JobConf(conf));
        JobConf mrClusterConf = mrCluster.createJobConf();
        HdfsUtil.setDefaultConfiguration(new Configuration(mrClusterConf));
        System.out.println("------");
        JobClient client = new JobClient(mrClusterConf);
        ClusterStatus status = client.getClusterStatus(true);
        System.out.println(status.getActiveTrackerNames());
    }
}
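A matching stop() is implied but not shown. A hedged sketch, assuming mrCluster is promoted from a local variable to a field alongside cluster and fs:

public void stop() throws IOException {
    if (mrCluster != null) {
        mrCluster.shutdown(); // stops the JobTracker and TaskTrackers
        mrCluster = null;
    }
    if (cluster != null) {
        cluster.shutdown();   // stops the NameNode and DataNodes
        cluster = null;
    }
}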
@Override
protected void setupMiniDfsAndMrClusters() {
    try {
        System.setProperty("hadoop.log.dir", "build/test/logs");
        final int dataNodes = 4;    // There will be 4 data nodes
        final int taskTrackers = 4; // There will be 4 task tracker nodes
        // Create the dir that holds hadoop-site.xml file
        // Delete if hadoop-site.xml exists already
        CONF_DIR.mkdirs();
        if (CONF_FILE.exists()) {
            CONF_FILE.delete();
        }
        // Builds and starts the mini dfs and mapreduce clusters
        Configuration config = new Configuration();
        m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
        m_fileSys = m_dfs.getFileSystem();
        m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
        // Write the necessary config info to hadoop-site.xml
        m_conf = m_mr.createJobConf();
        m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
        m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
        m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
        m_conf.set("dfs.datanode.address", "0.0.0.0:0");
        m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
        m_conf.set("pig.jobcontrol.sleep", "100");
        m_conf.writeXml(new FileOutputStream(CONF_FILE));
        // Set the system properties needed by Pig
        System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
        System.setProperty("namenode", m_conf.get("fs.default.name"));
        System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
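The corresponding teardown is not shown. A hedged sketch of what it typically looks like, assuming the same m_mr and m_dfs fields:

protected void shutdownMiniDfsAndMrClusters() {
    if (m_mr != null) {
        m_mr.shutdown();  // stop the MapReduce cluster first
        m_mr = null;
    }
    if (m_dfs != null) {
        m_dfs.shutdown(); // then stop HDFS
        m_dfs = null;
    }
}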
@Before
public void setUpJobTracker() throws IOException, InterruptedException {
    cluster = new MiniMRCluster(0, "file:///", 1);
    conf = cluster.createJobConf();
}
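A JUnit test with this @Before method would normally pair it with an @After that releases the cluster. A minimal sketch, assuming the same cluster field:

@After
public void tearDownJobTracker() {
    if (cluster != null) {
        cluster.shutdown(); // free the in-process JobTracker
    }
}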
protected void startHiveMiniCluster() {
    // Create and configure locations for Hive to dump junk in the target folder
    try {
        FileUtils.forceMkdir(HIVE_BASE_DIR);
        FileUtils.forceMkdir(HIVE_SCRATCH_DIR);
        FileUtils.forceMkdir(HIVE_LOCAL_SCRATCH_DIR);
        FileUtils.forceMkdir(HIVE_LOGS_DIR);
        FileUtils.forceMkdir(HIVE_TMP_DIR);
        FileUtils.forceMkdir(HIVE_WAREHOUSE_DIR);
        FileUtils.forceMkdir(HIVE_HADOOP_TMP_DIR);
        FileUtils.forceMkdir(HIVE_TESTDATA_DIR);
    } catch (IOException e1) {
        e1.printStackTrace();
        System.exit(1);
    }
    System.setProperty("javax.jdo.option.ConnectionURL",
            "jdbc:derby:;databaseName=" + HIVE_METADB_DIR.getAbsolutePath() + ";create=true");
    System.setProperty("hive.metastore.warehouse.dir", HIVE_WAREHOUSE_DIR.getAbsolutePath());
    System.setProperty("hive.exec.scratchdir", HIVE_SCRATCH_DIR.getAbsolutePath());
    System.setProperty("hive.exec.local.scratchdir", HIVE_LOCAL_SCRATCH_DIR.getAbsolutePath());
    System.setProperty("hive.metastore.metadb.dir", HIVE_METADB_DIR.getAbsolutePath());
    System.setProperty("test.log.dir", HIVE_LOGS_DIR.getAbsolutePath());
    System.setProperty("hive.querylog.location", HIVE_TMP_DIR.getAbsolutePath());
    System.setProperty("hadoop.tmp.dir", HIVE_HADOOP_TMP_DIR.getAbsolutePath());
    System.setProperty("derby.stream.error.file", HIVE_BASE_DIR.getAbsolutePath() + "/derby.log");
    // Custom properties
    System.setProperty("hive.server2.long.polling.timeout", "5000");
    HiveConf conf = new HiveConf();
    /* Build MiniDFSCluster */
    try {
        miniDFS = new MiniDFSCluster.Builder(conf).build();
        /* Build MiniMR Cluster */
        int numTaskTrackers = 1;
        int numTaskTrackerDirectories = 1;
        String[] racks = null;
        String[] hosts = null;
        miniMR = new MiniMRCluster(numTaskTrackers, miniDFS.getFileSystem().getUri().toString(),
                numTaskTrackerDirectories, racks, hosts, new JobConf(conf));
        JobConf jobConf = miniMR.createJobConf(new JobConf(conf));
        System.out.println("-------" + jobConf.get("fs.defaultFS"));
        System.out.println("-------" + miniDFS.getFileSystem().getUri().toString());
        System.setProperty("mapred.job.tracker", jobConf.get("mapred.job.tracker"));
    } catch (IOException e) {
        e.printStackTrace();
        System.exit(1);
    }
}
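A hedged sketch of the matching shutdown, assuming the same miniMR and miniDFS fields and commons-io on the classpath:

protected void stopHiveMiniCluster() {
    if (miniMR != null) {
        miniMR.shutdown();                  // stop MapReduce first
    }
    if (miniDFS != null) {
        miniDFS.shutdown();                 // then HDFS
    }
    FileUtils.deleteQuietly(HIVE_BASE_DIR); // best-effort cleanup of Hive's junk
}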
@Override
protected void setupMiniDfsAndMrClusters() {
    try {
        System.setProperty("hadoop.log.dir", "build/test/logs");
        final int dataNodes = 4;    // There will be 4 data nodes
        final int taskTrackers = 4; // There will be 4 task tracker nodes
        // Create the configuration hadoop-site.xml file
        File conf_dir = new File("build/classes/");
        conf_dir.mkdirs();
        File conf_file = new File(conf_dir, "hadoop-site.xml");
        conf_file.delete();
        // Builds and starts the mini dfs and mapreduce clusters
        Configuration config = new Configuration();
        m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
        m_fileSys = m_dfs.getFileSystem();
        m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
        // Write the necessary config info to hadoop-site.xml
        m_conf = m_mr.createJobConf();
        m_conf.setInt("mapred.submit.replication", 2);
        m_conf.set("dfs.datanode.address", "0.0.0.0:0");
        m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
        m_conf.set("mapred.map.max.attempts", "2");
        m_conf.set("mapred.reduce.max.attempts", "2");
        m_conf.set("pig.jobcontrol.sleep", "100");
        try (OutputStream os = new FileOutputStream(conf_file)) {
            m_conf.writeXml(os);
        }
        // Set the system properties needed by Pig
        System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
        System.setProperty("namenode", m_conf.get("fs.default.name"));
        System.setProperty("junit.hadoop.conf", conf_dir.getPath());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
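Downstream test code then reads the published properties back. A hedged sketch of such a consumer; the property names come from the snippet above, the rest is illustrative:

// Illustrative consumer of the properties published above.
JobConf jc = new JobConf();
jc.set("mapred.job.tracker", System.getProperty("cluster"));
jc.set("fs.default.name", System.getProperty("namenode"));
// jc now targets the mini cluster and can be handed to a JobClient.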
private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
                        Path program, Path inputPath, Path outputPath,
                        int numMaps, int numReduces, String[] expectedResults
                        ) throws IOException {
    Path wordExec = new Path("/testing/bin/application");
    JobConf job = mr.createJobConf();
    job.setNumMapTasks(numMaps);
    job.setNumReduceTasks(numReduces);
    {
        FileSystem fs = dfs.getFileSystem();
        fs.delete(wordExec.getParent(), true);
        fs.copyFromLocalFile(program, wordExec);
        Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
        Submitter.setIsJavaRecordReader(job, true);
        Submitter.setIsJavaRecordWriter(job, true);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        RunningJob rJob = null;
        if (numReduces == 0) {
            rJob = Submitter.jobSubmit(job);
            while (!rJob.isComplete()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    throw new RuntimeException(ie);
                }
            }
        } else {
            rJob = Submitter.runJob(job);
        }
        assertTrue("pipes job failed", rJob.isSuccessful());
        Counters counters = rJob.getCounters();
        Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
        int numCounters = 0;
        for (Counter c : wordCountCounters) {
            System.out.println(c);
            ++numCounters;
        }
        assertTrue("No counters found!", (numCounters > 0));
    }
    List<String> results = new ArrayList<String>();
    for (Path p : FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
                                                                     new OutputLogFilter()))) {
        results.add(TestMiniMRWithDFS.readOutput(p, job));
    }
    assertEquals("number of reduces is wrong",
                 expectedResults.length, results.size());
    for (int i = 0; i < results.size(); i++) {
        assertEquals("pipes program " + program + " output " + i + " wrong",
                     expectedResults[i], results.get(i));
    }
}
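From within the test, where mr and dfs are in scope, a call site for runProgram would look roughly like this. The binary path and expected strings are illustrative assumptions, not taken from the test:

// Hypothetical invocation (binary location and expected output are assumptions).
Path wordCount = new Path("build/bin/wordcount-simple");
Path input = new Path("/testing/in");
Path output = new Path("/testing/out");
String[] expected = new String[] {
    "java\t2\nsilly\t2\n",  // reducer 0 output (illustrative)
    "drink\t2\ntest\t3\n"   // reducer 1 output (illustrative)
};
runProgram(mr, dfs, wordCount, input, output, 3, 2, expected);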
/**
 * Run a map/reduce word count that does all of the map input and reduce
 * output directly rather than sending it back up to Java.
 * @param mr The mini mr cluster
 * @param dfs the dfs cluster
 * @param program the program to run
 * @throws IOException
 */
private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
                                Path program) throws IOException {
    JobConf job = mr.createJobConf();
    job.setInputFormat(WordCountInputFormat.class);
    FileSystem local = FileSystem.getLocal(job);
    Path testDir = new Path("file:" + System.getProperty("test.build.data"),
                            "pipes");
    Path inDir = new Path(testDir, "input");
    Path outDir = new Path(testDir, "output");
    Path wordExec = new Path("/testing/bin/application");
    Path jobXml = new Path(testDir, "job.xml");
    {
        FileSystem fs = dfs.getFileSystem();
        fs.delete(wordExec.getParent(), true);
        fs.copyFromLocalFile(program, wordExec);
    }
    DataOutputStream out = local.create(new Path(inDir, "part0"));
    out.writeBytes("i am a silly test\n");
    out.writeBytes("you are silly\n");
    out.writeBytes("i am a cat test\n");
    out.writeBytes("you is silly\n");
    out.writeBytes("i am a billy test\n");
    out.writeBytes("hello are silly\n");
    out.close();
    out = local.create(new Path(inDir, "part1"));
    out.writeBytes("mall world things drink java\n");
    out.writeBytes("hall silly cats drink java\n");
    out.writeBytes("all dogs bow wow\n");
    out.writeBytes("hello drink java\n");
    out.close();
    local.delete(outDir, true);
    local.mkdirs(outDir);
    out = local.create(jobXml);
    job.writeXml(out);
    out.close();
    System.err.println("About to run: Submitter -conf " + jobXml +
                       " -input " + inDir + " -output " + outDir +
                       " -program " +
                       dfs.getFileSystem().makeQualified(wordExec));
    try {
        int ret = ToolRunner.run(new Submitter(),
                                 new String[]{"-conf", jobXml.toString(),
                                              "-input", inDir.toString(),
                                              "-output", outDir.toString(),
                                              "-program",
                                              dfs.getFileSystem().makeQualified(wordExec).toString(),
                                              "-reduces", "2"});
        assertEquals(0, ret);
    } catch (Exception e) {
        assertTrue("got exception: " + StringUtils.stringifyException(e), false);
    }
}
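Once the Submitter exits with 0, the word counts land under outDir on the local file system. A hedged sketch of how a caller might look at the results; it assumes the same local FileSystem and outDir as above and is not part of the original test:

// Illustrative follow-up: list the reducer outputs written to outDir.
for (FileStatus stat : local.listStatus(outDir)) {
    if (stat.getPath().getName().startsWith("part-")) {
        System.out.println("output file: " + stat.getPath());
    }
}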