org.apache.hadoop.fs.FSDataInputStream#getPos() code examples

The following examples show how org.apache.hadoop.fs.FSDataInputStream#getPos() is used in open-source projects; the full source for each snippet can be found in the corresponding project on GitHub.
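
As a quick orientation before the examples, here is a minimal, hypothetical sketch of the seek/read/getPos() pattern most of the snippets below rely on; the file path and offset are placeholders rather than values from any of the listed projects. getPos() returns the stream's current byte offset, so after a seek() followed by a read() it reports exactly how far the stream has advanced.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetPosDemo {
  public static void main(String[] args) throws IOException {
    // Placeholder path; pass a real file as the first argument.
    Path path = new Path(args.length > 0 ? args[0] : "/tmp/demo.txt");
    FileSystem fs = FileSystem.get(new Configuration());

    FSDataInputStream in = fs.open(path);
    try {
      in.seek(16);                                   // jump to byte offset 16 (assumes the file is at least that long)
      byte[] buffer = new byte[1024];
      int read = in.read(buffer, 0, buffer.length);  // may read fewer than 1024 bytes
      // getPos() reflects the absolute offset reached by seek() plus the bytes just read.
      System.out.println("bytes read: " + read + ", getPos(): " + in.getPos());
    } finally {
      in.close();
    }
  }
}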

Example 1  Project: hadoop  File: Tail.java
private long dumpFromOffset(PathData item, long offset) throws IOException {
  long fileSize = item.refreshStatus().getLen();
  if (offset > fileSize) return fileSize;
  // treat a negative offset as relative to end of the file, floor of 0
  if (offset < 0) {
    offset = Math.max(fileSize + offset, 0);
  }
  
  FSDataInputStream in = item.fs.open(item.path);
  try {
    in.seek(offset);
    // use conf so the system configured io block size is used
    IOUtils.copyBytes(in, System.out, getConf(), false);
    offset = in.getPos();
  } finally {
    in.close();
  }
  return offset;
}
 
Example 2  Project: hadoop  File: MapTask.java
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset) 
 throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = StringInterner.weakIntern(Text.readString(inFile));
  Class<T> cls;
  try {
    cls = (Class<T>) conf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + 
                                        " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(conf);
  Deserializer<T> deserializer = 
    (Deserializer<T>) factory.getDeserializer(cls);
  deserializer.open(inFile);
  T split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  getCounters().findCounter(
      TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
  inFile.close();
  return split;
}
 
Example 3  Project: big-c  File: MapTask.java
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset) 
 throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = StringInterner.weakIntern(Text.readString(inFile));
  Class<T> cls;
  try {
    cls = (Class<T>) conf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + 
                                        " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(conf);
  Deserializer<T> deserializer = 
    (Deserializer<T>) factory.getDeserializer(cls);
  deserializer.open(inFile);
  T split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  getCounters().findCounter(
      TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
  inFile.close();
  return split;
}
 
Example 4  Project: tez  File: MRInputUtils.java
@SuppressWarnings("unchecked")
public static InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo,
    JobConf jobConf, TezCounter splitBytesCounter) throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  long offset = splitMetaInfo.getStartOffset();

  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapred.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory
      .getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
 
Example 5  Project: succinct  File: SuccinctIndexedFileStream.java
/**
 * Constructor to map a file containing Succinct data structures via stream.
 *
 * @param filePath Path of the file.
 * @param conf     Configuration for the filesystem.
 * @throws IOException
 */
public SuccinctIndexedFileStream(Path filePath, Configuration conf) throws IOException {
  super(filePath, conf);
  FSDataInputStream is = getStream(filePath);
  is.seek(endOfFileStream);
  int len = is.readInt();
  offsets = new int[len];
  for (int i = 0; i < len; i++) {
    offsets[i] = is.readInt();
  }
  endOfIndexedFileStream = is.getPos();
  is.close();
}
 
Example 6  Project: tajo  File: FSDataInputChannel.java
public FSDataInputChannel(FSDataInputStream inputStream) throws IOException {
  if (inputStream.getWrappedStream() instanceof ByteBufferReadable) {
    this.isDirectRead = true;
  } else {
    /* LocalFileSystem, S3 does not support ByteBufferReadable */
    this.channel = Channels.newChannel(inputStream);
  }
  this.inputStream = inputStream;
  this.size = inputStream.getPos() + inputStream.available();
}
 
Example 7  Project: hbase  File: ProtobufLogReader.java
private String initInternal(FSDataInputStream stream, boolean isFirst)
    throws IOException {
  close();
  long expectedPos = PB_WAL_MAGIC.length;
  if (stream == null) {
    stream = fs.open(path);
    stream.seek(expectedPos);
  }
  if (stream.getPos() != expectedPos) {
    throw new IOException("The stream is at invalid position: " + stream.getPos());
  }
  // Initialize metadata or, when we reset, just skip the header.
  WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
  WALHdrContext hdrCtxt = readHeader(builder, stream);
  WALHdrResult walHdrRes = hdrCtxt.getResult();
  if (walHdrRes == WALHdrResult.EOF) {
    throw new EOFException("Couldn't read WAL PB header");
  }
  if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
    throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
  }
  if (isFirst) {
    WALProtos.WALHeader header = builder.build();
    this.hasCompression = header.hasHasCompression() && header.getHasCompression();
    this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
  }
  this.inputStream = stream;
  this.walEditsStopOffset = this.fileLength;
  long currentPosition = stream.getPos();
  trailerPresent = setTrailerIfPresent();
  this.seekOnFs(currentPosition);
  if (LOG.isTraceEnabled()) {
    LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
        + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition);
  }
  
  codecClsName = hdrCtxt.getCellCodecClsName();
  
  return hdrCtxt.getCellCodecClsName();
}
 
Example 8
private void loadIndex(Path path) throws IOException {
  FSDataInputStream inputStream = _fileSystem.open(path);
  byte[] buf = new byte[MAGIC.length];
  inputStream.readFully(buf);
  if (!Arrays.equals(MAGIC, buf)) {
    throw new IOException("File [" + path + "] not a " + BLUR_KEY_VALUE + " file.");
  }
  int version = inputStream.readInt();
  if (version == 1) {
    long fileLength = HdfsUtils.getFileLength(_fileSystem, path, inputStream);
    Operation operation = new Operation();
    try {
      while (inputStream.getPos() < fileLength) {
        try {
          operation.readFields(inputStream);
        } catch (IOException e) {
          // End of sync point found
          return;
        }
        loadIndex(path, operation);
      }
    } finally {
      inputStream.close();
    }
  } else {
    throw new IOException("Unknown version [" + version + "]");
  }
}
 
Example 9  Project: spork  File: CBZip2InputStream.java
public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
throws IOException {
    endOffsetOfSplit = end;
    // initialize retPos to the beginning of the current InputSplit
    // see comments in getPos() to understand how this is used.
    retPos = zStream.getPos();
    ll8 = null;
    tt = null;
    checkComputedCombinedCRC = blockSize == -1;
    bsSetStream(zStream);
    initialize(blockSize);
    initBlock(blockSize != -1);
    setupBlock();
}
 
Example 10  Project: incubator-tez  File: MRInputUtils.java
@SuppressWarnings("unchecked")
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
    TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter)
    throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  long offset = splitMetaInfo.getStartOffset();

  // Split information read from local filesystem.
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapreduce.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory
      .getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
 
Example 11  Project: incubator-tez  File: MRInputUtils.java
@SuppressWarnings("unchecked")
public static InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo,
    JobConf jobConf, TezCounter splitBytesCounter) throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  long offset = splitMetaInfo.getStartOffset();

  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapred.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory
      .getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
 
Example 12  Project: tez  File: MRInputUtils.java
@SuppressWarnings("unchecked")
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
    TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter)
    throws IOException {
  Path file = new Path(splitMetaInfo.getSplitLocation());
  long offset = splitMetaInfo.getStartOffset();

  // Split information read from local filesystem.
  FileSystem fs = FileSystem.getLocal(jobConf);
  file = fs.makeQualified(file);
  LOG.info("Reading input split file from : " + file);
  FSDataInputStream inFile = fs.open(file);
  inFile.seek(offset);
  String className = Text.readString(inFile);
  Class<org.apache.hadoop.mapreduce.InputSplit> cls;
  try {
    cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className);
  } catch (ClassNotFoundException ce) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(ce);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(jobConf);
  Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory
      .getDeserializer(cls);
  deserializer.open(inFile);
  org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null);
  long pos = inFile.getPos();
  if (splitBytesCounter != null) {
    splitBytesCounter.increment(pos - offset);
  }
  inFile.close();
  return split;
}
 
Example 13  Project: incubator-crail  File: HdfsIOBenchmark.java
public void readSequentialDirect() throws Exception {
	System.out.println("reading sequential file in direct mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	ByteBuffer buf = ByteBuffer.allocateDirect(size);
	buf.clear();
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);
	
	long start = System.currentTimeMillis();
	while (ops < loop) {
		buf.clear();
		double ret = (double) instream.read(buf);
		if (ret > 0) {
			sumbytes = sumbytes + ret;
			ops = ops + 1.0;
		} else {
			ops = ops + 1.0;
			if (instream.getPos() == 0){
				break;
			} else {
				instream.seek(0);
			}
		}
	}
	long end = System.currentTimeMillis();
	double executionTime = ((double) (end - start)) / 1000.0;
	double throughput = 0.0;
	double latency = 0.0;
	double sumbits = sumbytes * 8.0;
	if (executionTime > 0) {
		throughput = sumbits / executionTime / 1024.0 / 1024.0;
		latency = 1000000.0 * executionTime / ops;
	}
	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();	
	fs.close();
}
 
Example 14  Project: incubator-crail  File: HdfsIOBenchmark.java
public void readSequentialHeap() throws Exception {
	System.out.println("reading sequential file in heap mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	byte[] buf = new byte[size];
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);
	
	long start = System.currentTimeMillis();
	while (ops < loop) {
		double ret = (double) this.read(instream, buf);
		if (ret > 0) {
			sumbytes = sumbytes + ret;
			ops = ops + 1.0;
		} else {
			ops = ops + 1.0;
			if (instream.getPos() == 0){
				break;
			} else {
				instream.seek(0);
			}
		}
	}
	long end = System.currentTimeMillis();
	double executionTime = ((double) (end - start)) / 1000.0;
	double throughput = 0.0;
	double latency = 0.0;
	double sumbits = sumbytes * 8.0;
	if (executionTime > 0) {
		throughput = sumbits / executionTime / 1024.0 / 1024.0;
		latency = 1000000.0 * executionTime / ops;
	}
	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();	
	fs.close();
}
 
Example 15  Project: hadoop  File: TestWriteRead.java
/**
 * read chunks into buffer repeatedly until total of VisibleLen byte are read.
 * Return total number of bytes read
 */
private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,
    String fname, long pos, long visibleLen, boolean positionReadOption)
    throws IOException {

  if (pos >= visibleLen || visibleLen <= 0)
    return 0;

  int chunkNumber = 0;
  long totalByteRead = 0;
  long currentPosition = pos;
  int byteRead = 0;
  long byteLeftToRead = visibleLen - pos;
  int byteToReadThisRound = 0;

  if (!positionReadOption) {
    in.seek(pos);
    currentPosition = in.getPos();
  }
  if (verboseOption)
    LOG.info("reader begin: position: " + pos + " ; currentOffset = "
        + currentPosition + " ; bufferSize =" + buffer.length
        + " ; Filename = " + fname);
  try {
    while (byteLeftToRead > 0 && currentPosition < visibleLen) {
      byteToReadThisRound = (int) (byteLeftToRead >= buffer.length 
          ? buffer.length : byteLeftToRead);
      if (positionReadOption) {
        byteRead = in.read(currentPosition, buffer, 0, byteToReadThisRound);
      } else {
        byteRead = in.read(buffer, 0, byteToReadThisRound);
      }
      if (byteRead <= 0)
        break;
      chunkNumber++;
      totalByteRead += byteRead;
      currentPosition += byteRead;
      byteLeftToRead -= byteRead;

      if (verboseOption) {
        LOG.info("reader: Number of byte read: " + byteRead
            + " ; totalByteRead = " + totalByteRead + " ; currentPosition="
            + currentPosition + " ; chunkNumber =" + chunkNumber
            + "; File name = " + fname);
      }
    }
  } catch (IOException e) {
    throw new IOException(
        "#### Exception caught in readUntilEnd: reader  currentOffset = "
            + currentPosition + " ; totalByteRead =" + totalByteRead
            + " ; latest byteRead = " + byteRead + "; visibleLen= "
            + visibleLen + " ; bufferLen = " + buffer.length
            + " ; Filename = " + fname, e);
  }

  if (verboseOption)
    LOG.info("reader end:   position: " + pos + " ; currentOffset = "
        + currentPosition + " ; totalByteRead =" + totalByteRead
        + " ; Filename = " + fname);

  return totalByteRead;
}
 
Example 16  Project: big-c  File: TestWriteRead.java
/**
 * read chunks into buffer repeatedly until total of VisibleLen byte are read.
 * Return total number of bytes read
 */
private long readUntilEnd(FSDataInputStream in, byte[] buffer, long size,
    String fname, long pos, long visibleLen, boolean positionReadOption)
    throws IOException {

  if (pos >= visibleLen || visibleLen <= 0)
    return 0;

  int chunkNumber = 0;
  long totalByteRead = 0;
  long currentPosition = pos;
  int byteRead = 0;
  long byteLeftToRead = visibleLen - pos;
  int byteToReadThisRound = 0;

  if (!positionReadOption) {
    in.seek(pos);
    currentPosition = in.getPos();
  }
  if (verboseOption)
    LOG.info("reader begin: position: " + pos + " ; currentOffset = "
        + currentPosition + " ; bufferSize =" + buffer.length
        + " ; Filename = " + fname);
  try {
    while (byteLeftToRead > 0 && currentPosition < visibleLen) {
      byteToReadThisRound = (int) (byteLeftToRead >= buffer.length 
          ? buffer.length : byteLeftToRead);
      if (positionReadOption) {
        byteRead = in.read(currentPosition, buffer, 0, byteToReadThisRound);
      } else {
        byteRead = in.read(buffer, 0, byteToReadThisRound);
      }
      if (byteRead <= 0)
        break;
      chunkNumber++;
      totalByteRead += byteRead;
      currentPosition += byteRead;
      byteLeftToRead -= byteRead;

      if (verboseOption) {
        LOG.info("reader: Number of byte read: " + byteRead
            + " ; totalByteRead = " + totalByteRead + " ; currentPosition="
            + currentPosition + " ; chunkNumber =" + chunkNumber
            + "; File name = " + fname);
      }
    }
  } catch (IOException e) {
    throw new IOException(
        "#### Exception caught in readUntilEnd: reader  currentOffset = "
            + currentPosition + " ; totalByteRead =" + totalByteRead
            + " ; latest byteRead = " + byteRead + "; visibleLen= "
            + visibleLen + " ; bufferLen = " + buffer.length
            + " ; Filename = " + fname, e);
  }

  if (verboseOption)
    LOG.info("reader end:   position: " + pos + " ; currentOffset = "
        + currentPosition + " ; totalByteRead =" + totalByteRead
        + " ; Filename = " + fname);

  return totalByteRead;
}
 
Example 17  Project: crail  File: HdfsIOBenchmark.java
public void readSequentialDirect() throws Exception {
	System.out.println("reading sequential file in direct mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	ByteBuffer buf = ByteBuffer.allocateDirect(size);
	buf.clear();
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);
	
	long start = System.currentTimeMillis();
	while (ops < loop) {
		buf.clear();
		double ret = (double) instream.read(buf);
		if (ret > 0) {
			sumbytes = sumbytes + ret;
			ops = ops + 1.0;
		} else {
			ops = ops + 1.0;
			if (instream.getPos() == 0){
				break;
			} else {
				instream.seek(0);
			}
		}
	}
	long end = System.currentTimeMillis();
	double executionTime = ((double) (end - start)) / 1000.0;
	double throughput = 0.0;
	double latency = 0.0;
	double sumbits = sumbytes * 8.0;
	if (executionTime > 0) {
		throughput = sumbits / executionTime / 1024.0 / 1024.0;
		latency = 1000000.0 * executionTime / ops;
	}
	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();	
	fs.close();
}
 
Example 18  Project: crail  File: HdfsIOBenchmark.java
public void readSequentialHeap() throws Exception {
	System.out.println("reading sequential file in heap mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	byte[] buf = new byte[size];
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);
	
	long start = System.currentTimeMillis();
	while (ops < loop) {
		double ret = (double) this.read(instream, buf);
		if (ret > 0) {
			sumbytes = sumbytes + ret;
			ops = ops + 1.0;
		} else {
			ops = ops + 1.0;
			if (instream.getPos() == 0){
				break;
			} else {
				instream.seek(0);
			}
		}
	}
	long end = System.currentTimeMillis();
	double executionTime = ((double) (end - start)) / 1000.0;
	double throughput = 0.0;
	double latency = 0.0;
	double sumbits = sumbytes * 8.0;
	if (executionTime > 0) {
		throughput = sumbits / executionTime / 1024.0 / 1024.0;
		latency = 1000000.0 * executionTime / ops;
	}
	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();	
	fs.close();
}
 
Example 19
/**
 * Recovers a file which exists on the disk. If the length of the file is not same as the
 * length which the operator remembers then the file is truncated. <br/>
 * When always writing to a temporary file, then a file is restored even when the length is same as what the
 * operator remembers however this is done only for files which had open streams that weren't closed before
 * failure.
 *
 * @param filename     name of the actual file.
 * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
 *                     latest open part file name.
 * @param filepath     path of the file. When always writing to temp file, this is the path of the temp file;
 *                     otherwise path of the actual file.
 * @throws IOException
 */
private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
{
  LOG.debug("path exists {}", filepath);
  long offset = endOffsets.get(filename).longValue();
  FSDataInputStream inputStream = fs.open(filepath);
  FileStatus status = fs.getFileStatus(filepath);

  if (status.getLen() != offset) {
    LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
    byte[] buffer = new byte[COPY_BUFFER_SIZE];
    String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
    Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
    FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);

    while (inputStream.getPos() < offset) {
      long remainingBytes = offset - inputStream.getPos();
      int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE;
      inputStream.read(buffer);
      fsOutput.write(buffer, 0, bytesToWrite);
    }

    flush(fsOutput);
    fsOutput.close();
    inputStream.close();

    LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);

    if (alwaysWriteToTmp) {
      //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
      //is restored to an earlier check-pointed window, it will look for an older tmp.
      fileNameToTmpName.put(partFileName, recoveryFileName);
    } else {
      LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
      rename(recoveryFilePath, status.getPath());
    }
  } else {
    if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
      String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
      FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
      IOUtils.copy(inputStream, outputStream);
      streamsCache.put(filename, new FSFilterStreamContext(outputStream));
      fileNameToTmpName.put(partFileName, currentTmp);
    }
    inputStream.close();
  }
}
 
Example 20  Project: jumbune  File: DataValidationInputFormat.java
/**
 * Generate splits.
 *
 * @param job refers to JobContext that is being used to read the configurations of the job that ran
 * @param minSize refers to the minimum file block size.
 * @param maxSize refers to the maximum file block size.
 * @param splits refers  to a list of splits that are being generated.
 * @param file refers to the FileStatus required to determine block size,length,allocations.
 * @throws IOException Signals that an I/O exception has occurred.
 */
private void generateSplits(JobContext job, long minSize, long maxSize,
		List<InputSplit> splits, FileStatus file) throws IOException {
	Path path = file.getPath();
	int numOfRecordsInCurrentSplit = 0;
	int numOfRecordsInPreviousSplit = 0;
	FileSystem fs = path.getFileSystem(job.getConfiguration());
	long length = file.getLen();
	BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
			length);
	FSDataInputStream fsin = null;
	if ((length != 0) && isSplitable(job, path)) {
		long blockSize = file.getBlockSize();
		long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		long bytesRemaining = length;
		
		// checking the occurrences of the record separator in current
		// split
		recordSeparator = job.getConfiguration()
				.get(DataValidationConstants.RECORD_SEPARATOR)
				.getBytes();
		while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
			int blkIndex = getBlockIndex(blkLocations, length
					- bytesRemaining);
			long start = length - bytesRemaining;
			long end = start + splitSize;
			try {
				fsin = fs.open(path);
				fsin.seek(start);
				long pos = start;
				int b = 0;
				int bufferPos = 0;
				while (true) {
					b = fsin.read();
					pos = fsin.getPos();
					if (b == -1) {
						break;
					}
					if (b == recordSeparator[bufferPos]) {
						bufferPos++;
						if (bufferPos == recordSeparator.length) {
							numOfRecordsInCurrentSplit++;
							bufferPos = 0;
							if (pos > end) {
								break;
							}
						}
					} else {
						// reset the value of buffer position to zero
						bufferPos = 0;
					}
				}
			} finally {
				if (fsin != null) {
					fsin.close();
				}
			}

			splits.add(new DataValidationFileSplit(path, start,
					splitSize, numOfRecordsInPreviousSplit,
					blkLocations[blkIndex].getHosts()));
			bytesRemaining -= splitSize;
			numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
			numOfRecordsInCurrentSplit = 0;
		}

		addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
				length, blkLocations, bytesRemaining);
	} else if (length != 0) {
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
	} else {
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, new String[0]));
	}
}