org.apache.hadoop.mapred.InputFormat#getSplits() source code examples

Listed below are example usages of org.apache.hadoop.mapred.InputFormat#getSplits(), drawn from open-source projects; each example links back to its project on GitHub, where the full source can be viewed.
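Before the project examples, here is a minimal self-contained sketch of the call pattern they all share: configure a JobConf with input paths, instantiate an InputFormat, and ask it for splits. The input path and the choice of TextInputFormat are illustrative assumptions, not taken from any project below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class GetSplitsDemo {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Point the job at the data to split (hypothetical path).
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/demo-input"));

    TextInputFormat format = new TextInputFormat();
    format.configure(jobConf); // TextInputFormat reads its settings from the JobConf

    // The second argument is only a hint for the desired number of splits;
    // an InputFormat is free to return more or fewer.
    InputSplit[] splits = format.getSplits(jobConf, 4);
    for (InputSplit split : splits) {
      System.out.println(split + " length=" + split.getLength());
    }
  }
}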

Example 1 (likely from the pxf project, given the HiveDataFragmenter call)
private String getFilePath(Table tbl) throws Exception {
    StorageDescriptor descTable = tbl.getSd();

    InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(descTable.getInputFormat(), jobConf);

    FileInputFormat.setInputPaths(jobConf, new Path(descTable.getLocation()));

    InputSplit[] splits;
    try {
        // The second argument is only a hint for the desired number of splits.
        splits = fformat.getSplits(jobConf, 1);
    } catch (org.apache.hadoop.mapred.InvalidInputException e) {
        LOG.debug("getSplits failed on " + e.getMessage());
        // Chain the cause so the root failure is not lost.
        throw new RuntimeException("Unable to get file path for table.", e);
    }

    // Return the path of the first split's file; which file that is depends
    // on the order in which getSplits() lists the splits.
    for (InputSplit split : splits) {
        return ((FileSplit) split).getPath().toString();
    }
    throw new RuntimeException("Unable to get file path for table.");
}
 
Example 2   Project: hadoop   File: InputSampler.java (this SplitSampler code is verbatim identical in the big-c, RDFS, and hadoop-gpu forks)
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
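With numSamples = 100 and ten sampled splits, for instance, samplesPerSplit is 10, and the (i+1) * samplesPerSplit <= records check stops reading each split once the cumulative record count reaches its quota, so each sampled split contributes at most ten keys.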
 
Example 3   Project: hadoop   File: InputSampler.java (this IntervalSampler code is verbatim identical in the big-c, RDFS, and hadoop-gpu forks)
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
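Both samplers above implement InputSampler.Sampler and are normally consumed through InputSampler.writePartitionFile, which calls getSplits() and getSample() internally and writes partition boundaries for TotalOrderPartitioner. A hedged sketch of that wiring follows; the /tmp paths, KeyValueTextInputFormat, and Text key/value types are illustrative assumptions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class SamplerDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/tmp/demo-input")); // hypothetical path
    job.setInputFormat(KeyValueTextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Accept each record with probability 0.01, keep at most 1000 samples,
    // and read from at most 10 splits.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.01, 1000, 10);

    // writePartitionFile() calls getSplits() and getSample() as shown above,
    // sorts the sampled keys, and writes the partition boundaries here.
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions")); // hypothetical path
    InputSampler.writePartitionFile(job, sampler);
  }
}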
 
Example 4   Project: dremio-oss   File: HiveMetadataUtils.java
private static List<InputSplit> getInputSplits(final InputFormat<?, ?> format, final JobConf job) {
  InputSplit[] inputSplits;
  try {
    // Parquet logic in hive-3.1.1 does not check recursively by default.
    job.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
    inputSplits = format.getSplits(job, 1);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  if (null == inputSplits) {
    return Collections.emptyList();
  } else {
    return Arrays.asList(inputSplits);
  }
}
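Without the recursive flag, FileInputFormat's listing treats nested directories under the table location as errors ("Not a file") instead of descending into them; Hive can produce such nesting, for example from UNION ALL writes, which is why it is forced on here.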
 
Example 5   Project: incubator-gobblin   File: HiveUtils.java
/**
 * Get paths from a Hive location using the provided input format.
 */
public static Set<Path> getPaths(InputFormat<?, ?> inputFormat, Path location) throws IOException {
  JobConf jobConf = new JobConf(getHadoopConfiguration());

  Set<Path> paths = Sets.newHashSet();

  FileInputFormat.addInputPaths(jobConf, location.toString());
  InputSplit[] splits = inputFormat.getSplits(jobConf, 1000);
  for (InputSplit split : splits) {
    if (!(split instanceof FileSplit)) {
      throw new IOException("Not a file split. Found " + split.getClass().getName());
    }
    FileSplit fileSplit = (FileSplit) split;
    paths.add(fileSplit.getPath());
  }

  return paths;
}
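The 1000 passed as numSplits is only a hint that getSplits() may ignore, so the loop walks whatever comes back, and the instanceof check guards against input formats whose splits are not file-backed.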
 
Example 6   Project: dremio-oss   File: HiveMetadataUtils.java
private static List<InputSplit> getInputSplits(final InputFormat<?, ?> format, final JobConf job) {
  InputSplit[] inputSplits;
  try {
    inputSplits = format.getSplits(job, 1);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  if (null == inputSplits) {
    return Collections.emptyList();
  } else {
    return Arrays.asList(inputSplits);
  }
}
 
Example 7   Project: gemfirexd-oss   File: GemFireXDFragmenter.java
private InputSplit[] getSplits() throws IOException {
  InputFormat<Key, Row> inputFormat = this.gfxdManager.getInputFormat();
  try {
    return inputFormat.getSplits(this.jobConf, 1);
  } catch (FileNotFoundException fnfe) {
    FileNotFoundException wrapped = new FileNotFoundException(
        "Table "
            + this.gfxdManager.getTable()
            + " not found. "
            + "The LOCATION string may contain an incorrect value for one or more of the following: "
            + "1. path to HDFSSTORE (homeDir), 2. schema name, or 3. table name. "
            + GemFireXDManager.LOCATION_FORMAT);
    wrapped.initCause(fnfe); // preserve the original failure
    throw wrapped;
  }
}
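The wrapped message turns a bare missing-path failure into actionable guidance, since a wrong homeDir, schema name, or table name in the LOCATION string all surface as the same FileNotFoundException underneath.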
 
Example 8   File: AbstractEvaluatorToPartitionStrategy.java (likely from Apache REEF)
@SuppressWarnings("rawtypes")
AbstractEvaluatorToPartitionStrategy(
    final String inputFormatClassName, final Set<String> serializedDataPartitions) {
  LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
  Validate.notEmpty(inputFormatClassName);
  Validate.notEmpty(serializedDataPartitions);

  locationToSplits = new ConcurrentHashMap<>();
  evaluatorToSplits = new ConcurrentHashMap<>();
  unallocatedSplits = new LinkedBlockingQueue<>();
  setUp();

  final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
  for (final String serializedDataPartition : serializedDataPartitions) {
    final DistributedDataSetPartition dp = DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
    final ExternalConstructor<JobConf> jobConfExternalConstructor = new JobConfExternalConstructor(
        inputFormatClassName, dp.getPath());
    try {
      final JobConf jobConf = jobConfExternalConstructor.newInstance();
      final InputFormat inputFormat = jobConf.getInputFormat();
      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, dp.getDesiredSplits());
      if (LOG.isLoggable(Level.FINEST)) {
        LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] {dp, Arrays.toString(inputSplits)});
      }
      this.totalNumberOfSplits += inputSplits.length;
      splitsPerPartition.put(dp, inputSplits);
    } catch (final IOException e) {
      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
    }
  }
  init(splitsPerPartition);
  LOG.log(Level.FINE, "Total Number of splits: {0}", this.totalNumberOfSplits);
}
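Here getSplits() runs once per distributed data partition, using that partition's desired split count as the hint; init() then distributes the collected splits into the locationToSplits, evaluatorToSplits, and unallocatedSplits structures declared above, so evaluators can later be matched to splits.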
 
Example 9   Project: hive-dwrf   File: TestInputOutputFormat.java
@Test
public void testEmptyFile() throws Exception {
  JobConf job = new JobConf(conf);
  Properties properties = new Properties();
  HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
  FileSinkOperator.RecordWriter writer =
      outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
          properties, Reporter.NULL);
  writer.close(true);
  properties.setProperty("columns", "x,y");
  properties.setProperty("columns.types", "int:int");
  SerDe serde = new OrcSerde();
  serde.initialize(conf, properties);
  InputFormat<?,?> in = new OrcInputFormat();
  FileInputFormat.setInputPaths(conf, testFilePath.toString());
  InputSplit[] splits = in.getSplits(conf, 1);
  assertEquals(1, splits.length);

  // read the whole file
  conf.set("hive.io.file.readcolumn.ids", "0,1");
  org.apache.hadoop.mapred.RecordReader reader =
      in.getRecordReader(splits[0], conf, Reporter.NULL);
  Object key = reader.createKey();
  Object value = reader.createValue();
  assertEquals(0.0, reader.getProgress(), 0.00001);
  assertEquals(0, reader.getPos());
  assertEquals(false, reader.next(key, value));
  reader.close();
  assertEquals(null, serde.getSerDeStats());
}
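The edge case being pinned down: even for an ORC file with no rows, getSplits() returns exactly one split, and the reader for that split reports zero progress and no records rather than failing.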
 
Example 10   Project: pxf   File: HiveDataFragmenter.java
private void fetchMetaData(HiveTablePartition tablePartition,
                           boolean hasComplexTypes,
                           List<Integer> hiveIndexes,
                           String allColumnNames,
                           String allColumnTypes)
        throws Exception {
    InputFormat<?, ?> fformat = makeInputFormat(
            tablePartition.storageDesc.getInputFormat(), jobConf);
    String profile = null;
    String userProfile = context.getProfile();
    if (userProfile != null) {
        // evaluate optimal profile based on file format if profile was explicitly specified in url
        // if user passed accessor+fragmenter+resolver - use them
        profile = ProfileFactory.get(fformat, hasComplexTypes, userProfile);
    }
    String fragmenterForProfile;
    if (profile != null) {
        fragmenterForProfile = context.getPluginConf().getPlugins(profile).get("FRAGMENTER");
    } else {
        fragmenterForProfile = context.getFragmenter();
    }

    FileInputFormat.setInputPaths(jobConf, new Path(
            tablePartition.storageDesc.getLocation()));

    InputSplit[] splits;
    try {
        splits = fformat.getSplits(jobConf, 1);
    } catch (org.apache.hadoop.mapred.InvalidInputException e) {
        LOG.debug("getSplits failed on " + e.getMessage());
        return;
    }

    for (InputSplit split : splits) {
        FileSplit fsp = (FileSplit) split;
        String[] hosts = fsp.getLocations();
        String filepath = fsp.getPath().toString();

        byte[] locationInfo = HdfsUtilities.prepareFragmentMetadata(fsp);
        byte[] userData = hiveClientWrapper.makeUserData(
                fragmenterForProfile,
                tablePartition,
                filterInFragmenter,
                hiveIndexes,
                allColumnNames,
                allColumnTypes);
        Fragment fragment = new Fragment(filepath, hosts, locationInfo,
                userData, profile);
        fragments.add(fragment);
    }
}
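Note the contrast with Example 1: an InvalidInputException here is logged and swallowed rather than rethrown, so a partition whose directory is missing simply contributes no fragments, while each successful FileSplit becomes a Fragment carrying the file path, the split's preferred hosts, and serialized metadata for the chosen profile.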
 
Example 11   Project: hadoop   File: InputSampler.java (this RandomSampler code is verbatim identical in the big-c and hadoop-gpu forks)
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
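Two details are easy to miss here. The guard ind != numSamples is always true, because r.nextInt(numSamples) returns values in [0, numSamples), so every accepted key past the quota replaces a random element. And each replacement multiplies freq by (numSamples - 1) / numSamples, steadily lowering the acceptance rate to compensate for keys that may later be pushed out of the full sample.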
 