Listed below are example usages of org.apache.hadoop.mapreduce.JobContext#getConfiguration(), drawn from a variety of open-source projects.
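Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of the basic pattern: reading typed job settings through JobContext#getConfiguration() inside a custom InputFormat. The class and the property name "example.num.splits" are purely illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

// Hypothetical example, not from any of the projects cited on this page.
public abstract class ConfiguredInputFormat<K, V> extends InputFormat<K, V> {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    // JobContext#getConfiguration() exposes the job's Configuration, from
    // which typed values can be read with explicit defaults.
    Configuration conf = context.getConfiguration();
    int numSplits = conf.getInt("example.num.splits", 1);
    List<InputSplit> splits = new ArrayList<>(numSplits);
    // ... build numSplits InputSplit instances here ...
    return splits;
  }
}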
public NestedGroupCentroidAssignment(
final JobContext context,
final Class<?> scope,
final Logger logger) throws InstantiationException, IllegalAccessException, IOException {
final ScopedJobConfiguration config =
new ScopedJobConfiguration(context.getConfiguration(), scope, logger);
endZoomLevel = config.getInt(CentroidParameters.Centroid.ZOOM_LEVEL, 1);
parentBatchID =
config.getString(
GlobalParameters.Global.PARENT_BATCH_ID,
config.getString(GlobalParameters.Global.BATCH_ID, null));
@SuppressWarnings("unchecked")
final DistanceFn<T> distanceFunction =
config.getInstance(
CommonParameters.Common.DISTANCE_FUNCTION_CLASS,
DistanceFn.class,
FeatureCentroidDistanceFn.class);
this.associationdFunction.setDistanceFunction(distanceFunction);
centroidManager = new CentroidManagerGeoWave<>(context, scope);
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
// look through all the shards for attempts that need to be cleaned up.
// also find all the attempts that are finished
// then rename all the attempts jobs to commits
LOG.info("Commiting Job [{0}]", jobContext.getJobID());
Configuration configuration = jobContext.getConfiguration();
Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
LOG.info("TableOutput path [{0}]", tableOutput);
makeSureNoEmptyShards(configuration, tableOutput);
FileSystem fileSystem = tableOutput.getFileSystem(configuration);
for (FileStatus fileStatus : fileSystem.listStatus(tableOutput)) {
LOG.info("Checking file status [{0}] with path [{1}]", fileStatus, fileStatus.getPath());
if (isShard(fileStatus)) {
commitOrAbortJob(jobContext, fileStatus.getPath(), true);
}
}
LOG.info("Commiting Complete [{0}]", jobContext.getJobID());
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
if (mappers == 0 && snapshotFiles.size() > 0) {
mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, snapshotFiles.size());
conf.setInt(CONF_NUM_SPLITS, mappers);
conf.setInt(MR_NUM_MAPS, mappers);
}
List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
List<InputSplit> splits = new ArrayList(groups.size());
for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
splits.add(new ExportSnapshotInputSplit(files));
}
return splits;
}
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export.");
} else if (null == dbConf.getOutputFieldNames()) {
throw new IOException(
"Output field names are null.");
} else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
throw new IOException("Update key column is not set for export.");
}
}
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export");
} else if (null == dbConf.getOutputFieldNames()
&& 0 == dbConf.getOutputFieldCount()) {
throw new IOException(
"Output field names are null and zero output field count set.");
}
}
/**
* Generate new-api mapreduce InputFormat splits
* @param jobContext JobContext required by InputFormat
* @param inputSplitDir Directory in which to generate splits information
*
* @return InputSplitInfo containing the split files' information and the
* location hints for each generated split, to be used in determining the
* parallelism of the map stage.
*
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
Path inputSplitDir) throws IOException, InterruptedException,
ClassNotFoundException {
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(jobContext, null, 0);
Configuration conf = jobContext.getConfiguration();
JobSplitWriter.createSplitFiles(inputSplitDir, conf,
inputSplitDir.getFileSystem(conf), splits);
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splits.length);
for (int i = 0; i < splits.length; ++i) {
locationHints.add(
new TaskLocationHint(new HashSet<String>(
Arrays.asList(splits[i].getLocations())), null));
}
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints, jobContext.getCredentials());
}
/**
* Get the {@link CompressionCodec} for compressing the job outputs.
* @param job the {@link Job} to look in
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} to be used to compress the
* job outputs
* @throws IllegalArgumentException if the class was specified, but not found
*/
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobContext job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
Configuration conf = job.getConfiguration();
String name = conf.get("mapred.output.compression.codec");
if (name != null) {
try {
codecClass =
conf.getClassByName(name).asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name +
" was not found.", e);
}
}
return codecClass;
}
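As a usage note for the getter above: it is normally paired with a driver-side setter. The sketch below assumes the current org.apache.hadoop.mapreduce.lib.output.FileOutputFormat API; also note that the snippet above reads the legacy property name mapred.output.compression.codec, which newer Hadoop releases map to mapreduce.output.fileoutputformat.compress.codec.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver sketch, not taken from any of the projects on this page.
public class OutputCompressionExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "compression-example");
    // Enable output compression and register the codec class for the job.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    // Any JobContext can later resolve the configured codec, falling back to
    // the supplied default when nothing has been set.
    Class<? extends CompressionCodec> codec =
        FileOutputFormat.getOutputCompressorClass(job, GzipCodec.class);
    System.out.println("Configured output codec: " + codec.getName());
  }
}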
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
ArrayList<InputSplit> splits = new ArrayList<>();
Configuration conf = context.getConfiguration();
for (String qName : conf.getStringCollection(QUERIES)) {
int repeatCount = conf.getInt(PREFIX + qName + REPEAT_SUFFIX, 1);
String query = conf.get(PREFIX + qName + QUERY_SUFFIX);
for (int i=0; i<repeatCount; i++) {
splits.add(new QueryInputSplit(qName, query, i));
}
}
return splits;
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
final Configuration configuration = context.getConfiguration();
final QueryPlan queryPlan = getQueryPlan(context, configuration);
final List<KeyRange> allSplits = queryPlan.getSplits();
final List<InputSplit> splits = generateSplits(queryPlan, allSplits);
return splits;
}
public static <T> FieldExtractor<T> getFieldExtractor(JobContext job) {
Configuration conf = job.getConfiguration();
try {
//noinspection unchecked
return (FieldExtractor<T>) conf.getClassByName(conf.get(PinotOutputFormat.FIELD_EXTRACTOR_CLASS)).newInstance();
} catch (Exception e) {
throw new IllegalStateException(
"Caught exception while creating instance of field extractor configured with key: " + FIELD_EXTRACTOR_CLASS);
}
}
/** Defers to {@link BAMInputFormat} or {@link CRAMInputFormat} as appropriate for each
* individual path. SAM paths do not require special handling, so their splits are left
* unchanged.
*/
@Override public List<InputSplit> getSplits(JobContext job)
throws IOException
{
if (this.conf == null)
this.conf = job.getConfiguration();
final List<InputSplit> origSplits =
BAMInputFormat.removeIndexFiles(super.getSplits(job));
// We have to partition the splits by input format and hand them over to
// the *InputFormats for any further handling.
//
// BAMInputFormat and CRAMInputFormat need to change the split boundaries, so we can
// just extract the BAM and CRAM ones and leave the rest as they are.
final List<InputSplit>
bamOrigSplits = new ArrayList<InputSplit>(origSplits.size()),
cramOrigSplits = new ArrayList<InputSplit>(origSplits.size()),
newSplits = new ArrayList<InputSplit>(origSplits.size());
for (final InputSplit iSplit : origSplits) {
final FileSplit split = (FileSplit)iSplit;
if (SAMFormat.BAM.equals(getFormat(split.getPath())))
bamOrigSplits.add(split);
else if (SAMFormat.CRAM.equals(getFormat(split.getPath())))
cramOrigSplits.add(split);
else
newSplits.add(split);
}
newSplits.addAll(bamIF.getSplits(bamOrigSplits, job.getConfiguration()));
newSplits.addAll(cramIF.getSplits(cramOrigSplits, job.getConfiguration()));
return newSplits;
}
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
Path[] dirs = getInputPaths(context);
List<BlurInputSplit> splits;
Configuration configuration = context.getConfiguration();
if (isSplitCommandSupported(configuration)) {
splits = getSplitsFromCommand(configuration, dirs);
} else {
splits = getSplits(configuration, dirs);
}
return toList(getMaxNumberOfMaps(configuration), splits);
}
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
//read the params from AccumuloInputFormat
Configuration conf = jobContext.getConfiguration();
Instance instance = MRUtils.AccumuloProps.getInstance(jobContext);
String user = MRUtils.AccumuloProps.getUsername(jobContext);
AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext);
String table = MRUtils.AccumuloProps.getTablename(jobContext);
ArgumentChecker.notNull(instance);
ArgumentChecker.notNull(table);
//find the files necessary
try {
Connector connector = instance.getConnector(user, password);
TableOperations tos = connector.tableOperations();
String tableId = tos.tableIdMap().get(table);
Scanner scanner = connector.createScanner("accumulo.metadata", Authorizations.EMPTY); //TODO: auths?
scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD")));
scanner.fetchColumnFamily(new Text("file"));
List<String> files = new ArrayList<String>();
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
for (Map.Entry<Key, Value> entry : scanner) {
String file = entry.getKey().getColumnQualifier().toString();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(path);
long len = fileStatus.getLen();
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len);
files.add(file);
fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts()));
}
System.out.println(files);
return fileSplits;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
if (null == conf.get(DELEGATE_CLASS_KEY)) {
throw new IOException("Delegate FieldMapProcessor class is not set.");
}
}
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
Configuration conf = job.getConfiguration();
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
LOG.info("Datasets to transfer from: " + dsName);
List<String> datasets = retrieveDatasets(dsName, conf);
if (datasets.isEmpty()) {
throw new IOException ("No sequential datasets retrieved from " + dsName);
} else {
int count = datasets.size();
int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job));
for (int i = 0; i < chunks; i++) {
splits.add(new MainframeDatasetInputSplit());
}
int j = 0;
while(j < count) {
for (InputSplit sp : splits) {
if (j == count) {
break;
}
((MainframeDatasetInputSplit)sp).addDataset(datasets.get(j));
j++;
}
}
}
return splits;
}
/**
* Get a PathFilter instance of the filter set for the input paths.
*
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
public static PathFilter getInputPathFilter(JobContext context) {
Configuration conf = context.getConfiguration();
Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
PathFilter.class);
return (filterClass != null) ?
(PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
}
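For completeness, a minimal sketch of the setter side of this API, assuming org.apache.hadoop.mapreduce.lib.input.FileInputFormat; the HiddenFileFilter class and the input path are illustrative only. (The property read above, mapred.input.pathFilter.class, is the legacy name of mapreduce.input.pathFilter.class.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Illustrative sketch, not taken from any of the projects on this page.
public class InputPathFilterExample {
  /** Filter that skips hidden files and directories (names starting with '.' or '_'). */
  public static class HiddenFileFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return !name.startsWith(".") && !name.startsWith("_");
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "path-filter-example");
    FileInputFormat.addInputPath(job, new Path("/data/input"));
    // Register the filter class; getInputPathFilter(JobContext) will later
    // instantiate it reflectively from the job's configuration.
    FileInputFormat.setInputPathFilter(job, HiddenFileFilter.class);
  }
}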
public RasterTileResizeHelper(final JobContext context) {
index = JobContextIndexStore.getIndices(context)[0];
indexNames = new String[] {index.getName()};
final DataTypeAdapter[] adapters = JobContextAdapterStore.getDataAdapters(context);
final Configuration conf = context.getConfiguration();
final String newTypeName = conf.get(RasterTileResizeJobRunner.NEW_TYPE_NAME_KEY);
oldAdapterId = (short) conf.getInt(RasterTileResizeJobRunner.OLD_ADAPTER_ID_KEY, -1);
newAdapterId =
(short) conf.getInt(
RasterTileResizeJobRunner.NEW_ADAPTER_ID_KEY,
InternalAdapterStoreImpl.getLazyInitialAdapterId(newTypeName));
for (final DataTypeAdapter adapter : adapters) {
if (adapter.getTypeName().equals(newTypeName)) {
if (((RasterDataAdapter) adapter).getTransform() == null) {
// the new adapter doesn't have a merge strategy - resizing
// will require merging, so default to NoDataMergeStrategy
newAdapter =
new RasterDataAdapter(
(RasterDataAdapter) adapter,
newTypeName,
new NoDataMergeStrategy());
} else {
newAdapter = (RasterDataAdapter) adapter;
}
}
}
}
/**
* Use the input splits to take samples of the input and generate sample
* keys. By default reads 100,000 keys from 10 locations in the input, sorts
* them and picks N-1 keys to generate N equally sized partitions.
* @param job the job to sample
* @param partFile where to write the output file to
* @throws Throwable if something goes wrong
*/
public static void writePartitionFile(final JobContext job,
Path partFile) throws Throwable {
long t1 = System.currentTimeMillis();
Configuration conf = job.getConfiguration();
// Instead of reading from HDFS, the input now comes from a Pravega stream
final PravegaInputFormat inFormat = new PravegaInputFormat();
final TextSampler sampler = new TextSampler();
int partitions = job.getNumReduceTasks();
long sampleSize =
conf.getLong(TeraSortConfigKeys.SAMPLE_SIZE.key(),
TeraSortConfigKeys.DEFAULT_SAMPLE_SIZE);
final List<InputSplit> splits = inFormat.getSplits(job);
long t2 = System.currentTimeMillis();
System.out.println("Computing input splits took " + (t2 - t1) + "ms");
int samples =
Math.min(conf.getInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
TeraSortConfigKeys.DEFAULT_NUM_PARTITIONS),
splits.size());
System.out.println("Sampling " + samples + " splits of " + splits.size());
final long recordsPerSample = sampleSize / samples;
final int sampleStep = splits.size() / samples;
Thread[] samplerReader = new Thread[samples];
SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
// take N samples from different parts of the input
for(int i=0; i < samples; ++i) {
final int idx = i;
samplerReader[i] =
new Thread(threadGroup, "Sampler Reader " + idx) {
{
setDaemon(true);
}
public void run() {
long records = 0;
try {
TaskAttemptContext context = new TaskAttemptContextImpl(
job.getConfiguration(), new TaskAttemptID());
RecordReader<Text, Text> reader =
inFormat.createRecordReader(splits.get(sampleStep * idx),
context);
reader.initialize(splits.get(sampleStep * idx), context);
while (reader.nextKeyValue()) {
sampler.addKey(new Text(reader.getCurrentValue().toString().substring(0, 10)));
records += 1;
if (recordsPerSample <= records) {
break;
}
}
} catch (IOException ie){
System.err.println("Got an exception while reading splits " +
StringUtils.stringifyException(ie));
throw new RuntimeException(ie);
} catch (InterruptedException e) {
}
}
};
samplerReader[i].start();
}
FileSystem outFs = partFile.getFileSystem(conf);
DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10,
outFs.getDefaultBlockSize(partFile));
for (int i = 0; i < samples; i++) {
try {
samplerReader[i].join();
if(threadGroup.getThrowable() != null){
throw threadGroup.getThrowable();
}
} catch (InterruptedException e) {
}
}
for(Text split : sampler.createPartitions(partitions)) {
split.write(writer);
}
writer.close();
long t3 = System.currentTimeMillis();
System.out.println("Computing parititions took " + (t3 - t2) + "ms");
}
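A hypothetical driver fragment showing how a sampler like the one above is typically wired into a TeraSort-style job: write the boundary keys to a side file, then hand that file to a total-order partitioner. The class name, paths, and the use of the stock Hadoop TotalOrderPartitioner are illustrative, and the fragment assumes it lives in (or statically imports) the class that defines writePartitionFile above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Illustrative driver, not taken from the project that provides writePartitionFile.
public class SamplerDriver {
  public static void main(String[] args) throws Throwable {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "terasort-sampling");
    job.setNumReduceTasks(4);
    // Sample the input and write N-1 boundary keys for N reduce partitions.
    Path partitionFile = new Path("/tmp/_partition.lst");
    writePartitionFile(job, partitionFile);
    // Point the total-order partitioner at the boundary file so each reducer
    // receives a contiguous, roughly equal key range.
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    job.setPartitionerClass(TotalOrderPartitioner.class);
  }
}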
private void init(final JobContext context, final Class<?> scope, final Logger logger)
throws IOException {
final ScopedJobConfiguration scopedJob =
new ScopedJobConfiguration(context.getConfiguration(), scope, logger);
try {
centroidFactory =
(AnalyticItemWrapperFactory<T>) CentroidParameters.Centroid.WRAPPER_FACTORY_CLASS.getHelper().getValue(
context,
scope,
CentroidItemWrapperFactory.class);
centroidFactory.initialize(context, scope, logger);
} catch (final Exception e1) {
LOGGER.error(
"Cannot instantiate "
+ GeoWaveConfiguratorBase.enumToConfKey(
this.getClass(),
CentroidParameters.Centroid.WRAPPER_FACTORY_CLASS));
throw new IOException(e1);
}
this.level = scopedJob.getInt(CentroidParameters.Centroid.ZOOM_LEVEL, 1);
centroidDataTypeId = scopedJob.getString(CentroidParameters.Centroid.DATA_TYPE_ID, "centroid");
batchId =
scopedJob.getString(
GlobalParameters.Global.BATCH_ID,
Long.toString(Calendar.getInstance().getTime().getTime()));
final String indexName =
scopedJob.getString(
CentroidParameters.Centroid.INDEX_NAME,
new SpatialDimensionalityTypeProvider().createIndex(new SpatialOptions()).getName());
final PersistableStore store =
(PersistableStore) StoreParameters.StoreParam.INPUT_STORE.getHelper().getValue(
context,
scope,
null);
dataStore = store.getDataStoreOptions().createDataStore();
indexStore = store.getDataStoreOptions().createIndexStore();
index = indexStore.getIndex(indexName);
final PersistentAdapterStore adapterStore = store.getDataStoreOptions().createAdapterStore();
adapter =
(GeotoolsFeatureDataAdapter) adapterStore.getAdapter(
store.getDataStoreOptions().createInternalAdapterStore().getAdapterId(
centroidDataTypeId)).getAdapter();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
Configuration conf = context.getConfiguration();
return conf.getBoolean(ConfigConstants.CONF_SPLIT_INPUT, false)
&& !conf.getBoolean(ConfigConstants.INPUT_COMPRESSED, false);
}