Listed below are example usages of the org.apache.hadoop.mapred.SequenceFileInputFormat API class, collected from real projects; you can also follow the links to view the full source code on GitHub.
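Every example shares the same skeleton: build an org.apache.hadoop.mapred.JobConf, call setInputFormat(SequenceFileInputFormat.class), and point FileInputFormat at a directory of SequenceFiles. Here is a minimal, self-contained sketch of that skeleton; the paths, job name, and identity mapper/reducer are placeholders, not code from the projects that follow.

// Minimal sketch of the shared pattern (not taken from any project below);
// the /tmp paths and the identity mapper/reducer are placeholders.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SeqFileJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SeqFileJobSketch.class);
    job.setJobName("seqfile-example");
    // A SequenceFile records its key/value classes in the file header,
    // so SequenceFileInputFormat needs no input-type declaration here.
    job.setInputFormat(SequenceFileInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/tmp/seq-input"));   // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("/tmp/seq-output")); // placeholder path
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(IdentityReducer.class);
    // Output types must still be declared; Text/Text assumes the input
    // SequenceFile was written with Text keys and Text values.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    JobClient.runJob(job);
  }
}

The first example below is from the NNBench NameNode benchmark; note that it caps map attempts at 1 and disables speculative execution so that each namesystem operation is executed exactly once.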
/**
 * Run the test
 *
 * @throws IOException on error
 */
public static void runTests() throws IOException {
  config.setLong("io.bytes.per.checksum", bytesPerChecksum);

  JobConf job = new JobConf(config, NNBench.class);
  job.setJobName("NNBench-" + operation);

  FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);

  // Explicitly set number of max map attempts to 1.
  job.setMaxMapAttempts(1);
  // Explicitly turn off speculative execution
  job.setSpeculativeExecution(false);

  job.setMapperClass(NNBenchMapper.class);
  job.setReducerClass(NNBenchReducer.class);

  FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks((int) numberOfReduces);
  JobClient.runJob(job);
}
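TestDFSIO uses the same skeleton, parameterizing only the mapper; every variant funnels its measurements through one AccumulatingReducer and a single reduce task.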
private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  JobConf job = new JobConf(config, TestDFSIO.class);

  FileInputFormat.setInputPaths(job, getControlDir(config));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(mapperClass);
  job.setReducerClass(AccumulatingReducer.class);

  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
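CompositeInputFormat joins several sources in one map; here SequenceFileInputFormat is named inside the join expression as the per-source format (older Hadoop versions spell the key mapred.join.expr rather than mapreduce.join.expr).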
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerBase> c) throws Exception {
  final int srcs = 4;
  Configuration conf = new Configuration();
  JobConf job = new JobConf(conf, c);
  Path base = cluster.getFileSystem().makeQualified(new Path("/" + jointype));
  Path[] src = writeSimpleSrc(base, conf, srcs);
  job.set("mapreduce.join.expr", CompositeInputFormat.compose(jointype,
      SequenceFileInputFormat.class, src));
  job.setInt("testdatamerge.sources", srcs);
  job.setInputFormat(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  job.setMapperClass(c);
  job.setReducerClass(c);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);
  JobClient.runJob(job);
  base.getFileSystem(job).delete(base, true);
}
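A test fixture that stages input files in a temp directory and records the seq-dir location where the fixture will later write inputs.seq.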
@Before
public void setUp() throws Exception {
  conf = getDefaultDirectoryIngestMapperConfiguration();
  Path dir = new Path(fs.getWorkingDirectory(), "build");
  Path sub = new Path(dir, "DIMT");
  Path tempDir = new Path(sub, "tmp-dir");
  // this is the location where the fixture will write inputs.seq
  Path seqDir = new Path(sub, "seq-dir");
  fs.mkdirs(tempDir);
  tempFiles = setupDir(fs, tempDir);
  conf.set(TEMP_DIR, seqDir.toString());
  jobConf = new JobConf(conf);
  jobConf.setMapperClass(DirectoryIngestMapper.class);
  jobConf.setInputFormat(SequenceFileInputFormat.class);
  FileOutputFormat.setOutputPath(jobConf, OUTPUT_DIRECTORY_PATH);
  org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, new Path(tempDir, "*"));
  Path[] paths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
  assertEquals(1, paths.length);
}
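When the job is built with the new org.apache.hadoop.mapreduce.Job API, the old-API input format can still be set by casting the job's configuration back to JobConf.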
@Test
public void test() throws Exception {
  prepareFrankensteinSeqFileInput();
  Configuration conf = getDefaultSequenceFileIngestMapperConfiguration();
  Job job = createJobBasedOnConfiguration(conf, SequenceFileIngestMapper.class);
  ((JobConf) job.getConfiguration()).setInputFormat(SequenceFileInputFormat.class);
  List<String> results = runJobSuccessfully(job, 776);
  assertNumDocsProcessed(job, 776);
  assertEquals(776, results.size());
  for (String docStr : results) {
    assertNotNull(docStr);
  }
}
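From Apache Nutch: a LinkDb merge job that reads SequenceFile link data, filters and normalizes URLs, and writes a compressed MapFileOutputFormat result (see NUTCH-1069 for the _SUCCESS-marker workaround).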
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
  Path newLinkDb = new Path("linkdb-merge-" +
      Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  JobConf job = new NutchJob(config);
  job.setJobName("linkdb merge " + linkDb);

  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(LinkDbFilter.class);
  job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
  job.setReducerClass(LinkDbMerger.class);

  FileOutputFormat.setOutputPath(job, newLinkDb);
  job.setOutputFormat(MapFileOutputFormat.class);
  job.setBoolean("mapred.output.compress", true);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Inlinks.class);

  // https://issues.apache.org/jira/browse/NUTCH-1069
  job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  return job;
}
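Also from Nutch, SolrClean: the CrawlDb is read as SequenceFile input, deletions are pushed to Solr from the reducer, and NullOutputFormat discards the file output entirely.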
public void delete(String crawldb, String solrUrl, boolean noCommit) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("SolrClean: starting at " + sdf.format(start));

  JobConf job = new NutchJob(getConf());

  FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
  job.setBoolean("noCommit", noCommit);
  job.set(SolrConstants.SERVER_URL, solrUrl);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapOutputKeyClass(ByteWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapperClass(DBFilter.class);
  job.setReducerClass(SolrDeleter.class);

  JobClient.runJob(job);

  long end = System.currentTimeMillis();
  LOG.info("SolrClean: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
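Three nearly identical driver methods from the TestFileSystem suite: each reads the same SequenceFile control directory and swaps in a different mapper (write, read, seek), summing the results with LongSumReducer.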
public static void writeTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(DATA_DIR, true);
  fs.delete(WRITE_DIR, true);

  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);

  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(WriteMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, WRITE_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}

public static void readTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);

  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);

  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}

public static void seekTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);

  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);

  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(SeekMapper.class);
  job.setReducerClass(LongSumReducer.class);

  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
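This helper mutates a caller-supplied JobConf in place, combining SequenceFileInputFormat with benchmark-specific keys such as the map count and write data rate.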
private void updateJobConf(JobConf conf, Path inputPath, Path outputPath) {
  // set specific job config
  conf.setLong(NUMBER_OF_MAPS_KEY, nmaps);
  conf.setLong(NUMBER_OF_THREADS_KEY, nthreads);
  conf.setInt(BUFFER_SIZE_KEY, buffersize);
  conf.setLong(WRITER_DATARATE_KEY, datarate);
  conf.setLong("mapred.task.timeout", Long.MAX_VALUE);
  conf.set(OUTPUT_DIR_KEY, output);

  // set the output and input for the map reduce
  FileInputFormat.setInputPaths(conf, inputPath);
  FileOutputFormat.setOutputPath(conf, outputPath);

  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);
  conf.setNumReduceTasks(1);
  conf.setSpeculativeExecution(false);
}
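From Apache Tez: a test that wraps a SequenceFileInputFormat-configured JobConf in a Tez InputContext and verifies that a MultiMRInput with zero physical inputs rejects incoming events.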
public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) {
Path newLinkDb =
new Path("linkdb-merge-" +
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
job.setJobName("linkdb merge " + linkDb);
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(LinkDbFilter.class);
job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
job.setReducerClass(LinkDbMerger.class);
FileOutputFormat.setOutputPath(job, newLinkDb);
job.setOutputFormat(MapFileOutputFormat.class);
job.setBoolean("mapred.output.compress", true);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Inlinks.class);
// https://issues.apache.org/jira/browse/NUTCH-1069
job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
return job;
}
@Test(timeout = 5000)
public void test0PhysicalInputs() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);

  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));

  MultiMRInput mMrInput = new MultiMRInput(inputContext, 0);
  mMrInput.initialize();
  mMrInput.start();

  assertEquals(0, mMrInput.getKeyValueReaders().size());

  List<Event> events = new LinkedList<>();
  try {
    mMrInput.handleEvents(events);
    fail("HandleEvents should cause an input with 0 physical inputs to fail");
  } catch (Exception e) {
    assertTrue(e instanceof IllegalStateException);
  }
}
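In Hadoop Streaming the input format is chosen by class name on the command line; this test checks that -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat resolves to the class, and that supplying an -inputreader switches the job to StreamInputFormat.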
@Test
public void testCreateJob() throws IOException {
  JobConf job;
  ArrayList<String> dummyArgs = new ArrayList<String>();
  dummyArgs.add("-input"); dummyArgs.add("dummy");
  dummyArgs.add("-output"); dummyArgs.add("dummy");
  dummyArgs.add("-mapper"); dummyArgs.add("dummy");
  dummyArgs.add("-reducer"); dummyArgs.add("dummy");
  ArrayList<String> args;

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(KeyValueTextInputFormat.class, job.getInputFormat().getClass());

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.SequenceFileInputFormat");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(SequenceFileInputFormat.class, job.getInputFormat().getClass());

  args = new ArrayList<String>(dummyArgs);
  args.add("-inputformat");
  args.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  args.add("-inputreader");
  args.add("StreamXmlRecordReader,begin=<doc>,end=</doc>");
  job = StreamJob.createJob(args.toArray(new String[] {}));
  assertEquals(StreamInputFormat.class, job.getInputFormat().getClass());
}
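DirectoryIngestMapper prepares its own input: init expands the configured input glob into an inputs.seq SequenceFile and then re-points the job at that file.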
@Override
public void init(JobConf conf) throws IOException {
  // Expand the input path glob into a sequence file of inputs
  Path actualInput = new Path(conf.get(TEMP_DIR), "inputs.seq");
  expandGlob(conf, actualInput, FileInputFormat.getInputPaths(conf));

  // Configure the real M/R job
  conf.setInputFormat(SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(conf, actualInput);
  conf.setMapperClass(DirectoryIngestMapper.class);
}
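A more defensive variant installs SequenceFileInputFormat only as the default, leaving any user-overridden input format in place.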
@Override
public void init(JobConf conf) throws IOException {
  boolean override = conf.getBoolean(IngestJob.INPUT_FORMAT_OVERRIDE, false);
  if (override == false) {
    conf.setInputFormat(SequenceFileInputFormat.class);
  } // else the user has overridden the input format and we assume it is OK.
}
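The cast-to-JobConf trick again, this time in a test that feeds a generated 100-entry sequence file through SipsIngestMapper.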
@Test
public void test() throws Exception {
  Configuration conf = getDefaultSipsIngestMapperConfiguration();
  create100EntrySequenceFile(conf);
  Job job = createJobBasedOnConfiguration(conf, SipsIngestMapper.class);
  ((JobConf) job.getConfiguration()).setInputFormat(SequenceFileInputFormat.class);
  final List<String> results = runJobSuccessfully(job, 100);
  assertNumDocsProcessed(job, 100);
  for (String docString : results) {
    assertNotNull(docString);
  }
}
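Finally, Nutch's CrawlDb dump job: the CrawlDb is always read as SequenceFile input, while the output format is selected at run time (CSV, crawldb/MapFile, or plain text).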
public void processDumpJob(String crawlDb, String output, Configuration config, String format, String regex, String status) throws IOException {
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: starting");
    LOG.info("CrawlDb db: " + crawlDb);
  }

  Path outFolder = new Path(output);

  JobConf job = new NutchJob(config);
  job.setJobName("dump " + crawlDb);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  FileOutputFormat.setOutputPath(job, outFolder);

  if (format.equals("csv")) {
    job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
  } else if (format.equals("crawldb")) {
    job.setOutputFormat(MapFileOutputFormat.class);
  } else {
    job.setOutputFormat(TextOutputFormat.class);
  }

  if (status != null) job.set("status", status);
  if (regex != null) job.set("regex", regex);

  job.setMapperClass(CrawlDbDumpMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);

  JobClient.runJob(job);
  if (LOG.isInfoEnabled()) { LOG.info("CrawlDb dump: done"); }
}