类org.apache.hadoop.mapred.RawKeyValueIterator源码实例Demo

下面列出了如何使用 org.apache.hadoop.mapred.RawKeyValueIterator 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Closes the merger threads, gathers every map output that is still
 * pending (in memory and on disk) and hands them to the final merge.
 *
 * @return the iterator returned by {@code finalMerge}
 * @throws Throwable propagated from closing the mergers or from
 *         {@code finalMerge}
 */
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  // Collect the in-memory outputs: merged ones first, then the unmerged
  // ones, emptying each source collection once it has been copied.
  List<InMemoryMapOutput<K, V>> inMemoryOutputs =
      new ArrayList<InMemoryMapOutput<K, V>>();
  inMemoryOutputs.addAll(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  inMemoryOutputs.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();

  // Same treatment for the outputs that already live on disk.
  List<CompressAwarePath> onDiskOutputs = new ArrayList<CompressAwarePath>();
  onDiskOutputs.addAll(onDiskMapOutputs);
  onDiskMapOutputs.clear();

  return finalMerge(jobConf, rfs, inMemoryOutputs, onDiskOutputs);
}
 
源代码2 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Feeds the merged key/value stream through a freshly instantiated
 * combiner, one key group at a time, emitting into
 * {@code combineCollector}.
 *
 * @param kvIter    raw key/value stream to combine
 * @param inCounter counter handed to the values iterator
 * @throws IOException if reducing or closing the combiner fails
 */
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, jobConf);
  Class<K> mapKeyClass = (Class<K>) jobConf.getMapOutputKeyClass();
  Class<V> mapValueClass = (Class<V>) jobConf.getMapOutputValueClass();
  RawComparator<K> groupingComparator =
      (RawComparator<K>) jobConf.getCombinerKeyGroupingComparator();
  try {
    // Each pass of the loop handles all values of one key group.
    for (CombineValuesIterator groups = new CombineValuesIterator(
             kvIter, groupingComparator, mapKeyClass, mapValueClass, jobConf,
             Reporter.NULL, inCounter);
         groups.more();
         groups.nextKey()) {
      combiner.reduce(groups.getKey(), groups, combineCollector,
                      Reporter.NULL);
    }
  } finally {
    // The combiner is closed whether or not combining succeeded.
    combiner.close();
  }
}
 
源代码3 项目: big-c   文件: MergeManagerImpl.java
/**
 * Closes the merger threads, gathers every map output that is still
 * pending (in memory and on disk) and hands them to the final merge.
 *
 * @return the iterator returned by {@code finalMerge}
 * @throws Throwable propagated from closing the mergers or from
 *         {@code finalMerge}
 */
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  // Collect the in-memory outputs: merged ones first, then the unmerged
  // ones, emptying each source collection once it has been copied.
  List<InMemoryMapOutput<K, V>> inMemoryOutputs =
      new ArrayList<InMemoryMapOutput<K, V>>();
  inMemoryOutputs.addAll(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  inMemoryOutputs.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();

  // Same treatment for the outputs that already live on disk.
  List<CompressAwarePath> onDiskOutputs = new ArrayList<CompressAwarePath>();
  onDiskOutputs.addAll(onDiskMapOutputs);
  onDiskMapOutputs.clear();

  return finalMerge(jobConf, rfs, inMemoryOutputs, onDiskOutputs);
}
 
源代码4 项目: big-c   文件: MergeManagerImpl.java
/**
 * Feeds the merged key/value stream through a freshly instantiated
 * combiner, one key group at a time, emitting into
 * {@code combineCollector}.
 *
 * @param kvIter    raw key/value stream to combine
 * @param inCounter counter handed to the values iterator
 * @throws IOException if reducing or closing the combiner fails
 */
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, jobConf);
  Class<K> mapKeyClass = (Class<K>) jobConf.getMapOutputKeyClass();
  Class<V> mapValueClass = (Class<V>) jobConf.getMapOutputValueClass();
  RawComparator<K> groupingComparator =
      (RawComparator<K>) jobConf.getCombinerKeyGroupingComparator();
  try {
    // Each pass of the loop handles all values of one key group.
    for (CombineValuesIterator groups = new CombineValuesIterator(
             kvIter, groupingComparator, mapKeyClass, mapValueClass, jobConf,
             Reporter.NULL, inCounter);
         groups.more();
         groups.nextKey()) {
      combiner.reduce(groups.getKey(), groups, combineCollector,
                      Reporter.NULL);
    }
  } finally {
    // The combiner is closed whether or not combining succeeded.
    combiner.close();
  }
}
 
源代码5 项目: RDFS   文件: ReduceContext.java
/**
 * Creates a reduce context that reads its input from a raw key/value
 * iterator.
 *
 * @param conf              job configuration, also used to look up serializers
 * @param taskid            id of the task attempt, forwarded to the superclass
 * @param input             raw key/value source
 * @param inputKeyCounter   counter stored for input keys
 * @param inputValueCounter counter stored for input values
 * @param output            record writer, forwarded to the superclass
 * @param committer         output committer, forwarded to the superclass
 * @param reporter          status reporter, forwarded to the superclass
 * @param comparator        raw comparator stored for key comparison
 * @param keyClass          class used to obtain the key deserializer
 * @param valueClass        class used to obtain the value deserializer
 * @throws InterruptedException declared by the signature
 * @throws IOException if opening a deserializer or advancing the input fails
 */
public ReduceContext(Configuration conf, TaskAttemptID taskid,
                     RawKeyValueIterator input,
                     Counter inputKeyCounter,
                     Counter inputValueCounter,
                     RecordWriter<KEYOUT,VALUEOUT> output,
                     OutputCommitter committer,
                     StatusReporter reporter,
                     RawComparator<KEYIN> comparator,
                     Class<KEYIN> keyClass,
                     Class<VALUEIN> valueClass
                     ) throws InterruptedException, IOException{
  super(conf, taskid, output, committer, reporter);

  // Plain state taken straight from the arguments.
  this.input = input;
  this.comparator = comparator;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;

  // Both deserializers are opened on the same buffer.
  SerializationFactory factory = new SerializationFactory(conf);
  keyDeserializer = factory.getDeserializer(keyClass);
  keyDeserializer.open(buffer);
  valueDeserializer = factory.getDeserializer(valueClass);
  valueDeserializer.open(buffer);

  // Advance the input once and remember whether it produced a record.
  hasMore = input.next();
}
 
源代码6 项目: hadoop-gpu   文件: ReduceContext.java
/**
 * Creates a reduce context that reads its input from a raw key/value
 * iterator.
 *
 * @param conf         job configuration, also used to look up serializers
 * @param taskid       id of the task attempt, forwarded to the superclass
 * @param input        raw key/value source
 * @param inputCounter counter stored for the input
 * @param output       record writer, forwarded to the superclass
 * @param committer    output committer, forwarded to the superclass
 * @param reporter     status reporter, forwarded to the superclass
 * @param comparator   raw comparator stored for key comparison
 * @param keyClass     class used to obtain the key deserializer
 * @param valueClass   class used to obtain the value deserializer
 * @throws InterruptedException declared by the signature
 * @throws IOException if opening a deserializer or advancing the input fails
 */
public ReduceContext(Configuration conf, TaskAttemptID taskid,
                     RawKeyValueIterator input,
                     Counter inputCounter,
                     RecordWriter<KEYOUT,VALUEOUT> output,
                     OutputCommitter committer,
                     StatusReporter reporter,
                     RawComparator<KEYIN> comparator,
                     Class<KEYIN> keyClass,
                     Class<VALUEIN> valueClass
                     ) throws InterruptedException, IOException{
  super(conf, taskid, output, committer, reporter);

  // Plain state taken straight from the arguments.
  this.input = input;
  this.comparator = comparator;
  this.inputCounter = inputCounter;

  // Both deserializers are opened on the same buffer.
  SerializationFactory factory = new SerializationFactory(conf);
  keyDeserializer = factory.getDeserializer(keyClass);
  keyDeserializer.open(buffer);
  valueDeserializer = factory.getDeserializer(valueClass);
  valueDeserializer.open(buffer);

  // Advance the input once and remember whether it produced a record.
  hasMore = input.next();
}
 
源代码7 项目: hadoop   文件: ReduceContextImpl.java
/**
 * Creates a reduce context that reads its input from a raw key/value
 * iterator.
 *
 * @param conf              job configuration, stored and used to look up
 *                          serializers
 * @param taskid            id of the task attempt
 * @param input             raw key/value source
 * @param inputKeyCounter   counter stored for input keys
 * @param inputValueCounter counter stored for input values
 * @param output            record writer, forwarded to the superclass
 * @param committer         output committer, forwarded to the superclass
 * @param reporter          status reporter, forwarded to the superclass
 * @param comparator        raw comparator stored for key comparison
 * @param keyClass          key class, stored and used for the key deserializer
 * @param valueClass        value class, stored and used for the value
 *                          deserializer
 * @throws InterruptedException declared by the signature
 * @throws IOException if opening a deserializer or advancing the input fails
 */
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                         RawKeyValueIterator input,
                         Counter inputKeyCounter,
                         Counter inputValueCounter,
                         RecordWriter<KEYOUT,VALUEOUT> output,
                         OutputCommitter committer,
                         StatusReporter reporter,
                         RawComparator<KEYIN> comparator,
                         Class<KEYIN> keyClass,
                         Class<VALUEIN> valueClass
                        ) throws InterruptedException, IOException{
  super(conf, taskid, output, committer, reporter);

  // Plain state taken straight from the arguments.
  this.conf = conf;
  this.taskid = taskid;
  this.keyClass = keyClass;
  this.valueClass = valueClass;
  this.input = input;
  this.comparator = comparator;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;

  // Both deserializers are opened on the same buffer.
  this.serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer =
      this.serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);

  // Advance the input once and remember whether it produced a record.
  hasMore = input.next();
}
 
源代码8 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Merges the given in-memory map outputs into a single in-memory output
 * and registers the result through {@code closeInMemoryMergedFile}.
 * Does nothing when {@code inputs} is null or empty.
 *
 * @param inputs in-memory map outputs to merge
 * @throws IOException if the merge or the write fails
 */
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.isEmpty()) {
    return;
  }

  // The merged output is reserved under the map id of the first input.
  TaskAttemptID firstMapId = inputs.get(0).getMapId();
  List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>();
  long totalSize = createInMemorySegments(inputs, segments, 0);
  int segmentCount = segments.size();

  InMemoryMapOutput<K, V> mergedOutput =
      unconditionalReserve(firstMapId, totalSize, false);
  Writer<K, V> writer =
      new InMemoryWriter<K, V>(mergedOutput.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + segmentCount +
           " segments of total-size: " + totalSize);

  Class<K> mapKeyClass = (Class<K>) jobConf.getMapOutputKeyClass();
  Class<V> mapValueClass = (Class<V>) jobConf.getMapOutputValueClass();
  Path mergeTmpDir = new Path(reduceId.toString());
  RawComparator<K> keyComparator =
      (RawComparator<K>) jobConf.getOutputKeyComparator();

  // Merge all segments in one pass (merge factor == number of segments).
  RawKeyValueIterator mergedIter =
      Merger.merge(jobConf, rfs, mapKeyClass, mapValueClass,
                   segments, segmentCount, mergeTmpDir, keyComparator,
                   reporter, null, null, null);
  Merger.writeFile(mergedIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +
           " Memory-to-Memory merge of the " + segmentCount +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedOutput);
}
 
源代码9 项目: big-c   文件: ReduceContextImpl.java
/**
 * Creates a reduce context that reads its input from a raw key/value
 * iterator.
 *
 * @param conf              job configuration, stored and used to look up
 *                          serializers
 * @param taskid            id of the task attempt
 * @param input             raw key/value source
 * @param inputKeyCounter   counter stored for input keys
 * @param inputValueCounter counter stored for input values
 * @param output            record writer, forwarded to the superclass
 * @param committer         output committer, forwarded to the superclass
 * @param reporter          status reporter, forwarded to the superclass
 * @param comparator        raw comparator stored for key comparison
 * @param keyClass          key class, stored and used for the key deserializer
 * @param valueClass        value class, stored and used for the value
 *                          deserializer
 * @throws InterruptedException declared by the signature
 * @throws IOException if opening a deserializer or advancing the input fails
 */
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                         RawKeyValueIterator input,
                         Counter inputKeyCounter,
                         Counter inputValueCounter,
                         RecordWriter<KEYOUT,VALUEOUT> output,
                         OutputCommitter committer,
                         StatusReporter reporter,
                         RawComparator<KEYIN> comparator,
                         Class<KEYIN> keyClass,
                         Class<VALUEIN> valueClass
                        ) throws InterruptedException, IOException{
  super(conf, taskid, output, committer, reporter);

  // Plain state taken straight from the arguments.
  this.conf = conf;
  this.taskid = taskid;
  this.keyClass = keyClass;
  this.valueClass = valueClass;
  this.input = input;
  this.comparator = comparator;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;

  // Both deserializers are opened on the same buffer.
  this.serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer =
      this.serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);

  // Advance the input once and remember whether it produced a record.
  hasMore = input.next();
}
 
源代码10 项目: big-c   文件: MergeManagerImpl.java
/**
 * Merges the given in-memory map outputs into a single in-memory output
 * and registers the result through {@code closeInMemoryMergedFile}.
 * Does nothing when {@code inputs} is null or empty.
 *
 * @param inputs in-memory map outputs to merge
 * @throws IOException if the merge or the write fails
 */
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.isEmpty()) {
    return;
  }

  // The merged output is reserved under the map id of the first input.
  TaskAttemptID firstMapId = inputs.get(0).getMapId();
  List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>();
  long totalSize = createInMemorySegments(inputs, segments, 0);
  int segmentCount = segments.size();

  InMemoryMapOutput<K, V> mergedOutput =
      unconditionalReserve(firstMapId, totalSize, false);
  Writer<K, V> writer =
      new InMemoryWriter<K, V>(mergedOutput.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + segmentCount +
           " segments of total-size: " + totalSize);

  Class<K> mapKeyClass = (Class<K>) jobConf.getMapOutputKeyClass();
  Class<V> mapValueClass = (Class<V>) jobConf.getMapOutputValueClass();
  Path mergeTmpDir = new Path(reduceId.toString());
  RawComparator<K> keyComparator =
      (RawComparator<K>) jobConf.getOutputKeyComparator();

  // Merge all segments in one pass (merge factor == number of segments).
  RawKeyValueIterator mergedIter =
      Merger.merge(jobConf, rfs, mapKeyClass, mapValueClass,
                   segments, segmentCount, mergeTmpDir, keyComparator,
                   reporter, null, null, null);
  Merger.writeFile(mergedIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +
           " Memory-to-Memory merge of the " + segmentCount +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedOutput);
}
 
源代码11 项目: RDFS   文件: Reducer.java
/**
 * Creates a reducer context by forwarding every argument, unchanged, to
 * the superclass constructor.
 *
 * @param conf              job configuration
 * @param taskid            id of the task attempt
 * @param input             raw key/value source
 * @param inputKeyCounter   counter for input keys
 * @param inputValueCounter counter for input values
 * @param output            record writer for the reducer output
 * @param committer         output committer
 * @param reporter          status reporter
 * @param comparator        raw comparator for keys
 * @param keyClass          input key class
 * @param valueClass        input value class
 * @throws IOException          if the superclass constructor fails with it
 * @throws InterruptedException if the superclass constructor fails with it
 */
public Context(Configuration conf, TaskAttemptID taskid,
               RawKeyValueIterator input, 
               Counter inputKeyCounter,
               Counter inputValueCounter,
               RecordWriter<KEYOUT,VALUEOUT> output,
               OutputCommitter committer,
               StatusReporter reporter,
               RawComparator<KEYIN> comparator,
               Class<KEYIN> keyClass,
               Class<VALUEIN> valueClass
               ) throws IOException, InterruptedException {
  super(conf, taskid, input, inputKeyCounter, inputValueCounter,
        output, committer, reporter, 
        comparator, keyClass, valueClass);
}
 
源代码12 项目: hadoop-gpu   文件: Reducer.java
/**
 * Creates a reducer context by forwarding every argument, unchanged, to
 * the superclass constructor.
 *
 * @param conf         job configuration
 * @param taskid       id of the task attempt
 * @param input        raw key/value source
 * @param inputCounter counter for the input
 * @param output       record writer for the reducer output
 * @param committer    output committer
 * @param reporter     status reporter
 * @param comparator   raw comparator for keys
 * @param keyClass     input key class
 * @param valueClass   input value class
 * @throws IOException          if the superclass constructor fails with it
 * @throws InterruptedException if the superclass constructor fails with it
 */
public Context(Configuration conf, TaskAttemptID taskid,
               RawKeyValueIterator input, 
               Counter inputCounter,
               RecordWriter<KEYOUT,VALUEOUT> output,
               OutputCommitter committer,
               StatusReporter reporter,
               RawComparator<KEYIN> comparator,
               Class<KEYIN> keyClass,
               Class<VALUEIN> valueClass
               ) throws IOException, InterruptedException {
  super(conf, taskid, input, inputCounter, output, committer, reporter, 
        comparator, keyClass, valueClass);
}
 
源代码13 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Merges the given in-memory map outputs into one file on local disk,
 * running the combiner when {@code combinerClass} is set, and registers
 * the resulting file through {@code closeOnDiskFile}.
 * Does nothing when {@code inputs} is null or empty.
 *
 * <p>If the merge fails with an {@link IOException}, the output stream is
 * closed (best effort), the partially written file is deleted and the
 * original exception is rethrown.
 *
 * @param inputs in-memory map outputs to merge
 * @throws IOException if merging, combining or writing fails
 */
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments,0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = 
    mapOutputFile.getInputFileForWrite(mapTaskId,
                                       mergeOutputSize).suffix(
                                           Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  // Tracks whether writer.close() completed, so the error path knows
  // whether the underlying stream may still be open.
  boolean writerClosed = false;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(jobConf, rfs,
                         (Class<K>)jobConf.getMapOutputKeyClass(),
                         (Class<V>)jobConf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceId.toString()),
                         (RawComparator<K>)jobConf.getOutputKeyComparator(),
                         reporter, spilledRecordsCounter, null, null);
    
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    writerClosed = true;
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +  
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) { 
    // The stream opened above would otherwise stay open on this path:
    // release it before removing the file so the handle is not leaked.
    if (!writerClosed) {
      try {
        out.close();
      } catch (IOException ignored) {
        // Best effort only; the original failure is the one to report.
      }
    }
    //make sure that we delete the ondisk file that we created 
    //earlier via rfs.create(outputPath)
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
 
源代码14 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Merges the given on-disk map output files into one file on local disk
 * and registers the result through {@code closeOnDiskFile}.
 * Logs and returns when {@code inputs} is null or empty.
 *
 * <p>If the merge fails with an {@link IOException}, the output stream is
 * closed (best effort), the partially written file is deleted and the
 * original exception is rethrown.
 *
 * @param inputs on-disk map outputs to merge
 * @throws IOException if sizing, merging or writing fails
 */
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    jobConf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter  = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  // Tracks whether writer.close() completed, so the error path knows
  // whether the underlying stream may still be open.
  boolean writerClosed = false;
  try {
    iter = Merger.merge(jobConf, rfs,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, inputs.toArray(new Path[inputs.size()]), 
                        true, ioSortFactor, tmpDir, 
                        (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                        reporter, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    writerClosed = true;
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    // The stream opened above would otherwise stay open on this path:
    // release it before removing the file so the handle is not leaked.
    if (!writerClosed) {
      try {
        out.close();
      } catch (IOException ignored) {
        // Best effort only; the original failure is the one to report.
      }
    }
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
 
源代码15 项目: hadoop   文件: MergeManagerImpl.java
/**
 * Creates a reader backed by the given raw key/value iterator.
 * The superclass is constructed with {@code size} and
 * {@code spilledRecordsCounter}; its remaining arguments are passed as
 * {@code null}.
 *
 * @param kvIter iterator this reader stores in its {@code kvIter} field
 * @param size   size value forwarded to the superclass constructor
 * @throws IOException if the superclass constructor fails
 */
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
    throws IOException {
  super(null, null, size, null, spilledRecordsCounter);
  this.kvIter = kvIter;
}
 
源代码16 项目: hadoop   文件: Shuffle.java
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits until the scheduler reports completion, stops the fetchers and the
 * scheduler, switches the task to the SORT phase and closes the merger.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException          declared by the signature; a failure recorded
 *         in {@code throwable} or raised by the final merge is rethrown as a
 *         {@code ShuffleError}
 * @throws InterruptedException declared by the signature (the wait on the
 *         scheduler is the blocking step in this method)
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
  // on the ApplicationMaster when a thundering herd of reducers fetch events
  // TODO: This should not be necessary after HADOOP-8942
  int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
  int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

  // Start the map-completion events fetcher thread
  final EventFetcher<K,V> eventFetcher = 
    new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
        maxEventsToFetch);
  eventFetcher.start();
  
  // Start the map-output fetcher threads
  // A single LocalFetcher is used when local map files were supplied;
  // otherwise the fetcher count comes from SHUFFLE_PARALLEL_COPIES (default 5).
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                     reporter, metrics, this, 
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  
  // Wait for shuffle to complete successfully
  // Each wakeup reports progress and rethrows any failure recorded in
  // the throwable field.
  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
    reporter.progress();
    
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
  }

  // Stop the event-fetcher thread
  eventFetcher.shutDown();
  
  // Stop the map-output fetcher threads
  for (Fetcher<K,V> fetcher : fetchers) {
    fetcher.shutDown();
  }
  
  // stop the scheduler
  scheduler.close();

  copyPhase.complete(); // copy is already complete
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    throw new ShuffleError("Error while doing final merge " , e);
  }

  // Sanity check
  // A failure may have been recorded after the wait loop exited.
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }
  
  return kvIter;
}
 
源代码17 项目: hadoop   文件: TestShufflePlugin.java
/**
 * Stub implementation that does no work.
 *
 * @return always {@code null}
 */
@Override
public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
  return null;
}
 
源代码18 项目: hadoop   文件: TestMerger.java
/**
 * Merges the given segments with a merge factor of 2 and checks the
 * progress reported by the merge queue after each {@code next()} call.
 *
 * @param segments segments to merge; the expectations below assume two
 *                 segments holding three keys each
 * @throws IOException if the merge fails
 */
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Path mergeTmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  Progress mergePhase = new Progress();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, mergeTmpDir, comparator, getReporter(),
      readsCounter, writesCounter, mergePhase);
  final float tolerance = 0.00001f;

  // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
  // progress forward 1/6th of the way. Initially the first keys from each
  // segment have been read as part of the merge setup, so progress = 2/6.
  Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), tolerance);

  // Expected progress after each successful next():
  //   1st      - returns a key already read during merge setup (no change)
  //   2nd, 3rd - each reads one key and advances progress
  //   4th      - one segment is exhausted, so the cached key from the other
  //              segment is returned (no change)
  //   5th, 6th - each reads one key and advances progress
  final float[] progressAfterNext = {
      2/6.0f, 3/6.0f, 4/6.0f, 4/6.0f, 5/6.0f, 1.0f
  };
  for (float expectedProgress : progressAfterNext) {
    Assert.assertTrue(mergeQueue.next());
    Assert.assertEquals(expectedProgress, mergeQueue.getProgress().get(),
        tolerance);
  }

  // Now there should be no more input
  Assert.assertFalse(mergeQueue.next());
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), tolerance);
  Assert.assertNull(mergeQueue.getKey());
  Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
 
源代码19 项目: hadoop   文件: TestGridMixClasses.java
/**
 * Runs {@code LoadJob.LoadReducer} against a fake input iterator and
 * checks the resulting counters and the single record written.
 */
@Test (timeout=3000)
public void testLoadJobLoadReducer() throws Exception {
  LoadJob.LoadReducer reducerUnderTest = new LoadJob.LoadReducer();

  // Two reducers, with compression emulation and output compression on.
  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);

  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID attemptId = new TaskAttemptID();

  // Collaborators handed to the reduce context.
  RawKeyValueIterator fakeInput = new FakeRawKeyValueIterator();
  Counter inputKeyCounter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  LoadRecordWriter recordWriter = new LoadRecordWriter();
  OutputCommitter committer = new CustomOutputCommitter();
  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>
      reduceContext =
          new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
              conf, attemptId, fakeInput, inputKeyCounter, inputValueCounter,
              recordWriter, committer, reporter, comparator,
              GridmixKey.class, GridmixRecord.class);
  // read for previous data
  reduceContext.nextKeyValue();

  WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>
      wrapper =
          new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>();
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context
      context = wrapper.getReducerContext(reduceContext);

  // test.setup(context);
  reducerUnderTest.run(context);

  // 9 records have been read (-1 for the previous one)
  assertEquals(9, inputKeyCounter.getValue());
  assertEquals(10, inputValueCounter.getValue());
  assertEquals(1, recordWriter.getData().size());

  GridmixRecord onlyRecord =
      recordWriter.getData().values().iterator().next();
  assertEquals(1593, onlyRecord.getSize());
}
 
源代码20 项目: hadoop   文件: TestGridMixClasses.java
/**
 * Drives {@code SleepReducer} through setup and cleanup and checks the
 * status text it reports and that setup took at least the sleep time.
 */
@Test (timeout=3000)
public void testSleepReducer() throws Exception {
  // Two reducers, with compression emulation and output compression on.
  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);

  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID attemptId = new TaskAttemptID();

  // Collaborators handed to the reduce context.
  RawKeyValueIterator fakeInput = new FakeRawKeyValueReducerIterator();
  Counter inputKeyCounter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  RecordWriter<NullWritable, NullWritable> recordWriter =
      new LoadRecordReduceWriter();
  OutputCommitter committer = new CustomOutputCommitter();
  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable>
      reduceContext =
          new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
              conf, attemptId, fakeInput, inputKeyCounter, inputValueCounter,
              recordWriter, committer, reporter, comparator,
              GridmixKey.class, NullWritable.class);

  WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>
      wrapper =
          new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>();
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context
      context = wrapper.getReducerContext(reduceContext);

  SleepReducer reducerUnderTest = new SleepReducer();
  long startMillis = System.currentTimeMillis();
  reducerUnderTest.setup(context);
  long sleepMillis = context.getCurrentKey().getReduceOutputBytes();

  // status has been changed
  assertEquals("Sleeping... " + sleepMillis + " ms left",
      context.getStatus());

  // should sleep 0.9 sec
  assertTrue(System.currentTimeMillis() >= (startMillis + sleepMillis));

  reducerUnderTest.cleanup(context);
  // status has been changed again
  assertEquals("Slept for " + sleepMillis, context.getStatus());
}
 
源代码21 项目: big-c   文件: MergeManagerImpl.java
/**
 * Merges the given in-memory map outputs into one file on local disk,
 * running the combiner when {@code combinerClass} is set, and registers
 * the resulting file through {@code closeOnDiskFile}.
 * Does nothing when {@code inputs} is null or empty.
 *
 * <p>If the merge fails with an {@link IOException}, the output stream is
 * closed (best effort), the partially written file is deleted and the
 * original exception is rethrown.
 *
 * @param inputs in-memory map outputs to merge
 * @throws IOException if merging, combining or writing fails
 */
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments,0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = 
    mapOutputFile.getInputFileForWrite(mapTaskId,
                                       mergeOutputSize).suffix(
                                           Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  // Tracks whether writer.close() completed, so the error path knows
  // whether the underlying stream may still be open.
  boolean writerClosed = false;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(jobConf, rfs,
                         (Class<K>)jobConf.getMapOutputKeyClass(),
                         (Class<V>)jobConf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceId.toString()),
                         (RawComparator<K>)jobConf.getOutputKeyComparator(),
                         reporter, spilledRecordsCounter, null, null);
    
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    writerClosed = true;
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +  
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) { 
    // The stream opened above would otherwise stay open on this path:
    // release it before removing the file so the handle is not leaked.
    if (!writerClosed) {
      try {
        out.close();
      } catch (IOException ignored) {
        // Best effort only; the original failure is the one to report.
      }
    }
    //make sure that we delete the ondisk file that we created 
    //earlier via rfs.create(outputPath)
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
 
源代码22 项目: big-c   文件: MergeManagerImpl.java
/**
 * Merges the given on-disk map output files into one file on local disk
 * and registers the result through {@code closeOnDiskFile}.
 * Logs and returns when {@code inputs} is null or empty.
 *
 * <p>If the merge fails with an {@link IOException}, the output stream is
 * closed (best effort), the partially written file is deleted and the
 * original exception is rethrown.
 *
 * @param inputs on-disk map outputs to merge
 * @throws IOException if sizing, merging or writing fails
 */
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    jobConf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter  = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  // Tracks whether writer.close() completed, so the error path knows
  // whether the underlying stream may still be open.
  boolean writerClosed = false;
  try {
    iter = Merger.merge(jobConf, rfs,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, inputs.toArray(new Path[inputs.size()]), 
                        true, ioSortFactor, tmpDir, 
                        (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                        reporter, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    writerClosed = true;
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    // The stream opened above would otherwise stay open on this path:
    // release it before removing the file so the handle is not leaked.
    if (!writerClosed) {
      try {
        out.close();
      } catch (IOException ignored) {
        // Best effort only; the original failure is the one to report.
      }
    }
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
 
源代码23 项目: big-c   文件: MergeManagerImpl.java
/**
 * Creates a reader around an existing {@link RawKeyValueIterator}.
 *
 * @param kvIter iterator stored in this reader's {@code kvIter} field
 * @param size   value forwarded as the third argument of the superclass
 *               constructor
 * @throws IOException declared by this constructor; presumably propagated
 *                     from the superclass constructor (confirm against it)
 */
public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
    throws IOException {
  // Only the size and the spilled-records counter are supplied to the
  // superclass; every other superclass argument is passed as null.
  super(null, null, size, null, spilledRecordsCounter);
  this.kvIter = kvIter;
}
 
源代码24 项目: big-c   文件: Shuffle.java
/**
 * Runs the shuffle: starts the event fetcher and the map-output fetchers,
 * waits for the scheduler to report completion, shuts everything down,
 * switches the task status to the SORT phase and closes the merger.
 *
 * <p>A {@code ShuffleError} is thrown if a failure has been recorded in
 * {@code throwable} (checked on every wait iteration and once more after the
 * final merge) or if {@code merger.close()} throws.
 *
 * @return the iterator returned by {@code merger.close()}
 * @throws IOException declared by this method
 * @throws InterruptedException declared by this method
 */
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
  // on the ApplicationMaster when a thundering herd of reducers fetch events
  // TODO: This should not be necessary after HADOOP-8942
  final int perReducerShare =
      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks();
  final int fetchCap = Math.min(MAX_EVENTS_TO_FETCH,
      Math.max(MIN_EVENTS_TO_FETCH, perReducerShare));

  // Launch the thread that fetches map-completion events.
  final EventFetcher<K,V> completionEvents =
      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, fetchCap);
  completionEvents.start();

  // Launch the map-output fetchers: a single LocalFetcher when local map
  // files were supplied, otherwise the configured number of Fetchers.
  final Fetcher<K,V>[] copiers;
  if (localMapFiles != null) {
    copiers = new Fetcher[1];
    copiers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    copiers[0].start();
  } else {
    final int copierCount =
        jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    copiers = new Fetcher[copierCount];
    for (int idx = 0; idx < copierCount; idx++) {
      copiers[idx] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
          reporter, metrics, this, reduceTask.getShuffleSecret());
      copiers[idx].start();
    }
  }

  // Poll the scheduler until it reports completion; between polls, report
  // progress and surface any failure recorded so far.
  while (true) {
    if (scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
      break;
    }
    reporter.progress();

    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
  }

  // Shut down in the same order as they were started: event fetcher,
  // map-output fetchers, then the scheduler.
  completionEvents.shutDown();
  for (int idx = 0; idx < copiers.length; idx++) {
    copiers[idx].shutDown();
  }
  scheduler.close();

  // The copy phase is over; publish the move to the SORT phase.
  copyPhase.complete();
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Closing the merger finishes outstanding merges and yields the result.
  RawKeyValueIterator mergedOutput;
  try {
    mergedOutput = merger.close();
  } catch (Throwable mergeFailure) {
    throw new ShuffleError("Error while doing final merge " , mergeFailure);
  }

  // Last check for a failure recorded while the merge was finishing.
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }

  return mergedOutput;
}
 
源代码25 项目: big-c   文件: TestShufflePlugin.java
/**
 * Stub implementation of {@code run()}: performs no work.
 *
 * @return always {@code null}
 */
@Override
public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
  return null;
}
 
源代码26 项目: big-c   文件: TestMerger.java
/**
 * Merges the given segments and checks the progress reported by the merged
 * iterator before the first {@code next()} and after each subsequent one.
 *
 * <p>The expected values assume six keys in total, three in each of two
 * segments, so every key actually read advances progress by one sixth.
 *
 * @param segments segments passed to {@code Merger.merge}
 * @throws IOException declared by this method
 */
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Path scratchDir = new Path("localpath");
  Class<Text> mapKeyType = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> mapValueType = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> keyOrder = jobConf.getOutputKeyComparator();
  Counter reads = new Counter();
  Counter writes = new Counter();
  Progress phase = new Progress();
  RawKeyValueIterator merged = Merger.merge(conf, fs, mapKeyType,
      mapValueType, segments, 2, scratchDir, keyOrder, getReporter(),
      reads, writes, phase);
  final float epsilon = 0.00001f;

  // Progress expected after each successful next(), in call order.
  final float[] progressAfterNext = {
      2/6.0f,  // returns a key that was already read during merge setup
      3/6.0f,  // reads one more key
      4/6.0f,  // reads one more key; one segment is now exhausted
      4/6.0f,  // returns the key already cached from the other segment
      5/6.0f,  // reads one more key
      1.0f     // reads the last key
  };

  // Merge setup has already read the first key of each segment: 2 of 6.
  Assert.assertEquals(2/6.0f, merged.getProgress().get(), epsilon);

  for (float expected : progressAfterNext) {
    Assert.assertTrue(merged.next());
    Assert.assertEquals(expected, merged.getProgress().get(), epsilon);
  }

  // The input is exhausted: next() fails, progress stays at 1.0, the key is
  // null and the value buffer is empty.
  Assert.assertFalse(merged.next());
  Assert.assertEquals(1.0f, merged.getProgress().get(), epsilon);
  Assert.assertTrue(merged.getKey() == null);
  Assert.assertEquals(0, merged.getValue().getData().length);
}
 
源代码27 项目: big-c   文件: TestGridMixClasses.java
/**
 * Runs {@code LoadJob.LoadReducer} over a fake input iterator and checks the
 * two input counters plus the single record captured by the record writer.
 *
 * @throws Exception declared by this test
 */
@Test (timeout=3000)
public void testLoadJobLoadReducer() throws Exception {
  LoadJob.LoadReducer reducerUnderTest = new LoadJob.LoadReducer();

  // Two reduces, with compression emulation, output compression and
  // map-output compression all switched on.
  Configuration cfg = new Configuration();
  cfg.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(cfg, true);
  cfg.setBoolean(FileOutputFormat.COMPRESS, true);

  // NOTE(review): same call as three lines above; looks redundant, confirm
  // before removing.
  CompressionEmulationUtil.setCompressionEmulationEnabled(cfg, true);
  cfg.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID attemptId = new TaskAttemptID();

  // Fake collaborators handed to the reduce context.
  RawKeyValueIterator fakeInput = new FakeRawKeyValueIterator();

  Counter keyCounter = new GenericCounter();
  Counter valueCounter = new GenericCounter();
  LoadRecordWriter writer = new LoadRecordWriter();

  OutputCommitter outputCommitter = new CustomOutputCommitter();

  StatusReporter statusReporter = new DummyReporter();
  RawComparator<GridmixKey> keyComparator = new FakeRawComparator();

  ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> baseContext =
      new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
          cfg, attemptId, fakeInput, keyCounter, valueCounter, writer,
          outputCommitter, statusReporter, keyComparator,
          GridmixKey.class, GridmixRecord.class);
  // Advance once before the context is handed to the reducer.
  baseContext.nextKeyValue();
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context reducerContext =
      new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
          .getReducerContext(baseContext);

  // Only run(context) is invoked explicitly; setup(context) is not.
  reducerUnderTest.run(reducerContext);

  // Original note: 9 records read (-1 for the advance above).
  assertEquals(9, keyCounter.getValue());
  assertEquals(10, valueCounter.getValue());
  assertEquals(1, writer.getData().size());
  GridmixRecord firstRecord = writer.getData().values().iterator().next();

  assertEquals(1593, firstRecord.getSize());
}
 
源代码28 项目: big-c   文件: TestGridMixClasses.java
/**
 * Drives {@code SleepReducer.setup} and {@code cleanup} against a fake
 * reduce context and checks the status text set by each call, as well as
 * that at least {@code sleeper} milliseconds elapsed across {@code setup}.
 *
 * @throws Exception declared by this test
 */
@Test (timeout=3000)
public void testSleepReducer() throws Exception {
  // Two reduces, with compression emulation, output compression and
  // map-output compression all switched on.
  Configuration cfg = new Configuration();
  cfg.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(cfg, true);
  cfg.setBoolean(FileOutputFormat.COMPRESS, true);

  // NOTE(review): same call as three lines above; looks redundant, confirm
  // before removing.
  CompressionEmulationUtil.setCompressionEmulationEnabled(cfg, true);
  cfg.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  TaskAttemptID attemptId = new TaskAttemptID();

  // Fake collaborators handed to the reduce context.
  RawKeyValueIterator fakeInput = new FakeRawKeyValueReducerIterator();

  Counter keyCounter = new GenericCounter();
  Counter valueCounter = new GenericCounter();
  RecordWriter<NullWritable, NullWritable> writer = new LoadRecordReduceWriter();

  OutputCommitter outputCommitter = new CustomOutputCommitter();

  StatusReporter statusReporter = new DummyReporter();
  RawComparator<GridmixKey> keyComparator = new FakeRawComparator();

  ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> baseContext =
      new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
          cfg, attemptId, fakeInput, keyCounter, valueCounter, writer,
          outputCommitter, statusReporter, keyComparator,
          GridmixKey.class, NullWritable.class);
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context reducerContext =
      new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
          .getReducerContext(baseContext);

  SleepReducer reducerUnderTest = new SleepReducer();
  long startedAt = System.currentTimeMillis();
  reducerUnderTest.setup(reducerContext);
  long sleeper = reducerContext.getCurrentKey().getReduceOutputBytes();

  // setup() must have updated the status text.
  assertEquals("Sleeping... " + sleeper + " ms left", reducerContext.getStatus());

  // Original note: should sleep 0.9 sec. At least `sleeper` ms must have
  // passed since the start timestamp was taken.
  assertTrue(System.currentTimeMillis() >= (startedAt + sleeper));

  reducerUnderTest.cleanup(reducerContext);

  // cleanup() must have updated the status text again.
  assertEquals("Slept for " + sleeper, reducerContext.getStatus());

}
 
源代码29 项目: incubator-tez   文件: MRTask.java
/**
 * Builds a {@code Reducer.Context} whose input comes from a
 * {@code TezRawKeyValueIterator}.
 *
 * <p>The Tez iterator is exposed through an anonymous
 * {@link RawKeyValueIterator} that forwards every call, handed to a
 * {@code ReduceContextImpl}, and the result is wrapped by
 * {@code WrappedReducer.getReducerContext}.
 *
 * @param reducer           not used by this method
 * @param job               configuration passed to the reduce context
 * @param taskId            task attempt id passed to the reduce context
 * @param rIter             Tez iterator every adapter call is forwarded to
 * @param inputKeyCounter   counter passed to the reduce context
 * @param inputValueCounter counter passed to the reduce context
 * @param output            record writer passed to the reduce context
 * @param committer         output committer passed to the reduce context
 * @param reporter          status reporter passed to the reduce context
 * @param comparator        comparator passed to the reduce context
 * @param keyClass          input key class
 * @param valueClass        input value class
 * @return the context produced by {@code WrappedReducer.getReducerContext}
 * @throws IOException          declared by this method
 * @throws InterruptedException declared by this method
 */
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
                      <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                    Configuration job,
                    TaskAttemptID taskId,
                    final TezRawKeyValueIterator rIter,
                    org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                    org.apache.hadoop.mapreduce.Counter inputValueCounter,
                    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
                    org.apache.hadoop.mapreduce.OutputCommitter committer,
                    org.apache.hadoop.mapreduce.StatusReporter reporter,
                    RawComparator<INKEY> comparator,
                    Class<INKEY> keyClass, Class<INVALUE> valueClass
) throws IOException, InterruptedException {
  // Adapter: every RawKeyValueIterator call is forwarded to the Tez iterator.
  final RawKeyValueIterator adapter = new RawKeyValueIterator() {

    @Override
    public DataInputBuffer getKey() throws IOException {
      return rIter.getKey();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return rIter.getValue();
    }

    @Override
    public boolean next() throws IOException {
      return rIter.next();
    }

    @Override
    public void close() throws IOException {
      rIter.close();
    }

    @Override
    public Progress getProgress() {
      return rIter.getProgress();
    }
  };

  final org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
      baseContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
          job, taskId, adapter, inputKeyCounter, inputValueCounter, output,
          committer, reporter, comparator, keyClass, valueClass);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Using key class: " + keyClass
        + ", valueClass: " + valueClass);
  }

  return new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>()
      .getReducerContext(baseContext);
}
 
源代码30 项目: incubator-tez   文件: MRCombiner.java
/**
 * Builds the {@code Reducer.Context} used for combining, with input taken
 * from a {@code TezRawKeyValueIterator}.
 *
 * <p>The Tez iterator is exposed through an anonymous
 * {@link RawKeyValueIterator} that forwards every call. The
 * {@code ReduceContextImpl} is created with {@code null} in the committer
 * position.
 *
 * @param conf                     configuration passed to the reduce context
 * @param mrTaskAttemptID          task attempt id passed to the reduce context
 * @param rawIter                  Tez iterator every adapter call is forwarded to
 * @param combineInputKeyCounter   counter passed to the reduce context
 * @param combineInputValueCounter counter passed to the reduce context
 * @param recordWriter             writer passed to the reduce context
 * @param reporter                 reporter passed to the reduce context
 * @param comparator               comparator passed to the reduce context
 * @param keyClass                 input key class
 * @param valClass                 input value class
 * @return the context produced by {@code WrappedReducer.getReducerContext}
 * @throws InterruptedException declared by this method
 * @throws IOException          declared by this method
 */
private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    Configuration conf,
    TaskAttemptID mrTaskAttemptID,
    final TezRawKeyValueIterator rawIter,
    Counter combineInputKeyCounter,
    Counter combineInputValueCounter,
    RecordWriter<KEYOUT, VALUEOUT> recordWriter,
    MRTaskReporter reporter,
    RawComparator<KEYIN> comparator,
    Class<KEYIN> keyClass,
    Class<VALUEIN> valClass) throws InterruptedException, IOException {

  // Adapter: every RawKeyValueIterator call is forwarded to the Tez iterator.
  final RawKeyValueIterator adapter = new RawKeyValueIterator() {

    @Override
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }

    @Override
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }

    @Override
    public boolean next() throws IOException {
      return rawIter.next();
    }

    @Override
    public void close() throws IOException {
      rawIter.close();
    }

    @Override
    public Progress getProgress() {
      return rawIter.getProgress();
    }
  };

  // null is passed in the committer position.
  final ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> baseContext =
      new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
          conf, mrTaskAttemptID, adapter, combineInputKeyCounter,
          combineInputValueCounter, recordWriter, null, reporter, comparator,
          keyClass, valClass);

  return new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
      .getReducerContext(baseContext);
}
 
 类所在包
 类方法
 同包方法