Listed below are example usages of org.apache.hadoop.mapred.InputFormat#getSplits(), taken from real-world code.
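As a quick orientation, here is a minimal, hedged sketch of the call pattern the examples share: set the input paths on a JobConf, instantiate an InputFormat, and ask it for splits. The input path and the choice of TextInputFormat are placeholders, not taken from the examples below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class GetSplitsSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Placeholder input path; point this at any readable file or directory.
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/input"));

    // ReflectionUtils.newInstance also calls configure() for JobConfigurable formats.
    InputFormat<LongWritable, Text> format =
        ReflectionUtils.newInstance(TextInputFormat.class, jobConf);

    // The second argument is only a hint for the desired number of splits.
    InputSplit[] splits = format.getSplits(jobConf, 1);
    for (InputSplit split : splits) {
      System.out.println(split + " length=" + split.getLength());
    }
  }
}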
private String getFilePath(Table tbl) throws Exception {
  StorageDescriptor descTable = tbl.getSd();
  InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(descTable.getInputFormat(), jobConf);
  FileInputFormat.setInputPaths(jobConf, new Path(descTable.getLocation()));
  InputSplit[] splits;
  try {
    splits = fformat.getSplits(jobConf, 1);
  } catch (org.apache.hadoop.mapred.InvalidInputException e) {
    LOG.debug("getSplits failed on " + e.getMessage());
    throw new RuntimeException("Unable to get file path for table.");
  }
  // Return the path of the first file split; if the table yields no splits, fail below.
  for (InputSplit split : splits) {
    FileSplit fsp = (FileSplit) split;
    String[] hosts = fsp.getLocations();
    String filepath = fsp.getPath().toString();
    return filepath;
  }
  throw new RuntimeException("Unable to get file path for table.");
}
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
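The two getSample implementations above appear to be the SplitSampler and IntervalSampler nested in org.apache.hadoop.mapred.lib.InputSampler. Assuming that is the case, the following hedged sketch shows how such a sampler is driven directly; the input path is a placeholder, and sampling TextInputFormat's LongWritable offsets is purely illustrative.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.util.ReflectionUtils;

public class SamplerSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setNumMapTasks(4);                                       // getSample() uses this as the split hint
    FileInputFormat.setInputPaths(job, new Path("/tmp/input"));  // placeholder path

    InputFormat<LongWritable, Text> format =
        ReflectionUtils.newInstance(TextInputFormat.class, job);

    // Take at most 1000 keys, touching at most 10 splits.
    InputSampler.SplitSampler<LongWritable, Text> sampler =
        new InputSampler.SplitSampler<LongWritable, Text>(1000, 10);

    // Declared as K[], but backed by ArrayList.toArray()'s Object[], so treat it as Object[].
    Object[] sampledKeys = sampler.getSample(format, job);
    System.out.println("sampled " + sampledKeys.length + " keys");
  }
}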
private static List<InputSplit> getInputSplits(final InputFormat<?, ?> format, final JobConf job) {
  InputSplit[] inputSplits;
  try {
    // Parquet logic in hive-3.1.1 does not check recursively by default.
    job.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
    inputSplits = format.getSplits(job, 1);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  if (null == inputSplits) {
    return Collections.emptyList();
  } else {
    return Arrays.asList(inputSplits);
  }
}
/**
 * Get paths from a Hive location using the provided input format.
 */
public static Set<Path> getPaths(InputFormat<?, ?> inputFormat, Path location) throws IOException {
  JobConf jobConf = new JobConf(getHadoopConfiguration());
  Set<Path> paths = Sets.newHashSet();
  FileInputFormat.addInputPaths(jobConf, location.toString());
  InputSplit[] splits = inputFormat.getSplits(jobConf, 1000);
  for (InputSplit split : splits) {
    if (!(split instanceof FileSplit)) {
      throw new IOException("Not a file split. Found " + split.getClass().getName());
    }
    FileSplit fileSplit = (FileSplit) split;
    paths.add(fileSplit.getPath());
  }
  return paths;
}
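A hypothetical caller of the getPaths helper above might look like the fragment below; the warehouse location is a placeholder, and getHadoopConfiguration() is assumed to be provided by the surrounding class.

// Hypothetical caller of getPaths(); path is a placeholder.
JobConf conf = new JobConf();
InputFormat<LongWritable, Text> format =
    ReflectionUtils.newInstance(TextInputFormat.class, conf);
Set<Path> backingFiles = getPaths(format, new Path("/warehouse/db.db/events"));
for (Path file : backingFiles) {
  System.out.println(file);
}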
private static List<InputSplit> getInputSplits(final InputFormat<?, ?> format, final JobConf job) {
  InputSplit[] inputSplits;
  try {
    inputSplits = format.getSplits(job, 1);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  if (null == inputSplits) {
    return Collections.emptyList();
  } else {
    return Arrays.asList(inputSplits);
  }
}
private InputSplit[] getSplits() throws IOException {
  InputFormat<Key, Row> inputFormat = this.gfxdManager.getInputFormat();
  try {
    return inputFormat.getSplits(this.jobConf, 1);
  } catch (FileNotFoundException fnfe) {
    throw new FileNotFoundException(
        "Table "
            + this.gfxdManager.getTable()
            + " not found. "
            + "The LOCATION string may contain incorrect value for one or more of the following:"
            + "1. Path to HDFSSTORE (homeDir), 2. Schema name or 3. Table name. "
            + GemFireXDManager.LOCATION_FORMAT);
  }
}
@SuppressWarnings("rawtypes")
AbstractEvaluatorToPartitionStrategy(
final String inputFormatClassName, final Set<String> serializedDataPartitions) {
LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
Validate.notEmpty(inputFormatClassName);
Validate.notEmpty(serializedDataPartitions);
locationToSplits = new ConcurrentHashMap<>();
evaluatorToSplits = new ConcurrentHashMap<>();
unallocatedSplits = new LinkedBlockingQueue<>();
setUp();
final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
for (final String serializedDataPartition : serializedDataPartitions) {
final DistributedDataSetPartition dp = DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
final ExternalConstructor<JobConf> jobConfExternalConstructor = new JobConfExternalConstructor(
inputFormatClassName, dp.getPath());
try {
final JobConf jobConf = jobConfExternalConstructor.newInstance();
final InputFormat inputFormat = jobConf.getInputFormat();
final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, dp.getDesiredSplits());
if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] {dp, Arrays.toString(inputSplits)});
}
this.totalNumberOfSplits += inputSplits.length;
splitsPerPartition.put(dp, inputSplits);
} catch (final IOException e) {
throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
}
}
init(splitsPerPartition);
LOG.log(Level.FINE, "Total Number of splits: {0}", this.totalNumberOfSplits);
}
@Test
public void testEmptyFile() throws Exception {
  JobConf job = new JobConf(conf);
  Properties properties = new Properties();
  HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
  FileSinkOperator.RecordWriter writer =
      outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
          properties, Reporter.NULL);
  writer.close(true);
  properties.setProperty("columns", "x,y");
  properties.setProperty("columns.types", "int:int");
  SerDe serde = new OrcSerde();
  serde.initialize(conf, properties);
  InputFormat<?,?> in = new OrcInputFormat();
  FileInputFormat.setInputPaths(conf, testFilePath.toString());
  InputSplit[] splits = in.getSplits(conf, 1);
  assertEquals(1, splits.length);
  // read the whole file
  conf.set("hive.io.file.readcolumn.ids", "0,1");
  org.apache.hadoop.mapred.RecordReader reader =
      in.getRecordReader(splits[0], conf, Reporter.NULL);
  Object key = reader.createKey();
  Object value = reader.createValue();
  assertEquals(0.0, reader.getProgress(), 0.00001);
  assertEquals(0, reader.getPos());
  assertEquals(false, reader.next(key, value));
  reader.close();
  assertEquals(null, serde.getSerDeStats());
}
private void fetchMetaData(HiveTablePartition tablePartition,
                           boolean hasComplexTypes,
                           List<Integer> hiveIndexes,
                           String allColumnNames,
                           String allColumnTypes)
    throws Exception {
  InputFormat<?, ?> fformat = makeInputFormat(
      tablePartition.storageDesc.getInputFormat(), jobConf);
  String profile = null;
  String userProfile = context.getProfile();
  if (userProfile != null) {
    // evaluate optimal profile based on file format if profile was explicitly specified in url
    // if user passed accessor+fragmenter+resolver - use them
    profile = ProfileFactory.get(fformat, hasComplexTypes, userProfile);
  }
  String fragmenterForProfile;
  if (profile != null) {
    fragmenterForProfile = context.getPluginConf().getPlugins(profile).get("FRAGMENTER");
  } else {
    fragmenterForProfile = context.getFragmenter();
  }
  FileInputFormat.setInputPaths(jobConf, new Path(
      tablePartition.storageDesc.getLocation()));
  InputSplit[] splits;
  try {
    splits = fformat.getSplits(jobConf, 1);
  } catch (org.apache.hadoop.mapred.InvalidInputException e) {
    LOG.debug("getSplits failed on " + e.getMessage());
    return;
  }
  for (InputSplit split : splits) {
    FileSplit fsp = (FileSplit) split;
    String[] hosts = fsp.getLocations();
    String filepath = fsp.getPath().toString();
    byte[] locationInfo = HdfsUtilities.prepareFragmentMetadata(fsp);
    byte[] userData = hiveClientWrapper.makeUserData(
        fragmenterForProfile,
        tablePartition,
        filterInFragmenter,
        hiveIndexes,
        allColumnNames,
        allColumnTypes);
    Fragment fragment = new Fragment(filepath, hosts, locationInfo,
        userData, profile);
    fragments.add(fragment);
  }
}
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
      (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
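If the sampler above is indeed InputSampler.RandomSampler, it is usually not called directly; instead InputSampler.writePartitionFile draws the sample (invoking getSplits internally via getSample) and writes the cut points consumed by TotalOrderPartitioner. A hedged sketch of that pipeline, with placeholder paths and the old mapred API assumed, follows.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setJobName("total-order-sketch");
    job.setNumMapTasks(4);
    job.setNumReduceTasks(4);
    job.setInputFormat(KeyValueTextInputFormat.class);           // keys are Text
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path("/tmp/input"));  // placeholder path

    // Where the sampled cut points are written; placeholder path.
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions"));
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Keep roughly 10% of records, up to 10000 samples, from at most 10 splits.
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
  }
}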