org.apache.hadoop.mapred.TextInputFormat#configure() Code Examples

The following examples show how org.apache.hadoop.mapred.TextInputFormat#configure() is used in practice. They are collected from open source projects; follow the project links to view the full source on GitHub.
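Every example below follows the same basic pattern: add the input path to a JobConf, call configure(job) on a fresh TextInputFormat so the format can initialize itself from the job settings (for example, the configured compression codecs) before splits are computed, then fetch splits and record readers. Here is a minimal, self-contained sketch of that pattern, assuming a Hadoop client on the classpath; the input path /tmp/input and the class name TextInputFormatConfigureDemo are placeholders, not code from any of the projects below.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class TextInputFormatConfigureDemo {
	public static void main(String[] args) throws IOException {
		JobConf job = new JobConf();
		FileInputFormat.addInputPath(job, new Path("/tmp/input")); // placeholder path

		// configure(JobConf) initializes the format from the job settings;
		// it must run before getSplits/getRecordReader.
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);

		InputSplit[] splits = informat.getSplits(job, 1);
		LongWritable key = new LongWritable(); // byte offset of each line
		Text value = new Text();               // line contents
		for (InputSplit split : splits) {
			RecordReader<LongWritable, Text> reader =
				informat.getRecordReader(split, job, Reporter.NULL);
			try {
				while (reader.next(key, value))
					System.out.println(key.get() + "\t" + value);
			}
			finally {
				reader.close();
			}
		}
	}
}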

Example 1  Project: systemds   File: ReaderTextLIBSVMParallel.java
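A parallel LIBSVM matrix reader: the configured TextInputFormat feeds one LIBSVMReadTask per input split to a thread pool, and the per-split non-zero counts are aggregated into the destination block after checking each task's return code.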
private void readLIBSVMMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, 
		MatrixBlock dest, long rlen, long clen, int blen) 
	throws IOException 
{
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	ExecutorService pool = CommonThreadPool.get(_numThreads);

	try 
	{
		// create read tasks for all splits
		ArrayList<LIBSVMReadTask> tasks = new ArrayList<>();
		int splitCount = 0;
		for (InputSplit split : splits) {
			tasks.add( new LIBSVMReadTask(split, _offsets, informat, job, dest, rlen, clen, splitCount++) );
		}
		pool.invokeAll(tasks);
		pool.shutdown();

		// check return codes and aggregate nnz
		long lnnz = 0;
		for (LIBSVMReadTask rt : tasks) {
			lnnz += rt.getPartialNnz();
			if (!rt.getReturnCode()) {
				Exception err = rt.getException();
				throw new IOException("Read task for libsvm input failed: "+ err.toString(), err);
			}
		}
		dest.setNonZeros(lnnz);
	} 
	catch (Exception e) {
		throw new IOException("Threadpool issue, while parallel read.", e);
	}
}
 
Example 2  Project: systemds   File: TensorReaderTextCell.java
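A sequential tensor reader for the text cell format: each line carries the cell indices followed by the value, the indices are 1-based in the file and adjusted to 0-based, and the value is set in a freshly allocated TensorBlock.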
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims, ValueType[] schema) throws IOException {
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);

	LongWritable key = new LongWritable();
	Text value = new Text();
	int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
	TensorBlock ret;
	if (schema.length == 1)
		ret = new TensorBlock(schema[0], idims).allocateBlock();
	else
		ret = new TensorBlock(schema, idims).allocateBlock();

	try {
		int[] ix = new int[dims.length];
		for (InputSplit split : splits) {
			RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try {
				while (reader.next(key, value)) {
					String[] parts = Arrays.stream(IOUtilFunctions.splitCSV(value.toString(), " "))
							.filter(s -> !s.isEmpty()).toArray(String[]::new);
					for (int i = 0; i < ix.length; i++) {
						ix[i] = Integer.parseInt(parts[i]) - 1;
					}
					ret.set(ix, parts[ix.length]);
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}
	}
	catch (Exception ex) {
		throw new IOException("Unable to read tensor in text cell format.", ex);
	}
	return ret;
}
 
Example 3  Project: systemds   File: FrameReaderTextCSV.java
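Computes the dimensions of a CSV file without materializing it: the column count comes from IOUtilFunctions.countNumColumnsCSV, and the row count is accumulated across sorted splits, skipping the header row and transform-metadata lines.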
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) 
	throws IOException 
{	
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	splits = IOUtilFunctions.sortInputSplits(splits);
	
	//compute number of columns
	int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
	
	//compute number of rows
	int nrow = 0;
	for( int i=0; i<splits.length; i++ ) 
	{
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
		LongWritable key = new LongWritable();
		Text value = new Text();
		
		try
		{
			//ignore header of first split
			if( i==0 && _props.hasHeader() )
				reader.next(key, value);
			
			//count remaining number of rows, ignore meta data
			while ( reader.next(key, value) ) {
				String val = value.toString();
				nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX)
					|| val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; 
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	return new Pair<>(nrow, ncol);
}
 
Example 4  Project: systemds   File: TensorReaderTextCellParallel.java
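The parallel variant of Example 2: one ReadTask per split is submitted to a thread pool, and the returned futures are checked for exceptions before the tensor is returned.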
@Override
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims,
		Types.ValueType[] schema) throws IOException {
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	
	int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
	TensorBlock ret;
	if( schema.length == 1 )
		ret = new TensorBlock(schema[0], idims).allocateBlock();
	else
		ret = new TensorBlock(schema, idims).allocateBlock();
	try {
		ExecutorService pool = CommonThreadPool.get(_numThreads);
		InputSplit[] splits = informat.getSplits(job, _numThreads);
		
		//create and execute read tasks for all splits
		List<TensorReaderTextCellParallel.ReadTask> tasks = Arrays.stream(splits)
				.map(s -> new TensorReaderTextCellParallel.ReadTask(s, informat, job, ret))
				.collect(Collectors.toList());
		List<Future<Object>> rt = pool.invokeAll(tasks);
		
		//check for exceptions
		for (Future<Object> task : rt)
			task.get();
		
		pool.shutdown();
	}
	catch (Exception e) {
		throw new IOException("Threadpool issue, while parallel read.", e);
	}
	return ret;
}
 
Example 5  Project: systemds   File: ReaderTextCSVParallel.java
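A parallel CSV matrix reader, structurally identical to Example 1 but with a CSVReadTask per split carrying the header, delimiter, and fill options.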
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, 
		MatrixBlock dest, long rlen, long clen, int blen, 
		boolean hasHeader, String delim, boolean fill, double fillValue) 
	throws IOException 
{
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	ExecutorService pool = CommonThreadPool.get(_numThreads);

	try 
	{
		// create read tasks for all splits
		ArrayList<CSVReadTask> tasks = new ArrayList<>();
		int splitCount = 0;
		for (InputSplit split : splits) {
			tasks.add( new CSVReadTask(split, _offsets, informat, job, dest, 
				rlen, clen, hasHeader, delim, fill, fillValue, splitCount++) );
		}
		pool.invokeAll(tasks);
		pool.shutdown();

		// check return codes and aggregate nnz
		long lnnz = 0;
		for (CSVReadTask rt : tasks) {
			lnnz += rt.getPartialNnz();
			if (!rt.getReturnCode()) {
				Exception err = rt.getException();
				throw new IOException("Read task for csv input failed: "+ err.toString(), err);
			}
		}
		dest.setNonZeros(lnnz);
	} 
	catch (Exception e) {
		throw new IOException("Threadpool issue, while parallel read.", e);
	}
}
 
Example 6  Project: attic-apex-malhar   File: MapOperatorTest.java
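A test of the Apex Malhar map operator: a TextInputFormat is configured to compute input splits, the first split is serialized into the operator through Hadoop's SerializationFactory, and the number of tuples emitted across windows is asserted.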
public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException
{

  CollectorTestSink sortSink = new CollectorTestSink();
  oper.output.setSink(sortSink);

  oper.setMapClass(WordCount.Map.class);
  oper.setCombineClass(WordCount.Reduce.class);
  oper.setDirName(testMeta.testDir);
  oper.setConfigFile(null);
  oper.setInputFormatClass(TextInputFormat.class);

  Configuration conf = new Configuration();
  JobConf jobConf = new JobConf(conf);
  FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir));
  TextInputFormat inputFormat = new TextInputFormat();
  inputFormat.configure(jobConf);
  InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
  keySerializer.open(oper.getOutstream());
  keySerializer.serialize(splits[0]);
  oper.setInputSplitClass(splits[0].getClass());
  keySerializer.close();
  oper.setup(null);
  oper.beginWindow(0);
  oper.emitTuples();
  oper.emitTuples();
  oper.endWindow();
  oper.beginWindow(1);
  oper.emitTuples();
  oper.endWindow();

  Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size());
  for (Object o : sortSink.collectedTuples) {
    LOG.debug(o.toString());
  }
  LOG.debug("Done testing round\n");
  oper.teardown();
}
 
Example 7  Project: systemds   File: FrameReaderTextCSV.java
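A sequential CSV frame reader over sorted splits; readCSVFrameFromInputSplit returns the updated row position so each split continues where the previous one ended.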
protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, 
		FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) 
	throws IOException
{
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	splits = IOUtilFunctions.sortInputSplits(splits);
	for( int i=0, rpos=0; i<splits.length; i++ )
		rpos = readCSVFrameFromInputSplit(splits[i], informat,
			job, dest, schema, names, rlen, clen, rpos, i==0);
}
 
Example 8  Project: systemds   File: ReaderTextLIBSVMParallel.java
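The two-pass driver around Examples 1 and 12: the first pass counts rows and allocates the output matrix block, the second pass parses and appends the values, followed by sparsity post-processing and a sanity check on the row count.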
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
		int blen, long estnnz) 
	throws IOException, DMLRuntimeException 
{
	// prepare file access
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	InputSplit[] splits = informat.getSplits(job, _numThreads);
	splits = IOUtilFunctions.sortInputSplits(splits);

	// check existence and non-empty file
	checkValidInputFile(fs, path);

	// allocate output matrix block
	// First Read Pass (count rows/cols, determine offsets, allocate matrix block)
	MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
	rlen = ret.getNumRows();
	clen = ret.getNumColumns();

	// Second Read Pass (read, parse strings, append to matrix block)
	readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
	
	//post-processing (representation-specific, change of sparse/dense block representation)
	// - nnz explicitly maintained in parallel for the individual splits
	ret.examSparsity();

	// sanity check for parallel row count (since determined internally)
	if (rlen >= 0 && rlen != ret.getNumRows())
		throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
				+ "expected nrow="+ rlen + ", real nrow=" + ret.getNumRows());

	return ret;
}
 
Example 9  Project: systemds   File: ResultMergeLocalFile.java
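Merges several text cell result files into one output by streaming: each input gets its own JobConf and configured TextInputFormat, and every record is trimmed and appended to a single BufferedWriter on HDFS.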
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
{
	try
	{
		//delete target file if already exists
		HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
		
		if( ALLOW_COPY_CELLFILES )
		{
			copyAllFiles(fnameNew, inMO);
			return; //we're done
		}
		
		//actual merge
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path( fnameNew );
		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
		BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
		
		String valueStr = null;
		
		try
		{
			for( MatrixObject in : inMO ) //read/write all inputs
			{
				if( LOG.isTraceEnabled() )
					LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
						+in.getFileName()+") via stream merge");
				
				JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
				Path tmpPath = new Path(in.getFileName());
				FileInputFormat.addInputPath(tmpJob, tmpPath);
				TextInputFormat informat = new TextInputFormat();
				informat.configure(tmpJob);
				InputSplit[] splits = informat.getSplits(tmpJob, 1);
				
				LongWritable key = new LongWritable();
				Text value = new Text();
	
				for(InputSplit split: splits)
				{
					RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
					try
					{
						while(reader.next(key, value))
						{
							valueStr = value.toString().trim();	
							out.write( valueStr+"\n" );
						}
					}
					finally {
						IOUtilFunctions.closeSilently(reader);
					}
				}
			}
		}
		finally {
			IOUtilFunctions.closeSilently(out);
		}
	}
	catch(Exception ex)
	{
		throw new DMLRuntimeException("Unable to merge text cell results.", ex);
	}
}
 
Example 10  Project: systemds   File: ResultMergeLocalFile.java
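Reads a text cell matrix split by split and spills it to a local staging area: parsed cells are buffered and periodically flushed via appendCellBufferToStagingArea.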
private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID ) 
	throws IOException, DMLRuntimeException
{		
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(mo.getFileName());
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	LinkedList<Cell> buffer = new LinkedList<>();
	LongWritable key = new LongWritable();
	Text value = new Text();

	DataCharacteristics mc = mo.getDataCharacteristics();
	int blen = mc.getBlocksize(); 
	//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
	//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
	// errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
	// It works fine with int row, col but we require long for larger matrices.
	// Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode: binarycell)
	// we just propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
	
	FastStringTokenizer st = new FastStringTokenizer(' ');
	
	for(InputSplit split : splits)
	{
		RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
		try
		{
			while(reader.next(key, value))
			{
				st.reset( value.toString() ); //reset tokenizer
				long row = st.nextLong();
				long col = st.nextLong();
				double lvalue = Double.parseDouble( st.nextToken() );
				
				Cell tmp = new Cell( row, col, lvalue ); 
				
				buffer.addLast( tmp );
				if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
				{
					appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
					buffer.clear();
				}
			}
			
			//final flush
			if( !buffer.isEmpty() )
			{
				appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
				buffer.clear();
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
}
 
Example 11  Project: systemds   File: DataPartitionerLocal.java
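Partitions a text cell matrix in two steps: cells are first staged to local disk, then the staged partitions are written back to HDFS, either sequentially or by a pool of DataPartitionerWorkerTextCell threads; on failure, the last cell read is bounds-checked to produce a precise error message.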
private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int blen ) 
{
	long row = -1;
	long col = -1;
	
	try 
	{
		//STEP 1: read matrix from HDFS and write blocks to local staging area
		//check and add input path
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
		Path path = new Path(fname);
		FileInputFormat.addInputPath(job, path);
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);
		InputSplit[] splits = informat.getSplits(job, 1);
		
		LinkedList<Cell> buffer = new LinkedList<>();
		LongWritable key = new LongWritable();
		Text value = new Text();
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits)
		{
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try
			{
				while(reader.next(key, value))
				{
					st.reset( value.toString() ); //reset tokenizer
					row = st.nextLong();
					col = st.nextLong();
					double lvalue = st.nextDouble();
					Cell tmp = new Cell( row, col, lvalue ); 
	
					buffer.addLast( tmp );
					if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
					{
						appendCellBufferToStagingArea(fnameStaging, buffer, blen);
						buffer.clear();
					}
				}
				
				//final flush
				if( !buffer.isEmpty() )
				{
					appendCellBufferToStagingArea(fnameStaging, buffer, blen);
					buffer.clear();
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}

		//STEP 2: read matrix blocks from staging area and write matrix to HDFS
		String[] fnamesPartitions = new File(fnameStaging).list();	
		if(PARALLEL) 
		{
			int len = Math.min(fnamesPartitions.length, _par);
			Thread[] threads = new Thread[len];
			for( int i=0;i<len;i++ )
			{
				int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
				int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
				end = Math.min(end, fnamesPartitions.length-1);
				threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
				threads[i].start();
			}
			
			for( Thread t : threads )
				t.join();
		}
		else
		{
			for( String pdir : fnamesPartitions  )
				writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );	
		}
	} 
	catch (Exception e) 
	{
		//post-mortem error handling and bounds checking
		if( row < 1 || row > rlen || col < 1 || col > clen )
		{
			throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
								          "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
		}
		else
			throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
	}
}
 
Example 12  Project: systemds   File: ReaderTextLIBSVMParallel.java
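Counts LIBSVM rows per split in parallel to build the split offsets, validates the totals against the dimensions compiled into the plan (padding with a warning if the metadata is larger), and allocates the output matrix block.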
private MatrixBlock computeLIBSVMSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
		JobConf job, long rlen, long clen, long estnnz)
	throws IOException, DMLRuntimeException 
{
	int nrow = 0;
	int ncol = (int) clen;
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	// count rows in parallel per split
	try 
	{
		ExecutorService pool = CommonThreadPool.get(_numThreads);
		ArrayList<CountRowsTask> tasks = new ArrayList<>();
		for (InputSplit split : splits) {
			tasks.add(new CountRowsTask(split, informat, job));
		}
		pool.invokeAll(tasks);
		pool.shutdown();

		// collect row counts for offset computation
		// early error notify in case not all tasks successful
		_offsets = new SplitOffsetInfos(tasks.size());
		for (CountRowsTask rt : tasks) {
			if (!rt.getReturnCode())
				throw new IOException("Count task for libsvm input failed: "+ rt.getErrMsg());
			_offsets.setOffsetPerSplit(tasks.indexOf(rt), nrow);
			_offsets.setLenghtPerSplit(tasks.indexOf(rt), rt.getRowCount());
			nrow = nrow + rt.getRowCount();
		}
	} 
	catch (Exception e) {
		throw new IOException("Threadpool Error " + e.getMessage(), e);
	}
	
	//robustness for wrong dimensions which are already compiled into the plan
	if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
		String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
		if( rlen < nrow || clen < ncol ) {
			//a) specified matrix dimensions too small
			throw new DMLRuntimeException(msg);
		}
		else {
			//b) specified matrix dimensions too large -> padding and warning
			LOG.warn(msg);
			nrow = (int) rlen;
			ncol = (int) clen;
		}
	}
	
	// allocate target matrix block based on given size; 
	// need to allocate sparse as well since lock-free insert into target
	long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
	return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
 
Example 13  Project: tez   File: TestGroupedSplits.java
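A Tez test: a configured TextInputFormat is wrapped in a TezGroupedSplitsInputFormat with a desired split count of one, and the test verifies that the grouped split yields every generated key exactly once.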
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(defaultConf);

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = "+seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  createFiles(length, numFiles, random);

  // create a combined split for the files
  TextInputFormat wrappedFormat = new TextInputFormat();
  wrappedFormat.configure(job);
  TezGroupedSplitsInputFormat<LongWritable , Text> format = 
      new TezGroupedSplitsInputFormat<LongWritable, Text>();
  format.setConf(job);
  format.setDesiredNumberOfSplits(1);
  format.setInputFormat(wrappedFormat);
  LongWritable key = new LongWritable();
  Text value = new Text();
  for (int i = 0; i < 3; i++) {
    int numSplits = random.nextInt(length/20)+1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got =        " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one splits!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be TezGroupedSplit",
      TezGroupedSplit.class, split.getClass());

    // check the split
    BitSet bits = new BitSet(length);
    LOG.debug("split= " + split);
    RecordReader<LongWritable, Text> reader =
      format.getRecordReader(split, job, voidReporter);
    try {
      int count = 0;
      while (reader.next(key, value)) {
        int v = Integer.parseInt(value.toString());
        LOG.debug("read " + v);
        if (bits.get(v)) {
          LOG.warn("conflict with " + v +
                   " at position "+reader.getPos());
        }
        assertFalse("Key in multiple partitions.", bits.get(v));
        bits.set(v);
        count++;
      }
      LOG.info("splits="+split+" count=" + count);
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
 
Example 14  Project: systemds   File: ReaderTextCell.java
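A sequential text cell matrix reader with separate sparse and dense paths; on failure, the last parsed cell is bounds-checked to distinguish out-of-range cells from other parse errors.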
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
	throws IOException
{
	boolean sparse = dest.isInSparseFormat();
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	LongWritable key = new LongWritable();
	Text value = new Text();
	IJV cell = new IJV();
	long nnz = 0;
	
	try
	{
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits) {
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try {
				if( sparse ) { //SPARSE<-value
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						appendCell(cell, dest, _mmProps);
					}
					dest.sortSparseRows();
				} 
				else { //DENSE<-value
					DenseBlock a = dest.getDenseBlock();
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						nnz += appendCell(cell, a, _mmProps);
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}
		
		if( !dest.isInSparseFormat() )
			dest.setNonZeros(nnz);
	}
	catch(Exception ex) {
		//post-mortem error handling and bounds checking
		if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
			throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
				+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
		else
			throw new IOException( "Unable to read matrix in text cell format.", ex );
	}
}
 
Example 15  Project: systemds   File: ReaderTextCSVParallel.java
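The CSV counterpart of Example 12: the column count is taken from the first row of the first split, and the row counts per split are computed in parallel before the output block is allocated.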
private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
		JobConf job, boolean hasHeader, String delim, long rlen, long clen, long estnnz)
	throws IOException, DMLRuntimeException 
{
	int nrow = 0;
	int ncol = 0;
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	// count no of entities in the first non-header row
	LongWritable key = new LongWritable();
	Text oneLine = new Text();
	RecordReader<LongWritable, Text> reader = informat
			.getRecordReader(splits[0], job, Reporter.NULL);
	try {
		if (reader.next(key, oneLine)) {
			String cellStr = oneLine.toString().trim();
			ncol = StringUtils.countMatches(cellStr, delim) + 1;
		}
	} 
	finally {
		IOUtilFunctions.closeSilently(reader);
	}

	// count rows in parallel per split
	try 
	{
		ExecutorService pool = CommonThreadPool.get(_numThreads);
		ArrayList<CountRowsTask> tasks = new ArrayList<>();
		for (InputSplit split : splits) {
			tasks.add(new CountRowsTask(split, informat, job, hasHeader));
			hasHeader = false;
		}
		List<Future<Long>> ret = pool.invokeAll(tasks);
		pool.shutdown();

		// collect row counts for offset computation
		// early error notify in case not all tasks successful
		_offsets = new SplitOffsetInfos(tasks.size());
		for (Future<Long> rc : ret) {
			int lnrow = (int)rc.get().longValue(); //incl error handling
			_offsets.setOffsetPerSplit(ret.indexOf(rc), nrow);
			_offsets.setLenghtPerSplit(ret.indexOf(rc), lnrow);
			nrow = nrow + lnrow;
		}
	} 
	catch (Exception e) {
		throw new IOException("Threadpool Error " + e.getMessage(), e);
	}
	
	//robustness for wrong dimensions which are already compiled into the plan
	if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
		String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
		if( rlen < nrow || clen < ncol ) {
			//a) specified matrix dimensions too small
			throw new DMLRuntimeException(msg);
		}
		else {
			//b) specified matrix dimensions too large -> padding and warning
			LOG.warn(msg);
			nrow = (int) rlen;
			ncol = (int) clen;
		}
	}
	
	// allocate target matrix block based on given size; 
	// need to allocate sparse as well since lock-free insert into target
	long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
	return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
 
Example 16  Project: tez   File: TestGroupedSplits.java
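The gzip variant of the grouped-splits test: three compressed files produce three unsplittable splits, and the test checks that grouping them at various desired split counts still returns all eleven lines.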
/**
 * Test using the gzip codec for reading
 */
@Test(timeout=10000)
public void testGzip() throws IOException {
  JobConf job = new JobConf(defaultConf);
  CompressionCodec gzip = new GzipCodec();
  ReflectionUtils.setConf(gzip, job);
  localFs.delete(workDir, true);
  writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
            "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
  writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
            "is\ngzip\n");
  writeFile(localFs, new Path(workDir, "part3.txt.gz"), gzip,
      "one\nmore\nsplit\n");
  FileInputFormat.setInputPaths(job, workDir);
  TextInputFormat wrappedFormat = new TextInputFormat();
  wrappedFormat.configure(job);
  TezGroupedSplitsInputFormat<LongWritable , Text> format = 
      new TezGroupedSplitsInputFormat<LongWritable, Text>();
  format.setConf(job);
  format.setInputFormat(wrappedFormat);
  
  // TextInputFormat will produce 3 splits
  for (int j=1; j<=3; ++j) {
    format.setDesiredNumberOfSplits(j);
    InputSplit[] splits = format.getSplits(job, 100);
    if (j==1) {
      // j==1 covers single split corner case
      // and does not do grouping
      assertEquals("compressed splits == " + j, j, splits.length);
    }
    List<Text> results = new ArrayList<Text>();
    for (int i=0; i<splits.length; ++i) { 
      List<Text> read = readSplit(format, splits[i], job);
      results.addAll(read);
    }
    assertEquals("splits length", 11, results.size());

    final String[] firstList =
      {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
    final String[] secondList = {"is", "gzip"};
    final String[] thirdList = {"one", "more", "split"};
    String first = results.get(0).toString();
    int start = 0;
    switch (first.charAt(0)) {
    case 't':
      start = testResults(results, firstList, start);
      break;
    case 'i':
      start = testResults(results, secondList, start);
      break;
    case 'o':
      start = testResults(results, thirdList, start);
      break;
    default:
      Assert.fail("unexpected first token - " + first);
    }
  }
}