下面列出了 org.apache.hadoop.mapreduce.Mapper#Context 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Prepares the temporary input files and the output directory for the aligner.
 *
 * <p>Opens a buffered writer on the first temp file (and on the second one when
 * {@code isPaired} is set), then makes sure the directory named by
 * {@code starOutDir} exists.
 *
 * @param context the mapper context; not read by this method
 * @throws IOException if one of the temp files cannot be opened for writing, or if the
 *         output directory does not exist and cannot be created
 * @throws InterruptedException declared by the signature; not thrown by this body
 */
@Override
protected void startAligner(Mapper.Context context) throws IOException, InterruptedException {
    // FileWriter creates the file when it is missing (and truncates it otherwise),
    // so a separate exists()/createNewFile() step is not needed.
    File file1 = new File(getFileName(tmpdir, taskId, 1));
    fastqFile1 = new BufferedWriter(new FileWriter(file1.getAbsoluteFile()));
    if (isPaired) {
        File file2 = new File(getFileName(tmpdir, taskId, 2));
        fastqFile2 = new BufferedWriter(new FileWriter(file2.getAbsoluteFile()));
    }

    // make output dir!
    // mkdirs() returns false both on failure and when the directory already exists,
    // so only treat it as an error when no directory is present afterwards.
    File starOut = new File(starOutDir);
    if (!starOut.mkdirs() && !starOut.isDirectory()) {
        throw new IOException("Could not create output directory: " + starOut.getAbsolutePath());
    }
}
/**
 * Adds the first mapper of the chain: it reads its input from the given input
 * context and writes its output to the supplied queue.
 *
 * <p>A {@code MapRunner} for the mapper at {@code index} is created and appended
 * to {@code threads}; this method does not run it.
 *
 * @param inputContext context the mapper reads its records from
 * @param output queue that receives the mapper's output pairs
 * @param index position of the mapper (and of its configuration) in the chain
 */
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
    throws IOException, InterruptedException {
  final Configuration mapperConf = getConf(index);
  // Output key/value classes fall back to Object when not configured.
  final Class<?> outKeyType =
      mapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  final Class<?> outValueType =
      mapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);

  final RecordReader reader = new ChainRecordReader(inputContext);
  final RecordWriter writer =
      new ChainRecordWriter(outKeyType, outValueType, output, mapperConf);
  final Mapper.Context mapperContext =
      createMapContext(reader, writer, (MapContext) inputContext, getConf(index));

  threads.add(new MapRunner(mappers.get(index), mapperContext, reader, writer));
}
/**
 * Returns the shared {@code BWAAlnInstance}, creating it and calling
 * {@code startAligner} the first time it is requested.
 *
 * <p>The static {@code context} is replaced with the caller's context on every
 * call, including calls that reuse an existing instance.
 *
 * @param context mapper context handed to the instance
 * @param bin passed through to the {@code BWAAlnInstance} constructor
 * @return the shared instance
 */
static public BWAAlnInstance getBWAInstance(Mapper.Context context, String bin) throws IOException, InterruptedException, URISyntaxException {
    final boolean firstUse = (instance == null);
    if (firstUse) {
        instance = new BWAAlnInstance(context, bin);
        instance.startAligner(context);
    }

    // Refresh the static context regardless of whether the instance was just created.
    BWAAlnInstance.context = context;
    // NOTE(review): this message is logged on every call, not only on first start.
    Logger.DEBUG("Started BWA");
    return instance;
}
/**
 * Runs the parent setup, then sizes the key/value buffer from the base cuboid's
 * grid-table info and records the row key preamble size as the key offset.
 *
 * @param context the mapper context, forwarded to the parent setup
 * @throws IOException propagated from the parent setup
 */
@Override
protected void doSetup(Mapper.Context context) throws IOException {
    super.doSetup(context);

    final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
    final Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
    final CubeDimEncMap dimEncMap = new CubeDimEncMap(cubeDesc, dictionaryMap);
    final GTInfo gtInfo = CubeGridTable.newGTInfo(baseCuboid, dimEncMap);

    // The buffer is allocated once at the maximum record length reported by the GTInfo.
    keyValueBuffer = ByteBuffer.allocate(gtInfo.getMaxRecordLength());
    keyOffset = cubeSegment.getRowKeyPreambleSize();
}
/**
 * Returns the shared aligner instance, creating a {@code STARInstance} and calling
 * {@code startAligner} on it the first time it is requested.
 *
 * @param context mapper context handed to the instance
 * @param bin passed through to the {@code STARInstance} constructor
 * @param starType passed through to the {@code STARInstance} constructor and logged
 * @return the shared instance
 */
public static AlignerInstance getSTARInstance(Mapper.Context context, String bin, int starType) throws URISyntaxException, IOException, InterruptedException {
    final boolean firstUse = (instance == null);
    if (firstUse) {
        Logger.DEBUG("STAR instance type: " + starType);
        instance = new STARInstance(context, bin, starType);
        instance.startAligner(context);
    }

    // NOTE(review): the context is stored through BWAAlnInstance even though this is the
    // STAR factory; presumably the static field is shared via a common base class - confirm.
    BWAAlnInstance.context = context;
    Logger.DEBUG("Started STAR");
    return instance;
}
/**
 * Emits one key/count pair for each well-formed tab-separated input line.
 *
 * <p>The output key is the first five columns joined by tabs (gene_id, contig, start,
 * end, strand); the output value is the last column parsed as an integer. Lines with
 * fewer than five columns or a non-integer last column (e.g. header lines) are logged
 * and skipped.
 *
 * @param key input key; not used
 * @param value one line of tab-separated text
 * @param context context the pair is written to
 * @throws IOException if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
    final int keyColumns = 5; // gene_id contig start end strand
    final String line = value.toString();
    final String[] split = line.split("\t");

    // Check the column count explicitly instead of relying on ArrayIndexOutOfBoundsException.
    if (split.length < keyColumns) { // ignore header lines!
        Logger.DEBUG("invalid line ignored; " + line);
        return;
    }

    final int count;
    try {
        count = Integer.parseInt(split[split.length - 1]);
    } catch (NumberFormatException ex) { // ignore header lines!
        Logger.DEBUG("invalid line ignored; " + line);
        return;
    }

    k.set(split[0] + "\t" + split[1] + "\t" + split[2] + "\t" + split[3] + "\t" + split[4]);
    v.set(count);
    // The write sits outside any catch so that a failure while writing is never
    // misreported as an invalid input line.
    context.write(k, v);
}
/**
 * Reads the JSON-serialized {@code DataProfilingBean} from the job configuration and
 * copies its field separator and field profiling rules into this mapper.
 *
 * @param context mapper context whose configuration holds the serialized bean
 */
@SuppressWarnings("rawtypes")
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
    final String beanJson = context.getConfiguration().get(DataProfilingConstants.DATA_PROFILING_BEAN);
    LOGGER.debug("Inside Mapper set up,data profiling bean received: "+ beanJson);

    final Type beanType = new TypeToken<DataProfilingBean>() {
    }.getType();
    final DataProfilingBean profilingBean = new Gson().fromJson(beanJson, beanType);

    fieldSeparator = profilingBean.getFieldSeparator();
    fieldProfilingBeans = profilingBean.getFieldProfilingRules();
}
/**
 * Builds the path of the temporary file for the current task attempt.
 *
 * <p>The file is named {@code .distcp.tmp.<task attempt id>}. It lives in the configured
 * target work path, or in that path's parent when {@code target} equals the work path.
 *
 * @param target the copy target
 * @param context mapper context supplying the configuration and the task attempt id
 * @return the temporary file path
 */
private Path getTmpFile(Path target, Mapper.Context context) {
  final Path workPath = new Path(
      context.getConfiguration().get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));

  final Path parentDir;
  if (target.equals(workPath)) {
    parentDir = workPath.getParent();
  } else {
    parentDir = workPath;
  }

  final Path tmpFile =
      new Path(parentDir, ".distcp.tmp." + context.getTaskAttemptID().toString());
  LOG.info("Creating temp file: " + tmpFile);
  return tmpFile;
}
private Cushaw2Instance(Mapper.Context context, String bin) throws IOException, URISyntaxException {
super(context, bin);
taskId = context.getTaskAttemptID().toString();
taskId = taskId.substring(taskId.indexOf("m_"));
ref = HalvadeFileUtils.downloadCushaw2Index(context, taskId);
cushaw2CustomArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "cushaw2", "");
}
/**
 * Parses one XML fragment and writes the text of its {@code name} and {@code value}
 * elements to the context as a trimmed key/value pair.
 *
 * <p>Character data is attributed to the most recently opened element, matched
 * case-insensitively. Parse and write failures are logged and the record is skipped;
 * an interruption is rethrown rather than swallowed.
 *
 * @param key input key; not used
 * @param value the XML text to parse
 * @param context context the name/value pair is written to
 * @throws IOException declared by the signature
 * @throws InterruptedException if writing to the context is interrupted
 */
@Override
protected void map(LongWritable key, Text value,
    Mapper.Context context)
    throws
    IOException, InterruptedException {
  String document = value.toString();
  System.out.println("'" + document + "'");
  try {
    // Encode explicitly as UTF-8 so the bytes handed to the parser do not depend on
    // the platform default charset.
    XMLStreamReader reader =
        XMLInputFactory.newInstance().createXMLStreamReader(
            new ByteArrayInputStream(
                document.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
    StringBuilder propertyName = new StringBuilder();
    StringBuilder propertyValue = new StringBuilder();
    String currentElement = "";
    try {
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName.append(reader.getText());
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue.append(reader.getText());
            }
            break;
          default:
            break;
        }
      }
    } finally {
      // Release the reader even when parsing fails part-way through.
      reader.close();
    }
    context.write(propertyName.toString().trim(), propertyValue.toString().trim());
  } catch (InterruptedException e) {
    // Do not hide an interruption behind the generic error log below.
    throw e;
  } catch (Exception e) {
    log.error("Error processing '" + document + "'", e);
  }
}
/**
 * Runs the mapper at the given chain index directly against the supplied context.
 *
 * <p>A record reader and a record writer are created over {@code context}, the
 * mapper is run with a map context built from them, and both are closed afterwards.
 *
 * @param context context the mapper reads from and writes to
 * @param index position of the mapper (and of its configuration) in the chain
 */
@SuppressWarnings("unchecked")
void runMapper(TaskInputOutputContext context, int index) throws IOException,
    InterruptedException {
  final Mapper chainedMapper = mappers.get(index);
  final RecordReader reader = new ChainRecordReader(context);
  final RecordWriter writer = new ChainRecordWriter(context);
  final Mapper.Context mapperContext =
      createMapContext(reader, writer, context, getConf(index));

  chainedMapper.run(mapperContext);

  // Reader and writer are closed only after a normal return from run().
  reader.close();
  writer.close(context);
}
/**
 * Verifies that mapping a transaction with no inputs writes the key
 * "Transaction Input Count:" with a count of 0 exactly once.
 */
@Test
public void map(@Mocked final Mapper.Context defaultContext) throws IOException,InterruptedException {
    final BitcoinTransactionMap mapper = new BitcoinTransactionMap();

    // Input: an empty key and a transaction with empty input and output lists.
    final BytesWritable key = new BytesWritable();
    final ArrayList<BitcoinTransactionInput> noInputs = new ArrayList<BitcoinTransactionInput>();
    final ArrayList<BitcoinTransactionOutput> noOutputs = new ArrayList<BitcoinTransactionOutput>();
    final BitcoinTransaction value =
            new BitcoinTransaction(0, new byte[0], noInputs, new byte[0], noOutputs, 0);

    // Expected output pair.
    final Text expectedKey = new Text("Transaction Input Count:");
    final IntWritable expectedCount = new IntWritable(0);
    new Expectations() {{
        defaultContext.write(expectedKey, expectedCount); times=1;
    }};

    mapper.map(key, value, defaultContext);
}
/**
 * Initializes the mapper from the job configuration and the current input split.
 *
 * <p>Resets the counters, builds the schema and key list from the JSON argument,
 * builds the regex and null-condition maps when their arguments are present, and
 * records the split's start/end offsets and a dotted form of its file path.
 *
 * @param context mapper context supplying the configuration and the input split
 */
protected void setup(Mapper.Context context){
    final String jsonArgument = context.getConfiguration().get(JsonDataVaildationConstants.JSON_ARGUMENT);
    final String regexArgument = context.getConfiguration().get(JsonDataVaildationConstants.REGEX_ARGUMENT);
    final String nullArgument = context.getConfiguration().get(JsonDataVaildationConstants.NULL_ARGUMENT);

    tupleCounter = 0L;
    cleanTupleCounter = 0L;
    recordsEmittByMap = 0L;

    // JSON key -> data type
    schema = getDatatypeExpression(jsonArgument);
    // JSON keys given by the user
    keylist = getKeyList(jsonArgument);

    if (regexArgument != null) {
        // JSON key -> regex
        regex = getExpression(regexArgument);
    }
    if (nullArgument != null) {
        // JSON key -> null condition
        nullMap = getExpression(nullArgument);
    }

    final FileSplit split = (FileSplit) context.getInputSplit();
    splitStartOffset = split.getStart();
    // End offset of the current split (inclusive).
    splitEndOffset = splitStartOffset + split.getLength() - 1;

    filename = split.getPath().toUri().getPath();
    // Replace the separators and drop the first character. The end index is the length of
    // the path BEFORE replacement, so this assumes the replacement keeps the length unchanged.
    final int originalLength = filename.length();
    filename = filename
            .replaceAll(JsonDataVaildationConstants.FORWARD_SLASH, JsonDataVaildationConstants.JSON_DOT)
            .substring(1, originalLength);
}
/**
 * Returns the temporary file path used for the current task attempt.
 *
 * <p>The parent directory is the configured target work path, unless {@code target}
 * is that work path itself, in which case the work path's parent is used. The file
 * name is {@code .distcp.tmp.} followed by the task attempt id.
 *
 * @param target the copy target
 * @param context mapper context supplying the configuration and the task attempt id
 * @return the temporary file path
 */
private Path getTmpFile(Path target, Mapper.Context context) {
  final String workPathValue =
      context.getConfiguration().get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
  final Path targetWorkPath = new Path(workPathValue);
  final Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent() : targetWorkPath;

  final String tmpName = ".distcp.tmp." + context.getTaskAttemptID().toString();
  final Path tmpPath = new Path(root, tmpName);
  LOG.info("Creating temp file: " + tmpPath);
  return tmpPath;
}
/**
 * Copies the contents of the source file to {@code outStream} through a buffer of
 * {@code bufferSize} bytes and returns the number of bytes copied.
 *
 * <p>After each chunk the context status is updated. When the action is
 * {@code FileAction.APPEND}, {@code sourceOffset} is advanced by the bytes read before
 * the next read. The output stream is closed on success; on any exit path the
 * still-open streams are passed to {@code IOUtils.cleanup}.
 *
 * @param sourceFileStatus status of the file to copy from
 * @param sourceOffset offset passed to each read
 * @param outStream destination stream; closed by this method
 * @param bufferSize size of the copy buffer in bytes
 * @param context mapper context supplying the configuration and receiving status updates
 * @return total number of bytes read from the source
 * @throws IOException if reading or writing fails
 */
@VisibleForTesting
long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
    OutputStream outStream, int bufferSize, Mapper.Context context)
    throws IOException {
  final Path source = sourceFileStatus.getPath();
  final byte[] buffer = new byte[bufferSize];
  long copied = 0;
  ThrottledInputStream inStream = null;
  try {
    inStream = getInputStream(source, context.getConfiguration());
    // A negative read count ends the copy.
    for (int chunk = readBytes(inStream, buffer, sourceOffset);
         chunk >= 0;
         chunk = readBytes(inStream, buffer, sourceOffset)) {
      copied += chunk;
      if (action == FileAction.APPEND) {
        sourceOffset += chunk;
      }
      outStream.write(buffer, 0, chunk);
      updateContextStatus(copied, context, sourceFileStatus);
    }
    outStream.close();
    // Null out the reference so the cleanup below does not close it a second time.
    outStream = null;
  } finally {
    IOUtils.cleanup(LOG, outStream, inStream);
  }
  return copied;
}
private DummyAlignerInstance(Mapper.Context context, String bin) throws IOException, URISyntaxException {
super(context, bin);
taskId = context.getTaskAttemptID().toString();
taskId = taskId.substring(taskId.indexOf("m_"));
// ref = HalvadeFileUtils.downloadBWAIndex(context, taskId);
}
/**
 * Creates a listener bound to the given mapper context and description.
 *
 * @param context mapper context stored by this listener
 * @param description text stored by this listener
 */
UploadProgressListener(Mapper.Context context, String description) {
    this.description = description;
    this.context = context;
}
/**
 * Creates a writer bound to the given mapper context and hour statistic.
 *
 * @param context mapper context stored by this writer
 * @param statistic statistic object stored by this writer
 */
public MapperWriter(Mapper.Context context, HourStatistic statistic) {
    this.statistic = statistic;
    this.context = context;
}
public CustomizedProgresserBase(Mapper.Context mapperContext) {
this.staticProgress = mapperContext.getConfiguration().getFloat(STATIC_PROGRESS, DEFAULT_STATIC_PROGRESS);
}
/**
 * Raises {@code threads} to at least 6 when fewer tasks are left than there are
 * containers; otherwise leaves it unchanged.
 *
 * @param context the mapper context; not read by this method
 * @throws IOException declared by the signature; not thrown by this body
 */
protected void getIdleCores(Mapper.Context context) throws IOException {
    if (tasksLeft < containers) {
        threads = Math.max(6, threads);
    }
}