The following code examples show how to use org.apache.hadoop.mapred.TextInputFormat#configure(); they are extracted from open source projects.
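Every example below follows the same basic pattern: add the input path to a JobConf, create a TextInputFormat, call configure(job) so the format picks up job settings such as the configured compression codecs, then request splits and iterate each split with a RecordReader. A minimal, self-contained sketch of that pattern (the input path and split count are placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class TextInputFormatConfigureExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/input")); // placeholder path
    TextInputFormat informat = new TextInputFormat();
    informat.configure(job); // bind the format to the job configuration
    InputSplit[] splits = informat.getSplits(job, 1);
    LongWritable key = new LongWritable();
    Text value = new Text();
    for (InputSplit split : splits) {
      RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
      try {
        while (reader.next(key, value))
          System.out.println(value); // one line of text per record
      }
      finally {
        reader.close();
      }
    }
  }
}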
private void readLIBSVMMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int blen)
throws IOException
{
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
ExecutorService pool = CommonThreadPool.get(_numThreads);
try
{
// create read tasks for all splits
ArrayList<LIBSVMReadTask> tasks = new ArrayList<>();
int splitCount = 0;
for (InputSplit split : splits) {
tasks.add( new LIBSVMReadTask(split, _offsets, informat, job, dest, rlen, clen, splitCount++) );
}
pool.invokeAll(tasks);
pool.shutdown();
// check return codes and aggregate nnz
long lnnz = 0;
for (LIBSVMReadTask rt : tasks) {
lnnz += rt.getPartialNnz();
if (!rt.getReturnCode()) {
Exception err = rt.getException();
throw new IOException("Read task for libsvm input failed: "+ err.toString(), err);
}
}
dest.setNonZeros(lnnz);
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
}
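LIBSVMReadTask (like the CSVReadTask used further below) is an inner class of this reader and is not shown on this page. A hypothetical skeleton of such a per-split task, assuming a Callable submitted to the pool as above, might look as follows; the real tasks additionally parse every line and append it to the shared destination block at a precomputed row offset:

import java.util.concurrent.Callable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Hypothetical per-split read task (illustrative only).
class SplitReadTask implements Callable<Long> {
  private final InputSplit _split;
  private final TextInputFormat _informat;
  private final JobConf _job;

  SplitReadTask(InputSplit split, TextInputFormat informat, JobConf job) {
    _split = split;
    _informat = informat;
    _job = job;
  }

  @Override
  public Long call() throws Exception {
    long count = 0;
    RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
    try {
      LongWritable key = new LongWritable();
      Text value = new Text();
      while (reader.next(key, value))
        count++; // parse value.toString() and write into the target block here
    }
    finally {
      reader.close();
    }
    return count; // e.g., rows read or non-zeros, depending on the reader
  }
}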
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims, ValueType[] schema) throws IOException {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LongWritable key = new LongWritable();
Text value = new Text();
int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
TensorBlock ret;
if (schema.length == 1)
ret = new TensorBlock(schema[0], idims).allocateBlock();
else
ret = new TensorBlock(schema, idims).allocateBlock();
try {
int[] ix = new int[dims.length];
for (InputSplit split : splits) {
RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try {
while (reader.next(key, value)) {
String[] parts = Arrays.stream(IOUtilFunctions.splitCSV(value.toString(), " "))
.filter(s -> !s.isEmpty()).toArray(String[]::new);
for (int i = 0; i < ix.length; i++) {
ix[i] = Integer.parseInt(parts[i]) - 1;
}
ret.set(ix, parts[ix.length]);
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
catch (Exception ex) {
throw new IOException("Unable to read tensor in text cell format.", ex);
}
return ret;
}
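The text-cell tensor format read above stores one cell per line: the 1-based index along every dimension, followed by the cell value, all separated by spaces. A small illustrative helper (the reader itself uses IOUtilFunctions.splitCSV rather than String.split) that parses one such line into 0-based indices and the value token:

// Parse a text-cell line such as "2 3 1 7.5": 1-based indices first, value last.
static String parseTextCellLine(String line, int[] ix) {
  String[] parts = line.trim().split("\\s+");
  for (int i = 0; i < ix.length; i++)
    ix[i] = Integer.parseInt(parts[i]) - 1; // convert to 0-based indexing
  return parts[ix.length]; // remaining token is the cell value
}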
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
//compute number of columns
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
//compute number of rows
int nrow = 0;
for( int i=0; i<splits.length; i++ )
{
RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
LongWritable key = new LongWritable();
Text value = new Text();
try
{
//ignore header of first split
if( i==0 && _props.hasHeader() )
reader.next(key, value);
//count remaining number of rows, ignore meta data
while ( reader.next(key, value) ) {
String val = value.toString();
nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
return new Pair<>(nrow, ncol);
}
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
Types.ValueType[] schema) throws IOException {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
TensorBlock ret;
if( schema.length == 1 )
ret = new TensorBlock(schema[0], idims).allocateBlock();
else
ret = new TensorBlock(schema, idims).allocateBlock();
try {
ExecutorService pool = CommonThreadPool.get(_numThreads);
InputSplit[] splits = informat.getSplits(job, _numThreads);
//create and execute read tasks for all splits
List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
.collect(Collectors.toList());
List<Future<Object>> rt = pool.invokeAll(tasks);
//check for exceptions
for (Future<Object> task : rt)
task.get();
pool.shutdown();
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
return ret;
}
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
MatrixBlock dest, long rlen, long clen, int blen,
boolean hasHeader, String delim, boolean fill, double fillValue)
throws IOException
{
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
ExecutorService pool = CommonThreadPool.get(_numThreads);
try
{
// create read tasks for all splits
ArrayList<CSVReadTask> tasks = new ArrayList<>();
int splitCount = 0;
for (InputSplit split : splits) {
tasks.add( new CSVReadTask(split, _offsets, informat, job, dest,
rlen, clen, hasHeader, delim, fill, fillValue, splitCount++) );
}
pool.invokeAll(tasks);
pool.shutdown();
// check return codes and aggregate nnz
long lnnz = 0;
for (CSVReadTask rt : tasks) {
lnnz += rt.getPartialNnz();
if (!rt.getReturnCode()) {
Exception err = rt.getException();
throw new IOException("Read task for csv input failed: "+ err.toString(), err);
}
}
dest.setNonZeros(lnnz);
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
}
public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException
{
CollectorTestSink sortSink = new CollectorTestSink();
oper.output.setSink(sortSink);
oper.setMapClass(WordCount.Map.class);
oper.setCombineClass(WordCount.Reduce.class);
oper.setDirName(testMeta.testDir);
oper.setConfigFile(null);
oper.setInputFormatClass(TextInputFormat.class);
Configuration conf = new Configuration();
JobConf jobConf = new JobConf(conf);
FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir));
TextInputFormat inputFormat = new TextInputFormat();
inputFormat.configure(jobConf);
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
SerializationFactory serializationFactory = new SerializationFactory(conf);
Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
keySerializer.open(oper.getOutstream());
keySerializer.serialize(splits[0]);
oper.setInputSplitClass(splits[0].getClass());
keySerializer.close();
oper.setup(null);
oper.beginWindow(0);
oper.emitTuples();
oper.emitTuples();
oper.endWindow();
oper.beginWindow(1);
oper.emitTuples();
oper.endWindow();
Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
for (Object o : sortSink.collectedTuples) {
LOG.debug(o.toString());
}
LOG.debug("Done testing round\n");
oper.teardown();
}
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
for( int i=0, rpos=0; i<splits.length; i++ )
rpos = readCSVFrameFromInputSplit(splits[i], informat,
job, dest, schema, names, rlen, clen, rpos, i==0);
}
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
int blen, long estnnz)
throws IOException, DMLRuntimeException
{
// prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
rlen = ret.getNumRows();
clen = ret.getNumColumns();
// Second Read Pass (read, parse strings, append to matrix block)
readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
//post-processing (representation-specific, change of sparse/dense block representation)
// - nnz explicitly maintained in parallel for the individual splits
ret.examSparsity();
// sanity check for parallel row count (since determined internally)
if (rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());
return ret;
}
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
if( ALLOW_COPY_CELLFILES )
{
copyAllFiles(fnameNew, inMO);
return; //we're done
}
//actual merge
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
String valueStr = null;
try
{
for( MatrixObject in : inMO ) //read/write all inputs
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
+in.getFileName()+") via stream merge");
JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
Path tmpPath = new Path(in.getFileName());
FileInputFormat.addInputPath(tmpJob, tmpPath);
TextInputFormat informat = new TextInputFormat();
informat.configure(tmpJob);
InputSplit[] splits = informat.getSplits(tmpJob, 1);
LongWritable key = new LongWritable();
Text value = new Text();
for(InputSplit split: splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
try
{
while(reader.next(key, value))
{
valueStr = value.toString().trim();
out.write( valueStr+"\n" );
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
}
finally {
IOUtilFunctions.closeSilently(out);
}
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge text cell results.", ex);
}
}
private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID )
throws IOException, DMLRuntimeException
{
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(mo.getFileName());
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LinkedList<Cell> buffer = new LinkedList<>();
LongWritable key = new LongWritable();
Text value = new Text();
DataCharacteristics mc = mo.getDataCharacteristics();
int blen = mc.getBlocksize();
//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
// errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
// It works fine with int row, col but we require long for larger matrices.
// Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode: binarycell),
// we simply propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split : splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try
{
while(reader.next(key, value))
{
st.reset( value.toString() ); //reset tokenizer
long row = st.nextLong();
long col = st.nextLong();
double lvalue = Double.parseDouble( st.nextToken() );
Cell tmp = new Cell( row, col, lvalue );
buffer.addLast( tmp );
if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
{
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
buffer.clear();
}
}
//final flush
if( !buffer.isEmpty() )
{
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
buffer.clear();
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
{
long row = -1;
long col = -1;
try
{
//STEP 1: read matrix from HDFS and write blocks to local staging area
//check and add input path
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LinkedList<Cell> buffer = new LinkedList<>();
LongWritable key = new LongWritable();
Text value = new Text();
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split: splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try
{
while(reader.next(key, value))
{
st.reset( value.toString() ); //reset tokenizer
row = st.nextLong();
col = st.nextLong();
double lvalue = st.nextDouble();
Cell tmp = new Cell( row, col, lvalue );
buffer.addLast( tmp );
if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
{
appendCellBufferToStagingArea(fnameStaging, buffer, blen);
buffer.clear();
}
}
//final flush
if( !buffer.isEmpty() )
{
appendCellBufferToStagingArea(fnameStaging, buffer, blen);
buffer.clear();
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
//STEP 2: read matrix blocks from staging area and write matrix to HDFS
String[] fnamesPartitions = new File(fnameStaging).list();
if(PARALLEL)
{
int len = Math.min(fnamesPartitions.length, _par);
Thread[] threads = new Thread[len];
for( int i=0;i<len;i++ )
{
int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
end = Math.min(end, fnamesPartitions.length-1);
threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
threads[i].start();
}
for( Thread t : threads )
t.join();
}
else
{
for( String pdir : fnamesPartitions )
writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
}
}
catch (Exception e)
{
//post-mortem error handling and bounds checking
if( row < 1 || row > rlen || col < 1 || col > clen )
{
throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
}
else
throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
}
}
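In the parallel branch of partitionTextCell, the staging directories are split into contiguous, roughly equal ranges: with n files and len threads, thread i processes indices i*ceil(n/len) through min((i+1)*ceil(n/len)-1, n-1); for n=10 and len=3 this yields [0,3], [4,7], [8,9]. The same arithmetic as a standalone sketch:

// Per-thread [start, end] index ranges, mirroring the computation in partitionTextCell above.
static int[][] computeRanges(int n, int len) {
  int chunk = (int) Math.ceil((double) n / len);
  int[][] ranges = new int[len][2];
  for (int i = 0; i < len; i++) {
    ranges[i][0] = i * chunk;
    ranges[i][1] = Math.min((i + 1) * chunk - 1, n - 1);
  }
  return ranges; // e.g., n=10, len=3 -> [0,3], [4,7], [8,9]
}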
private MatrixBlock computeLIBSVMSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
JobConf job, long rlen, long clen, long estnnz)
throws IOException, DMLRuntimeException
{
int nrow = 0;
int ncol = (int) clen;
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
// count rows in parallel per split
try
{
ExecutorService pool = CommonThreadPool.get(_numThreads);
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for (InputSplit split : splits) {
tasks.add(new CountRowsTask(split, informat, job));
}
pool.invokeAll(tasks);
pool.shutdown();
// collect row counts for offset computation
// fail early if any task was unsuccessful
_offsets = new SplitOffsetInfos(tasks.size());
for (CountRowsTask rt : tasks) {
if (!rt.getReturnCode())
throw new IOException("Count task for libsvm input failed: "+ rt.getErrMsg());
_offsets.setOffsetPerSplit(tasks.indexOf(rt), nrow);
_offsets.setLenghtPerSplit(tasks.indexOf(rt), rt.getRowCount());
nrow = nrow + rt.getRowCount();
}
}
catch (Exception e) {
throw new IOException("Threadpool Error " + e.getMessage(), e);
}
//robustness for wrong dimensions which are already compiled into the plan
if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
if( rlen < nrow || clen < ncol ) {
//a) specified matrix dimensions too small
throw new DMLRuntimeException(msg);
}
else {
//b) specified matrix dimensions too large -> padding and warning
LOG.warn(msg);
nrow = (int) rlen;
ncol = (int) clen;
}
}
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free insert into target
long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
@Test(timeout=10000)
public void testFormat() throws Exception {
JobConf job = new JobConf(defaultConf);
Random random = new Random();
long seed = random.nextLong();
LOG.info("seed = "+seed);
random.setSeed(seed);
localFs.delete(workDir, true);
FileInputFormat.setInputPaths(job, workDir);
final int length = 10000;
final int numFiles = 10;
createFiles(length, numFiles, random);
// create a combined split for the files
TextInputFormat wrappedFormat = new TextInputFormat();
wrappedFormat.configure(job);
TezGroupedSplitsInputFormat<LongWritable , Text> format =
new TezGroupedSplitsInputFormat<LongWritable, Text>();
format.setConf(job);
format.setDesiredNumberOfSplits(1);
format.setInputFormat(wrappedFormat);
LongWritable key = new LongWritable();
Text value = new Text();
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(length/20)+1;
LOG.info("splitting: requesting = " + numSplits);
InputSplit[] splits = format.getSplits(job, numSplits);
LOG.info("splitting: got = " + splits.length);
// we should have a single split as the length is comfortably smaller than
// the block size
assertEquals("We got more than one splits!", 1, splits.length);
InputSplit split = splits[0];
assertEquals("It should be TezGroupedSplit",
TezGroupedSplit.class, split.getClass());
// check the split
BitSet bits = new BitSet(length);
LOG.debug("split= " + split);
RecordReader<LongWritable, Text> reader =
format.getRecordReader(split, job, voidReporter);
try {
int count = 0;
while (reader.next(key, value)) {
int v = Integer.parseInt(value.toString());
LOG.debug("read " + v);
if (bits.get(v)) {
LOG.warn("conflict with " + v +
" at position "+reader.getPos());
}
assertFalse("Key in multiple partitions.", bits.get(v));
bits.set(v);
count++;
}
LOG.info("splits="+split+" count=" + count);
} finally {
reader.close();
}
assertEquals("Some keys in no partition.", length, bits.cardinality());
}
}
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
throws IOException
{
boolean sparse = dest.isInSparseFormat();
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LongWritable key = new LongWritable();
Text value = new Text();
IJV cell = new IJV();
long nnz = 0;
try
{
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split: splits) {
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try {
if( sparse ) { //SPARSE<-value
while( reader.next(key, value) ) {
cell = parseCell(value.toString(), st, cell, _mmProps);
appendCell(cell, dest, _mmProps);
}
dest.sortSparseRows();
}
else { //DENSE<-value
DenseBlock a = dest.getDenseBlock();
while( reader.next(key, value) ) {
cell = parseCell(value.toString(), st, cell, _mmProps);
nnz += appendCell(cell, a, _mmProps);
}
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
if( !dest.isInSparseFormat() )
dest.setNonZeros(nnz);
}
catch(Exception ex) {
//post-mortem error handling and bounds checking
if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
else
throw new IOException( "Unable to read matrix in text cell format.", ex );
}
}
private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
JobConf job, boolean hasHeader, String delim, long rlen, long clen, long estnnz)
throws IOException, DMLRuntimeException
{
int nrow = 0;
int ncol = 0;
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
// count the number of columns in the first non-header row
LongWritable key = new LongWritable();
Text oneLine = new Text();
RecordReader<LongWritable, Text> reader = informat
.getRecordReader(splits[0], job, Reporter.NULL);
try {
if (reader.next(key, oneLine)) {
String cellStr = oneLine.toString().trim();
ncol = StringUtils.countMatches(cellStr, delim) + 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
// count rows in parallel per split
try
{
ExecutorService pool = CommonThreadPool.get(_numThreads);
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for (InputSplit split : splits) {
tasks.add(new CountRowsTask(split, informat, job, hasHeader));
hasHeader = false;
}
List<Future<Long>> ret = pool.invokeAll(tasks);
pool.shutdown();
// collect row counts for offset computation
// fail early if any task was unsuccessful
_offsets = new SplitOffsetInfos(tasks.size());
for (Future<Long> rc : ret) {
int lnrow = (int)rc.get().longValue(); //incl error handling
_offsets.setOffsetPerSplit(ret.indexOf(rc), nrow);
_offsets.setLenghtPerSplit(ret.indexOf(rc), lnrow);
nrow = nrow + lnrow;
}
}
catch (Exception e) {
throw new IOException("Threadpool Error " + e.getMessage(), e);
}
//robustness for wrong dimensions which are already compiled into the plan
if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
if( rlen < nrow || clen < ncol ) {
//a) specified matrix dimensions too small
throw new DMLRuntimeException(msg);
}
else {
//b) specified matrix dimensions too large -> padding and warning
LOG.warn(msg);
nrow = (int) rlen;
ncol = (int) clen;
}
}
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free insert into target
long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
/**
* Test using the gzip codec for reading
*/
@Test(timeout=10000)
public void testGzip() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
"the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
"is\ngzip\n");
writeFile(localFs, new Path(workDir, "part3.txt.gz"), gzip,
"one\nmore\nsplit\n");
FileInputFormat.setInputPaths(job, workDir);
TextInputFormat wrappedFormat = new TextInputFormat();
wrappedFormat.configure(job);
TezGroupedSplitsInputFormat<LongWritable , Text> format =
new TezGroupedSplitsInputFormat<LongWritable, Text>();
format.setConf(job);
format.setInputFormat(wrappedFormat);
// TextInputFormat will produce 3 splits
for (int j=1; j<=3; ++j) {
format.setDesiredNumberOfSplits(j);
InputSplit[] splits = format.getSplits(job, 100);
if (j==1) {
// j==1 covers single split corner case
// and does not do grouping
assertEquals("compressed splits == " + j, j, splits.length);
}
List<Text> results = new ArrayList<Text>();
for (int i=0; i<splits.length; ++i) {
List<Text> read = readSplit(format, splits[i], job);
results.addAll(read);
}
assertEquals("splits length", 11, results.size());
final String[] firstList =
{"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
final String[] secondList = {"is", "gzip"};
final String[] thirdList = {"one", "more", "split"};
String first = results.get(0).toString();
int start = 0;
switch (first.charAt(0)) {
case 't':
start = testResults(results, firstList, start);
break;
case 'i':
start = testResults(results, secondList, start);
break;
case 'o':
start = testResults(results, thirdList, start);
break;
default:
Assert.fail("unexpected first token - " + first);
}
}
}
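The writeFile(...) helper used in the gzip test is not shown on this page. A plausible equivalent, assuming the codec has already been bound to the configuration via ReflectionUtils.setConf as in the test, is:

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;

// Hypothetical stand-in for the writeFile(...) test helper: writes 'contents' to 'path',
// compressing the stream with 'codec' if one is given.
static void writeFile(FileSystem fs, Path path, CompressionCodec codec, String contents)
  throws java.io.IOException
{
  Writer writer = new OutputStreamWriter(
    codec != null ? codec.createOutputStream(fs.create(path)) : fs.create(path),
    StandardCharsets.UTF_8);
  try {
    writer.write(contents);
  }
  finally {
    writer.close();
  }
}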