org.apache.hadoop.fs.FSDataOutputStream#getPos ( )源码实例Demo

下面列出了org.apache.hadoop.fs.FSDataOutputStream#getPos ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: imhotep   文件: SquallArchiveWriter.java
private void internalAppendFile(FSDataOutputStream os, File file, List<String> parentDirectories, SquallArchiveCompressor compressor, String archiveFilename) throws IOException {
    final String baseFilename = file.getName().replaceAll("\\s+", "_");
    final String filename = makeFilename(parentDirectories, baseFilename);
    final long size = file.length();
    final long timestamp = file.lastModified();
    final long startOffset = os.getPos();

    final InputStream is = new BufferedInputStream(new FileInputStream(file));
    final String checksum;
    try {
        final CompressionOutputStream cos = compressor.newOutputStream(os);
        final DigestOutputStream dos = new DigestOutputStream(cos, ArchiveUtils.getMD5Digest());
        ByteStreams.copy(is, dos);
        checksum = ArchiveUtils.toHex(dos.getMessageDigest().digest());
        cos.finish();
    } finally {
        is.close();
    }

    pendingMetadataWrites.add(new FileMetadata(filename, size, timestamp, checksum, startOffset, compressor, archiveFilename));
}
 
源代码2 项目: tez   文件: TestMergeManager.java
private SrcFileInfo createFile(Configuration conf, FileSystem fs, Path path, int numPartitions,
                               int numKeysPerPartition, int startKey) throws IOException {
  FSDataOutputStream outStream = fs.create(path);
  int currentKey = startKey;
  SrcFileInfo srcFileInfo = new SrcFileInfo();
  srcFileInfo.indexedRecords = new TezIndexRecord[numPartitions];
  srcFileInfo.path = path;
  for (int i = 0; i < numPartitions; i++) {
    long pos = outStream.getPos();
    IFile.Writer writer =
        new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
    for (int j = 0; j < numKeysPerPartition; j++) {
      writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
      currentKey++;
    }
    writer.close();
    srcFileInfo.indexedRecords[i] =
        new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
  }
  outStream.close();
  return srcFileInfo;
}
 
源代码3 项目: big-c   文件: TestBlockUnderConstruction.java
void writeFile(Path file, FSDataOutputStream stm, int size)
throws IOException {
  long blocksBefore = stm.getPos() / BLOCK_SIZE;
  
  TestFileCreation.writeFile(stm, BLOCK_SIZE);
  // need to make sure the full block is completely flushed to the DataNodes
  // (see FSOutputSummer#flush)
  stm.flush();
  int blocksAfter = 0;
  // wait until the block is allocated by DataStreamer
  BlockLocation[] locatedBlocks;
  while(blocksAfter <= blocksBefore) {
    locatedBlocks = DFSClientAdapter.getDFSClient(hdfs).getBlockLocations(
        file.toString(), 0L, BLOCK_SIZE*NUM_BLOCKS);
    blocksAfter = locatedBlocks == null ? 0 : locatedBlocks.length;
  }
}
 
源代码4 项目: hbase   文件: ProcedureWALFormat.java
public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
    throws IOException {
  long offset = stream.getPos();

  // Write EOF Entry
  ProcedureWALEntry.newBuilder()
    .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF)
    .build().writeDelimitedTo(stream);

  // Write Tracker
  tracker.toProto().writeDelimitedTo(stream);

  stream.write(TRAILER_VERSION);
  StreamUtils.writeLong(stream, TRAILER_MAGIC);
  StreamUtils.writeLong(stream, offset);
  return stream.getPos() - offset;
}
 
源代码5 项目: Bats   文件: SpoolingRawBatchBuffer.java
public void writeToStream(FSDataOutputStream stream) throws IOException {
  Stopwatch watch = Stopwatch.createStarted();
  available = false;
  check = ThreadLocalRandom.current().nextLong();
  start = stream.getPos();
  logger.debug("Writing check value {} at position {}", check, start);
  stream.writeLong(check);
  batch.getHeader().writeDelimitedTo(stream);
  ByteBuf buf = batch.getBody();
  if (buf != null) {
    bodyLength = buf.capacity();
  } else {
    bodyLength = 0;
  }
  if (bodyLength > 0) {
    buf.getBytes(0, stream, bodyLength);
  }
  stream.hsync();
  FileStatus status = fs.getFileStatus(path);
  long len = status.getLen();
  logger.debug("After spooling batch, stream at position {}. File length {}", stream.getPos(), len);
  batch.sendOk();
  latch.countDown();
  long t = watch.elapsed(TimeUnit.MICROSECONDS);
  logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
  if (buf != null) {
    buf.release();
  }
}
 
源代码6 项目: DBus   文件: DefaultPullHandler.java
private boolean isCreateNewFile(FSDataOutputStream fsDataOutputStream, int len) {
    if (fsDataOutputStream == null) {
        return true;
    } else {
        try {
            return (fsDataOutputStream.getPos() + len) > hdfsFileMaxSize;
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return true;
    }
}
 
源代码7 项目: hadoop   文件: JobSplitWriter.java
@SuppressWarnings("unchecked")
private static <T extends InputSplit> 
SplitMetaInfo[] writeNewSplits(Configuration conf, 
    T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {

  SplitMetaInfo[] info = new SplitMetaInfo[array.length];
  if (array.length != 0) {
    SerializationFactory factory = new SerializationFactory(conf);
    int i = 0;
    int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
        MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
    long offset = out.getPos();
    for(T split: array) {
      long prevCount = out.getPos();
      Text.writeString(out, split.getClass().getName());
      Serializer<T> serializer = 
        factory.getSerializer((Class<T>) split.getClass());
      serializer.open(out);
      serializer.serialize(split);
      long currCount = out.getPos();
      String[] locations = split.getLocations();
      if (locations.length > maxBlockLocations) {
        LOG.warn("Max block location exceeded for split: "
            + split + " splitsize: " + locations.length +
            " maxsize: " + maxBlockLocations);
        locations = Arrays.copyOf(locations, maxBlockLocations);
      }
      info[i++] = 
        new JobSplit.SplitMetaInfo( 
            locations, offset,
            split.getLength());
      offset += currCount - prevCount;
    }
  }
  return info;
}
 
源代码8 项目: big-c   文件: BCFile.java
/**
 * Constructor
 * 
 * @param fout
 *          FS output stream.
 * @param compressionName
 *          Name of the compression algorithm, which will be used for all
 *          data blocks.
 * @throws IOException
 * @see Compression#getSupportedAlgorithms
 */
public Writer(FSDataOutputStream fout, String compressionName,
    Configuration conf) throws IOException {
  if (fout.getPos() != 0) {
    throw new IOException("Output file not at zero offset.");
  }

  this.out = fout;
  this.conf = conf;
  dataIndex = new DataIndex(compressionName);
  metaIndex = new MetaIndex();
  fsOutputBuffer = new BytesWritable();
  Magic.write(fout);
}
 
源代码9 项目: attic-apex-malhar   文件: DTBCFile.java
/**
 * Constructor
 *
 * @param fout
 *          FS output stream.
 * @param compressionName
 *          Name of the compression algorithm, which will be used for all
 *          data blocks.
 * @throws IOException
 * @see Compression#getSupportedAlgorithms
 */
public Writer(FSDataOutputStream fout, String compressionName,
    Configuration conf) throws IOException {
  if (fout.getPos() != 0) {
    throw new IOException("Output file not at zero offset.");
  }

  this.out = fout;
  this.conf = conf;
  dataIndex = new DataIndex(compressionName);
  metaIndex = new MetaIndex();
  fsOutputBuffer = new BytesWritable();
  Magic.write(fout);
}
 
源代码10 项目: hadoop   文件: MapTask.java
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    final int mstart = kvend / NMETA;
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
 
源代码11 项目: hadoop   文件: MapTask.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                        spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
 
源代码12 项目: hadoop-gpu   文件: MapTask.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);
    
    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                        spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
 
源代码13 项目: hadoop   文件: CryptoFSDataOutputStream.java
public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
    int bufferSize, byte[] key, byte[] iv) throws IOException {
  super(new CryptoOutputStream(out, codec, bufferSize, key, iv, 
      out.getPos()), null, out.getPos()); 
  this.fsOut = out;
}
 
源代码14 项目: hbase   文件: HFileBlockIndex.java
/**
 * Writes the root level and intermediate levels of the block index into
 * the output stream, generating the tree from bottom up. Assumes that the
 * leaf level has been inline-written to the disk if there is enough data
 * for more than one leaf block. We iterate by breaking the current level
 * of the block index, starting with the index of all leaf-level blocks,
 * into chunks small enough to be written to disk, and generate its parent
 * level, until we end up with a level small enough to become the root
 * level.
 *
 * If the leaf level is not large enough, there is no inline block index
 * anymore, so we only write that level of block index to disk as the root
 * level.
 *
 * @param out FSDataOutputStream
 * @return position at which we entered the root-level index.
 * @throws IOException
 */
public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
  if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
    throw new IOException("Trying to write a multi-level block index, " +
        "but are " + curInlineChunk.getNumEntries() + " entries in the " +
        "last inline chunk.");
  }

  // We need to get mid-key metadata before we create intermediate
  // indexes and overwrite the root chunk.
  byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
      : null;

  if (curInlineChunk != null) {
    while (rootChunk.getRootSize() > maxChunkSize
        // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
        && rootChunk.getNumEntries() > minIndexNumEntries
        // Sanity check. We will not hit this (minIndexNumEntries ^ 16) blocks can be addressed
        && numLevels < 16) {
      rootChunk = writeIntermediateLevel(out, rootChunk);
      numLevels += 1;
    }
  }

  // write the root level
  long rootLevelIndexPos = out.getPos();

  {
    DataOutput blockStream =
        blockWriter.startWriting(BlockType.ROOT_INDEX);
    rootChunk.writeRoot(blockStream);
    if (midKeyMetadata != null)
      blockStream.write(midKeyMetadata);
    blockWriter.writeHeaderAndData(out);
    if (cacheConf != null) {
      cacheConf.getBlockCache().ifPresent(cache -> {
        HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
        cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
            blockForCaching.getBlockType()), blockForCaching);
      });
    }
  }

  // Add root index block size
  totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
  totalBlockUncompressedSize +=
      blockWriter.getUncompressedSizeWithoutHeader();

  if (LOG.isTraceEnabled()) {
    LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
      + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
      + " root-level entries, " + totalNumEntries + " total entries, "
      + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
      " on-disk size, "
      + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
      " total uncompressed size.");
  }
  return rootLevelIndexPos;
}
 
源代码15 项目: tez   文件: DefaultSorter.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final Object key, final Object value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    spillFilePaths.put(numSpills, filename);
    out = rfs.create(filename);
    ensureSpillFilePermissions(filename, rfs);

    // we don't run the combiner for a single record
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        if (!sendEmptyPartitionDetails || (i == partition)) {
          writer = new Writer(conf, out, keyClass, valClass, codec,
              spilledRecordsCounter, null, false);
        }
        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        long rawLength =0;
        long partLength =0;
        if (writer != null) {
          writer.close();
          rawLength = writer.getRawLength();
          partLength = writer.getCompressedLength();
        }
        adjustSpillCounters(rawLength, partLength);

        // record offsets
        TezIndexRecord rec = new TezIndexRecord(segmentStart, rawLength, partLength);
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillFileIndexPaths.put(numSpills, indexFilename);
      spillRec.writeToFile(indexFilename, conf, localFs);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
    if (!isFinalMergeEnabled()) {
      numShuffleChunks.setValue(numSpills);
    } else if (numSpills > 1) {
      //Increment only when there is atleast one previous spill
      numAdditionalSpills.increment(1);
    }
  } finally {
    if (out != null) out.close();
  }
}
 
源代码16 项目: tez   文件: PipelinedSorter.java
public boolean spill(boolean ignoreEmptySpills) throws IOException {
  FSDataOutputStream out = null;
  try {
    try {
      boolean ret = merger.ready();
      // if merger returned false and ignore merge is true,
      // then return directly without spilling
      if (!ret && ignoreEmptySpills){
        return false;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
      throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
    }

    // create spill file
    final long size = capacity +
        + (partitions * APPROX_HEADER_LENGTH);
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final Path filename =
      mapOutputFile.getSpillFileForWrite(numSpills, size);
    spillFilePaths.put(numSpills, filename);
    out = rfs.create(filename, true, 4096);
    ensureSpillFilePermissions(filename, rfs);
    LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
    for (int i = 0; i < partitions; ++i) {
      if (isThreadInterrupted()) {
        return false;
      }
      outputContext.notifyProgress();
      TezRawKeyValueIterator kvIter = merger.filter(i);
      //write merged output to disk
      long segmentStart = out.getPos();
      Writer writer = null;
      boolean hasNext = kvIter.hasNext();
      if (hasNext || !sendEmptyPartitionDetails) {
        writer = new Writer(conf, out, keyClass, valClass, codec,
            spilledRecordsCounter, null, merger.needsRLE());
      }
      if (combiner == null) {
        while (kvIter.next()) {
          writer.append(kvIter.getKey(), kvIter.getValue());
        }
      } else {          
        if (hasNext) {
          runCombineProcessor(kvIter, writer);
        }
      }
      long rawLength = 0;
      long partLength = 0;
      //close
      if (writer != null) {
        writer.close();
        rawLength = writer.getRawLength();
        partLength = writer.getCompressedLength();
      }
      adjustSpillCounters(rawLength, partLength);
      // record offsets
      final TezIndexRecord rec =
          new TezIndexRecord(segmentStart, rawLength, partLength);
      spillRec.putIndex(rec, i);
      if (!isFinalMergeEnabled() && reportPartitionStats()) {
        partitionStats[i] += partLength;
      }
    }

    Path indexFilename =
      mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
          * MAP_OUTPUT_INDEX_RECORD_LENGTH);
    spillFileIndexPaths.put(numSpills, indexFilename);
    spillRec.writeToFile(indexFilename, conf, localFs);
    //TODO: honor cache limits
    indexCacheList.add(spillRec);
    ++numSpills;
    if (!isFinalMergeEnabled()) {
      fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
      //No final merge. Set the number of files offered via shuffle-handler
      numShuffleChunks.setValue(numSpills);
    }
    return true;
  } finally {
    if (out != null) {
      out.close();
    }
  }
}
 
源代码17 项目: incubator-tez   文件: DefaultSorter.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final Object key, final Object value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    // we don't run the combiner for a single record
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
                                        spilledRecordsCounter, null);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        if (numSpills > 0) {
          additionalSpillBytesWritten.increment(writer.getCompressedLength());
          numAdditionalSpills.increment(1);
          outputBytesWithOverheadCounter.setValue(0);
        } else {
          // Set this up for the first write only. Subsequent ones will be handled in the final merge.
          outputBytesWithOverheadCounter.increment(writer.getRawLength());
        }

        // record offsets
        TezIndexRecord rec =
            new TezIndexRecord(
                segmentStart,
                writer.getRawLength(),
                writer.getCompressedLength());
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, conf);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
 
源代码18 项目: tez   文件: PipelinedSorter.java
private void spillSingleRecord(final Object key, final Object value,
        int partition) throws IOException {
  final TezSpillRecord spillRec = new TezSpillRecord(partitions);
  // getSpillFileForWrite with size -1 as the serialized size of KV pair is still unknown
  final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, -1);
  Path indexFilename =
      mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
          * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  spillFilePaths.put(numSpills, filename);
  FSDataOutputStream out = rfs.create(filename, true, 4096);
  ensureSpillFilePermissions(filename, rfs);

  try {
    LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
        ", indexFilename=" + indexFilename);
    for (int i = 0; i < partitions; ++i) {
      if (isThreadInterrupted()) {
        return;
      }
      Writer writer = null;
      try {
        long segmentStart = out.getPos();
        if (!sendEmptyPartitionDetails || (i == partition)) {
          writer = new Writer(conf, out, keyClass, valClass, codec,
              spilledRecordsCounter, null, false);
        }
        // we need not check for combiner since its a single record
        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          mapOutputRecordCounter.increment(1);
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        long rawLength = 0;
        long partLength = 0;
        if (writer != null) {
          writer.close();
          rawLength = writer.getRawLength();
          partLength = writer.getCompressedLength();
        }
        adjustSpillCounters(rawLength, partLength);
        // record offsets
        final TezIndexRecord rec =
            new TezIndexRecord(
                segmentStart, rawLength, partLength);
        spillRec.putIndex(rec, i);
        writer = null;
      } finally {
        if (null != writer) {
          writer.close();
        }
      }
    }

    spillFileIndexPaths.put(numSpills, indexFilename);
    spillRec.writeToFile(indexFilename, conf, localFs);
    //TODO: honor cache limits
    indexCacheList.add(spillRec);
    ++numSpills;
    if (!isFinalMergeEnabled()) {
        fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
        //No final merge. Set the number of files offered via shuffle-handler
        numShuffleChunks.setValue(numSpills);
    }
    if (pipelinedShuffle) {
      sendPipelinedShuffleEvents();
    }
  } finally {
      out.close();
  }
}
 
源代码19 项目: RDFS   文件: MapTask.java
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    
    long spillStartMilli = System.currentTimeMillis();
    ProcResourceValues spillStartProcVals = getCurrentProcResourceValues();
    long spillBytes = 0;
    
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
        numSpills, size);
    out = rfs.create(filename);
    
    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
                                        spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillBytes += writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }
    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
          getTaskID(), numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    
    long spillEndMilli = System.currentTimeMillis();
    ProcResourceValues spillEndProcVals = getCurrentProcResourceValues();        
    spillSortCounters.incCountersPerSpill(spillStartProcVals,
        spillEndProcVals, spillEndMilli - spillStartMilli, spillBytes);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
 
源代码20 项目: incubator-tez   文件: DefaultSorter.java
protected void spill(int mstart, int mend)
    throws IOException, InterruptedException {

  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = (bufend >= bufstart
      ? bufend - bufstart
      : (bufvoid - bufend) + bufstart) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    int spindex = mstart;
    final InMemValBytes value = createInMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer writer = null;
      try {
        long segmentStart = out.getPos();
        writer = new Writer(conf, out, keyClass, valClass, codec,
                                  spilledRecordsCounter, null);
        if (combiner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            TezRawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Running combine processor");
            }
            runCombineProcessor(kvIter, writer);
          }
        }

        // close the writer
        writer.close();
        if (numSpills > 0) {
          additionalSpillBytesWritten.increment(writer.getCompressedLength());
          numAdditionalSpills.increment(1);
          // Reset the value will be set during the final merge.
          outputBytesWithOverheadCounter.setValue(0);
        } else {
          // Set this up for the first write only. Subsequent ones will be handled in the final merge.
          outputBytesWithOverheadCounter.increment(writer.getRawLength());
        }
        // record offsets
        final TezIndexRecord rec =
            new TezIndexRecord(
                segmentStart,
                writer.getRawLength(),
                writer.getCompressedLength());
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, conf);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}