Listed below are code examples of org.apache.hadoop.mapred.TextInputFormat#getSplits(), taken from open-source projects; the full source files can be viewed on GitHub.
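All of the examples share the same core sequence: register the input path on a JobConf, configure a TextInputFormat, ask it for input splits, and read each split line by line through a RecordReader. Here is a minimal, self-contained sketch of just that sequence (the input path is a placeholder taken from the command line, and the split count passed to getSplits is only a hint to the framework):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class GetSplitsExample {
    public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        FileInputFormat.addInputPath(job, new Path(args[0])); //file or directory
        TextInputFormat informat = new TextInputFormat();
        informat.configure(job); //picks up codec/delimiter settings from the conf
        InputSplit[] splits = informat.getSplits(job, 1); //numSplits is only a hint
        for( InputSplit split : splits ) {
            RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
            LongWritable key = new LongWritable(); //byte offset of the line
            Text value = new Text();               //line contents
            try {
                while( reader.next(key, value) )
                    System.out.println(value);
            }
            finally {
                reader.close();
            }
        }
    }
}

The key/value types are fixed by TextInputFormat: the key is the byte offset at which each line starts, and the value is the line itself.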
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
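//directory of part files: read cell data via input splits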
if( fs.isDirectory(path) ) {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
for(InputSplit split: splits)
readTextCellFrameFromInputSplit(split, informat, job, dest);
}
else {
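//single file: use the raw single-file reader instead of splits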
readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
}
}
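The same getSplits pattern extends to multi-threaded readers, which hand each split to an independent read task on a thread pool, as in the tensor reader below.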
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
Types.ValueType[] schema) throws IOException {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
TensorBlock ret;
if( schema.length == 1 )
ret = new TensorBlock(schema[0], idims).allocateBlock();
else
ret = new TensorBlock(schema, idims).allocateBlock();
try {
ExecutorService pool = CommonThreadPool.get(_numThreads);
InputSplit[] splits = informat.getSplits(job, _numThreads);
//create and execute read tasks for all splits
List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
.collect(Collectors.toList());
List<Future<Object>> rt = pool.invokeAll(tasks);
//check for exceptions
for (Future<Object> task : rt)
task.get();
pool.shutdown();
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel read.", e);
}
return ret;
}
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
for( int i=0, rpos=0; i<splits.length; i++ )
rpos = readCSVFrameFromInputSplit(splits[i], informat,
job, dest, schema, names, rlen, clen, rpos, i==0);
}
@Override
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, numThreads);
//compute number of columns
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
//compute number of rows
int nrow = 0;
ExecutorService pool = CommonThreadPool.get(numThreads);
try {
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
List<Future<Long>> cret = pool.invokeAll(tasks);
for( Future<Long> count : cret )
nrow += count.get().intValue();
}
catch (Exception e) {
throw new IOException("Failed parallel read of text csv input.", e);
}
finally {
pool.shutdown();
}
return new Pair<>(nrow, ncol);
}
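The sequential variant below computes the same (nrow, ncol) pair without a thread pool: it reads each split through a RecordReader, skips the header line of the first split, and excludes transform metadata lines (the TXMTD_* prefixes) from the row count.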
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
throws IOException
{
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
splits = IOUtilFunctions.sortInputSplits(splits);
//compute number of columns
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
//compute number of rows
int nrow = 0;
for( int i=0; i<splits.length; i++ )
{
RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
LongWritable key = new LongWritable();
Text value = new Text();
try
{
//ignore header of first split
if( i==0 && _props.hasHeader() )
reader.next(key, value);
//count remaining number of rows, ignore meta data
while ( reader.next(key, value) ) {
String val = value.toString();
nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1;
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
return new Pair<>(nrow, ncol);
}
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
int blen, long estnnz)
throws IOException, DMLRuntimeException
{
// prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
rlen = ret.getNumRows();
clen = ret.getNumColumns();
// Second Read Pass (read, parse strings, append to matrix block)
readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
//post-processing (representation-specific, change of sparse/dense block representation)
// - nnz explicitly maintained in parallel for the individual splits
ret.examSparsity();
// sanity check for parallel row count (since determined internally)
if (rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());
return ret;
}
@Override
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
try
{
//create read tasks for all splits
ExecutorService pool = CommonThreadPool.get(numThreads);
InputSplit[] splits = informat.getSplits(job, numThreads);
ArrayList<ReadTask> tasks = new ArrayList<>();
for( InputSplit split : splits )
tasks.add(new ReadTask(split, informat, job, dest));
//wait until all tasks have been executed
List<Future<Object>> rt = pool.invokeAll(tasks);
pool.shutdown();
//check for exceptions
for( Future<Object> task : rt )
task.get();
}
catch (Exception e) {
throw new IOException("Failed parallel read of text cell input.", e);
}
}
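The ReadTask class itself is not shown on this page. Judging from how it is constructed and passed to invokeAll, a minimal sketch could look like the following inner class; this is an illustration inferred from the call site, not the project's actual source:

private class ReadTask implements Callable<Object> {
    private final InputSplit _split;
    private final TextInputFormat _informat;
    private final JobConf _job;
    private final FrameBlock _dest;

    public ReadTask(InputSplit split, TextInputFormat informat, JobConf job, FrameBlock dest) {
        _split = split;
        _informat = informat;
        _job = job;
        _dest = dest;
    }

    @Override
    public Object call() throws Exception {
        //delegate to the same per-split reader used by the sequential variant
        readTextCellFrameFromInputSplit(_split, _informat, _job, _dest);
        return null;
    }
}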
@Override
protected void readJSONLFrameFromHDFS(Path path, JobConf jobConf, FileSystem fileSystem,
FrameBlock dest, Types.ValueType[] schema, Map<String, Integer> schemaMap)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
TextInputFormat inputFormat = new TextInputFormat();
inputFormat.configure(jobConf);
InputSplit[] splits = inputFormat.getSplits(jobConf, numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
try{
ExecutorService executorPool = CommonThreadPool.get(Math.min(numThreads, splits.length));
//compute num rows per split
ArrayList<CountRowsTask> countRowsTasks = new ArrayList<>();
for (InputSplit split : splits){
countRowsTasks.add(new CountRowsTask(split, inputFormat, jobConf));
}
List<Future<Long>> ret = executorPool.invokeAll(countRowsTasks);
//compute row offset per split via cumsum on row counts
long offset = 0;
List<Long> offsets = new ArrayList<>();
for( Future<Long> rc : ret ) {
offsets.add(offset);
offset += rc.get();
}
//read individual splits
ArrayList<ReadRowsTask> readRowsTasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
readRowsTasks.add(new ReadRowsTask(splits[i], inputFormat,
jobConf, dest, schemaMap, offsets.get(i).intValue()));
CommonThreadPool.invokeAndShutdown(executorPool, readRowsTasks);
}
catch (Exception e) {
throw new IOException("Failed parallel read of JSONL input.", e);
}
}
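The cumulative sum is what makes the second wave of tasks embarrassingly parallel: if, for example, three splits contain 100, 250, and 40 rows, the ReadRowsTasks receive destination offsets 0, 100, and 350, so each task can write its rows into dest without coordinating with the others.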
private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen )
{
long row = -1;
long col = -1;
try
{
//STEP 1: read matrix from HDFS and write blocks to local staging area
//check and add input path
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LinkedList<Cell> buffer = new LinkedList<>();
LongWritable key = new LongWritable();
Text value = new Text();
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split: splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try
{
while(reader.next(key, value))
{
st.reset( value.toString() ); //reset tokenizer
row = st.nextLong();
col = st.nextLong();
double lvalue = st.nextDouble();
Cell tmp = new Cell( row, col, lvalue );
buffer.addLast( tmp );
if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
{
appendCellBufferToStagingArea(fnameStaging, buffer, blen);
buffer.clear();
}
}
//final flush
if( !buffer.isEmpty() )
{
appendCellBufferToStagingArea(fnameStaging, buffer, blen);
buffer.clear();
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
//STEP 2: read matrix blocks from staging area and write matrix to HDFS
String[] fnamesPartitions = new File(fnameStaging).list();
if(PARALLEL)
{
int len = Math.min(fnamesPartitions.length, _par);
Thread[] threads = new Thread[len];
for( int i=0;i<len;i++ )
{
int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
end = Math.min(end, fnamesPartitions.length-1);
threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
threads[i].start();
}
for( Thread t : threads )
t.join();
}
else
{
for( String pdir : fnamesPartitions )
writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
}
}
catch (Exception e)
{
//post-mortem error handling and bounds checking
if( row < 1 || row > rlen || col < 1 || col > clen )
{
throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
}
else
throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
}
}
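Note the post-mortem error handling in the catch block: the last parsed (row, col) pair is checked against the matrix bounds, so an out-of-range cell in the input is reported as a data error rather than as a generic partitioning failure.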
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
int blen, long estnnz)
throws IOException, DMLRuntimeException
{
// prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, job,
_props.hasHeader(), _props.getDelim(), rlen, clen, estnnz);
rlen = ret.getNumRows();
clen = ret.getNumColumns();
// Second Read Pass (read, parse strings, append to matrix block)
readCSVMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen,
_props.hasHeader(), _props.getDelim(), _props.isFill(),
_props.getFillValue());
//post-processing (representation-specific, change of sparse/dense block representation)
// - no sorting required for CSV because it is read in sorted order per row
// - nnz explicitly maintained in parallel for the individual splits
ret.examSparsity();
// sanity check for parallel row count (since determined internally)
if (rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());
return ret;
}
@Override
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
try
{
ExecutorService pool = CommonThreadPool.get(
Math.min(numThreads, splits.length));
//compute num rows per split
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
List<Future<Long>> cret = pool.invokeAll(tasks);
//compute row offset per split via cumsum on row counts
long offset = 0;
List<Long> offsets = new ArrayList<>();
for( Future<Long> count : cret ) {
offsets.add(offset);
offset += count.get();
}
//read individual splits
ArrayList<ReadRowsTask> tasks2 = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks2.add( new ReadRowsTask(splits[i], informat, job, dest, offsets.get(i).intValue(), i==0));
CommonThreadPool.invokeAndShutdown(pool, tasks2);
}
catch (Exception e) {
throw new IOException("Failed parallel read of text csv input.", e);
}
}
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
throws IOException
{
boolean sparse = dest.isInSparseFormat();
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LongWritable key = new LongWritable();
Text value = new Text();
IJV cell = new IJV();
long nnz = 0;
try
{
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split: splits) {
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try {
if( sparse ) { //SPARSE<-value
while( reader.next(key, value) ) {
cell = parseCell(value.toString(), st, cell, _mmProps);
appendCell(cell, dest, _mmProps);
}
dest.sortSparseRows();
}
else { //DENSE<-value
DenseBlock a = dest.getDenseBlock();
while( reader.next(key, value) ) {
cell = parseCell(value.toString(), st, cell, _mmProps);
nnz += appendCell(cell, a, _mmProps);
}
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
if( !dest.isInSparseFormat() )
dest.setNonZeros(nnz);
}
catch(Exception ex) {
//post-mortem error handling and bounds checking
if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
else
throw new IOException( "Unable to read matrix in text cell format.", ex );
}
}
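For context, the text cell format parsed here stores one space-separated i j v triple per line, with 1-based row and column indices (hence the [1:rlen,1:clen] range in the bounds message above). A hypothetical 3-by-3 matrix with three non-zero cells would look like:

1 1 3.0
2 3 4.5
3 2 -1.0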
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
if( ALLOW_COPY_CELLFILES )
{
copyAllFiles(fnameNew, inMO);
return; //we're done
}
//actual merge
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
String valueStr = null;
try
{
for( MatrixObject in : inMO ) //read/write all inputs
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
+in.getFileName()+") via stream merge");
JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
Path tmpPath = new Path(in.getFileName());
FileInputFormat.addInputPath(tmpJob, tmpPath);
TextInputFormat informat = new TextInputFormat();
informat.configure(tmpJob);
InputSplit[] splits = informat.getSplits(tmpJob, 1);
LongWritable key = new LongWritable();
Text value = new Text();
for(InputSplit split: splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
try
{
while(reader.next(key, value))
{
valueStr = value.toString().trim();
out.write( valueStr+"\n" );
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
}
finally {
IOUtilFunctions.closeSilently(out);
}
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge text cell results.", ex);
}
}
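In this non-comparing variant, merging amounts to concatenation: every cell line of every input is trimmed and appended to a single output stream, which is sufficient because each text cell line carries its own explicit row and column coordinates.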