org.apache.hadoop.mapred.InputSplit#getLocations() 源码实例 Demo

下面列出了 org.apache.hadoop.mapred.InputSplit#getLocations() 的实例代码，也可以点击链接到 GitHub 查看完整源代码，或在右侧发表评论。

/**
 * Adds {@code numberedSplit} to the per-location queue of every location reported by its
 * underlying {@link InputSplit}, creating the queue the first time a location is seen.
 *
 * @param numberedSplit the split to index by location
 * @throws RuntimeException wrapping the {@link IOException} if the split's locations cannot be read
 */
@Override
protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
  final String[] locations;
  try {
    locations = numberedSplit.getEntry().getLocations();
  } catch (final IOException e) {
    throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
  }
  if (locations == null) {
    // Some InputSplit implementations report "no location hints" as null; nothing to index.
    return;
  }
  for (final String location : locations) {
    BlockingQueue<NumberedSplit<InputSplit>> queue = locationToSplits.get(location);
    if (queue == null) {
      // Only allocate a queue when the location is not yet mapped. putIfAbsent resolves a race
      // with another thread: whichever queue won is the one we add to.
      final BlockingQueue<NumberedSplit<InputSplit>> created = new LinkedBlockingQueue<>();
      queue = locationToSplits.putIfAbsent(location, created);
      if (queue == null) {
        queue = created;
      }
    }
    queue.add(numberedSplit);
  }
}
 
源代码2 项目: incubator-nemo   文件: SparkSourceUtil.java
/**
 * Gets the source location of a Spark partition.
 *
 * <p>Only {@link HadoopPartition}s are inspected; any other partition (including {@code null})
 * yields an empty list. Each location hint of the underlying Hadoop {@link InputSplit} is
 * resolved to its canonical host name. A split that reports no hints ({@code null} or empty),
 * or whose single hint resolves to {@code "localhost"}, yields an empty list.
 *
 * @param partition the partition to get location.
 * @return a list of locations.
 * @throws RuntimeException if failed to get source location.
 */
static List<String> getPartitionLocation(final Partition partition) {
  if (!(partition instanceof HadoopPartition)) {
    return Collections.emptyList();
  }
  try {
    // Read the "inputSplit" field reflectively; it is expected to hold a SerializableWritable
    // wrapping the Hadoop InputSplit (per the cast below).
    final Field inputSplitField = partition.getClass().getDeclaredField("inputSplit");
    inputSplitField.setAccessible(true);
    final InputSplit inputSplit = (InputSplit) ((SerializableWritable) inputSplitField.get(partition)).value();

    final String[] splitLocations = inputSplit.getLocations();
    if (splitLocations == null) {
      // No location hints reported: treat like an empty hint array instead of failing with an NPE.
      return Collections.emptyList();
    }

    final List<String> parsedLocations = new ArrayList<>(splitLocations.length);
    for (final String loc : splitLocations) {
      parsedLocations.add(InetAddress.getByName(loc).getCanonicalHostName());
    }

    // A lone "localhost" hint is dropped — presumably because it gives no useful placement info.
    if (parsedLocations.size() == 1 && parsedLocations.get(0).equals("localhost")) {
      return Collections.emptyList();
    }
    return parsedLocations;
  } catch (final Exception e) {
    throw new RuntimeException(e);
  }
}
 
源代码3 项目: hadoop   文件: CompositeInputSplit.java
/**
 * Returns the union of the location hints reported by every child split.
 * Duplicate host names are collapsed; a child reporting {@code null} or an
 * empty array contributes nothing.
 *
 * @return the distinct host names across all child splits, in no particular order
 * @throws IOException if a child split fails to report its locations
 */
public String[] getLocations() throws IOException {
  HashSet<String> uniqueHosts = new HashSet<String>();
  for (InputSplit child : splits) {
    String[] childHosts = child.getLocations();
    if (childHosts == null) {
      // A child without hints has nothing to contribute.
      continue;
    }
    for (int i = 0; i < childHosts.length; i++) {
      uniqueHosts.add(childHosts[i]);
    }
  }
  String[] result = new String[uniqueHosts.size()];
  return uniqueHosts.toArray(result);
}
 
源代码4 项目: big-c   文件: CompositeInputSplit.java
/**
 * Merges the host hints of all child splits into one de-duplicated array.
 * Children whose hint array is {@code null} are skipped.
 *
 * @return every distinct host reported by any child, in unspecified order
 * @throws IOException if querying a child split's locations fails
 */
public String[] getLocations() throws IOException {
  final HashSet<String> hostSet = new HashSet<String>();
  for (final InputSplit split : splits) {
    final String[] locationHints = split.getLocations();
    if (locationHints != null) {
      for (final String hint : locationHints) {
        hostSet.add(hint);
      }
    }
  }
  return hostSet.toArray(new String[0]);
}
 
源代码5 项目: RDFS   文件: CompositeInputSplit.java
/**
 * Gathers the distinct host names suggested by the child splits.
 * A {@code null} hint array from a child is treated as having no hints.
 *
 * @return the set of child host hints as an array (order unspecified)
 * @throws IOException if a child split cannot report its locations
 */
public String[] getLocations() throws IOException {
  HashSet<String> collected = new HashSet<String>();
  for (InputSplit part : splits) {
    String[] hosts = part.getLocations();
    int count = (hosts == null) ? 0 : hosts.length;
    for (int idx = 0; idx < count; ++idx) {
      collected.add(hosts[idx]);
    }
  }
  return collected.toArray(new String[collected.size()]);
}
 
源代码6 项目: hadoop-gpu   文件: CompositeInputSplit.java
/**
 * Computes the de-duplicated union of the location hints of all child splits.
 *
 * @return the distinct hosts reported by the children, in unspecified order
 * @throws IOException if a child split fails to report its locations
 */
public String[] getLocations() throws IOException {
  HashSet<String> distinctHosts = new HashSet<String>();
  for (InputSplit child : splits) {
    addHints(distinctHosts, child.getLocations());
  }
  return distinctHosts.toArray(new String[distinctHosts.size()]);
}

/** Adds every entry of {@code hints} to {@code sink}; a {@code null} array is ignored. */
private static void addHints(HashSet<String> sink, String[] hints) {
  if (hints == null) {
    return;
  }
  for (String hint : hints) {
    sink.add(hint);
  }
}