Listed below are example usages of the org.apache.hadoop.mapred.Merger.Segment API class; you can also follow the links through to GitHub to view the full source code.
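Before the extracted examples, here is a minimal end-to-end sketch of the typical Segment workflow: wrap on-disk IFile spills as Merger.Segment instances, merge them with Merger.merge, and drain the resulting RawKeyValueIterator into a new IFile via Merger.writeFile. The mergeSpills helper, the file paths, and the Text key/value types are illustrative assumptions and not part of the examples below; the Segment constructor and the Merger.merge / Merger.writeFile calls mirror the signatures used in those examples.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
public class SegmentMergeSketch {
  // Merges a list of sorted IFile spill files into a single sorted IFile.
  // The Text key/value types and the paths are assumptions made for this sketch.
  @SuppressWarnings("unchecked")
  public static void mergeSpills(List<Path> spillFiles, Path output)
      throws IOException {
    JobConf conf = new JobConf();
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);
    FileSystem rfs = FileSystem.getLocal(conf).getRaw();
    // Wrap each on-disk spill as a Segment using the
    // (conf, fs, file, codec, preserve) constructor, as in createInDiskSegment().
    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
    for (Path spill : spillFiles) {
      segments.add(new Segment<Text, Text>(conf, rfs, spill, null, true));
    }
    // Merge every segment in a single pass; the trailing counters and the
    // merge-phase Progress argument are optional and left null here.
    RawKeyValueIterator rIter =
        Merger.merge(conf, rfs, Text.class, Text.class,
                     segments, segments.size(), new Path("merge-tmp"),
                     (RawComparator<Text>) conf.getOutputKeyComparator(),
                     Reporter.NULL, null, null, null);
    // Drain the merged iterator into a new IFile, as the merge() examples do.
    IFile.Writer<Text, Text> writer =
        new IFile.Writer<Text, Text>(conf, rfs, output,
                                     Text.class, Text.class, null, null);
    Merger.writeFile(rIter, writer, Reporter.NULL, conf);
    writer.close();
  }
}
In the real map and reduce code paths shown below, the same pattern appears with extra machinery around it: in-memory segments, combiner invocation, and spill/record counters.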
public synchronized void flush() throws IOException, ClassNotFoundException,
InterruptedException {
if (numSpills > 0 && lastSpillInMem) {
// if there is already one spills, we can try to hold this last spill in
// memory.
sortReduceParts();
for (int i = 0; i < partitions; i++) {
this.inMemorySegments[i] =
new Segment<K, V>(this.reducePartitions[i].getIReader(),
true);
}
hasInMemorySpill = true;
} else {
sortAndSpill();
}
long mergeStartMilli = System.currentTimeMillis();
ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
mergeParts();
long mergeEndMilli = System.currentTimeMillis();
ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
mergeEndMilli - mergeStartMilli);
}
public void mark() throws IOException {
// We read one KV pair in advance in hasNext.
// If hasNext has read the next KV pair from a new segment, but the
// user has not called next() for that KV, then reset the readSegmentIndex
// to the previous segment
if (nextKVOffset == 0) {
assert (readSegmentIndex != 0);
assert (currentKVOffset != 0);
readSegmentIndex --;
}
// just drop segments before the current active segment
int i = 0;
Iterator<Segment<K,V>> itr = segmentList.iterator();
while (itr.hasNext()) {
Segment<K,V> s = itr.next();
if (i == readSegmentIndex) {
break;
}
s.close();
itr.remove();
i++;
LOG.debug("Dropping a segment");
}
// FirstSegmentOffset is the offset in the current segment from where we
// need to start reading on the next reset
firstSegmentOffset = currentKVOffset;
readSegmentIndex = 0;
LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
}
public void reset() throws IOException {
// Create a new segment for the previously written records only if we
// are not already in the reset mode
if (!inReset) {
if (fileCache.isActive) {
fileCache.createInDiskSegment();
} else {
memCache.createInMemorySegment();
}
}
inReset = true;
// Reset the segments to the correct position from where the next read
// should begin.
for (int i = 0; i < segmentList.size(); i++) {
Segment<K,V> s = segmentList.get(i);
if (s.inMemory()) {
int offset = (i == 0) ? firstSegmentOffset : 0;
s.getReader().reset(offset);
} else {
s.closeReader();
if (i == 0) {
s.reinitReader(firstSegmentOffset);
s.getReader().disableChecksumValidation();
}
}
}
currentKVOffset = firstSegmentOffset;
nextKVOffset = -1;
readSegmentIndex = 0;
hasMore = false;
lastSegmentEOF = false;
LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
" Segment List Size is " + segmentList.size());
}
private void clearSegmentList() throws IOException {
for (Segment<K,V> segment: segmentList) {
long len = segment.getLength();
segment.close();
if (segment.inMemory()) {
memCache.unreserve(len);
}
}
segmentList.clear();
}
/**
* This method creates a memory segment from the existing buffer
* @throws IOException
*/
void createInMemorySegment() throws IOException {
// If nothing was written in this block because the record size
// was greater than the allocated block size, just return.
if (usedSize == 0) {
ramManager.unreserve(blockSize);
return;
}
// spaceAvailable would have ensured that there is enough space
// left for the EOF markers.
assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
usedSize += EOF_MARKER_SIZE;
ramManager.unreserve(blockSize - usedSize);
Reader<K, V> reader =
new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
(org.apache.hadoop.mapred.TaskAttemptID) tid,
dataOut.getData(), 0, usedSize, conf);
Segment<K, V> segment = new Segment<K, V>(reader, false);
segmentList.add(segment);
LOG.debug("Added Memory Segment to List. List Size is " +
segmentList.size());
}
void createInDiskSegment() throws IOException {
assert (writer != null);
writer.close();
Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
writer = null;
segmentList.add(s);
LOG.debug("Disk Segment added to List. Size is " + segmentList.size());
}
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
TaskAttemptID dummyMapId = inputs.get(0).getMapId();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
InMemoryMapOutput<K, V> mergedMapOutputs =
unconditionalReserve(dummyMapId, mergeOutputSize, false);
Writer<K, V> writer =
new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
" segments of total-size: " + mergeOutputSize);
RawKeyValueIterator rIter =
Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, null, null, null);
Merger.writeFile(rIter, writer, reporter, jobConf);
writer.close();
LOG.info(reduceId +
" Memory-to-Memory merge of the " + noInMemorySegments +
" files in-memory complete.");
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 0; i < 2; i++) {
segments.add(getUncompressedSegment(i));
}
return segments;
}
private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
for (int i = 0; i < 2; i++) {
segments.add(getCompressedSegment(i));
}
return segments;
}
public boolean hasNext() throws IOException {
if (lastSegmentEOF) {
return false;
}
// We read the next KV from the cache to decide if there is any left.
// Since hasNext can be called several times before the actual call to
// next(), we use hasMore to avoid extra reads. hasMore is set to false
// when the user actually consumes this record in next()
if (hasMore) {
return true;
}
Segment<K,V> seg = segmentList.get(readSegmentIndex);
// Mark the current position. This would be set to currentKVOffset
// when the user consumes this record in next().
nextKVOffset = (int) seg.getActualPosition();
if (seg.nextRawKey()) {
currentKey = seg.getKey();
seg.getValue(currentValue);
hasMore = true;
return true;
} else {
if (!seg.inMemory()) {
seg.closeReader();
}
}
// If this is the last segment, mark the lastSegmentEOF flag and return
if (readSegmentIndex == segmentList.size() - 1) {
nextKVOffset = -1;
lastSegmentEOF = true;
return false;
}
nextKVOffset = 0;
readSegmentIndex ++;
Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
// We possibly are moving from a memory segment to a disk segment.
// Reset so that we do not corrupt the in-memory segment buffer.
// See HADOOP-5494
if (!nextSegment.inMemory()) {
currentValue.reset(currentDiskValue.getData(),
currentDiskValue.getLength());
nextSegment.init(null);
}
if (nextSegment.nextRawKey()) {
currentKey = nextSegment.getKey();
nextSegment.getValue(currentValue);
hasMore = true;
return true;
} else {
throw new IOException("New segment did not have even one K/V");
}
}
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
if (inputs == null || inputs.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskAttemptID mapId = inputs.get(0).getMapId();
TaskID mapTaskId = mapId.getTaskID();
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
long mergeOutputSize =
createInMemorySegments(inputs, inMemorySegments,0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath =
mapOutputFile.getInputFileForWrite(mapTaskId,
mergeOutputSize).suffix(
Task.MERGED_OUTPUT_PREFIX);
FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
Writer<K, V> writer = new Writer<K, V>(jobConf, out,
(Class<K>) jobConf.getMapOutputKeyClass(),
(Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
RawKeyValueIterator rIter = null;
CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(jobConf, rfs,
(Class<K>)jobConf.getMapOutputKeyClass(),
(Class<V>)jobConf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceId.toString()),
(RawComparator<K>)jobConf.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null, null);
if (null == combinerClass) {
Merger.writeFile(rIter, writer, reporter, jobConf);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
writer.close();
compressAwarePath = new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFS.getFileStatus(outputPath).getLen());
} catch (IOException e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
}
// Note the output of the merge
closeOnDiskFile(compressAwarePath);
}
@SuppressWarnings( { "unchecked" })
public void testMergeShouldReturnProperProgress(
List<Segment<Text, Text>> segments) throws IOException {
Path tmpDir = new Path("localpath");
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
Counter readsCounter = new Counter();
Counter writesCounter = new Counter();
Progress mergePhase = new Progress();
RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
valueClass, segments, 2, tmpDir, comparator, getReporter(),
readsCounter, writesCounter, mergePhase);
final float epsilon = 0.00001f;
// Reading 6 keys total, 3 each in 2 segments, so each key read moves the
// progress forward 1/6th of the way. Initially the first keys from each
// segment have been read as part of the merge setup, so progress = 2/6.
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// The first next() returns one of the keys already read during merge setup
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// At this point we've exhausted all of the keys in one segment
// so getting the next key will return the already cached key from the
// other segment
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
// Subsequent next() calls should read one key and move progress
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
// Now there should be no more input
Assert.assertFalse(mergeQueue.next());
Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
Assert.assertTrue(mergeQueue.getKey() == null);
Assert.assertEquals(0, mergeQueue.getValue().getData().length);
}
private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
return new Segment<Text, Text>(getReader(i, false), false);
}
private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
return new Segment<Text, Text>(getReader(i, true), false, 3000L);
}
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
if (mapOutputsFilesInMemory.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
}
writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermediate merge failed").initCause(e);
}
// Note the output of the merge
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(status);
}
}
@SuppressWarnings( { "unchecked", "deprecation" })
public BlockMapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter, MapTask task) throws IOException,
ClassNotFoundException {
this.task = task;
this.job = job;
this.reporter = reporter;
localFs = FileSystem.getLocal(job);
partitions = job.getNumReduceTasks();
indexCacheList = new ArrayList<SpillRecord>();
if (partitions > 0) {
partitioner = (Partitioner<K, V>) ReflectionUtils.newInstance(job
.getPartitionerClass(), job);
} else {
partitioner = new Partitioner() {
@Override
public int getPartition(Object key, Object value, int numPartitions) {
return -1;
}
@Override
public void configure(JobConf job) {
}
};
}
rfs = ((LocalFileSystem) localFs).getRaw();
float spillper = job.getFloat("io.sort.spill.percent", (float) 0.9);
if (spillper > (float) 1.0 || spillper < (float) 0.0) {
LOG.error("Invalid \"io.sort.spill.percent\": " + spillper);
spillper = 0.8f;
}
lastSpillInMem = job.getBoolean("mapred.map.lastspill.memory", true);
numBigRecordsWarnThreshold =
job.getInt("mapred.map.bigrecord.spill.warn.threshold", 500);
int sortmb = job.getInt("io.sort.mb", 100);
boolean localMode = job.get("mapred.job.tracker", "local").equals("local");
if (localMode) {
sortmb = job.getInt("io.sort.mb.localmode", 100);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
}
LOG.info("io.sort.mb = " + sortmb);
// buffers and accounting
kvBufferSize = sortmb << 20;
kvbuffer = new byte[kvBufferSize];
softBufferLimit = (int) (kvbuffer.length * spillper);
// k/v serialization
keyClass = (Class<K>) job.getMapOutputKeyClass();
valClass = (Class<V>) job.getMapOutputValueClass();
if (!BytesWritable.class.isAssignableFrom(keyClass)
|| !BytesWritable.class.isAssignableFrom(valClass)) {
throw new IOException(this.getClass().getName()
+ " only support " + BytesWritable.class.getName()
+ " as key and value classes, MapOutputKeyClass is "
+ keyClass.getName() + ", MapOutputValueClass is "
+ valClass.getName());
}
int numMappers = job.getNumMapTasks();
memoryBlockAllocator =
new MemoryBlockAllocator(kvBufferSize, softBufferLimit, numMappers,
partitions, this);
// counters
mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
mapSpillSortCounter = new MapSpillSortCounters(reporter);
reducePartitions = new ReducePartition[partitions];
inMemorySegments = new Segment[partitions];
for (int i = 0; i < partitions; i++) {
reducePartitions[i] = new ReducePartition(i, this.memoryBlockAllocator,
this.kvbuffer, this, this.reporter);
}
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass = job
.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
}
@SuppressWarnings("unchecked")
private void doInMemMerge() throws IOException{
if (mapOutputsFilesInMemory.size() == 0) {
return;
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
conf.getMapOutputKeyClass(),
conf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator rIter = null;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
rIter = Merger.merge(conf, rfs,
(Class<K>)conf.getMapOutputKeyClass(),
(Class<V>)conf.getMapOutputValueClass(),
inMemorySegments, inMemorySegments.size(),
new Path(reduceTask.getTaskID().toString()),
conf.getOutputKeyComparator(), reporter,
spilledRecordsCounter, null);
if (combinerRunner == null) {
Merger.writeFile(rIter, writer, reporter, conf);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(rIter, combineCollector);
}
writer.close();
LOG.info(reduceTask.getTaskID() +
" Merge of the " + noInMemorySegments +
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
localFileSys.getFileStatus(outputPath).getLen());
} catch (Exception e) {
//make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFileSys.delete(outputPath, true);
throw (IOException)new IOException
("Intermediate merge failed").initCause(e);
}
// Note the output of the merge
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(status);
}
}