org.apache.hadoop.io.SequenceFile#Reader() Source Code Examples

The following are example usages of org.apache.hadoop.io.SequenceFile#Reader(), collected from open-source projects on GitHub.
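Before the individual examples, here is a minimal self-contained sketch of the typical read loop using the non-deprecated option-based constructor, SequenceFile.Reader(Configuration, Reader.Option...). The path /tmp/example.seq and the Text/IntWritable key and value types are placeholders assumed only for illustration; the classes must match what is recorded in the header of the file you actually read.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileReadExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.seq");   // placeholder path for this sketch

    // try-with-resources closes the reader even if next() throws
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      // Key/value instances are reused across iterations; their classes are
      // assumed here and must match the file's key and value classes.
      Text key = new Text();
      IntWritable value = new IntWritable();
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    }
  }
}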

Example 1   Project: coming   File: 1000021_TestCDbwEvaluator_s.java
private void checkRefPoints(int numIterations) throws IOException {
  for (int i = 0; i <= numIterations; i++) {
    Path out = new Path(getTestTempDirPath("output"), "representativePoints-" + i);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus file : fs.listStatus(out)) {
      if (!file.getPath().getName().startsWith(".")) {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
        try {
          Writable clusterId = new IntWritable(0);
          VectorWritable point = new VectorWritable();
          while (reader.next(clusterId, point)) {
            System.out.println("\tC-" + clusterId + ": " + AbstractCluster.formatVector(point.get(), null));
          }
        } finally {
          reader.close();
        }
      }
    }
  }
}
 
Example 2   Project: hiped2   File: SqoopSequenceFileReader.java
public static void read(Path inputPath) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);

  SequenceFile.Reader reader =   //<co id="ch03_comment_seqfile_read1"/>
      new SequenceFile.Reader(fs, inputPath, conf);

  try {
    System.out.println(
        "Is block compressed = " + reader.isBlockCompressed());

    Text key = new Text();
    IntWritable value = new IntWritable();

    while (reader.next(key, value)) {   //<co id="ch03_comment_seqfile_read2"/>
      System.out.println(key + "," + value);
    }
  } finally {
    reader.close();
  }
}
 
Example 3   Project: RDFS   File: TestMapRed.java
public void reduce(WritableComparable key, Iterator values,
                   OutputCollector output, Reporter reporter
                   ) throws IOException {
  if (first) {
    first = false;
    MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
    mapOutputFile.setConf(conf);
    Path input = mapOutputFile.getInputFile(0, taskId);
    FileSystem fs = FileSystem.get(conf);
    assertTrue("reduce input exists " + input, fs.exists(input));
    SequenceFile.Reader rdr = 
      new SequenceFile.Reader(fs, input, conf);
    assertEquals("is reduce input compressed " + input, 
                 compressInput, 
                 rdr.isCompressed());
    rdr.close();          
  }
}
 
Example 4   Project: hadoop-gpu   File: TestMapRed.java
public void reduce(WritableComparable key, Iterator values,
                   OutputCollector output, Reporter reporter
                   ) throws IOException {
  if (first) {
    first = false;
    MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
    mapOutputFile.setConf(conf);
    Path input = mapOutputFile.getInputFile(0, taskId);
    FileSystem fs = FileSystem.get(conf);
    assertTrue("reduce input exists " + input, fs.exists(input));
    SequenceFile.Reader rdr = 
      new SequenceFile.Reader(fs, input, conf);
    assertEquals("is reduce input compressed " + input, 
                 compressInput, 
                 rdr.isCompressed());
    rdr.close();          
  }
}
 
Example 5   Project: hadoop   File: TotalOrderPartitioner.java
/**
 * Read the cut points from the given IFile.
 * @param fs The file system
 * @param p The path to read
 * @param keyClass The map output key class
 * @param conf The job configuration
 * @throws IOException
 */
                               // matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
    Configuration conf) throws IOException {
  SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
  ArrayList<K> parts = new ArrayList<K>();
  K key = ReflectionUtils.newInstance(keyClass, conf);
  NullWritable value = NullWritable.get();
  try {
    while (reader.next(key, value)) {
      parts.add(key);
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    reader = null;
  } finally {
    IOUtils.cleanup(LOG, reader);
  }
  return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
 
Example 6   Project: phoenix-tephra   File: CommitMarkerCodecTest.java
@Test
public void testIncompleteCommitMarker() throws Exception {
  Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
  try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
                                                              LongWritable.class,
                                                              SequenceFile.CompressionType.NONE)) {
    String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
    SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
    writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
    writer.hflush();
    writer.hsync();
  }

  // Read the incomplete commit marker
  try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
       CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
    try {
      markerCodec.readMarker(reader);
      Assert.fail("Expected EOF Exception to be thrown");
    } catch (EOFException e) {
      // expected since we didn't write the value bytes
    }
  }
}
 
Example 7   Project: big-c   File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
  long lastEnd = 0;

  //Verify if each split's start is matching with the previous end and
  //we are not missing anything
  for (InputSplit split : splits) {
    FileSplit fileSplit = (FileSplit) split;
    long start = fileSplit.getStart();
    Assert.assertEquals(lastEnd, start);
    lastEnd = start + fileSplit.getLength();
  }

  //Verify there is nothing more to read from the input file
  SequenceFile.Reader reader
          = new SequenceFile.Reader(cluster.getFileSystem().getConf(),
                  SequenceFile.Reader.file(listFile));

  try {
    reader.seek(lastEnd);
    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
  } finally {
    IOUtils.closeStream(reader);
  }
}
 
Example 8   Project: emr-sample-apps   File: PrintAlignments.java
public static void printFile(Path thePath) throws IOException
{
    // conf and ar (an alignment record) are fields defined elsewhere in the class.
    SequenceFile.Reader theReader = new SequenceFile.Reader(FileSystem.get(conf), thePath, conf);

    IntWritable key = new IntWritable();
    BytesWritable value = new BytesWritable();

    try {
        while (theReader.next(key, value)) {
            ar.fromBytes(value);
            System.out.println(ar.toAlignment(key.get()));
        }
    } finally {
        theReader.close();
    }
}
 
Example 9   Project: hadoop-gpu   File: FsShell.java
public TextRecordInputStream(FileStatus f) throws IOException {
  r = new SequenceFile.Reader(fs, f.getPath(), getConf());
  key = ReflectionUtils.newInstance(r.getKeyClass().asSubclass(WritableComparable.class),
                                    getConf());
  val = ReflectionUtils.newInstance(r.getValueClass().asSubclass(Writable.class),
                                    getConf());
  inbuf = new DataInputBuffer();
  outbuf = new DataOutputBuffer();
}
 
Example 10   Project: hadoop-gpu   File: TestMapRed.java
public void testNullKeys() throws Exception {
  JobConf conf = new JobConf(TestMapRed.class);
  FileSystem fs = FileSystem.getLocal(conf);
  Path testdir = new Path(
      System.getProperty("test.build.data","/tmp")).makeQualified(fs);
  fs.delete(testdir, true);
  Path inFile = new Path(testdir, "nullin/blah");
  SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
      NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
  Text t = new Text();
  t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
  t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
  t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
  t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
  t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
  t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
  t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
  t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
  w.close();
  FileInputFormat.setInputPaths(conf, inFile);
  FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
  conf.setMapperClass(NullMapper.class);
  conf.setReducerClass(IdentityReducer.class);
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(SequenceFileOutputFormat.class);
  conf.setNumReduceTasks(1);

  JobClient.runJob(conf);

  SequenceFile.Reader r = new SequenceFile.Reader(fs,
      new Path(testdir, "nullout/part-00000"), conf);
  String m = "AAAAAAAAAAAAAA";
  for (int i = 1; r.next(NullWritable.get(), t); ++i) {
    assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
    m = m.replace((char)('A' + i - 1), (char)('A' + i));
  }
  r.close();
}
 
Example 11   Project: hadoop   File: SequenceFileAsBinaryInputFormat.java
public void initialize(InputSplit split, TaskAttemptContext context) 
    throws IOException, InterruptedException {
  Path path = ((FileSplit)split).getPath();
  Configuration conf = context.getConfiguration();
  FileSystem fs = path.getFileSystem(conf);
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = ((FileSplit)split).getStart() + split.getLength();
  if (((FileSplit)split).getStart() > in.getPosition()) {
    in.sync(((FileSplit)split).getStart());    // sync to start
  }
  this.start = in.getPosition();
  vbytes = in.createValueBytes();
  done = start >= end;
}
 
Example 12   Project: geowave   File: GeoWaveNNIT.java
private int readFile() throws IllegalArgumentException, IOException {
  int count = 0;
  final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
  final FileStatus[] fss =
      fs.listStatus(
          new Path(
              TestUtils.TEMP_DIR
                  + File.separator
                  + MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
                  + "/t1/pairs"));
  for (final FileStatus ifs : fss) {
    if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
      try (SequenceFile.Reader reader =
          new SequenceFile.Reader(
              MapReduceTestUtils.getConfiguration(),
              Reader.file(ifs.getPath()))) {

        final Text key = new Text();
        final Text val = new Text();

        while (reader.next(key, val)) {
          count++;
        }
      }
    }
  }
  return count;
}
 
Example 13   Project: datacollector   File: TestRecordWriterManager.java
private void testSeqFile(CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType)
    throws Exception {
  RecordWriterManager mgr = managerBuilder()
    .dirPathTemplate(getTestDir().toString() + "/${YYYY()}")
    .compressionCodec(compressionCodec)
    .compressionType(compressionType)
    .fileType(HdfsFileType.SEQUENCE_FILE)
    .build();

  FileSystem fs = FileSystem.get(uri, hdfsConf);
  Path file = new Path(getTestDir(), UUID.randomUUID().toString());
  long expires = System.currentTimeMillis() + 50000;
  RecordWriter writer = mgr.createWriter(fs, file, 50000);
  Assert.assertTrue(expires <= writer.getExpiresOn());
  Assert.assertFalse(writer.isTextFile());
  Assert.assertTrue(writer.isSeqFile());
  Record record = RecordCreator.create();
  record.set(Field.create("a"));
  writer.write(record);
  writer.close();

  SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, new HdfsConfiguration());
  Text key = new Text();
  Text value = new Text();
  Assert.assertTrue(reader.next(key, value));
  Assert.assertNotNull(UUID.fromString(key.toString()));
  Assert.assertEquals("a", value.toString().trim());
  Assert.assertFalse(reader.next(key, value));
  reader.close();
}
 
Example 14   Project: circus-train   File: CircusTrainCopyListingTest.java
@Test
public void typical() throws IOException {
  File input = temp.newFolder("input");
  File inputSub2 = new File(input, "sub1/sub2");
  inputSub2.mkdirs();
  Files.asCharSink(new File(inputSub2, "data"), UTF_8).write("test1");

  File listFile = temp.newFile("listFile");
  Path pathToListFile = new Path(listFile.toURI());

  List<Path> sourceDataLocations = new ArrayList<>();
  sourceDataLocations.add(new Path(inputSub2.toURI()));
  DistCpOptions options = new DistCpOptions(sourceDataLocations, new Path("dummy"));

  CircusTrainCopyListing.setRootPath(conf, new Path(input.toURI()));
  CircusTrainCopyListing copyListing = new CircusTrainCopyListing(conf, null);
  copyListing.doBuildListing(pathToListFile, options);

  try (Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(pathToListFile))) {
    Text key = new Text();
    CopyListingFileStatus value = new CopyListingFileStatus();

    assertTrue(reader.next(key, value));
    assertThat(key.toString(), is("/sub1/sub2"));
    assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2"));

    assertTrue(reader.next(key, value));
    assertThat(key.toString(), is("/sub1/sub2/data"));
    assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2/data"));

    assertFalse(reader.next(key, value));

  }
}
 
Example 15   Project: nutchpy   File: RecordIterator.java
/**
 * Creates iterator for sequence file records
 * @param reader The sequence file reader
 * @param start The starting record number
 * @param limit number of records to read
 */
public RecordIterator(SequenceFile.Reader reader, long start,
                      long limit) throws IOException {

    this.reader = reader;
    this.key = (Writable) ReflectionUtils.newInstance(
                                            reader.getKeyClass(), CONFIG);
    this.value = (Writable) ReflectionUtils.newInstance(
                                            reader.getValueClass(), CONFIG);
    this.limit = limit;

    // skip rows until the start position is reached
    for(int i = 0; i < start && reader.next(key, value); i++);
    this.next = readNext();
}
 
Example 16   Project: Flink-CEPplus   File: RollingSinkITCase.java
/**
 * This tests {@link SequenceFileWriter}
 * with non-rolling output and without compression.
 */
@Test
public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
	final int numElements = 20;
	final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(2);

	DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
			.broadcast()
			.filter(new OddEvenFilter());

	DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
			return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
		}
	});

	RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
			.setWriter(new SequenceFileWriter<IntWritable, Text>())
			.setBucketer(new NonRollingBucketer())
			.setPartPrefix("part")
			.setPendingPrefix("")
			.setPendingSuffix("");

	mapped.addSink(sink);

	env.execute("RollingSink String Write Test");

	FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));

	// Open a reader over the already-open stream:
	// (input stream, buffer size, start offset, length to read, configuration).
	SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
			1000,
			0,
			100000,
			new Configuration());

	IntWritable intWritable = new IntWritable();
	Text txt = new Text();

	for (int i = 0; i < numElements; i += 2) {
		reader.next(intWritable, txt);
		Assert.assertEquals(i, intWritable.get());
		Assert.assertEquals("message #" + i, txt.toString());
	}

	reader.close();
	inStream.close();

	inStream = dfs.open(new Path(outPath + "/part-1-0"));

	reader = new SequenceFile.Reader(inStream,
			1000,
			0,
			100000,
			new Configuration());

	for (int i = 1; i < numElements; i += 2) {
		reader.next(intWritable, txt);
		Assert.assertEquals(i, intWritable.get());
		Assert.assertEquals("message #" + i, txt.toString());
	}

	reader.close();
	inStream.close();
}
 
Example 17   Project: Flink-CEPplus   File: RollingSinkITCase.java
/**
 * This tests {@link SequenceFileWriter}
 * with non-rolling output but with compression.
 */
@Test
public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
	final int numElements = 20;
	final String outPath = hdfsURI + "/seq-non-rolling-out";
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(2);

	DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
			.broadcast()
			.filter(new OddEvenFilter());

	DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
			return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
		}
	});

	RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
			.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
			.setBucketer(new NonRollingBucketer())
			.setPartPrefix("part")
			.setPendingPrefix("")
			.setPendingSuffix("");

	mapped.addSink(sink);

	env.execute("RollingSink String Write Test");

	FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));

	SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
			1000,
			0,
			100000,
			new Configuration());

	IntWritable intWritable = new IntWritable();
	Text txt = new Text();

	for (int i = 0; i < numElements; i += 2) {
		reader.next(intWritable, txt);
		Assert.assertEquals(i, intWritable.get());
		Assert.assertEquals("message #" + i, txt.toString());
	}

	reader.close();
	inStream.close();

	inStream = dfs.open(new Path(outPath + "/part-1-0"));

	reader = new SequenceFile.Reader(inStream,
			1000,
			0,
			100000,
			new Configuration());

	for (int i = 1; i < numElements; i += 2) {
		reader.next(intWritable, txt);
		Assert.assertEquals(i, intWritable.get());
		Assert.assertEquals("message #" + i, txt.toString());
	}

	reader.close();
	inStream.close();
}
 
Example 18
public void runWhereTest(String whereClause, String firstValStr,
    int numExpectedResults, int expectedSum) throws IOException {

  String [] columns = HsqldbTestServer.getFieldNames();
  ClassLoader prevClassLoader = null;
  SequenceFile.Reader reader = null;

  String [] argv = getArgv(true, columns, whereClause);
  runImport(argv);
  try {
    SqoopOptions opts = new ImportTool().parseArguments(
        getArgv(false, columns, whereClause),
        null, null, true);

    CompilationManager compileMgr = new CompilationManager(opts);
    String jarFileName = compileMgr.getJarFilename();

    prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
        getTableName());

    reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());

    // here we can actually instantiate (k, v) pairs.
    Configuration conf = new Configuration();
    Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);

    if (reader.next(key) == null) {
      fail("Empty SequenceFile during import");
    }

    // make sure that the value we think should be at the top, is.
    reader.getCurrentValue(val);
    assertEquals("Invalid ordering within sorted SeqFile", firstValStr,
        val.toString());

    // We know that these values are two ints separated by a ',' character.
    // Since this is all dynamic, though, we don't want to actually link
    // against the class and use its methods. So we just parse this back
    // into int fields manually.  Sum them up and ensure that we get the
    // expected total for the first column, to verify that we got all the
    // results from the db into the file.
    int curSum = getFirstInt(val.toString());
    int totalResults = 1;

    // now sum up everything else in the file.
    while (reader.next(key) != null) {
      reader.getCurrentValue(val);
      curSum += getFirstInt(val.toString());
      totalResults++;
    }

    assertEquals("Total sum of first db column mismatch", expectedSum,
        curSum);
    assertEquals("Incorrect number of results for query", numExpectedResults,
        totalResults);
  } catch (InvalidOptionsException ioe) {
    fail(ioe.toString());
  } catch (ParseException pe) {
    fail(pe.toString());
  } finally {
    IOUtils.closeStream(reader);

    if (null != prevClassLoader) {
      ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
    }
  }
}
 
Example 19   Project: hadoop   File: GenerateDistCacheData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
  final JobClient client = new JobClient(jobConf);
  ClusterStatus stat = client.getClusterStatus(true);
  int numTrackers = stat.getTaskTrackers();
  final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);

  // Total size of distributed cache files to be generated
  final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
  // Get the path of the special file
  String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
  if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
    throw new RuntimeException("Invalid metadata: #files (" + fileCount
        + "), total_size (" + totalSize + "), filelisturi ("
        + distCacheFileList + ")");
  }

  Path sequenceFile = new Path(distCacheFileList);
  FileSystem fs = sequenceFile.getFileSystem(jobConf);
  FileStatus srcst = fs.getFileStatus(sequenceFile);
  // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
  int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
  int numSplits = numTrackers * numMapSlotsPerTracker;

  List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
  LongWritable key = new LongWritable();
  BytesWritable value = new BytesWritable();

  // Average size of data to be generated by each map task
  final long targetSize = Math.max(totalSize / numSplits,
                            DistributedCacheEmulator.AVG_BYTES_PER_MAP);
  long splitStartPosition = 0L;
  long splitEndPosition = 0L;
  long acc = 0L;
  long bytesRemaining = srcst.getLen();
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
    while (reader.next(key, value)) {

      // If adding this file would put this split past the target size,
      // cut the last split and put this file in the next split.
      if (acc + key.get() > targetSize && acc != 0) {
        long splitSize = splitEndPosition - splitStartPosition;
        splits.add(new FileSplit(
            sequenceFile, splitStartPosition, splitSize, (String[])null));
        bytesRemaining -= splitSize;
        splitStartPosition = splitEndPosition;
        acc = 0L;
      }
      acc += key.get();
      splitEndPosition = reader.getPosition();
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  if (bytesRemaining != 0) {
    splits.add(new FileSplit(
        sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
  }

  return splits;
}
 
Example 20   Project: RDFS   File: PiEstimator.java
/**
 * Run a map/reduce job for estimating Pi.
 *
 * @return the estimated value of Pi
 */
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
    ) throws IOException {
  //setup job conf
  jobConf.setJobName(PiEstimator.class.getSimpleName());

  jobConf.setInputFormat(SequenceFileInputFormat.class);

  jobConf.setOutputKeyClass(BooleanWritable.class);
  jobConf.setOutputValueClass(LongWritable.class);
  jobConf.setOutputFormat(SequenceFileOutputFormat.class);

  jobConf.setMapperClass(PiMapper.class);
  jobConf.setNumMapTasks(numMaps);

  jobConf.setReducerClass(PiReducer.class);
  jobConf.setNumReduceTasks(1);

  // turn off speculative execution, because DFS doesn't handle
  // multiple writers to the same file.
  jobConf.setSpeculativeExecution(false);

  //setup input/output directories
  final Path inDir = new Path(TMP_DIR, "in");
  final Path outDir = new Path(TMP_DIR, "out");
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);

  final FileSystem fs = FileSystem.get(jobConf);
  if (fs.exists(TMP_DIR)) {
    throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
        + " already exists.  Please remove it first.");
  }
  if (!fs.mkdirs(inDir)) {
    throw new IOException("Cannot create input directory " + inDir);
  }

  try {
    //generate an input file for each map task
    for(int i=0; i < numMaps; ++i) {
      final Path file = new Path(inDir, "part"+i);
      final LongWritable offset = new LongWritable(i * numPoints);
      final LongWritable size = new LongWritable(numPoints);
      final SequenceFile.Writer writer = SequenceFile.createWriter(
          fs, jobConf, file,
          LongWritable.class, LongWritable.class, CompressionType.NONE);
      try {
        writer.append(offset, size);
      } finally {
        writer.close();
      }
      System.out.println("Wrote input for Map #"+i);
    }

    //start a map/reduce job
    System.out.println("Starting Job");
    final long startTime = System.currentTimeMillis();
    JobClient.runJob(jobConf);
    final double duration = (System.currentTimeMillis() - startTime)/1000.0;
    System.out.println("Job Finished in " + duration + " seconds");

    //read outputs
    Path inFile = new Path(outDir, "reduce-out");
    LongWritable numInside = new LongWritable();
    LongWritable numOutside = new LongWritable();
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
    try {
      reader.next(numInside, numOutside);
    } finally {
      reader.close();
    }

    //compute estimated value
    return BigDecimal.valueOf(4).setScale(20)
        .multiply(BigDecimal.valueOf(numInside.get()))
        .divide(BigDecimal.valueOf(numMaps))
        .divide(BigDecimal.valueOf(numPoints));
  } finally {
    fs.delete(TMP_DIR, true);
  }
}