Listed below are example usages of org.apache.hadoop.mapreduce.Job#getConfiguration(), taken from open source projects on GitHub.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: WikiLoader configFile inputDir");
System.exit(2);
}
Job job = Job.getInstance(conf, "wiki loader");
job.setJarByClass(WikiLoader.class);
job.setInputFormatClass(WikiInputFormat.class);
job.setMapperClass(ArticleMapper.class);
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);
ContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
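One detail worth calling out in the example above: Job.getInstance(conf) copies the passed Configuration into the job, so later changes to the original object are not seen by the submitted job. That is why the example re-fetches conf from job.getConfiguration() before calling addResource. A minimal sketch, not from the project above and with a made-up key name:
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "copy semantics demo");
  conf.set("example.key", "ignored by the job");          // only changes the original object
  job.getConfiguration().set("example.key", "visible");   // lands in the configuration the job actually submits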
private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.job.user.classpath.first", true);
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setJarByClass(IndexScrutinyTool.class);
job.setOutputFormatClass(NullOutputFormat.class);
if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) {
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
}
job.setMapperClass((mapperClass == null ? IndexScrutinyMapper.class : mapperClass));
job.setNumReduceTasks(0);
// Set the Output classes
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
TableMapReduceUtil.addDependencyJars(job);
return job;
}
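In the example above, HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)) folds hbase-default.xml and hbase-site.xml (plus whatever is already in the job configuration) back into job.getConfiguration(), so the submitted job carries the full HBase client configuration. A minimal sketch of that pattern, assuming the HBase client jars and site files are on the classpath:
  Job job = Job.getInstance(new Configuration(), "hbase-aware job");   // hypothetical job name
  Configuration conf = job.getConfiguration();
  // create(conf) layers hbase-default.xml / hbase-site.xml on top of a copy of conf;
  // merge(...) copies the combined result back into the job's own Configuration.
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));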
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("Usage: MultithreadedZipContentLoader configFile inputDir threadCount");
System.exit(2);
}
Job job = Job.getInstance(conf);
job.setJarByClass(MultithreadedZipContentLoader.class);
job.setInputFormatClass(ZipContentInputFormat.class);
job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, ZipContentMapper.class);
MultithreadedMapper.setNumberOfThreads(job, Integer.parseInt(otherArgs[2]));
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);
ZipContentInputFormat.setInputPaths(job, new Path(otherArgs[1]));
conf = job.getConfiguration();
conf.addResource(otherArgs[0]);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
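MultithreadedMapper stores both the wrapped mapper class and the thread count in the job configuration, which is why they are set through the Job rather than on a mapper instance. A short sketch of the setters and the matching getter, reusing the ZipContentMapper from the example above:
  Job job = Job.getInstance(new Configuration());
  job.setMapperClass(MultithreadedMapper.class);                     // the thread-pool wrapper
  MultithreadedMapper.setMapperClass(job, ZipContentMapper.class);   // the real map logic
  MultithreadedMapper.setNumberOfThreads(job, 4);                    // 4 is an arbitrary choice
  int threads = MultithreadedMapper.getNumberOfThreads(job);         // read back from job.getConfiguration()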
@Override
protected void configureInputFormat(Job job, String tableName,
String tableClassName, String splitByCol)
throws ClassNotFoundException, IOException {
if (options.getOdpsTable() != null) {
Configuration conf = job.getConfiguration();
setInputFormatClass(OdpsExportInputFormat.class);
conf.set(OdpsConstants.TABLE_NAME, options.getOdpsTable());
conf.set(OdpsConstants.ACCESS_ID, options.getOdpsAccessID());
conf.set(OdpsConstants.ACCESS_KEY, options.getOdpsAccessKey());
conf.set(OdpsConstants.ENDPOINT, options.getOdpsEndPoint());
conf.set(OdpsConstants.PROJECT, options.getOdpsProject());
String partitionSpec = options.getOdpsPartitionSpec();
if (partitionSpec != null) {
conf.set(OdpsConstants.PARTITION_SPEC, partitionSpec);
}
setMapperClass(OdpsExportMapper.class);
}
super.configureInputFormat(job, tableName, tableClassName, splitByCol);
if (!isHCatJob && options.getOdpsTable() == null) {
FileInputFormat.addInputPath(job, getInputPath());
}
}
@Override
protected void configureOutputFormat(Job job, String tableName,
String tableClassName) throws ClassNotFoundException, IOException {
// Use the DelegatingOutputFormat with the HBasePutProcessor.
job.setOutputFormatClass(getOutputFormatClass());
Configuration conf = job.getConfiguration();
conf.setClass("sqoop.output.delegate.field.map.processor.class",
HBasePutProcessor.class,
FieldMapProcessor.class);
// Set the HBase parameters (table, column family, row key):
conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
// What column of the input becomes the row key?
String rowKeyCol = options.getHBaseRowKeyColumn();
if (null == rowKeyCol) {
// User didn't explicitly set one. If there's a split-by column set,
// use that.
rowKeyCol = options.getSplitByCol();
}
if (null == rowKeyCol) {
// No split-by column is explicitly set.
// If the table has a primary key, use that.
ConnManager manager = getContext().getConnManager();
rowKeyCol = manager.getPrimaryKey(tableName);
}
if (null == rowKeyCol) {
// Give up here if this is still unset.
throw new IOException("Could not determine the row-key column. "
+ "Use --hbase-row-key to specify the input column that "
+ "names each row.");
}
conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol);
}
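The counterpart of these conf.set(...) calls runs on the task side, where the same values are read back through the task context. A hypothetical mapper sketch (not the actual Sqoop/HBase processor), reusing the row-key constant from above:
  public static class RowKeyAwareMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private String rowKeyCol;
    @Override
    protected void setup(Context context) {
      // The task sees the same Configuration that was populated via job.getConfiguration().
      rowKeyCol = context.getConfiguration().get(HBasePutProcessor.ROW_KEY_COLUMN_KEY);
    }
  }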
@Override
public int run(String[] args)
throws Exception
{
Job job = Job.getInstance(getConf());
for (Map.Entry<String, String> next : job.getConfiguration()) {
System.out.println(next.getKey() + ": " + next.getValue());
}
job.setJarByClass(PagesByURLExtractor.class);
job.setJobName(PagesByURLExtractor.class.getName());
// mapper
job.setMapperClass(MapperClass.class);
// input
job.setInputFormatClass(WARCInputFormat.class);
// output
job.setOutputFormatClass(WARCOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(WARCWritable.class);
FileOutputFormat.setCompressOutput(job, true);
// paths
String commaSeparatedInputFiles = args[0];
String outputPath = args[1];
// load IDs to be searched for
job.getConfiguration().set(MAPREDUCE_MAPPER_URLS, loadURLs(args[2]));
FileInputFormat.addInputPaths(job, commaSeparatedInputFiles);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
return job.waitForCompletion(true) ? 0 : 1;
}
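The loop at the top of run() works because Configuration implements Iterable over its entries. An alternative is Configuration.dumpConfiguration, which writes the effective configuration as JSON; a small sketch, assuming java.io.StringWriter is imported:
  StringWriter out = new StringWriter();
  Configuration.dumpConfiguration(job.getConfiguration(), out);   // JSON dump of all effective properties
  System.out.println(out);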
private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
// setup
committer.setupJob(jContext);
committer.setupTask(tContext);
// write output
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
validateContent(outDir);
FileUtil.fullyDelete(new File(outDir.toString()));
}
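The writeOutput(...) helper called above is not shown; in the committer tests it simply pushes a fixed set of records through the RecordWriter and closes it with the task context, and validateContent then checks for those records. A hypothetical sketch of such a helper (the key/value content here is illustrative, not the real test fixture):
  private void writeOutput(RecordWriter theRecordWriter, TaskAttemptContext tContext)
      throws IOException, InterruptedException {
    try {
      theRecordWriter.write(new Text("key1"), new Text("val1"));
      theRecordWriter.write(new Text("key2"), new Text("val2"));
    } finally {
      // Closing with the task context lets the committer finalize the task's output.
      theRecordWriter.close(tContext);
    }
  }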
@Override
public void setLocation(String location, Job job) throws IOException {
final Configuration configuration = job.getConfiguration();
//explicitly turning off combining splits.
configuration.setBoolean("pig.noSplitCombination", true);
//to have phoenix working on a secured cluster
TableMapReduceUtil.initCredentials(job);
this.initializePhoenixPigConfiguration(location, configuration);
}
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(TeraSortIngest.class.getName(), args);
Job job = Job.getInstance(opts.getHadoopConfig());
job.setJobName(TeraSortIngest.class.getName());
job.setJarByClass(TeraSortIngest.class);
job.setInputFormatClass(RangeInputFormat.class);
job.setMapperClass(SortGenMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Mutation.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties())
.defaultTable(opts.tableName).createTables(true).store(job);
Configuration conf = job.getConfiguration();
conf.setLong(NUMROWS, opts.numRows);
conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
conf.set("cloudgen.tablename", opts.tableName);
if (opts.splits != 0)
conf.setInt(NUMSPLITS, opts.splits);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
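Everything written here with conf.setLong, conf.setInt, and conf.set is read back later with the matching typed getters, optionally with a default for keys that were never set. A short sketch of the reading side (the defaults are illustrative):
  Configuration conf = context.getConfiguration();          // e.g. inside the mapper or input format, via its context
  long numRows = conf.getLong(NUMROWS, 0L);
  int minKeyLength = conf.getInt("cloudgen.minkeylength", 10);
  String tableName = conf.get("cloudgen.tablename");         // null if the key was never set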
@Test
public void testSetLoadLocation() throws IOException, ParseException {
AbstractAccumuloStorage s = getAbstractAccumuloStorage();
Job actual = Job.getInstance();
s.setLocation(getDefaultLoadLocation(), actual);
Configuration actualConf = actual.getConfiguration();
Job expected = getDefaultExpectedLoadJob();
Configuration expectedConf = expected.getConfiguration();
s.loadDependentJars(expectedConf);
assertConfigurationsEqual(expectedConf, actualConf);
}
/**
* Uses the HBase front-door API to write to the index table. Submits the job and either returns
* or waits for job completion, depending on the runForeground parameter.
*
* @param job the index build job
* @param outputPath output path
* @param outputTableName the index table to write to
* @param skipDependencyJars if true, dependency jars are not shipped with the job
* @param runForeground if true, waits for job completion; otherwise submits and returns
* immediately.
* @throws Exception if the submitted job fails
*/
private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, TableName outputTableName,
boolean skipDependencyJars, boolean runForeground)
throws Exception {
job.setMapperClass(getDirectMapperClass());
job.setReducerClass(getDirectReducerClass());
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
conf.set(TableOutputFormat.OUTPUT_TABLE, outputTableName.getNameAsString());
//Set the Output classes
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
if (!skipDependencyJars) {
TableMapReduceUtil.addDependencyJars(job);
}
job.setNumReduceTasks(1);
if (!runForeground) {
LOG.info("Running Index Build in Background - Submit async and exit");
job.submit();
return;
}
LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!.");
boolean result = job.waitForCompletion(true);
if (!result) {
LOG.error("IndexTool job failed!");
throw new Exception("IndexTool job failed: " + job.toString());
}
FileSystem.get(conf).delete(outputPath, true);
}
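In the background branch above, the method returns right after job.submit(), so completion has to be checked elsewhere. A minimal polling sketch for such an asynchronously submitted job (the interval is arbitrary):
  job.submit();
  while (!job.isComplete()) {      // asks the cluster for the current status
    Thread.sleep(5000L);
  }
  if (!job.isSuccessful()) {
    throw new Exception("Index build failed: " + job.getStatus().getFailureInfo());
  }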
@SuppressWarnings("deprecation")
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
Configuration conf = job.getConfiguration();
Properties props = ConfigurationUtil.toProperties(conf);
// At compile time in batch mode, the file may not exist
// (such as intermediate file). Just return null - the
// same way as we would if we did not get a valid record
String[] locations = getPathStrings(location);
for (String loc : locations) {
// since local mode now is implemented as hadoop's local mode
// we can treat either local or hadoop mode as hadoop mode - hence
// we can use HDataStorage and FileLocalizer.openDFSFile below
HDataStorage storage;
try {
storage = new HDataStorage((new org.apache.hadoop.fs.Path(loc)).toUri(), props);
} catch (RuntimeException e) {
throw new IOException(e);
}
if (!FileLocalizer.fileExists(loc, storage)) {
return null;
}
}
return Utils.getSchema(this, location, false, job);
}
public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
super(job.getCredentials());
this.mapreduceOutputFormat = mapreduceOutputFormat;
this.configuration = job.getConfiguration();
HadoopUtils.mergeHadoopConf(configuration);
}
private void testJobClassloader(boolean useCustomClasses) throws IOException,
InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting testJobClassloader()"
+ " useCustomClasses=" + useCustomClasses);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
final Configuration sleepConf = new Configuration(mrCluster.getConfig());
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
if (useCustomClasses) {
// to test AM loading user classes such as output format class, we want
// to blacklist them from the system classes (they need to be prepended
// as the first match wins)
String systemClasses = ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
// exclude the custom classes from system classes
systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
CustomSpeculator.class.getName() + "," +
systemClasses;
sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
systemClasses);
}
sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.REDUCE_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_JAVA_OPTS, "-verbose:class");
final SleepJob sleepJob = new SleepJob();
sleepJob.setConf(sleepConf);
final Job job = sleepJob.createJob(1, 1, 10, 1, 10, 1);
job.setMapperClass(ConfVerificationMapper.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
if (useCustomClasses) {
// set custom output format class and speculator class
job.setOutputFormatClass(CustomOutputFormat.class);
final Configuration jobConf = job.getConfiguration();
jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
Speculator.class);
// speculation needs to be enabled for the speculator to be loaded
jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
}
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
succeeded);
}
public int run(String[] args) throws Exception {
String inputFoo = args[0];
String inputBar = args[1];
String output = args[2];
String fooValueMaxFilter = args[3];
String joinValueMaxFilter = args[4];
int numberOfReducers = Integer.parseInt(args[5]);
//A
Job job = Job.getInstance();
//B
job.setJarByClass(JoinFilterExampleMRJob.class);
job.setJobName("JoinFilterExampleMRJob");
//C
Configuration config = job.getConfiguration();
config.set(FOO_TABLE_CONF, inputFoo);
config.set(BAR_TABLE_CONF, inputBar);
config.set(FOO_VAL_MAX_CONF, fooValueMaxFilter);
config.set(JOIN_VAL_MAX_CONF, joinValueMaxFilter);
// D
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(inputFoo));
TextInputFormat.addInputPath(job, new Path(inputBar));
// E
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(output));
// F
job.setMapperClass(JoinFilterMapper.class);
job.setReducerClass(JoinFilterReducer.class);
job.setPartitionerClass(JoinFilterPartitioner.class);
// G
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//H
job.setNumReduceTasks(numberOfReducers);
// I
return job.waitForCompletion(true) ? 0 : 1;
}
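A run(String[]) method like this is normally invoked through ToolRunner, which feeds generic options (-D, -conf, -libjars, ...) into the Configuration before passing the remaining arguments to run(). A typical main method, assuming JoinFilterExampleMRJob implements Tool:
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new JoinFilterExampleMRJob(), args);
    System.exit(exitCode);
  }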
@Test
public void iterateOverParquetFile() throws Exception {
ConfigurationProxy conf = new ConfigurationProxy();
conf.set( "fs.defaultFS", "file:///" );
Job job = Job.getInstance( conf );
String marshallStr = null;
switch ( testType ) {
case "DATA":
marshallStr =
new ParquetInputFieldList( ParquetUtils.createSchema( ValueMetaInterface.TYPE_INTEGER ) ).marshall();
expectedException = ExpectedException.none();
break;
case "EMPTY":
marshallStr = new SchemaDescription().marshall();
expectedException.expect( RuntimeException.class );
break;
default:
org.junit.Assert.fail( "Invalid test type used." );
}
switch ( provider ) {
case "APACHE":
job.getConfiguration()
.set( org.pentaho.hadoop.shim.common.format.parquet.delegate.apache.ParquetConverter.PARQUET_SCHEMA_CONF_KEY,
marshallStr );
org.apache.parquet.hadoop.api.ReadSupport<RowMetaAndData> apacheReadSupport =
new org.pentaho.hadoop.shim.common.format.parquet.delegate.apache.PentahoParquetReadSupport();
org.apache.parquet.hadoop.ParquetRecordReader<RowMetaAndData> apacheNativeRecordReader =
new org.apache.parquet.hadoop.ParquetRecordReader<>( apacheReadSupport,
org.apache.parquet.hadoop.ParquetInputFormat.getFilter( job.getConfiguration() ) );
org.apache.parquet.hadoop.ParquetInputFormat<RowMetaAndData> apacheNativeParquetInputFormat =
new org.apache.parquet.hadoop.ParquetInputFormat<>();
FileInputFormat.setInputPaths( job, getClass().getClassLoader().getResource( testFile ).toExternalForm() );
InputSplit apacheInputSplit = apacheNativeParquetInputFormat.getSplits( job ).get( 0 );
TaskAttemptContextImpl apacheTask = new TaskAttemptContextImpl( job.getConfiguration(), new TaskAttemptID() );
apacheNativeRecordReader.initialize( apacheInputSplit, apacheTask );
org.pentaho.hadoop.shim.common.format.parquet.delegate.apache.PentahoParquetRecordReader apacheRecordReader =
new org.pentaho.hadoop.shim.common.format.parquet.delegate.apache.PentahoParquetRecordReader(
apacheNativeRecordReader );
switch ( testType ) {
case "DATA":
Assert.assertTrue( apacheRecordReader.iterator().hasNext() );
Assert.assertNotNull( apacheRecordReader.iterator().next() );
break;
case "EMPTY":
Assert.assertFalse( apacheRecordReader.iterator().hasNext() );
Assert.assertNull( apacheRecordReader.iterator().next() );
break;
default:
org.junit.Assert.fail( "Invalid test type used." );
}
apacheRecordReader.close();
break;
case "TWITTER":
job.getConfiguration()
.set( org.pentaho.hadoop.shim.common.format.parquet.delegate.twitter.ParquetConverter.PARQUET_SCHEMA_CONF_KEY,
marshallStr );
parquet.hadoop.api.ReadSupport<RowMetaAndData> twitterReadSupport =
new org.pentaho.hadoop.shim.common.format.parquet.delegate.twitter.PentahoParquetReadSupport();
parquet.hadoop.ParquetRecordReader<RowMetaAndData> twitterNativeRecordReader =
new parquet.hadoop.ParquetRecordReader<>( twitterReadSupport,
parquet.hadoop.ParquetInputFormat.getFilter( job.getConfiguration() ) );
parquet.hadoop.ParquetInputFormat<RowMetaAndData> twitterNativeParquetInputFormat =
new parquet.hadoop.ParquetInputFormat<>();
FileInputFormat.setInputPaths( job, getClass().getClassLoader().getResource( testFile ).toExternalForm() );
InputSplit twitterInputSplit = twitterNativeParquetInputFormat.getSplits( job ).get( 0 );
TaskAttemptContextImpl twitterTask = new TaskAttemptContextImpl( job.getConfiguration(), new TaskAttemptID() );
twitterNativeRecordReader.initialize( twitterInputSplit, twitterTask );
org.pentaho.hadoop.shim.common.format.parquet.delegate.twitter.PentahoParquetRecordReader twitterRecordReader =
new org.pentaho.hadoop.shim.common.format.parquet.delegate.twitter.PentahoParquetRecordReader(
twitterNativeRecordReader );
switch ( testType ) {
case "DATA":
Assert.assertTrue( twitterRecordReader.iterator().hasNext() );
Assert.assertNotNull( twitterRecordReader.iterator().next() );
break;
case "EMPTY":
Assert.assertFalse( twitterRecordReader.iterator().hasNext() );
Assert.assertNull( twitterRecordReader.iterator().next() );
break;
default:
org.junit.Assert.fail( "Invalid test type used." );
}
twitterRecordReader.close();
break;
default:
org.junit.Assert.fail( "Invalid provider name used." );
}
}
/**
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for (int i = 0; i < splits.size(); ++i) {
InputSplit tmp = splits.get(i);
int j = r.nextInt(splits.size());
splits.set(i, splits.get(j));
splits.set(j, tmp);
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.size() && samples.size() < numSamples); ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
}
freq *= (numSamples - 1) / (double) numSamples;
}
}
}
reader.close();
}
return (K[])samples.toArray();
}
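This sampler is usually handed to InputSampler.writePartitionFile(...), which calls getSample(...) and writes the chosen partition boundaries for a TotalOrderPartitioner. A condensed sketch of that wiring (the sampling parameters are illustrative, and the job still needs its input format and key/value classes configured):
  Job job = Job.getInstance(new Configuration(), "total order sort");
  job.setPartitionerClass(TotalOrderPartitioner.class);
  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<>(0.1, 10000, 10);   // freq, numSamples, maxSplitsSampled
  InputSampler.writePartitionFile(job, sampler);
  // The partition file path is recorded in the job configuration for the partitioner to find.
  String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());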
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf());
Configuration configuration = job.getConfiguration();
job.setJarByClass(Aegisthus.class);
CommandLine cl = getOptions(args);
if (cl == null) {
return 1;
}
// Check all of the paths and load the sstable version from the input filenames
List<Path> paths = Lists.newArrayList();
if (cl.hasOption(Feature.CMD_ARG_INPUT_FILE)) {
for (String input : cl.getOptionValues(Feature.CMD_ARG_INPUT_FILE)) {
paths.add(new Path(input));
}
}
if (cl.hasOption(Feature.CMD_ARG_INPUT_DIR)) {
paths.addAll(getDataFiles(configuration, cl.getOptionValue(Feature.CMD_ARG_INPUT_DIR)));
}
LOG.info("Processing paths: {}", paths);
// At this point we have the version of sstable that we can use for this run
Descriptor.Version version = Descriptor.Version.CURRENT;
if (cl.hasOption(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION)) {
version = new Descriptor.Version(cl.getOptionValue(Feature.CMD_ARG_SSTABLE_OUTPUT_VERSION));
}
configuration.set(Feature.CONF_SSTABLE_VERSION, version.toString());
if (configuration.get(Feature.CONF_CQL_SCHEMA) != null) {
setConfigurationFromCql(configuration);
}
if (cl.hasOption(Feature.CMD_ARG_COMBINE_SPLITS)) {
job.setInputFormatClass(AegisthusCombinedInputFormat.class);
} else {
job.setInputFormatClass(AegisthusInputFormat.class);
}
job.setMapOutputKeyClass(AegisthusKey.class);
job.setMapOutputValueClass(AtomWritable.class);
job.setOutputKeyClass(AegisthusKey.class);
job.setOutputValueClass(RowWritable.class);
job.setMapperClass(AegisthusKeyMapper.class);
job.setReducerClass(CassSSTableReducer.class);
job.setGroupingComparatorClass(AegisthusKeyGroupingComparator.class);
job.setPartitionerClass(AegisthusKeyPartitioner.class);
job.setSortComparatorClass(AegisthusKeySortingComparator.class);
TextInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
if (cl.hasOption(Feature.CMD_ARG_PRODUCE_SSTABLE)) {
job.setOutputFormatClass(SSTableOutputFormat.class);
} else {
job.setOutputFormatClass(JsonOutputFormat.class);
}
CustomFileNameFileOutputFormat.setOutputPath(job, new Path(cl.getOptionValue(Feature.CMD_ARG_OUTPUT_DIR)));
job.submit();
if (configuration.getBoolean(Feature.CONF_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new JobKiller(job));
}
System.out.println(job.getJobID());
System.out.println(job.getTrackingURL());
boolean success = job.waitForCompletion(true);
if (success) {
Counter errorCounter = job.getCounters().findCounter("aegisthus", "error_skipped_input");
long errorCount = errorCounter != null ? errorCounter.getValue() : 0L;
int maxAllowed = configuration.getInt(Feature.CONF_MAX_CORRUPT_FILES_TO_SKIP, 0);
if (errorCounter != null && errorCounter.getValue() > maxAllowed) {
LOG.error("Found {} corrupt files which is greater than the max allowed {}", errorCount, maxAllowed);
success = false;
} else if (errorCount > 0) {
LOG.warn("Found {} corrupt files but not failing the job because the max allowed is {}",
errorCount, maxAllowed);
}
}
return success ? 0 : 1;
}
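The JobKiller shutdown hook registered above is Aegisthus-specific; a generic stand-in simply kills the submitted job if the JVM exits before it finishes. A hedged sketch of such a hook:
  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
      if (!job.isComplete()) {
        job.killJob();          // best effort: stop the job if the client is going away
      }
    } catch (IOException e) {
      // nothing useful to do during shutdown
    }
  }));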
/**
* Sets the Reducer class to the chain job.
*
* <p>
* The configuration properties of the chain job have precedence over the
* configuration properties of the Reducer.
*
* @param job
* the chain job.
* @param klass
* the Reducer class to add.
* @param inputKeyClass
* reducer input key class.
* @param inputValueClass
* reducer input value class.
* @param outputKeyClass
* reducer output key class.
* @param outputValueClass
* reducer output value class.
* @param reducerConf
* a configuration for the Reducer class. It is recommended to use a
* Configuration without default values using the
* <code>Configuration(boolean loadDefaults)</code> constructor with
* FALSE.
*/
@SuppressWarnings("unchecked")
protected static void setReducer(Job job, Class<? extends Reducer> klass,
Class<?> inputKeyClass, Class<?> inputValueClass,
Class<?> outputKeyClass, Class<?> outputValueClass,
Configuration reducerConf) {
String prefix = getPrefix(false);
Configuration jobConf = job.getConfiguration();
checkReducerAlreadySet(false, jobConf, prefix, false);
jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
outputValueClass, reducerConf, prefix);
}
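This protected setReducer(...) is the hook behind the public chain API; application code normally assembles the pipeline through ChainMapper and ChainReducer instead. A small sketch with placeholder classes (AMapper, BMapper, and CReducer are not real classes here):
  Job job = Job.getInstance(new Configuration(), "chain example");
  Configuration stageConf = new Configuration(false);   // no defaults, as the javadoc above recommends
  ChainMapper.addMapper(job, AMapper.class, LongWritable.class, Text.class,
      Text.class, Text.class, stageConf);
  ChainReducer.setReducer(job, CReducer.class, Text.class, Text.class,
      Text.class, Text.class, stageConf);
  ChainReducer.addMapper(job, BMapper.class, Text.class, Text.class,
      LongWritable.class, Text.class, stageConf);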
/**
* Configures the {@code Job} to use the {@code IcebergInputFormat} and
* returns a helper to add further configuration.
*
* @param job the {@code Job} to configure
*/
public static ConfigBuilder configure(Job job) {
job.setInputFormatClass(IcebergInputFormat.class);
return new ConfigBuilder(job.getConfiguration());
}
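The returned ConfigBuilder writes its settings into job.getConfiguration(), so this call is normally the first step when preparing an Iceberg scan job. The exact builder methods for pointing the scan at a table, projecting columns, and so on vary by Iceberg release, so the sketch below deliberately stops at the call shown above:
  Job job = Job.getInstance(new Configuration(), "iceberg scan");
  IcebergInputFormat.configure(job);   // sets the input format class and returns the ConfigBuilder
  // Any settings added through the returned builder end up in job.getConfiguration().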