下面列出了org.apache.hadoop.mapred.InputSplit#getLocations ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
try {
final InputSplit split = numberedSplit.getEntry();
final String[] locations = split.getLocations();
for (final String location : locations) {
BlockingQueue<NumberedSplit<InputSplit>> newSplitQue = new LinkedBlockingQueue<>();
final BlockingQueue<NumberedSplit<InputSplit>> splitQue = locationToSplits.putIfAbsent(location, newSplitQue);
if (splitQue != null) {
newSplitQue = splitQue;
}
newSplitQue.add(numberedSplit);
}
} catch (final IOException e) {
throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
}
}
/**
* Gets the source location of a Spark partition.
*
* @param partition the partition to get location.
* @return a list of locations.
* @throws RuntimeException if failed to get source location.
*/
static List<String> getPartitionLocation(final Partition partition) {
try {
if (partition instanceof HadoopPartition) {
final Field inputSplitField = partition.getClass().getDeclaredField("inputSplit");
inputSplitField.setAccessible(true);
final InputSplit inputSplit = (InputSplit) ((SerializableWritable) inputSplitField.get(partition)).value();
final String[] splitLocations = inputSplit.getLocations();
final List<String> parsedLocations = new ArrayList<>();
for (final String loc : splitLocations) {
final String canonicalHostName = InetAddress.getByName(loc).getCanonicalHostName();
parsedLocations.add(canonicalHostName);
}
if (parsedLocations.size() == 1 && parsedLocations.get(0).equals("localhost")) {
return Collections.emptyList();
} else {
return parsedLocations;
}
} else {
return Collections.emptyList();
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
/**
* Collect a set of hosts from all child InputSplits.
*/
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}
/**
* Collect a set of hosts from all child InputSplits.
*/
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}
/**
* Collect a set of hosts from all child InputSplits.
*/
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}
/**
* Collect a set of hosts from all child InputSplits.
*/
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}