类org.apache.hadoop.mapred.ReduceTask源码实例Demo

下面列出了怎么用org.apache.hadoop.mapred.ReduceTask的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hadoop   文件: TestShufflePlugin.java
@Test
/**
 * A testing method verifying availability and accessibility of API that is needed
 * for sub-classes of ShuffleConsumerPlugin
 */
public void testConsumerApi() {

  JobConf jobConf = new JobConf();
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();

  //mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V>  mockCombineOutputCollector =
    (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
    mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    String [] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> context =
   new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
                                              mockUmbilical, mockLocalDirAllocator,
                                              mockReporter, mockCompressionCodec,
                                              combinerClass, mockCombineOutputCollector,
                                              mockCounter, mockCounter, mockCounter,
                                              mockCounter, mockCounter, mockCounter,
                                              mockTaskStatus, mockProgress, mockProgress,
                                              mockTask, mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  }
  catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
 
源代码2 项目: big-c   文件: TestShufflePlugin.java
@Test
/**
 * A testing method verifying availability and accessibility of API that is needed
 * for sub-classes of ShuffleConsumerPlugin
 */
public void testConsumerApi() {

  JobConf jobConf = new JobConf();
  ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();

  //mock creation
  ReduceTask mockReduceTask = mock(ReduceTask.class);
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  Reporter mockReporter = mock(Reporter.class);
  FileSystem mockFileSystem = mock(FileSystem.class);
  Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = jobConf.getCombinerClass();
  @SuppressWarnings("unchecked")  // needed for mock with generic
  CombineOutputCollector<K, V>  mockCombineOutputCollector =
    (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
  org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
    mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
  LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
  CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
  Counter mockCounter = mock(Counter.class);
  TaskStatus mockTaskStatus = mock(TaskStatus.class);
  Progress mockProgress = mock(Progress.class);
  MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
  Task mockTask = mock(Task.class);

  try {
    String [] dirs = jobConf.getLocalDirs();
    // verify that these APIs are available through super class handler
    ShuffleConsumerPlugin.Context<K, V> context =
   new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
                                              mockUmbilical, mockLocalDirAllocator,
                                              mockReporter, mockCompressionCodec,
                                              combinerClass, mockCombineOutputCollector,
                                              mockCounter, mockCounter, mockCounter,
                                              mockCounter, mockCounter, mockCounter,
                                              mockTaskStatus, mockProgress, mockProgress,
                                              mockTask, mockMapOutputFile, null);
    shuffleConsumerPlugin.init(context);
    shuffleConsumerPlugin.run();
    shuffleConsumerPlugin.close();
  }
  catch (Exception e) {
    assertTrue("Threw exception:" + e, false);
  }

  // verify that these APIs are available for 3rd party plugins
  mockReduceTask.getTaskID();
  mockReduceTask.getJobID();
  mockReduceTask.getNumMaps();
  mockReduceTask.getPartition();
  mockReporter.progress();
}
 
源代码3 项目: RDFS   文件: TestReduceTaskFetchFail.java
@SuppressWarnings("deprecation")
@Test
public void testcheckAndInformJobTracker() throws Exception {
  //mock creation
  TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
  TaskReporter mockTaskReporter = mock(TaskReporter.class);

  JobConf conf = new JobConf();
  conf.setUser("testuser");
  conf.setJobName("testJob");
  conf.setSessionId("testSession");

  TaskAttemptID tid =  new TaskAttemptID();
  TestReduceTask rTask = new TestReduceTask();
  rTask.setConf(conf);

  ReduceTask.ReduceCopier reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);
  reduceCopier.checkAndInformJobTracker(1, tid, false);

  verify(mockTaskReporter, never()).progress();

  reduceCopier.checkAndInformJobTracker(10, tid, false);
  verify(mockTaskReporter, times(1)).progress();

  // Test the config setting
  conf.setInt("mapreduce.reduce.shuffle.maxfetchfailures", 3);

  rTask.setConf(conf);
  reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);

  reduceCopier.checkAndInformJobTracker(1, tid, false);
  verify(mockTaskReporter, times(1)).progress();

  reduceCopier.checkAndInformJobTracker(3, tid, false);
  verify(mockTaskReporter, times(2)).progress();

  reduceCopier.checkAndInformJobTracker(5, tid, false);
  verify(mockTaskReporter, times(2)).progress();

  reduceCopier.checkAndInformJobTracker(6, tid, false);
  verify(mockTaskReporter, times(3)).progress();

  // test readError and its config
  reduceCopier.checkAndInformJobTracker(7, tid, true);
  verify(mockTaskReporter, times(4)).progress();

  conf.setBoolean("mapreduce.reduce.shuffle.notify.readerror", false);

  rTask.setConf(conf);
  reduceCopier = rTask.new TestReduceCopier(mockUmbilical, conf, mockTaskReporter);

  reduceCopier.checkAndInformJobTracker(7, tid, true);
  verify(mockTaskReporter, times(4)).progress();

}
 
 类所在包
 类方法
 同包方法