Listed below are examples of how to use the org.apache.hadoop.mapreduce.TaskAttemptContext API class, or you can follow the links to view the full source code on GitHub.
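A minimal sketch of the pattern that all of these examples share, assuming a hypothetical ExampleRecordReader and a hypothetical property name example.delimiter (neither belongs to any of the projects below): the TaskAttemptContext handed to initialize() carries the job Configuration and the task attempt identity, and that is usually all an implementation reads from it.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Minimal sketch only: a RecordReader that produces no records but shows the
// typical TaskAttemptContext calls (getConfiguration, getTaskAttemptID).
public class ExampleRecordReader extends RecordReader<LongWritable, Text> {
  private String delimiter;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // The context carries the job configuration and the task attempt identity.
    Configuration conf = context.getConfiguration();
    delimiter = conf.get("example.delimiter", ",");  // hypothetical property name
    System.out.println("Initializing reader for " + context.getTaskAttemptID());
  }

  @Override
  public boolean nextKeyValue() { return false; }  // no records in this sketch

  @Override
  public LongWritable getCurrentKey() { return null; }

  @Override
  public Text getCurrentValue() { return null; }

  @Override
  public float getProgress() { return 0.0f; }

  @Override
  public void close() { }
}

The project examples below exercise the same context methods from RecordReaders, RecordWriters, OutputCommitters, and test harnesses.
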
private void initializeSuperClass(InputSplit split, TaskAttemptContext context) throws IOException {
  super.initialize(split, context);
  if (split instanceof FileSplit) {
    FileSplit fs = (FileSplit) split;
    Path p = fs.getPath();
    rawFileName = p.getName();
    if (log.isDebugEnabled()) {
      log.debug("FileSplit Info: ");
      log.debug("Start: " + fs.getStart());
      log.debug("Length: " + fs.getLength());
      log.debug("Locations: " + Arrays.toString(fs.getLocations()));
      log.debug("Path: " + fs.getPath());
    }
  } else {
    throw new IOException("Input Split unhandled.");
  }
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
    InterruptedException
{
  if (split instanceof MrsPyramidInputSplit)
  {
    MrsPyramidInputSplit fsplit = (MrsPyramidInputSplit) split;
    ifContext = ImageInputFormatContext.load(context.getConfiguration());
    if (ifContext.getBounds() != null)
    {
      inputBounds = ifContext.getBounds();
    }
    scannedInputReader = createRecordReader(fsplit, context);
    tilesize = ifContext.getTileSize();
    zoomLevel = ifContext.getZoomLevel();
  }
  else
  {
    throw new IOException("Got a split of type " + split.getClass().getCanonicalName() +
        " but expected one of type " + MrsPyramidInputSplit.class.getCanonicalName());
  }
}

public ParsedRecordReader ( FileSplit split,
                            TaskAttemptContext context,
                            Class<? extends Parser> parser_class,
                            Trees args ) throws IOException {
  Configuration conf = context.getConfiguration();
  start = split.getStart();
  end = start + split.getLength();
  Path file = split.getPath();
  FileSystem fs = file.getFileSystem(conf);
  fsin = fs.open(split.getPath());
  try {
    parser = parser_class.newInstance();
  } catch (Exception ex) {
    throw new Error("Unrecognized parser: " + parser_class);
  }
  parser.initialize(args);
  parser.open(fsin, start, end);
  result = null;
}

public UpdateRecordWriter(TaskAttemptContext context)
    throws ClassNotFoundException, SQLException {
  super(context);
  Configuration conf = getConf();
  DBConfiguration dbConf = new DBConfiguration(conf);
  this.tableName = dbConf.getOutputTableName();
  this.columnNames = dbConf.getOutputFieldNames();
  String updateKeyColumns =
      conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
  Set<String> updateKeys = new LinkedHashSet<String>();
  StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
  while (stok.hasMoreTokens()) {
    String nextUpdateKey = stok.nextToken().trim();
    if (nextUpdateKey.length() > 0) {
      updateKeys.add(nextUpdateKey);
    } else {
      throw new RuntimeException("Invalid update key column value specified"
          + ": '" + updateKeyColumns + "'");
    }
  }
  updateCols = updateKeys.toArray(new String[updateKeys.size()]);
}

/** {@inheritDoc} */
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
    throws IOException {
  DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
  String tableName = dbConf.getOutputTableName();
  String[] fieldNames = dbConf.getOutputFieldNames();
  if (fieldNames == null) {
    fieldNames = new String[dbConf.getOutputFieldCount()];
  }
  try {
    Connection connection = dbConf.getConnection();
    PreparedStatement statement = connection.prepareStatement(
        constructQuery(tableName, fieldNames));
    return new DBRecordWriter(connection, statement);
  } catch (Exception ex) {
    throw new IOException(ex.getMessage());
  }
}

@Override
public void close(TaskAttemptContext arg0) throws IOException,
    InterruptedException {
  if (txtArchive != null) {
    txtArchive.close();
  }
  if (xmlArchive != null) {
    xmlArchive.close();
  }
  if (jsonArchive != null) {
    jsonArchive.close();
  }
  if (binaryArchive != null) {
    binaryArchive.close();
  }
}

@Override
public RecordReader<Key,Value> createRecordReader(InputSplit split, TaskAttemptContext context) {
  return new RecordReaderBase<Key,Value>() {
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (scannerIterator.hasNext()) {
        ++numKeysRead;
        Entry<Key,Value> entry = scannerIterator.next();
        currentK = currentKey = entry.getKey();
        currentV = currentValue = entry.getValue();
        if (log.isTraceEnabled())
          log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true));
        return true;
      } else if (numKeysRead < 0) {
        numKeysRead = 0;
      }
      return false;
    }
  };
}

private NYCTLCReader getNYCTLCRecordReader(String file) throws IOException, URISyntaxException {
  InputSplit split = ColumnBasedHandlerTestUtil.getSplit(file);
  TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  TypeRegistry.reset();
  TypeRegistry.getInstance(ctx.getConfiguration());
  log.debug(TypeRegistry.getContents());
  NYCTLCReader reader = new NYCTLCReader();
  reader.initialize(split, ctx);
  return reader;
}

@SuppressFBWarnings(value = {"SQL_INJECTION_JDBC", "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"}, justification = "User supplied queries are a requirement")
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
{
  if (!(split instanceof PgInputSplit)) {
    throw new IOException("Expected an instance of PgInputSplit");
  }
  offset = ((PgInputSplit) split).getOffset();
  limit = ((PgInputSplit) split).getLimit();
  currIndex = offset - 1;
  try
  {
    conn = PgVectorDataProvider.getDbConnection(dbSettings);
    // If the offset is < 0, then there is only one partition, so no need
    // for a limit query.
    String fullQuery = (offset < 0) ? dbSettings.getQuery() :
        (dbSettings.getQuery() + " OFFSET " + offset + " LIMIT " + limit);
    stmt = conn.prepareStatement(fullQuery,
        ResultSet.TYPE_FORWARD_ONLY,
        ResultSet.CONCUR_READ_ONLY);
    rs = ((PreparedStatement) stmt).executeQuery();
    ResultSetMetaData metadata = rs.getMetaData();
    columnCount = metadata.getColumnCount();
    columnLabels = new String[columnCount];
    for (int c = 1; c <= columnCount; c++) {
      columnLabels[c - 1] = metadata.getColumnLabel(c);
    }
  }
  catch (SQLException e)
  {
    throw new IOException("Could not open database.", e);
  }
}

@Override
public RecordReader<NullWritable, MV> createRecordReader(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  MneMapreduceRecordReader<MV, V> reader = new MneMapreduceRecordReader<MV, V>();
  reader.initialize(inputSplit, taskAttemptContext);
  return reader;
}

/**
 * Returns a reader for this split of the distributed cache file list.
 */
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(
    InputSplit split, final TaskAttemptContext taskContext)
    throws IOException, InterruptedException {
  return new SequenceFileRecordReader<LongWritable, BytesWritable>();
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
    InterruptedException
{
  // TODO eaw - Better to use isAssignableFrom so it doesn't break if TiledInputSplit is ever subclassed
  if (split instanceof TiledInputSplit)
  {
    TiledInputSplit tiledInputSplit = (TiledInputSplit) split;
    startTileId = tiledInputSplit.getStartTileId();
    endTileId = tiledInputSplit.getEndTileId();
    // TODO, can use tiledInputSplit instead of casting split again
    FileSplit fileSplit = (FileSplit) ((TiledInputSplit) split).getWrappedSplit();
    Configuration conf = context.getConfiguration();
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    // Use a factory to create the reader, to make this class easier to test and to decouple
    // the reader's lifecycle from this object's lifecycle.
    reader = readerFactory.createReader(fs, path, conf);
    try
    {
      key = (TileIdWritable) reader.getKeyClass().newInstance();
      value = (RasterWritable) reader.getValueClass().newInstance();
    }
    catch (InstantiationException | IllegalAccessException e)
    {
      throw new IOException(e);
    }
  }
  else
  {
    // TODO eaw - IllegalArgumentException would be more appropriate here
    throw new IOException("Expected a TiledInputSplit but received " + split.getClass().getName());
  }
}

private void writeOutput(TaskAttempt attempt, Configuration conf)
    throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
      TypeConverter.fromYarn(attempt.getID()));
  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat
      .getRecordWriter(tContext);
  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key1, val1);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val1);
    theRecordWriter.write(nullWritable, val2);
    theRecordWriter.write(key2, nullWritable);
    theRecordWriter.write(key1, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key2, val2);
  } finally {
    theRecordWriter.close(tContext);
  }
  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}

public void testInvalidVersionNumber() throws IOException {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  try {
    new FileOutputCommitter(outDir, tContext);
    fail("should've thrown an exception!");
  } catch (IOException e) {
    // test passed
  }
}

/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K,V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      samples.add(ReflectionUtils.copy(job.getConfiguration(),
          reader.getCurrentKey(), null));
      ++records;
      if ((i + 1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}

public SolrRecordWriter(TaskAttemptContext context, Path outputShardDir, int batchSize) {
  this.batchSize = batchSize;
  this.batch = new ArrayList<>(batchSize);
  Configuration conf = context.getConfiguration();
  // setLogLevel("org.apache.solr.core", "WARN");
  // setLogLevel("org.apache.solr.update", "WARN");
  heartBeater = new HeartBeater(context);
  try {
    heartBeater.needHeartBeat();
    Path solrHomeDir = SolrRecordWriter.findSolrConfig(conf);
    FileSystem fs = outputShardDir.getFileSystem(conf);
    EmbeddedSolrServer solr = createEmbeddedSolrServer(solrHomeDir, fs, outputShardDir);
    batchWriter = new BatchWriter(solr, batchSize,
        context.getTaskAttemptID().getTaskID(),
        SolrOutputFormat.getSolrWriterThreadCount(conf),
        SolrOutputFormat.getSolrWriterQueueSize(conf));
  } catch (Exception e) {
    throw new IllegalStateException(String.format(Locale.ENGLISH,
        "Failed to initialize record writer for %s, %s", context.getJobName(), conf
            .get("mapred.task.id")), e);
  } finally {
    heartBeater.cancelHeartBeat();
  }
}

@Override
public void close(TaskAttemptContext context) throws IOException {
  for (BufferedMutator mutator : mutatorMap.values()) {
    mutator.close();
  }
  if (connection != null) {
    connection.close();
  }
}

@Override
public RecordReader<K, V> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
  TezGroupedSplit groupedSplit = (TezGroupedSplit) split;
  initInputFormatFromSplit(groupedSplit);
  return new TezGroupedSplitsRecordReader(groupedSplit, context);
}

@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR",
    justification="Delegate set by setConf")
public RecordReader<E, Void> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
  Configuration conf = Hadoop.TaskAttemptContext.getConfiguration.invoke(taskAttemptContext);
  DefaultConfiguration.init(conf);
  return delegate.createRecordReader(inputSplit, taskAttemptContext);
}

@Test
public void testStripBOM() throws IOException {
  // the test data contains a BOM at the start of the file
  // confirm the BOM is skipped by LineRecordReader
  String UTF8_BOM = "\uFEFF";
  URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
  assertNotNull("Cannot find testBOM.txt", testFileUrl);
  File testFile = new File(testFileUrl.getFile());
  Path testFilePath = new Path(testFile.getAbsolutePath());
  long testFileSize = testFile.length();
  Configuration conf = new Configuration();
  conf.setInt(org.apache.hadoop.mapreduce.lib.input.
      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
  // read the data and check whether BOM is skipped
  FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
      (String[]) null);
  LineRecordReader reader = new LineRecordReader();
  reader.initialize(split, context);
  int numRecords = 0;
  boolean firstLine = true;
  boolean skipBOM = true;
  while (reader.nextKeyValue()) {
    if (firstLine) {
      firstLine = false;
      if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
        skipBOM = false;
      }
    }
    ++numRecords;
  }
  reader.close();
  assertTrue("BOM is not skipped", skipBOM);
}

@Override
public void setup(TaskAttemptContext context) {
  IngestConfiguration ingestConfiguration = IngestConfigurationFactory.getIngestConfiguration();
  markingsHelper = ingestConfiguration.getMarkingsHelper(context.getConfiguration(), TypeRegistry.getType(TypeRegistry.ERROR_PREFIX));
  super.setup(context);
  this.errorHelper = (ErrorShardedIngestHelper) (TypeRegistry.getType("error").getIngestHelper(context.getConfiguration()));
  this.errorHelper.setDelegateHelper(this.helper);
  this.helper = this.errorHelper;
  this.conf = context.getConfiguration();
  this.setupDictionaryCache(conf.getInt(ERROR_PROP_PREFIX + SHARD_DICTIONARY_CACHE_ENTRIES, ShardedDataTypeHandler.SHARD_DINDEX_CACHE_DEFAULT_SIZE));
  setShardTableName(new Text(ConfigurationHelper.isNull(conf, ERROR_PROP_PREFIX + SHARD_TNAME, String.class)));
  String tableName = conf.get(ERROR_PROP_PREFIX + SHARD_GIDX_TNAME);
  setShardIndexTableName(tableName == null ? null : new Text(tableName));
  tableName = conf.get(ERROR_PROP_PREFIX + SHARD_GRIDX_TNAME);
  setShardReverseIndexTableName(tableName == null ? null : new Text(tableName));
  tableName = conf.get(ERROR_PROP_PREFIX + METADATA_TABLE_NAME);
  if (tableName == null) {
    setMetadataTableName(null);
    setMetadata(null);
  } else {
    setMetadataTableName(new Text(tableName));
    setMetadata(ingestConfiguration.createMetadata(getShardTableName(), getMetadataTableName(), null /* no load date table */,
        getShardIndexTableName(), getShardReverseIndexTableName(), conf.getBoolean(ERROR_PROP_PREFIX + METADATA_TERM_FREQUENCY, false)));
  }
  tableName = conf.get(ERROR_PROP_PREFIX + SHARD_DINDX_NAME);
  setShardDictionaryIndexTableName(tableName == null ? null : new Text(tableName));
  try {
    defaultVisibility = flatten(markingFunctions.translateToColumnVisibility(markingsHelper.getDefaultMarkings()));
  } catch (Exception e) {
    throw new IllegalArgumentException("Failed to parse security marking configuration", e);
  }
  log.info("ShardedErrorDataTypeHandler configured.");
}

/**
 * @param context
 * @param mapStores
 * @param reduceStores
 * @throws IOException
 */
public PigOutputCommitter(TaskAttemptContext context,
    List<POStore> mapStores, List<POStore> reduceStores)
    throws IOException {
  // create and store the map and reduce output committers
  mapOutputCommitters = getCommitters(context, mapStores);
  reduceOutputCommitters = getCommitters(context, reduceStores);
  recoverySupported = context.getConfiguration().getBoolean(PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, false);
}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
  try {
    collector.flush();
  } catch (ClassNotFoundException cnf) {
    throw new IOException("can't find class ", cnf);
  }
  collector.close();
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
    throws IOException, InterruptedException {
  startJsonCurlyTag = ("{").getBytes(Charsets.UTF_8);
  endJsonCurlyTag = ("}").getBytes(Charsets.UTF_8);
  startJsonSquareTag = ("[").getBytes(Charsets.UTF_8);
  endJsonSquareTag = ("]").getBytes(Charsets.UTF_8);
}

public EdgeReader<Text, LongWritable> createEdgeReader(final RecordReader<LongWritable, Text> rr) throws IOException {
  return new DGALongEdgeValueReader() {
    @Override
    protected RecordReader<LongWritable, Text> createLineRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
      return rr;
    }
  };
}

@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
  List<OutputFormat> formats = getNewApiFormats(CompatHandler.taskAttemptContext(context).getConfiguration());
  List<OutputCommitter> committers = new ArrayList<OutputCommitter>();
  for (OutputFormat format : formats) {
    committers.add(format.getOutputCommitter(context));
  }
  return new MultiNewOutputCommitter(committers);
}

public void init(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
  rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
  FileSystem fs = FileSystem.get(context.getConfiguration());
  FileSplit fSplit = (FileSplit) split;
  Path path = fSplit.getPath();
  rowRecordReader = new RowRecordReader(cubeDesc, path, fs);
  metricsValuesBuffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
  rowCount = new AtomicInteger(0);
}

public RecordWriter<K, SAMRecordWritable> getRecordWriter(
    TaskAttemptContext ctx, Path out)
    throws IOException
{
  if (this.header == null)
    throw new IOException(
        "Can't create a RecordWriter without the SAM header");
  final boolean writeHeader = ctx.getConfiguration().getBoolean(
      WRITE_HEADER_PROPERTY, true);
  switch (format) {
    case BAM:
      return new KeyIgnoringBAMRecordWriter<K>(
          out, header, writeHeader, ctx);
    case SAM:
      return new KeyIgnoringSAMRecordWriter<K>(
          out, header, writeHeader, ctx);
    case CRAM:
      return new KeyIgnoringCRAMRecordWriter<K>(
          out, header, writeHeader, ctx);
    default:
      assert false;
      return null;
  }
}

@Override
public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
    throws IOException, InterruptedException {
  this.split = (CombineFileSplit) curSplit;
  this.context = curContext;
  if (null == rr) {
    createChildReader();
  }
  FileSplit fileSplit = new FileSplit(this.split.getPath(index),
      this.split.getOffset(index), this.split.getLength(index),
      this.split.getLocations());
  this.rr.initialize(fileSplit, this.context);
}

@Override
public void initialize(InputSplit inputSplit,
    TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  split = (MainframeDatasetInputSplit) inputSplit;
  conf = taskAttemptContext.getConfiguration();
  inputClass = (Class<T>) (conf.getClass(
      DBConfiguration.INPUT_CLASS_PROPERTY, null));
  key = null;
  datasetRecord = null;
  numberRecordRead = 0;
  datasetProcessed = 0;
}