Listed below are example usages of org.apache.hadoop.mapreduce.Mapper, collected from open-source projects; follow each snippet's link to view the full source on GitHub.
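As a baseline for the snippets that follow, a minimal Mapper subclass looks roughly like this (an illustrative sketch, not taken from any of the quoted projects; the class and field names are hypothetical):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: emits (token, 1) for every whitespace-separated token.
public class TokenCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      if (token.isEmpty()) {
        continue;
      }
      word.set(token);
      context.write(word, ONE); // reuse the Text instance to avoid per-record allocation
    }
  }
}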
@Override
protected void setup(final org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
super.setup(context);
try {
final String ingestWithMapperStr =
context.getConfiguration().get(AbstractMapReduceIngest.INGEST_PLUGIN_KEY);
final byte[] ingestWithMapperBytes = ByteArrayUtils.byteArrayFromString(ingestWithMapperStr);
ingestWithMapper = (IngestWithMapper) PersistenceUtils.fromBinary(ingestWithMapperBytes);
globalVisibility =
context.getConfiguration().get(AbstractMapReduceIngest.GLOBAL_VISIBILITY_KEY);
indexNames = AbstractMapReduceIngest.getIndexNames(context.getConfiguration());
} catch (final Exception e) {
throw new IllegalArgumentException(e);
}
}
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
//super.map(key, value, context);
// 'line', 'phoneNum', 'upFlow', 'downFlow', 'flowBean', and 'text' are instance
// fields declared in the enclosing mapper
line = value.toString();
String[] fields = StringUtils.split(line, SEPARATOR);
if (fields.length != 11) {
LOGGER.error("invalid line: {}", line);
System.err.println("invalid line: " + line);
} else {
phoneNum = fields[1];
upFlow = Long.parseLong(fields[8]);
downFlow = Long.parseLong(fields[9]);
flowBean.setPhoneNum(phoneNum);
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
//sumFlow = upFlow + downFlow;
flowBean.setSumFlow(upFlow + downFlow);
text.set(phoneNum);
context.write(text, flowBean);
}
}
@SuppressWarnings("rawtypes")
public static
Class<? extends Mapper> getMapClass(mapper m) {
Class<? extends Mapper> rval = null;
switch(m) {
case IDMapper:
rval = tmp.getClass();
break;
case StringMapper:
rval = org.slc.sli.aggregation.mapreduce.map.StringValueMapper.class;
break;
case LongMapper:
rval = org.slc.sli.aggregation.mapreduce.map.LongValueMapper.class;
break;
case DoubleMapper:
rval = org.slc.sli.aggregation.mapreduce.map.DoubleValueMapper.class;
break;
case EnumMapper:
rval = org.slc.sli.aggregation.mapreduce.map.EnumValueMapper.class;
break;
}
return rval;
}
@SuppressWarnings("unchecked")
@Override
protected void setup(
final Mapper<GeoWaveInputKey, Object, PartitionDataWritable, AdapterWithObjectWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
final ScopedJobConfiguration config =
new ScopedJobConfiguration(context.getConfiguration(), NNMapReduce.class, LOGGER);
serializationTool = new HadoopWritableSerializationTool(context);
try {
partitioner =
config.getInstance(
PartitionParameters.Partition.PARTITIONER_CLASS,
Partitioner.class,
OrthodromicDistancePartitioner.class);
partitioner.initialize(context, NNMapReduce.class);
} catch (final Exception e1) {
throw new IOException(e1);
}
}
private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context) {
try {
for (Path path : pathList) {
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
new CopyListingFileStatus(fs.getFileStatus(path)), context);
}
Assert.assertEquals(nFiles,
context.getCounter(CopyMapper.Counter.SKIP).getValue());
}
catch (Exception exception) {
Assert.fail("Caught unexpected exception: " + exception.getMessage());
}
}
/**
* Set up the MapReduce job to use as inputs both an Accumulo table and the
* files containing previously derived information. Looks for a file for
* every iteration number so far, preferring the final cleaned-up output
* from each iteration but falling back on intermediate data if necessary.
* @param tableMapper Mapper class to use for database input
* @param rdfMapper Mapper class to use for direct RDF input
* @param fileMapper Mapper class to use for derived triples input
* @param incMapper Mapper class to use for derived inconsistencies input
* @param filter True to exclude previously derived data that couldn't be
* used to derive anything new at this point.
*/
protected void configureMultipleInput(
Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
Class<? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
boolean filter)
throws IOException, AccumuloSecurityException {
Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
if (inputPath != null) {
configureRdfInput(inputPath, rdfMapper);
}
else {
configureAccumuloInput(tableMapper);
}
configureFileInput(fileMapper, incMapper, filter);
}
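For context, a call site for configureMultipleInput might look like the following sketch; the mapper class names here are placeholders, not classes from the quoted project:

// Hypothetical wiring: supply a mapper for each input source and filter out
// previously derived data that yielded nothing new.
configureMultipleInput(
    TableReasoningMapper.class,   // Accumulo table input (placeholder)
    RdfReasoningMapper.class,     // direct RDF input (placeholder)
    FactTripleMapper.class,       // derived triples input (placeholder)
    InconsistencyMapper.class,    // derived inconsistencies input (placeholder)
    true);                        // exclude data that derived nothing new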
@Override
protected Class<? extends Mapper> getMapperClass() {
if (isHCatJob) {
return SqoopHCatUtilities.getExportMapperClass();
}
if (options.getOdpsTable() != null) {
return OdpsExportMapper.class;
}
switch (fileType) {
case SEQUENCE_FILE:
return SequenceFileExportMapper.class;
case AVRO_DATA_FILE:
return AvroExportMapper.class;
case PARQUET_FILE:
return ParquetExportMapper.class;
case UNKNOWN:
default:
return TextExportMapper.class;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
super.setup(context);
ConfigSections cfg = JobConfiguration.fromHadoopConfiguration(context.getConfiguration());
MetadataConfig meta = cfg.getMetadata();
bands = meta.getCutPoints();
BSONObject obj = MongoConfigUtil.getFields(context.getConfiguration());
if (obj != null) {
fields = obj.keySet().toArray(new String[0]);
} else {
throw new IllegalArgumentException("Invalid configuration found. Aggregates must "
+ "specify a the hadoop.map.fields property.");
}
}
@Override
protected void map(
final GeoWaveInputKey key,
final SimpleFeature value,
final Mapper<GeoWaveInputKey, SimpleFeature, AvroKey<AvroSimpleFeatureCollection>, NullWritable>.Context context)
throws IOException, InterruptedException {
AvroSFCWriter avroWriter = adapterIdToAvroWriterMap.get(key.getInternalAdapterId());
if (avroWriter == null) {
avroWriter = new AvroSFCWriter(value.getFeatureType(), batchSize);
adapterIdToAvroWriterMap.put(key.getInternalAdapterId(), avroWriter);
}
final AvroSimpleFeatureCollection retVal = avroWriter.write(value);
if (retVal != null) {
outKey.datum(retVal);
context.write(outKey, outVal);
}
}
@Test
public void testSuperMapperClass() throws SecurityException,
NoSuchMethodException, IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
String jarFile = "dummyJarFile";
String tableName = "dummyTableName";
Path path = new Path("dummyPath");
options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
ImportJobContext context = new ImportJobContext(tableName, jarFile,
options, path);
avroImportJob = new MainframeImportJob(options, context);
// Access the protected method via reflection
Class[] types = {};
Method m_getMapperClass = MainframeImportJob.class.getDeclaredMethod(
"getMapperClass", types);
m_getMapperClass.setAccessible(true);
Class<? extends Mapper> mapper = (Class<? extends Mapper>) m_getMapperClass
.invoke(avroImportJob);
assertEquals(org.apache.sqoop.mapreduce.AvroImportMapper.class, mapper);
}
/**
* Tests the case where one of the chained mappers throws an exception.
*
* @throws Exception
*/
public void testChainFail() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
job.setJobName("chain");
ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, null);
ChainMapper.addMapper(job, FailMap.class, LongWritable.class, Text.class,
IntWritable.class, Text.class, null);
ChainMapper.addMapper(job, Mapper.class, IntWritable.class, Text.class,
LongWritable.class, Text.class, null);
job.waitForCompletion(true);
assertTrue("Job Not failed", !job.isSuccessful());
}
@Override
protected void mapNativeValue(
final GeoWaveInputKey key,
final Object value,
final org.apache.hadoop.mapreduce.Mapper<GeoWaveInputKey, ObjectWritable, GroupIDText, BytesWritable>.Context context)
throws IOException, InterruptedException {
final AnalyticItemWrapper<Object> item = itemWrapperFactory.create(value);
nestedGroupCentroidAssigner.findCentroidForLevel(item, centroidAssociationFn);
final byte[] outData = association.toBinary();
outputValWritable.set(outData, 0, outData.length);
context.write(outputKeyWritable, outputValWritable);
}
@Override
protected void doSetup(Mapper.Context context) throws IOException {
super.doSetup(context);
long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
GTInfo gtInfo = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, baseCuboid),
new CubeDimEncMap(cubeDesc, dictionaryMap));
keyValueBuffer = ByteBuffer.allocate(gtInfo.getMaxRecordLength());
keyOffset = cubeSegment.getRowKeyPreambleSize();
}
@Override
public void configure(final Job job) throws Exception {
job.setMapperClass(Mapper.class);
job.setReducerClass(InputToOutputKeyReducer.class);
job.setMapOutputKeyClass(GeoWaveInputKey.class);
job.setMapOutputValueClass(ObjectWritable.class);
job.setOutputKeyClass(GeoWaveOutputKey.class);
job.setOutputValueClass(Object.class);
job.setSpeculativeExecution(false);
job.setJobName("GeoWave Input to Output");
job.setReduceSpeculativeExecution(false);
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path inputFile = new Path(args[0]);
Path outputFile = new Path(args[1]);
FileSystem hdfs = outputFile.getFileSystem(conf);
hdfs.delete(outputFile, true);
Class<?> codecClass = Class.forName(args[2]);
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec",
codecClass,
CompressionCodec.class);
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",
codecClass,
CompressionCodec.class);
Job job = new Job(conf);
job.setJarByClass(CompressedMapReduce.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, inputFile);
FileOutputFormat.setOutputPath(job, outputFile);
job.waitForCompletion(true);
}
private Path getTmpFile(Path target, Mapper.Context context) {
Path targetWorkPath = new Path(context.getConfiguration().
get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
Path tmpFile = new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
LOG.info("Creating temp file: " + tmpFile);
return tmpFile;
}
@SuppressWarnings("unchecked")
protected void setup(Context context)
throws IOException, InterruptedException {
// Find the Mapper from the TaggedInputSplit.
TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
.getMapperClass(), context.getConfiguration());
}
@Override
protected void cleanup(
Mapper<JobFile, FileStatus, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
if (hbaseConnection != null) {
hbaseConnection.close();
}
super.cleanup(context);
}
@Test
public void testMapper() throws IOException, InterruptedException {
Configuration conf = new Configuration();
// basic config
conf.set("statsd.host", "localhost");
conf.set("statsd.port", "8125");
conf.set("mapreduce.job.queuename", "queue1");
conf.set("mapreduce.job.name", "job1");
// some valid aspect configs
conf.set("statsd.final.gauge.MyGroup1", "CounterGroup1");
conf.set("statsd.final.counter.MyGroup2", "CounterGroup2/Counter1");
conf.set("statsd.live.time.MyGroup3.MyCounter2", "CounterGroup3/Counter2");
conf.set("statsd.live.counter.TestGroup", TestCounters.class.getName());
CounterToStatsDConfiguration config = new CounterToStatsDConfiguration(conf);
TestStatsDEnabledMapper mapper = new TestStatsDEnabledMapper();
Mapper.Context context = mapper.createTestContext(conf);
mapper.setup(context);
Assert.assertNotNull(mapper.getHelper());
TaskAttemptContext returnedContext = mapper.getContext(context);
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDTaskAttemptContext", returnedContext.getClass().getName());
Counter testCounter = mapper.getCounter(context, TestCounters.COUNTER1);
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDCounter", testCounter.getClass().getName());
testCounter = mapper.getCounter(context, "CounterGroup1", "Counter1");
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDCounter", testCounter.getClass().getName());
Assert.assertFalse(((CounterStatsDClientTest.TestCounterStatsDClient) (mapper.getHelper()).getClient()).stopped);
mapper.cleanup(context);
Assert.assertNull(mapper.getHelper().getClient());
}
private void ingestTestData(Configuration conf, TestFileLoader loader) throws IOException, InterruptedException {
log.debug("------------- ingestTestData -------------");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
Path tmpPath = new Path(tmpDir.toURI());
Path seqFile = new Path(tmpPath, UUID.randomUUID().toString());
TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.MAP, 0, 0);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
try (final RawLocalFileSystem rfs = createSequenceFile(conf, seqFile, loader)) {
InputSplit split = new FileSplit(seqFile, 0, rfs.pathToFile(seqFile).length(), null);
EventSequenceFileRecordReader<LongWritable> rr = new EventSequenceFileRecordReader<>();
rr.initialize(split, context);
Path ocPath = new Path(tmpPath, "oc");
OutputCommitter oc = new FileOutputCommitter(ocPath, context);
rfs.deleteOnExit(ocPath);
StandaloneStatusReporter sr = new StandaloneStatusReporter();
EventMapper<LongWritable,RawRecordContainer,Text,Mutation> mapper = new EventMapper<>();
MapContext<LongWritable,RawRecordContainer,Text,Mutation> mapContext = new MapContextImpl<>(conf, id, rr, this.recordWriter, oc, sr, split);
Mapper<LongWritable,RawRecordContainer,Text,Mutation>.Context con = new WrappedMapper<LongWritable,RawRecordContainer,Text,Mutation>()
.getMapContext(mapContext);
mapper.run(con);
mapper.cleanup(con);
} finally {
this.recordWriter.close(context);
}
}
public MapRunner(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper,
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext,
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw)
throws IOException, InterruptedException {
this.mapper = mapper;
this.rr = rr;
this.rw = rw;
this.chainContext = mapperContext;
}
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
throws IOException {
super.setup(context);
try {
this.keysToFind = readKeysToSearch(context.getConfiguration());
LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
}
@Override
protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
// each split is a whole file; its path becomes this mapper's output key
Path path = ((FileSplit) split).getPath();
fileNameKey = new Text(path.toString());
}
@Test
public void map(@Mocked final Mapper.Context defaultContext) throws IOException,InterruptedException {
EthereumBlockMap mapper = new EthereumBlockMap();
final BytesWritable key = new BytesWritable();
final EthereumBlock value = new EthereumBlock(null,new ArrayList<EthereumTransaction>(),null);
final Text defaultKey = new Text("Transaction Count:");
final IntWritable nullInt = new IntWritable(0);
new Expectations() {{
defaultContext.write(defaultKey,nullInt); times=1;
}};
mapper.map(key,value,defaultContext);
}
/**
* Set up the MapReduce job to use an RDF file as an input.
* @param rdfMapper class to use
*/
protected void configureRdfInput(Path inputPath,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
Configuration conf = job.getConfiguration();
String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
conf.set(MRUtils.FORMAT_PROP, format);
MultipleInputs.addInputPath(job, inputPath,
RdfFileInputFormat.class, rdfMapper);
}
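Under the same caveat, a call to configureRdfInput might look like this sketch (the input path and mapper class name are illustrative, not from the quoted project):

// Hypothetical: read N-Triples instead of the RDF/XML default, routing the
// file through a placeholder mapper class.
job.getConfiguration().set(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.getName());
configureRdfInput(new Path("hdfs:///input/triples.nt"), RdfStatementMapper.class);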
public ImportJobBase(final SqoopOptions opts,
final Class<? extends Mapper> mapperClass,
final Class<? extends InputFormat> inputFormatClass,
final Class<? extends OutputFormat> outputFormatClass,
final ImportJobContext context) {
super(opts, mapperClass, inputFormatClass, outputFormatClass);
this.context = context;
}
@Test
public void map(@Mocked final Mapper.Context defaultContext) throws IOException,InterruptedException {
BitcoinTransactionMap mapper = new BitcoinTransactionMap();
final BytesWritable key = new BytesWritable();
final BitcoinTransaction value = new BitcoinTransaction(0,new byte[0], new ArrayList<BitcoinTransactionInput>(),new byte[0],new ArrayList<BitcoinTransactionOutput>(),0);
final Text defaultKey = new Text("Transaction Input Count:");
final IntWritable nullInt = new IntWritable(0);
new Expectations() {{
defaultContext.write(defaultKey,nullInt); times=1;
}};
mapper.map(key,value,defaultContext);
}
static public BWAAlnInstance getBWAInstance(Mapper.Context context, String bin) throws IOException, InterruptedException, URISyntaxException {
if(instance == null) {
instance = new BWAAlnInstance(context, bin);
instance.startAligner(context);
}
BWAAlnInstance.context = context;
Logger.DEBUG("Started BWA");
return instance;
}
@Override
protected Class<? extends Mapper> getMapperClass() {
if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
return MainframeDatasetImportMapper.class;
} else {
return super.getMapperClass();
}
}
@Override
protected void map(
NullWritable n1, NullWritable n2,
Mapper<NullWritable, NullWritable,
ImmutableBytesWritable, Put>.Context context)
throws java.io.IOException, InterruptedException {
byte[] keyBytes = new byte[keyLength];
byte[] valBytes = new byte[valLength];
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
Random random = new Random();
byte[] key;
for (int j = 0; j < tables.length; ++j) {
for (int i = 0; i < ROWSPERSPLIT; i++) {
random.nextBytes(keyBytes);
// Ensure that unique tasks generate unique keys
keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
random.nextBytes(valBytes);
key = keyBytes;
if (multiTableMapper) {
key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
}
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
Put p = new Put(keyBytes);
p.addColumn(family, QUALIFIER, valBytes);
// set TTL to very low so that the scan does not return any value
p.setTTL(1L);
context.write(new ImmutableBytesWritable(key), p);
}
}
}
}