The following are example usages of org.apache.hadoop.mapreduce.Job#setJobName(); you can also follow the links to view the full source code on GitHub.
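Before the longer, real-world snippets, here is a minimal sketch of the core pattern (the class name and job name are made up for illustration): create a Job from a Configuration and label it with setJobName(); the name is purely descriptive and is what later shows up in the ResourceManager and JobHistory UIs.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SetJobNameSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // the job name identifies the job in logs and web UIs; it has no effect on execution
        job.setJobName("set-job-name-sketch");
        System.out.println("configured job name: " + job.getJobName());
    }
}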
/**
* @param args the cli arguments
*/
public int run(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 2;
}
setNumberOfRows(job, parseHumanLong(args[0]));
Path outputDir = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraGen");
job.setJarByClass(TeraGen.class);
job.setMapperClass(SortGenMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setOutputFormatClass(TeraOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
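Drivers like the TeraGen run() above follow the Tool pattern, so they are normally launched through ToolRunner. A minimal sketch of such a main(), assuming TeraGen extends Configured and implements Tool (which its getConf() call suggests):
public static void main(String[] args) throws Exception {
    // ToolRunner parses generic options (-D, -files, ...) before delegating to run()
    int exitCode = ToolRunner.run(new Configuration(), new TeraGen(), args);
    System.exit(exitCode);
}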
/**
* Tests Reducer throwing exception.
*
* @throws Exception
*/
public void testReducerFail() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
job.setJobName("chain");
ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
ChainReducer.setReducer(job, FailReduce.class, LongWritable.class,
Text.class, LongWritable.class, Text.class, null);
ChainReducer.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
job.waitForCompletion(true);
assertTrue("Job Not failed", !job.isSuccessful());
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: xflowstatic <type> <in> <out>");
System.exit(2);
}
conf.set(TYPE_KEY, otherArgs[0]);
Job job = Job.getInstance();
job.setJobName("xflowstatic");
job.setJarByClass(XflowStatic.class);
job.setMapperClass(XflowMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 2;
}
TeraInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("TeraSum");
job.setJarByClass(TeraChecksum.class);
job.setMapperClass(ChecksumMapper.class);
job.setReducerClass(ChecksumReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Unsigned16.class);
// force a single reducer
job.setNumReduceTasks(1);
job.setInputFormatClass(TeraInputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public Job createJob(boolean failMappers, boolean failReducers, Path inputFile)
throws IOException {
Configuration conf = getConf();
conf.setBoolean(FAIL_MAP, failMappers);
conf.setBoolean(FAIL_REDUCE, failReducers);
Job job = Job.getInstance(conf, "fail");
job.setJarByClass(FailJob.class);
job.setMapperClass(FailMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FailReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("Fail job");
FileInputFormat.addInputPath(job, inputFile);
return job;
}
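A caller could use the createJob() factory above roughly as follows; this is a hypothetical sketch that assumes FailJob extends Configured (so setConf() is available) and that the input path exists:
FailJob failJob = new FailJob();
failJob.setConf(new Configuration());
// request failing mappers but normal reducers, then submit and wait
Job job = failJob.createJob(true, false, new Path("/tmp/failjob-input.txt"));
boolean succeeded = job.waitForCompletion(true);
System.out.println("job succeeded: " + succeeded);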
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase3Step4LocalDeDuplication.class);
job.setJobName(Phase3Step4LocalDeDuplication.class.getName());
// paths
String inputPath = args[0];
// text files of ids to be deleted
String outputPath = args[1];
// input: reading max N lines for each mapper
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, new Path(inputPath));
job.getConfiguration().setInt("mapreduce.input.lineinputformat.linespermap", LINES);
// mapper
job.setMapperClass(LocalGreedyDeDuplicationMapper.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// reducer
job.setReducerClass(IDCollectorReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
public int run(String[] args) throws Exception {
if(args.length < 2) {
printUsage();
return 2;
}
Job job = Job.getInstance(getConf());
job.setJobName("MultiFileWordCount");
job.setJarByClass(MultiFileWordCount.class);
//set the InputFormat of the job to our InputFormat
job.setInputFormatClass(MyInputFormat.class);
// the keys are words (strings)
job.setOutputKeyClass(Text.class);
// the values are counts (ints)
job.setOutputValueClass(IntWritable.class);
//use the defined mapper
job.setMapperClass(MapClass.class);
//use the WordCount Reducer
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public int run(String[] args)
throws Exception
{
Configuration conf = getConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance();
job.setJarByClass(WordCounterExample.class);
job.setJobName(WordCounterExample.class.getName());
// mapper
job.setMapperClass(WordCounterMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// combiner + reducer
job.setCombinerClass(TextLongCountingReducer.class);
job.setReducerClass(TextLongCountingReducer.class);
job.setInputFormatClass(WARCInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// paths
String commaSeparatedInputFiles = otherArgs[0];
String outputPath = otherArgs[1];
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase3Step3NearDupTuplesCreation.class);
job.setJobName(Phase3Step3NearDupTuplesCreation.class.getName());
// mapper
job.setMapperClass(CreateTuplesMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(TreeSet.class);
job.setInputFormatClass(TextInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// paths
String commaSeparatedInputFiles = args[0];
String outputPath = args[1];
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setNumReduceTasks(0); // map-only job: must be set to 0 or a default identity reduce phase runs
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* Job configurator
*
* @param job job instance
* @param jarByClass class of the jar
* @param mapperClass mapper
* @param reducerClass reducer
* @param commaSeparatedInputFiles input paths
* @param outputPath output
* @throws IOException I/O exception
*/
public static void configureJob(Job job, Class<?> jarByClass,
Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass,
String commaSeparatedInputFiles, String outputPath)
throws IOException
{
job.setJarByClass(jarByClass);
job.setJobName(jarByClass.getName());
// mapper
job.setMapperClass(mapperClass);
// reducer
job.setReducerClass(reducerClass);
// input-output is warc
job.setInputFormatClass(WARCInputFormat.class);
// prevent producing empty files
LazyOutputFormat.setOutputFormatClass(job, WARCOutputFormat.class);
// intermediate data
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WARCWritable.class);
// output data
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
// set output compression to GZip
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
}
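A driver might invoke the configureJob() helper above like this; MyDriver, MyWarcMapper, and MyWarcReducer are placeholders rather than classes from the snippet, and the paths are illustrative:
Job job = Job.getInstance(new Configuration());
configureJob(job, MyDriver.class, MyWarcMapper.class, MyWarcReducer.class,
        "hdfs:///data/warc/input-1.warc.gz,hdfs:///data/warc/input-2.warc.gz",
        "hdfs:///data/warc/output");
System.exit(job.waitForCompletion(true) ? 0 : 1);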
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
job.setJarByClass(Phase3Step1ExtractNearDupInfo.class);
job.setJobName(Phase3Step1ExtractNearDupInfo.class.getName());
// mapper
job.setMapperClass(MapperClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DocumentInfo.class);
// reducer
job.setReducerClass(DeDuplicationTextOutputReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(List.class);
job.setInputFormatClass(WARCInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, DocumentInfoOutputFormat.class);
// paths
String commaSeparatedInputFiles = args[0];
String outputPath = args[1];
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(Opts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
File localStocksFile = new File(cli.getArgValueAsString(Opts.INPUT));
Path inputPath = new Path(cli.getArgValueAsString(Opts.PB_INPUT));
Path outputPath = new Path(cli.getArgValueAsString(Opts.OUTPUT));
Configuration conf = super.getConf();
if (!inputPath.getName().endsWith(".lzo")) {
throw new Exception("HDFS stock file must have a .lzo suffix");
}
generateInput(conf, localStocksFile, inputPath);
Job job = new Job(conf);
job.setJobName(StockProtocolBuffersMapReduce.class.getName());
job.setJarByClass(StockProtocolBuffersMapReduce.class);
job.setMapperClass(PBMapper.class);
job.setReducerClass(PBReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ProtobufStockWritable.class);
MultiInputFormat.setClassConf(Stock.class, job.getConfiguration());
LzoProtobufBlockOutputFormat.setClassConf(StockAvg.class, job.getConfiguration());
job.setInputFormatClass(LzoProtobufBlockInputFormat.class);
job.setOutputFormatClass(LzoProtobufBlockOutputFormat.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return 0;
}
return 1;
}
/**
* The original run() has been modified to:
* - take command-line parameters for running TeraSort on Pravega streams
* - use a special mapper and reducer to convert to the data types required by the Pravega Hadoop connector
*/
public int run(String[] args) throws Exception {
if (args.length != 6) {
usage();
return 2;
}
LOG.info("starting");
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
getConf().setStrings(INPUT_URI_STRING, args[2]);
getConf().setStrings(INPUT_SCOPE_NAME, args[3]);
getConf().setStrings(INPUT_STREAM_NAME, args[4]);
getConf().setStrings(INPUT_DESERIALIZER, TextSerializer.class.getName());
getConf().setStrings(OUTPUT_SCOPE_NAME, args[3]);
getConf().setStrings(OUTPUT_URI_STRING, args[2]);
getConf().setStrings(OUTPUT_DESERIALIZER, TextSerializer.class.getName());
getConf().setStrings(OUTPUT_STREAM_PREFIX, args[5]);
Job job = Job.getInstance(getConf());
boolean useSimplePartitioner = getUseSimplePartitioner(job);
TeraInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
job.setJobName("TeraSort");
job.setJarByClass(TeraSort.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(TeraSortMapper.class);
job.setReducerClass(TeraSortReducer.class);
job.setInputFormatClass(PravegaInputFormat.class);
job.setOutputFormatClass(PravegaTeraSortOutputFormat.class);
if (useSimplePartitioner) {
job.setPartitionerClass(SimplePartitioner.class);
} else {
long start = System.currentTimeMillis();
Path partitionFile = new Path(outputDir,
TeraInputFormat.PARTITION_FILENAME);
URI partitionUri = new URI(partitionFile.toString() +
"#" + TeraInputFormat.PARTITION_FILENAME);
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
LOG.error(e.getMessage());
return -1;
}
job.addCacheFile(partitionUri);
long end = System.currentTimeMillis();
LOG.info("Spent " + (end - start) + "ms computing partitions.");
job.setPartitionerClass(TotalOrderPartitioner.class);
}
job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
int ret = job.waitForCompletion(true) ? 0 : 1;
LOG.info("done");
return ret;
}
@Override
@SuppressWarnings("squid:S1166") // Exception caught and error message printed
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "File used for reading script")
public int run(final CommandLine line, final Configuration conf,
final ProviderProperties providerProperties) throws ParseException
{
System.out.println(log.getClass().getName());
String expression = line.getOptionValue("e");
String output = line.getOptionValue("o");
String script = line.getOptionValue("s");
if (expression == null && script == null)
{
throw new ParseException("Either an expression or script must be specified.");
}
try
{
if (script != null)
{
File f = new File(script);
int total = (int) f.length();
byte[] buffer = new byte[total];
int read = 0;
try (FileInputStream fis = new FileInputStream(f))
{
while (read < total)
{
read += fis.read(buffer, read, total - read);
}
expression = new String(buffer);
}
}
String protectionLevel = line.getOptionValue("pl");
log.debug("expression: " + expression);
log.debug("output: " + output);
Job job = new Job();
job.setJobName("MapAlgebra");
MrsImageDataProvider dp =
DataProviderFactory.getMrsImageDataProvider(output, AccessMode.OVERWRITE, providerProperties);
String useProtectionLevel = ProtectionLevelUtils.getAndValidateProtectionLevel(dp, protectionLevel);
boolean valid = org.mrgeo.mapalgebra.MapAlgebra.validate(expression, providerProperties);
if (valid)
{
if (org.mrgeo.mapalgebra.MapAlgebra.mapalgebra(expression, output, conf,
providerProperties, useProtectionLevel))
{
if (line.hasOption("b"))
{
System.out.println("Building pyramids...");
if (!BuildPyramid.build(output, new MeanAggregator(), conf, providerProperties))
{
System.out.println("Building pyramids failed. See YARN logs for more information.");
}
}
}
}
}
catch (IOException e)
{
System.out.println("Failure while running map algebra " + e.getMessage());
return -1;
}
return 0;
}
/**
* Run an import job to read a table in to HDFS.
*
* @param tableName the database table to read; may be null if a free-form
* query is specified in the SqoopOptions, and the ImportJobBase subclass
* supports free-form queries.
* @param ormJarFile the Jar file to insert into the dcache classpath.
* (may be null)
* @param splitByCol the column of the database table to use to split
* the import
* @param conf A fresh Hadoop Configuration to use to build an MR job.
* @throws IOException if the job encountered an IO problem
* @throws ImportException if the job failed unexpectedly or was
* misconfigured.
*/
public void runImport(String tableName, String ormJarFile, String splitByCol,
Configuration conf) throws IOException, ImportException {
// Check if there are runtime error checks to do
if (isHCatJob && options.isDirect()
&& !context.getConnManager().isDirectModeHCatSupported()) {
throw new IOException("Direct import is not compatible with "
+ "HCatalog operations using the connection manager "
+ context.getConnManager().getClass().getName()
+ ". Please remove the parameter --direct");
}
if (options.getAccumuloTable() != null && options.isDirect()
&& !getContext().getConnManager().isDirectModeAccumuloSupported()) {
throw new IOException("Direct mode is incompatible with "
+ "Accumulo. Please remove the parameter --direct");
}
if (options.getHBaseTable() != null && options.isDirect()
&& !getContext().getConnManager().isDirectModeHBaseSupported()) {
throw new IOException("Direct mode is incompatible with "
+ "HBase. Please remove the parameter --direct");
}
if (null != tableName) {
LOG.info("Beginning import of " + tableName);
} else {
LOG.info("Beginning query import.");
}
String tableClassName = null;
if (!getContext().getConnManager().isORMFacilitySelfManaged()) {
tableClassName =
new TableClassName(options).getClassForTable(tableName);
}
// For ORM self managed, we leave the tableClassName to null so that
// we don't check for non-existing classes.
loadJars(conf, ormJarFile, tableClassName);
Job job = createJob(conf);
try {
// Set the external jar to use for the job.
job.getConfiguration().set("mapred.jar", ormJarFile);
if (options.getMapreduceJobName() != null) {
job.setJobName(options.getMapreduceJobName());
}
propagateOptionsToJob(job);
configureInputFormat(job, tableName, tableClassName, splitByCol);
configureOutputFormat(job, tableName, tableClassName);
configureMapper(job, tableName, tableClassName);
configureNumTasks(job);
cacheJars(job, getContext().getConnManager());
jobSetup(job);
setJob(job);
boolean success = runJob(job);
if (!success) {
throw new ImportException("Import job failed!");
}
completeImport(job);
if (options.isValidationEnabled()) {
validateImport(tableName, conf, job);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
} finally {
unloadJars();
jobTeardown(job);
}
}
@Override
public int run(String[] args) throws Exception {
_configure(args);
final Configuration conf = getConf();
String type = conf.get(MetricsConfig.TYPE);
/*
* if the type is "errors", we want to process all of the errors from the metrics files first and then run the regular ingest metrics process
*/
// MetricsServer.setServerConf(conf);
// MetricsServer.initInstance();
if ("errors".equals(type)) {
try {
launchErrorsJob(Job.getInstance(conf), conf);
} catch (Exception e) {
log.info("Failed to launch errors job", e);
}
type = "ingest";
conf.set(MetricsConfig.TYPE, type);
}
/* Type logic so I can differentiate between loader and ingest metrics jobs */
Class<? extends Mapper<?,?,?,?>> mapperClass;
String outTable;
Path inputDirectoryPath = new Path(conf.get(MetricsConfig.INPUT_DIRECTORY));
FileSystem fs = FileSystem.get(inputDirectoryPath.toUri(), conf);
FileStatus[] fstats = fs.listStatus(inputDirectoryPath);
Path[] files = FileUtil.stat2Paths(fstats);
Path[] fileBuffer = new Path[MAX_FILES];
for (int i = 0; i < files.length;) {
Job job = Job.getInstance(getConf());
job.setJarByClass(this.getClass());
job.getConfiguration().setInt("mapred.job.reuse.jvm.num.tasks", -1);
if ("ingest".equalsIgnoreCase(type)) {
mapperClass = IngestMetricsMapper.class;
outTable = conf.get(MetricsConfig.INGEST_TABLE, MetricsConfig.DEFAULT_INGEST_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else if ("loader".equalsIgnoreCase(type)) {
mapperClass = LoaderMetricsMapper.class;
outTable = conf.get(MetricsConfig.LOADER_TABLE, MetricsConfig.DEFAULT_LOADER_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else if ("flagmaker".equalsIgnoreCase(type)) {
mapperClass = FlagMakerMetricsMapper.class;
outTable = conf.get(MetricsConfig.FLAGMAKER_TABLE, MetricsConfig.DEFAULT_FLAGMAKER_TABLE);
job.setInputFormatClass(SequenceFileInputFormat.class);
} else {
log.error(type + " is not a valid job type. Please use <ingest|loader>.");
return -1;
}
job.setJobName("MetricsIngester-" + type);
if (files.length - i > MAX_FILES) {
System.arraycopy(files, i, fileBuffer, 0, MAX_FILES);
i += MAX_FILES;
} else {
fileBuffer = new Path[files.length - i];
System.arraycopy(files, i, fileBuffer, 0, fileBuffer.length);
i += files.length - i;
}
SequenceFileInputFormat.setInputPaths(job, fileBuffer);
job.setMapperClass(mapperClass);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.setConnectorInfo(job, conf.get(MetricsConfig.USER), new PasswordToken(conf.get(MetricsConfig.PASS, "").getBytes()));
AccumuloOutputFormat.setCreateTables(job, createTables);
AccumuloOutputFormat.setDefaultTableName(job, outTable);
log.info("zookeepers = " + conf.get(MetricsConfig.ZOOKEEPERS));
log.info("instance = " + conf.get(MetricsConfig.INSTANCE));
log.info("clientConfuguration = "
+ ClientConfiguration.loadDefault().withInstance(conf.get(MetricsConfig.INSTANCE)).withZkHosts(conf.get(MetricsConfig.ZOOKEEPERS)));
AccumuloOutputFormat.setZooKeeperInstance(job,
ClientConfiguration.loadDefault().withInstance(conf.get(MetricsConfig.INSTANCE)).withZkHosts(conf.get(MetricsConfig.ZOOKEEPERS)));
AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig().setMaxLatency(25, TimeUnit.MILLISECONDS));
job.submit();
job.waitForCompletion(true);
if (job.isSuccessful()) {
for (Path p : fileBuffer) {
fs.delete(p, true);
}
}
}
return 0;
}
public static void main (String[] args) throws IOException, InterruptedException, ClassNotFoundException {
if (args.length == 0) {
System.out
.println("ExportHBaseTableToDelimiteredTxt {tableName} {ColumnFamily} {outputPath} {shouldCompressWithGz} {schemaLocationOnHdfs} {delimiter} {rowKeyColumn.Optional}");
return;
}
String table = args[0];
String columnFamily = args[1];
String outputPath = args[2];
String shouldCompression = args[3];
String schemaFilePath = args[4];
String delimiter = args[5];
String rowKeyColumn = "";
if (args.length > 6) {
rowKeyColumn = args[6];
}
Job job = Job.getInstance();
job.getConfiguration().set(ROW_KEY_COLUMN_CONF, rowKeyColumn);
HBaseConfiguration.addHbaseResources(job.getConfiguration());
job.getConfiguration().set(SHOULD_COMPRESSION_CONF, shouldCompression);
job.getConfiguration().set(SCHEMA_FILE_LOCATION_CONF, schemaFilePath);
job.getConfiguration().set(OUTPUT_PATH_CONF, outputPath);
job.getConfiguration().set(DELIMITER_CONF, delimiter);
job.setJarByClass(ExportHBaseTableToDelimiteredTxt.class);
job.setJobName("ExportHBaseTableToDelimiteredTxt ");
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for
// MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes(columnFamily));
TableMapReduceUtil.initTableMapperJob(table, // input HBase table name
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't
// emitting anything from
// mapper
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
@SuppressWarnings("finally")
public static BigDecimal estimate(int numMaps, long numPoints, Job job
) throws IOException {
//setup job conf
job.setJobName(PiEstimator.class.getSimpleName());
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(PiMapper.class);
job.setReducerClass(PiReducer.class);
job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
job.setSpeculativeExecution(false);
//setup input/output directories
//final Path inDir = new Path(TMP_DIR, "in");
final Path inDir = new Path("/home/hadoop1/tmp_dir", "in");
System.out.println("inDir =" + inDir.toString());
//final Path outDir = new Path(TMP_DIR, "out");
final Path outDir = new Path("/home/hadoop1/tmp_dir", "out");
System.out.println("outDir =" + outDir.toString());
FileInputFormat.addInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(job.getConfiguration());
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, job.getConfiguration(), file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
boolean waitForCompletion = job.waitForCompletion(true);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, job.getConfiguration());
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
}catch (InterruptedException e){
System.out.println("Job Exception " + e.getMessage() );
} finally {
fs.delete(TMP_DIR, true);
return BigDecimal.valueOf(4);
}
}
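The estimate() method above expects the caller to supply a pre-created Job; a call site might look roughly like this (the map and point counts are arbitrary):
Job job = Job.getInstance(new Configuration());
// 10 map tasks, each sampling one million points
BigDecimal pi = estimate(10, 1000000L, job);
System.out.println("Estimated value of Pi is " + pi);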
public int run(String[] args) throws Exception {
// create the job
Configuration config = getConf();
// add custom configuration
config.addResource("mr.xml");
Job job = Job.getInstance(config);
// set job parameters
job.setJarByClass(ParseLogJob.class);
job.setJobName("parselog");
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(TextLongWritable.class);
job.setGroupingComparatorClass(TextLongGroupComparator.class);
job.setPartitionerClass(TextLongPartition.class);
job.setMapOutputValueClass(LogWritable.class);
job.setOutputValueClass(Text.class);
// use CombineTextInputFormat to combine small input files
job.setInputFormatClass(CombineTextInputFormat.class);
// add a file to the distributed cache
job.addCacheFile(new URI(config.get("ip.file.path")));
// set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// set the output compression type
// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileSystem fs = FileSystem.get(config);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// run the job
if (!job.waitForCompletion(true)) {
throw new RuntimeException(job.getJobName() + " failed!");
}
return 0;
}
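The last example uses CombineTextInputFormat so that many small log files are packed into fewer splits. A common companion setting, not present in the original snippet, is an upper bound on the combined split size, for example:
// cap each combined split at 128 MB (the value is illustrative)
CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);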