The following examples show how to use the org.apache.hadoop.io.WritableComparable API class; follow the links to view the full source code on GitHub.
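The interface contract behind all of these examples is small: a key type serializes itself via write()/readFields() and defines an ordering via compareTo(). As a point of reference, here is a minimal, illustrative sketch of such a key; the class and field names are hypothetical, chosen to match the TKey accessors used in the first comparator example below.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Illustrative key; the fields mirror the getYear()/getMonth()/getWd()
// accessors used by the first comparator example below.
public class TKey implements WritableComparable<TKey> {
    private int year;
    private int month;
    private int wd; // temperature

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(wd);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        wd = in.readInt();
    }

    @Override
    public int compareTo(TKey o) {
        // Natural order: year, then month; custom comparators can refine this.
        int c = Integer.compare(year, o.year);
        return c != 0 ? c : Integer.compare(month, o.month);
    }

    public int getYear() { return year; }
    public int getMonth() { return month; }
    public int getWd() { return wd; }
}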
@Override
public int compare(WritableComparable a, WritableComparable b) {
TKey k1 = (TKey)a;
TKey k2 = (TKey)b;
// Sort by year, then month, then temperature, with temperature in descending order:
int c1 = Integer.compare(k1.getYear(), k2.getYear());
if(c1 == 0 ){
int c2 = Integer.compare(k1.getMonth(), k2.getMonth());
if(c2 == 0){
return -Integer.compare(k1.getWd(), k2.getWd());
}
return c2;
}
return c1;
}
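Comparators with the compare(WritableComparable, WritableComparable) signature above are typically subclasses of org.apache.hadoop.io.WritableComparator. Here is a hedged sketch of how the comparator might be packaged and wired into a job; TKeyComparator is an illustrative name, and TKey is the sketch class from above.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;

public class TKeyComparator extends WritableComparator {
    public TKeyComparator() {
        // 'true' tells WritableComparator to instantiate TKey objects so that
        // compare() receives fully deserialized keys rather than raw bytes.
        super(TKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TKey k1 = (TKey) a;
        TKey k2 = (TKey) b;
        int c = Integer.compare(k1.getYear(), k2.getYear());
        if (c != 0) return c;
        c = Integer.compare(k1.getMonth(), k2.getMonth());
        if (c != 0) return c;
        return -Integer.compare(k1.getWd(), k2.getWd()); // temperature descending
    }

    // Wiring (new MapReduce API): use as the shuffle sort comparator.
    public static void apply(Job job) {
        job.setSortComparatorClass(TKeyComparator.class);
    }
}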
public void configure(JobConf job) {
// 'key' == sortInput for sort-input; key == sortOutput for sort-output
key = deduceInputFile(job);
if (key == sortOutput) {
partitioner = new HashPartitioner<WritableComparable, Writable>();
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
URI inputURI = new URI(job.get("map.input.file"));
String inputFile = inputURI.getPath();
// part files are named part-xxxxx; skip past the "part-" prefix
partition = Integer.parseInt(
inputFile.substring(inputFile.lastIndexOf("part") + 5));
noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
} catch (Exception e) {
System.err.println("Caught: " + e);
System.exit(-1);
}
}
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(WritableComparable key,
Writable value,
Context context) throws IOException, InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
context.write(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
context.setStatus("done with " + itemCount + " records.");
}
public synchronized boolean next(WritableComparable key, Writable value) throws IOException {
numNext++;
if (pos_ >= end_) {
return false;
}
DataOutputBuffer buf = new DataOutputBuffer();
if (!readUntilMatchBegin()) {
return false;
}
if (!readUntilMatchEnd(buf)) {
return false;
}
// There is only one element; key/value splitting is not done here.
byte[] record = new byte[buf.getLength()];
System.arraycopy(buf.getData(), 0, record, 0, record.length);
numRecStats(record, 0, record.length);
((Text) key).set(record);
((Text) value).set("");
return true;
}
public RecordWriter<WritableComparable, Writable> getRecordWriter(
final FileSystem fs, JobConf job,
String name, final Progressable progress) throws IOException {
final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name);
// Get the old copy out of the way
if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile, true);
final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
return new RecordWriter<WritableComparable, Writable>() {
public synchronized void write(WritableComparable key, Writable value) throws IOException {
printStream.println(value);
}
public synchronized void close(Reporter reporter) throws IOException {
printStream.close();
}
};
}
public String toOutputString() {
StringBuilder strBuilder = new StringBuilder();
ArrayList<K> keys = new ArrayList<>(keyValueMap.keySet());
if (WritableComparable.class.isAssignableFrom(keyClass)) {
ArrayList<? extends WritableComparable> orderedKeys = (ArrayList<? extends WritableComparable>) keys;
Collections.sort(orderedKeys);
}
for (K key : keys) {
strBuilder.append(key.toString());
strBuilder.append(": ");
strBuilder.append(keyValueMap.get(key));
strBuilder.append('\n');
}
return strBuilder.toString();
}
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
throws IOException, InterruptedException {
setupUdfEnvAndStores(taskattemptcontext);
if(mapStores.size() + reduceStores.size() == 1) {
// single store case
POStore store;
if(mapStores.size() == 1) {
store = mapStores.get(0);
} else {
store = reduceStores.get(0);
}
StoreFuncInterface sFunc = store.getStoreFunc();
// set output location
PigOutputFormat.setLocation(taskattemptcontext, store);
// The above call should have updated the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
RecordWriter writer = sFunc.getOutputFormat().getRecordWriter(
taskattemptcontext);
return new PigRecordWriter(writer, sFunc, Mode.SINGLE_STORE);
} else {
// multi store case - in this case, all writing is done through
// MapReducePOStoreImpl - set up a dummy RecordWriter
return new PigRecordWriter(null, null, Mode.MULTI_STORE);
}
}
/** Run a FileOperation */
public void map(Text key, FileOperation value,
OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
) throws IOException {
try {
value.run(jobconf);
++succeedcount;
reporter.incrCounter(Counter.SUCCEED, 1);
} catch (IOException e) {
++failcount;
reporter.incrCounter(Counter.FAIL, 1);
String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
out.collect(null, new Text(s));
LOG.info(s);
} finally {
reporter.setStatus(getCountString());
}
}
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
TaskAttemptContext job) throws IOException,
InterruptedException {
Configuration conf = job.getConfiguration();
String codec = conf.get(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
if (!codec.equals("lzo") && !codec.equals("gz") && !codec.equals("gzip"))
throw new IOException(
"Invalid temporary file compression codec [" + codec + "]. Expected compression codecs are gz(gzip) and lzo");
if (codec.equals("gzip")) {
codec = "gz";
}
mLog.info(codec + " compression codec in use");
Path file = getDefaultWorkFile(job, "");
return new TFileRecordWriter(file, codec, conf);
}
private static final WritableComparable<?> createWritable(DataType type)
{
switch (type)
{
case BOOLEAN:
return new BooleanWritable();
case BYTE:
return new ByteWritable();
case INT:
return new IntWritable();
case LONG:
return new LongWritable();
case FLOAT:
return new FloatWritable();
case DOUBLE:
return new DoubleWritable();
case STRING:
return new Text();
default:
return null;
}
}
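A brief usage sketch for the factory above; DataType is assumed to be the surrounding project's own column-type enum, and the cast mirrors the corresponding switch branch.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical usage: obtain the writable matching a column type,
// populate it, then rely on WritableComparable ordering downstream.
WritableComparable<?> w = createWritable(DataType.INT); // returns IntWritable
((IntWritable) w).set(42);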
/**
* Gets a single record from the map file for the given index
*
* @param index Index, between 0 and numRecords()-1
* @return Value from the MapFile
* @throws IOException If an error occurs during reading
*/
public V getRecord(long index) throws IOException {
//First: determine which reader to read from...
int readerIdx = -1;
for (int i = 0; i < recordIndexesEachReader.size(); i++) {
Pair<Long, Long> p = recordIndexesEachReader.get(i);
if (index >= p.getFirst() && index <= p.getSecond()) {
readerIdx = i;
break;
}
}
if (readerIdx == -1) {
throw new IllegalStateException("Index not found in any reader: " + index);
}
WritableComparable key = indexToKey.getKeyForIndex(index);
Writable value = ReflectionUtils.newInstance(recordClass, null);
V v = (V) readers[readerIdx].get(key, value);
return v;
}
public void configure(JobConf job) {
// 'key' == sortInput for sort-input; key == sortOutput for sort-output
key = deduceInputFile(job);
if (key == sortOutput) {
partitioner = new HashPartitioner<WritableComparable, Writable>();
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
String inputFile = inputURI.getPath();
// part file is of the form part-r-xxxxx
partition = Integer.parseInt(inputFile.substring(
inputFile.lastIndexOf("part") + 7));
noSortReducers = job.getInt(SORT_REDUCES, -1);
} catch (Exception e) {
System.err.println("Caught: " + e);
System.exit(-1);
}
}
}
/** The waiting function. The map exits once it gets a signal. Here the
* signal is the file existence.
*/
public void map(WritableComparable key, Writable val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
if (shouldWait(id)) {
if (fs != null) {
while (!fs.exists(getSignalFile(id))) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
} catch (InterruptedException ie) {
System.out.println("Interrupted while the map was waiting for "
+ " the signal.");
break;
}
}
} else {
throw new IOException("Could not get the DFS!!");
}
}
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(WritableComparable key,
Writable value,
OutputCollector<BytesWritable, BytesWritable> output,
Reporter reporter) throws IOException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize
+ (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize
+ (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
output.collect(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
if (++itemCount % 200 == 0) {
reporter.setStatus("wrote record " + itemCount + ". "
+ numBytesToWrite + " bytes left.");
}
}
reporter.setStatus("done with " + itemCount + " records.");
}
/** The waiting function. The reduce exits once it gets a signal. Here the
* signal is the file existence.
*/
public void reduce(WritableComparable key, Iterator<Writable> val,
OutputCollector<WritableComparable, Writable> output,
Reporter reporter)
throws IOException {
if (fs != null) {
while (!fs.exists(signal)) {
try {
reporter.progress();
synchronized (this) {
this.wait(1000); // wait for 1 sec
}
} catch (InterruptedException ie) {
System.out.println("Interrupted while the map was waiting for the"
+ " signal.");
break;
}
}
} else {
throw new IOException("Could not get the DFS!!");
}
}
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter
) throws IOException {
if (first) {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
Path input = mapOutputFile.getInputFile(0, taskId);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, input, conf);
assertEquals("is reduce input compressed " + input,
compressInput,
rdr.isCompressed());
rdr.close();
}
}
@Override
public void write(WritableComparable wc, Tuple t) throws IOException,
InterruptedException {
DataOutputStream outputKey = writer.prepareAppendKey(KEY0.getLength());
try {
outputKey.write(KEY0.getBytes(), 0, KEY0.getLength());
}
finally {
outputKey.close();
}
// we really only want to write the tuple (value) out here
DataOutputStream outputValue = writer.prepareAppendValue(-1);
try {
sedes.writeDatum(outputValue, t);
}
finally {
outputValue.close();
}
}
/**
* Copies a file and validates the copy by checking the checksums.
* If validation fails, retries (max number of tries is distcp.file.retries)
* to copy the file.
*/
void copyWithRetries(FileStatus srcstat, Path relativedst,
OutputCollector<WritableComparable<?>, Text> out,
Reporter reporter) throws IOException {
// max tries to copy when validation of copy fails
final int maxRetries = job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES);
// save update flag for later copies within the same map task
final boolean saveUpdate = update;
int retryCnt = 1;
for (; retryCnt <= maxRetries; retryCnt++) {
try {
//copy the file and validate copy
copy(srcstat, relativedst, out, reporter);
break;// copy successful
} catch (IOException e) {
LOG.warn("Copy of " + srcstat.getPath() + " failed.", e);
if (retryCnt < maxRetries) {// copy failed and need to retry
LOG.info("Retrying copy of file " + srcstat.getPath());
update = true; // set update flag for retries
}
else {// no more retries... Give up
update = saveUpdate;
throw new IOException("Copy of file failed even with " + retryCnt
+ " tries.", e);
}
}
}
}
public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
Class<? extends RawComparator> theClass = conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, null,
RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, conf);
return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
WritableComparable.class));
}
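The lookup above prefers an explicitly configured comparator class and only then falls back to the WritableComparator registered for the intermediate key class. A hedged configuration sketch follows; MyRawComparator is a hypothetical RawComparator implementation.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

Configuration conf = new Configuration();
// MyRawComparator is illustrative; any RawComparator implementation may be set.
conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
    MyRawComparator.class, RawComparator.class);
// If no class is configured, the method falls back to
// WritableComparator.get(<intermediate output key class>).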
public void setupRunner(String jobName, Class<?> runnerClass,
Class<? extends TableMapper<?, ?>> mapperClass, Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
Class<? extends WritableComparable<?>> mapOutputKeyClass,
Class<? extends Writable> mapOutputValueClass,
Class<? extends WritableComparable<?>> outputKeyClass,
Class<? extends Writable> outputValueClass) {
this.setupRunner(jobName, runnerClass, mapperClass, reducerClass, mapOutputKeyClass, mapOutputValueClass, outputKeyClass, outputValueClass, TransformerOutputFormat.class);
}
/**
* Sets the concrete job parameters.
*
* @param jobName
* @param runnerClass
* @param mapperClass
* @param reducerClass
* @param mapOutputKeyClass
* @param mapOutputValueClass
* @param outputKeyClass
* @param outputValueClass
* @param outputFormatClass
*/
public void setupRunner(String jobName, Class<?> runnerClass,
Class<? extends TableMapper<?, ?>> mapperClass,
Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
Class<? extends WritableComparable<?>> mapOutputKeyClass,
Class<? extends Writable> mapOutputValueClass,
Class<? extends WritableComparable<?>> outputKeyClass,
Class<? extends Writable> outputValueClass,
Class<? extends OutputFormat<?, ?>> outputFormatClass) {
this.jobName = jobName;
this.runnerClass = runnerClass;
this.mapperClass = mapperClass;
this.reducerClass = reducerClass;
this.mapOutputKeyClass = mapOutputKeyClass;
this.mapOutputValueClass = mapOutputValueClass;
this.outputKeyClass = outputKeyClass;
this.outputValueClass = outputValueClass;
this.outputFormatClass = outputFormatClass;
this.isCallSetUpRunnerMethod = true;
}
@Override
public int compare(WritableComparable r1, WritableComparable r2) {
IpTimestampKey key1 = (IpTimestampKey) r1;
IpTimestampKey key2 = (IpTimestampKey) r2;
int result = key1.getIp().compareTo(key2.getIp());
if (result == 0) {
result = key1.getUnixTimestamp().compareTo(key2.getUnixTimestamp());
}
return result;
}
private void validateKeyValue(WritableComparable<?> k, Writable v,
int tupleSize, boolean firstTuple, boolean secondTuple,
TestType ttype) throws IOException {
System.out.println("out k:" + k + " v:" + v);
if (ttype.equals(TestType.OUTER_ASSOCIATIVITY)) {
validateOuterKeyValue((IntWritable)k, (TupleWritable)v, tupleSize,
firstTuple, secondTuple);
} else if (ttype.equals(TestType.INNER_ASSOCIATIVITY)) {
validateInnerKeyValue((IntWritable)k, (TupleWritable)v, tupleSize,
firstTuple, secondTuple);
}
if (ttype.equals(TestType.INNER_IDENTITY)) {
validateKeyValue_INNER_IDENTITY((IntWritable)k, (IntWritable)v);
}
}
/**
* Create a new key value common to all child RRs.
* @throws ClassCastException if key classes differ.
*/
@SuppressWarnings("unchecked") // Explicit check for key class agreement
public K createKey() {
if (null == keyclass) {
final Class<?> cls = kids[0].createKey().getClass();
for (RecordReader<K,? extends Writable> rr : kids) {
if (!cls.equals(rr.createKey().getClass())) {
throw new ClassCastException("Child key classes fail to agree");
}
}
keyclass = cls.asSubclass(WritableComparable.class);
}
return (K) ReflectionUtils.newInstance(keyclass, getConf());
}
@Override
@SuppressWarnings("rawtypes")
public int compare(WritableComparable k1, WritableComparable k2)
{
ResultMergeTaggedMatrixIndexes key1 = (ResultMergeTaggedMatrixIndexes)k1;
ResultMergeTaggedMatrixIndexes key2 = (ResultMergeTaggedMatrixIndexes)k2;
//group by matrix indexes only (including all tags)
return key1.getIndexes().compareTo(key2.getIndexes());
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextLongWritable textLongA = (TextLongWritable) a;
TextLongWritable textLongB = (TextLongWritable) b;
return textLongA.getText().compareTo(textLongB.getText());
}