下面列出了 org.apache.hadoop.mapreduce.filecache.DistributedCache#addFileToClassPath() 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Populates the input directory with {@code numMaps} small text files, clears
 * any stale output directory, configures and submits the job, then monitors it
 * to completion.
 *
 * @param conf    job configuration to submit
 * @param inDir   input directory (created and populated if absent)
 * @param outDir  output directory (deleted first if it already exists)
 * @param numMaps number of input files (and map tasks) to create
 * @param numReds number of reduce tasks
 * @return true if the job completed successfully
 * @throws IOException          on filesystem or job-submission failure
 * @throws InterruptedException if job monitoring is interrupted
 */
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    // try-with-resources: the original leaked the stream if writeBytes threw.
    try (DataOutputStream file = fs.create(new Path(inDir, "part-" + i))) {
      file.writeBytes(input);
    }
  }
  // Ship the test application jar so task JVMs can load the job classes.
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
/**
 * Runs a small job with {@code MyCombinerToCheckReporter} as the combiner to
 * verify that a combiner updates the reporter.
 */
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  final int mapTaskCount = 5;
  final int reduceTaskCount = 2;
  JobConf jobConf = new JobConf(mrCluster.getConfig());
  String workDir = mrCluster.getTestWorkDir().getAbsolutePath();
  Path inputPath = new Path(workDir, "testCombinerShouldUpdateTheReporter-in");
  Path outputPath = new Path(workDir, "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(inputPath, outputPath, mapTaskCount);
  jobConf.setJobName("test-job-with-combiner");
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setCombinerClass(MyCombinerToCheckReporter.class);
  jobConf.setReducerClass(IdentityReducer.class);
  // Make the test application jar available to the task JVMs.
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, jobConf);
  jobConf.setOutputCommitter(CustomOutputCommitter.class);
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputKeyClass(LongWritable.class);
  jobConf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(jobConf, inputPath);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setNumMapTasks(mapTaskCount);
  jobConf.setNumReduceTasks(reduceTaskCount);
  runJob(jobConf);
}
/**
 * Populates the input directory with {@code numMaps} small text files, clears
 * any stale output directory, configures and submits the job, then monitors it
 * to completion.
 *
 * @param conf    job configuration to submit
 * @param inDir   input directory (created and populated if absent)
 * @param outDir  output directory (deleted first if it already exists)
 * @param numMaps number of input files (and map tasks) to create
 * @param numReds number of reduce tasks
 * @return true if the job completed successfully
 * @throws IOException          on filesystem or job-submission failure
 * @throws InterruptedException if job monitoring is interrupted
 */
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    // try-with-resources: the original leaked the stream if writeBytes threw.
    try (DataOutputStream file = fs.create(new Path(inDir, "part-" + i))) {
      file.writeBytes(input);
    }
  }
  // Ship the test application jar so task JVMs can load the job classes.
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
/**
 * Runs a small job with {@code MyCombinerToCheckReporter} as the combiner to
 * verify that a combiner updates the reporter.
 */
@Test
public void testCombinerShouldUpdateTheReporter() throws Exception {
  final int mapTaskCount = 5;
  final int reduceTaskCount = 2;
  JobConf jobConf = new JobConf(mrCluster.getConfig());
  String workDir = mrCluster.getTestWorkDir().getAbsolutePath();
  Path inputPath = new Path(workDir, "testCombinerShouldUpdateTheReporter-in");
  Path outputPath = new Path(workDir, "testCombinerShouldUpdateTheReporter-out");
  createInputOutPutFolder(inputPath, outputPath, mapTaskCount);
  jobConf.setJobName("test-job-with-combiner");
  jobConf.setMapperClass(IdentityMapper.class);
  jobConf.setCombinerClass(MyCombinerToCheckReporter.class);
  jobConf.setReducerClass(IdentityReducer.class);
  // Make the test application jar available to the task JVMs.
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, jobConf);
  jobConf.setOutputCommitter(CustomOutputCommitter.class);
  jobConf.setInputFormat(TextInputFormat.class);
  jobConf.setOutputKeyClass(LongWritable.class);
  jobConf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(jobConf, inputPath);
  FileOutputFormat.setOutputPath(jobConf, outputPath);
  jobConf.setNumMapTasks(mapTaskCount);
  jobConf.setNumReduceTasks(reduceTaskCount);
  runJob(jobConf);
}
/**
 * Registers every non-directory entry under each listed HDFS path on the job
 * classpath via the DistributedCache.
 *
 * @param hdfsJarFileList SPLITTER-delimited list of HDFS jar locations
 * @param conf            configuration to record the classpath entries in
 * @throws IOException if listing an HDFS path fails
 */
private void addHdfsJars(String hdfsJarFileList, Configuration conf) throws IOException {
  for (String jarLocation : SPLITTER.split(hdfsJarFileList)) {
    for (FileStatus entry : this.fs.listStatus(new Path(jarLocation))) {
      if (entry.isDirectory()) {
        continue; // only plain files can go on the classpath
      }
      Path classpathEntry = new Path(jarLocation, entry.getPath().getName());
      LOG.info(String.format("Adding %s to classpath", classpathEntry));
      DistributedCache.addFileToClassPath(classpathEntry, conf, this.fs);
    }
  }
}
/**
 * Add framework or job-specific jars to the classpath through DistributedCache
 * so the mappers can use them.
 *
 * <p>Each local jar matching {@code jarFileList} is first uploaded into
 * {@code jarFileDir} on HDFS (with bounded retries, since concurrent job
 * instances may upload the same jar) and only then registered on the
 * classpath. Jars whose upload fails {@code jarFileMaximumRetry} times are
 * skipped rather than failing the job.</p>
 *
 * @param jarFileDir  HDFS directory the jars are uploaded to
 * @param jarFileList SPLITTER-delimited list of local jar path globs
 * @param conf        configuration to record the classpath entries in
 * @throws IOException if resolving the local filesystem or expanding globs fails
 */
@SuppressWarnings("deprecation")
private void addJars(Path jarFileDir, String jarFileList, Configuration conf) throws IOException {
LocalFileSystem lfs = FileSystem.getLocal(conf);
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
// Expand the glob so one list entry can name several local jars.
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
for (FileStatus status : fileStatusList) {
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
int retryCount = 0;
boolean shouldFileBeAddedIntoDC = true;
Path destJarFile = calculateDestJarFile(status, jarFileDir);
// Keep trying until destJarFile exists on HDFS and its length matches the local file.
while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
try {
// A present-but-short file means another instance is mid-upload: wait,
// then throw so this attempt is counted by the shared catch block below.
if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
throw new IOException("Waiting for file to complete on uploading ... ");
}
// Set the first parameter as false for not deleting sourceFile
// Set the second parameter as false for not overwriting existing file on the target, by default it is true.
// If the file pre-exists but the overwrite flag is false, an IOException is thrown.
this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
} catch (IOException | InterruptedException e) {
LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
retryCount += 1;
if (retryCount >= this.jarFileMaximumRetry) {
LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
// If retry reaches upper limit, skip copying this file.
shouldFileBeAddedIntoDC = false;
break;
}
}
}
if (shouldFileBeAddedIntoDC) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
}
}
}
}
/**
 * Builds and starts the mini DFS and mini MapReduce (YARN) clusters used by
 * the tests, writes the merged configuration to hadoop-site.xml (locally and
 * on HDFS), and exports the system properties Pig expects.
 *
 * <p>Any {@link IOException} during setup is rethrown as a
 * {@link RuntimeException}, since the test environment is unusable without
 * the clusters.</p>
 */
@Override
protected void setupMiniDfsAndMrClusters() {
  try {
    final int dataNodes = 4; // There will be 4 data nodes
    final int taskTrackers = 4; // There will be 4 task tracker nodes
    System.setProperty("hadoop.log.dir", "build/test/logs");
    // Create the dir that holds hadoop-site.xml file
    // Delete if hadoop-site.xml exists already
    CONF_DIR.mkdirs();
    if (CONF_FILE.exists()) {
      CONF_FILE.delete();
    }
    // Builds and starts the mini dfs and mapreduce clusters
    Configuration config = new Configuration();
    config.set("yarn.scheduler.capacity.root.queues", "default");
    config.set("yarn.scheduler.capacity.root.default.capacity", "100");
    m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
    m_fileSys = m_dfs.getFileSystem();
    m_dfs_conf = m_dfs.getConfiguration(0);
    // Create user home directory
    m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
    m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers);
    m_mr.init(m_dfs_conf);
    m_mr.start();
    // Write the necessary config info to hadoop-site.xml
    m_mr_conf = new Configuration(m_mr.getConfig());
    m_conf = m_mr_conf;
    m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name"));
    m_conf.unset(MRConfiguration.JOB_CACHE_FILES);
    m_conf.setInt(MRConfiguration.IO_SORT_MB, 200);
    m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m");
    m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2);
    m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2);
    m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2);
    m_conf.set("dfs.datanode.address", "0.0.0.0:0");
    m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
    m_conf.set("pig.jobcontrol.sleep", "100");
    // try-with-resources: the original leaked this FileOutputStream, leaving
    // the hadoop-site.xml handle open after the write.
    try (FileOutputStream confOut = new FileOutputStream(CONF_FILE)) {
      m_conf.writeXml(confOut);
    }
    m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()),
        new Path("/pigtest/conf/hadoop-site.xml"));
    // Ship the generated config to task JVMs via the DistributedCache.
    DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf);
    System.err.println("XXX: Setting fs.default.name to: " + m_dfs_conf.get("fs.default.name"));
    // Set the system properties needed by Pig
    System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER));
    System.setProperty("namenode", m_conf.get("fs.default.name"));
    System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Adds a file path to the current set of classpath entries. It adds the file
 * to the cache as well.
 *
 * Files added with this method will not be unpacked while being added to the
 * classpath.
 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
 * method instead.
 *
 * @param file Path of the file to be added
 * @throws IOException if the file's FileSystem cannot be resolved
 */
public void addFileToClassPath(Path file)
    throws IOException {
  // Mutating the classpath is only legal while the job is still being defined.
  ensureState(JobState.DEFINE);
  DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}
/**
 * Adds a file path to the current set of classpath entries. It adds the file
 * to the cache as well.
 *
 * Files added with this method will not be unpacked while being added to the
 * classpath.
 * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
 * method instead.
 *
 * @param file Path of the file to be added
 * @throws IOException if the file's FileSystem cannot be resolved
 */
public void addFileToClassPath(Path file)
    throws IOException {
  // Mutating the classpath is only legal while the job is still being defined.
  ensureState(JobState.DEFINE);
  DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}