org.apache.hadoop.mapred.RecordReader#next ( )源码实例Demo

下面列出了org.apache.hadoop.mapred.RecordReader#next ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: systemds   文件: IOUtilFunctions.java
/**
 * Counts the records of the input split assigned to this task.
 * If the task was created with the header flag set, the first record is
 * consumed up front and is not counted.
 *
 * @return number of counted records
 * @throws Exception if the record reader cannot be created or reading fails
 */
@Override
public Long call() throws Exception {
	final RecordReader<LongWritable, Text> reader = _inputFormat.getRecordReader(_split, _jobConf, Reporter.NULL);
	final LongWritable offset = new LongWritable();
	final Text line = new Text();
	long count = 0;

	try {
		// consume the header record so that it is excluded from the count
		if (_hasHeader) {
			reader.next(offset, line);
		}
		// every further record is one row
		for (boolean more = reader.next(offset, line); more; more = reader.next(offset, line)) {
			count++;
		}
	}
	finally {
		IOUtilFunctions.closeSilently(reader);
	}
	return count;
}
 
源代码2 项目: systemds   文件: ReaderTextLIBSVMParallel.java
/**
 * Counts the records of the assigned input split and adds them to {@code _nrows}.
 * <p>
 * If reading fails, the failure is recorded on the task ({@code _rc} is set to
 * false and {@code _errMsg} is populated) and an {@link IOException} is thrown
 * that carries the original exception as its cause.
 *
 * @return always {@code null}; the result is exposed through {@code _nrows}
 * @throws Exception if the record reader cannot be created or reading fails
 */
@Override
public Object call() 
	throws Exception 
{
	RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
	LongWritable key = new LongWritable();
	Text oneLine = new Text();

	try {
		// count rows from the first row
		while (reader.next(key, oneLine)) {
			_nrows++;
		}
	} 
	catch (Exception e) {
		_rc = false;
		_errMsg = "RecordReader error libsvm format. split: "+ _split.toString() + e.getMessage();
		// attach the original exception so its type and stack trace are not lost
		throw new IOException(_errMsg, e);
	} 
	finally {
		IOUtilFunctions.closeSilently(reader);
	}

	return null;
}
 
源代码3 项目: hudi   文件: TestHoodieParquetInputFormat.java
/**
 * Reads all records of all splits and asserts both the number of records that
 * belong to the given commit and the total number of records.
 *
 * @param msg                             assertion message
 * @param commit                          commit time to match against each record
 * @param expectedNumberOfRecordsInCommit expected number of records of that commit
 * @param totalExpected                   expected number of records overall
 * @throws IOException if computing splits or reading records fails
 */
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
    int totalExpected) throws IOException {
  int actualCount = 0;
  int totalCount = 0;
  InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
  for (InputSplit split : splits) {
    RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
    try {
      NullWritable key = recordReader.createKey();
      ArrayWritable writable = recordReader.createValue();

      while (recordReader.next(key, writable)) {
        // writable returns an array with [field1, field2, _hoodie_commit_time,
        // _hoodie_commit_seqno]
        // Take the commit time and compare with the one we are interested in
        if (commit.equals((writable.get()[2]).toString())) {
          actualCount++;
        }
        totalCount++;
      }
    } finally {
      // release the reader of this split even if reading fails
      recordReader.close();
    }
  }
  assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
  assertEquals(totalExpected, totalCount, msg);
}
 
源代码4 项目: hadoop   文件: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 * <p>
 * Splits are picked at a fixed stride over the split array, and the reader of
 * each sampled split is closed even if reading it fails.
 *
 * @param inf input format used to compute the splits and to create the readers
 * @param job job configuration
 * @return the sampled keys
 * @throws IOException if computing splits or reading records fails
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    try {
      K key = reader.createKey();
      V value = reader.createValue();
      while (reader.next(key, value)) {
        samples.add(key);
        // the reader fills the passed key in place, so each sample needs its own instance
        key = reader.createKey();
        ++records;
        // stop once the cumulative quota up to and including this split is reached
        if ((i+1) * samplesPerSplit <= records) {
          break;
        }
      }
    } finally {
      // previously skipped when next() threw, leaking the reader
      reader.close();
    }
  }
  return (K[])samples.toArray();
}
 
源代码5 项目: systemds   文件: FrameReaderTextCSV.java
/**
 * Determines the dimensions of a CSV input by scanning all of its input splits.
 * The column count is delegated to {@code IOUtilFunctions.countNumColumnsCSV};
 * the row count is the number of records that are neither the header of the
 * first split nor meta data lines (missing-value / num-distinct prefixes).
 *
 * @param path input path (not read by this method)
 * @param job  job configuration used to compute splits and create readers
 * @param fs   file system (not read by this method)
 * @return pair of (number of rows, number of columns)
 * @throws IOException if reading any split fails
 */
protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) 
	throws IOException 
{
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = IOUtilFunctions.sortInputSplits(informat.getSplits(job, 1));
	
	//number of columns
	int numCols = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
	
	//number of rows
	int numRows = 0;
	boolean firstSplit = true;
	for( InputSplit split : splits ) {
		RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
		LongWritable offset = new LongWritable();
		Text line = new Text();
		
		try {
			//the header only exists in the first split
			if( firstSplit && _props.hasHeader() )
				reader.next(offset, line);
			
			//count all remaining records except meta data lines
			while( reader.next(offset, line) ) {
				String str = line.toString();
				boolean isMetaData = str.startsWith(TfUtils.TXMTD_MVPREFIX)
					|| str.startsWith(TfUtils.TXMTD_NDPREFIX);
				if( !isMetaData )
					numRows++;
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
		firstSplit = false;
	}
	return new Pair<>(numRows, numCols);
}
 
源代码6 项目: systemds   文件: TensorReaderTextCellParallel.java
/**
 * Reads all records of the assigned input split as space-separated text cells
 * and writes them into the target tensor {@code _dest}. Each line holds one
 * index per tensor dimension (1-based in the file, stored 0-based) followed by
 * the cell value token.
 *
 * @return always {@code null}
 * @throws Exception an IOException wrapping any failure while creating the reader or parsing
 */
@Override
public Object call() throws Exception {
	final LongWritable offset = new LongWritable();
	final Text line = new Text();
	try {
		final FastStringTokenizer tokenizer = new FastStringTokenizer(' ');
		
		// index buffer reused for every cell
		final int[] cellIx = new int[_dest.getNumDims()];
		final RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
		try {
			boolean hasRecord = reader.next(offset, line);
			while (hasRecord) {
				tokenizer.reset(line.toString());
				// convert the 1-based file indexes into 0-based tensor indexes
				int dim = 0;
				while (dim < cellIx.length) {
					cellIx[dim] = tokenizer.nextInt() - 1;
					dim++;
				}
				_dest.set(cellIx, tokenizer.nextToken());
				hasRecord = reader.next(offset, line);
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
	catch (Exception ex) {
		throw new IOException("Unable to read tensor in text cell format.", ex);
	}
	return null;
}
 
源代码7 项目: big-c   文件: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 * Every file is read as a single split through {@code AutoInputFormat}, and
 * each key and value is written to the output in record order.
 *
 * @param files files to dump
 * @return always 0
 * @throws IOException if reading a file or writing to standard output fails
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  final JobConf conf = new JobConf(getConf());
  final DataOutputStream stdout = new DataOutputStream(System.out);
  final AutoInputFormat inputFormat = new AutoInputFormat();
  for (FileStatus status : files) {
    // NOTE(review): the split length is len * blockSize, not len; presumably a
    // deliberate over-estimate so the whole file is covered - confirm.
    final FileSplit wholeFile = new FileSplit(status.getPath(), 0,
      status.getLen() * status.getBlockSize(),
      (String[]) null);
    RecordReader reader = null;
    try {
      reader = inputFormat.getRecordReader(wholeFile, conf, Reporter.NULL);
      final Object k = reader.createKey();
      final Object v = reader.createValue();
      while (reader.next(k, v)) {
        writeTypedObject(stdout, k);
        writeTypedObject(stdout, v);
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
  stdout.flush();
  return 0;
}

/**
 * Writes one object as typed bytes, using the Writable-aware output for
 * {@code Writable} instances and the plain typed bytes output otherwise.
 */
private void writeTypedObject(DataOutputStream out, Object obj) throws IOException {
  if (obj instanceof Writable) {
    TypedBytesWritableOutput.get(out).write((Writable) obj);
  } else {
    TypedBytesOutput.get(out).write(obj);
  }
}
 
源代码8 项目: systemds   文件: ResultMergeLocalFile.java
/**
 * Merges the text cell files of all inputs into one target file without
 * comparing cell values. An existing target is deleted first. Afterwards the
 * input files are either copied as a whole (if {@code ALLOW_COPY_CELLFILES}
 * is set) or streamed line by line, trimmed, into the target.
 *
 * @param fnameNew file name of the merge target
 * @param outMo    output matrix object (not read by this method)
 * @param inMO     input matrix objects whose files are merged
 * @throws DMLRuntimeException if any step fails; the original exception is attached as cause
 */
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
{
	try
	{
		//start from a clean target
		HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
		
		//shortcut: plain file copy instead of a record-wise merge
		if( ALLOW_COPY_CELLFILES ) {
			copyAllFiles(fnameNew, inMO);
			return;
		}
		
		//open the target writer (overwrite mode)
		JobConf targetJob = new JobConf(ConfigurationManager.getCachedJobConf());
		Path targetPath = new Path( fnameNew );
		FileSystem targetFs = IOUtilFunctions.getFileSystem(targetPath, targetJob);
		BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(targetFs.create(targetPath,true)));
		
		try
		{
			//stream every input into the target
			for( MatrixObject input : inMO ) {
				if( LOG.isTraceEnabled() ) {
					LOG.trace("ResultMerge (local, file): Merge input "+input.hashCode()+" (fname="
						+input.getFileName()+") via stream merge");
				}
				
				JobConf inputJob = new JobConf(ConfigurationManager.getCachedJobConf());
				Path inputPath = new Path(input.getFileName());
				FileInputFormat.addInputPath(inputJob, inputPath);
				TextInputFormat informat = new TextInputFormat();
				informat.configure(inputJob);
				InputSplit[] splits = informat.getSplits(inputJob, 1);
				
				LongWritable offset = new LongWritable();
				Text line = new Text();
				
				for( int s = 0; s < splits.length; s++ ) {
					RecordReader<LongWritable,Text> reader = informat.getRecordReader(splits[s], inputJob, Reporter.NULL);
					try {
						while( reader.next(offset, line) ) {
							String trimmed = line.toString().trim();
							writer.write( trimmed+"\n" );
						}
					}
					finally {
						IOUtilFunctions.closeSilently(reader);
					}
				}
			}
		}
		finally {
			IOUtilFunctions.closeSilently(writer);
		}
	}
	catch(Exception ex) {
		throw new DMLRuntimeException("Unable to merge text cell results.", ex);
	}
}
 
源代码9 项目: systemds   文件: ResultMergeLocalFile.java
/**
 * Reads the text cell file of the given matrix object and appends its cells to
 * the staging area. The file is read split by split through TextInputFormat;
 * each line is tokenized by single spaces into row index, column index and
 * value. Cells are buffered and handed to appendCellBufferToStagingArea
 * whenever the buffer exceeds StagingFileUtils.CELL_BUFFER_SIZE, and once more
 * at the end of each split.
 *
 * @param fnameStaging name of the staging area passed through to appendCellBufferToStagingArea
 * @param mo           matrix object whose file is read
 * @param ID           identifier passed through to appendCellBufferToStagingArea
 * @throws IOException if reading the input fails
 * @throws DMLRuntimeException declared by this method; presumably raised by the staging helper (TODO confirm)
 */
private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID ) 
	throws IOException, DMLRuntimeException
{		
	JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
	Path path = new Path(mo.getFileName());
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	// buffer, key and value are shared across all splits
	LinkedList<Cell> buffer = new LinkedList<>();
	LongWritable key = new LongWritable();
	Text value = new Text();

	DataCharacteristics mc = mo.getDataCharacteristics();
	int blen = mc.getBlocksize(); 
	//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
	//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
	// errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
	// It works fine with int row, col but we require long for larger matrices.
	// Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode:binarycell)
	// we just propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
	
	FastStringTokenizer st = new FastStringTokenizer(' ');
	
	for(InputSplit split : splits)
	{
		RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
		try
		{
			while(reader.next(key, value))
			{
				// line layout: <row> <col> <value>
				st.reset( value.toString() ); //reset tokenizer
				long row = st.nextLong();
			    long col = st.nextLong();
				double lvalue = Double.parseDouble( st.nextToken() );
				
				Cell tmp = new Cell( row, col, lvalue ); 
				
				buffer.addLast( tmp );
				if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
				{
					appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
					buffer.clear();
				}
			}
			
			//final flush (runs once per split, so the buffer is empty when the next split starts)
			if( !buffer.isEmpty() )
			{
				appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
				buffer.clear();
			}
		}
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	}
}
 
源代码10 项目: big-c   文件: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 * <p>
 * The reader of each sampled split is closed even if reading it fails.
 *
 * @param inf input format used to compute the splits and to create the readers
 * @param job job configuration
 * @return the sampled keys
 * @throws IOException if computing splits or reading records fails
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  // draw the seed explicitly so that it can be logged for reproducing a run
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    try {
      K key = reader.createKey();
      V value = reader.createValue();
      while (reader.next(key, value)) {
        if (r.nextDouble() <= freq) {
          if (samples.size() < numSamples) {
            samples.add(key);
          } else {
            // When exceeding the maximum number of samples, replace a
            // random element with this one, then adjust the frequency
            // to reflect the possibility of existing elements being
            // pushed out. nextInt(numSamples) is always below numSamples,
            // so the drawn index is always a valid position.
            int ind = r.nextInt(numSamples);
            samples.set(ind, key);
            freq *= (numSamples - 1) / (double) numSamples;
          }
          // the selected key instance is now owned by the sample list
          key = reader.createKey();
        }
      }
    } finally {
      // previously skipped when next() threw, leaking the reader
      reader.close();
    }
  }
  return (K[])samples.toArray();
}
 
源代码11 项目: systemds   文件: ReaderTextCSVParallel.java
/**
 * Determines the dimensions of a CSV input and allocates the output matrix block.
 * <p>
 * The number of columns is derived from the first record of the first split.
 * The number of rows is counted in parallel with one task per split; the
 * per-split row offsets and lengths are recorded in {@code _offsets}. If the
 * dimensions given by the caller are smaller than the ones found, an error is
 * raised; if they are larger, a warning is logged and the given dimensions are used.
 *
 * @param splits    input splits; the first split is used for the column count
 * @param path      input path, added to the job configuration
 * @param job       job configuration
 * @param hasHeader whether the first split starts with a header line
 * @param delim     column delimiter
 * @param rlen      expected number of rows, or -1 if unknown
 * @param clen      expected number of columns, or -1 if unknown
 * @param estnnz    estimated number of non-zeros; negative values fall back to rows * columns
 * @return the allocated output matrix block
 * @throws IOException if reading fails or any row counting task fails
 * @throws DMLRuntimeException if the given dimensions are smaller than the dimensions found
 */
private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path,
		JobConf job, boolean hasHeader, String delim, long rlen, long clen, long estnnz)
	throws IOException, DMLRuntimeException 
{
	int nrow = 0;
	int ncol = 0;
	
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);

	// count no of entities in the first non-header row
	LongWritable key = new LongWritable();
	Text oneLine = new Text();
	RecordReader<LongWritable, Text> reader = informat
			.getRecordReader(splits[0], job, Reporter.NULL);
	try {
		if (reader.next(key, oneLine)) {
			String cellStr = oneLine.toString().trim();
			ncol = StringUtils.countMatches(cellStr, delim) + 1;
		}
	} 
	finally {
		IOUtilFunctions.closeSilently(reader);
	}

	// count rows in parallel per split
	ExecutorService pool = null;
	try 
	{
		pool = CommonThreadPool.get(_numThreads);
		ArrayList<CountRowsTask> tasks = new ArrayList<>();
		for (InputSplit split : splits) {
			tasks.add(new CountRowsTask(split, informat, job, hasHeader));
			// only the first split can contain the header
			hasHeader = false;
		}
		List<Future<Long>> ret = pool.invokeAll(tasks);

		// collect row counts for offset computation
		// early error notify in case not all tasks successful
		_offsets = new SplitOffsetInfos(tasks.size());
		// invokeAll returns the futures in task order, so a running index
		// replaces the linear indexOf lookup per future
		int splitIx = 0;
		for (Future<Long> rc : ret) {
			int lnrow = (int)rc.get().longValue(); //incl error handling
			_offsets.setOffsetPerSplit(splitIx, nrow);
			_offsets.setLenghtPerSplit(splitIx, lnrow);
			nrow = nrow + lnrow;
			splitIx++;
		}
	} 
	catch (Exception e) {
		throw new IOException("Threadpool Error " + e.getMessage(), e);
	}
	finally {
		// shut down the pool on the failure path as well
		if (pool != null)
			pool.shutdown();
	}
	
	//robustness for wrong dimensions which are already compiled into the plan
	if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) {
		String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"].";
		if( rlen < nrow || clen < ncol ) {
			//a) specified matrix dimensions too small
			throw new DMLRuntimeException(msg);
		}
		else {
			//b) specified matrix dimensions too large -> padding and warning
			LOG.warn(msg);
			nrow = (int) rlen;
			ncol = (int) clen;
		}
	}
	
	// allocate target matrix block based on given size; 
	// need to allocate sparse as well since lock-free insert into target
	long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
	return createOutputMatrixBlock(nrow, ncol, nrow, estnnz2, true, true);
}
 
源代码12 项目: hudi   文件: TestHoodieCombineHiveInputFormat.java
/**
 * Writes three parquet files plus one log file of updates per file group,
 * then verifies that HoodieCombineHiveInputFormat produces a single combined
 * split over all three file groups and that reading it yields all records.
 */
@Test
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {

  Configuration conf = new Configuration();
  // initial commit
  Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
  HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
  String commitTime = "100";
  final int numRecords = 1000;
  // Create 3 parquet files with 1000 records each
  File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
  InputFormatTestUtil.commit(tempDir, commitTime);

  // insert 1000 update records to the log file of each of the 3 file groups
  String newCommitTime = "101";
  for (String fileId : new String[] {"fileid0", "fileid1", "fileid2"}) {
    HoodieLogFormat.Writer writer =
        InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, fileId, commitTime, newCommitTime,
            numRecords, numRecords, 0);
    writer.close();
  }

  TableDesc tblDesc = Utilities.defaultTd;
  // Set the input format
  tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class);
  PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
  LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
  pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
  MapredWork mrwork = new MapredWork();
  mrwork.getMapWork().setPathToPartitionInfo(pt);
  Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
  Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
  jobConf = new JobConf(conf);
  // Add the paths
  FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
  jobConf.set(HAS_MAP_WORK, "true");
  // The following config tells Hive to choose ExecMapper to read the MAP_WORK
  jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
  // setting the split size to be 3 to create one split for 3 file groups
  jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "3");

  HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
  String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
  InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
  InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
  // Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
  assertEquals(1, splits.length);
  RecordReader<NullWritable, ArrayWritable> recordReader =
      combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
  int counter = 0;
  try {
    NullWritable nullWritable = recordReader.createKey();
    ArrayWritable arrayWritable = recordReader.createValue();
    while (recordReader.next(nullWritable, arrayWritable)) {
      // read over all the splits
      counter++;
    }
  } finally {
    // release the reader even if reading fails
    recordReader.close();
  }
  // should read out 3 splits, each for file0, file1, file2 containing 1000 records each
  assertEquals(3000, counter);
}
 
源代码13 项目: big-c   文件: MultithreadedMapRunner.java
/**
 * Dispatches every input record to the executor as an independent map call,
 * then shuts the executor down and waits until all dispatched calls have
 * finished. Failures reported by the processing threads are checked after
 * each dispatch and while waiting. The mapper is closed in every case.
 *
 * @param input    record reader providing the map input
 * @param output   collector handed to every map call
 * @param reporter reporter handed to every map call
 * @throws IOException if reading the input fails or a processing thread reported a failure
 */
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    // Map calls run concurrently, so every record needs its own key and
    // value instances; they must not be reused across iterations.
    K1 currentKey = input.createKey();
    V1 currentValue = input.createValue();

    for (boolean more = input.next(currentKey, currentValue); more;
         more = input.next(currentKey, currentValue)) {

      executorService.execute(new MapperInvokeRunable(currentKey, currentValue,
                              output, reporter));

      checkForExceptionsFromProcessingThreads();

      // fresh instances for the next record
      currentKey = input.createKey();
      currentValue = input.createValue();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Finished dispatching all Mappper.map calls, job "
                + job.getJobName());
    }

    // Graceful shutdown: already scheduled Runnables are allowed to finish.
    executorService.shutdown();

    try {

      // Poll until every Runnable has ended.
      boolean terminated =
          executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
      while (!terminated) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                    + job.getJobName());
        }

        // Dispatching is done, but map calls are still in progress and
        // may still fail.
        checkForExceptionsFromProcessingThreads();

        terminated =
            executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
      }

      // A map call may have failed after awaitTermination() returned true;
      // an edge case, but it could happen.
      checkForExceptionsFromProcessingThreads();

    } catch (IOException ioEx) {
      // Force all threads of the pool to stop and rethrow the IOException.
      executorService.shutdownNow();
      throw ioEx;
    } catch (InterruptedException iEx) {
      throw new RuntimeException(iEx);
    }

  } finally {
    mapper.close();
  }
}
 
源代码14 项目: dremio-oss   文件: HiveTextReader.java
/**
 * Reads the next batch of records from the underlying reader, deserializes
 * them and writes the selected fields into the value vectors.
 * <p>
 * At most {@code numRowsPerBatch} records are read per call; reading stops
 * early when the reader has no more records. Which of the read records are
 * actually emitted is decided by the {@code skipRecordsInspector}.
 *
 * @return the inspector's actual count after this batch, which is also the
 *         value count set on every vector
 * @throws IOException if reading from the underlying reader fails
 * @throws SerDeException if a record cannot be deserialized
 */
@Override
public int populateData() throws IOException, SerDeException {
  // Copy the fields used in the loop into locals
  // (presumably to avoid repeated field reads in the hot loop - TODO confirm).
  final SkipRecordsInspector skipRecordsInspector = this.skipRecordsInspector;
  final RecordReader<Object, Object> reader = this.reader;
  final Converter partTblObjectInspectorConverter = this.partTblObjectInspectorConverter;
  final Object key = this.key;

  final int numRowsPerBatch = (int) this.numRowsPerBatch;

  final StructField[] selectedStructFieldRefs = this.selectedStructFieldRefs;
  final SerDe partitionSerDe = this.partitionSerDe;
  final StructObjectInspector finalOI = this.finalOI;
  final ObjectInspector[] selectedColumnObjInspectors = this.selectedColumnObjInspectors;
  final HiveFieldConverter[] selectedColumnFieldConverters = this.selectedColumnFieldConverters;
  final ValueVector[] vectors = this.vectors;

  skipRecordsInspector.reset();
  Object value;

  // counts every record read from the reader in this call, including skipped ones
  int recordCount = 0;

  while (recordCount < numRowsPerBatch) {
    // the wait recorder is open only for the duration of the read call
    try (OperatorStats.WaitRecorder recorder = OperatorStats.getWaitRecorder(this.context.getStats())) {
      // the value instance to read into is supplied by the inspector
      boolean hasNext = reader.next(key, value = skipRecordsInspector.getNextValue());
      if (!hasNext) {
        break;
      }
    }
    catch(FSError e) {
      throw HadoopFileSystemWrapper.propagateFSError(e);
    }
    // recordCount is incremented here for every record read, whether or not it is skipped
    if (skipRecordsInspector.doSkipHeader(recordCount++)) {
      continue;
    }
    // A null result means there is no record to emit in this iteration
    // (presumably the inspector holds records back for footer skipping - TODO confirm).
    Object bufferedValue = skipRecordsInspector.bufferAdd(value);
    if (bufferedValue != null) {
      Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
      if (partTblObjectInspectorConverter != null) {
        deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
      }

      for (int i = 0; i < selectedStructFieldRefs.length; i++) {
        Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs[i]);
        // null field values are not written to the vector
        if (hiveValue != null) {
          selectedColumnFieldConverters[i].setSafeValue(selectedColumnObjInspectors[i], hiveValue, vectors[i], skipRecordsInspector.getActualCount());
        }
      }
      skipRecordsInspector.incrementActualCount();
    }
    skipRecordsInspector.incrementTempCount();
  }
  // publish the number of emitted records on every vector
  for (int i = 0; i < selectedStructFieldRefs.length; i++) {
    vectors[i].setValueCount(skipRecordsInspector.getActualCount());
  }

  skipRecordsInspector.updateContinuance();
  return skipRecordsInspector.getActualCount();
}
 
源代码15 项目: systemds   文件: FrameReaderTextCSV.java
/**
 * Reads one CSV input split into the target frame block, starting at row {@code rl}.
 * <p>
 * If this is the first split and the CSV properties declare a header, the
 * header line is consumed and used as column names. Meta data lines
 * (missing-value and num-distinct prefixes) update the column meta data and
 * do not produce a row. Empty cells are replaced by the fill value if fill is
 * enabled and the fill value is non-zero. The reader is closed in every case,
 * including failures while handling the header.
 *
 * @param split    input split to read
 * @param informat input format used to create the record reader
 * @param job      job configuration
 * @param dest     target frame block
 * @param schema   value types per column, used to parse cell strings
 * @param names    column names (not read by this method)
 * @param rlen     number of rows (not read by this method)
 * @param clen     number of columns, used for the column count sanity check
 * @param rl       row index in {@code dest} at which the first data row is written
 * @param first    whether this is the first split of the input
 * @return the row index following the last row written
 * @throws IOException if reading the split fails
 */
protected final int readCSVFrameFromInputSplit( InputSplit split, InputFormat<LongWritable,Text> informat, JobConf job, 
		FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen, int rl, boolean first)
	throws IOException
{
	boolean hasHeader = _props.hasHeader();
	boolean isFill = _props.isFill();
	double dfillValue = _props.getFillValue();
	String sfillValue = String.valueOf(_props.getFillValue());
	String delim = _props.getDelim();
	
	//create record reader
	RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
	LongWritable key = new LongWritable();
	Text value = new Text();
	int row = rl;
	int col = -1;
	
	// Read the data
	boolean emptyValuesFound = false;
	try
	{
		//handle header if existing; done inside the try block so that the
		//reader is also closed if reading or applying the header fails
		if(first && hasHeader ) {
			reader.next(key, value); //read header
			//NOTE(review): String.split treats delim as a regular expression,
			//unlike IOUtilFunctions.splitCSV used for data lines - confirm intended
			dest.setColumnNames(value.toString().split(delim));
		}
		
		while( reader.next(key, value) ) //foreach line
		{
			String cellStr = value.toString().trim();
			emptyValuesFound = false; col = 0;
			String[] parts = IOUtilFunctions.splitCSV(cellStr, delim);
			
			//parse frame meta data (missing values / num distinct)
			if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) || parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) {
				if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) )
					for( int j=0; j<dest.getNumColumns(); j++ )
						dest.getColumnMetadata(j).setMvValue(parts[j+1]);
				else if( parts[0].equals(TfUtils.TXMTD_NDPREFIX) )
					for( int j=0; j<dest.getNumColumns(); j++ )
						dest.getColumnMetadata(j).setNumDistinct(Long.parseLong(parts[j+1]));
				continue;
			}
			
			for( String part : parts ) //foreach cell
			{
				part = part.trim();
				if ( part.isEmpty() ) {
					//empty cells are only written if a non-zero fill value is configured
					if( isFill && dfillValue!=0 )
						dest.set(row, col, UtilFunctions.stringToObject(schema[col], sfillValue));
					emptyValuesFound = true;
				}
				else {
					dest.set(row, col, UtilFunctions.stringToObject(schema[col], part));
				}
				col++;
			}
			
			//sanity checks for empty values and number of columns
			IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, isFill, emptyValuesFound);
			IOUtilFunctions.checkAndRaiseErrorCSVNumColumns("", cellStr, parts, clen);
			row++;
		}
	}
	finally {
		IOUtilFunctions.closeSilently(reader);
	}
	
	return row;
}
 
源代码16 项目: systemds   文件: ReaderTextCell.java
/**
 * Reads a matrix in text cell format into the target matrix block.
 * <p>
 * All input splits are read sequentially. For a sparse target, cells are
 * appended and the sparse rows are sorted after each split; for a dense
 * target, cells are written into the dense block and the number of non-zeros
 * is tracked and set at the end. On any failure, the last parsed cell is
 * checked against the matrix dimensions to report out-of-range cells with a
 * specific message; the original exception is attached as cause in both cases.
 *
 * @param path input path, added to the job configuration
 * @param job  job configuration
 * @param dest target matrix block
 * @param rlen number of rows, used for the bounds check on failure
 * @param clen number of columns, used for the bounds check on failure
 * @param blen block size (not read by this method)
 * @throws IOException if reading or parsing fails
 */
protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int blen )
	throws IOException
{
	boolean sparse = dest.isInSparseFormat();
	FileInputFormat.addInputPath(job, path);
	TextInputFormat informat = new TextInputFormat();
	informat.configure(job);
	InputSplit[] splits = informat.getSplits(job, 1);
	
	LongWritable key = new LongWritable();
	Text value = new Text();
	//declared outside the try block so the last parsed cell is available for error reporting
	IJV cell = new IJV();
	long nnz = 0;
	
	try
	{
		FastStringTokenizer st = new FastStringTokenizer(' ');
		
		for(InputSplit split: splits) {
			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
			try {
				if( sparse ) { //SPARSE<-value
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						appendCell(cell, dest, _mmProps);
					}
					dest.sortSparseRows();
				} 
				else { //DENSE<-value
					DenseBlock a = dest.getDenseBlock();
					while( reader.next(key, value) ) {
						cell = parseCell(value.toString(), st, cell, _mmProps);
						nnz += appendCell(cell, a, _mmProps);
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(reader);
			}
		}
		
		if( !dest.isInSparseFormat() )
			dest.setNonZeros(nnz);
	}
	catch(Exception ex) {
		//post-mortem error handling and bounds checking
		if( cell.getI() < 0 || cell.getI() + 1 > rlen || cell.getJ() < 0 || cell.getJ() + 1 > clen )
			//keep the original exception as cause; it was dropped before
			throw new IOException("Matrix cell ["+(cell.getI()+1)+","+(cell.getJ()+1)+"] "
				+ "out of overall matrix range [1:"+rlen+",1:"+clen+"].", ex);
		else
			throw new IOException( "Unable to read matrix in text cell format.", ex );
	}
}
 
源代码17 项目: gemfirexd-oss   文件: EventInputFormatTest.java
/**
 * Inserts rows into an HDFS-backed table, waits for them to be flushed to
 * HDFS, and verifies that RowInputFormat produces a single split over the
 * single flushed file and that reading it returns all inserted rows in
 * insertion order.
 */
public void testEventInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  int count = 0;
  try {
    Key key = rr.createKey();
    Row value = rr.createValue();

    // rows are expected in insertion order: col1 = 0, 1, 2, ...
    while (rr.next(key, value)) {
      assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
    }
  } finally {
    // release the reader even if an assertion fails
    rr.close();
  }
  
  // compare against the number of inserted rows instead of a repeated literal
  assertEquals(NUM_ENTRIES, count);
  
  TestUtil.shutDown();
}
 
源代码18 项目: systemds   文件: ReaderTextLIBSVMParallel.java
/**
 * Reads the assigned input split in libsvm format and appends one row per
 * record to {@code _dest}, starting at the row offset recorded for this split.
 * After reading, the number of rows read is checked against the length
 * recorded for this split.
 * <p>
 * Failures are not propagated: they are recorded on the task ({@code _rc} is
 * set to false and {@code _exception} holds the cause). The number of
 * non-zeros read so far is stored in {@code _nnz} in every case.
 *
 * @return always {@code null}
 * @throws Exception declared by the interface; failures are captured on the task instead
 */
@Override
public Object call() 
	throws Exception 
{
	long nnzCount = 0;
	
	try 
	{
		RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
		LongWritable offset = new LongWritable();
		Text line = new Text();
		// row buffer passed to the parser and to appendRow for every record
		SparseRowVector rowBuffer = new SparseRowVector(1024);
		
		int rowIx = _splitoffsets.getOffsetPerSplit(_splitCount);

		try {
			for (boolean more = reader.next(offset, line); more; more = reader.next(offset, line)) {
				String rowStr = line.toString().trim();
				nnzCount += ReaderTextLIBSVM.parseLibsvmRow(rowStr, rowBuffer, (int)_clen);
				_dest.appendRow(rowIx, rowBuffer);
				rowIx++;
			}

			// sanity check: the split must contain exactly the recorded number of rows
			int expectedEnd = _splitoffsets.getOffsetPerSplit(_splitCount)
				+ _splitoffsets.getLenghtPerSplit(_splitCount);
			if (rowIx != expectedEnd) {
				throw new IOException("Incorrect number of rows ("+ rowIx+ ") found in delimited file ("
					+ expectedEnd+ "): " + line);
			}
		} 
		finally {
			IOUtilFunctions.closeSilently(reader);
		}
	} 
	catch (Exception ex) {
		// central error handling (return code, message)
		_rc = false;
		_exception = ex;
	}

	// publish the non-zero count, also after a failure
	_nnz = nnzCount;
	return null;
}
 
源代码19 项目: hadoop   文件: PipesMapRunner.java
/**
 * Run the map task.
 * <p>
 * Creates the pipes application, starts the map on its downlink and, if a
 * Java record reader is configured, feeds every input record to the downlink.
 * Afterwards it waits for the application to finish. Any failure while feeding
 * or waiting is passed to {@code application.abort}, and
 * {@code application.cleanup} runs in every case.
 *
 * @param input the set of inputs
 * @param output the object to collect the outputs of the map
 * @param reporter the object to update with status
 * @throws IOException declared by this method; presumably raised by application
 *         setup, abort or the downlink (TODO confirm)
 */
@SuppressWarnings("unchecked")
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter) throws IOException {
  Application<K1, V1, K2, V2> application = null;
  try {
    // The input is handed to the application only when neither a Java record
    // reader nor a Java mapper is configured; otherwise the application gets null.
    RecordReader<FloatWritable, NullWritable> fakeInput = 
      (!Submitter.getIsJavaRecordReader(job) && 
       !Submitter.getIsJavaMapper(job)) ? 
 (RecordReader<FloatWritable, NullWritable>) input : null;
    application = new Application<K1, V1, K2, V2>(job, fakeInput, output, 
                                                  reporter,
        (Class<? extends K2>) job.getOutputKeyClass(), 
        (Class<? extends V2>) job.getOutputValueClass());
  } catch (InterruptedException ie) {
    throw new RuntimeException("interrupted", ie);
  }
  DownwardProtocol<K1, V1> downlink = application.getDownlink();
  boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
  // start the map on the downlink before any record is sent
  downlink.runMap(reporter.getInputSplit(), 
                  job.getNumReduceTasks(), isJavaInput);
  boolean skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  try {
    // Records are fed from here only for Java input; otherwise this method
    // just waits for the application to finish.
    if (isJavaInput) {
      // allocate key & value instances that are re-used for all entries
      K1 key = input.createKey();
      V1 value = input.createValue();
      downlink.setInputTypes(key.getClass().getName(),
                             value.getClass().getName());
      
      while (input.next(key, value)) {
        // map pair to output
        downlink.mapItem(key, value);
        if(skipping) {
          //flush the streams on every record input if running in skip mode
          //so that we don't buffer other records surrounding a bad record.
          downlink.flush();
        }
      }
      downlink.endOfInput();
    }
    application.waitForFinish();
  } catch (Throwable t) {
    // every failure, including Errors, is routed through abort
    application.abort(t);
  } finally {
    application.cleanup();
  }
}
 
源代码20 项目: hadoop   文件: MultithreadedMapRunner.java
/**
 * Hands every input record to the executor as its own map call, then shuts
 * the executor down gracefully and polls until all map calls have completed.
 * Failures raised by the processing threads are checked after each dispatch
 * and on every poll. The mapper is closed regardless of the outcome.
 *
 * @param input    record reader providing the map input
 * @param output   collector handed to every map call
 * @param reporter reporter handed to every map call
 * @throws IOException if reading the input fails or a processing thread reported a failure
 */
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    // Map calls are not serialized, so key and value instances are
    // allocated per record rather than reused.
    K1 nextKey = input.createKey();
    V1 nextValue = input.createValue();

    for (;;) {
      if (!input.next(nextKey, nextValue)) {
        break;
      }

      executorService.execute(
          new MapperInvokeRunable(nextKey, nextValue, output, reporter));

      checkForExceptionsFromProcessingThreads();

      // the dispatched call owns the previous instances; allocate new ones
      nextKey = input.createKey();
      nextValue = input.createValue();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Finished dispatching all Mappper.map calls, job "
                + job.getJobName());
    }

    // Graceful shutdown of the thread pool; scheduled Runnables still run
    // to completion.
    executorService.shutdown();

    try {

      // Wait in 100 ms slices for all Runnables to end.
      for (;;) {
        if (executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          break;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                    + job.getJobName());
        }

        // NOTE: dispatching has concluded, but map calls are still in
        // progress and may still raise exceptions.
        checkForExceptionsFromProcessingThreads();
      }

      // NOTE: a map call could have failed after awaitTermination()
      // returned true. An edge case, but it could happen.
      checkForExceptionsFromProcessingThreads();

    } catch (IOException ioEx) {
      // Force a shutdown of all threads of the pool and rethrow the
      // IOException.
      executorService.shutdownNow();
      throw ioEx;
    } catch (InterruptedException iEx) {
      throw new RuntimeException(iEx);
    }

  } finally {
    mapper.close();
  }
}