org.apache.hadoop.mapred.InputFormat#getRecordReader() Code Examples

Listed below are code examples of org.apache.hadoop.mapred.InputFormat#getRecordReader(); each example links to the full source on GitHub.

Example 1  Project: hadoop-gpu   File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
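The frequency-based sampler above is normally consumed through InputSampler.writePartitionFile(), which calls getSample(), sorts the keys, and writes split points for TotalOrderPartitioner. Below is a minimal driver sketch; the SamplerDriver class name, the partition-file path, the key/value types, and all numeric parameters are assumptions, not part of the original listing.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class SamplerDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SamplerDriver.class);
    // Input/output paths, mapper and reducer classes are set elsewhere.
    job.setNumReduceTasks(10);

    // Where TotalOrderPartitioner reads its split points from (assumed path).
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions"));
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // IntervalSampler implements the getSample() shown above: keep a key
    // whenever kept/records falls below freq = 0.01, over at most 100 splits.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.IntervalSampler<Text, Text>(0.01, 100);
    InputSampler.writePartitionFile(job, sampler); // invokes getSample()

    JobClient.runJob(job);
  }
}
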
Example 2  Project: hadoop   File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
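This variant is SplitSampler's getSample(): it keeps the first numSamples / splitsToSample keys from each sampled split, which is cheap (it stops reading early) but biased toward the front of each split. A hypothetical construction, with parameter values assumed:

// Take ~1000 keys total, reading at most 10 splits (values are assumptions).
InputSampler.Sampler<Text, Text> splitSampler =
    new InputSampler.SplitSampler<Text, Text>(1000, 10);
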
Example 3  Project: hadoop   File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 4  Project: big-c   File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 5  Project: big-c   File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 6  Project: hadoop-gpu   File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 7  Project: RDFS   File: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 8  Project: RDFS   File: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 9  Project: systemds   File: IOUtilFunctions.java
/**
 * Counts the number of columns in a given collection of csv file splits. This primitive stops
 * as soon as a row with more than 0 columns is found, and is hence robust against empty file splits etc.
 * 
 * @param splits input splits
 * @param informat input format
 * @param job job configuration
 * @param delim delimiter
 * @return the number of columns in the collection of csv file splits
 * @throws IOException if IOException occurs
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static int countNumColumnsCSV(InputSplit[] splits, InputFormat informat, JobConf job, String delim ) 
	throws IOException 
{
	LongWritable key = new LongWritable();
	Text value = new Text();
	int ncol = -1; 
	for( int i=0; i<splits.length && ncol<=0; i++ ) {
		RecordReader<LongWritable, Text> reader = 
				informat.getRecordReader(splits[i], job, Reporter.NULL);
		try {
			if( reader.next(key, value) ) {
				boolean hasValue = true;
				if( value.toString().startsWith(TfUtils.TXMTD_MVPREFIX) )
					hasValue = reader.next(key, value);
				if( value.toString().startsWith(TfUtils.TXMTD_NDPREFIX) )
					hasValue = reader.next(key, value);
				String row = value.toString().trim();
				if( hasValue && !row.isEmpty() ) {
					ncol = IOUtilFunctions.countTokensCSV(row, delim);
				}
			}
		}
		finally {
			closeSilently(reader);	
		}
	}
	return ncol;
}
 
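A sketch of how this helper might be driven; the input path and delimiter are assumptions, while TextInputFormat, addInputPath(), and getSplits() are the standard mapred API:

JobConf job = new JobConf();
FileInputFormat.addInputPath(job, new Path("hdfs:///data/input.csv")); // assumed path
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
// Scans splits until one yields a non-empty row, then counts its tokens.
int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, ",");
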
Example 10  Project: hadoop   File: DelegatingInputFormat.java
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {

  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.

  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
     .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
     reporter);
}
 
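DelegatingInputFormat rarely appears in user code directly; it is installed by MultipleInputs, which produces the TaggedInputSplit that the method above unwraps. A sketch, where the paths and mapper classes are hypothetical:

// Each path gets its own InputFormat (and Mapper); MultipleInputs sets
// DelegatingInputFormat as the job's InputFormat behind the scenes.
MultipleInputs.addInputPath(conf, new Path("/data/text"),
    TextInputFormat.class, TextLineMapper.class);          // hypothetical mapper
MultipleInputs.addInputPath(conf, new Path("/data/seq"),
    SequenceFileInputFormat.class, SeqRecordMapper.class); // hypothetical mapper
// At runtime, getRecordReader() above routes each TaggedInputSplit back to
// the InputFormat registered for its path.
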
Example 11  Project: big-c   File: DelegatingInputFormat.java
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {

  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.

  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
     .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
     reporter);
}
 
Example 12  Project: tez   File: TestGroupedSplits.java
private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
                                    InputSplit split,
                                    JobConf job) throws IOException {
  List<Text> result = new ArrayList<Text>();
  RecordReader<LongWritable, Text> reader =
    format.getRecordReader(split, job, voidReporter);
  LongWritable key = reader.createKey();
  Text value = reader.createValue();
  while (reader.next(key, value)) {
    result.add(value);
    value = reader.createValue();
  }
  reader.close();
  return result;
}
 
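A hypothetical call of this test helper: configure a TextInputFormat, ask for one split, and collect its lines. The defaultConf and workDir values are assumed test-class state, like the voidReporter field the helper itself uses.

JobConf job = new JobConf(defaultConf);      // defaultConf is assumed test state
FileInputFormat.setInputPaths(job, workDir); // workDir is assumed test state
TextInputFormat format = new TextInputFormat();
format.configure(job);
InputSplit[] splits = format.getSplits(job, 1);
List<Text> lines = readSplit(format, splits[0], job);
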
Example 13  Project: hadoop-gpu   File: DelegatingInputFormat.java
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {

  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.

  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
     .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
     reporter);
}
 
Example 14  Project: hive-dwrf   File: TestInputOutputFormat.java
@Test
public void testEmptyFile() throws Exception {
  JobConf job = new JobConf(conf);
  Properties properties = new Properties();
  HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
  FileSinkOperator.RecordWriter writer =
      outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
          properties, Reporter.NULL);
  writer.close(true);
  properties.setProperty("columns", "x,y");
  properties.setProperty("columns.types", "int:int");
  SerDe serde = new OrcSerde();
  serde.initialize(conf, properties);
  InputFormat<?,?> in = new OrcInputFormat();
  FileInputFormat.setInputPaths(conf, testFilePath.toString());
  InputSplit[] splits = in.getSplits(conf, 1);
  assertEquals(1, splits.length);

  // read the whole file
  conf.set("hive.io.file.readcolumn.ids", "0,1");
  org.apache.hadoop.mapred.RecordReader reader =
      in.getRecordReader(splits[0], conf, Reporter.NULL);
  Object key = reader.createKey();
  Object value = reader.createValue();
  assertEquals(0.0, reader.getProgress(), 0.00001);
  assertEquals(0, reader.getPos());
  assertEquals(false, reader.next(key, value));
  reader.close();
  assertEquals(null, serde.getSerDeStats());
}
 
Example 15  Project: RDFS   File: DelegatingInputFormat.java
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
    Reporter reporter) throws IOException {

  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.

  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
     .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
     reporter);
}
 
Example 16  Project: presto   File: HiveUtil.java
public static RecordReader<?, ?> createRecordReader(Configuration configuration, Path path, long start, long length, Properties schema, List<HiveColumnHandle> columns)
{
    // determine which hive columns we will read
    List<HiveColumnHandle> readColumns = columns.stream()
            .filter(column -> column.getColumnType() == REGULAR)
            .collect(toImmutableList());

    // Projected columns are not supported here
    readColumns.forEach(readColumn -> checkArgument(readColumn.isBaseColumn(), "column %s is not a base column", readColumn.getName()));

    List<Integer> readHiveColumnIndexes = readColumns.stream()
            .map(HiveColumnHandle::getBaseHiveColumnIndex)
            .collect(toImmutableList());

    // Tell hive the columns we would like to read, this lets hive optimize reading column oriented files
    configuration = copy(configuration);
    setReadColumns(configuration, readHiveColumnIndexes);

    InputFormat<?, ?> inputFormat = getInputFormat(configuration, schema, true);
    JobConf jobConf = toJobConf(configuration);
    FileSplit fileSplit = new FileSplit(path, start, length, (String[]) null);

    // propagate serialization configuration to getRecordReader
    schema.stringPropertyNames().stream()
            .filter(name -> name.startsWith("serialization."))
            .forEach(name -> jobConf.set(name, schema.getProperty(name)));

    configureCompressionCodecs(jobConf);

    try {
        RecordReader<WritableComparable, Writable> recordReader = (RecordReader<WritableComparable, Writable>) inputFormat.getRecordReader(fileSplit, jobConf, Reporter.NULL);

        int headerCount = getHeaderCount(schema);
        if (headerCount > 0) {
            Utilities.skipHeader(recordReader, headerCount, recordReader.createKey(), recordReader.createValue());
        }

        int footerCount = getFooterCount(schema);
        if (footerCount > 0) {
            recordReader = new FooterAwareRecordReader<>(recordReader, footerCount, jobConf);
        }

        return recordReader;
    }
    catch (IOException e) {
        if (e instanceof TextLineLengthLimitExceededException) {
            throw new PrestoException(HIVE_BAD_DATA, "Line too long in text file: " + path, e);
        }

        throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, format("Error opening Hive split %s (offset=%s, length=%s) using %s: %s",
                path,
                start,
                length,
                getInputFormatName(schema),
                firstNonNull(e.getMessage(), e.getClass().getName())),
                e);
    }
}
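A minimal consumption sketch for the reader this method returns; configuration, path, fileLength, schema, and columns are assumed to be prepared by the caller, as in Presto's split processing:

@SuppressWarnings("unchecked")
RecordReader<Object, Object> reader = (RecordReader<Object, Object>)
    HiveUtil.createRecordReader(configuration, path, 0, fileLength, schema, columns);
Object key = reader.createKey();
Object value = reader.createValue();
while (reader.next(key, value)) {
  // one Hive row; header rows were already skipped above, and footer rows
  // are withheld by FooterAwareRecordReader when configured
}
reader.close();
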
 
Example 17  Project: hadoop   File: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
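Two details of this RandomSampler variant are worth noting: r.nextInt(numSamples) always returns a value below numSamples, so the ind != numSamples guard never rejects a replacement, and freq is decayed after each replacement so later keys get a fair chance of displacing earlier ones. A hypothetical construction, with all parameter values assumed:

// Keep each key with probability 0.1, target 10000 samples, and shuffle
// then read at most 10 splits (all three values are assumptions).
InputSampler.Sampler<Text, Text> randomSampler =
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
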
Example 18  Project: hadoop-gpu   File: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 19  Project: systemds   File: FrameReaderTextCSV.java
protected final int readCSVFrameFromInputSplit( InputSplit split, InputFormat<LongWritable,Text> informat, JobConf job, 
		FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen, int rl, boolean first)
	throws IOException
{
	boolean hasHeader = _props.hasHeader();
	boolean isFill = _props.isFill();
	double dfillValue = _props.getFillValue();
	String sfillValue = String.valueOf(_props.getFillValue());
	String delim = _props.getDelim();
	
	//create record reader
	RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
	LongWritable key = new LongWritable();
	Text value = new Text();
	int row = rl;
	int col = -1;
	
	//handle header if existing
	if(first && hasHeader ) {
		reader.next(key, value); //read header
		dest.setColumnNames(value.toString().split(delim));
	}
		
	// Read the data
	boolean emptyValuesFound = false;
	try
	{
		while( reader.next(key, value) ) //foreach line
		{
			String cellStr = value.toString().trim();
			emptyValuesFound = false; col = 0;
			String[] parts = IOUtilFunctions.splitCSV(cellStr, delim);
			
			//parse frame meta data (missing values / num distinct)
			if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) || parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) {
				if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) )
					for( int j=0; j<dest.getNumColumns(); j++ )
						dest.getColumnMetadata(j).setMvValue(parts[j+1]);
				else if( parts[0].equals(TfUtils.TXMTD_NDPREFIX) )
					for( int j=0; j<dest.getNumColumns(); j++ )
						dest.getColumnMetadata(j).setNumDistinct(Long.parseLong(parts[j+1]));
				continue;
			}
			
			for( String part : parts ) //foreach cell
			{
				part = part.trim();
				if ( part.isEmpty() ) {
					if( isFill && dfillValue!=0 )
						dest.set(row, col, UtilFunctions.stringToObject(schema[col], sfillValue));
					emptyValuesFound = true;
				}
				else {
					dest.set(row, col, UtilFunctions.stringToObject(schema[col], part));
				}
				col++;
			}
			
			//sanity checks for empty values and number of columns
			IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, isFill, emptyValuesFound);
			IOUtilFunctions.checkAndRaiseErrorCSVNumColumns("", cellStr, parts, clen);
			row++;
		}
	}
	finally {
		IOUtilFunctions.closeSilently(reader);
	}
	
	return row;
}
 
Example 20  Project: RDFS   File: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}