下面列出了org.apache.hadoop.io.SequenceFile#Reader ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void checkRefPoints(int numIterations) throws IOException {
for (int i = 0; i <= numIterations; i++) {
Path out = new Path(getTestTempDirPath("output"), "representativePoints-" + i);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
for (FileStatus file : fs.listStatus(out)) {
if (!file.getPath().getName().startsWith(".")) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
try {
Writable clusterId = new IntWritable(0);
VectorWritable point = new VectorWritable();
while (reader.next(clusterId, point)) {
System.out.println("\tC-" + clusterId + ": " + AbstractCluster.formatVector(point.get(), null));
}
} finally {
reader.close();
}
}
}
}
}
public static void read(Path inputPath) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
SequenceFile.Reader reader = //<co id="ch03_comment_seqfile_read1"/>
new SequenceFile.Reader(fs, inputPath, conf);
try {
System.out.println(
"Is block compressed = " + reader.isBlockCompressed());
Text key = new Text();
IntWritable value = new IntWritable();
while (reader.next(key, value)) { //<co id="ch03_comment_seqfile_read2"/>
System.out.println(key + "," + value);
}
} finally {
reader.close();
}
}
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter
) throws IOException {
if (first) {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0, taskId);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, input, conf);
assertEquals("is reduce input compressed " + input,
compressInput,
rdr.isCompressed());
rdr.close();
}
}
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter
) throws IOException {
if (first) {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0, taskId);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, input, conf);
assertEquals("is reduce input compressed " + input,
compressInput,
rdr.isCompressed());
rdr.close();
}
}
/**
* Read the cut points from the given IFile.
* @param fs The file system
* @param p The path to read
* @param keyClass The map output key class
* @param job The job config
* @throws IOException
*/
// matching key types enforced by passing in
@SuppressWarnings("unchecked") // map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
Configuration conf) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
try {
while (reader.next(key, value)) {
parts.add(key);
key = ReflectionUtils.newInstance(keyClass, conf);
}
reader.close();
reader = null;
} finally {
IOUtils.cleanup(LOG, reader);
}
return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
}
@Test
public void testIncompleteCommitMarker() throws Exception {
Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE);
try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class,
LongWritable.class,
SequenceFile.CompressionType.NONE)) {
String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED;
SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes();
writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes);
writer.hflush();
writer.hsync();
}
// Read the incomplete commit marker
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf);
CommitMarkerCodec markerCodec = new CommitMarkerCodec()) {
try {
markerCodec.readMarker(reader);
Assert.fail("Expected EOF Exception to be thrown");
} catch (EOFException e) {
// expected since we didn't write the value bytes
}
}
}
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
long lastEnd = 0;
//Verify if each split's start is matching with the previous end and
//we are not missing anything
for (InputSplit split : splits) {
FileSplit fileSplit = (FileSplit) split;
long start = fileSplit.getStart();
Assert.assertEquals(lastEnd, start);
lastEnd = start + fileSplit.getLength();
}
//Verify there is nothing more to read from the input file
SequenceFile.Reader reader
= new SequenceFile.Reader(cluster.getFileSystem().getConf(),
SequenceFile.Reader.file(listFile));
try {
reader.seek(lastEnd);
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {
IOUtils.closeStream(reader);
}
}
public static void printFile(Path thePath) throws IOException
{
SequenceFile.Reader theReader = new SequenceFile.Reader(FileSystem.get(conf), thePath, conf);
IntWritable key = new IntWritable();
BytesWritable value = new BytesWritable();
while(theReader.next(key,value))
{
ar.fromBytes(value);
System.out.println(ar.toAlignment(key.get()));
}
}
public TextRecordInputStream(FileStatus f) throws IOException {
r = new SequenceFile.Reader(fs, f.getPath(), getConf());
key = ReflectionUtils.newInstance(r.getKeyClass().asSubclass(WritableComparable.class),
getConf());
val = ReflectionUtils.newInstance(r.getValueClass().asSubclass(Writable.class),
getConf());
inbuf = new DataInputBuffer();
outbuf = new DataOutputBuffer();
}
public void testNullKeys() throws Exception {
JobConf conf = new JobConf(TestMapRed.class);
FileSystem fs = FileSystem.getLocal(conf);
Path testdir = new Path(
System.getProperty("test.build.data","/tmp")).makeQualified(fs);
fs.delete(testdir, true);
Path inFile = new Path(testdir, "nullin/blah");
SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
Text t = new Text();
t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
w.close();
FileInputFormat.setInputPaths(conf, inFile);
FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
conf.setMapperClass(NullMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setNumReduceTasks(1);
JobClient.runJob(conf);
SequenceFile.Reader r = new SequenceFile.Reader(fs,
new Path(testdir, "nullout/part-00000"), conf);
String m = "AAAAAAAAAAAAAA";
for (int i = 1; r.next(NullWritable.get(), t); ++i) {
assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
m = m.replace((char)('A' + i - 1), (char)('A' + i));
}
}
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
Path path = ((FileSplit)split).getPath();
Configuration conf = context.getConfiguration();
FileSystem fs = path.getFileSystem(conf);
this.in = new SequenceFile.Reader(fs, path, conf);
this.end = ((FileSplit)split).getStart() + split.getLength();
if (((FileSplit)split).getStart() > in.getPosition()) {
in.sync(((FileSplit)split).getStart()); // sync to start
}
this.start = in.getPosition();
vbytes = in.createValueBytes();
done = start >= end;
}
private int readFile() throws IllegalArgumentException, IOException {
int count = 0;
final FileSystem fs = FileSystem.get(MapReduceTestUtils.getConfiguration());
final FileStatus[] fss =
fs.listStatus(
new Path(
TestUtils.TEMP_DIR
+ File.separator
+ MapReduceTestEnvironment.HDFS_BASE_DIRECTORY
+ "/t1/pairs"));
for (final FileStatus ifs : fss) {
if (ifs.isFile() && ifs.getPath().toString().matches(".*part-r-0000[0-9]")) {
try (SequenceFile.Reader reader =
new SequenceFile.Reader(
MapReduceTestUtils.getConfiguration(),
Reader.file(ifs.getPath()))) {
final Text key = new Text();
final Text val = new Text();
while (reader.next(key, val)) {
count++;
}
}
}
}
return count;
}
private void testSeqFile(CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType)
throws Exception {
RecordWriterManager mgr = managerBuilder()
.dirPathTemplate(getTestDir().toString() + "/${YYYY()}")
.compressionCodec(compressionCodec)
.compressionType(compressionType)
.fileType(HdfsFileType.SEQUENCE_FILE)
.build();
FileSystem fs = FileSystem.get(uri, hdfsConf);
Path file = new Path(getTestDir(), UUID.randomUUID().toString());
long expires = System.currentTimeMillis() + 50000;
RecordWriter writer = mgr.createWriter(fs, file, 50000);
Assert.assertTrue(expires <= writer.getExpiresOn());
Assert.assertFalse(writer.isTextFile());
Assert.assertTrue(writer.isSeqFile());
Record record = RecordCreator.create();
record.set(Field.create("a"));
writer.write(record);
writer.close();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, new HdfsConfiguration());
Text key = new Text();
Text value = new Text();
Assert.assertTrue(reader.next(key, value));
Assert.assertNotNull(UUID.fromString(key.toString()));
Assert.assertEquals("a", value.toString().trim());
Assert.assertFalse(reader.next(key, value));
reader.close();
}
@Test
public void typical() throws IOException {
File input = temp.newFolder("input");
File inputSub2 = new File(input, "sub1/sub2");
inputSub2.mkdirs();
Files.asCharSink(new File(inputSub2, "data"), UTF_8).write("test1");
File listFile = temp.newFile("listFile");
Path pathToListFile = new Path(listFile.toURI());
List<Path> sourceDataLocations = new ArrayList<>();
sourceDataLocations.add(new Path(inputSub2.toURI()));
DistCpOptions options = new DistCpOptions(sourceDataLocations, new Path("dummy"));
CircusTrainCopyListing.setRootPath(conf, new Path(input.toURI()));
CircusTrainCopyListing copyListing = new CircusTrainCopyListing(conf, null);
copyListing.doBuildListing(pathToListFile, options);
try (Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(pathToListFile))) {
Text key = new Text();
CopyListingFileStatus value = new CopyListingFileStatus();
assertTrue(reader.next(key, value));
assertThat(key.toString(), is("/sub1/sub2"));
assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2"));
assertTrue(reader.next(key, value));
assertThat(key.toString(), is("/sub1/sub2/data"));
assertThat(value.getPath().toUri().toString(), endsWith("/input/sub1/sub2/data"));
assertFalse(reader.next(key, value));
}
}
/**
* Creates iterator for sequence file records
* @param reader The sequence file reader
* @param start The starting record number
* @param limit number of records to read
*/
public RecordIterator(SequenceFile.Reader reader, long start,
long limit) throws IOException {
this.reader = reader;
this.key = (Writable) ReflectionUtils.newInstance(
reader.getKeyClass(), CONFIG);
this.value = (Writable) ReflectionUtils.newInstance(
reader.getValueClass(), CONFIG);
this.limit = limit;
//skip rows till the start position ;
for(int i = 0; i < start && reader.next(key, value); i++);
this.next = readNext();
}
/**
* This tests {@link SequenceFileWriter}
* with non-rolling output and without compression.
*/
@Test
public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
final int numElements = 20;
final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
}
});
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
.setWriter(new SequenceFileWriter<IntWritable, Text>())
.setBucketer(new NonRollingBucketer())
.setPartPrefix("part")
.setPendingPrefix("")
.setPendingSuffix("");
mapped.addSink(sink);
env.execute("RollingSink String Write Test");
FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
1000,
0,
100000,
new Configuration());
IntWritable intWritable = new IntWritable();
Text txt = new Text();
for (int i = 0; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
}
reader.close();
inStream.close();
inStream = dfs.open(new Path(outPath + "/part-1-0"));
reader = new SequenceFile.Reader(inStream,
1000,
0,
100000,
new Configuration());
for (int i = 1; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
}
reader.close();
inStream.close();
}
/**
* This tests {@link SequenceFileWriter}
* with non-rolling output but with compression.
*/
@Test
public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
final int numElements = 20;
final String outPath = hdfsURI + "/seq-non-rolling-out";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
.broadcast()
.filter(new OddEvenFilter());
DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
}
});
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
.setBucketer(new NonRollingBucketer())
.setPartPrefix("part")
.setPendingPrefix("")
.setPendingSuffix("");
mapped.addSink(sink);
env.execute("RollingSink String Write Test");
FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
1000,
0,
100000,
new Configuration());
IntWritable intWritable = new IntWritable();
Text txt = new Text();
for (int i = 0; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
}
reader.close();
inStream.close();
inStream = dfs.open(new Path(outPath + "/part-1-0"));
reader = new SequenceFile.Reader(inStream,
1000,
0,
100000,
new Configuration());
for (int i = 1; i < numElements; i += 2) {
reader.next(intWritable, txt);
Assert.assertEquals(i, intWritable.get());
Assert.assertEquals("message #" + i, txt.toString());
}
reader.close();
inStream.close();
}
public void runWhereTest(String whereClause, String firstValStr,
int numExpectedResults, int expectedSum) throws IOException {
String [] columns = HsqldbTestServer.getFieldNames();
ClassLoader prevClassLoader = null;
SequenceFile.Reader reader = null;
String [] argv = getArgv(true, columns, whereClause);
runImport(argv);
try {
SqoopOptions opts = new ImportTool().parseArguments(
getArgv(false, columns, whereClause),
null, null, true);
CompilationManager compileMgr = new CompilationManager(opts);
String jarFileName = compileMgr.getJarFilename();
prevClassLoader = ClassLoaderStack.addJarFile(jarFileName,
getTableName());
reader = SeqFileReader.getSeqFileReader(getDataFilePath().toString());
// here we can actually instantiate (k, v) pairs.
Configuration conf = new Configuration();
Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Object val = ReflectionUtils.newInstance(reader.getValueClass(), conf);
if (reader.next(key) == null) {
fail("Empty SequenceFile during import");
}
// make sure that the value we think should be at the top, is.
reader.getCurrentValue(val);
assertEquals("Invalid ordering within sorted SeqFile", firstValStr,
val.toString());
// We know that these values are two ints separated by a ',' character.
// Since this is all dynamic, though, we don't want to actually link
// against the class and use its methods. So we just parse this back
// into int fields manually. Sum them up and ensure that we get the
// expected total for the first column, to verify that we got all the
// results from the db into the file.
int curSum = getFirstInt(val.toString());
int totalResults = 1;
// now sum up everything else in the file.
while (reader.next(key) != null) {
reader.getCurrentValue(val);
curSum += getFirstInt(val.toString());
totalResults++;
}
assertEquals("Total sum of first db column mismatch", expectedSum,
curSum);
assertEquals("Incorrect number of results for query", numExpectedResults,
totalResults);
} catch (InvalidOptionsException ioe) {
fail(ioe.toString());
} catch (ParseException pe) {
fail(pe.toString());
} finally {
IOUtils.closeStream(reader);
if (null != prevClassLoader) {
ClassLoaderStack.setCurrentClassLoader(prevClassLoader);
}
}
}
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
final JobClient client = new JobClient(jobConf);
ClusterStatus stat = client.getClusterStatus(true);
int numTrackers = stat.getTaskTrackers();
final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
// Total size of distributed cache files to be generated
final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
// Get the path of the special file
String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
throw new RuntimeException("Invalid metadata: #files (" + fileCount
+ "), total_size (" + totalSize + "), filelisturi ("
+ distCacheFileList + ")");
}
Path sequenceFile = new Path(distCacheFileList);
FileSystem fs = sequenceFile.getFileSystem(jobConf);
FileStatus srcst = fs.getFileStatus(sequenceFile);
// Consider the number of TTs * mapSlotsPerTracker as number of mappers.
int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
int numSplits = numTrackers * numMapSlotsPerTracker;
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
// Average size of data to be generated by each map task
final long targetSize = Math.max(totalSize / numSplits,
DistributedCacheEmulator.AVG_BYTES_PER_MAP);
long splitStartPosition = 0L;
long splitEndPosition = 0L;
long acc = 0L;
long bytesRemaining = srcst.getLen();
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
while (reader.next(key, value)) {
// If adding this file would put this split past the target size,
// cut the last split and put this file in the next split.
if (acc + key.get() > targetSize && acc != 0) {
long splitSize = splitEndPosition - splitStartPosition;
splits.add(new FileSplit(
sequenceFile, splitStartPosition, splitSize, (String[])null));
bytesRemaining -= splitSize;
splitStartPosition = splitEndPosition;
acc = 0L;
}
acc += key.get();
splitEndPosition = reader.getPosition();
}
} finally {
if (reader != null) {
reader.close();
}
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(
sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
}
return splits;
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
) throws IOException {
//setup job conf
jobConf.setJobName(PiEstimator.class.getSimpleName());
jobConf.setInputFormat(SequenceFileInputFormat.class);
jobConf.setOutputKeyClass(BooleanWritable.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setMapperClass(PiMapper.class);
jobConf.setNumMapTasks(numMaps);
jobConf.setReducerClass(PiReducer.class);
jobConf.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
jobConf.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(TMP_DIR, "in");
final Path outDir = new Path(TMP_DIR, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
final FileSystem fs = FileSystem.get(jobConf);
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, jobConf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} finally {
fs.delete(TMP_DIR, true);
}
}