Class org.apache.hadoop.mapreduce.OutputCommitter: Source Code Examples

The examples below show how the org.apache.hadoop.mapreduce.OutputCommitter API is used in real projects; follow the project links to view the full source on GitHub.
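Before diving into the examples, it helps to see the full lifecycle the API defines. The following is a minimal, generic no-op committer (a sketch for orientation, not code from any of the projects listed): setupJob and setupTask run before output is written, needsTaskCommit tells the framework whether commitTask should be invoked for an attempt, and abortTask discards a failed attempt's output.

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class NoOpOutputCommitter extends OutputCommitter {

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // runs once, before any task of the job executes
  }

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // runs before the task attempt starts writing output
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    return false; // nothing is staged, so no task-level commit is needed
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // would promote this attempt's staged output to the final location
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // would discard this attempt's staged output
  }
}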

Example 1  Project: big-c  File: Task.java
@SuppressWarnings("unchecked")
NewCombinerRunner(Class reducerClass,
                  JobConf job,
                  org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                  org.apache.hadoop.mapreduce.TaskAttemptContext context,
                  Counters.Counter inputCounter,
                  TaskReporter reporter,
                  org.apache.hadoop.mapreduce.OutputCommitter committer) {
  super(inputCounter, job, reporter);
  this.reducerClass = reducerClass;
  this.taskId = taskId;
  keyClass = (Class<K>) context.getMapOutputKeyClass();
  valueClass = (Class<V>) context.getMapOutputValueClass();
  comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
  this.committer = committer;
}
 
Example 2  Project: big-c  File: TestJobImpl.java
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  // let the committer fail and verify the job fails
  syncBarrier.await();
  assertJobState(job, JobStateInternal.FAILED);
  dispatcher.stop();
  commitHandler.stop();
}
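The TestingOutputCommitter helper referenced above is not included in this listing; a class with the same observable behaviour could look like the sketch below (a hypothetical reconstruction). It rendezvouses with the test thread on the shared barrier, then fails commitJob when shouldSucceed is false, which is what drives the job into the FAILED state.

private static class TestingOutputCommitter extends StubbedOutputCommitter {
  private final CyclicBarrier syncBarrier;
  private final boolean shouldSucceed;

  TestingOutputCommitter(CyclicBarrier syncBarrier, boolean shouldSucceed) {
    this.syncBarrier = syncBarrier;
    this.shouldSucceed = shouldSucceed;
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    try {
      syncBarrier.await(); // let the test thread proceed past its own await()
    } catch (InterruptedException | BrokenBarrierException e) {
      // ignored; the test only asserts on the resulting job state
    }
    if (!shouldSucceed) {
      throw new IOException("forced commit failure");
    }
  }
}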
 
Example 3  Project: hadoop  File: TestJobImpl.java
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  // let the committer fail and verify the job fails
  syncBarrier.await();
  assertJobState(job, JobStateInternal.FAILED);
  dispatcher.stop();
  commitHandler.stop();
}
 
Example 4  Project: big-c  File: MRApp.java
@SuppressWarnings("rawtypes")
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
    Configuration conf, EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Clock clock,
    OutputCommitter committer, boolean newApiCommitter,
    String user, AppContext appContext,
    JobStateInternal forcedState, String diagnostic) {
  super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
      conf, eventHandler, taskAttemptListener,
      new JobTokenSecretManager(), new Credentials(), clock,
      getCompletedTaskFromPreviousRun(), metrics, committer,
      newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
      appContext, forcedState, diagnostic);

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  localStateMachine = localFactory.make(this);
}
 
Example 5  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
@Test
public void testOpen() throws Exception {

	OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
	OutputCommitter outputCommitter = setupOutputCommitter(true);
	when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);

	HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
		Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());

	hadoopOutputFormat.open(1, 4);

	verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
	verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
}
 
Example 6  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
@Test
public void testCloseWithNeedsTaskCommitTrue() throws Exception {

	RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
	OutputCommitter outputCommitter = setupOutputCommitter(true);

	HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
		Job.getInstance(), recordWriter, outputCommitter, new Configuration());

	hadoopOutputFormat.close();

	verify(outputCommitter, times(1)).commitTask(nullable(TaskAttemptContext.class));
	verify(recordWriter, times(1)).close(nullable(TaskAttemptContext.class));
}
 
Example 7  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {

	RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
	OutputCommitter outputCommitter = setupOutputCommitter(false);

	HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
		Job.getInstance(), recordWriter, outputCommitter, new Configuration());

	hadoopOutputFormat.close();

	verify(outputCommitter, times(0)).commitTask(nullable(TaskAttemptContext.class));
	verify(recordWriter, times(1)).close(nullable(TaskAttemptContext.class));
}
 
Example 8  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
	OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
	when(outputCommitter.needsTaskCommit(nullable(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
	doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));

	return outputCommitter;
}
 
Example 9  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
	OutputFormat<String, Long> outputFormat,
	Job job,
	RecordWriter<String, Long> recordWriter,
	OutputCommitter outputCommitter,
	Configuration configuration) {

	HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
	hadoopOutputFormat.recordWriter = recordWriter;
	hadoopOutputFormat.outputCommitter = outputCommitter;
	hadoopOutputFormat.configuration = configuration;
	hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);

	return hadoopOutputFormat;
}
 
Example 10  Project: Flink-CEPplus  File: HadoopOutputFormatTest.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
	final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
	doNothing().when(outputCommitter).setupJob(any(JobContext.class));

	return outputCommitter;
}
 
Example 11  Project: tez  File: MapContextImpl.java
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                      RecordReader<KEYIN,VALUEIN> reader,
                      RecordWriter<KEYOUT,VALUEOUT> writer,
                      OutputCommitter committer,
                      TaskContext context,
                      InputSplit split, Reporter reporter) {
  super(conf, taskid, writer, committer, context, reporter);
  this.reader = reader;
  this.split = split;
}
 
Example 12  Project: flink  File: HadoopOutputFormatTest.java
@Test
public void testCloseWithNeedsTaskCommitFalse() throws Exception {

	RecordWriter<String, Long> recordWriter = Mockito.mock(DummyRecordWriter.class);
	OutputCommitter outputCommitter = setupOutputCommitter(false);

	HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(new DummyOutputFormat(),
		Job.getInstance(), recordWriter, outputCommitter, new Configuration());

	hadoopOutputFormat.close();

	verify(outputCommitter, times(0)).commitTask(nullable(TaskAttemptContext.class));
	verify(recordWriter, times(1)).close(nullable(TaskAttemptContext.class));
}
 
Example 13  Project: flink  File: HadoopOutputFormatTest.java
private OutputCommitter setupOutputCommitter(boolean needsTaskCommit) throws IOException {
	OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class);
	when(outputCommitter.needsTaskCommit(nullable(TaskAttemptContext.class))).thenReturn(needsTaskCommit);
	doNothing().when(outputCommitter).commitTask(any(TaskAttemptContext.class));

	return outputCommitter;
}
 
Example 14  Project: elasticsearch-hadoop  File: MultiOutputFormat.java
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    boolean result = false;

    for (OutputCommitter committer : committers) {
        result |= committer.needsTaskCommit(taskContext);
    }

    return result;
}
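The OR-aggregation above makes the wrapper request a task commit if any delegate needs one. A companion commitTask would then consult each delegate again before committing it; the method below is a sketch of that pattern (an assumption for illustration, not elasticsearch-hadoop's actual code):

@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
    for (OutputCommitter committer : committers) {
        if (committer.needsTaskCommit(taskContext)) {
            committer.commitTask(taskContext);
        }
    }
}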
 
Example 15  Project: stratio-cassandra  File: HadoopCompat.java
/**
 * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
 */
public static MapContext newMapContext(Configuration conf,
                                       TaskAttemptID taskAttemptID,
                                       RecordReader recordReader,
                                       RecordWriter recordWriter,
                                       OutputCommitter outputCommitter,
                                       StatusReporter statusReporter,
                                       InputSplit inputSplit) {
    return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR,
            conf, taskAttemptID, recordReader, recordWriter, outputCommitter,
            statusReporter, inputSplit);
}
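MAP_CONTEXT_CONSTRUCTOR is a java.lang.reflect.Constructor resolved once at class-load time, which is what lets the same jar run against either Hadoop generation. A hypothetical lookup routine is sketched below (the actual resolution code in HadoopCompat is not part of this listing); both constructors take the same seven arguments, only the declaring class differs:

private static Constructor<?> resolveMapContextConstructor() throws Exception {
    Class<?> clazz;
    try {
        // Hadoop 2: MapContext is an interface, implemented by MapContextImpl
        clazz = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl");
    } catch (ClassNotFoundException e) {
        // Hadoop 1: MapContext is a concrete class
        clazz = Class.forName("org.apache.hadoop.mapreduce.MapContext");
    }
    return clazz.getConstructor(Configuration.class, TaskAttemptID.class,
            RecordReader.class, RecordWriter.class, OutputCommitter.class,
            StatusReporter.class, InputSplit.class);
}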
 
Example 16  Project: 163-bigdate-note  File: LogOutputFormat.java
private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
    Path taskOutputPath;
    OutputCommitter committer = getOutputCommitter(conf);
    if (committer instanceof FileOutputCommitter) {
        taskOutputPath = ((FileOutputCommitter) committer).getWorkPath();
    } else {
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output path.");
        }
        taskOutputPath = outputPath;
    }
    return taskOutputPath;
}
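A typical caller of getTaskOutputPath is the format's getRecordWriter: creating the file under the committer's work path means the task writes into a temporary attempt directory that FileOutputCommitter promotes to the final output directory on commit. The method below is a hypothetical illustration (LogRecordWriter stands in for whatever writer LogOutputFormat actually returns):

@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
        throws IOException {
    Path file = new Path(getTaskOutputPath(context),
            "part-" + context.getTaskAttemptID().getTaskID().getId());
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    return new LogRecordWriter(fs.create(file, false)); // LogRecordWriter: hypothetical
}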
 
Example 17  Project: flink  File: HadoopOutputFormatTest.java
@Test
public void testOpen() throws Exception {

	OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
	OutputCommitter outputCommitter = setupOutputCommitter(true);
	when(dummyOutputFormat.getOutputCommitter(any(TaskAttemptContext.class))).thenReturn(outputCommitter);

	HadoopOutputFormat<String, Long> hadoopOutputFormat = setupHadoopOutputFormat(dummyOutputFormat,
		Job.getInstance(), new DummyRecordWriter(), setupOutputCommitter(true), new Configuration());

	hadoopOutputFormat.open(1, 4);

	verify(hadoopOutputFormat.outputCommitter, times(1)).setupJob(any(JobContext.class));
	verify(hadoopOutputFormat.mapreduceOutputFormat, times(1)).getRecordWriter(any(TaskAttemptContext.class));
}
 
Example 18  Project: big-c  File: TestRecovery.java
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
  throws Exception {
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
      TypeConverter.fromYarn(attempt.getID()));
 
  TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat
      .getRecordWriter(tContext);
  
  NullWritable nullWritable = NullWritable.get();
  try {
    theRecordWriter.write(key2, val2);
    theRecordWriter.write(null, nullWritable);
    theRecordWriter.write(null, val2);
    theRecordWriter.write(nullWritable, val1);
    theRecordWriter.write(key1, nullWritable);
    theRecordWriter.write(key2, null);
    theRecordWriter.write(null, null);
    theRecordWriter.write(key1, val1);
  } finally {
    theRecordWriter.close(tContext);
  }
  
  OutputFormat outputFormat = ReflectionUtils.newInstance(
      tContext.getOutputFormatClass(), conf);
  OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
  committer.commitTask(tContext);
}
 
Example 19  Project: big-c  File: Task.java
@SuppressWarnings("unchecked")
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
                      <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                    Configuration job,
                    org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                    RawKeyValueIterator rIter,
                    org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                    org.apache.hadoop.mapreduce.Counter inputValueCounter,
                    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                    org.apache.hadoop.mapreduce.OutputCommitter committer,
                    org.apache.hadoop.mapreduce.StatusReporter reporter,
                    RawComparator<INKEY> comparator,
                    Class<INKEY> keyClass, Class<INVALUE> valueClass
) throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
  reduceContext = 
    new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, 
                                                            rIter, 
                                                            inputKeyCounter, 
                                                            inputValueCounter, 
                                                            output, 
                                                            committer, 
                                                            reporter, 
                                                            comparator, 
                                                            keyClass, 
                                                            valueClass);

  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      reducerContext = 
        new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
            reduceContext);

  return reducerContext;
}
 
Example 20  Project: spork  File: PigOutputFormat.java
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext
        taskattemptcontext) throws IOException, InterruptedException {
    setupUdfEnvAndStores(taskattemptcontext);

    // we return an instance of PigOutputCommitter to Hadoop - this instance
    // will wrap the real OutputCommitter(s) belonging to the store(s)
    return new PigOutputCommitter(taskattemptcontext,
            mapStores,
            reduceStores);
}
 
Example 21  Project: flink  File: HadoopOutputFormatTest.java
private HadoopOutputFormat<String, Long> setupHadoopOutputFormat(
	OutputFormat<String, Long> outputFormat,
	Job job,
	RecordWriter<String, Long> recordWriter,
	OutputCommitter outputCommitter,
	Configuration configuration) {

	HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(outputFormat, job);
	hadoopOutputFormat.recordWriter = recordWriter;
	hadoopOutputFormat.outputCommitter = outputCommitter;
	hadoopOutputFormat.configuration = configuration;
	hadoopOutputFormat.configuration.set(MAPRED_OUTPUT_DIR_KEY, MAPRED_OUTPUT_PATH);

	return hadoopOutputFormat;
}
 
Example 22  Project: tez  File: TaskInputOutputContextImpl.java
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
                                  RecordWriter<KEYOUT,VALUEOUT> output,
                                  OutputCommitter committer,
                                  TaskContext context, Reporter reporter) {
  super(conf, taskid, context.getCounters(), reporter);
  this.output = output;
  this.committer = committer;
}
 
Example 23  Project: spork  File: PigOutputFormatTez.java
@Override
public OutputCommitter getOutputCommitter(
        TaskAttemptContext taskattemptcontext) throws IOException,
        InterruptedException {
    setupUdfEnvAndStores(taskattemptcontext);

    // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop - this instance
    // will wrap the real OutputCommitter(s) belonging to the store(s)
    return new PigOutputCommitterTez(taskattemptcontext,
            mapStores,
            reduceStores);
}
 
Example 24  Project: hadoop  File: TestJobImpl.java
@Test
public void testJobNoTasks() {
  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.set(MRJobConfig.WORKFLOW_ID, "testId");
  conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
  conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
  conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
  conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
  conf.set(MRJobConfig.WORKFLOW_TAGS, "tag1,tag2");
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = mock(OutputCommitter.class);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
      "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
      "tag1,tag2");
  dispatcher.register(EventType.class, jseHandler);
  JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
  job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(job.getID()));
  assertJobState(job, JobStateInternal.SUCCEEDED);
  dispatcher.stop();
  commitHandler.stop();
  try {
    Assert.assertTrue(jseHandler.getAssertValue());
  } catch (InterruptedException e) {
    Assert.fail("Workflow related attributes are not tested properly");
  }
}
 
Example 25  Project: big-c  File: TestJobImpl.java
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = new StubbedOutputCommitter() {
    @Override
    public synchronized void abortJob(JobContext jobContext, State state)
        throws IOException {
      while (!Thread.interrupted()) {
        try {
          wait();
        } catch (InterruptedException e) {
        }
      }
    }
  };
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.SETUP);

  job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILL_ABORT);

  job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILLED);
  dispatcher.stop();
  commitHandler.stop();
}
 
Example 26
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
  if (this.committer == null) {
    this.committer = new CompactorOutputCommitter(FileOutputFormat.getOutputPath(context), context);
  }
  return this.committer;
}
 
Example 27  Project: hadoop  File: TestJobImpl.java
@Test (timeout=10000)
public void testFailAbortDoesntHang() throws IOException {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
  
  DrainDispatcher dispatcher = new DrainDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = Mockito.mock(OutputCommitter.class);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  //Job has only 1 mapper task. No reducers
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);

  //Fail / finish all the tasks. This should land the JobImpl directly in the
  //FAIL_ABORT state
  for(Task t: job.tasks.values()) {
    TaskImpl task = (TaskImpl) t;
    task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
    for(TaskAttempt ta: task.getAttempts().values()) {
      task.handle(new TaskTAttemptEvent(ta.getID(),
        TaskEventType.T_ATTEMPT_FAILED));
    }
  }

  dispatcher.await();
  //Verify abortJob is called once and the job failed
  Mockito.verify(committer, Mockito.timeout(2000).times(1))
    .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAILED);

  dispatcher.stop();
}
 
Example 28  Project: hadoop  File: TestJobImpl.java
@Test
public void testTransitionsAtFailed() throws IOException {
  Configuration conf = new Configuration();
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();

  OutputCommitter committer = mock(OutputCommitter.class);
  doThrow(new IOException("forcefail"))
    .when(committer).setupJob(any(JobContext.class));
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  AppContext mockContext = mock(AppContext.class);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
  JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.FAILED);

  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
  assertJobState(job, JobStateInternal.FAILED);
  Assert.assertEquals(JobState.RUNNING, job.getState());
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  Assert.assertEquals(JobState.FAILED, job.getState());

  dispatcher.stop();
  commitHandler.stop();
}
 