org.apache.hadoop.fs.FSDataInputStream#seek ( )源码实例Demo

下面列出了 org.apache.hadoop.fs.FSDataInputStream#seek() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: hadoop   文件: TestTracing.java
/**
 * Reads the named file through {@code dfs} in 10240-byte chunks, seeking
 * five bytes forward after every successful read. Reading stops when a read
 * returns no data or when an {@link IOException} is raised by a read or seek.
 *
 * @param testFileName path of the file to read
 * @throws Exception if the stream cannot be opened or closed
 */
private void readTestFile(String testFileName) throws Exception {
  final int chunkSize = 10240;
  FSDataInputStream input = dfs.open(new Path(testFileName), chunkSize);
  ByteBuffer chunk = ByteBuffer.allocate(chunkSize);

  int reads = 0;
  try {
    for (;;) {
      if (input.read(chunk) <= 0) {
        break;
      }
      reads++;
      chunk.clear();
      // Skip a few bytes so that every iteration also exercises seek().
      input.seek(input.getPos() + 5);
    }
  } catch (IOException ignored) {
    // Deliberately ignored: it is most likely a seek past the end of file.
  } finally {
    input.close();
  }
}
 
源代码2 项目: hadoop   文件: TestSeekBug.java
/**
 * Reads a file through a stream opened with a 4096-byte buffer, then seeks
 * back into the range that was already read and verifies the data there.
 *
 * The expected content is regenerated from {@code seed}, so the file must
 * have been written with the same seed.
 *
 * @param fileSys file system holding the file
 * @param name path of the file to read
 * @throws IOException if opening, reading, seeking or closing fails
 */
private void seekReadFile(FileSystem fileSys, Path name) throws IOException {
  FSDataInputStream stm = fileSys.open(name, 4096);
  try {
    byte[] expected = new byte[ONEMB];
    Random rand = new Random(seed);
    rand.nextBytes(expected);

    // First read 128 bytes to set count in BufferedInputStream. readFully is
    // used so that exactly 128 bytes are consumed; the offsets checked below
    // rely on that.
    byte[] actual = new byte[128];
    IOUtils.readFully(stm, actual, 0, actual.length);
    // Now read a byte array that is bigger than the internal buffer
    actual = new byte[100000];
    IOUtils.readFully(stm, actual, 0, actual.length);
    checkAndEraseData(actual, 128, expected, "First Read Test");
    // now do a small seek, within the range that is already read
    stm.seek(96036); // 4 byte seek
    actual = new byte[128];
    IOUtils.readFully(stm, actual, 0, actual.length);
    checkAndEraseData(actual, 96036, expected, "Seek Bug");
  } finally {
    // Close even when a read or a data check fails, so the stream is not leaked.
    stm.close();
  }
}
 
源代码3 项目: RDFS   文件: TestHftpFileSystem.java
/**
 * Scenario: a file that is still open for writing is read through hftp.
 *
 * Expected: once the writer has synced (without closing), hftp returns
 * every byte written so far, both when reading sequentially and when
 * reading after a seek.
 *
 * @throws IOException
 */
public void testConcurrentRead() throws IOException {
  // Produce the content and sync it, but keep the writer open.
  FSDataOutputStream writer = hdfs.create(TEST_FILE, true);
  writer.writeBytes("123");
  writer.sync();

  // Sequential read over hftp: expect '1', '2', '3' in order.
  FSDataInputStream reader = hftpFs.open(TEST_FILE);
  for (char want = '1'; want <= '3'; want++) {
    assertEquals(want, reader.read());
  }
  reader.close();

  // Reopen, jump to the last byte and read it.
  reader = hftpFs.open(TEST_FILE);
  reader.seek(2);
  assertEquals('3', reader.read());
  reader.close();

  writer.close();
}
 
源代码4 项目: big-c   文件: TestSeekBug.java
/**
 * Test (expected to throw IOE) for <code>FSDataInputStream#seek</code>
 * when the position argument is larger than the file size.
 *
 * The stream, the file system and the mini cluster are released in nested
 * <code>finally</code> blocks so that a failure while closing one of them
 * does not prevent the others from being released.
 */
@Test (expected=IOException.class)
public void testSeekPastFileSize() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
  try {
    FileSystem fs = cluster.getFileSystem();
    try {
      Path seekFile = new Path("seekboundaries.dat");
      DFSTestUtil.createFile(
        fs,
        seekFile,
        ONEMB,
        fs.getDefaultReplication(seekFile),
        seed);
      FSDataInputStream stream = fs.open(seekFile);
      try {
        // Perform "safe seek" (expected to pass)
        stream.seek(65536);
        assertEquals(65536, stream.getPos());
        // expect IOE for this call: the target is three times the file size
        stream.seek(ONEMB + ONEMB + ONEMB);
      } finally {
        stream.close();
      }
    } finally {
      fs.close();
    }
  } finally {
    cluster.shutdown();
  }
}
 
源代码5 项目: RDFS   文件: BCFile.java
/**
 * Creates a reader on top of an already opened stream by loading the file
 * tail, the meta block index and the data block index.
 * 
 * @param fin
 *          FS input stream.
 * @param fileLength
 *          Length of the corresponding file
 * @param conf
 *          Configuration kept by the reader.
 * @throws IOException
 *           if seeking or reading the stream fails.
 * @throws RuntimeException
 *           if the stored version is not compatible with
 *           {@code BCFile.API_VERSION}.
 */
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
    throws IOException {
  this.in = fin;
  this.conf = conf;

  // The tail is read as: a long holding the offset of the meta block index,
  // then the version, then the magic. Position the cursor at its first byte.
  final int offsetFieldBytes = Long.SIZE / Byte.SIZE;
  final long tailStart =
      fileLength - Magic.size() - Version.size() - offsetFieldBytes;
  fin.seek(tailStart);
  final long metaIndexOffset = fin.readLong();
  version = new Version(fin);
  Magic.readAndVerify(fin);

  final boolean supported = version.compatibleWith(BCFile.API_VERSION);
  if (!supported) {
    throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
  }

  // Load the meta block index from the offset found in the tail.
  fin.seek(metaIndexOffset);
  metaIndex = new MetaIndex(fin);

  // The data block index is itself stored as a meta block.
  BlockReader indexBlock = getMetaBlock(DataIndex.BLOCK_NAME);
  try {
    dataIndex = new DataIndex(indexBlock);
  } finally {
    indexBlock.close();
  }
}
 
源代码6 项目: hadoop-gpu   文件: LineRecordReader.java
/**
 * Opens the file behind {@code split} and positions a line reader on it.
 *
 * When a compression codec is found for the file, the whole decompressed
 * stream is read and {@code end} is set to {@code Long.MAX_VALUE}. Otherwise,
 * for a split that does not begin at offset 0, the reader steps back one
 * byte, reads one line and advances {@code start} by the number of bytes
 * that line consumed.
 *
 * @param job configuration supplying the maximum line length and the codecs
 * @param split the part of the file to read
 * @throws IOException if the file cannot be opened, positioned or read
 */
public LineRecordReader(Configuration job, 
                        FileSplit split) throws IOException {
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  if (codec == null) {
    final boolean startsMidFile = start != 0;
    if (startsMidFile) {
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job);
    if (startsMidFile) {
      // skip first line and re-establish "start".
      final int maxBytes = (int) Math.min((long) Integer.MAX_VALUE, end - start);
      start += in.readLine(new Text(), 0, maxBytes);
    }
  } else {
    in = new LineReader(codec.createInputStream(fileIn), job);
    end = Long.MAX_VALUE;
  }
  this.pos = start;
}
 
源代码7 项目: hadoop   文件: TestHadoopArchives.java
/**
 * Asserts that seeking {@code fsdis} to {@code seekPos} raises an
 * {@link IOException}. When the seek succeeds, the assertion fails with
 * {@code message} followed by the position the stream reports. An
 * {@link IOException} raised while querying that position is treated the
 * same way as one raised by the seek.
 *
 * @param fsdis stream to seek
 * @param seekPos position expected to be rejected
 * @param message prefix of the failure message
 */
private static void expectSeekIOE(FSDataInputStream fsdis, long seekPos, String message) {
  try {
    fsdis.seek(seekPos);
    final String failure = message + " (Position = " + fsdis.getPos() + ")";
    assertTrue(failure, false);
  } catch (IOException expected) {
    // okay
  }
}
 
源代码8 项目: kylin   文件: FSInputGeneralColumnDataReader.java
public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
        throws IOException {
    this.fsInputStream = fsInputStream;
    fsInputStream.seek(dataStartOffset + dataLength - 4L);
    this.numOfVals = fsInputStream.readInt();
    fsInputStream.seek(dataStartOffset);
}
 
源代码9 项目: hadoop   文件: BCFile.java
/**
 * Initializes a reader from an open stream: reads the tail of the file,
 * then the meta block index, then the data block index.
 * 
 * @param fin
 *          FS input stream.
 * @param fileLength
 *          Length of the corresponding file
 * @param conf
 *          Configuration stored on the reader.
 * @throws IOException
 *           if the stream cannot be positioned or read.
 * @throws RuntimeException
 *           if the version read from the file is not compatible with
 *           {@code BCFile.API_VERSION}.
 */
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
    throws IOException {
  this.in = fin;
  this.conf = conf;

  // Jump to the first byte of the tail. It is consumed below as the offset
  // of the meta block index (one long), the version and the magic.
  final int longBytes = Long.SIZE / Byte.SIZE;
  final long tailOffset =
      fileLength - Magic.size() - Version.size() - longBytes;
  fin.seek(tailOffset);
  final long metaIndexStart = fin.readLong();
  version = new Version(fin);
  Magic.readAndVerify(fin);

  final boolean compatible = version.compatibleWith(BCFile.API_VERSION);
  if (!compatible) {
    throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
  }

  // read meta index
  fin.seek(metaIndexStart);
  metaIndex = new MetaIndex(fin);

  // read data:BCFile.index, the data block index
  BlockReader dataIndexBlock = getMetaBlock(DataIndex.BLOCK_NAME);
  try {
    dataIndex = new DataIndex(dataIndexBlock);
  } finally {
    dataIndexBlock.close();
  }
}
 
源代码10 项目: big-c   文件: TestCachingStrategy.java
/**
 * Verifies that a stream can still seek after {@code setDropBehind} has been
 * called on it: starts a single-datanode mini cluster, writes a test file,
 * reads one byte, changes the drop-behind setting and then seeks.
 */
@Test(timeout=120000)
public void testSeekAfterSetDropBehind() throws Exception {
  LOG.info("testSeekAfterSetDropBehind");
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = null;
  final String testPath = "/test";
  final int testFileLen = MAX_TEST_FILE_LEN;
  try {
    // start a cluster
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    final Path target = new Path(testPath);
    createHdfsFile(fs, target, testFileLen, false);

    // verify that we can seek after setDropBehind
    FSDataInputStream fis = fs.open(target);
    try {
      // create BlockReader
      Assert.assertTrue(fis.read() != -1);
      // clear BlockReader
      fis.setDropBehind(false);
      // seek
      fis.seek(2);
    } finally {
      fis.close();
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
源代码11 项目: big-c   文件: MapTask.java
/**
 * Reads one serialized split from {@code file}, starting at {@code offset}.
 *
 * The split's class name is read first, the class is resolved through the
 * configuration, and a matching deserializer reads the split itself. The
 * number of bytes consumed is added to the {@code SPLIT_RAW_BYTES} counter.
 *
 * @param file file containing the serialized split
 * @param offset position in {@code file} where the split starts
 * @return the deserialized split
 * @throws IOException if the file cannot be read, or if the split class
 *         cannot be found (the {@link ClassNotFoundException} is the cause)
 */
@SuppressWarnings("unchecked")
private <T> T getSplitDetails(Path file, long offset) 
 throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  FSDataInputStream inFile = fs.open(file);
  try {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
      cls = (Class<T>) conf.getClassByName(className);
    } catch (ClassNotFoundException ce) {
      throw new IOException("Split class " + className + " not found", ce);
    }
    SerializationFactory factory = new SerializationFactory(conf);
    Deserializer<T> deserializer = 
      (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    long pos = inFile.getPos();
    getCounters().findCounter(
        TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
    return split;
  } finally {
    // Close on every path so a failed lookup or deserialization does not
    // leak the stream.
    inFile.close();
  }
}
 
源代码12 项目: attic-apex-malhar   文件: DTBCFile.java
/**
 * Sets up a reader on an open stream: allocates the block read buffer and
 * the key cache list, then loads the file tail, the meta block index and
 * the data block index.
 *
 * @param fin
 *          FS input stream.
 * @param fileLength
 *          Length of the corresponding file
 * @param conf
 *          Configuration; also supplies the FS input buffer size.
 * @throws IOException
 *           if the stream cannot be positioned or read.
 * @throws RuntimeException
 *           if the version read from the file is not compatible with
 *           {@code DTBCFile.API_VERSION}.
 */
public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
    throws IOException {
  this.in = fin;
  this.conf = conf;
  // A reader buffer to read the block, sized at twice the FS input buffer.
  final int readBufferCapacity = DTFile.getFSInputBufferSize(conf) * 2;
  baos = new ByteArrayOutputStream(readBufferCapacity);
  this.cacheKeys = new ArrayList<String>();

  // Position the cursor on the first byte of the tail, which is consumed
  // below as: offset of the meta block index (one long), version, magic.
  final int longBytes = Long.SIZE / Byte.SIZE;
  final long tailStart =
      fileLength - Magic.size() - Version.size() - longBytes;
  fin.seek(tailStart);
  final long metaIndexOffset = fin.readLong();
  version = new Version(fin);
  Magic.readAndVerify(fin);

  final boolean compatible = version.compatibleWith(DTBCFile.API_VERSION);
  if (!compatible) {
    throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
  }

  // read meta index
  fin.seek(metaIndexOffset);
  metaIndex = new MetaIndex(fin);

  // read data:BCFile.index, the data block index
  BlockReader indexBlock = getMetaBlock(DataIndex.BLOCK_NAME);
  try {
    dataIndex = new DataIndex(indexBlock);
  } finally {
    indexBlock.close();
  }
}
 
源代码13 项目: RDFS   文件: FileStripeReader.java
/**
 * Builds one input stream per block of the next stripe and advances the
 * stripe cursor.
 *
 * For every block offset that lies inside the source file, the file is
 * opened and positioned at that offset. Offsets at or beyond the source size
 * are served by a {@code RaidUtils.ZeroInputStream} instead.
 *
 * @return one stream per block of the stripe, in block order
 * @throws IOException if a stream cannot be opened or positioned; all
 *         streams opened for this stripe are closed before rethrowing
 */
@Override
public InputStream[] getNextStripeInputs() throws IOException {
  InputStream[] blocks = new InputStream[codec.stripeLength];
  try {
    for (int i = 0; i < codec.stripeLength; i++) {
      long seekOffset = stripeStartOffset + i * blockSize;
      if (seekOffset < srcSize) {
        FSDataInputStream in = fs.open(srcFile, bufferSize);
        // Record the stream before seeking: if seek() throws, the cleanup
        // in the catch block must be able to find and close it.
        blocks[i] = in;
        in.seek(seekOffset);
        LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
      } else {
        LOG.info("Using zeros at offset " + seekOffset);
        // We have no src data at this offset.
        blocks[i] = new RaidUtils.ZeroInputStream(
                          seekOffset + blockSize);
      }
    }
    stripeStartOffset += blockSize * codec.stripeLength;
    return blocks;
  } catch (IOException e) {
    // If there is an error during opening a stream, close the previously
    // opened streams and re-throw.
    RaidUtils.closeStreams(blocks);
    throw e;
  }
}
 
源代码14 项目: hadoop   文件: TestFSInputChecker.java
/**
 * Seeks {@code in} to {@code position}, reads {@code len} bytes and asserts
 * that each one equals the byte at the same file offset in {@code expected}.
 *
 * @param in stream to read from
 * @param position offset to seek to before reading
 * @param len number of bytes to read and compare
 * @throws IOException if seeking or reading fails
 */
private void readAndCompare(FSDataInputStream in, int position, int len)
    throws IOException {
  final byte[] actual = new byte[len];
  in.seek(position);
  IOUtils.readFully(in, actual, 0, actual.length);

  int offset = 0;
  while (offset < actual.length) {
    assertEquals(expected[position + offset], actual[offset]);
    offset++;
  }
}
 
源代码15 项目: hive-dwrf   文件: InStream.java
/**
 * Fills part of {@code array} with bytes read from {@code file} at a given
 * file offset.
 *
 * The stream is first moved to {@code fileOffset} with {@code seek}, then
 * {@code readFully} is used to read {@code length} bytes into {@code array}
 * starting at index {@code arrayOffset}. The stream's current position is
 * therefore changed by this call.
 *
 * @param file stream to read from
 * @param fileOffset position in the file at which reading starts
 * @param array destination buffer
 * @param arrayOffset index in {@code array} of the first byte written
 * @param length number of bytes to read
 * @throws IOException if seeking or reading fails
 */
public static void read(FSDataInputStream file, long fileOffset, byte[] array, int arrayOffset,
    int length) throws IOException {
  file.seek(fileOffset);
  file.readFully(array, arrayOffset, length);
}
 
源代码16 项目: incubator-crail   文件: HdfsIOBenchmark.java
/**
 * Reads {@code path} sequentially into a direct {@link ByteBuffer} of
 * {@code size} bytes until {@code loop} read operations have been issued,
 * then prints timing statistics.
 *
 * When a read returns no data the stream is rewound to offset 0; if the
 * stream is already at offset 0 the loop stops. Throughput is printed as
 * bits / seconds / 1024 / 1024 and latency as 1000000 * seconds / operations.
 *
 * @throws Exception if the file system or the stream fails
 */
public void readSequentialDirect() throws Exception {
	System.out.println("reading sequential file in direct mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	ByteBuffer buf = ByteBuffer.allocateDirect(size);
	buf.clear();
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);

	final long startMillis = System.currentTimeMillis();
	while (ops < loop) {
		buf.clear();
		final int bytesRead = instream.read(buf);
		// Every attempt counts as an operation, whether or not it returned data.
		ops += 1.0;
		if (bytesRead > 0) {
			sumbytes += bytesRead;
			continue;
		}
		if (instream.getPos() == 0) {
			break;
		}
		instream.seek(0);
	}
	final long endMillis = System.currentTimeMillis();

	final double executionTime = ((double) (endMillis - startMillis)) / 1000.0;
	final double sumbits = sumbytes * 8.0;
	final boolean measurable = executionTime > 0;
	final double throughput = measurable ? sumbits / executionTime / 1024.0 / 1024.0 : 0.0;
	final double latency = measurable ? 1000000.0 * executionTime / ops : 0.0;

	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();
	fs.close();
}
 
源代码17 项目: incubator-crail   文件: HdfsIOBenchmark.java
/**
 * Reads {@code path} sequentially into a heap byte array of {@code size}
 * bytes until {@code loop} read operations have been issued, then prints
 * timing statistics.
 *
 * When a read returns no data the stream is rewound to offset 0; if the
 * stream is already at offset 0 the loop stops. Throughput is printed as
 * bits / seconds / 1024 / 1024 and latency as 1000000 * seconds / operations.
 *
 * @throws Exception if the file system or the stream fails
 */
public void readSequentialHeap() throws Exception {
	System.out.println("reading sequential file in heap mode " + path);
	Configuration conf = new Configuration();
	FileSystem fs = FileSystem.get(conf);
	FileStatus status = fs.getFileStatus(path);
	FSDataInputStream instream = fs.open(path);
	byte[] buf = new byte[size];
	double sumbytes = 0;
	double ops = 0;
	System.out.println("file capacity " + status.getLen());
	System.out.println("read size " + size);
	System.out.println("operations " + loop);

	final long startMillis = System.currentTimeMillis();
	while (ops < loop) {
		final double consumed = (double) this.read(instream, buf);
		// Every attempt counts as an operation, whether or not it returned data.
		ops += 1.0;
		if (consumed > 0) {
			sumbytes += consumed;
			continue;
		}
		if (instream.getPos() == 0) {
			break;
		}
		instream.seek(0);
	}
	final long endMillis = System.currentTimeMillis();

	final double executionTime = ((double) (endMillis - startMillis)) / 1000.0;
	final double sumbits = sumbytes * 8.0;
	final boolean measurable = executionTime > 0;
	final double throughput = measurable ? sumbits / executionTime / 1024.0 / 1024.0 : 0.0;
	final double latency = measurable ? 1000000.0 * executionTime / ops : 0.0;

	System.out.println("execution time " + executionTime);
	System.out.println("ops " + ops);
	System.out.println("sumbytes " + sumbytes);
	System.out.println("throughput " + throughput);
	System.out.println("latency " + latency);
	System.out.println("closing stream");
	instream.close();
	fs.close();
}
 
源代码18 项目: Cubert   文件: RubixRecordReader.java
/**
 * Prepares this reader for one Rubix split.
 *
 * Chooses the value deserializer from the split's block serialization type,
 * publishes the block id, record count and serialized partition key in
 * {@code conf}, then opens the file at the split's offset and hands the
 * (optionally decompressed) block stream to the value deserializer.
 *
 * @param split the split to read; cast to {@code RubixInputSplit}
 * @param conf configuration, which this method also writes to
 * @throws IOException if the key cannot be serialized or the file cannot be
 *         opened, positioned or wrapped
 * @throws InterruptedException declared by the signature
 */
public void initialize(InputSplit split, Configuration conf) throws IOException,
        InterruptedException
{
    @SuppressWarnings("unchecked")
    RubixInputSplit<K, V> rubixSplit = (RubixInputSplit<K, V>) split;

    SerializationFactory factory = new SerializationFactory(conf);
    switch (rubixSplit.getBlockSerializationType())
    {
    case COMPACT:
        valueDeserializer = new CompactDeserializer<V>(rubixSplit.getSchema());
        break;
    case DEFAULT:
        valueDeserializer = factory.getDeserializer(rubixSplit.getValueClass());
        break;
    }

    key = rubixSplit.getKey();

    // store the blockid and partition key in the conf
    conf.setLong("MY_BLOCK_ID", rubixSplit.getBlockId());
    conf.setLong("MY_NUM_RECORDS", rubixSplit.getNumRecords());
    ByteArrayOutputStream keyBytes = new ByteArrayOutputStream();
    ((Tuple) key).write(new DataOutputStream(keyBytes));
    conf.set("MY_PARTITION_KEY",
             SerializerUtils.serializeToString(keyBytes.toByteArray()));

    Path path = rubixSplit.getFilename();
    offset = rubixSplit.getOffset();
    length = rubixSplit.getLength();

    // Open the file and restrict reading to this split's block.
    FSDataInputStream fileStream = path.getFileSystem(conf).open(path);
    fileStream.seek(offset);
    blockInputStream = new BlockInputStream(fileStream, length);
    in = blockInputStream;

    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    if (codec == null)
    {
        print.f("codec is null");
    }
    else
    {
        print.f("codec is not null and it is %s", codec.getClass().toString());
        in = codec.createInputStream(in);
    }

    valueDeserializer.open(in);
}
 
源代码19 项目: jumbune   文件: DataValidationInputFormat.java
/**
 * Generates the splits for one file and appends them to {@code splits}.
 *
 * For a non-empty, splittable file, the file is cut into pieces of the
 * computed split size. For each piece the file is opened, positioned at the
 * piece's start and scanned byte by byte to count occurrences of the
 * configured record separator. Each split that is added carries the count
 * obtained while scanning the PREVIOUS piece (0 for the first one); the count
 * just computed is carried over to the next split. A non-empty file that is
 * not splittable, and an empty file, each produce a single split covering
 * the whole file.
 *
 * @param job refers to JobContext that is being used to read the configurations of the job that ran
 * @param minSize lower bound handed to {@code computeSplitSize}.
 * @param maxSize upper bound handed to {@code computeSplitSize}.
 * @param splits the list that the generated splits are appended to.
 * @param file refers to the FileStatus required to determine block size,length,allocations.
 * @throws IOException Signals that an I/O exception has occurred.
 */
private void generateSplits(JobContext job, long minSize, long maxSize,
		List<InputSplit> splits, FileStatus file) throws IOException {
	Path path = file.getPath();
	int numOfRecordsInCurrentSplit = 0;
	int numOfRecordsInPreviousSplit = 0;
	FileSystem fs = path.getFileSystem(job.getConfiguration());
	long length = file.getLen();
	BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
			length);
	FSDataInputStream fsin = null ;
	if ((length != 0) && isSplitable(job, path)) {
		long blockSize = file.getBlockSize();
		long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		long bytesRemaining = length;
		
		// The separator is read from the job configuration and converted to
		// bytes; its occurrences are counted in each piece below.
		// NOTE(review): get(...) is dereferenced without a null check, and
		// getBytes() without an argument uses the platform default charset --
		// confirm both are acceptable for the configured separators.
		recordSeparator = job.getConfiguration()
				.get(DataValidationConstants.RECORD_SEPARATOR)
				.getBytes();
		// Keep cutting full-size splits while the remainder is more than
		// SPLIT_SLOP times the split size.
		while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
			int blkIndex = getBlockIndex(blkLocations, length
					- bytesRemaining);
			long start = length - bytesRemaining;
			long end = start + splitSize;
			try{
			// A new stream is opened for every piece and closed in the
			// finally block below.
			fsin = fs.open(path);
			fsin.seek(start);
			long pos = start;
			int b = 0;
			// Number of leading separator bytes matched so far.
			int bufferPos = 0;
			while (true) {
				b = fsin.read();
				pos = fsin.getPos();
				// End of file: stop scanning this piece.
				if (b == -1) {
					break;}
				if (b == recordSeparator[bufferPos]) {
					bufferPos++;
					if (bufferPos == recordSeparator.length) {
						// A complete separator was seen: count one record.
						numOfRecordsInCurrentSplit++;
						bufferPos = 0;
						// Stop only after a separator that ends beyond the
						// piece's end, so the scan can run past 'end'.
						if (pos > end) {
							break;
						}
					}
				} else {
					// reset the value of buffer position to zero
					// NOTE(review): the mismatching byte is not re-tested
					// against the first separator byte -- confirm this is
					// intended for multi-byte separators.
					bufferPos = 0;
				}

			}}finally{
				if(fsin != null){
					fsin.close();
				}
			}

			// The split records the count of the previously scanned piece,
			// not of the piece scanned just above.
			splits.add(new DataValidationFileSplit(path, start,
					splitSize, numOfRecordsInPreviousSplit,
					blkLocations[blkIndex].getHosts()));
			bytesRemaining -= splitSize;
			numOfRecordsInPreviousSplit = numOfRecordsInCurrentSplit;
			numOfRecordsInCurrentSplit = 0;
		}

		addSplitIfBytesRemaining(splits, path, numOfRecordsInPreviousSplit,
				length, blkLocations, bytesRemaining);
	} else if (length != 0) {
		// Non-empty but not splittable: one split for the whole file, using
		// the hosts of the first block.
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, blkLocations[0].getHosts()));
	} else {
		// Empty file: one zero-length split without hosts.
		splits.add(new DataValidationFileSplit(path, 0, length,
				numOfRecordsInPreviousSplit, new String[0]));
	}
}
 
源代码20 项目: Cubert   文件: RubixFile.java
/**
 * Returns the key entries of this file, loading them on first use.
 *
 * The last long of the file gives the start of the trailer. The trailer is
 * read as a UTF string holding the JSON metadata followed by an int giving
 * the size of the key section; the key section is the {@code keySectionSize}
 * bytes that precede the final long. Each entry read from it consists of a
 * serialized key followed by three longs (offset, block id, record count).
 * An entry's length is the distance to the next entry's offset; the last
 * entry extends to the start of the trailer.
 *
 * The result is stored in the {@code keyData} field only once it has been
 * built completely, so a failed load is retried by the next call instead of
 * returning a partial list.
 *
 * @return the key entries, in file order
 * @throws IOException if the file cannot be read, or ends before the whole
 *         key section has been read
 * @throws ClassNotFoundException if the key or value class named in the
 *         metadata cannot be loaded
 */
@SuppressWarnings("unchecked")
public List<KeyData<K>> getKeyData() throws IOException,
        ClassNotFoundException
{
    if (keyData != null)
        return keyData;

    final FileSystem fs = FileSystem.get(conf);
    // Build into a local list; it is published to the field at the very end.
    final ArrayList<KeyData<K>> entries = new ArrayList<KeyData<K>>();

    final long filesize = fs.getFileStatus(path).getLen();
    final long metaDataStartPos;
    final byte[] keySection;

    FSDataInputStream in = fs.open(path);
    try
    {
        /* The last long in the file is the start position of the trailer section */
        in.seek(filesize - 8);
        metaDataStartPos = in.readLong();

        in.seek(metaDataStartPos);

        ObjectMapper mapper = new ObjectMapper();
        metadataJson = mapper.readValue(in.readUTF(), JsonNode.class);

        int keySectionSize = in.readInt();

        // load the key section
        keySection = new byte[keySectionSize];

        in.seek(filesize - keySectionSize - 8);
        // readFully: a plain read() may return fewer bytes than requested.
        in.readFully(keySection, 0, keySectionSize);
    }
    finally
    {
        // Close on every path so a malformed file does not leak the stream.
        in.close();
    }

    ByteArrayInputStream bis = new ByteArrayInputStream(keySection);
    DataInput dataInput = new DataInputStream(bis);

    int numberOfBlocks = metadataJson.get("numberOfBlocks").getIntValue();

    // resolve the key and value classes named in the metadata
    keyClass = (Class<K>) ClassCache.forName(JsonUtils.getText(metadataJson, "keyClass"));
    valueClass =
            (Class<V>) ClassCache.forName(JsonUtils.getText(metadataJson, "valueClass"));

    SerializationFactory serializationFactory = new SerializationFactory(conf);
    Deserializer<K> deserializer = serializationFactory.getDeserializer(keyClass);

    deserializer.open(bis);

    while (bis.available() > 0 && numberOfBlocks > 0)
    {
        K key = deserializer.deserialize(null);

        long offset = dataInput.readLong();
        long blockId = dataInput.readLong();
        long numRecords = dataInput.readLong();

        // The length is not known yet; it is filled in below.
        entries.add(new KeyData<K>(key, offset, 0, numRecords, blockId));
        numberOfBlocks--;
    }

    // Assign length to each keydata entry
    int numEntries = entries.size();
    for (int i = 1; i < numEntries; i++)
    {
        KeyData<K> prev = entries.get(i - 1);
        KeyData<K> current = entries.get(i);

        prev.setLength(current.getOffset() - prev.getOffset());
    }

    if (numEntries > 0)
    {
        // The last block ends where the trailer begins.
        KeyData<K> last = entries.get(numEntries - 1);
        last.setLength(metaDataStartPos - last.offset);
    }

    keyData = entries;
    return keyData;
}