The following examples show how to use org.apache.hadoop.fs.FSDataOutputStream#getPos(). They are drawn from open-source projects; follow the project links to view the full source on GitHub.
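All of the snippets share one pattern: call getPos() to capture the stream's current byte offset before writing a segment, then use getPos() again afterwards to derive the segment's start offset and length. A minimal, self-contained sketch of that pattern (the file path and record contents here are hypothetical, not taken from any project below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetPosExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/getpos-example.dat"); // hypothetical path
        try (FSDataOutputStream out = fs.create(path)) {
            long segmentStart = out.getPos();                 // offset before writing (0 for a new file)
            out.writeUTF("example record");                   // write one segment
            long segmentLength = out.getPos() - segmentStart; // bytes written for this segment
            System.out.println("segment at offset " + segmentStart + ", length " + segmentLength);
        }
    }
}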
private void internalAppendFile(FSDataOutputStream os, File file, List<String> parentDirectories, SquallArchiveCompressor compressor, String archiveFilename) throws IOException {
final String baseFilename = file.getName().replaceAll("\\s+", "_");
final String filename = makeFilename(parentDirectories, baseFilename);
final long size = file.length();
final long timestamp = file.lastModified();
final long startOffset = os.getPos();
final InputStream is = new BufferedInputStream(new FileInputStream(file));
final String checksum;
try {
final CompressionOutputStream cos = compressor.newOutputStream(os);
final DigestOutputStream dos = new DigestOutputStream(cos, ArchiveUtils.getMD5Digest());
ByteStreams.copy(is, dos);
checksum = ArchiveUtils.toHex(dos.getMessageDigest().digest());
cos.finish();
} finally {
is.close();
}
pendingMetadataWrites.add(new FileMetadata(filename, size, timestamp, checksum, startOffset, compressor, archiveFilename));
}
private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
int numKeysPerPartition, int startKey) throws IOException {
FSDataOutputStream outStream = fs.create(path);
int currentKey = startKey;
SrcFileInfo srcFileInfo = new SrcFileInfo();
srcFileInfo.indexedRecords = new TezIndexRecord[numPartitions];
srcFileInfo.path = path;
for (int i = 0; i < numPartitions; i++) {
long pos = outStream.getPos();
IFile.Writer writer =
new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
for (int j = 0; j < numKeysPerPartition; j++) {
writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
currentKey++;
}
writer.close();
srcFileInfo.indexedRecords[i] =
new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
}
outStream.close();
return srcFileInfo;
}
void writeFile(Path file, FSDataOutputStream stm, int size)
throws IOException {
long blocksBefore = stm.getPos() / BLOCK_SIZE;
TestFileCreation.writeFile(stm, BLOCK_SIZE);
// need to make sure the full block is completely flushed to the DataNodes
// (see FSOutputSummer#flush)
stm.flush();
int blocksAfter = 0;
// wait until the block is allocated by DataStreamer
BlockLocation[] locatedBlocks;
while(blocksAfter <= blocksBefore) {
locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations(
file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS);
blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length;
}
}
public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
throws IOException {
long offset = stream.getPos();
// Write EOF Entry
ProcedureWALEntry.newBuilder()
.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
.build().writeDelimitedTo(stream);
// Write Tracker
tracker.toProto().writeDelimitedTo(stream);
stream.write(TRAILER_VERSION);
StreamUtils.writeLong(stream, TRAILER_MAGIC);
StreamUtils.writeLong(stream, offset);
return stream.getPos() - offset;
}
public void writeToStream(FSDataOutputStream stream) throws IOException {
Stopwatch watch = Stopwatch.createStarted();
available = false;
check = ThreadLocalRandom.current().nextLong();
start = stream.getPos();
logger.debug("Writing check value {} at position {}", check, start);
stream.writeLong(check);
batch.getHeader().writeDelimitedTo(stream);
ByteBuf buf = batch.getBody();
if (buf != null) {
bodyLength = buf.capacity();
} else {
bodyLength = 0;
}
if (bodyLength > 0) {
buf.getBytes(0, stream, bodyLength);
}
stream.hsync();
FileStatus status = fs.getFileStatus(path);
long len = status.getLen();
logger.debug("After spooling batch, stream at position {}. File length {}", stream.getPos(), len);
batch.sendOk();
latch.countDown();
long t = watch.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
if (buf != null) {
buf.release();
}
}
private boolean isCreateNewFile(FSDataOutputStream fsDataOutputStream, int len) {
if (fsDataOutputStream == null) {
return true;
} else {
try {
return (fsDataOutputStream.getPos() + len) > hdfsFileMaxSize;
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return true;
}
}
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {
SplitMetaInfo[] info = new SplitMetaInfo[array.length];
if (array.length != 0) {
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
long offset = out.getPos();
for(T split: array) {
long prevCount = out.getPos();
Text.writeString(out, split.getClass().getName());
Serializer<T> serializer =
factory.getSerializer((Class<T>) split.getClass());
serializer.open(out);
serializer.serialize(split);
long currCount = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
info[i++] =
new JobSplit.SplitMetaInfo(
locations, offset,
split.getLength());
offset += currCount - prevCount;
}
}
return info;
}
/**
* Constructor
*
* @param fout
* FS output stream.
* @param compressionName
* Name of the compression algorithm, which will be used for all
* data blocks.
* @throws IOException
* @see Compression#getSupportedAlgorithms
*/
public Writer(FSDataOutputStream fout, String compressionName,
Configuration conf) throws IOException {
if (fout.getPos() != 0) {
throw new IOException("Output file not at zero offset.");
}
this.out = fout;
this.conf = conf;
dataIndex = new DataIndex(compressionName);
metaIndex = new MetaIndex();
fsOutputBuffer = new BytesWritable();
Magic.write(fout);
}
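Because this constructor rejects any stream that is not positioned at offset zero, callers are expected to hand it a freshly created file. A hypothetical invocation (the path and the "gz" compression name are illustrative; see Compression#getSupportedAlgorithms for the valid names):

// fs.create() returns a stream at offset 0, satisfying the getPos() check above.
FSDataOutputStream fout = fs.create(new Path("/tmp/example.out"));
Writer writer = new Writer(fout, "gz", conf);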
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = distanceTo(bufstart, bufend, bufvoid) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int mstart = kvend / NMETA;
final int mend = 1 + // kvend is a valid record
(kvstart >= kvend
? kvstart
: kvmeta.capacity() + kvstart) / NMETA;
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
int spindex = mstart;
final IndexRecord rec = new IndexRecord();
final InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final K key, final V value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
IndexRecord rec = new IndexRecord();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
} finally {
if (out != null) out.close();
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final K key, final V value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
IndexRecord rec = new IndexRecord();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
getTaskID(), numSpills,
partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
} finally {
if (out != null) out.close();
}
}
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
int bufferSize, byte[] key, byte[] iv) throws IOException {
super(new CryptoOutputStream(out, codec, bufferSize, key, iv,
out.getPos()), null, out.getPos());
this.fsOut = out;
}
/**
* Writes the root level and intermediate levels of the block index into
* the output stream, generating the tree from bottom up. Assumes that the
* leaf level has been inline-written to the disk if there is enough data
* for more than one leaf block. We iterate by breaking the current level
* of the block index, starting with the index of all leaf-level blocks,
* into chunks small enough to be written to disk, and generate its parent
* level, until we end up with a level small enough to become the root
* level.
*
* If the leaf level is not large enough, there is no inline block index
* anymore, so we only write that level of block index to disk as the root
* level.
*
* @param out FSDataOutputStream
* @return position at which we entered the root-level index.
* @throws IOException
*/
public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
throw new IOException("Trying to write a multi-level block index, " +
"but are " + curInlineChunk.getNumEntries() + " entries in the " +
"last inline chunk.");
}
// We need to get mid-key metadata before we create intermediate
// indexes and overwrite the root chunk.
byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
: null;
if (curInlineChunk != null) {
while (rootChunk.getRootSize() > maxChunkSize
// HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
&& rootChunk.getNumEntries() > minIndexNumEntries
// Sanity check: we will not hit this, since (minIndexNumEntries ^ 16) blocks can be addressed
&& numLevels < 16) {
rootChunk = writeIntermediateLevel(out, rootChunk);
numLevels += 1;
}
}
// write the root level
long rootLevelIndexPos = out.getPos();
{
DataOutput blockStream =
blockWriter.startWriting(BlockType.ROOT_INDEX);
rootChunk.writeRoot(blockStream);
if (midKeyMetadata != null)
blockStream.write(midKeyMetadata);
blockWriter.writeHeaderAndData(out);
if (cacheConf != null) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
blockForCaching.getBlockType()), blockForCaching);
});
}
}
// Add root index block size
totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
totalBlockUncompressedSize +=
blockWriter.getUncompressedSizeWithoutHeader();
if (LOG.isTraceEnabled()) {
LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
+ rootLevelIndexPos + ", " + rootChunk.getNumEntries()
+ " root-level entries, " + totalNumEntries + " total entries, "
+ StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
" on-disk size, "
+ StringUtils.humanReadableInt(totalBlockUncompressedSize) +
" total uncompressed size.");
}
return rootLevelIndexPos;
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final Object key, final Object value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
ensureSpillFilePermissions(filename, rfs);
// we don't run the combiner for a single record
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
if (!sendEmptyPartitionDetails || (i == partition)) {
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null, false);
}
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
long rawLength = 0;
long partLength = 0;
if (writer != null) {
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
adjustSpillCounters(rawLength, partLength);
// record offsets
TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
//Increment only when there is at least one previous spill
numAdditionalSpills.increment(1);
}
} finally {
if (out != null) out.close();
}
}
public boolean spill(boolean ignoreEmptySpills) throws IOException {
FSDataOutputStream out = null;
try {
try {
boolean ret = merger.ready();
// if merger returned false and ignore merge is true,
// then return directly without spilling
if (!ret && ignoreEmptySpills){
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
}
// create spill file
final long size = capacity +
(partitions * APPROX_HEADER_LENGTH);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs);
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
return false;
}
outputContext.notifyProgress();
TezRawKeyValueIterator kvIter = merger.filter(i);
//write merged output to disk
long segmentStart = out.getPos();
Writer writer = null;
boolean hasNext = kvIter.hasNext();
if (hasNext || !sendEmptyPartitionDetails) {
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null, merger.needsRLE());
}
if (combiner == null) {
while (kvIter.next()) {
writer.append(kvIter.getKey(), kvIter.getValue());
}
} else {
if (hasNext) {
runCombineProcessor(kvIter, writer);
}
}
long rawLength = 0;
long partLength = 0;
//close
if (writer != null) {
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
adjustSpillCounters(rawLength, partLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats()) {
partitionStats[i] += partLength;
}
}
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
numShuffleChunks.setValue(numSpills);
}
return true;
} finally {
if (out != null) {
out.close();
}
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final Object key, final Object value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null);
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
if (numSpills > 0) {
additionalSpillBytesWritten.increment(writer.getCompressedLength());
numAdditionalSpills.increment(1);
outputBytesWithOverheadCounter.setValue(0);
} else {
// Set this up for the first write only. Subsequent ones will be handled in the final merge.
outputBytesWithOverheadCounter.increment(writer.getRawLength());
}
// record offsets
TezIndexRecord rec =
new TezIndexRecord(
segmentStart,
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
} finally {
if (out != null) out.close();
}
}
private void spillSingleRecord(final Object key, final Object value,
int partition) throws IOException {
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
// getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFilePaths.put(numSpills, filename);
FSDataOutputStream out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs);
try {
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
", indexFilename=" + indexFilename);
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
return;
}
Writer writer = null;
try {
long segmentStart = out.getPos();
if (!sendEmptyPartitionDetails || (i == partition)) {
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null, false);
}
// we need not check for the combiner since it's a single record
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
long rawLength = 0;
long partLength = 0;
if (writer != null) {
writer.close();
rawLength = writer.getRawLength();
partLength = writer.getCompressedLength();
}
adjustSpillCounters(rawLength, partLength);
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) {
writer.close();
}
}
}
spillFileIndexPaths.put(numSpills, indexFilename);
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
numShuffleChunks.setValue(numSpills);
}
if (pipelinedShuffle) {
sendPipelinedShuffleEvents();
}
} finally {
out.close();
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final K key, final V value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
long spillStartMilli = System.currentTimeMillis();
ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
long spillBytes = 0;
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
IndexRecord rec = new IndexRecord();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillBytes += writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
getTaskID(), numSpills,
partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
long spillEndMilli = System.currentTimeMillis();
ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();
spillSortCounters.incCountersPerSpill(spillStartProcVals,
spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
++numSpills;
} finally {
if (out != null) out.close();
}
}
protected void spill(int mstart, int mend)
throws IOException, InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
final long size = (bufend >= bufstart
? bufend - bufstart
: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
int spindex = mstart;
final InMemValBytes value = createInMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
long segmentStart = out.getPos();
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null);
if (combiner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
final int kvoff = offsetFor(spindex);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
TezRawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
if (LOG.isDebugEnabled()) {
LOG.debug("Running combine processor");
}
runCombineProcessor(kvIter, writer);
}
}
// close the writer
writer.close();
if (numSpills > 0) {
additionalSpillBytesWritten.increment(writer.getCompressedLength());
numAdditionalSpills.increment(1);
// Reset the value will be set during the final merge.
outputBytesWithOverheadCounter.setValue(0);
} else {
// Set this up for the first write only. Subsequent ones will be handled in the final merge.
outputBytesWithOverheadCounter.increment(writer.getRawLength());
}
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
segmentStart,
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}