The following examples show how the org.apache.hadoop.io.RawComparator API class is used in practice; you can also click through to GitHub to view the full source code.
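Before the examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of what an implementation of the interface looks like: a RawComparator compares keys directly in their serialized byte form, avoiding deserialization on the hot sort/merge path.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class IntRawComparator implements RawComparator<IntWritable> {

  // Byte-level path: an IntWritable serializes to 4 big-endian bytes,
  // so the value can be compared without deserializing the key.
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return Integer.compare(WritableComparator.readInt(b1, s1),
                           WritableComparator.readInt(b2, s2));
  }

  // Object-level path, used when keys are already deserialized.
  @Override
  public int compare(IntWritable o1, IntWritable o2) {
    return o1.compareTo(o2);
  }
}

A comparator like this is typically registered with JobConf.setOutputKeyComparatorClass or, in the new API, Job.setSortComparatorClass; both instantiate it reflectively, so it needs a public no-argument constructor.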
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException, InterruptedException {
  return new MergeQueue(conf, fs, segments, comparator, reporter,
      sortSegments, false).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter,
      bytesReadCounter, mergePhase);
}

public MergeQueue(Configuration conf, FileSystem fs,
                  Path[] inputs, boolean deleteInputs,
                  CompressionCodec codec, RawComparator<K> comparator,
                  Progressable reporter)
    throws IOException {
  this.conf = conf;
  this.fs = fs;
  this.codec = codec;
  this.comparator = comparator;
  this.reporter = reporter;
  for (Path file : inputs) {
    segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
  }
  // Sort segments on file-lengths
  Collections.sort(segments, segmentComparator);
}

public SkippingReduceValuesIterator(RawKeyValueIterator in,
    RawComparator<KEY> comparator, Class<KEY> keyClass,
    Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
    TaskUmbilicalProtocol umbilical) throws IOException {
  super(in, comparator, keyClass, valClass, conf, reporter);
  this.umbilical = umbilical;
  this.skipGroupCounter =
      reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
  this.skipRecCounter =
      reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
  this.toWriteSkipRecs = toWriteSkipRecs() &&
      SkipBadRecords.getSkipOutputPath(conf) != null;
  this.keyClass = keyClass;
  this.valClass = valClass;
  this.reporter = reporter;
  skipIt = getSkipRanges().skipRangeIterator();
  mayBeSkip();
}
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          Path[] inputs, boolean deleteInputs,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec,
      comparator, reporter, null,
      TaskType.REDUCE).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter,
      mergePhase);
}

public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          Path[] inputs, boolean deleteInputs,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator,
                          Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Counters.Counter mergedMapOutputsCounter,
                          Progress mergePhase)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec,
      comparator, reporter, mergedMapOutputsCounter,
      TaskType.REDUCE).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter,
      mergePhase);
}

public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          Path[] inputs, boolean deleteInputs,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec,
      comparator, reporter).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter);
}
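The overloads above differ only in which counters and progress objects they thread through to MergeQueue. As a rough usage sketch for the Path[]-based overload (the utility class name Merger, the spill paths, and the counter names are assumptions for illustration, not taken from this page):

JobConf job = new JobConf();
job.setMapOutputKeyClass(Text.class);
FileSystem fs = FileSystem.getLocal(job);
Path[] spills = { new Path("/tmp/spill0.out"), new Path("/tmp/spill1.out") };
Counters counters = new Counters();

// The job's configured raw comparator for map-output keys.
@SuppressWarnings("unchecked")
RawComparator<Text> comparator =
    (RawComparator<Text>) job.getOutputKeyComparator();

RawKeyValueIterator merged = Merger.merge(
    job, fs, Text.class, Text.class,
    null,                       // CompressionCodec: inputs are uncompressed
    spills, true,               // delete the spill files once merged
    10,                         // merge factor: streams merged at once
    new Path("/tmp/merge-tmp"), // scratch dir for intermediate merges
    comparator, Reporter.NULL,
    counters.findCounter("merge", "reads"),
    counters.findCounter("merge", "writes"));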
private RawComparator getComparator(TestWithComparator comparator) {
  switch (comparator) {
    case LONG:
      return new LongWritable.Comparator();
    case INT:
      return new IntWritable.Comparator();
    case BYTES:
      return new BytesWritable.Comparator();
    case TEZ_BYTES:
      return new TezBytesComparator();
    case TEXT:
      return new Text.Comparator();
    case CUSTOM:
      return new CustomKey.Comparator();
    default:
      return null;
  }
}
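The CUSTOM branch above returns a comparator for a test-only key type. For reference, a custom key usually pairs a WritableComparable with a nested WritableComparator along these lines (an illustrative sketch; this CustomKey is assumed, not the one from the test above):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomKey implements WritableComparable<CustomKey> {
  private long id;

  @Override public void write(DataOutput out) throws IOException { out.writeLong(id); }
  @Override public void readFields(DataInput in) throws IOException { id = in.readLong(); }
  @Override public int compareTo(CustomKey o) { return Long.compare(id, o.id); }

  // Raw comparator over the 8 serialized bytes of id; no deserialization.
  public static class Comparator extends WritableComparator {
    public Comparator() { super(CustomKey.class); }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return Long.compare(readLong(b1, s1), readLong(b2, s2));
    }
  }

  // Register so WritableComparator.get(CustomKey.class) returns it.
  static {
    WritableComparator.define(CustomKey.class, new Comparator());
  }
}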
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase,
                          TaskType taskType)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
      sortSegments, codec,
      taskType).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter,
      mergePhase);
}
public ValuesIterator(RawKeyValueIterator in,
                      RawComparator<KEY> comparator,
                      Class<KEY> keyClass,
                      Class<VALUE> valClass, Configuration conf,
                      Progressable reporter)
    throws IOException {
  this.in = in;
  this.comparator = comparator;
  this.reporter = reporter;
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(keyIn);
  this.valDeserializer = serializationFactory.getDeserializer(valClass);
  this.valDeserializer.open(this.valueIn);
  readNextKey();
  key = nextKey;
  nextKey = null;  // force new instance creation
  hasNext = more;
}

public ValuesIterator(TezRawKeyValueIterator in,
                      RawComparator<KEY> comparator,
                      Class<KEY> keyClass,
                      Class<VALUE> valClass, Configuration conf,
                      TezCounter inputKeyCounter,
                      TezCounter inputValueCounter)
    throws IOException {
  this.in = in;
  this.comparator = comparator;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(keyIn);
  this.valDeserializer = serializationFactory.getDeserializer(valClass);
  this.valDeserializer.open(this.valueIn);
}
@SuppressWarnings("unchecked")
@Override
public int getPartition(PigNullableWritable key, Writable value,
int numPartitions){
if (!inited) {
init();
}
if (comparator == null) {
comparator = (RawComparator<PigNullableWritable>)PigMapReduce.sJobContext.getSortComparator();
}
if(!weightedParts.containsKey(key)){
int index = Arrays.binarySearch(quantiles, key, comparator);
if (index < 0)
index = -index-1;
else
index = index + 1;
return Math.min(index, numPartitions - 1);
}
DiscreteProbabilitySampleGenerator gen = weightedParts.get(key);
return gen.getNext();
}
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException {
  return new MergeQueue(conf, fs, segments, comparator, reporter,
      sortSegments, false).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter,
      bytesReadCounter, mergePhase);
}

public MergeQueue(Configuration conf, FileSystem fs,
                  List<Segment> segments, RawComparator comparator,
                  Progressable reporter, boolean sortSegments,
                  CompressionCodec codec,
                  boolean considerFinalMergeForProgress) {
  this(conf, fs, segments, comparator, reporter, sortSegments,
      considerFinalMergeForProgress);
  this.codec = codec;
}
public ReduceValuesIterator(RawKeyValueIterator in,
                            RawComparator<KEY> comparator,
                            Class<KEY> keyClass,
                            Class<VALUE> valClass,
                            Configuration conf, Progressable reporter)
    throws IOException {
  super(in, comparator, keyClass, valClass, conf, reporter);
}

public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
      sortSegments).merge(keyClass, valueClass,
      mergeFactor, tmpDir,
      readsCounter, writesCounter);
}
@SuppressWarnings("unchecked")
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
org.apache.hadoop.mapreduce.Counter inputKeyCounter,
org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass, Class<INVALUE> valueClass
) throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
reduceContext =
new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
rIter,
inputKeyCounter,
inputValueCounter,
output,
committer,
reporter,
comparator,
keyClass,
valueClass);
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
reducerContext =
new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
reduceContext);
return reducerContext;
}
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
      comparator, reporter, false, readsCounter, writesCounter,
      mergePhase);
}
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain
 * {@link org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
  try {
    String parts = getPartitionFile(job);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(job)   // assume in DistributedCache
        : partFile.getFileSystem(job);
    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
        (RawComparator<K>) job.getOutputKeyComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i + 1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
        job.getBoolean("total.order.partitioner.natural.order", true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[]) splitPoints, 0,
          splitPoints.length, new byte[0],
          job.getInt("total.order.partitioner.max.trie.depth", 2));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
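For context, configure() above runs against a job set up roughly like this (a sketch using the old mapred API; the partition file path and reducer count are placeholders):

JobConf job = new JobConf();
job.setNumReduceTasks(4);
job.setMapOutputKeyClass(Text.class);
job.setPartitionerClass(TotalOrderPartitioner.class);
// Must name a file holding numReduceTasks - 1 keys, sorted with the
// job's output key comparator (see writePartitionFile further down).
TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions"));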
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter)
    throws IOException {
  return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
      comparator, reporter, false, readsCounter, writesCounter);
}
@SuppressWarnings("unchecked")
protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
JobConf conf,
Counters.Counter inputCounter,
TaskReporter reporter) {
super(inputCounter, conf, reporter);
combinerClass = cls;
keyClass = (Class<K>) job.getMapOutputKeyClass();
valueClass = (Class<V>) job.getMapOutputValueClass();
comparator = (RawComparator<K>)
job.getCombinerKeyGroupingComparator();
}
/**
 * Get the user defined {@link WritableComparable} comparator for
 * grouping keys of inputs to the reduce.
 *
 * @return comparator set by the user for grouping values.
 * @see #setOutputValueGroupingComparator(Class) for details.
 */
public RawComparator getOutputValueGroupingComparator() {
  Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (theClass == null) {
    return getOutputKeyComparator();
  }
  return ReflectionUtils.newInstance(theClass, this);
}
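A grouping comparator of the kind returned above is what drives secondary sort: it widens key equality so that several distinct map-output keys feed a single reduce() call. A sketch, assuming Text keys of the form "natural#secondary" (FirstPartGroupingComparator is an illustrative name, not from this page):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FirstPartGroupingComparator extends WritableComparator {
  public FirstPartGroupingComparator() {
    super(Text.class, true);  // create Text instances for the byte[] path
  }

  // Group on the portion of the key before the first '#'.
  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    String ka = a.toString().split("#", 2)[0];
    String kb = b.toString().split("#", 2)[0];
    return ka.compareTo(kb);
  }
}

It would be registered with job.setOutputValueGroupingComparator(FirstPartGroupingComparator.class), which is exactly the setting that getOutputValueGroupingComparator() reads back.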
@SuppressWarnings("unchecked")
static BytesComparator makeComparator(String comparator) {
if (comparator.length() == 0) {
// unsorted keys
return null;
}
if (comparator.equals(COMPARATOR_MEMCMP)) {
// default comparator
return new BytesComparator(new MemcmpRawComparator());
} else if (comparator.startsWith(COMPARATOR_JCLASS)) {
String compClassName =
comparator.substring(COMPARATOR_JCLASS.length()).trim();
try {
Class compClass = Class.forName(compClassName);
// use its default ctor to create an instance
return new BytesComparator((RawComparator<Object>) compClass
.newInstance());
} catch (Exception e) {
throw new IllegalArgumentException(
"Failed to instantiate comparator: " + comparator + "("
+ e.toString() + ")");
}
} else {
throw new IllegalArgumentException("Unsupported comparator: "
+ comparator);
}
}
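The comparator string parsed above takes one of two forms. A sketch of how it is supplied, assuming the TFile.Writer constructor that accepts a comparator name (org.example.MyRawComparator is a hypothetical class):

Configuration conf = new Configuration();
FSDataOutputStream out =
    FileSystem.get(conf).create(new Path("/tmp/data.tfile"));

// "memcmp": raw byte-wise key ordering via MemcmpRawComparator.
TFile.Writer writer = new TFile.Writer(out, 64 * 1024, "gz", "memcmp", conf);

// "jclass:<name>": load a RawComparator through its no-arg constructor, e.g.
//   new TFile.Writer(out, 64 * 1024, "gz",
//       "jclass:org.example.MyRawComparator", conf);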
protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
                            JobConf conf,
                            Counters.Counter inputCounter,
                            TaskReporter reporter) {
  super(inputCounter, conf, reporter);
  combinerClass = cls;
  keyClass = (Class<K>) job.getMapOutputKeyClass();
  valueClass = (Class<V>) job.getMapOutputValueClass();
  comparator = (RawComparator<K>) job.getOutputKeyComparator();
}
/**
 * Get the user defined {@link WritableComparable} comparator for
 * grouping keys of inputs to the combiner.
 *
 * @return comparator set by the user for grouping values.
 * @see #setCombinerKeyGroupingComparator(Class) for details.
 */
public RawComparator getCombinerKeyGroupingComparator() {
  Class<? extends RawComparator> theClass = getClass(
      JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (theClass == null) {
    return getOutputKeyComparator();
  }
  return ReflectionUtils.newInstance(theClass, this);
}
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from
 * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K,V> void writePartitionFile(JobConf job,
    Sampler<K,V> sampler) throws IOException {
  final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
  int numPartitions = job.getNumReduceTasks();
  K[] samples = sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
      (RawComparator<K>) job.getOutputKeyComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
  FileSystem fs = dst.getFileSystem(job);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
      job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for (int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
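A typical end-to-end use of writePartitionFile pairs it with a sampler and TotalOrderPartitioner (a sketch against the old mapred API, assuming the method lives on InputSampler; input paths and sampler parameters are placeholders):

JobConf job = new JobConf();
job.setNumReduceTasks(4);
job.setInputFormat(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("/data/input"));
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions"));

// Sample roughly 10% of records, capped at 10000, from at most 10 splits.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler);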
public CombineValuesIterator(RawKeyValueIterator in,
    RawComparator<KEY> comparator, Class<KEY> keyClass,
    Class<VALUE> valClass, Configuration conf, Reporter reporter,
    Counters.Counter combineInputCounter) throws IOException {
  super(in, comparator, keyClass, valClass, conf, reporter);
  this.combineInputCounter = combineInputCounter;
}