The following listing shows example code and usage patterns for the org.apache.hadoop.mapreduce.OutputCommitter API class; you can also follow the links to view the full source code on GitHub.
// Combiner runner for the new (org.apache.hadoop.mapreduce) API: runs the
// reducer class as a combiner over intermediate map output.
@SuppressWarnings("unchecked")
NewCombinerRunner(Class reducerClass,
JobConf job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
org.apache.hadoop.mapreduce.TaskAttemptContext context,
Counters.Counter inputCounter,
TaskReporter reporter,
org.apache.hadoop.mapreduce.OutputCommitter committer) {
super(inputCounter, job, reporter);
this.reducerClass = reducerClass;
this.taskId = taskId;
// Key/value classes and the grouping comparator come from the *map output*
// side of the context, since a combiner consumes intermediate map output.
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
// Verifies that a failing OutputCommitter.commitJob() transitions the job
// from COMMITTING to FAILED.
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
// Two parties: the test thread and the committer thread.
CyclicBarrier syncBarrier = new CyclicBarrier(2);
// Second argument presumably selects commit failure (the job is expected to
// fail below) — TODO confirm against TestingOutputCommitter.
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
// Job with 2 tasks, already in the running state.
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
// let the committer fail and verify the job fails
syncBarrier.await();
assertJobState(job, JobStateInternal.FAILED);
dispatcher.stop();
commitHandler.stop();
}
// NOTE(review): byte-for-byte duplicate of the testCommitJobFailsJob method
// that appears earlier in this listing — two identical method definitions
// cannot coexist in one class, so this is a copy/paste artifact of the
// example page, not compilable as-is alongside the first copy.
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
CyclicBarrier syncBarrier = new CyclicBarrier(2);
OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
completeJobTasks(job);
assertJobState(job, JobStateInternal.COMMITTING);
// let the committer fail and verify the job fails
syncBarrier.await();
assertJobState(job, JobStateInternal.FAILED);
dispatcher.stop();
commitHandler.stop();
}
// Test double around JobImpl. Note: the applicationAttemptId parameter is not
// forwarded to super — the super call builds its own attempt id from
// applicationId and getStartCount() (both from the enclosing scope, not
// visible here); presumably intentional for the test harness — TODO confirm.
@SuppressWarnings("rawtypes")
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
OutputCommitter committer, boolean newApiCommitter,
String user, AppContext appContext,
JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext, forcedState, diagnostic);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
localStateMachine = localFactory.make(this);
}
// Verifies that open() asks the output format for a record writer and runs
// OutputCommitter.setupJob().
@Test
public void testOpen() throws Exception {
OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
OutputCommitter outputCommitter = setupOutputCommitter(true);
when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
// Note: a second, distinct mock committer is injected via setupOutputCommitter(true)
// below; the verification targets whichever one ends up in
// hadoopOutputFormat.outputCommitter after open().
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
hadoopOutputFormat.open(1, 4);
verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
}
@Test
public void testCloseWithNeedsTaskCommitTrue() throws Exception {
    // When the committer reports the task needs a commit, close() must commit
    // the task exactly once and must always close the record writer.
    final RecordWriter<String, Long> writer = Mockito.mock(DummyRecordWriter.class);
    final OutputCommitter committer = setupOutputCommitter(true);
    final HadoopOutputFormat<String, Long> format = setupHadoopOutputFormat(
            new DummyOutputFormat(), Job.getInstance(), writer, committer, new Configuration());

    format.close();

    verify(committer, times(1)).commitTask(nullable(TaskAttemptContext.class));
    verify(writer, times(1)).close(nullable(TaskAttemptContext.class));
}
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {
    // When the committer reports no commit is needed, close() must skip
    // commitTask() entirely but still close the record writer.
    final RecordWriter<String, Long> writer = Mockito.mock(DummyRecordWriter.class);
    final OutputCommitter committer = setupOutputCommitter(false);
    final HadoopOutputFormat<String, Long> format = setupHadoopOutputFormat(
            new DummyOutputFormat(), Job.getInstance(), writer, committer, new Configuration());

    format.close();

    verify(committer, times(0)).commitTask(nullable(TaskAttemptContext.class));
    verify(writer, times(1)).close(nullable(TaskAttemptContext.class));
}
/**
 * Builds a mock {@link OutputCommitter} whose needsTaskCommit(...) answer is
 * pinned to the given flag and whose commitTask(...) is stubbed to a no-op.
 */
private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
    final OutputCommitter mockCommitter = Mockito.mock(OutputCommitter.class);
    when(mockCommitter.needsTaskCommit(nullable(TaskAttemptContext.class)))
            .thenReturn(needsTaskCommit);
    doNothing().when(mockCommitter).commitTask(any(TaskAttemptContext.class));
    return mockCommitter;
}
/**
 * Wraps the given output format in a {@link HadoopOutputFormat} and injects
 * the supplied collaborators directly into its fields; also points the mapred
 * output directory at the test path.
 */
private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
        OutputFormat<String, Long> outputFormat,
        Job job,
        RecordWriter<String, Long> recordWriter,
        OutputCommitter outputCommitter,
        Configuration configuration) {
    final HadoopOutputFormat<String, Long> format = new HadoopOutputFormat<>(outputFormat, job);
    format.recordWriter = recordWriter;
    format.outputCommitter = outputCommitter;
    format.configuration = configuration;
    format.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
    return format;
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
    // Hand back a fresh mock committer whose setupJob(...) is a no-op.
    final OutputCommitter mockCommitter = Mockito.mock(OutputCommitter.class);
    doNothing().when(mockCommitter).setupJob(any(JobContext.class));
    return mockCommitter;
}
// Map-side task context: extends the shared input/output context with the
// record reader and the input split this map task processes.
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
TaskContext context,
InputSplit split, Reporter reporter) {
super(conf, taskid, writer, committer, context, reporter);
this.reader = reader;
this.split = split;
}
// NOTE(review): byte-for-byte duplicate of testCloseWithNeedsTaskCommitFalse
// from earlier in this listing — a copy/paste artifact of the example page;
// identical duplicates cannot live in the same class.
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {
RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
OutputCommitter outputCommitter = setupOutputCommitter(false);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), recordWriter, outputCommitter, new Configuration());
hadoopOutputFormat.close();
verify(outputCommitter, times(0)).commitTask(nullable(TaskAttemptContext.class));
verify(recordWriter, times(1)).close(nullable(TaskAttemptContext.class));
}
// NOTE(review): byte-for-byte duplicate of the setupOutputCommitter helper
// from earlier in this listing — copy/paste artifact of the example page.
// Returns a mock committer with a fixed needsTaskCommit answer and a no-op
// commitTask.
private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
when(outputCommitter.needsTaskCommit(nullable(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));
return outputCommitter;
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    // Ask every wrapped committer — deliberately without short-circuiting, so
    // each delegate receives the callback — and report true if any needs it.
    boolean anyNeedsCommit = false;
    for (OutputCommitter delegate : committers) {
        if (delegate.needsTaskCommit(taskContext)) {
            anyNeedsCommit = true;
        }
    }
    return anyNeedsCommit;
}
/**
 * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
 * Dispatches through the reflective MAP_CONTEXT_CONSTRUCTOR so one call site
 * works against either Hadoop version's constructor signature.
 */
public static MapContext newMapContext(Configuration conf,
TaskAttemptID taskAttemptID,
RecordReader recordReader,
RecordWriter recordWriter,
OutputCommitter outputCommitter,
StatusReporter statusReporter,
InputSplit inputSplit) {
return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR,
conf, taskAttemptID, recordReader, recordWriter, outputCommitter,
statusReporter, inputSplit);
}
/**
 * Resolves the directory this task should write output to: the committer's
 * work (temporary) path when a FileOutputCommitter is in use, otherwise the
 * job's configured output path.
 *
 * @throws IOException if no job output path is configured
 */
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
    Path taskOutputPath;
    OutputCommitter committer = getOutputCommitter(conf);
    if (committer instanceof FileOutputCommitter) {
        // File committers stage task output under a per-attempt work directory.
        taskOutputPath = ((FileOutputCommitter) committer).getWorkPath();
    } else {
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output path.");
        }
        taskOutputPath = outputPath;
    }
    return taskOutputPath;
}
/**
 * Resolves the directory this task should write output to: the committer's
 * work (temporary) path when a FileOutputCommitter is in use, otherwise the
 * job's configured output path.
 *
 * NOTE(review): duplicate of the getTaskOutputPath shown earlier in this
 * listing — identical copies cannot live in the same class.
 *
 * @throws IOException if no job output path is configured
 */
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
    Path taskOutputPath;
    OutputCommitter committer = getOutputCommitter(conf);
    if (committer instanceof FileOutputCommitter) {
        // File committers stage task output under a per-attempt work directory.
        taskOutputPath = ((FileOutputCommitter) committer).getWorkPath();
    } else {
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output path.");
        }
        taskOutputPath = outputPath;
    }
    return taskOutputPath;
}
// NOTE(review): byte-for-byte duplicate of the testOpen method from earlier
// in this listing — copy/paste artifact of the example page.
@Test
public void testOpen() throws Exception {
OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
OutputCommitter outputCommitter = setupOutputCommitter(true);
when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());
hadoopOutputFormat.open(1, 4);
verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
}
// Writes a mix of null / NullWritable / real key-value pairs through a
// TextOutputFormat record writer for the given attempt, then commits the
// task — letting callers assert how "bad" (partially null) output is handled.
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
// Exercise every combination of null, NullWritable, and real key/value.
theRecordWriter.write(key2, val2);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val2);
theRecordWriter.write(nullWritable, val1);
theRecordWriter.write(key1, nullWritable);
theRecordWriter.write(key2, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key1, val1);
} finally {
// Always close the writer, even if a write above throws.
theRecordWriter.close(tContext);
}
// Commit via the committer resolved from the configured output format,
// mirroring what the framework does after a successful task attempt.
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
// Builds the Reducer.Context handed to user reduce code: constructs a
// ReduceContextImpl over the raw key/value iterator and wraps it in the
// new-API Reducer context via WrappedReducer.
@SuppressWarnings("unchecked")
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
org.apache.hadoop.mapreduce.Counter inputKeyCounter,
org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass, Class<INVALUE> valueClass
) throws IOException, InterruptedException {
// The concrete context implementation over the raw iterator.
org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
reduceContext =
new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
rIter,
inputKeyCounter,
inputValueCounter,
output,
committer,
reporter,
comparator,
keyClass,
valueClass);
// Adapter exposing the implementation through the public Reducer.Context type.
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
reducerContext =
new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
reduceContext);
return reducerContext;
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext
taskattemptcontext) throws IOException, InterruptedException {
    setupUdfEnvAndStores(taskattemptcontext);
    // Hadoop receives a PigOutputCommitter; it wraps the real committer(s)
    // belonging to the configured map-side and reduce-side store(s).
    final PigOutputCommitter wrappingCommitter =
            new PigOutputCommitter(taskattemptcontext, mapStores, reduceStores);
    return wrappingCommitter;
}
// NOTE(review): byte-for-byte duplicate of the setupHadoopOutputFormat helper
// from earlier in this listing — copy/paste artifact of the example page.
// Wraps the output format and injects the collaborators into its fields.
private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
OutputFormat<String, Long> outputFormat,
Job job,
RecordWriter<String, Long> recordWriter,
OutputCommitter outputCommitter,
Configuration configuration) {
HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
hadoopOutputFormat.recordWriter = recordWriter;
hadoopOutputFormat.outputCommitter = outputCommitter;
hadoopOutputFormat.configuration = configuration;
hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);
return hadoopOutputFormat;
}
// Shared task context for input/output tasks: holds the record writer and the
// committer; counters and the reporter are passed up to the base context.
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
TaskContext context, Reporter reporter) {
super(conf, taskid, context.getCounters(), reporter);
this.output = output;
this.committer = committer;
}
@Override
public OutputCommitter getOutputCommitter(
TaskAttemptContext taskattemptcontext) throws IOException,
InterruptedException {
    setupUdfEnvAndStores(taskattemptcontext);
    // Hadoop receives a PigOutputCommitterTez (PIG-4202); it wraps the real
    // committer(s) belonging to the configured store(s).
    final PigOutputCommitterTez wrappingCommitter =
            new PigOutputCommitterTez(taskattemptcontext, mapStores, reduceStores);
    return wrappingCommitter;
}
// NOTE(review): third byte-for-byte copy of testCloseWithNeedsTaskCommitFalse
// in this listing — copy/paste artifact of the example page.
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {
RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
OutputCommitter outputCommitter = setupOutputCommitter(false);
HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
Job.getInstance(), recordWriter, outputCommitter, new Configuration());
hadoopOutputFormat.close();
verify(outputCommitter, times(0)).commitTask(nullable(TaskAttemptContext.class));
verify(recordWriter, times(1)).close(nullable(TaskAttemptContext.class));
}
// Verifies that a job with zero tasks goes straight to SUCCEEDED on start and
// that workflow-related configuration is propagated into the job-submitted
// event.
@Test
public void testJobNoTasks() {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
// Workflow metadata the JobSubmittedEventHandler below asserts against.
conf.set(MRJobConfig.WORKFLOW_ID, "testId");
conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = mock(OutputCommitter.class);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
"testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
"tag1,tag2");
dispatcher.register(EventType.class, jseHandler);
// Job with 0 tasks: INIT then START should complete it immediately.
JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
// getAssertValue() presumably blocks/throws InterruptedException while
// waiting for the submitted event — TODO confirm against the handler.
try {
Assert.assertTrue(jseHandler.getAssertValue());
} catch (InterruptedException e) {
Assert.fail("Workflow related attributes are not tested properly");
}
}
// Verifies that a second JOB_KILL received while the job is stuck in
// KILL_ABORT (committer blocked in abortJob) still moves the job to KILLED.
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
// Committer whose abortJob blocks indefinitely, keeping the job in
// KILL_ABORT until the handler is stopped.
OutputCommitter committer = new StubbedOutputCommitter() {
@Override
public synchronized void abortJob(JobContext jobContext, State state)
throws IOException {
// Spin on wait(); InterruptedException is deliberately swallowed.
// NOTE(review): wait() throwing clears the interrupt flag, so the
// Thread.interrupted() guard only trips if the flag is set while the
// thread is outside wait() — presumably intentional here; confirm.
while (!Thread.interrupted()) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
};
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
// First kill: job enters KILL_ABORT and blocks in the stubbed abortJob.
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILL_ABORT);
// Second kill while aborting must still transition the job to KILLED.
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
assertJobState(job, JobStateInternal.KILLED);
dispatcher.stop();
commitHandler.stop();
}
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    // Lazily create the committer on first use; the method-level synchronized
    // guards the check-then-act so only one instance is ever built.
    if (committer == null) {
        committer = new CompactorOutputCommitter(
                FileOutputFormat.getOutputPath(context), context);
    }
    return committer;
}
// Verifies that failing every task attempt lands the job in FAIL_ABORT and
// that abortJob is invoked exactly once before the job ends up FAILED —
// i.e. the abort path does not hang past the committer cancel timeout.
@Test (timeout=10000)
public void testFailAbortDoesntHang() throws IOException {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
// Short cancel timeout so a stuck committer cannot stall the test.
conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
OutputCommitter committer = Mockito.mock(OutputCommitter.class);
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
//Job has only 1 mapper task. No reducers
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
//Fail / finish all the tasks. This should land the JobImpl directly in the
//FAIL_ABORT state
for(Task t: job.tasks.values()) {
TaskImpl task = (TaskImpl) t;
task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
for(TaskAttempt ta: task.getAttempts().values()) {
task.handle(new TaskTAttemptEvent(ta.getID(),
TaskEventType.T_ATTEMPT_FAILED));
}
}
dispatcher.await();
//Verify abortJob is called once and the job failed
Mockito.verify(committer, Mockito.timeout(2000).times(1))
.abortJob((JobContext) Mockito.any(), (State) Mockito.any());
assertJobState(job, JobStateInternal.FAILED);
dispatcher.stop();
}
// Verifies that once a job has FAILED (here: committer setupJob throws on
// start), later task/attempt events are ignored, and that the externally
// visible state stays RUNNING until the AM has successfully unregistered.
@Test
public void testTransitionsAtFailed() throws IOException {
Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
// Force the job to fail during setup.
OutputCommitter committer = mock(OutputCommitter.class);
doThrow(new IOException("forcefail"))
.when(committer).setupJob(any(JobContext.class));
CommitterEventHandler commitHandler =
createCommitterEventHandler(dispatcher, committer);
commitHandler.init(conf);
commitHandler.start();
AppContext mockContext = mock(AppContext.class);
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAILED);
// Every subsequent event must leave the internal state at FAILED.
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
assertJobState(job, JobStateInternal.FAILED);
// Externally the job still reports RUNNING until unregistration succeeds.
Assert.assertEquals(JobState.RUNNING, job.getState());
when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
Assert.assertEquals(JobState.FAILED, job.getState());
dispatcher.stop();
commitHandler.stop();
}