Code examples for the class org.apache.hadoop.io.SequenceFile

Listed below are code examples showing how to use the org.apache.hadoop.io.SequenceFile API; you can also follow the links to GitHub to view the full source of each project.
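
Before the project-specific examples, here is a minimal, self-contained sketch of the basic write-then-read cycle using the current, option-based Writer/Reader API (the file name demo.seq and the value_N payload are illustrative). Several of the examples below instead use the older createWriter(fs, conf, path, keyClass, valueClass) and Reader(fs, path, conf) overloads, which are deprecated in recent Hadoop releases but still widely seen.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Written to the default file system (local, unless fs.defaultFS says otherwise).
    Path path = new Path("demo.seq");

    // Write a few key/value pairs with the option-based Writer API.
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(path),
          SequenceFile.Writer.keyClass(IntWritable.class),
          SequenceFile.Writer.valueClass(Text.class),
          SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
      IntWritable key = new IntWritable();
      Text value = new Text();
      for (int i = 0; i < 10; i++) {
        key.set(i);
        value.set("value_" + i);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }

    // Read the pairs back with the option-based Reader API.
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
      IntWritable key = new IntWritable();
      Text value = new Text();
      while (reader.next(key, value)) {
        System.out.println(key.get() + "\t" + value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}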

Example 1  Project: hadoop  File: DistributedFSCheck.java
private void createInputFile(String rootName) throws IOException {
  cleanup();  // clean up if previous run failed

  Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
  SequenceFile.Writer writer =
    SequenceFile.createWriter(fs, fsConfig, inputFile, 
                              Text.class, LongWritable.class, CompressionType.NONE);
  
  try {
    nrFiles = 0;
    listSubtree(new Path(rootName), writer);
  } finally {
    writer.close();
  }
  LOG.info("Created map input files.");
}
 
Example 2  Project: spork  File: TestSequenceFileLoader.java
@Override
public void setUp() throws Exception {
  pigServer = new PigServer(LOCAL);
  File tmpFile = File.createTempFile("test", ".txt");
  tmpFileName = tmpFile.getAbsolutePath();
  System.err.println("fileName: "+tmpFileName);
  Path path = new Path("file:///"+tmpFileName);
  JobConf conf = new JobConf();
  FileSystem fs = FileSystem.get(path.toUri(), conf);

  IntWritable key = new IntWritable();
  Text value = new Text();
  SequenceFile.Writer writer = null;
  try {
    writer = SequenceFile.createWriter(fs, conf, path,
                                       key.getClass(), value.getClass());
    for (int i=0; i < DATA.length; i++) {
      key.set(i);
      value.set(DATA[i]);
      writer.append(key, value);
    }
  } finally {
    IOUtils.closeStream(writer);
  }
}
 
Example 3  Project: compiler  File: SeqCombiner.java
public static long readAndAppendAst(Configuration conf, FileSystem fileSystem, MapFile.Writer writer, String fileName, long lastKey) throws IOException {
	long newLastKey = lastKey;
	SequenceFile.Reader r = new SequenceFile.Reader(fileSystem, new Path(fileName), conf);
	LongWritable longKey = new LongWritable();
	BytesWritable value = new BytesWritable();
	try {
		while (r.next(longKey, value)) {
			newLastKey = longKey.get() + lastKey;
			writer.append(new LongWritable(newLastKey), value);
		}
	} catch (Exception e) {
		System.err.println(fileName);
		e.printStackTrace();
	} finally {
		r.close();
	}
	return newLastKey;
}
 
Example 4

/**
 * Get token from the token sequence file.
 * @param authPath
 * @param proxyUserName
 * @return Token for proxyUserName if it exists.
 * @throws IOException
 */
private static Optional<Token<?>> getTokenFromSeqFile(String authPath, String proxyUserName) throws IOException {
  try (Closer closer = Closer.create()) {
    FileSystem localFs = FileSystem.getLocal(new Configuration());
    SequenceFile.Reader tokenReader =
        closer.register(new SequenceFile.Reader(localFs, new Path(authPath), localFs.getConf()));
    Text key = new Text();
    Token<?> value = new Token<>();
    while (tokenReader.next(key, value)) {
      LOG.info("Found token for " + key);
      if (key.toString().equals(proxyUserName)) {
        return Optional.<Token<?>> of(value);
      }
    }
  }
  return Optional.absent();
}
 
Example 5  Project: suro  File: TestSequenceFileWriter.java
private int checkFileContents(String filePath, String message) throws IOException {
    SequenceFile.Reader r = new SequenceFile.Reader(
            FileSystem.get(new Configuration()),
            new Path(filePath),
            new Configuration());

    Text routingKey = new Text();
    MessageWritable value = new MessageWritable();
    StringSerDe serde = new StringSerDe();

    int i = 0;
    while (r.next(routingKey, value)) {
        assertEquals(routingKey.toString(), "routingKey");
        assertEquals(serde.deserialize(value.getMessage().getPayload()), message + i);
        ++i;
    }
    r.close();

    return i;
}
 
Example 6  Project: RDFS  File: TestMapRed.java
public void reduce(WritableComparable key, Iterator values,
                   OutputCollector output, Reporter reporter
                   ) throws IOException {
  if (first) {
    first = false;
    MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
    mapOutputFile.setConf(conf);
    Path input = mapOutputFile.getInputFile(0, taskId);
    FileSystem fs = FileSystem.get(conf);
    assertTrue("reduce input exists " + input, fs.exists(input));
    SequenceFile.Reader rdr = 
      new SequenceFile.Reader(fs, input, conf);
    assertEquals("is reduce input compressed " + input, 
                 compressInput, 
                 rdr.isCompressed());
    rdr.close();          
  }
}
 
Example 7  Project: alchemy  File: SequenceFileUtil.java
public static void writeSequenceFile(String path) throws Exception{
	Writer.Option filePath = Writer.file(new Path(path));
	Writer.Option keyClass = Writer.keyClass(IntWritable.class);
	Writer.Option valueClass = Writer.valueClass(Text.class);
	Writer.Option compression = Writer.compression(CompressionType.NONE);
	Writer writer = SequenceFile.createWriter(configuration, filePath, keyClass, valueClass, compression);
	IntWritable key = new IntWritable();
	Text value = new Text("");
	for(int i=0;i<100;i++){
		key.set(i);
		value.set("value_"+i);
		writer.append(key, value);
	}
	writer.hflush();
	writer.close();
}
 
Example 8  Project: hadoop  File: TestCopyListing.java
@Test
public void testFailOnCloseError() throws IOException {
  File inFile = File.createTempFile("TestCopyListingIn", null);
  inFile.deleteOnExit();
  File outFile = File.createTempFile("TestCopyListingOut", null);
  outFile.deleteOnExit();
  List<Path> srcs = new ArrayList<Path>();
  srcs.add(new Path(inFile.toURI()));
  
  Exception expectedEx = new IOException("boom");
  SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
  doThrow(expectedEx).when(writer).close();
  
  SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
  DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
  Exception actualEx = null;
  try {
    listing.doBuildListing(writer, options);
  } catch (Exception e) {
    actualEx = e;
  }
  Assert.assertNotNull("close writer didn't fail", actualEx);
  Assert.assertEquals(expectedEx, actualEx);
}
 
Example 9  Project: big-c  File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
  long lastEnd = 0;

  //Verify if each split's start is matching with the previous end and
  //we are not missing anything
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    long start = fileSplit.getStart();
    Assert.assertEquals(lastEnd, start);
    lastEnd = start + fileSplit.getLength();
  }

  //Verify there is nothing more to read from the input file
  SequenceFile.Reader reader
          = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
                  SequenceFile.Reader.file(listFile));

  try {
    reader.seek(lastEnd);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
  } finally {
    IOUtils.closeStream(reader);
  }
}
 
Example 10  Project: big-c  File: DistributedFSCheck.java
private void createInputFile(String rootName) throws IOException {
  cleanup();  // clean up if previous run failed

  Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
  SequenceFile.Writer writer =
    SequenceFile.createWriter(fs, fsConfig, inputFile, 
                              Text.class, LongWritable.class, CompressionType.NONE);
  
  try {
    nrFiles = 0;
    listSubtree(new Path(rootName), writer);
  } finally {
    writer.close();
  }
  LOG.info("Created map input files.");
}
 
Example 11  Project: coming  File: 1000021_TestCDbwEvaluator_t.java
private void checkRefPoints(int numIterations) throws IOException {
  for (int i = 0; i <= numIterations; i++) {
    Path out = new Path(getTestTempDirPath("output"), "representativePoints-" + i);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus file : fs.listStatus(out)) {
      if (!file.getPath().getName().startsWith(".")) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
        try {
          Writable clusterId = new IntWritable(0);
          VectorWritable point = new VectorWritable();
          while (reader.next(clusterId, point)) {
            System.out.println("\tC-" + clusterId + ": " + AbstractCluster.formatVector(point.get(), null));
          }
        } finally {
          reader.close();
        }
      }
    }
  }
}
 
Example 12  Project: mapreduce-kmeans  File: KMeansReducer.java
@SuppressWarnings("deprecation")
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
	super.cleanup(context);
	Configuration conf = context.getConfiguration();
	Path outPath = new Path(conf.get("centroid.path"));
	FileSystem fs = FileSystem.get(conf);
	fs.delete(outPath, true);
	try (SequenceFile.Writer out = SequenceFile.createWriter(fs, context.getConfiguration(), outPath,
			ClusterCenter.class, IntWritable.class)) {
		final IntWritable value = new IntWritable(0);
		for (ClusterCenter center : centers) {
			out.append(center, value);
		}
	}
}
 
Example 13  Project: hadoop  File: TestJoinDatamerge.java
private static int countProduct(IntWritable key, Path[] src, 
    Configuration conf) throws IOException {
  int product = 1;
  for (Path p : src) {
    int count = 0;
    SequenceFile.Reader r = new SequenceFile.Reader(
      cluster.getFileSystem(), p, conf);
    IntWritable k = new IntWritable();
    IntWritable v = new IntWritable();
    while (r.next(k, v)) {
      if (k.equals(key)) {
        count++;
      }
    }
    r.close();
    if (count != 0) {
      product *= count;
    }
  }
  return product;
}
 
Example 14  Project: big-c  File: TestGlobbedCopyListing.java
private void verifyContents(Path listingPath) throws Exception {
  SequenceFile.Reader reader = new SequenceFile.Reader(cluster.getFileSystem(),
                                            listingPath, new Configuration());
  Text key   = new Text();
  CopyListingFileStatus value = new CopyListingFileStatus();
  Map<String, String> actualValues = new HashMap<String, String>();
  while (reader.next(key, value)) {
    if (value.isDirectory() && key.toString().equals("")) {
      // ignore root with empty relPath, which is an entry to be 
      // used for preserving root attributes etc.
      continue;
    }
    actualValues.put(value.getPath().toString(), key.toString());
  }

  Assert.assertEquals(expectedValues.size(), actualValues.size());
  for (Map.Entry<String, String> entry : actualValues.entrySet()) {
    Assert.assertEquals(entry.getValue(), expectedValues.get(entry.getKey()));
  }
}
 
Example 15

private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
  FileSystem fileSystem = output.getFileSystem(conf);
  FileStatus fileStatus = fileSystem.getFileStatus(output);
  if (fileStatus.isDir()) {
    FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return !path.getName().startsWith("_");
      }
    });
    for (FileStatus fs : listStatus) {
      walkOutput(fs.getPath(), conf, resultReader);
    }
  } else {
    Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
    Text rowId = new Text();
    TableBlurRecord tableBlurRecord = new TableBlurRecord();
    while (reader.next(rowId, tableBlurRecord)) {
      resultReader.read(rowId, tableBlurRecord);
    }
    reader.close();
  }
}
 
Example 16  Project: systemds  File: TensorReaderBinaryBlockParallel.java
@Override
public Object call() throws Exception {
	TensorBlock value = new TensorBlock();
	TensorIndexes key = new TensorIndexes();
	//directly read from sequence files (individual partfiles)
	try(SequenceFile.Reader reader = new SequenceFile.Reader(_job, SequenceFile.Reader.file(_path))) {
		//note: next(key, value) does not yet exploit the given serialization classes, 
		//record reader does but is generally slower.
		while (reader.next(key, value)) {
			if( value.isEmpty(false) )
				continue;
			int[] lower = new int[_dims.length];
			int[] upper = new int[lower.length];
			UtilFunctions.getBlockBounds(key, value.getLongDims(), _blen, lower, upper);
			_dest.copy(lower, upper, value);
		}
	}
	
	return null;
}
 
Example 17  Project: nutchpy  File: SequenceWriter.java
public static void write_seq() throws IOException {

        String uri = "test_file.seq";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();

        SequenceFile.Writer writer = null;

        try {
            writer = SequenceFile.createWriter(fs, conf, path,
                    key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
 
Example 18  Project: flink  File: SequenceStreamingFileSinkITCase.java
private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
	SequenceFile.Reader reader = new SequenceFile.Reader(
		configuration, SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
	LongWritable key = new LongWritable();
	Text val = new Text();
	ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
	while (reader.next(key, val)) {
		results.add(new Tuple2<>(key.get(), val.toString()));
	}
	reader.close();
	return results;
}
 
Example 19  Project: Flink-CEPplus  File: SequenceFileWriter.java
@Override
public void open(FileSystem fs, Path path) throws IOException {
	super.open(fs, path);
	if (keyClass == null) {
		throw new IllegalStateException("Key Class has not been initialized.");
	}
	if (valueClass == null) {
		throw new IllegalStateException("Value Class has not been initialized.");
	}

	CompressionCodec codec = null;

	Configuration conf = fs.getConf();

	if (!compressionCodecName.equals("None")) {
		CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
		codec = codecFactory.getCodecByName(compressionCodecName);
		if (codec == null) {
			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
		}
	}

	// the non-deprecated constructor syntax is only available in recent hadoop versions...
	writer = SequenceFile.createWriter(conf,
			getStream(),
			keyClass,
			valueClass,
			compressionType,
			codec);
}
 
Example 20  Project: hadoop-gpu  File: TestMapRed.java
private static void printSequenceFile(FileSystem fs, Path p, 
                                      Configuration conf) throws IOException {
  SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
  Object key = null;
  Object value = null;
  while ((key = r.next(key)) != null) {
    value = r.getCurrentValue(value);
    System.out.println("  Row: " + key + ", " + value);
  }
  r.close();    
}
 
Example 21  Project: big-c  File: Display.java
public TextRecordInputStream(FileStatus f) throws IOException {
  final Path fpath = f.getPath();
  final Configuration lconf = getConf();
  r = new SequenceFile.Reader(lconf, 
      SequenceFile.Reader.file(fpath));
  key = ReflectionUtils.newInstance(
      r.getKeyClass().asSubclass(WritableComparable.class), lconf);
  val = ReflectionUtils.newInstance(
      r.getValueClass().asSubclass(Writable.class), lconf);
  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
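
Example 21 shows how Display (the class that underlies the hadoop fs -text shell command) discovers the key and value classes from the file header instead of hard-coding them. The following minimal sketch (the class name SequenceFileDump is illustrative, and it assumes the stored keys and values implement Writable, as in all the examples on this page) applies the same getKeyClass()/getValueClass() plus ReflectionUtils pattern to dump an arbitrary SequenceFile.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileDump {
  // Prints every record without knowing the key/value types at compile time:
  // the types are read from the SequenceFile header and instantiated reflectively.
  // Assumes both types implement Writable (true for Writable-serialized files).
  public static void dump(String file) throws IOException {
    Configuration conf = new Configuration();
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(file)))) {
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    dump(args[0]);
  }
}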
 
Example 22  Project: hadoop  File: InputSampler.java
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
    throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = job.getConfiguration();
  final InputFormat inf = 
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = (K[])sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getSortComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
    conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for(int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
 
Example 23  Project: Flink-CEPplus  File: SequenceFileWriterFactory.java
@Override
public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
	org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
	CompressionCodec compressionCodec = getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName);
	SequenceFile.Writer writer = SequenceFile.createWriter(
		serializableHadoopConfig.get(),
		SequenceFile.Writer.stream(stream),
		SequenceFile.Writer.keyClass(keyClass),
		SequenceFile.Writer.valueClass(valueClass),
		SequenceFile.Writer.compression(compressionType, compressionCodec));
	return new SequenceFileWriter<>(writer);
}
 
Example 24  Project: geowave  File: GeoWaveNNIT.java
private int readFile() throws IllegalArgumentException, IOException {
  int count = 0;
  final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
  final FileStatus[] fss =
      fs.listStatus(
          new Path(
              TestUtils.TEMP_DIR
                  + File.separator
                  + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
                  + "/t1/pairs"));
  for (final FileStatus ifs : fss) {
    if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
      try (SequenceFile.Reader reader =
          new SequenceFile.Reader(
              MapReduceTestUtils.getConfiguration(),
              Reader.file(ifs.getPath()))) {

        final Text key = new Text();
        final Text val = new Text();

        while (reader.next(key, val)) {
          count++;
        }
      }
    }
  }
  return count;
}
 
Example 25  Project: hadoop-solr  File: SipsIngestMapperTest.java
private void create100EntrySequenceFile(Configuration conf) throws Exception {
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(new Path(INPUT_DIRECTORY_PATH, "sip_info.txt")),
                SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(DoubleWritable.class))) {
        for (int i = 0; i < 100; i++) {
            writer.append(new Text("sip_" + i), new DoubleWritable(i / 100.0));
        }
    }
}
 
Example 26  Project: flink  File: SequenceFileWriterTest.java
@Test
public void testDuplicate() {
	SequenceFileWriter<Text, Text> writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK);
	writer.setSyncOnFlush(true);
	SequenceFileWriter<Text, Text> other = writer.duplicate();

	assertTrue(StreamWriterBaseComparator.equals(writer, other));

	writer.setSyncOnFlush(false);
	assertFalse(StreamWriterBaseComparator.equals(writer, other));
}
 
Example 27  Project: circus-train  File: SimpleCopyListingTest.java
@Test(timeout = 10000)
public void buildListingForMultipleSources() throws Exception {
  FileSystem fs = FileSystem.get(config);
  String testRootString = temporaryRoot + "/source";
  Path testRoot = new Path(testRootString);
  if (fs.exists(testRoot)) {
    delete(fs, testRootString);
  }

  Path sourceDir1 = new Path(testRoot, "foo/baz/");
  Path sourceDir2 = new Path(testRoot, "foo/bang/");
  Path sourceFile1 = new Path(testRoot, "foo/bar/source.txt");
  URI target = URI.create("s3://bucket/target/moo/");

  fs.mkdirs(sourceDir1);
  fs.mkdirs(sourceDir2);
  createFile(fs, new Path(sourceDir1, "baz_1.dat"));
  createFile(fs, new Path(sourceDir1, "baz_2.dat"));
  createFile(fs, new Path(sourceDir2, "bang_0.dat"));
  createFile(fs, sourceFile1.toString());

  final Path listFile = new Path(testRoot, temporaryRoot + "/fileList.seq");

  listing.buildListing(listFile, options(Arrays.asList(sourceFile1, sourceDir1, sourceDir2), target));
  Set<String> expectedRelativePaths = Sets.newHashSet("/source.txt", "/baz_1.dat", "/baz_2.dat", "/bang_0.dat");
  try (SequenceFile.Reader reader = new SequenceFile.Reader(config, SequenceFile.Reader.file(listFile))) {
    CopyListingFileStatus fileStatus = new CopyListingFileStatus();
    Text relativePath = new Text();
    int relativePathCount = expectedRelativePaths.size();
    for (int i = 0; i < relativePathCount; i++) {
      assertThat(reader.next(relativePath, fileStatus), is(true));
      assertThat("Expected path not found " + relativePath.toString(),
          expectedRelativePaths.remove(relativePath.toString()), is(true));
    }
  }
  assertThat("Expected relativePaths to be empty but was: " + expectedRelativePaths, expectedRelativePaths.isEmpty(),
      is(true));
}
 
Example 28

public MessagePackSequenceFileReader(LogFilePath path) throws Exception {
    Configuration config = new Configuration();
    Path fsPath = new Path(path.getLogFilePath());
    FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
    this.mReader = new SequenceFile.Reader(fs, fsPath, config);
    this.mKey = (BytesWritable) mReader.getKeyClass().newInstance();
    this.mValue = (BytesWritable) mReader.getValueClass().newInstance();
}
 
Example 29  Project: datacollector  File: TestRecordWriterManager.java
private void testSeqFile(CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType)
    throws Exception {
  RecordWriterManager mgr = managerBuilder()
    .dirPathTemplate(getTestDir().toString() + "/${YYYY()}")
    .compressionCodec(compressionCodec)
    .compressionType(compressionType)
    .fileType(HdfsFileType.SEQUENCE_FILE)
    .build();

  FileSystem fs = FileSystem.get(uri, hdfsConf);
  Path file = new Path(getTestDir(), UUID.randomUUID().toString());
  long expires = System.currentTimeMillis() + 50000;
  RecordWriter writer = mgr.createWriter(fs, file, 50000);
  Assert.assertTrue(expires <= writer.getExpiresOn());
  Assert.assertFalse(writer.isTextFile());
  Assert.assertTrue(writer.isSeqFile());
  Record record = RecordCreator.create();
  record.set(Field.create("a"));
  writer.write(record);
  writer.close();

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, new HdfsConfiguration());
  Text key = new Text();
  Text value = new Text();
  Assert.assertTrue(reader.next(key, value));
  Assert.assertNotNull(UUID.fromString(key.toString()));
  Assert.assertEquals("a", value.toString().trim());
  Assert.assertFalse(reader.next(key, value));
  reader.close();
}
 
Example 30  Project: RDFS  File: SequenceFileAsBinaryInputFormat.java
public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
    throws IOException {
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = split.getStart() + split.getLength();
  if (split.getStart() > in.getPosition())
    in.sync(split.getStart());                  // sync to start
  this.start = in.getPosition();
  vbytes = in.createValueBytes();
  done = start >= end;
}
 