类org.apache.hadoop.mapred.Merger.Segment源码实例Demo

下面列出了怎么用org.apache.hadoop.mapred.Merger.Segment的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: RDFS   文件: BlockMapOutputBuffer.java
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // if there is already one spills, we can try to hold this last spill in
    // memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(),
              true);
    }
    hasInMemorySpill=true;
  } else {
    sortAndSpill();      
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
}
 
源代码2 项目: hadoop   文件: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
  }
 
源代码3 项目: hadoop   文件: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode
    
    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;
    
    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }
    
    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
 
源代码4 项目: hadoop   文件: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
     memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
 
源代码5 项目: hadoop   文件: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
  
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
 
源代码6 项目: hadoop   文件: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
 
源代码7 项目: hadoop   文件: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();
  
  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);
  
  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
  
  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
 
源代码8 项目: hadoop   文件: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
 
源代码9 项目: hadoop   文件: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
 
源代码10 项目: big-c   文件: BackupStore.java
public void mark() throws IOException {

    // We read one KV pair in advance in hasNext. 
    // If hasNext has read the next KV pair from a new segment, but the
    // user has not called next() for that KV, then reset the readSegmentIndex
    // to the previous segment

    if (nextKVOffset == 0) {
      assert (readSegmentIndex != 0);
      assert (currentKVOffset != 0);
      readSegmentIndex --;
    }

    // just drop segments before the current active segment

    int i = 0;
    Iterator<Segment<K,V>> itr = segmentList.iterator();
    while (itr.hasNext()) {
      Segment<K,V> s = itr.next();
      if (i == readSegmentIndex) {
        break;
      }
      s.close();
      itr.remove();
      i++;
      LOG.debug("Dropping a segment");
    }

    // FirstSegmentOffset is the offset in the current segment from where we
    // need to start reading on the next reset

    firstSegmentOffset = currentKVOffset;
    readSegmentIndex = 0;

    LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
  }
 
源代码11 项目: big-c   文件: BackupStore.java
public void reset() throws IOException {

    // Create a new segment for the previously written records only if we
    // are not already in the reset mode
    
    if (!inReset) {
      if (fileCache.isActive) {
        fileCache.createInDiskSegment();
      } else {
        memCache.createInMemorySegment();
      }
    } 

    inReset = true;
    
    // Reset the segments to the correct position from where the next read
    // should begin. 
    for (int i = 0; i < segmentList.size(); i++) {
      Segment<K,V> s = segmentList.get(i);
      if (s.inMemory()) {
        int offset = (i == 0) ? firstSegmentOffset : 0;
        s.getReader().reset(offset);
      } else {
        s.closeReader();
        if (i == 0) {
          s.reinitReader(firstSegmentOffset);
          s.getReader().disableChecksumValidation();
        }
      }
    }
    
    currentKVOffset = firstSegmentOffset;
    nextKVOffset = -1;
    readSegmentIndex = 0;
    hasMore = false;
    lastSegmentEOF = false;

    LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
        " Segment List Size is " + segmentList.size());
  }
 
源代码12 项目: big-c   文件: BackupStore.java
private void clearSegmentList() throws IOException {
  for (Segment<K,V> segment: segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
     memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
 
源代码13 项目: big-c   文件: BackupStore.java
/**
 * This method creates a memory segment from the existing buffer
 * @throws IOException
 */
void createInMemorySegment () throws IOException {

  // If nothing was written in this block because the record size
  // was greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space
  // left for the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
  
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader = 
    new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
        (org.apache.hadoop.mapred.TaskAttemptID) tid, 
        dataOut.getData(), 0, usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is " + 
      segmentList.size());
}
 
源代码14 项目: big-c   文件: BackupStore.java
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is "  + segmentList.size());
}
 
源代码15 项目: big-c   文件: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();
  
  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);
  
  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
  
  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
 
源代码16 项目: big-c   文件: TestMerger.java
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
 
源代码17 项目: big-c   文件: TestMerger.java
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
 
源代码18 项目: hadoop   文件: BackupStore.java
public boolean hasNext() throws IOException {
  
  if (lastSegmentEOF) {
    return false;
  }
  
  // We read the next KV from the cache to decide if there is any left.
  // Since hasNext can be called several times before the actual call to 
  // next(), we use hasMore to avoid extra reads. hasMore is set to false
  // when the user actually consumes this record in next()

  if (hasMore) {
    return true;
  }

  Segment<K,V> seg = segmentList.get(readSegmentIndex);
  // Mark the current position. This would be set to currentKVOffset
  // when the user consumes this record in next(). 
  nextKVOffset = (int) seg.getActualPosition();
  if (seg.nextRawKey()) {
    currentKey = seg.getKey();
    seg.getValue(currentValue);
    hasMore = true;
    return true;
  } else {
    if (!seg.inMemory()) {
      seg.closeReader();
    }
  }

  // If this is the last segment, mark the lastSegmentEOF flag and return
  if (readSegmentIndex == segmentList.size() - 1) {
    nextKVOffset = -1;
    lastSegmentEOF = true;
    return false;
  }

  nextKVOffset = 0;
  readSegmentIndex ++;

  Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
  
  // We possibly are moving from a memory segment to a disk segment.
  // Reset so that we do not corrupt the in-memory segment buffer.
  // See HADOOP-5494
  
  if (!nextSegment.inMemory()) {
    currentValue.reset(currentDiskValue.getData(), 
        currentDiskValue.getLength());
    nextSegment.init(null);
  }
 
  if (nextSegment.nextRawKey()) {
    currentKey = nextSegment.getKey();
    nextSegment.getValue(currentValue);
    hasMore = true;
    return true;
  } else {
    throw new IOException("New segment did not have even one K/V");
  }
}
 
源代码19 项目: hadoop   文件: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments,0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = 
    mapOutputFile.getInputFileForWrite(mapTaskId,
                                       mergeOutputSize).suffix(
                                           Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(jobConf, rfs,
                         (Class<K>)jobConf.getMapOutputKeyClass(),
                         (Class<V>)jobConf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceId.toString()),
                         (RawComparator<K>)jobConf.getOutputKeyComparator(),
                         reporter, spilledRecordsCounter, null, null);
    
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +  
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) { 
    //make sure that we delete the ondisk file that we created 
    //earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
 
源代码20 项目: hadoop   文件: TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  Progress mergePhase = new Progress();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter, mergePhase);
  final float epsilon = 0.00001f;

  // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
  // progress forward 1/6th of the way. Initially the first keys from each
  // segment have been read as part of the merge setup, so progress = 2/6.
  Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);

  // The first next() returns one of the keys already read during merge setup
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);

  // Subsequent next() calls should read one key and move progress
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);

  // At this point we've exhausted all of the keys in one segment
  // so getting the next key will return the already cached key from the
  // other segment
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);

  // Subsequent next() calls should read one key and move progress
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);

  // Now there should be no more input
  Assert.assertFalse(mergeQueue.next());
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.getKey() == null);
  Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
 
源代码21 项目: hadoop   文件: TestMerger.java
private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
  return new Segment<Text, Text>(getReader(i, false), false);
}
 
源代码22 项目: hadoop   文件: TestMerger.java
private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
  return new Segment<Text, Text>(getReader(i, true), false, 3000l);
}
 
源代码23 项目: big-c   文件: BackupStore.java
public boolean hasNext() throws IOException {
  
  if (lastSegmentEOF) {
    return false;
  }
  
  // We read the next KV from the cache to decide if there is any left.
  // Since hasNext can be called several times before the actual call to 
  // next(), we use hasMore to avoid extra reads. hasMore is set to false
  // when the user actually consumes this record in next()

  if (hasMore) {
    return true;
  }

  Segment<K,V> seg = segmentList.get(readSegmentIndex);
  // Mark the current position. This would be set to currentKVOffset
  // when the user consumes this record in next(). 
  nextKVOffset = (int) seg.getActualPosition();
  if (seg.nextRawKey()) {
    currentKey = seg.getKey();
    seg.getValue(currentValue);
    hasMore = true;
    return true;
  } else {
    if (!seg.inMemory()) {
      seg.closeReader();
    }
  }

  // If this is the last segment, mark the lastSegmentEOF flag and return
  if (readSegmentIndex == segmentList.size() - 1) {
    nextKVOffset = -1;
    lastSegmentEOF = true;
    return false;
  }

  nextKVOffset = 0;
  readSegmentIndex ++;

  Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
  
  // We possibly are moving from a memory segment to a disk segment.
  // Reset so that we do not corrupt the in-memory segment buffer.
  // See HADOOP-5494
  
  if (!nextSegment.inMemory()) {
    currentValue.reset(currentDiskValue.getData(), 
        currentDiskValue.getLength());
    nextSegment.init(null);
  }
 
  if (nextSegment.nextRawKey()) {
    currentKey = nextSegment.getKey();
    nextSegment.getValue(currentValue);
    hasMore = true;
    return true;
  } else {
    throw new IOException("New segment did not have even one K/V");
  }
}
 
源代码24 项目: big-c   文件: MergeManagerImpl.java
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments,0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = 
    mapOutputFile.getInputFileForWrite(mapTaskId,
                                       mergeOutputSize).suffix(
                                           Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(jobConf, rfs,
                         (Class<K>)jobConf.getMapOutputKeyClass(),
                         (Class<V>)jobConf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceId.toString()),
                         (RawComparator<K>)jobConf.getOutputKeyComparator(),
                         reporter, spilledRecordsCounter, null, null);
    
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +  
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) { 
    //make sure that we delete the ondisk file that we created 
    //earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
 
源代码25 项目: big-c   文件: TestMerger.java
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  Progress mergePhase = new Progress();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter, mergePhase);
  final float epsilon = 0.00001f;

  // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
  // progress forward 1/6th of the way. Initially the first keys from each
  // segment have been read as part of the merge setup, so progress = 2/6.
  Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);

  // The first next() returns one of the keys already read during merge setup
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);

  // Subsequent next() calls should read one key and move progress
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);

  // At this point we've exhausted all of the keys in one segment
  // so getting the next key will return the already cached key from the
  // other segment
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);

  // Subsequent next() calls should read one key and move progress
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.next());
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);

  // Now there should be no more input
  Assert.assertFalse(mergeQueue.next());
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
  Assert.assertTrue(mergeQueue.getKey() == null);
  Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
 
源代码26 项目: big-c   文件: TestMerger.java
private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
  return new Segment<Text, Text>(getReader(i, false), false);
}
 
源代码27 项目: big-c   文件: TestMerger.java
private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
  return new Segment<Text, Text>(getReader(i, true), false, 3000l);
}
 
源代码28 项目: RDFS   文件: ReduceTask.java
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
  if (mapOutputsFilesInMemory.size() == 0) {
    return;
  }

  //name this output file same as the name of the first file that is
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev.
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId
  TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
  long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                    reduceTask.getTaskID(), mergeOutputSize);

  Writer writer =
    new Writer(conf, rfs, outputPath,
               conf.getMapOutputKeyClass(),
               conf.getMapOutputValueClass(),
               codec, null);

  RawKeyValueIterator rIter = null;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments +
             " segments...");

    rIter = Merger.merge(conf, rfs,
                         (Class<K>)conf.getMapOutputKeyClass(),
                         (Class<V>)conf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceTask.getTaskID().toString()),
                         conf.getOutputKeyComparator(), reporter,
                         spilledRecordsCounter, null);

    if (combinerRunner == null) {
      Merger.writeFile(rIter, writer, reporter, conf);
    } else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(rIter, combineCollector);
    }
    writer.close();

    LOG.info(reduceTask.getTaskID() +
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " +
        localFileSys.getFileStatus(outputPath).getLen());
  } catch (Exception e) {
    //make sure that we delete the ondisk file that we created
    //earlier when we invoked cloneFileAttributes
    localFileSys.delete(outputPath, true);
    throw (IOException)new IOException
            ("Intermediate merge failed").initCause(e);
  }

  // Note the output of the merge
  FileStatus status = localFileSys.getFileStatus(outputPath);
  synchronized (mapOutputFilesOnDisk) {
    addToMapOutputFilesOnDisk(status);
  }
}
 
源代码29 项目: RDFS   文件: BlockMapOutputBuffer.java
@SuppressWarnings( { "unchecked", "deprecation" })
public BlockMapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
    TaskReporter reporter, MapTask task) throws IOException,
    ClassNotFoundException {
  this.task = task;
  this.job = job;
  this.reporter = reporter;
  localFs = FileSystem.getLocal(job);
  partitions = job.getNumReduceTasks();
  indexCacheList = new ArrayList<SpillRecord>();
  if (partitions > 0) {
    partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(job
        .getPartitionerClass(), job);
  } else {
    partitioner = new Partitioner() {
      @Override
      public int getPartition(Object key, Object value, int numPartitions) {
        return -1;
      }

      @Override
      public void configure(JobConf job) {
      }
    };
  }
  rfs = ((LocalFileSystem) localFs).getRaw();

  float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
  if (spillper > (float) 1.0 || spillper < (float) 0.0) {
    LOG.error("Invalid \"io.sort.spill.percent\": " + spillper);
    spillper = 0.8f;
  }
  
  lastSpillInMem = job.getBoolean("mapred.map.lastspill.memory", true);
  numBigRecordsWarnThreshold =
      job.getInt("mapred.map.bigrecord.spill.warn.threshold", 500);

  int sortmb = job.getInt("io.sort.mb", 100);
  boolean localMode = job.get("mapred.job.tracker", "local").equals("local");
  if (localMode) {
    sortmb = job.getInt("io.sort.mb.localmode", 100);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
  }
  LOG.info("io.sort.mb = " + sortmb);
  // buffers and accounting
  kvBufferSize = sortmb << 20;
  kvbuffer = new byte[kvBufferSize];
  softBufferLimit = (int) (kvbuffer.length * spillper);
  // k/v serialization
  keyClass = (Class<K>) job.getMapOutputKeyClass();
  valClass = (Class<V>) job.getMapOutputValueClass();
  if (!BytesWritable.class.isAssignableFrom(keyClass)
      || !BytesWritable.class.isAssignableFrom(valClass)) {
    throw new IOException(this.getClass().getName()
        + "  only support " + BytesWritable.class.getName()
        + " as key and value classes, MapOutputKeyClass is "
        + keyClass.getName() + ", MapOutputValueClass is "
        + valClass.getName());
  }

  int numMappers = job.getNumMapTasks();
  memoryBlockAllocator =
      new MemoryBlockAllocator(kvBufferSize, softBufferLimit, numMappers,
          partitions, this);

  // counters
  mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  mapSpillSortCounter = new MapSpillSortCounters(reporter);

  reducePartitions = new ReducePartition[partitions];
  inMemorySegments = new Segment[partitions];
  for (int i = 0; i < partitions; i++) {
    reducePartitions[i] = new ReducePartition(i, this.memoryBlockAllocator,
        this.kvbuffer, this, this.reporter);
  }     
  // compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass = job
        .getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
}
 
源代码30 项目: hadoop-gpu   文件: ReduceTask.java
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
  if (mapOutputsFilesInMemory.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
  long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
                    reduceTask.getTaskID(), mergeOutputSize);

  Writer writer = 
    new Writer(conf, rfs, outputPath,
               conf.getMapOutputKeyClass(),
               conf.getMapOutputValueClass(),
               codec, null);

  RawKeyValueIterator rIter = null;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(conf, rfs,
                         (Class<K>)conf.getMapOutputKeyClass(),
                         (Class<V>)conf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceTask.getTaskID().toString()),
                         conf.getOutputKeyComparator(), reporter,
                         spilledRecordsCounter, null);
    
    if (combinerRunner == null) {
      Merger.writeFile(rIter, writer, reporter, conf);
    } else {
      combineCollector.setWriter(writer);
      combinerRunner.combine(rIter, combineCollector);
    }
    writer.close();

    LOG.info(reduceTask.getTaskID() + 
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFileSys.getFileStatus(outputPath).getLen());
  } catch (Exception e) { 
    //make sure that we delete the ondisk file that we created 
    //earlier when we invoked cloneFileAttributes
    localFileSys.delete(outputPath, true);
    throw (IOException)new IOException
            ("Intermediate merge failed").initCause(e);
  }

  // Note the output of the merge
  FileStatus status = localFileSys.getFileStatus(outputPath);
  synchronized (mapOutputFilesOnDisk) {
    addToMapOutputFilesOnDisk(status);
  }
}
 
 类所在包
 同包方法