The following examples show how to use the org.apache.hadoop.mapreduce.lib.output.NullOutputFormat API class, collected from real project source code.
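Every snippet below uses the same idea: NullOutputFormat is chosen when the job's real work happens as a side effect (writing to an external store, sleeping, bumping counters) and no output files should be produced or committed. As a minimal sketch of that pattern, here is a self-contained, map-only driver; the class name NullOutputExample and the use of the identity Mapper are illustrative placeholders, not taken from the examples that follow.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class NullOutputExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "null-output-example");
    job.setJarByClass(NullOutputExample.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0])); // input path supplied on the command line
    job.setMapperClass(Mapper.class); // identity mapper: records are read and passed straight through
    job.setNumReduceTasks(0);         // map-only job, no shuffle or reduce phase
    // NullOutputFormat discards every record and commits nothing, so no
    // output path is set and its checkOutputSpecs() is a no-op.
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

// CredentialsTestJob: a one-map, one-reduce test job built on SleepJob's input format and partitioner; its output is discarded via NullOutputFormat.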
public Job createJob()
throws IOException {
Configuration conf = getConf();
conf.setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(conf, "test");
job.setNumReduceTasks(1);
job.setJarByClass(CredentialsTestJob.class);
job.setMapperClass(CredentialsTestJob.CredentialsTestMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(CredentialsTestJob.CredentialsTestReducer.class);
job.setInputFormatClass(SleepJob.SleepInputFormat.class);
job.setPartitionerClass(SleepJob.SleepJobPartitioner.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("test job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
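// FailJob: builds a job whose mappers and/or reducers fail on demand (FAIL_MAP / FAIL_REDUCE flags); output is discarded via NullOutputFormat.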
public Job createJob(boolean failMappers, boolean failReducers, Path inputFile)
throws IOException {
Configuration conf = getConf();
conf.setBoolean(FAIL_MAP, failMappers);
conf.setBoolean(FAIL_REDUCE, failReducers);
Job job = Job.getInstance(conf, "fail");
job.setJarByClass(FailJob.class);
job.setMapperClass(FailMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FailReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(TextInputFormat.class);
job.setSpeculativeExecution(false);
job.setJobName("Fail job");
FileInputFormat.addInputPath(job, inputFile);
return job;
}
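// Map-side spill test harness: forces the local MR framework, a fake input format, and custom key/value/comparator classes; the job's output is discarded.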
private static void runTest(String name, Job job) throws Exception {
job.setNumReduceTasks(1);
job.getConfiguration().set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
job.getConfiguration().setInt(MRJobConfig.IO_SORT_FACTOR, 1000);
job.getConfiguration().set("fs.defaultFS", "file:///");
job.getConfiguration().setInt("test.mapcollection.num.maps", 1);
job.setInputFormatClass(FakeIF.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(SpillReducer.class);
job.setMapOutputKeyClass(KeyWritable.class);
job.setMapOutputValueClass(ValWritable.class);
job.setSortComparatorClass(VariableComparator.class);
LOG.info("Running " + name);
assertTrue("Job failed!", job.waitForCompletion(false));
}
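// SleepJob: maps and reduces that merely sleep for configured times and counts; since the workload is synthetic, NullOutputFormat drops all output.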
public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount)
throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(SleepJob.class);
job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(SleepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(SleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
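// GenerateDistCacheData: submits a map-only job as the login user to generate distributed-cache test data; the MR output itself is discarded.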
@Override
public Job call() throws IOException, InterruptedException,
ClassNotFoundException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
job.setMapperClass(GenDCDataMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(GenDCDataFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setJarByClass(GenerateDistCacheData.class);
try {
FileInputFormat.addInputPath(job, new Path("ignored"));
} catch (IOException e) {
LOG.error("Error while adding input path ", e);
}
job.submit();
return job;
}
});
return job;
}
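// HBase time-range scan test: maps over a table through TableMapReduceUtil with a bounded Scan; the mapper does the verification, so output goes to NullOutputFormat.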
private void runTestOnTable() throws InterruptedException, ClassNotFoundException {
Job job = null;
try {
Configuration conf = graph.configuration().toHBaseConfiguration();
job = Job.getInstance(conf, "test123");
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
Scan scan = new Scan();
scan.addColumn(FAMILY_NAME, COLUMN_NAME);
scan.setTimeRange(MINSTAMP, MAXSTAMP);
scan.setMaxVersions();
TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(),
scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job,
true, TableInputFormat.class);
job.waitForCompletion(true);
} catch (IOException e) {
// Surface the failure in the test log; cleanup still runs in the finally block.
e.printStackTrace();
} finally {
if (job != null) {
FileUtil.fullyDelete(
new File(job.getConfiguration().get("hadoop.tmp.dir")));
}
}
}
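// Builds a map-only vertex scan over Cassandra input (CassandraInputFormat); every key/value type is NullWritable and output is discarded.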
private Job getVertexJobWithDefaultMapper(org.apache.hadoop.conf.Configuration c) throws IOException {
Job job = Job.getInstance(c);
job.setJarByClass(HadoopScanMapper.class);
job.setJobName("testPartitionedVertexScan");
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(CassandraInputFormat.class);
return job;
}
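// ImportTsv dry-run test: verifies that enabling DRY_RUN_CONF_KEY makes createSubmittableJob substitute NullOutputFormat so no HFiles are actually written.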
@Test
public void testJobConfigurationsWithDryMode() throws Exception {
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles");
String INPUT_FILE = "InputFile1.csv";
// Prepare the arguments required for the test.
String[] argsArray = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
"-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
"-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
tn.getNameAsString(),
INPUT_FILE };
assertEquals("running test job configuration failed.", 0, ToolRunner.run(
new Configuration(util.getConfiguration()),
new ImportTsv() {
@Override
public int run(String[] args) throws Exception {
Job job = createSubmittableJob(getConf(), args);
assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
return 0;
}
}, argsArray));
// Delete table created by createSubmittableJob.
util.deleteTable(tn);
}
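// Runs a map-only verification job for a given InputFormat and asserts, via job counters, how often filtered-for and filtered-out rows, families, and values were seen.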
void testInputFormat(Class<? extends InputFormat> clazz)
throws IOException, InterruptedException, ClassNotFoundException {
final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
job.setInputFormatClass(clazz);
job.setOutputFormatClass(NullOutputFormat.class);
job.setMapperClass(ExampleVerifier.class);
job.setNumReduceTasks(0);
LOG.debug("submitting job.");
assertTrue("job failed!", job.waitForCompletion(true));
assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
.findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
}
/**
* Run an MR job to check that the number of mappers equals expectedNumOfSplits
*/
protected void testNumOfSplitsMR(int splitsPerRegion, int expectedNumOfSplits)
throws IOException, InterruptedException, ClassNotFoundException {
String jobName = "TestJobForNumOfSplits-MR";
LOG.info("Before map/reduce startup - job " + jobName);
JobConf c = new JobConf(TEST_UTIL.getConfiguration());
Scan scan = new Scan();
scan.addFamily(INPUT_FAMILYS[0]);
scan.addFamily(INPUT_FAMILYS[1]);
c.setInt("hbase.mapreduce.tableinput.mappers.per.region", splitsPerRegion);
c.set(KEY_STARTROW, "");
c.set(KEY_LASTROW, "");
Job job = Job.getInstance(c, jobName);
TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
job.setReducerClass(ScanReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(NullOutputFormat.class);
assertTrue("job failed!", job.waitForCompletion(true));
// for some reason, hbase does not expose JobCounter.TOTAL_LAUNCHED_MAPS,
// we use TaskCounter.SHUFFLED_MAPS to get total launched maps
assertEquals("Saw the wrong count of mappers per region", expectedNumOfSplits,
job.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS).getValue());
}
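// Reads protobuf messages back from a Parquet file with a map-only job; the mapper accumulates messages in the shared outputMessages list, so no file output is needed.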
public List<Message> read(Path parquetPath) throws Exception {
synchronized (ReadUsingMR.class) {
outputMessages = new ArrayList<Message>();
final Job job = new Job(conf, "read");
job.setInputFormatClass(ProtoParquetInputFormat.class);
ProtoParquetInputFormat.setInputPaths(job, parquetPath);
if (projection != null) {
ProtoParquetInputFormat.setRequestedProjection(job, projection);
}
job.setMapperClass(ReadingMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(NullOutputFormat.class);
WriteUsingMR.waitForJob(job);
List<Message> result = Collections.unmodifiableList(outputMessages);
outputMessages = null;
return result;
}
}
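// IndexScrutinyTool setup: a map-only scrutiny job that defaults to NullOutputFormat and switches to TextOutputFormat only when invalid rows should be written out.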
private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.job.user.classpath.first", true);
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setJarByClass(IndexScrutinyTool.class);
job.setOutputFormatClass(NullOutputFormat.class);
if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) {
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
}
job.setMapperClass((mapperClass == null ? IndexScrutinyMapper.class : mapperClass));
job.setNumReduceTasks(0);
// Set the Output classes
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtil.addDependencyJars(job);
return job;
}
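// Corona test: forces remote execution on a single-tracker MiniCoronaCluster and asserts that the map-only job completes successfully.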
public void testOneRemoteJT() throws Exception {
LOG.info("Starting testOneRemoteJT");
String[] racks = "/rack-1".split(",");
String[] trackers = "tracker-1".split(",");
corona = new MiniCoronaCluster.Builder().numTaskTrackers(1).racks(racks)
.hosts(trackers).build();
Configuration conf = corona.createJobConf();
conf.set("mapred.job.tracker", "corona");
conf.set("mapred.job.tracker.class", CoronaJobTracker.class.getName());
String locationsCsv = "tracker-1";
conf.set("test.locations", locationsCsv);
conf.setBoolean("mapred.coronajobtracker.forceremote", true);
Job job = new Job(conf);
job.setMapperClass(TstJob.TestMapper.class);
job.setInputFormatClass(TstJob.TestInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.getConfiguration().set("io.sort.record.pct", "0.50");
job.getConfiguration().set("io.sort.mb", "25");
boolean success = job.waitForCompletion(true);
assertTrue("Job did not succeed", success);
}
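// Export via PGBulkloadExportJob: rows are written to the database by the tasks themselves, so NullOutputFormat is passed as the job's output format.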
@Override
public void exportTable(ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
options.setStagingTableName(null);
PGBulkloadExportJob jobbase =
new PGBulkloadExportJob(context,
null,
ExportInputFormat.class,
NullOutputFormat.class);
jobbase.runExport();
}
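// Export via PostgreSQL COPY: the same pattern, with PostgreSQLCopyExportJob pushing rows directly to the database.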
public void exportTable(ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
PostgreSQLCopyExportJob job =
new PostgreSQLCopyExportJob(context,
null,
ExportInputFormat.class,
NullOutputFormat.class);
job.runExport();
}
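// Distributed-cache test: attaches a symlinked cache file, a classpath file, a classpath archive, and a plain cache archive to the job, then submits it and expects success.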
private void testWithConf(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException, URISyntaxException {
// Create a temporary file of length 1.
Path first = createTempFile("distributed.first", "x");
// Create two jars with a single file inside them.
Path second =
makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
Path third =
makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
Path fourth =
makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
Job job = Job.getInstance(conf);
job.setMapperClass(DistributedCacheCheckerMapper.class);
job.setReducerClass(DistributedCacheCheckerReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.setInputPaths(job, first);
// Attach the distributed-cache entries to the job
job.addCacheFile(
new URI(first.toUri().toString() + "#distributed.first.symlink"));
job.addFileToClassPath(second);
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
job.setMaxMapAttempts(1); // speed up failures
job.submit();
assertTrue(job.waitForCompletion(false));
}
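// Minimal client-side job: null input/output formats, no reducers, pointed at a local job tracker port.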
private static Job getJobForClient() throws IOException {
Job job = Job.getInstance(new Configuration());
job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
job.setInputFormatClass(NullInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
return job;
}
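// Gridmix-style sleep-job submission: configures the synthetic sleep job inside a doAs block and submits it; in map-only mode the reduce count is zero.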
@Override
public Job call()
throws IOException, InterruptedException, ClassNotFoundException {
ugi.doAs(
new PrivilegedExceptionAction<Job>() {
public Job run()
throws IOException, ClassNotFoundException, InterruptedException {
job.setMapperClass(SleepMapper.class);
job.setReducerClass(SleepReducer.class);
job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces());
job.setMapOutputKeyClass(GridmixKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setSortComparatorClass(GridmixKey.Comparator.class);
job.setGroupingComparatorClass(SpecGroupingComparator.class);
job.setInputFormatClass(SleepInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setPartitionerClass(DraftPartitioner.class);
job.setJarByClass(SleepJob.class);
job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
job.submit();
return job;
}
});
return job;
}
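// Accumulo ChunkInputFormat tester: runs the mapper class named on the command line over a table's chunked data; output is discarded and success is reported via the exit code.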
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException(
"Usage : " + CIFTester.class.getName() + " <table> <mapperClass>");
}
String table = args[0];
assertionErrors.put(table, new AssertionError("Dummy"));
assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
getConf().set("MRTester_tableName", table);
Job job = Job.getInstance(getConf());
job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(ChunkInputFormat.class);
ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
ChunkInputFormat.setInputTableName(job, table);
ChunkInputFormat.setScanAuthorizations(job, AUTHS);
@SuppressWarnings("unchecked")
Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class
.forName(args[1]);
job.setMapperClass(forName);
job.setMapOutputKeyClass(Key.class);
job.setMapOutputValueClass(Value.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
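// Generic scan-job runner: executes a map-only job for the supplied input format and mapper, returning ScanMetrics from the counters or throwing IOException with a failure summary.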
public static ScanMetrics runJob(org.apache.hadoop.conf.Configuration hadoopConf,
Class<? extends InputFormat> inputFormat, String jobName,
Class<? extends Mapper> mapperClass)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(hadoopConf);
job.setJarByClass(mapperClass);
job.setJobName(jobName);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setMapperClass(mapperClass);
job.setOutputFormatClass(NullOutputFormat.class);
job.setInputFormatClass(inputFormat);
boolean success = job.waitForCompletion(true);
if (!success) {
String f;
try {
// Just in case one of Job's methods throws an exception
f = String.format("MapReduce JobID %s terminated abnormally: %s",
job.getJobID().toString(), HadoopCompatLoader.DEFAULT_COMPAT.getJobFailureString(job));
} catch (RuntimeException e) {
f = "Job failed (unable to read job status programmatically -- see MapReduce logs for information)";
}
throw new IOException(f);
} else {
return DEFAULT_COMPAT.getMetrics(job.getCounters());
}
}
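// Builds (without submitting) a map-only scan job with speculative execution disabled and user-classpath precedence turned on.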
@Override
public Job call() throws Exception {
// We're explicitly disabling speculative execution
conf.set("mapreduce.map.speculative", "false");
conf.set("mapreduce.map.maxattempts", "1");
conf.set("mapreduce.job.user.classpath.first", "true");
conf.set("mapreduce.task.classpath.user.precedence", "true");
conf.set("mapreduce.task.classpath.first", "true");
addNecessaryJarsToJob(conf);
Job job = Job.getInstance(conf);
// IO formats
job.setInputFormatClass(getInputFormatClass());
job.setOutputFormatClass(NullOutputFormat.class);
// Mapper & job output
job.setMapperClass(getMapperClass());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
// It's a map-only job
job.setNumReduceTasks(0);
// General configuration
job.setJarByClass(getClass());
return job;
}
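// Mapper2HbaseDemo driver: a map-only job over text input; the mapper presumably writes to HBase itself, which is why the output format is Null.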
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = getConf();
Job job = Job.getInstance(conf, Mapper2HbaseDemo.class.getSimpleName());
job.setJarByClass(Mapper2HbaseDemo.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}