下面列出了怎么用org.apache.hadoop.mapreduce.lib.map.WrappedMapper的API类实例代码及写法,或者点击链接到github查看源代码。
private void ingestTestData(Configuration conf, TestFileLoader loader) throws IOException, InterruptedException {
log.debug("------------- ingestTestData -------------");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
Path tmpPath = new Path(tmpDir.toURI());
Path seqFile = new Path(tmpPath, UUID.randomUUID().toString());
TaskAttemptID id = new TaskAttemptID("testJob", 0, TaskType.MAP, 0, 0);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
try (final RawLocalFileSystem rfs = createSequenceFile(conf, seqFile, loader)) {
InputSplit split = new FileSplit(seqFile, 0, rfs.pathToFile(seqFile).length(), null);
EventSequenceFileRecordReader<LongWritable> rr = new EventSequenceFileRecordReader<>();
rr.initialize(split, context);
Path ocPath = new Path(tmpPath, "oc");
OutputCommitter oc = new FileOutputCommitter(ocPath, context);
rfs.deleteOnExit(ocPath);
StandaloneStatusReporter sr = new StandaloneStatusReporter();
EventMapper<LongWritable,RawRecordContainer,Text,Mutation> mapper = new EventMapper<>();
MapContext<LongWritable,RawRecordContainer,Text,Mutation> mapContext = new MapContextImpl<>(conf, id, rr, this.recordWriter, oc, sr, split);
Mapper<LongWritable,RawRecordContainer,Text,Mutation>.Context con = new WrappedMapper<LongWritable,RawRecordContainer,Text,Mutation>()
.getMapContext(mapContext);
mapper.run(con);
mapper.cleanup(con);
} finally {
this.recordWriter.close(context);
}
}
public StubContext(Configuration conf, RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper = new WrappedMapper<>();
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl = new MapContextImpl<>(conf,
getTaskAttemptID(taskId), reader, writer, null, reporter, null);
this.reader = reader;
mapperContext = wrappedMapper.getMapContext(contextImpl);
}
/**
* Create a map context that is based on ChainMapContext and the given record
* reader and record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rr, rw, conf);
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getMapContext(mapContext);
return mapperContext;
}
@Test
public void testCloneMapContext() throws Exception {
TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
TaskAttemptID taskAttemptid = new TaskAttemptID(taskId, 0);
MapContext<IntWritable, IntWritable, IntWritable, IntWritable> mapContext =
new MapContextImpl<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, taskAttemptid, null, null, null, null, null);
Mapper<IntWritable, IntWritable, IntWritable, IntWritable>.Context mapperContext =
new WrappedMapper<IntWritable, IntWritable, IntWritable, IntWritable>().getMapContext(
mapContext);
ContextFactory.cloneMapContext(mapperContext, conf, null, null);
}
public StubContext(Configuration conf,
RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
= new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
= new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
getTaskAttemptID(taskId), reader, writer,
null, reporter, null);
this.reader = reader;
this.mapperContext = wrappedMapper.getMapContext(contextImpl);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
/**
* Create a map context that is based on ChainMapContext and the given record
* reader and record writer
*/
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
Configuration conf) {
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
context, rr, rw, conf);
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
.getMapContext(mapContext);
return mapperContext;
}
@Test
public void testCloneMapContext() throws Exception {
TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
TaskAttemptID taskAttemptid = new TaskAttemptID(taskId, 0);
MapContext<IntWritable, IntWritable, IntWritable, IntWritable> mapContext =
new MapContextImpl<IntWritable, IntWritable, IntWritable, IntWritable>(
conf, taskAttemptid, null, null, null, null, null);
Mapper<IntWritable, IntWritable, IntWritable, IntWritable>.Context mapperContext =
new WrappedMapper<IntWritable, IntWritable, IntWritable, IntWritable>().getMapContext(
mapContext);
ContextFactory.cloneMapContext(mapperContext, conf, null, null);
}
public StubContext(Configuration conf,
RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper
= new WrappedMapper<Text, CopyListingFileStatus, Text, Text>();
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl
= new MapContextImpl<Text, CopyListingFileStatus, Text, Text>(conf,
getTaskAttemptID(taskId), reader, writer,
null, reporter, null);
this.reader = reader;
this.mapperContext = wrappedMapper.getMapContext(contextImpl);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should slip 2 sec
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
private String getInputsplitHelper(Context context) {
try {
Field mapContextField = WrappedMapper.Context.class.getDeclaredField("mapContext");
mapContextField.setAccessible(true);
Path[] inputPaths = ((CombineFileSplit) ((MapContextImpl) mapContextField.get((WrappedMapper.Context) context))
.getInputSplit()).getPaths();
return Arrays.toString(inputPaths);
} catch (NoSuchFieldException | IllegalAccessException ie) {
throw new RuntimeException(ie);
}
}
/**
* Setting progress within implementation of {@link Mapper} for reporting progress.
* Gobblin (when running in MR mode) used to report progress only in {@link GobblinWorkUnitsInputFormat} while
* deserializing {@link WorkUnit} in MapReduce job. In that scenario, whenever workunit is deserialized (but not yet
* executed) the progress will be reported as 1.0f. This could implicitly disable the feature of speculative-execution
* provided by MR-framework as the latter is looking at the progress to determine if speculative-execution is necessary
* to trigger or not.
*
* Different application of Gobblin should have customized logic on calculating progress.
*/
void setProgressInMapper(float progress, Context context) {
try {
WrappedMapper.Context wrappedContext = ((WrappedMapper.Context) context);
Object contextImpl = RestrictedFieldAccessingUtils.getRestrictedFieldByReflection(wrappedContext, "mapContext", wrappedContext.getClass());
((org.apache.hadoop.mapred.Task.TaskReporter)RestrictedFieldAccessingUtils
.getRestrictedFieldByReflectionRecursively(contextImpl, "reporter", MapContextImpl.class)).setProgress(progress);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
@Override
public boolean inIllustrator(Context context) {
return ((WrappedMapper.Context)context).getConfiguration().get("inIllustrator")!=null;
}
private void runNewMapper(final JobConf job,
MRTaskReporter reporter,
final MRInputLegacy in,
KeyValueWriter out
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
// Done only for MRInput.
// TODO use new method in MRInput to get required info
//in.initialize(job, master);
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
getTaskAttemptContext();
// make a mapper
org.apache.hadoop.mapreduce.Mapper mapper;
try {
mapper = (org.apache.hadoop.mapreduce.Mapper)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
org.apache.hadoop.mapreduce.RecordReader input =
new NewRecordReader(in);
org.apache.hadoop.mapreduce.RecordWriter output =
new NewOutputCollector(out);
org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
updateJobWithSplit(job, split);
org.apache.hadoop.mapreduce.MapContext
mapContext =
new MapContextImpl(
job, taskAttemptId,
input, output,
committer,
processorContext, split, reporter);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
new WrappedMapper().getMapContext(mapContext);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
// Set progress to 1.0f if there was no exception,
reporter.setProgress(1.0f);
this.statusUpdate();
input.close();
output.close(mapperContext);
}
private void runNewMapper(final JobConf job,
MRTaskReporter reporter,
final MRInputLegacy in,
KeyValueWriter out
) throws IOException, InterruptedException {
// Initialize input in-line since it sets parameters which may be used by the processor.
// Done only for MRInput.
// TODO use new method in MRInput to get required info
//in.initialize(job, master);
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
getTaskAttemptContext();
// make a mapper
org.apache.hadoop.mapreduce.Mapper mapper;
try {
mapper = (org.apache.hadoop.mapreduce.Mapper)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
org.apache.hadoop.mapreduce.RecordReader input =
new NewRecordReader(in);
org.apache.hadoop.mapreduce.RecordWriter output =
new NewOutputCollector(out);
org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
updateJobWithSplit(job, split);
org.apache.hadoop.mapreduce.MapContext
mapContext =
new MapContextImpl(
job, taskAttemptId,
input, output,
committer,
processorContext, split, reporter);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
new WrappedMapper().getMapContext(mapContext);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
// Set progress to 1.0f if there was no exception,
reporter.setProgress(1.0f);
this.statusUpdate();
input.close();
output.close(mapperContext);
}
/**
*
* Get mapper's illustrator context
*
* @param conf Configuration
* @param input Input bag to serve as data source
* @param output Map output buffer
* @param split the split
* @return Illustrator's context
* @throws IOException
* @throws InterruptedException
*/
@Override
public Context getIllustratorContext(Configuration conf, DataBag input,
List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>().getMapContext(new IllustratorContext(conf, input, output, split));
return mapperContext;
}
public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
return new WrappedMapper().getMapContext(new MockupMapContext(hconf, outKV));
}
public static Context create(final Configuration hconf, String cubeName, final Object[] outKV) {
hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
return new WrappedMapper().getMapContext(new MockupMapContext(hconf, outKV));
}
public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
return new WrappedMapper().getMapContext(new MockupMapContext(hconf, outKV));
}
public static Context create(final Configuration hconf, String cubeName, final Object[] outKV) {
hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
return new WrappedMapper().getMapContext(new MockupMapContext(hconf, outKV));
}