org.apache.hadoop.io.RawComparator#compare ( )源码实例Demo

下面列出了org.apache.hadoop.io.RawComparator#compare ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: tajo   文件: Bytes.java
/**
 * Binary search for keys in indexes.
 *
 * @param arr array of byte arrays to search for
 * @param key the key you want to find
 * @param offset the offset in the key you want to find
 * @param length the length of the key
 * @param comparator a comparator to compare.
 * @return zero-based index of the key, if the key is present in the array.
 *         Otherwise, a value -(i + 1) such that the key is between arr[i -
 *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
 *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
 *         means that this function can return 2N + 1 different values
 *         ranging from -(N + 1) to N - 1.
 */
public static int binarySearch(byte [][]arr, byte []key, int offset,
                               int length, RawComparator<?> comparator) {
  int low = 0;
  int high = arr.length - 1;

  while (low <= high) {
    int mid = (low+high) >>> 1;
    // we have to compare in this order, because the comparator order
    // has special logic when the 'left side' is a special key.
    int cmp = comparator.compare(key, offset, length,
        arr[mid], 0, arr[mid].length);
    // key lives above the midpoint
    if (cmp > 0)
      low = mid + 1;
      // key lives below the midpoint
    else if (cmp < 0)
      high = mid - 1;
      // BAM. how often does this really happen?
    else
      return mid;
  }
  return - (low+1);
}
 
源代码2 项目: incubator-tajo   文件: Bytes.java
/**
 * Binary search for keys in indexes.
 *
 * @param arr array of byte arrays to search for
 * @param key the key you want to find
 * @param offset the offset in the key you want to find
 * @param length the length of the key
 * @param comparator a comparator to compare.
 * @return zero-based index of the key, if the key is present in the array.
 *         Otherwise, a value -(i + 1) such that the key is between arr[i -
 *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
 *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
 *         means that this function can return 2N + 1 different values
 *         ranging from -(N + 1) to N - 1.
 */
public static int binarySearch(byte [][]arr, byte []key, int offset,
    int length, RawComparator<byte []> comparator) {
  int low = 0;
  int high = arr.length - 1;

  while (low <= high) {
    int mid = (low+high) >>> 1;
    // we have to compare in this order, because the comparator order
    // has special logic when the 'left side' is a special key.
    int cmp = comparator.compare(key, offset, length,
        arr[mid], 0, arr[mid].length);
    // key lives above the midpoint
    if (cmp > 0)
      low = mid + 1;
    // key lives below the midpoint
    else if (cmp < 0)
      high = mid - 1;
    // BAM. how often does this really happen?
    else
      return mid;
  }
  return - (low+1);
}
 
源代码3 项目: hadoop   文件: InputSampler.java
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
    throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = job.getConfiguration();
  final InputFormat inf = 
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = (K[])sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getSortComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
    conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for(int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
 
源代码4 项目: big-c   文件: InputSampler.java
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(Job job, Sampler<K,V> sampler) 
    throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = job.getConfiguration();
  final InputFormat inf = 
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = (K[])sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getSortComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, 
    conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for(int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
 
源代码5 项目: spork   文件: TestPigTupleRawComparator.java
private int compareHelper(NullableTuple t1, NullableTuple t2, RawComparator comparator) throws IOException {
    t1.write(dos1);
    t2.write(dos2);
    byte[] b1 = baos1.toByteArray();
    byte[] b2 = baos2.toByteArray();
    baos1.reset();
    baos2.reset();
    return comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
}
 
源代码6 项目: RDFS   文件: TotalOrderPartitioner.java
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain {@link
   org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
  try {
    String parts = getPartitionFile(job);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
      ? FileSystem.getLocal(job)     // assume in DistributedCache
      : partFile.getFileSystem(job);

    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
      (RawComparator<K>) job.getOutputKeyComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
      job.getBoolean("total.order.partitioner.natural.order", true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[])splitPoints, 0,
          splitPoints.length, new byte[0],
          job.getInt("total.order.partitioner.max.trie.depth", 2));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
 
源代码7 项目: RDFS   文件: InputSampler.java
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link
   org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(JobConf job,
    Sampler<K,V> sampler) throws IOException {
  final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
  int numPartitions = job.getNumReduceTasks();
  K[] samples = sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getOutputKeyComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
  FileSystem fs = dst.getFileSystem(job);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
      job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for(int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
 
源代码8 项目: hadoop-gpu   文件: TotalOrderPartitioner.java
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain {@link
   org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
  try {
    String parts = getPartitionFile(job);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
      ? FileSystem.getLocal(job)     // assume in DistributedCache
      : partFile.getFileSystem(job);

    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
      (RawComparator<K>) job.getOutputKeyComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
      job.getBoolean("total.order.partitioner.natural.order", true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[])splitPoints, 0,
          splitPoints.length, new byte[0],
          job.getInt("total.order.partitioner.max.trie.depth", 2));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
 
源代码9 项目: hadoop-gpu   文件: InputSampler.java
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link
   org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(JobConf job,
    Sampler<K,V> sampler) throws IOException {
  final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
  int numPartitions = job.getNumReduceTasks();
  K[] samples = sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
    (RawComparator<K>) job.getOutputKeyComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
  FileSystem fs = dst.getFileSystem(job);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
      job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for(int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
 
源代码10 项目: hadoop   文件: TotalOrderPartitioner.java
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void setConf(Configuration conf) {
  try {
    this.conf = conf;
    String parts = getPartitionFile(conf);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
      ? FileSystem.getLocal(conf)     // assume in DistributedCache
      : partFile.getFileSystem(conf);

    Job job = Job.getInstance(conf);
    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
      (RawComparator<K>) job.getSortComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
      conf.getBoolean(NATURAL_ORDER, true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[])splitPoints, 0,
          splitPoints.length, new byte[0],
          // Now that blocks of identical splitless trie nodes are 
          // represented reentrantly, and we develop a leaf for any trie
          // node with only one split point, the only reason for a depth
          // limit is to refute stack overflow or bloat in the pathological
          // case where the split points are long and mostly look like bytes 
          // iii...iixii...iii   .  Therefore, we make the default depth
          // limit large but not huge.
          conf.getInt(MAX_TRIE_DEPTH, 200));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
 
源代码11 项目: big-c   文件: TotalOrderPartitioner.java
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void setConf(Configuration conf) {
  try {
    this.conf = conf;
    String parts = getPartitionFile(conf);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
      ? FileSystem.getLocal(conf)     // assume in DistributedCache
      : partFile.getFileSystem(conf);

    Job job = Job.getInstance(conf);
    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
      (RawComparator<K>) job.getSortComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
      conf.getBoolean(NATURAL_ORDER, true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[])splitPoints, 0,
          splitPoints.length, new byte[0],
          // Now that blocks of identical splitless trie nodes are 
          // represented reentrantly, and we develop a leaf for any trie
          // node with only one split point, the only reason for a depth
          // limit is to refute stack overflow or bloat in the pathological
          // case where the split points are long and mostly look like bytes 
          // iii...iixii...iii   .  Therefore, we make the default depth
          // limit large but not huge.
          conf.getInt(MAX_TRIE_DEPTH, 200));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
 
 同类方法