Listed below are example usages of org.apache.hadoop.io.SequenceFile.Writer drawn from open-source projects; you can follow the links to view the full source code on GitHub.
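Before the project-specific examples, here is a minimal, self-contained sketch of the pattern they all share: create a writer, append Writable key/value pairs, and close it. It uses the non-deprecated, option-based createWriter overload; the output path and key/value types are illustrative assumptions, not taken from any of the examples below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MinimalSequenceFileWrite {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical output location; any Hadoop-supported filesystem path works.
    Path file = new Path("/tmp/example.seq");
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class));
    try {
      for (int i = 0; i < 10; i++) {
        writer.append(new IntWritable(i), new Text("value-" + i));
      }
    } finally {
      writer.close(); // always release the underlying stream
    }
  }
}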
private static void createFiles(int length, int numFiles, Random random,
Job job) throws IOException {
Range[] ranges = createRanges(length, numFiles, random);
for (int i = 0; i < numFiles; i++) {
Path file = new Path(workDir, "test_" + i + ".seq");
// create a file with length entries
@SuppressWarnings("deprecation")
SequenceFile.Writer writer =
SequenceFile.createWriter(localFs, job.getConfiguration(), file,
IntWritable.class, BytesWritable.class);
Range range = ranges[i];
try {
for (int j = range.start; j < range.end; j++) {
IntWritable key = new IntWritable(j);
byte[] data = new byte[random.nextInt(10)];
random.nextBytes(data);
BytesWritable value = new BytesWritable(data);
writer.append(key, value);
}
} finally {
writer.close();
}
}
}
private static <T extends WritableComparable> Path writePartitionFile(
String testname, JobConf conf, T[] splits) throws IOException {
final FileSystem fs = FileSystem.getLocal(conf);
final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
).makeQualified(fs);
Path p = new Path(testdir, testname + "/_partition.lst");
TotalOrderPartitioner.setPartitionFile(conf, p);
conf.setNumReduceTasks(splits.length + 1);
SequenceFile.Writer w = null;
try {
NullWritable nw = NullWritable.get();
w = SequenceFile.createWriter(fs, conf, p,
splits[0].getClass(), NullWritable.class,
SequenceFile.CompressionType.NONE);
for (int i = 0; i < splits.length; ++i) {
w.append(splits[i], nw);
}
} finally {
if (null != w)
w.close();
}
return p;
}
/**
* Create control files before a test run.
* Number of files created is equal to the number of maps specified
*
* @throws IOException on error
*/
private static void createControlFiles() throws IOException {
FileSystem tempFS = FileSystem.get(config);
LOG.info("Creating " + numberOfMaps + " control files");
for (int i = 0; i < numberOfMaps; i++) {
String strFileName = "NNBench_Controlfile_" + i;
Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
strFileName);
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
LongWritable.class, CompressionType.NONE);
writer.append(new Text(strFileName), new LongWritable(0L));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
if (writer != null) {
writer.close();
}
writer = null;
}
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err.println("Usage: ConvertFastaForCloud file.fa outfile.br");
System.exit(-1);
}
String infile = args[0];
String outfile = args[1];
System.err.println("Converting " + infile + " into " + outfile);
JobConf config = new JobConf();
SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(config), config,
new Path(outfile), IntWritable.class, BytesWritable.class);
convertFile(infile, writer);
writer.close();
System.err.println("min_seq_len: " + min_seq_len);
System.err.println("max_seq_len: " + max_seq_len);
System.err.println("Using DNAString version: " + DNAString.VERSION);
}
public static LinkedHashMap<LongWritable, Text> createInputData(FileSystem fs, Path workDir,
Configuration job, String filename, long startKey, long numKeys, AtomicLong fileLength)
throws IOException {
LinkedHashMap<LongWritable, Text> data = new LinkedHashMap<LongWritable, Text>();
Path file = new Path(workDir, filename);
LOG.info("Generating data at path: " + file);
// create a file with length entries
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, file, LongWritable.class,
Text.class);
try {
Random r = new Random(System.currentTimeMillis());
LongWritable key = new LongWritable();
Text value = new Text();
for (long i = startKey; i < numKeys; i++) {
key.set(i);
value.set(Integer.toString(r.nextInt(10000)));
data.put(new LongWritable(key.get()), new Text(value.toString()));
writer.append(key, value);
LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
}
fileLength.addAndGet(writer.getLength());
} finally {
writer.close();
}
return data;
}
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
FileSystem fs = pathToListFile.getFileSystem(getConf());
if (fs.exists(pathToListFile)) {
fs.delete(pathToListFile, false);
}
return SequenceFile.createWriter(getConf(),
SequenceFile.Writer.file(pathToListFile),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
}
@VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, S3MapReduceCpOptions options, List<Path> globbedPaths)
throws IOException {
try {
for (Path path : globbedPaths) {
FileSystem sourceFS = path.getFileSystem(getConf());
path = makeQualified(path);
FileStatus rootStatus = sourceFS.getFileStatus(path);
Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
LOG.info("Root source path is {}", sourcePathRoot);
FileStatus[] sourceFiles = sourceFS.listStatus(path);
boolean explore = (sourceFiles != null && sourceFiles.length > 0);
if (explore || rootStatus.isDirectory()) {
for (FileStatus sourceStatus : sourceFiles) {
if (sourceStatus.isFile()) {
LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
CopyListingFileStatus sourceCopyListingStatus = new CopyListingFileStatus(sourceStatus);
writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);
}
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
}
}
}
}
fileListWriter.close();
fileListWriter = null;
} finally {
IoUtil.closeSilently(LOG, fileListWriter);
}
}
/**
* Use the input splits to take samples of the input and generate sample
* keys. By default reads 100,000 keys from 10 locations in the input, sorts
* them and picks N-1 keys to generate N equally sized partitions.
* @param conf the job to sample
* @param partFile where to write the output file to
* @throws IOException if something goes wrong
*/
public static void writePartitionFile(JobConf conf,
Path partFile) throws IOException {
TeraInputFormat inFormat = new TeraInputFormat();
TextSampler sampler = new TextSampler();
Text key = new Text();
Text value = new Text();
int partitions = conf.getNumReduceTasks();
long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
int samples = Math.min(10, splits.length);
long recordsPerSample = sampleSize / samples;
int sampleStep = splits.length / samples;
long records = 0;
// take N samples from different parts of the input
for(int i=0; i < samples; ++i) {
RecordReader<Text,Text> reader =
inFormat.getRecordReader(splits[sampleStep * i], conf, null);
while (reader.next(key, value)) {
sampler.addKey(key);
records += 1;
if ((i+1) * recordsPerSample <= records) {
break;
}
}
}
FileSystem outFs = partFile.getFileSystem(conf);
if (outFs.exists(partFile)) {
outFs.delete(partFile, false);
}
SequenceFile.Writer writer =
SequenceFile.createWriter(outFs, conf, partFile, Text.class,
NullWritable.class);
NullWritable nullValue = NullWritable.get();
for(Text split : sampler.createPartitions(partitions)) {
writer.append(split, nullValue);
}
writer.close();
}
private static Path[] writeSimpleSrc(Path testdir, JobConf conf,
int srcs) throws IOException {
SequenceFile.Writer[] out = null;
Path[] src = new Path[srcs];
try {
out = createWriters(testdir, conf, srcs, src);
final int capacity = srcs * 2 + 1;
Text key = new Text();
key.set("ignored");
Text val = new Text();
for (int k = 0; k < capacity; ++k) {
for (int i = 0; i < srcs; ++i) {
val.set(Integer.toString(k % srcs == 0 ? k * srcs : k * srcs + i) +
"\t" + Integer.toString(10 * k + i));
out[i].append(key, val);
if (i == k) {
// add duplicate key
out[i].append(key, val);
}
}
}
} finally {
if (out != null) {
for (int i = 0; i < srcs; ++i) {
if (out[i] != null)
out[i].close();
}
}
}
return src;
}
@Override
public void setDataTarget(Object outputDataTarget) throws IOException {
if (outputDataTarget instanceof SequenceFile.Writer) {
writer = (SequenceFile.Writer) outputDataTarget;
return;
}
if (outputDataTarget instanceof URI) {
URI targetURI = (URI) outputDataTarget;
targetURI = HadoopSequenceFileParser.sandboxToFileURI(targetURI);
ClassLoader formerContextClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
if (fs == null) {
fs = HadoopSequenceFileParser.getFileSystem(targetURI, graph, user, config, this);
}
writer = SequenceFile.createWriter(fs, config,
new Path(targetURI.getPath()), // Path to new file on fileSystem
keyCopy.getValueClass(), // Key Data Type
valCopy.getValueClass(), // Value Data Type
SequenceFile.CompressionType.NONE);
} catch (ComponentNotReadyException e) {
throw new IOException("Failed to create Hadoop sequence file writer", e);
} finally {
Thread.currentThread().setContextClassLoader(formerContextClassloader);
}
} else {
throw new IOException("Unsupported data target type: " + outputDataTarget.getClass().getName());
}
}
@Override
protected Path createInputFileListing(Job job) throws IOException {
if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
return super.createInputFileListing(job);
}
long totalBytesExpected = 0;
int totalRecords = 0;
Path fileListingPath = getFileListingPath();
try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
List<Path> srcFiles = getSourceFiles();
if (srcFiles.size() == 0) {
return fileListingPath;
}
totalRecords = srcFiles.size();
FileSystem fs = srcFiles.get(0).getFileSystem(conf);
for (Path path : srcFiles) {
FileStatus fst = fs.getFileStatus(path);
totalBytesExpected += fst.getLen();
Text key = getKey(path);
writer.append(key, new CopyListingFileStatus(fst));
}
writer.close();
// update jobs configuration
Configuration cfg = job.getConfiguration();
cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException
| IllegalAccessException | NoSuchMethodException | ClassNotFoundException
| InvocationTargetException e) {
throw new IOException(e);
}
return fileListingPath;
}
private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getMrWorkingPathWriter(
final Configuration configuration) throws IOException {
PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter> privilegedExceptionAction = new PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>() {
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter run() throws Exception {
String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
Path workingPath = new Path(workingPathStr);
Path tmpDir = new Path(workingPath, "tmp");
FileSystem fileSystem = tmpDir.getFileSystem(configuration);
String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
Path loadPath = new Path(tmpDir, loadId);
final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new Path(loadPath, UUID.randomUUID()
.toString()), Text.class, BlurRecord.class);
return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
@Override
public void write(Writable w) throws IOException {
BlurRecord blurRecord = (BlurRecord) w;
String rowId = blurRecord.getRowId();
writer.append(new Text(rowId), blurRecord);
}
@Override
public void close(boolean abort) throws IOException {
writer.close();
}
};
}
};
UserGroupInformation userGroupInformation = getUGI(configuration);
try {
return userGroupInformation.doAs(privilegedExceptionAction);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
/**
* The main driver for <code>LoadTypedBytes</code>.
*/
public int run(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Too few arguments!");
printUsage();
return 1;
}
Path path = new Path(args[0]);
FileSystem fs = path.getFileSystem(getConf());
if (fs.exists(path)) {
System.err.println("given path exists already!");
return -1;
}
TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
TypedBytesWritable.class, TypedBytesWritable.class);
try {
TypedBytesWritable key = new TypedBytesWritable();
TypedBytesWritable value = new TypedBytesWritable();
byte[] rawKey = tbinput.readRaw();
while (rawKey != null) {
byte[] rawValue = tbinput.readRaw();
key.set(rawKey, 0, rawKey.length);
value.set(rawValue, 0, rawValue.length);
writer.append(key, value);
rawKey = tbinput.readRaw();
}
} finally {
writer.close();
}
return 0;
}
public static void main(String args[]) throws Exception {
System.out.println("Sequence File Creator");
String uri = args[0]; // output sequence file name
String filePath = args[1]; // text file to read from; Odd line is key,
// even line is value
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Writer writer = null;
SimpleSequenceFileKey key = new SimpleSequenceFileKey();
String line = null;
try (BufferedReader buffer =
new BufferedReader(new FileReader(filePath))) {
SimpleSequenceFileValue<Text> value =
new SimpleSequenceFileValue<Text>();
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
value.getClass(), CompressionType.RECORD, new GzipCodec());
while ((line = buffer.readLine()) != null) {
key.setDocumentURI(new DocumentURI(line));
if ((line = buffer.readLine()) == null) {
break;
}
value.setValue(new Text(line));
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
public void runJob(int items) {
try {
JobConf conf = new JobConf(TestMapRed.class);
Path testdir = new Path("build/test/test.mapred.spill");
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = FileSystem.get(conf);
fs.delete(testdir, true);
conf.setInt("io.sort.mb", 1);
conf.setInputFormat(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
Path inFile = new Path(inDir, "part0");
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
Text.class, Text.class);
StringBuffer content = new StringBuffer();
for (int i = 0; i < 1000; i++) {
content.append(i).append(": This is one more line of content\n");
}
Text text = new Text(content.toString());
for (int i = 0; i < items; i++) {
writer.append(new Text("rec:" + i), text);
}
writer.close();
JobClient.runJob(conf);
} catch (Exception e) {
fail("Threw exception:" + e);
}
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.out.println("Usage: <TRAINING_DATA> <LABELS_DATA> <OUTPUT_PATH>");
System.out
.println("ex) train-images.idx3-ubyte train-labels.idx1-ubyte /tmp/mnist.seq");
System.exit(1);
}
String training_data = args[0];
String labels_data = args[1];
String output = args[2];
HamaConfiguration conf = new HamaConfiguration();
conf.set("dfs.block.size", "11554432");
FileSystem fs = FileSystem.get(conf);
DataInputStream imagesIn = new DataInputStream(new FileInputStream(
new File(training_data)));
DataInputStream labelsIn = new DataInputStream(new FileInputStream(
new File(labels_data)));
imagesIn.readInt(); // Magic number
int count = imagesIn.readInt();
labelsIn.readInt(); // Magic number
labelsIn.readInt(); // Count
imagesIn.readInt(); // Rows
imagesIn.readInt(); // Cols
System.out.println("Writing " + count + " samples on " + output);
byte[][] images = new byte[count][PIXELS];
byte[] labels = new byte[count];
for (int n = 0; n < count; n++) {
imagesIn.readFully(images[n]);
labels[n] = labelsIn.readByte();
}
@SuppressWarnings("deprecation")
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
output), LongWritable.class, FloatVectorWritable.class);
for (int i = 0; i < count; i++) {
float[] vals = new float[PIXELS + 10];
for (int j = 0; j < PIXELS; j++) {
vals[j] = rescale((images[i][j] & 0xff));
}
int label = (labels[i] & 0xff);
// embedding to one-hot vector
for (int j = 0; j < 10; j++) {
if (j == label)
vals[PIXELS + j] = 1.0f;
else
vals[PIXELS + j] = 0.0f;
}
writer.append(new LongWritable(), new FloatVectorWritable(
new DenseFloatVector(vals)));
}
imagesIn.close();
labelsIn.close();
writer.close();
}
private static void createBigMapInputFile(Configuration conf, FileSystem fs,
Path dir, long fileSizeInMB)
throws IOException {
// Check if the input path exists and is non-empty
if (fs.exists(dir)) {
FileStatus[] list = fs.listStatus(dir);
if (list != null && list.length > 0) {
throw new IOException("Input path: " + dir + " already exists... ");
}
}
Path file = new Path(dir, "part-0");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, file,
BytesWritable.class, BytesWritable.class,
CompressionType.NONE);
long numBytesToWrite = fileSizeInMB * 1024 * 1024;
int minKeySize = conf.getInt("test.bmo.min_key", 10);;
int keySizeRange =
conf.getInt("test.bmo.max_key", 1000) - minKeySize;
int minValueSize = conf.getInt("test.bmo.min_value", 0);
int valueSizeRange =
conf.getInt("test.bmo.max_value", 20000) - minValueSize;
BytesWritable randomKey = new BytesWritable();
BytesWritable randomValue = new BytesWritable();
LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " +
"minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange +
" minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange);
long start = System.currentTimeMillis();
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
writer.append(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
}
writer.close();
long end = System.currentTimeMillis();
LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " +
(end-start)/1000 + "secs");
}
private static void createBigMapInputFile(Configuration conf, FileSystem fs,
Path dir, long fileSizeInMB)
throws IOException {
// Check if the input path exists and is non-empty
if (fs.exists(dir)) {
FileStatus[] list = fs.listStatus(dir);
if (list.length > 0) {
throw new IOException("Input path: " + dir + " already exists... ");
}
}
Path file = new Path(dir, "part-0");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, file,
BytesWritable.class, BytesWritable.class,
CompressionType.NONE);
long numBytesToWrite = fileSizeInMB * 1024 * 1024;
int minKeySize = conf.getInt(MIN_KEY, 10);
int keySizeRange =
conf.getInt(MAX_KEY, 1000) - minKeySize;
int minValueSize = conf.getInt(MIN_VALUE, 0);
int valueSizeRange =
conf.getInt(MAX_VALUE, 20000) - minValueSize;
BytesWritable randomKey = new BytesWritable();
BytesWritable randomValue = new BytesWritable();
LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " +
"minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange +
" minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange);
long start = System.currentTimeMillis();
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
writer.append(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
}
writer.close();
long end = System.currentTimeMillis();
LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " +
(end-start)/1000 + "secs");
}
/**
* Run a map/reduce job for estimating Pi.
*
* @return the estimated value of Pi
*/
public static BigDecimal estimatePi(int numMaps, long numPoints,
Path tmpDir, Configuration conf
) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(conf);
//setup job conf
job.setJobName(QuasiMonteCarlo.class.getSimpleName());
job.setJarByClass(QuasiMonteCarlo.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(QmcMapper.class);
job.setReducerClass(QmcReducer.class);
job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
job.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(tmpDir, "in");
final Path outDir = new Path(tmpDir, "out");
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
final FileSystem fs = FileSystem.get(conf);
if (fs.exists(tmpDir)) {
throw new IOException("Tmp directory " + fs.makeQualified(tmpDir)
+ " already exists. Please remove it first.");
}
if (!fs.mkdirs(inDir)) {
throw new IOException("Cannot create input directory " + inDir);
}
try {
//generate an input file for each map task
for(int i=0; i < numMaps; ++i) {
final Path file = new Path(inDir, "part"+i);
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
} finally {
writer.close();
}
System.out.println("Wrote input for Map #"+i);
}
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
job.waitForCompletion(true);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
//read outputs
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
try {
reader.next(numInside, numOutside);
} finally {
reader.close();
}
//compute estimated value
final BigDecimal numTotal
= BigDecimal.valueOf(numMaps).multiply(BigDecimal.valueOf(numPoints));
return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(numTotal, RoundingMode.HALF_UP);
} finally {
fs.delete(tmpDir, true);
}
}
@SuppressWarnings("deprecation")
protected final void writeBinaryBlockMatrixToSequenceFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int blen, int rl, int ru )
throws IOException
{
boolean sparse = src.isInSparseFormat();
int rlen = src.getNumRows();
int clen = src.getNumColumns();
// 1) create sequence file writer, with right replication factor
// (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
SequenceFile.Writer writer = null;
if( _replication > 0 ) //if replication specified (otherwise default)
{
//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
(short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
}
else
{
writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
}
try
{
// 2) bound check for src block
if( src.getNumRows() > rlen || src.getNumColumns() > clen )
{
throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
}
//3) reblock and write
MatrixIndexes indexes = new MatrixIndexes();
if( rlen <= blen && clen <= blen && rl == 0 ) //opt for single block
{
//directly write single block
indexes.setIndexes(1, 1);
writer.append(indexes, src);
}
else //general case
{
//initialize blocks for reuse (at most 4 different blocks required)
MatrixBlock[] blocks = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
//create and write subblocks of matrix
for(int blockRow = rl/blen; blockRow < (int)Math.ceil(ru/(double)blen); blockRow++)
for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)blen); blockCol++)
{
int maxRow = (blockRow*blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow*blen;
int maxCol = (blockCol*blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() - blockCol*blen;
int row_offset = blockRow*blen;
int col_offset = blockCol*blen;
//get reuse matrix block
MatrixBlock block = getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
//copy submatrix to block
src.slice( row_offset, row_offset+maxRow-1,
col_offset, col_offset+maxCol-1, block );
//append block to sequence file
indexes.setIndexes(blockRow+1, blockCol+1);
writer.append(indexes, block);
//reset block for later reuse
block.reset();
}
}
}
finally {
IOUtilFunctions.closeSilently(writer);
}
}