org.apache.hadoop.mapred.TextInputFormat#getSplits ( )源码实例Demo

下面列出了org.apache.hadoop.mapred.TextInputFormat#getSplits ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: systemds   文件: FrameReaderTextCell.java
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
		ValueType[] schema, String[] names, long rlen, long clen)
	throws IOException
{
	if( fs.isDirectory(path) ) {
		FileInputFormat.addInputPath(job, path);
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);
		InputSplit[] splits = informat.getSplits(job, 1);
		for(InputSplit split: splits)
			readTextCellFrameFromInputSplit(split, informat, job, dest);
	}
	else {
		readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
	}
}
 
源代码2 项目: systemds   文件: FrameReaderTextCell.java
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
		ValueType[] schema, String[] names, long rlen, long clen)
	throws IOException
{
	if( fs.isDirectory(path) ) {
		FileInputFormat.addInputPath(job, path);
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);
		InputSplit[] splits = informat.getSplits(job, 1);
		for(InputSplit split: splits)
			readTextCellFrameFromInputSplit(split, informat, job, dest);
	}
	else {
		readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
	}
}
 
源代码3 项目: systemds   文件: TensorReaderTextCellParallel.java
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
		Types.ValueType[] schema) throws IOException {
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	
	int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
	TensorBlock ret;
	if( schema.length == 1 )
		ret = new TensorBlock(schema[0], idims).allocateBlock();
	else
		ret = new TensorBlock(schema, idims).allocateBlock();
	try {
		ExecutorService pool = CommonThreadPool.get(_numThreads);
		InputSplit[] splits = informat.getSplits(job, _numThreads);
		
		//create and execute read tasks for all splits
		List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
				.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
				.collect(Collectors.toList());
		List<Future<Object>> rt = pool.invokeAll(tasks);
		
		//check for exceptions
		for (Future<Object> task : rt)
			task.get();
		
		pool.shutdown();
	}
	catch (Exception e) {
		throw new IOException("Threadpool issue, while parallel read.", e);
	}
	return ret;
}
 
源代码4 项目: systemds   文件: FrameReaderTextCSV.java
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, 
		FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) 
	throws IOException
{
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	splits = IOUtilFunctions.sortInputSplits(splits);
	for( int i=0, rpos=0; i<splits.length; i++ )
		rpos = readCSVFrameFromInputSplit(splits[i], informat,
			job, dest, schema, names, rlen, clen, rpos, i==0);
}
 
源代码5 项目: systemds   文件: FrameReaderTextCSVParallel.java
@Override
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) 
	throws IOException 
{
	int numThreads = OptimizerUtils.getParallelTextReadParallelism();
	
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, numThreads);
	
	//compute number of columns
	int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
	
	//compute number of rows
	int nrow = 0;
	ExecutorService pool = CommonThreadPool.get(numThreads);
	try {
		ArrayList<CountRowsTask> tasks = new ArrayList<>();
		for( int i=0; i<splits.length; i++ )
			tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
		List<Future<Long>> cret = pool.invokeAll(tasks);
		for( Future<Long> count : cret ) 
			nrow += count.get().intValue();
	}
	catch (Exception e) {
		throw new IOException("Failed parallel read of text csv input.", e);
	}
	finally {
		pool.shutdown();
	}
	return new Pair<>(nrow, ncol);
}
 
源代码6 项目: systemds   文件: FrameReaderTextCSV.java
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) 
	throws IOException 
{	
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	splits = IOUtilFunctions.sortInputSplits(splits);
	
	//compute number of columns
	int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
	
	//compute number of rows
	int nrow = 0;
	for( int i=0; i<splits.length; i++ ) 
	{
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
		LongWritable key = new LongWritable();
		Text value = new Text();
		
		try
		{
			//ignore header of first split
			if( i==0 && _props.hasHeader() )
				reader.next(key, value);
			
			//count remaining number of rows, ignore meta data
			while ( reader.next(key, value) ) {
				String val = value.toString();
				nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
					|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; 
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	return new Pair<>(nrow, ncol);
}
 
源代码7 项目: systemds   文件: FrameReaderTextCSV.java
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) 
	throws IOException 
{	
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	splits = IOUtilFunctions.sortInputSplits(splits);
	
	//compute number of columns
	int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
	
	//compute number of rows
	int nrow = 0;
	for( int i=0; i<splits.length; i++ ) 
	{
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
		LongWritable key = new LongWritable();
		Text value = new Text();
		
		try
		{
			//ignore header of first split
			if( i==0 && _props.hasHeader() )
				reader.next(key, value);
			
			//count remaining number of rows, ignore meta data
			while ( reader.next(key, value) ) {
				String val = value.toString();
				nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
					|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; 
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	return new Pair<>(nrow, ncol);
}
 
源代码8 项目: systemds   文件: TensorReaderTextCellParallel.java
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
		Types.ValueType[] schema) throws IOException {
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	
	int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
	TensorBlock ret;
	if( schema.length == 1 )
		ret = new TensorBlock(schema[0], idims).allocateBlock();
	else
		ret = new TensorBlock(schema, idims).allocateBlock();
	try {
		ExecutorService pool = CommonThreadPool.get(_numThreads);
		InputSplit[] splits = informat.getSplits(job, _numThreads);
		
		//create and execute read tasks for all splits
		List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
				.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
				.collect(Collectors.toList());
		List<Future<Object>> rt = pool.invokeAll(tasks);
		
		//check for exceptions
		for (Future<Object> task : rt)
			task.get();
		
		pool.shutdown();
	}
	catch (Exception e) {
		throw new IOException("Threadpool issue, while parallel read.", e);
	}
	return ret;
}
 
源代码9 项目: systemds   文件: ReaderTextLIBSVMParallel.java
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
		int blen, long estnnz) 
	throws IOException, DMLRuntimeException 
{
	// prepare file access
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	InputSplit[] splits = informat.getSplits(job, _numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	// check existence and non-empty file
	checkValidInputFile(fs, path);

	// allocate output matrix block
	// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
	MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
	rlen = ret.getNumRows();
	clen = ret.getNumColumns();

	// Second Read Pass (read, parse strings, append to matrix block)
	readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
	
	//post-processing (representation-specific, change of sparse/dense block representation)
	// - nnz explicitly maintained in parallel for the individual splits
	ret.examSparsity();

	// sanity check for parallel row count (since determined internally)
	if (rlen >= 0 && rlen != ret.getNumRows())
		throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
				+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());

	return ret;
}
 
源代码10 项目: systemds   文件: FrameReaderTextCellParallel.java
@Override
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
		ValueType[] schema, String[] names, long rlen, long clen)
	throws IOException
{
	int numThreads = OptimizerUtils.getParallelTextReadParallelism();
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	
	try 
	{
		//create read tasks for all splits
		ExecutorService pool = CommonThreadPool.get(numThreads);
		InputSplit[] splits = informat.getSplits(job, numThreads);
		ArrayList<ReadTask> tasks = new ArrayList<>();
		for( InputSplit split : splits )
			tasks.add(new ReadTask(split, informat, job, dest));
		
		//wait until all tasks have been executed
		List<Future<Object>> rt = pool.invokeAll(tasks);
		pool.shutdown();
			
		//check for exceptions
		for( Future<Object> task : rt )
			task.get();
	} 
	catch (Exception e) {
		throw new IOException("Failed parallel read of text cell input.", e);
	}
}
 
源代码11 项目: systemds   文件: FrameReaderTextCellParallel.java
@Override
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
		ValueType[] schema, String[] names, long rlen, long clen)
	throws IOException
{
	int numThreads = OptimizerUtils.getParallelTextReadParallelism();
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	
	try 
	{
		//create read tasks for all splits
		ExecutorService pool = CommonThreadPool.get(numThreads);
		InputSplit[] splits = informat.getSplits(job, numThreads);
		ArrayList<ReadTask> tasks = new ArrayList<>();
		for( InputSplit split : splits )
			tasks.add(new ReadTask(split, informat, job, dest));
		
		//wait until all tasks have been executed
		List<Future<Object>> rt = pool.invokeAll(tasks);
		pool.shutdown();
			
		//check for exceptions
		for( Future<Object> task : rt )
			task.get();
	} 
	catch (Exception e) {
		throw new IOException("Failed parallel read of text cell input.", e);
	}
}
 
源代码12 项目: systemds   文件: ReaderTextLIBSVMParallel.java
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
		int blen, long estnnz) 
	throws IOException, DMLRuntimeException 
{
	// prepare file access
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	InputSplit[] splits = informat.getSplits(job, _numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	// check existence and non-empty file
	checkValidInputFile(fs, path);

	// allocate output matrix block
	// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
	MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
	rlen = ret.getNumRows();
	clen = ret.getNumColumns();

	// Second Read Pass (read, parse strings, append to matrix block)
	readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
	
	//post-processing (representation-specific, change of sparse/dense block representation)
	// - nnz explicitly maintained in parallel for the individual splits
	ret.examSparsity();

	// sanity check for parallel row count (since determined internally)
	if (rlen >= 0 && rlen != ret.getNumRows())
		throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
				+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());

	return ret;
}
 
源代码13 项目: systemds   文件: FrameReaderJSONLParallel.java
@Override
protected void readJSONLFrameFromHDFS(Path path, JobConf jobConf, FileSystem fileSystem,
		FrameBlock dest, Types.ValueType[] schema, Map<String, Integer> schemaMap) 
	throws IOException
{
	int numThreads = OptimizerUtils.getParallelTextReadParallelism();

	TextInputFormat inputFormat = new TextInputFormat();
	inputFormat.configure(jobConf);
	InputSplit[] splits = inputFormat.getSplits(jobConf, numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	try{
		ExecutorService executorPool = CommonThreadPool.get(Math.min(numThreads, splits.length));

		//compute num rows per split
		ArrayList<CountRowsTask> countRowsTasks = new ArrayList<>();
		for (InputSplit split : splits){
			countRowsTasks.add(new CountRowsTask(split, inputFormat, jobConf));
		}
		List<Future<Long>> ret = executorPool.invokeAll(countRowsTasks);

		//compute row offset per split via cumsum on row counts
		long offset = 0;
		List<Long> offsets = new ArrayList<>();
		for( Future<Long> rc : ret ) {
			offsets.add(offset);
			offset += rc.get();
		}

		//read individual splits
		ArrayList<ReadRowsTask> readRowsTasks = new ArrayList<>();
		for( int i=0; i<splits.length; i++ )
			readRowsTasks.add(new ReadRowsTask(splits[i], inputFormat,
				jobConf, dest, schemaMap, offsets.get(i).intValue()));
		CommonThreadPool.invokeAndShutdown(executorPool, readRowsTasks);
	}
	catch (Exception e) {
		throw new IOException("Failed parallel read of JSONL input.", e);
	}
}
 
源代码14 项目: systemds   文件: DataPartitionerLocal.java
private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen ) 
{
	long row = -1;
	long col = -1;
	
	try 
	{
		//STEP 1: read matrix from HDFS and write blocks to local staging area
		//check and add input path
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path(fname);
		FileInputFormat.addInputPath(job, path);
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);
		InputSplit[] splits = informat.getSplits(job, 1);
		
		LinkedList<Cell> buffer = new LinkedList<>();
		LongWritable key = new LongWritable();
		Text value = new Text();
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits)
		{
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try
			{
				while(reader.next(key, value))
				{
					st.reset( value.toString() ); //reset tokenizer
					row = st.nextLong();
					col = st.nextLong();
					double lvalue = st.nextDouble();
					Cell tmp = new Cell( row, col, lvalue ); 
	
					buffer.addLast( tmp );
					if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
					{
						appendCellBufferToStagingArea(fnameStaging, buffer, blen);
						buffer.clear();
					}
				}
				
				//final flush
				if( !buffer.isEmpty() )
				{
					appendCellBufferToStagingArea(fnameStaging, buffer, blen);
					buffer.clear();
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}

		//STEP 2: read matrix blocks from staging area and write matrix to HDFS
		String[] fnamesPartitions = new File(fnameStaging).list();	
		if(PARALLEL) 
		{
			int len = Math.min(fnamesPartitions.length, _par);
			Thread[] threads = new Thread[len];
			for( int i=0;i<len;i++ )
			{
				int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
				int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
				end = Math.min(end, fnamesPartitions.length-1);
				threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
				threads[i].start();
			}
			
			for( Thread t : threads )
				t.join();
		}
		else
		{
			for( String pdir : fnamesPartitions  )
				writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );	
		}
	} 
	catch (Exception e) 
	{
		//post-mortem error handling and bounds checking
		if( row < 1 || row > rlen || col < 1 || col > clen )
		{
			throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
								          "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
		}
		else
			throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
	}
}
 
源代码15 项目: systemds   文件: ReaderTextCSVParallel.java
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
		int blen, long estnnz) 
	throws IOException, DMLRuntimeException 
{
	// prepare file access
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	InputSplit[] splits = informat.getSplits(job, _numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	// check existence and non-empty file
	checkValidInputFile(fs, path);

	// allocate output matrix block
	// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
	MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, job,
		_props.hasHeader(), _props.getDelim(), rlen, clen, estnnz);
	rlen = ret.getNumRows();
	clen = ret.getNumColumns();

	// Second Read Pass (read, parse strings, append to matrix block)
	readCSVMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen,
			_props.hasHeader(), _props.getDelim(), _props.isFill(),
			_props.getFillValue());
	
	//post-processing (representation-specific, change of sparse/dense block representation)
	// - no sorting required for CSV because it is read in sorted order per row
	// - nnz explicitly maintained in parallel for the individual splits
	ret.examSparsity();

	// sanity check for parallel row count (since determined internally)
	if (rlen >= 0 && rlen != ret.getNumRows())
		throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
				+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());

	return ret;
}
 
源代码16 项目: systemds   文件: FrameReaderTextCSVParallel.java
@Override
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, 
		FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) 
	throws IOException
{
	int numThreads = OptimizerUtils.getParallelTextReadParallelism();
	
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, numThreads); 
	splits = IOUtilFunctions.sortInputSplits(splits);

	try 
	{
		ExecutorService pool = CommonThreadPool.get(
			Math.min(numThreads, splits.length));
		
		//compute num rows per split
		ArrayList<CountRowsTask> tasks = new ArrayList<>();
		for( int i=0; i<splits.length; i++ )
			tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
		List<Future<Long>> cret = pool.invokeAll(tasks);

		//compute row offset per split via cumsum on row counts
		long offset = 0;
		List<Long> offsets = new ArrayList<>();
		for( Future<Long> count : cret ) {
			offsets.add(offset);
			offset += count.get();
		}
		
		//read individual splits
		ArrayList<ReadRowsTask> tasks2 = new ArrayList<>();
		for( int i=0; i<splits.length; i++ )
			tasks2.add( new ReadRowsTask(splits[i], informat, job, dest, offsets.get(i).intValue(), i==0));
		CommonThreadPool.invokeAndShutdown(pool, tasks2);
	} 
	catch (Exception e) {
		throw new IOException("Failed parallel read of text csv input.", e);
	}
}
 
源代码17 项目: systemds   文件: ReaderTextCell.java
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
	throws IOException
{
	boolean sparse = dest.isInSparseFormat();
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	LongWritable key = new LongWritable();
	Text value = new Text();
	IJV cell = new IJV();
	long nnz = 0;
	
	try
	{
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits) {
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try {
				if( sparse ) { //SPARSE<-value
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						appendCell(cell, dest, _mmProps);
					}
					dest.sortSparseRows();
				} 
				else { //DENSE<-value
					DenseBlock a = dest.getDenseBlock();
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						nnz += appendCell(cell, a, _mmProps);
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}
		
		if( !dest.isInSparseFormat() )
			dest.setNonZeros(nnz);
	}
	catch(Exception ex) {
		//post-mortem error handling and bounds checking
		if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
			throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
				+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
		else
			throw new IOException( "Unable to read matrix in text cell format.", ex );
	}
}
 
源代码18 项目: systemds   文件: ReaderTextCell.java
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
	throws IOException
{
	boolean sparse = dest.isInSparseFormat();
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	LongWritable key = new LongWritable();
	Text value = new Text();
	IJV cell = new IJV();
	long nnz = 0;
	
	try
	{
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits) {
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try {
				if( sparse ) { //SPARSE<-value
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						appendCell(cell, dest, _mmProps);
					}
					dest.sortSparseRows();
				} 
				else { //DENSE<-value
					DenseBlock a = dest.getDenseBlock();
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						nnz += appendCell(cell, a, _mmProps);
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}
		
		if( !dest.isInSparseFormat() )
			dest.setNonZeros(nnz);
	}
	catch(Exception ex) {
		//post-mortem error handling and bounds checking
		if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
			throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
				+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
		else
			throw new IOException( "Unable to read matrix in text cell format.", ex );
	}
}
 
源代码19 项目: systemds   文件: ResultMergeLocalFile.java
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
{
	try
	{
		//delete target file if already exists
		HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
		
		if( ALLOW_COPY_CELLFILES )
		{
			copyAllFiles(fnameNew, inMO);
			return; //we're done
		}
		
		//actual merge
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path( fnameNew );
		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
		BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
		
		String valueStr = null;
		
		try
		{
			for( MatrixObject in : inMO ) //read/write all inputs
			{
				if( LOG.isTraceEnabled() )
					LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
						+in.getFileName()+") via stream merge");
				
				JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
				Path tmpPath = new Path(in.getFileName());
				FileInputFormat.addInputPath(tmpJob, tmpPath);
				TextInputFormat informat = new TextInputFormat();
				informat.configure(tmpJob);
				InputSplit[] splits = informat.getSplits(tmpJob, 1);
				
				LongWritable key = new LongWritable();
				Text value = new Text();
	
				for(InputSplit split: splits)
				{
					RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
					try
					{
						while(reader.next(key, value))
						{
							valueStr = value.toString().trim();	
							out.write( valueStr+"\n" );
						}
					}
					finally {
						IOUtilFunctions.closeSilently(reader);
					}
				}
			}
		}
		finally {
			IOUtilFunctions.closeSilently(out);
		}
	}
	catch(Exception ex)
	{
		throw new DMLRuntimeException("Unable to merge text cell results.", ex);
	}
}
 
源代码20 项目: systemds   文件: ReaderTextCSVParallel.java
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
		int blen, long estnnz) 
	throws IOException, DMLRuntimeException 
{
	// prepare file access
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	InputSplit[] splits = informat.getSplits(job, _numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	// check existence and non-empty file
	checkValidInputFile(fs, path);

	// allocate output matrix block
	// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
	MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, job,
		_props.hasHeader(), _props.getDelim(), rlen, clen, estnnz);
	rlen = ret.getNumRows();
	clen = ret.getNumColumns();

	// Second Read Pass (read, parse strings, append to matrix block)
	readCSVMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen,
			_props.hasHeader(), _props.getDelim(), _props.isFill(),
			_props.getFillValue());
	
	//post-processing (representation-specific, change of sparse/dense block representation)
	// - no sorting required for CSV because it is read in sorted order per row
	// - nnz explicitly maintained in parallel for the individual splits
	ret.examSparsity();

	// sanity check for parallel row count (since determined internally)
	if (rlen >= 0 && rlen != ret.getNumRows())
		throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
				+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());

	return ret;
}