下面列出了 org.apache.hadoop.mapreduce.Job#addCacheFile() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Submits a job that registers one cache file (with a symlink fragment), one
 * classpath file, one classpath archive and one cache archive, and asserts
 * that {@code waitForCompletion} reports success.
 *
 * @param conf configuration the job is created from
 */
private void testWithConf(Configuration conf)
        throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
    // Fixtures: one temporary file and three jars built by the test helpers.
    final Path plainFile = createTempFile("distributed.first", "x");
    final Path classpathJar = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
    final Path classpathArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
    final Path cacheArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

    final Job job = Job.getInstance(conf);
    job.setMapperClass(DistributedCacheCheckerMapper.class);
    job.setReducerClass(DistributedCacheCheckerReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    FileInputFormat.setInputPaths(job, plainFile);

    // The URI fragment names the symlink under which the cached file is exposed.
    final URI symlinkedFile = new URI(plainFile.toUri() + "#distributed.first.symlink");
    job.addCacheFile(symlinkedFile);
    job.addFileToClassPath(classpathJar);
    job.addArchiveToClassPath(classpathArchive);
    job.addCacheArchive(cacheArchive.toUri());

    // A single map attempt makes a failing run finish sooner.
    job.setMaxMapAttempts(1);

    job.submit();
    final boolean succeeded = job.waitForCompletion(false);
    assertTrue(succeeded);
}
/**
 * Makes sure a splits file is registered in the job's cache files: when
 * {@code NonShardedSplitsFile.findSplitsFile} finds none among the job's
 * local cache files, a new one is created and added.
 *
 * @param job the job being initialized
 * @throws RuntimeException wrapping any exception raised while doing so
 */
@Override
public void initializeJob(Job job) {
    try {
        final Configuration jobConf = job.getConfiguration();
        // A splits file may already exist, possibly created for another table; reuse it if so.
        final boolean splitsFileMissing =
                NonShardedSplitsFile.findSplitsFile(jobConf, job.getLocalCacheFiles(), isTrimmed()) == null;
        if (splitsFileMissing) {
            job.addCacheFile(createTheSplitsFile(jobConf));
        }
    } catch (Exception e) {
        log.error(e);
        throw new RuntimeException("Failed to initialize partitioner for job", e);
    }
}
/**
 * Initialize the user, table, and authorization information for the configuration object that will be used with an Accumulo InputFormat.
 * <p>
 * The password is Base64-encoded and written to a {@code .pw} file under the configured working directory; that file is given
 * owner-only permissions, scheduled for deletion on exit, recorded under {@code PASSWORD_PATH} and added to the job's cache files.
 * <p>
 * NOTE(review): the Base64-encoded password is also stored in the job configuration under {@code accumulo.password}. Base64 is an
 * encoding, not encryption, so anyone who can read the job configuration can recover the password; confirm this is acceptable.
 *
 * @param job
 *          the Hadoop Job object
 * @param user
 *          a valid accumulo user
 * @param passwd
 *          the user's password
 * @param table
 *          the table to read
 * @param auths
 *          the authorizations used to restrict data read; ignored when null or empty
 * @throws IllegalStateException
 *           if input info has already been set on this job
 * @throws RuntimeException
 *           wrapping any {@link IOException} raised while writing the password file
 */
public static void setInputInfo(Job job, String user, byte[] passwd, String table, Authorizations auths) {
    Configuration conf = job.getConfiguration();
    if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) {
        throw new IllegalStateException("Input info can only be set once per job");
    }
    // Validate before flipping the "already set" flag, so that a call rejected for
    // bad arguments does not prevent a corrected call on the same job.
    ArgumentChecker.notNull(user, passwd, table);
    conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);

    conf.set(USERNAME, user);
    conf.set("accumulo.username", user);
    conf.set(TABLE_NAME, table);
    if (auths != null && !auths.isEmpty()) {
        conf.set(AUTHORIZATIONS, auths.toString());
    }
    try {
        FileSystem fs = FileSystem.get(conf);
        String workingDirectory = conf.get(WORKING_DIRECTORY, fs.getWorkingDirectory().toString());
        Path work = new Path(workingDirectory);
        // Job name plus a timestamp keeps password files of different jobs apart.
        Path file = new Path(work, conf.get("mapreduce.job.name") + System.currentTimeMillis() + ".pw");
        conf.set(PASSWORD_PATH, file.toString());
        // Re-resolve the file system from the file's own URI, which may differ from the default one.
        fs = FileSystem.get(file.toUri(), conf);
        byte[] encodedPw = Base64.encodeBase64(passwd);
        // overwrite=false: fail rather than clobber an existing file.
        try (FSDataOutputStream fos = fs.create(file, false)) {
            // Restrict the file to its owner before any password bytes are written.
            fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
            fs.deleteOnExit(file);
            fos.writeInt(encodedPw.length);
            fos.write(encodedPw);
        }
        // Decode with an explicit charset instead of the platform default.
        conf.set("accumulo.password", new String(encodedPw, java.nio.charset.StandardCharsets.UTF_8));
        job.addCacheFile(file.toUri());
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}
/**
 * Set the ranges to map over for this configuration object.
 * <p>
 * The ranges are serialized (count first, then each range) into a uniquely named {@code .cache} file under the configured working
 * directory. That file is added to the job's cache files and its path is stored in the configuration under {@code RANGES}.
 *
 * @param job
 *          the Hadoop job
 * @param ranges
 *          the ranges that will be mapped over
 * @throws IllegalArgumentException
 *           wrapping any {@link IOException} raised while resolving the file system or writing the file
 */
public static void setRanges(Job job, Collection<Range> ranges) {
    ArgumentChecker.notNull(job);
    ArgumentChecker.notNull(ranges);
    Configuration conf = job.getConfiguration();
    try {
        FileSystem fs = FileSystem.get(conf);
        String workingDirectory = conf.get(WORKING_DIRECTORY, fs.getWorkingDirectory().toString());
        Path work = new Path(workingDirectory);
        Path tempPath = new Path(work, UUID.randomUUID() + ".cache");
        // Re-resolve the file system from the file's own URI, which may differ from the default one.
        fs = FileSystem.get(tempPath.toUri(), conf);
        // try-with-resources closes the stream; a failure on close is reported by the catch below.
        try (FSDataOutputStream outStream = fs.create(tempPath)) {
            outStream.writeInt(ranges.size());
            for (Range r : ranges) {
                r.write(outStream);
            }
        } catch (IOException ex) {
            throw new IllegalArgumentException("Unable to write ranges to cache file " + tempPath, ex);
        }
        // Register the file only after it has been completely written and closed.
        job.addCacheFile(tempPath.toUri());
        conf.set(RANGES, tempPath.toString());
    } catch (IOException e) {
        throw new IllegalArgumentException(e);
    }
}
/**
 * The MapReduce driver - setup and launch the job.
 * <p>
 * The users and user-logs paths are registered as inputs with their own mappers, and the Bloom filter file is
 * added to the job's cache files with its file name published under
 * {@code AbstractFilterMap.DISTCACHE_FILENAME_CONFIG}.
 *
 * @param args the command-line arguments
 * @return the non-zero result of command-line parsing if it fails; otherwise 0 when the job succeeds and 1 when it fails
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
    Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
    int result = cli.runCmd();
    if (result != 0) {
        return result;
    }

    Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
    Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
    Path bloomPath = new Path(cli.getArgValueAsString(Options.BLOOM_FILE));
    Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

    Configuration conf = super.getConf();
    // The Job constructors are deprecated; the static factory is the supported way to create a Job.
    Job job = Job.getInstance(conf);
    job.setJarByClass(BloomJoin.class);

    MultipleInputs.addInputPath(job, usersPath, TextInputFormat.class, UserMap.class);
    MultipleInputs.addInputPath(job, userLogsPath, TextInputFormat.class, UserLogMap.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Tuple.class);
    job.setReducerClass(Reduce.class);

    // Ship the Bloom filter file with the job and publish its file name in the configuration.
    job.addCacheFile(bloomPath.toUri());
    job.getConfiguration().set(AbstractFilterMap.DISTCACHE_FILENAME_CONFIG, bloomPath.getName());

    FileInputFormat.setInputPaths(job, userLogsPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Configures and runs the TeraSort job.
 * <p>
 * Unless the simple partitioner is selected, a partition file is written into the output directory,
 * added to the job's cache files, and {@code TotalOrderPartitioner} is used.
 *
 * @param args {@code args[0]} is the input directory and {@code args[1]} the output directory
 * @return 0 when the job succeeds, 1 when it fails, and -1 when fewer than two arguments are
 *         supplied or the partition file cannot be written
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Reject missing arguments up front instead of failing with ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        System.err.println("Invalid no. of arguments provided");
        System.err.println("Usage: terasort <input-dir> <output-dir>");
        return -1;
    }
    LOG.info("starting");
    Job job = Job.getInstance(getConf());
    Path inputDir = new Path(args[0]);
    Path outputDir = new Path(args[1]);
    boolean useSimplePartitioner = getUseSimplePartitioner(job);
    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);
    if (useSimplePartitioner) {
        job.setPartitionerClass(SimplePartitioner.class);
    } else {
        long start = System.currentTimeMillis();
        Path partitionFile = new Path(outputDir,
            TeraInputFormat.PARTITION_FILENAME);
        // The fragment names the symlink under which the cached partition file is exposed.
        URI partitionUri = new URI(partitionFile.toString() +
            "#" + TeraInputFormat.PARTITION_FILENAME);
        try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
        } catch (Throwable e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            LOG.error(e.getMessage(), e);
            return -1;
        }
        job.addCacheFile(partitionUri);
        long end = System.currentTimeMillis();
        System.out.println("Spent " + (end - start) + "ms computing partitions.");
        job.setPartitionerClass(TotalOrderPartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    TeraOutputFormat.setFinalSync(job, true);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
}
/**
 * Submits a job that registers one cache file (with a symlink fragment), one
 * classpath file, one classpath archive and one cache archive, and asserts
 * that {@code waitForCompletion} reports success.
 *
 * @param conf configuration the job is created from
 */
private void testWithConf(Configuration conf)
        throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
    // Fixtures: one temporary file and three jars built by the test helpers.
    final Path plainFile = createTempFile("distributed.first", "x");
    final Path classpathJar = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
    final Path classpathArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
    final Path cacheArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

    final Job job = Job.getInstance(conf);
    job.setMapperClass(DistributedCacheCheckerMapper.class);
    job.setReducerClass(DistributedCacheCheckerReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    FileInputFormat.setInputPaths(job, plainFile);

    // The URI fragment names the symlink under which the cached file is exposed.
    final URI symlinkedFile = new URI(plainFile.toUri() + "#distributed.first.symlink");
    job.addCacheFile(symlinkedFile);
    job.addFileToClassPath(classpathJar);
    job.addArchiveToClassPath(classpathArchive);
    job.addCacheArchive(cacheArchive.toUri());

    // A single map attempt makes a failing run finish sooner.
    job.setMaxMapAttempts(1);

    job.submit();
    final boolean succeeded = job.waitForCompletion(false);
    assertTrue(succeeded);
}
/**
 * Configures and runs the TeraSort job.
 * <p>
 * Unless the simple partitioner is selected, a partition file is written into the output directory,
 * added to the job's cache files, and {@code TotalOrderPartitioner} is used.
 *
 * @param args {@code args[0]} is the input directory and {@code args[1]} the output directory
 * @return 0 when the job succeeds, 1 when it fails, and -1 when fewer than two arguments are
 *         supplied or the partition file cannot be written
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Reject missing arguments up front instead of failing with ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        System.err.println("Invalid no. of arguments provided");
        System.err.println("Usage: terasort <input-dir> <output-dir>");
        return -1;
    }
    LOG.info("starting");
    Job job = Job.getInstance(getConf());
    Path inputDir = new Path(args[0]);
    Path outputDir = new Path(args[1]);
    boolean useSimplePartitioner = getUseSimplePartitioner(job);
    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);
    if (useSimplePartitioner) {
        job.setPartitionerClass(SimplePartitioner.class);
    } else {
        long start = System.currentTimeMillis();
        Path partitionFile = new Path(outputDir,
            TeraInputFormat.PARTITION_FILENAME);
        // The fragment names the symlink under which the cached partition file is exposed.
        URI partitionUri = new URI(partitionFile.toString() +
            "#" + TeraInputFormat.PARTITION_FILENAME);
        try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
        } catch (Throwable e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            LOG.error(e.getMessage(), e);
            return -1;
        }
        job.addCacheFile(partitionUri);
        long end = System.currentTimeMillis();
        System.out.println("Spent " + (end - start) + "ms computing partitions.");
        job.setPartitionerClass(TotalOrderPartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    TeraOutputFormat.setFinalSync(job, true);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
}
/**
 * Registers the SSL-related files with the job's cache files: the trust
 * store, the key store and the SSL configuration XML itself. For both stores
 * the job configuration entry is rewritten to hold only the file name.
 *
 * @param job - Job handle
 * @param sslConfigPath - ssl Configuration file specified through options
 * @throws IOException - If any
 */
private void addSSLFilesToDistCache(Job job,
    Path sslConfigPath) throws IOException {
    final Configuration jobConf = job.getConfiguration();
    final FileSystem localFs = FileSystem.getLocal(jobConf);

    // Load only the SSL resource, without the default configuration resources.
    final Configuration sslSettings = new Configuration(false);
    sslSettings.addResource(sslConfigPath);

    // Trust store: cache the qualified local path, keep just the file name in the job configuration.
    final Path trustStore = getLocalStorePath(sslSettings,
        DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
    final Path qualifiedTrustStore =
        trustStore.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
    job.addCacheFile(qualifiedTrustStore.toUri());
    jobConf.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION, trustStore.getName());

    // Key store: same treatment.
    final Path keyStore = getLocalStorePath(sslSettings,
        DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
    final Path qualifiedKeyStore =
        keyStore.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
    job.addCacheFile(qualifiedKeyStore.toUri());
    jobConf.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION, keyStore.getName());

    // Finally the SSL configuration file itself.
    final Path qualifiedSslConfig =
        sslConfigPath.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
    job.addCacheFile(qualifiedSslConfig.toUri());
}
/**
 * Configures and runs the "parselog" job.
 * <p>
 * Every entry listed under {@code args[0]} is registered as a separate input, the file named by the
 * {@code ip.file.path} configuration property is added to the job's cache files, and any existing
 * output path is deleted before the job starts.
 *
 * @param args {@code args[0]} is the directory whose entries are the inputs, {@code args[1]} the output path
 * @return 0 when the job succeeds
 * @throws IllegalStateException if {@code ip.file.path} is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Create the job.
    Configuration config = getConf();
    // Add the custom configuration resource.
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Set job parameters.
    job.setJarByClass(ParseLogJob_End.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Use CombineFileInputFormat.
    // NOTE(review): MultipleInputs.addInputPath below is expected to install its own
    // delegating input format, which would supersede this setting; confirm the intent.
    job.setInputFormatClass(CombineTextInputFormat.class);

    // Add the distributed-cache file; fail with a clear message when the property is absent
    // instead of a bare NullPointerException from new URI(null).
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException("Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Set the OutputFormat.
    // job.setOutputFormatClass(LogOutputFormat.class);

    // Set the compression type.
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    FileSystem fs = FileSystem.get(config);
    // Add the input and output data.
    FileStatus[] fileStatuses = fs.listStatus(new Path(args[0]));
    for (FileStatus status : fileStatuses) {
        MultipleInputs.addInputPath(job, status.getPath(), TextInputFormat.class, LogMapper.class);
        String inputPath = status.getPath().toString();
        // Print the last path component of each registered input.
        String dirName = inputPath.substring(inputPath.lastIndexOf('/') + 1);
        System.out.println(dirName);
    }

    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Remove output left over from a previous run.
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    // Run the job.
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
/**
 * Configures and runs the "parselog" job.
 * <p>
 * The file named by the {@code ip.file.path} configuration property is added to the job's cache
 * files, and any existing output path is deleted before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job succeeds
 * @throws IllegalStateException if {@code ip.file.path} is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Create the job.
    Configuration config = getConf();
    // Add the custom configuration resource.
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Set job parameters.
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Use CombineFileInputFormat.
    job.setInputFormatClass(CombineTextInputFormat.class);

    // Add the distributed-cache file; fail with a clear message when the property is absent
    // instead of a bare NullPointerException from new URI(null).
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException("Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Set the OutputFormat.
    job.setOutputFormatClass(LogOutputFormat.class);

    // Add the input and output data.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Set the compression type.
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    // Remove output left over from a previous run.
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    // Run the job.
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
/**
 * Runs a distributed-cache job on the mini cluster using the given job jar
 * and verifies both that the job succeeds and that its tracking URL ends
 * with the numeric tail of the job id followed by a slash. The test is
 * skipped when the MR application jar is not present.
 *
 * @param jobJarPath path of the jar to set as the job jar
 */
public void _testDistributedCache(String jobJarPath) throws Exception {
    // Skip the test when the MR application jar does not exist.
    final File appJarFile = new File(MiniMRYarnCluster.APPJAR);
    if (!appJarFile.exists()) {
        LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
            + " not found. Not running test.");
        return;
    }

    // Fixtures: one temporary file and three jars built by the test helpers.
    final Path plainFile = createTempFile("distributed.first", "x");
    final Path classpathJar = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
    final Path classpathArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
    final Path cacheArchive = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);

    final Job job = Job.getInstance(mrCluster.getConfig());

    // Use a "dummy" job jar so that its extraction can be checked.
    job.setJar(jobJarPath);

    // The dummy job jar does not contain DistributedCacheChecker, so the jar that
    // does contain it has to be put on the classpath explicitly.
    final Path checkerJar = new Path(JarFinder.getJar(DistributedCacheChecker.class));
    final Path qualifiedCheckerJar =
        checkerJar.makeQualified(localFs.getUri(), checkerJar.getParent());
    job.addFileToClassPath(qualifiedCheckerJar);

    job.setMapperClass(DistributedCacheChecker.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    FileInputFormat.setInputPaths(job, plainFile);

    // The URI fragment names the symlink under which the cached file is exposed.
    final URI symlinkedFile = new URI(plainFile.toUri() + "#distributed.first.symlink");
    job.addCacheFile(symlinkedFile);
    job.addFileToClassPath(classpathJar);

    // The AppMaster jar itself.
    final Path qualifiedAppJar = APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent());
    job.addFileToClassPath(qualifiedAppJar);

    job.addArchiveToClassPath(classpathArchive);
    job.addCacheArchive(cacheArchive.toUri());

    // A single map attempt makes a failing run finish sooner.
    job.setMaxMapAttempts(1);

    job.submit();
    final String trackingUrl = job.getTrackingURL();
    final String jobId = job.getJobID().toString();
    Assert.assertTrue(job.waitForCompletion(false));

    // The URL must end with everything from the last underscore of the job id, plus "/".
    final String expectedTail = jobId.substring(jobId.lastIndexOf("_")) + "/";
    Assert.assertTrue("Tracking URL was " + trackingUrl +
        " but didn't Match Job ID " + jobId,
        trackingUrl.endsWith(expectedTail));
}
/**
 * Configures and runs the "parselog" job.
 * <p>
 * The file named by the {@code ip.file.path} configuration property is added to the job's cache
 * files, and any existing output path is deleted before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job succeeds
 * @throws IllegalStateException if {@code ip.file.path} is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Create the job.
    Configuration config = getConf();
    // Add the custom configuration resource.
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Set job parameters.
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Use CombineFileInputFormat.
    job.setInputFormatClass(CombineTextInputFormat.class);

    // Add the distributed-cache file; fail with a clear message when the property is absent
    // instead of a bare NullPointerException from new URI(null).
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException("Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Add the input and output data.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Set the compression type.
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    // Remove output left over from a previous run.
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    // Run the job.
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
/**
 * Configures and runs the "parselog" job.
 * <p>
 * The file named by the {@code ip.file.path} configuration property is added to the job's cache
 * files, and any existing output path is deleted before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job succeeds
 * @throws IllegalStateException if {@code ip.file.path} is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Create the job.
    Configuration config = getConf();
    // Add the custom configuration resource.
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Set job parameters.
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Add the distributed-cache file; fail with a clear message when the property is absent
    // instead of a bare NullPointerException from new URI(null).
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException("Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Add the input and output data.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Set the compression type.
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    // Remove output left over from a previous run.
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    // Run the job.
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
/**
 * Configures and runs the TeraSort job.
 * <p>
 * Unless the simple partitioner is selected, a partition file is written into the output directory,
 * added to the job's cache files, and {@code TotalOrderPartitioner} is used.
 *
 * @param args exactly two entries: the input directory and the output directory
 * @return 0 when the job succeeds, 1 when it fails, and -1 when the argument count is wrong or
 *         the partition file cannot be written
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("Invalid no. of arguments provided");
        System.err.println("Usage: terasort <input-dir> <output-dir>");
        return -1;
    }

    LOG.info("starting");
    Job job = Job.getInstance(getConf());
    Path inputDir = new Path(args[0]);
    Path outputDir = new Path(args[1]);
    boolean useSimplePartitioner = getUseSimplePartitioner(job);
    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);
    if (useSimplePartitioner) {
        job.setPartitionerClass(SimplePartitioner.class);
    } else {
        long start = System.currentTimeMillis();
        Path partitionFile = new Path(outputDir,
            TeraInputFormat.PARTITION_FILENAME);
        // The fragment names the symlink under which the cached partition file is exposed.
        URI partitionUri = new URI(partitionFile.toString() +
            "#" + TeraInputFormat.PARTITION_FILENAME);
        try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
        } catch (Throwable e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            LOG.error(e.getMessage(), e);
            return -1;
        }
        job.addCacheFile(partitionUri);
        long end = System.currentTimeMillis();
        System.out.println("Spent " + (end - start) + "ms computing partitions.");
        job.setPartitionerClass(TotalOrderPartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    TeraOutputFormat.setFinalSync(job, true);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
}
/**
 * Configures and runs the "parselog" job.
 * <p>
 * The file named by the {@code ip.file.path} configuration property is added to the job's cache
 * files, and any existing output path is deleted before the job starts.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 when the job succeeds
 * @throws IllegalStateException if {@code ip.file.path} is not configured
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    // Create the job.
    Configuration config = getConf();
    // Add the custom configuration resource.
    config.addResource("mr.xml");
    Job job = Job.getInstance(config);
    // Set job parameters.
    job.setJarByClass(ParseLogJob.class);
    job.setJobName("parselog");
    job.setMapperClass(LogMapper.class);
    job.setReducerClass(LogReducer.class);
    job.setMapOutputKeyClass(TextLongWritable.class);
    job.setGroupingComparatorClass(TextLongGroupComparator.class);
    job.setPartitionerClass(TextLongPartition.class);
    job.setMapOutputValueClass(LogWritable.class);
    job.setOutputValueClass(Text.class);

    // Use CombineFileInputFormat.
    job.setInputFormatClass(CombineTextInputFormat.class);

    // Add the distributed-cache file; fail with a clear message when the property is absent
    // instead of a bare NullPointerException from new URI(null).
    String ipFilePath = config.get("ip.file.path");
    if (ipFilePath == null) {
        throw new IllegalStateException("Missing required configuration property: ip.file.path");
    }
    job.addCacheFile(new URI(ipFilePath));

    // Set the OutputFormat.
    job.setOutputFormatClass(LogOutputFormat.class);

    // Add the input and output data.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Set the compression type.
    // FileOutputFormat.setCompressOutput(job, true);
    // FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

    // Remove output left over from a previous run.
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }

    // Run the job.
    if (!job.waitForCompletion(true)) {
        throw new RuntimeException(job.getJobName() + " failed!");
    }
    return 0;
}
/**
 * Builds a TeraSort {@link Job} from the given configuration: input and
 * output directories, key/value classes, input/output formats, the
 * partitioner (with a partition file added to the cache files when the
 * simple partitioner is not selected), output replication and final sync.
 *
 * @param conf The Job config.
 * @return The configured job; it is not submitted here.
 * @throws Exception On error.
 */
private Job setupConfig(JobConf conf) throws Exception {
    Job job = Job.getInstance(conf);

    Path sortInput = new Path(generateOutDir);
    Path sortOutput = new Path(sortOutDir);

    boolean simplePartitioner = TeraSort.getUseSimplePartitioner(job);

    TeraInputFormat.setInputPaths(job, sortInput);
    FileOutputFormat.setOutputPath(job, sortOutput);

    job.setJobName("TeraSort");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);

    if (!simplePartitioner) {
        long startedAt = System.currentTimeMillis();

        Path partitionFile = new Path(sortOutput, PARTITION_FILENAME);
        // The fragment names the symlink under which the cached partition file is exposed.
        URI partitionUri = new URI(partitionFile.toString() + "#" + PARTITION_FILENAME);

        try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
        } catch (Throwable failure) {
            throw new RuntimeException(failure);
        }

        job.addCacheFile(partitionUri);

        long finishedAt = System.currentTimeMillis();
        System.out.println("Spent " + (finishedAt - startedAt) + "ms computing partitions. " +
            "Partition file added to distributed cache: " + partitionUri);

        // Partitioner class comes from a helper instead of TeraSort.TotalOrderPartitioner directly.
        job.setPartitionerClass(getTeraSortTotalOrderPartitioner());
    } else {
        job.setPartitionerClass(TeraSort.SimplePartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", TeraSort.getOutputReplication(job));

    // Reflective equivalent of TeraOutputFormat.setFinalSync(job, true).
    Method finalSyncSetter =
        TeraOutputFormat.class.getDeclaredMethod("setFinalSync", JobContext.class, boolean.class);
    finalSyncSetter.setAccessible(true);
    finalSyncSetter.invoke(null, job, true);

    return job;
}
/**
 * The original run() has been modified to:
 *  - take command parameters for running terasort on Pravega streams
 *  - use special mapper and reducer to convert data type required by Pravega hadoop connector
 *
 * @param args exactly six entries: input directory, output directory, the URI used for both
 *             input and output, the scope used for both input and output, the input stream
 *             name, and the output stream prefix
 * @return 2 when the argument count is wrong, -1 when the partition file cannot be written,
 *         otherwise 0 when the job succeeds and 1 when it fails
 * @throws Exception if job setup or execution fails
 */
public int run(String[] args) throws Exception {
    if (args.length != 6) {
        usage();
        return 2;
    }
    LOG.info("starting");
    Path inputDir = new Path(args[0]);
    Path outputDir = new Path(args[1]);
    // Input side settings.
    getConf().setStrings(INPUT_URI_STRING, args[2]);
    getConf().setStrings(INPUT_SCOPE_NAME, args[3]);
    getConf().setStrings(INPUT_STREAM_NAME, args[4]);
    getConf().setStrings(INPUT_DESERIALIZER, TextSerializer.class.getName());

    // Output side settings; scope and URI are the same as for the input.
    getConf().setStrings(OUTPUT_SCOPE_NAME, args[3]);
    getConf().setStrings(OUTPUT_URI_STRING, args[2]);
    getConf().setStrings(OUTPUT_DESERIALIZER, TextSerializer.class.getName());
    getConf().setStrings(OUTPUT_STREAM_PREFIX, args[5]);

    Job job = Job.getInstance(getConf());

    boolean useSimplePartitioner = getUseSimplePartitioner(job);
    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(TeraSortMapper.class);
    job.setReducerClass(TeraSortReducer.class);
    job.setInputFormatClass(PravegaInputFormat.class);
    job.setOutputFormatClass(PravegaTeraSortOutputFormat.class);
    if (useSimplePartitioner) {
        job.setPartitionerClass(SimplePartitioner.class);
    } else {
        long start = System.currentTimeMillis();
        Path partitionFile = new Path(outputDir,
            TeraInputFormat.PARTITION_FILENAME);
        // The fragment names the symlink under which the cached partition file is exposed.
        URI partitionUri = new URI(partitionFile.toString() +
            "#" + TeraInputFormat.PARTITION_FILENAME);
        try {
            TeraInputFormat.writePartitionFile(job, partitionFile);
        } catch (Throwable e) {
            // Pass the throwable so the stack trace is logged, not just the message.
            LOG.error(e.getMessage(), e);
            return -1;
        }
        job.addCacheFile(partitionUri);
        long end = System.currentTimeMillis();
        LOG.info("Spent " + (end - start) + "ms computing partitions.");
        job.setPartitionerClass(TotalOrderPartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
}
/**
 * Configures and runs the "linden-hadoop-indexing" job.
 * <p>
 * All paths and settings are read from the configuration, not from the arguments. Existing
 * output and index directories are deleted and the trash is expunged before the job is created.
 * The schema file and the linden properties file are added to the job's cache files under the
 * symlink names {@code lindenSchema} and {@code lindenProperties}.
 *
 * @param strings command-line arguments; not used by this method
 * @return 0 when the job succeeds
 * @throws IOException if the schema file or linden properties file setting is missing
 * @throws RuntimeException if the job does not complete successfully
 * @throws Exception if job setup or execution fails
 */
@Override
public int run(String[] strings) throws Exception {
    Configuration conf = getConf();
    String dir = conf.get(LindenJobConfig.INPUT_DIR, null);
    logger.info("input dir:" + dir);

    // Check the required settings first, so a misconfigured run fails
    // before any existing output or index data is deleted below.
    String lindenSchemaFile = conf.get(LindenJobConfig.SCHEMA_FILE_URL);
    if (lindenSchemaFile == null) {
        throw new IOException("no schema file is found");
    }
    String lindenPropertiesFile = conf.get(LindenJobConfig.LINDEN_PROPERTIES_FILE_URL);
    if (lindenPropertiesFile == null) {
        throw new IOException("no linden properties file is found");
    }

    Path inputPath = new Path(StringUtils.unEscapeString(dir));
    Path outputPath = new Path(conf.get(LindenJobConfig.OUTPUT_DIR));
    String indexPath = conf.get(LindenJobConfig.INDEX_PATH);

    // Remove output and index data left over from a previous run.
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    if (fs.exists(new Path(indexPath))) {
        fs.delete(new Path(indexPath), true);
    }

    int numShards = conf.getInt(LindenJobConfig.NUM_SHARDS, 1);
    Shard[] shards = createShards(indexPath, numShards);

    Shard.setIndexShards(conf, shards);

    // Empty the trash.
    (new Trash(conf)).expunge();

    Job job = Job.getInstance(conf, "linden-hadoop-indexing");
    job.setJarByClass(LindenJob.class);
    job.setMapperClass(LindenMapper.class);
    job.setCombinerClass(LindenCombiner.class);
    job.setReducerClass(LindenReducer.class);
    job.setMapOutputKeyClass(Shard.class);
    job.setMapOutputValueClass(IntermediateForm.class);
    job.setOutputKeyClass(Shard.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(IndexUpdateOutputFormat.class);
    job.setReduceSpeculativeExecution(false);
    // One reduce task per shard.
    job.setNumReduceTasks(numShards);

    logger.info("Adding schema file: " + lindenSchemaFile);
    job.addCacheFile(new URI(lindenSchemaFile + "#lindenSchema"));
    logger.info("Adding linden properties file: " + lindenPropertiesFile);
    job.addCacheFile(new URI(lindenPropertiesFile + "#lindenProperties"));

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Log the effective job settings.
    Path[] inputs = FileInputFormat.getInputPaths(job);
    StringBuilder buffer = new StringBuilder(inputs[0].toString());
    for (int i = 1; i < inputs.length; i++) {
        buffer.append(",");
        buffer.append(inputs[i].toString());
    }
    logger.info("mapreduce.input.dir = " + buffer.toString());
    logger.info("mapreduce.output.dir = " + FileOutputFormat.getOutputPath(job).toString());
    logger.info("mapreduce.job.num.reduce.tasks = " + job.getNumReduceTasks());
    logger.info(shards.length + " shards = " + conf.get(LindenJobConfig.INDEX_SHARDS));
    logger.info("mapreduce.input.format.class = " + job.getInputFormatClass());
    logger.info("mapreduce.output.format.class = " + job.getOutputFormatClass());
    logger.info("mapreduce.cluster.temp.dir = " + conf.get(MRJobConfig.TEMP_DIR));

    job.waitForCompletion(true);
    if (!job.isSuccessful()) {
        throw new RuntimeException("Job failed");
    }
    return 0;
}
/**
 * The MapReduce driver - setup and launch the job.
 * <p>
 * The users file is added to the job's cache files with its file name published under
 * {@code JoinMap.DISTCACHE_FILENAME_CONFIG}; the user logs are the job input, and the job runs
 * with zero reduce tasks.
 *
 * @param args the command-line arguments
 * @return the non-zero result of command-line parsing if it fails; otherwise 0 when the job succeeds and 1 when it fails
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
    Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
    int result = cli.runCmd();
    if (result != 0) {
        return result;
    }

    Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
    Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
    Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

    Configuration conf = super.getConf();
    // The Job constructors are deprecated; the static factory is the supported way to create a Job.
    Job job = Job.getInstance(conf);
    job.setJarByClass(ReplicatedJoin.class);
    job.setMapperClass(JoinMap.class);

    // Ship the users file with the job and publish its file name in the configuration.
    job.addCacheFile(usersPath.toUri());
    job.getConfiguration().set(JoinMap.DISTCACHE_FILENAME_CONFIG, usersPath.getName());

    // No reduce tasks are configured for this job.
    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, userLogsPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    return job.waitForCompletion(true) ? 0 : 1;
}