The following are example usages of org.apache.hadoop.mapred.JobConf#getMapOutputKeyClass(), taken from open-source projects. You can also follow the links to view the source code on GitHub.
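Before the examples, here is a minimal, self-contained sketch (not taken from the projects below; the key and value types are illustrative) of how the map output key class is typically set on a JobConf and later read back with getMapOutputKeyClass():

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class MapOutputKeyClassSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();

    // Set the intermediate (map output) key/value types explicitly.
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);

    // If setMapOutputKeyClass() had not been called, getMapOutputKeyClass()
    // would fall back to the job's final output key class.
    Class<?> keyClass = conf.getMapOutputKeyClass();   // Text.class
    Class<?> valClass = conf.getMapOutputValueClass(); // IntWritable.class

    System.out.println("map output key:   " + keyClass.getName());
    System.out.println("map output value: " + valClass.getName());
  }
}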
private void combineAndSpill(
    RawKeyValueIterator kvIter,
    Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  RawComparator<K> comparator =
      (RawComparator<K>) job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
        inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector,
          Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
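The snippet above passes the classes returned by getMapOutputKeyClass()/getMapOutputValueClass() to the CombineValuesIterator, which uses them to deserialize the intermediate records fed to the combiner. A hedged, stand-alone sketch of the reflective instantiation pattern involved (the concrete types are simply whatever the JobConf defaults resolve to):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

public class KeyValueInstantiationSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();

    // The classes the framework will use for intermediate map output records.
    Class<?> keyClass = job.getMapOutputKeyClass();
    Class<?> valClass = job.getMapOutputValueClass();

    // ReflectionUtils.newInstance creates (and configures) instances in the
    // same way Hadoop does before reading serialized bytes into them.
    Object key = ReflectionUtils.newInstance(keyClass, job);
    Object value = ReflectionUtils.newInstance(valClass, job);

    System.out.println(key.getClass().getName() + " / " + value.getClass().getName());
  }
}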
/**
 * Try initializing partially raw comparator for job.
 *
 * @param conf Configuration.
 */
private void initializePartiallyRawComparator(JobConf conf) {
  String clsName = conf.get(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), null);

  if (clsName == null) {
    Class keyCls = conf.getMapOutputKeyClass();

    while (keyCls != null) {
      clsName = PARTIAL_COMPARATORS.get(keyCls.getName());

      if (clsName != null) {
        conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName);
        break;
      }

      keyCls = keyCls.getSuperclass();
    }
  }
}
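The lookup above starts from the job's map output key class and walks up the superclass chain until a registered comparator is found. A minimal sketch of that pattern in isolation (the PARTIAL_COMPARATORS contents and the comparator class name below are hypothetical stand-ins):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Text;

public class SuperclassLookupSketch {
  private static final Map<String, String> PARTIAL_COMPARATORS = new HashMap<>();

  static {
    // Hypothetical registration: any BinaryComparable key maps to this comparator.
    PARTIAL_COMPARATORS.put(BinaryComparable.class.getName(),
        "example.BinaryComparablePartialComparator");
  }

  static String lookup(Class<?> keyCls) {
    while (keyCls != null) {
      String clsName = PARTIAL_COMPARATORS.get(keyCls.getName());
      if (clsName != null)
        return clsName;
      keyCls = keyCls.getSuperclass(); // fall back to the parent class
    }
    return null;
  }

  public static void main(String[] args) {
    // Text extends BinaryComparable, so the lookup succeeds at the superclass.
    System.out.println(lookup(Text.class));
  }
}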
/**
 * Read in the partition file and build indexing data structures.
 * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
 * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
 * of the first <tt>total.order.partitioner.max.trie.depth</tt> (2) + 1 bytes
 * will be built. Otherwise, keys will be located using a binary search of
 * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
 * defined for this job. The input file must be sorted with the same
 * comparator and contain
 * {@link org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
 */
@SuppressWarnings("unchecked") // keytype from conf not static
public void configure(JobConf job) {
  try {
    String parts = getPartitionFile(job);
    final Path partFile = new Path(parts);
    final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(job)  // assume in DistributedCache
        : partFile.getFileSystem(job);

    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
    K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
    if (splitPoints.length != job.getNumReduceTasks() - 1) {
      throw new IOException("Wrong number of partitions in keyset");
    }
    RawComparator<K> comparator =
        (RawComparator<K>) job.getOutputKeyComparator();
    for (int i = 0; i < splitPoints.length - 1; ++i) {
      if (comparator.compare(splitPoints[i], splitPoints[i + 1]) >= 0) {
        throw new IOException("Split points are out of order");
      }
    }
    boolean natOrder =
        job.getBoolean("total.order.partitioner.natural.order", true);
    if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
      partitions = buildTrie((BinaryComparable[]) splitPoints, 0,
          splitPoints.length, new byte[0],
          job.getInt("total.order.partitioner.max.trie.depth", 2));
    } else {
      partitions = new BinarySearchNode(splitPoints, comparator);
    }
  } catch (IOException e) {
    throw new IllegalArgumentException("Can't read partitions file", e);
  }
}
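For the configure() method above to succeed, the job has to be set up consistently: the partition file must contain getNumReduceTasks() - 1 keys of the map output key type, sorted with the job's output key comparator. A hedged configuration sketch using the old mapred API (the path, key type, and reduce count are illustrative; in practice the partition file is often produced with InputSampler):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderSetupSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();

    job.setMapOutputKeyClass(Text.class);  // keytype read back in configure()
    job.setNumReduceTasks(4);              // partition file must hold 3 sorted keys

    // Hypothetical path to a previously written partition file.
    TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions"));
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Optional tuning knobs read in configure() above.
    job.setBoolean("total.order.partitioner.natural.order", true);
    job.setInt("total.order.partitioner.max.trie.depth", 2);
  }
}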