The following examples show how to use the org.apache.hadoop.mapreduce.Mapper.Context API class; each snippet can be followed to its full source on GitHub.
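For orientation, here is a minimal self-contained mapper that exercises the two things Context is most often used for below, emitting records and updating counters. This is a sketch of ours, not one of the collected examples; the class and counter names are illustrative.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);                        // emit output through the Context
    }
    context.getCounter("app", "lines").increment(1);   // Context also exposes counters
  }
}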
@Ignore("convenient trial tool for dev")
@Test
public void test() throws IOException, InterruptedException {
  Configuration hconf = HadoopUtil.getCurrentConfiguration();
  HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
  Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
  mapper.doSetup(context);
  Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
  Text value = new Text();
  while (reader.next(key, value)) {
    mapper.map(key, value, context);
  }
  reader.close();
}
public void map(LongWritable key, Writable value, Context context)
    throws IOException, InterruptedException {
  try {
    String str = value.toString();
    if (value instanceof Text) {
      writer.write(str, 0, str.length());
      writer.newLine();
    } else if (value instanceof SqoopRecord) {
      writer.write(str, 0, str.length());
    }
  } catch (Exception e) {
    doExecuteUpdate("DROP TABLE " + tmpTableName);
    cleanup(context);
    throw new IOException(e);
  }
}
@Override
public void map(LongWritable key, Writable value, Context context)
    throws IOException, InterruptedException {
  line.setLength(0);
  line.append(value.toString());
  if (value instanceof Text) {
    line.append(System.getProperty("line.separator"));
  }
  try {
    byte[] data = line.toString().getBytes("UTF-8");
    copyin.writeToCopy(data, 0, data.length);
  } catch (SQLException ex) {
    LoggingUtils.logAll(LOG, "Unable to execute copy", ex);
    close();
    throw new IOException(ex);
  }
}
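The mapper above streams each record into a PostgreSQL COPY and assumes a CopyIn handle (copyin) opened earlier in the task. A sketch of that initialization against the PostgreSQL JDBC driver's CopyManager, with illustrative connection details and table name (the project's actual setup may differ):

// Sketch: open the CopyIn stream the map() above writes to, once per task.
// Requires java.sql.* plus org.postgresql.copy.CopyManager and
// org.postgresql.core.BaseConnection from the PostgreSQL JDBC driver.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  try {
    Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://dbhost/exportdb", "user", "secret");
    CopyManager copyManager = new CopyManager((BaseConnection) conn);
    copyin = copyManager.copyIn("COPY target_table FROM STDIN");
  } catch (SQLException ex) {
    throw new IOException(ex);
  }
}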
@Ignore("convenient trial tool for dev")
@Test
public void test() throws IOException, InterruptedException {
  Configuration hconf = new Configuration();
  BaseCuboidMapper mapper = new BaseCuboidMapper();
  Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
  mapper.setup(context);
  Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
  Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
  Text value = new Text();
  while (reader.next(key, value)) {
    mapper.map(key, value, context);
  }
  reader.close();
}
public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
{
  for (Entry<String, ByteBuffer> column : columns.entrySet())
  {
    if (!"line".equalsIgnoreCase(column.getKey()))
      continue;
    String value = ByteBufferUtil.string(column.getValue());
    StringTokenizer itr = new StringTokenizer(value);
    while (itr.hasMoreTokens())
    {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
@Override
public void map(NullWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
  int counter = 0;
  System.out.println("starting mapper");
  System.out.println();
  for (int i = 0; i < numberOfRecords; i++) {
    String keyRoot = StringUtils.leftPad(Integer.toString(r.nextInt(Short.MAX_VALUE)), 5, '0');
    if (i % 1000 == 0) {
      System.out.print(".");
    }
    for (int j = 0; j < 10; j++) {
      hKey.set(Bytes.toBytes(keyRoot + "|" + runID + "|" + taskId));
      kv = new KeyValue(hKey.get(), columnFamily, Bytes.toBytes("C" + j), Bytes.toBytes("counter:" + counter++));
      context.write(hKey, kv);
    }
  }
  System.out.println("finished mapper");
}
@Override
protected void startAligner(Mapper.Context context) throws IOException, InterruptedException {
  // make command
  String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "bwa", "mem");
  String[] command = CommandGenerator.bwaMem(bin, ref, null, null, isPaired, true, threads, customArgs);
  pbw = new ProcessBuilderWrapper(command, bin);
  // run command
  // needs to be streamed to output otherwise the process blocks ...
  pbw.startProcess(null, System.err);
  // check if alive.
  if (!pbw.isAlive())
    throw new ProcessException("BWA mem", pbw.getExitState());
  pbw.getSTDINWriter();
  // make a SAMstream handler
  ssh = new SAMStreamHandler(instance, context, false);
  ssh.start();
}
/**
 * Constructor for SampleMachineConsumer - needs the Mapper Context
 *
 * @param context A Hadoop MapReduce Mapper.Context to which this consumer
 *                should write
 */
public SampleMachineConsumer(final Context context) {
  super();
  ContextWriter contextWrite = new ContextWriter(context, template);
  this.addDataWriter(contextWrite);
  this.addDataTransformer(new SampleMachineTransformer());
  exit = new AtomicBoolean(false);
  handler = new JenkinsReportingHandler(exit);
  currentRow = -1;
  finalRow = -2;
  setReportGap(1000);
}
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  String[] strs = value.toString().split("\t");
  num.set(Integer.parseInt(strs[1]));
  // emit the count as the key so the shuffle sorts ascending by count
  context.write(num, new Text(strs[0]));
  System.out.println(num.get() + "," + strs[0]);
}
public void reduce(IntWritable key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  for (Text value : values) {
    // after sorting, swap key and value back so the date is the key again
    System.out.println(value.toString() + ":" + key.get());
    context.write(value, key);
  }
}
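This pair implements sort-by-count with two key/value swaps: the mapper makes the count the key so the framework's shuffle sorts it, and the reducer swaps the pair back. A driver sketch for wiring the two together; SortDriver, SortMapper, and SortReducer are illustrative names for the enclosing classes, which the snippet does not show:

// Hypothetical driver for the count-sort pair above; inside main(String[] args).
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sort by count");
job.setJarByClass(SortDriver.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setMapOutputKeyClass(IntWritable.class);   // counts sort ascending in the shuffle
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);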
@Override
protected void cleanup(Context context)
    throws IOException, InterruptedException {
  try {
    copyin.endCopy();
  } catch (SQLException ex) {
    LoggingUtils.logAll(LOG, "Unable to finalize copy", ex);
    throw new IOException(ex);
  }
  close();
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  Configuration conf = context.getConfiguration();
  // Instantiate a copy of the user's class to hold and parse the record.
  String recordClassName = conf.get(
      ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
  if (null == recordClassName) {
    throw new IOException("Export table class name ("
        + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
        + ") is not set!");
  }
  try {
    Class cls = Class.forName(recordClassName, true,
        Thread.currentThread().getContextClassLoader());
    recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  }
  if (null == recordImpl) {
    throw new IOException("Could not instantiate object of type "
        + recordClassName);
  }
  columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
      MapWritable.class);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
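Both GridMix tests above build a real (non-mocked) Context the same way: construct a MapContextImpl and wrap it with WrappedMapper.getMapContext. The recipe generalizes to any mapper; a sketch of a reusable helper, where the method name is ours, not Hadoop's:

// Hypothetical helper capturing the MapContextImpl + WrappedMapper recipe used
// by both tests; the type parameters mirror those of the mapper under test.
static <KI, VI, KO, VO> Mapper<KI, VI, KO, VO>.Context newTestContext(
    Configuration conf, RecordReader<KI, VI> reader, RecordWriter<KO, VO> writer,
    OutputCommitter committer, StatusReporter reporter, InputSplit split) {
  MapContext<KI, VI, KO, VO> mapContext = new MapContextImpl<KI, VI, KO, VO>(
      conf, new TaskAttemptID(), reader, writer, committer, reporter, split);
  return new WrappedMapper<KI, VI, KO, VO>().getMapContext(mapContext);
}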
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  decimationFactor = conf.getInt(DECIMATION_FACTOR_PROPERTY, DEFAULT_DECIMATION_FACTOR);
  for (byte b = 1; b < 6; b++) {
    context.write(new ImmutableBytesWritable(new byte[] {b}), new LongWritable(1));
  }
  timestamp = conf.getLong(DEFAULT_TIMESTAMP_PROPERTY, System.currentTimeMillis());
}
@Override
protected void map(LongWritable key, Statement value, final Context context) throws IOException, InterruptedException {
  if (counter++ == next) {
    next = counter + random.nextInt(decimationFactor);
    for (KeyValue keyValue : HalyardTableUtils.toKeyValues(value.getSubject(), value.getPredicate(), value.getObject(), value.getContext(), false, timestamp)) {
      context.write(new ImmutableBytesWritable(keyValue.getRowArray(), keyValue.getRowOffset(), keyValue.getRowLength()), new LongWritable(keyValue.getLength()));
    }
  }
}
@Override
public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  byte region = key.get()[key.getOffset()];
  if (lastRegion != region || size > splitLimit) {
    byte[] split = lastRegion != region ? new byte[]{region} : key.copyBytes();
    splits.add(split);
    context.setStatus("#" + splits.size() + " " + Arrays.toString(split));
    lastRegion = key.get()[key.getOffset()];
    size = 0;
  }
  for (LongWritable val : values) {
    size += val.get();
  }
}
public SqoopDataWriter(Context context, IntermediateDataFormat<Object> f,
    IntermediateDataFormat<Object> t, Matcher m) {
  this.context = context;
  fromIDF = f;
  toIDF = t;
  matcher = m;
}
@SuppressWarnings("rawtypes")
public static void cleanup(final Context context) {
  closeMap();
  closeCommentMap();
  closeIssuesMap();
  closeCommitMap();
}
/**
 * Add logging in the map cleanup method
 *
 * @param context
 *          - map context
 * @param className
 *          - the class calling this method
 * @param methodName
 *          - the class method calling this method
 */
@SuppressWarnings(RAW_TYPES)
public static void getMapContextInfoCleanup(Context context,
    String className, String methodName) {
  Counter counter = context.getCounter(MAPRED_COUNTER, MAP_INPUT_RECORDS);
  getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
      counter.getValue());
  counter = context.getCounter(MAPRED_COUNTER, MAP_OUTPUT_RECORDS);
  getLogMsg(className, methodName, counter.getDisplayName(), COUNTERS,
      counter.getValue());
}
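A hypothetical call site for this helper, invoked from a mapper's cleanup() to log the input/output record counters; this assumes the static helper is visible from the mapper (same class or imported):

// Sketch: log record counters at the end of a map task.
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  getMapContextInfoCleanup(context, this.getClass().getName(), "cleanup");
  super.cleanup(context);
}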
/**
 * Test map method of Importer
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testKeyValueImporter() throws Throwable {
  CellImporter importer = new CellImporter();
  Configuration configuration = new Configuration();
  Context ctx = mock(Context.class);
  when(ctx.getConfiguration()).thenReturn(configuration);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
      MapReduceExtendedCell key = (MapReduceExtendedCell) invocation.getArgument(1);
      assertEquals("Key", Bytes.toString(writer.get()));
      assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
      return null;
    }
  }).when(ctx).write(any(), any());
  importer.setup(ctx);
  Result value = mock(Result.class);
  KeyValue[] keys = {
      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
          Bytes.toBytes("value")),
      new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
          Bytes.toBytes("value1")) };
  when(value.rawCells()).thenReturn(keys);
  importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
}
/**
 * Test SampleUploader from examples
 */
@SuppressWarnings("unchecked")
@Test
public void testSampleUploader() throws Exception {
  Configuration configuration = new Configuration();
  Uploader uploader = new Uploader();
  Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context ctx = mock(Context.class);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
      Put put = (Put) invocation.getArgument(1);
      assertEquals("row", Bytes.toString(writer.get()));
      assertEquals("row", Bytes.toString(put.getRow()));
      return null;
    }
  }).when(ctx).write(any(), any());
  uploader.map(null, new Text("row,family,qualifier,value"), ctx);
  Path dir = util.getDataTestDirOnTestFS("testSampleUploader");
  String[] args = { dir.toString(), "simpleTable" };
  Job job = SampleUploader.configureJob(configuration, args);
  assertEquals(SequenceFileInputFormat.class, job.getInputFormatClass());
}
/**
 * Test IndexBuilder from examples
 */
@SuppressWarnings("unchecked")
@Test
public void testIndexBuilder() throws Exception {
  Configuration configuration = new Configuration();
  String[] args = { "tableName", "columnFamily", "column1", "column2" };
  IndexBuilder.configureJob(configuration, args);
  assertEquals("tableName", configuration.get("index.tablename"));
  assertEquals("tableName", configuration.get(TableInputFormat.INPUT_TABLE));
  assertEquals("column1,column2", configuration.get("index.fields"));
  // Map here is IndexBuilder's inner mapper class, not java.util.Map.
  Map map = new Map();
  ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes("test"));
  Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context ctx =
      mock(Context.class);
  when(ctx.getConfiguration()).thenReturn(configuration);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArgument(0);
      Put put = (Put) invocation.getArgument(1);
      assertEquals("tableName-column1", Bytes.toString(writer.get()));
      assertEquals("test", Bytes.toString(put.getRow()));
      return null;
    }
  }).when(ctx).write(any(), any());
  Result result = mock(Result.class);
  when(result.getValue(Bytes.toBytes("columnFamily"), Bytes.toBytes("column1"))).thenReturn(
      Bytes.toBytes("test"));
  map.setup(ctx);
  map.map(rowKey, result, ctx);
}
public void map(Long key, Row row, Context context) throws IOException, InterruptedException
{
  String value = row.getString("line");
  logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()});
  StringTokenizer itr = new StringTokenizer(value);
  while (itr.hasMoreTokens())
  {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
  int sum = 0;
  for (IntWritable val : values)
    sum += val.get();
  context.write(key, new IntWritable(sum));
}
public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
  int sum = 0;
  for (IntWritable val : values)
    sum += val.get();
  keys.put("word", ByteBufferUtil.bytes(word.toString()));
  context.write(keys, getBindVariables(word, sum));
}
@Override
public void setup(Context context) {
  System.out.println("starting setup");
  columnFamily = Bytes.toBytes(context.getConfiguration().get(COLUMN_FAMILY));
  runID = context.getConfiguration().get(RUN_ID);
  taskId = context.getTaskAttemptID().getTaskID().getId();
  numberOfRecords = context.getConfiguration().getInt(NUMBER_OF_RECORDS, 1000)
      / context.getConfiguration().getInt("nmapinputformat.num.maps", 1);
  System.out.println("finished setup");
}
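This setup() divides the configured record total across the map tasks, pairing with the generator map() shown earlier. A sketch of the driver-side wiring, assuming the constants from the snippet, HBase's NMapInputFormat (which emits N empty splits, one per mapper), and illustrative driver/mapper class names:

// Sketch: configure the load-generator job. COLUMN_FAMILY, RUN_ID and
// NUMBER_OF_RECORDS are the keys read in setup(); LoadDriver and
// LoadGeneratorMapper are hypothetical names for the enclosing classes.
Configuration conf = HBaseConfiguration.create();
conf.set(COLUMN_FAMILY, "f");
conf.set(RUN_ID, "run-001");
conf.setInt(NUMBER_OF_RECORDS, 1000000);
conf.setInt("nmapinputformat.num.maps", 10);    // records are split across 10 mappers

Job job = Job.getInstance(conf, "load generator");
job.setJarByClass(LoadDriver.class);
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(LoadGeneratorMapper.class);  // the setup()/map() pair shown above
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setNumReduceTasks(0);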