The following examples show how to use the org.apache.hadoop.io.Writable API class in practice; you can also follow the links to GitHub to view the original source code.
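Before the collected examples, a minimal sketch of the Writable contract itself may help: write() and readFields() must serialize and deserialize the fields in the same order, and Hadoop requires a no-argument constructor. The PointWritable class below is hypothetical, written only to illustrate the interface.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical custom Writable: serializes two ints in a fixed order.
public class PointWritable implements Writable {
    private int x;
    private int y;

    public PointWritable() { }          // Hadoop instantiates Writables reflectively

    public PointWritable(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);                // fields are written...
        out.writeInt(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readInt();               // ...and read back in the same order
        y = in.readInt();
    }
}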
/**
 * This method does the work of deserializing a record into Java objects
 * that Hive can work with via the ObjectInspector interface.
 */
//@Override
public Object deserialize(Writable blob) throws SerDeException {
    if (Log.isTraceEnabled())
        SpliceLogUtils.trace(Log, "deserialize " + blob);
    ExecRowWritable rowWritable = (ExecRowWritable) blob;
    objectCache.clear();
    ExecRow val = rowWritable.get();
    if (val == null)
        return null;
    DataValueDescriptor[] dvd = val.getRowArray();
    if (dvd == null || dvd.length == 0)
        return objectCache;
    for (int i = 0; i < dvd.length; i++) {
        objectCache.add(hiveTypeToObject(colTypes.get(i).getTypeName(), dvd[i]));
    }
    return objectCache;
}
public synchronized boolean next(WritableComparable key, Writable value) throws IOException {
    numNext++;
    if (pos_ >= end_) {
        return false;
    }
    DataOutputBuffer buf = new DataOutputBuffer();
    if (!readUntilMatchBegin()) {
        return false;
    }
    if (!readUntilMatchEnd(buf)) {
        return false;
    }
    // There is only one element; key/value splitting is not done here.
    byte[] record = new byte[buf.getLength()];
    System.arraycopy(buf.getData(), 0, record, 0, record.length);
    numRecStats(record, 0, record.length);
    ((Text) key).set(record);
    ((Text) value).set("");
    return true;
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: ValuesTest configFile outputDir");
        System.exit(2);
    }
    Job job = Job.getInstance(conf);
    job.setJarByClass(ValuesTest.class);
    job.setInputFormatClass(ValueInputFormat.class);
    job.setMapperClass(ValueMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    conf = job.getConfiguration();
    conf.addResource(otherArgs[0]);
    conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
        Writable.class);
    conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
        ValuesFunction.class, Values.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public void testNestedIterable() throws Exception {
    Random r = new Random();
    Writable[] writs = {
        new BooleanWritable(r.nextBoolean()),
        new FloatWritable(r.nextFloat()),
        new FloatWritable(r.nextFloat()),
        new IntWritable(r.nextInt()),
        new LongWritable(r.nextLong()),
        new BytesWritable("dingo".getBytes()),
        new LongWritable(r.nextLong()),
        new IntWritable(r.nextInt()),
        new BytesWritable("yak".getBytes()),
        new IntWritable(r.nextInt())
    };
    TupleWritable sTuple = makeTuple(writs);
    assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0));
}
public void testWideTuple() throws Exception {
    Text emptyText = new Text("Should be empty");
    Writable[] values = new Writable[64];
    Arrays.fill(values, emptyText);
    values[42] = new Text("Number 42");
    TupleWritable tuple = new TupleWritable(values);
    tuple.setWritten(42);
    for (int pos = 0; pos < tuple.size(); pos++) {
        boolean has = tuple.has(pos);
        if (pos == 42) {
            assertTrue(has);
        } else {
            assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
        }
    }
}
private void blowPipe(WritableComparable key, Writable val, OutputCollector output) throws IOException {
    numRecRead_++;
    maybeLogRecord();
    // The check for doPipe_ was removed: doPipe_ is only set under
    // conditions where the reducer is IdentityReducer, so the code
    // would never come through this path with it false.
    if (outerrThreadsThrowable != null) {
        mapRedFinished();
        throw new IOException("MROutput/MRErrThread failed:"
            + StringUtils.stringifyException(outerrThreadsThrowable));
    }
    if (!this.ignoreKey) {
        write(key);
        clientOut_.write('\t');
    }
    write(val);
    clientOut_.write('\n');
    // clientOut_.flush();
}
/**
 * Gets serializer for specified class.
 *
 * @param cls Class.
 * @param jobConf Job configuration.
 * @return Appropriate serializer.
 */
@SuppressWarnings("unchecked")
private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
    A.notNull(cls, "cls");
    SerializationFactory factory = new SerializationFactory(jobConf);
    Serialization<?> serialization = factory.getSerialization(cls);
    if (serialization == null)
        throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
    if (serialization.getClass() == WritableSerialization.class)
        return new HadoopWritableSerialization((Class<? extends Writable>) cls);
    return new HadoopSerializationWrapper(serialization, cls);
}
private static void createMultiStripeFile(File file)
        throws IOException, ReflectiveOperationException, SerDeException
{
    FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, OrcTester.Compression.NONE, javaLongObjectInspector);
    @SuppressWarnings("deprecation")
    Serializer serde = new OrcSerde();
    SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", javaLongObjectInspector);
    Object row = objectInspector.create();
    StructField field = objectInspector.getAllStructFieldRefs().get(0);
    for (int i = 0; i < 300; i += 3) {
        if ((i > 0) && (i % 60 == 0)) {
            flushWriter(writer);
        }
        objectInspector.setStructFieldData(row, field, (long) i);
        Writable record = serde.serialize(row, objectInspector);
        writer.write(record);
    }
    writer.close(false);
}
@Override
public Object deserialize(Writable writable)
        throws SerDeException {
    Row row = (Row) writable;
    // Since this implementation uses a StructObjectInspector, return a list of
    // deserialized values in the same order as the original properties.
    int i = 0;
    for (Map.Entry<String, TypeInfo> column : _columns) {
        String columnName = column.getKey();
        TypeInfo type = column.getValue();
        // Get the raw value from traversing the JSON map
        Object rawValue = getRawValue(columnName, row);
        // Deserialize the value to the expected type
        Object value = deserialize(type, rawValue);
        _values.set(i++, value);
    }
    return _values;
}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: ElementAttributeValuesTest configFile outputDir");
        System.exit(2);
    }
    Job job = Job.getInstance(conf);
    job.setJarByClass(ElementAttributeValuesTest.class);
    job.setInputFormatClass(ValueInputFormat.class);
    job.setMapperClass(ElementAttrValueMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    conf = job.getConfiguration();
    conf.addResource(otherArgs[0]);
    conf.setClass(MarkLogicConstants.INPUT_VALUE_CLASS, Text.class,
        Writable.class);
    conf.setClass(MarkLogicConstants.INPUT_LEXICON_FUNCTION_CLASS,
        ElementAttributeValuesFunction.class, ElementAttributeValues.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
@Test
public void TestPointWrite() throws Exception {
    ArrayList<Object> stuff = new ArrayList<Object>();
    Properties proptab = new Properties();
    proptab.setProperty(HiveShims.serdeConstants.LIST_COLUMNS, "shape");
    proptab.setProperty(HiveShims.serdeConstants.LIST_COLUMN_TYPES, "binary");
    AbstractSerDe jserde = mkSerDe(proptab);
    StructObjectInspector rowOI = (StructObjectInspector) jserde.getObjectInspector();
    // {"properties":{},"geometry":{"type":"Point","coordinates":[15.0,5.0]}}
    addWritable(stuff, new Point(15.0, 5.0));
    Writable jsw = jserde.serialize(stuff, rowOI);
    String rslt = ((Text) jsw).toString();
    JsonNode jn = new ObjectMapper().readTree(rslt);
    jn = jn.findValue("geometry");
    Assert.assertNotNull(jn.findValue("type"));
    Assert.assertNotNull(jn.findValue("coordinates"));
}
public void configure(JobConf job) {
    // 'key' == sortInput for sort-input; key == sortOutput for sort-output
    key = deduceInputFile(job);
    if (key == sortOutput) {
        partitioner = new HashPartitioner<WritableComparable, Writable>();
        // Figure the 'current' partition and no. of reduces of the 'sort'
        try {
            URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
            String inputFile = inputURI.getPath();
            // part file is of the form part-r-xxxxx
            partition = Integer.valueOf(inputFile.substring(
                inputFile.lastIndexOf("part") + 7)).intValue();
            noSortReducers = job.getInt(SORT_REDUCES, -1);
        } catch (Exception e) {
            System.err.println("Caught: " + e);
            System.exit(-1);
        }
    }
}
/**
 * Given an output filename, write a bunch of random records to it.
 */
public void map(WritableComparable key,
                Writable value,
                Context context) throws IOException, InterruptedException {
    int itemCount = 0;
    while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
            (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
            (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        context.write(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
        if (++itemCount % 200 == 0) {
            context.setStatus("wrote record " + itemCount + ". " +
                numBytesToWrite + " bytes left.");
        }
    }
    context.setStatus("done with " + itemCount + " records.");
}
/**
 * Determines the Hadoop Writable type to use when passing a Kettle type back to Hadoop.
 *
 * @param kettleType Kettle type descriptor to convert.
 * @return Java type to convert {@code kettleType} to when sending data back to Hadoop.
 */
public static Class<? extends Writable> getWritableForKettleType( ValueMetaInterface kettleType ) {
    if ( kettleType == null ) {
        return NullWritable.class;
    }
    switch ( kettleType.getType() ) {
        case ValueMetaInterface.TYPE_STRING:
        case ValueMetaInterface.TYPE_BIGNUMBER:
        case ValueMetaInterface.TYPE_DATE:
            return Text.class;
        case ValueMetaInterface.TYPE_INTEGER:
            return LongWritable.class;
        case ValueMetaInterface.TYPE_NUMBER:
            return DoubleWritable.class;
        case ValueMetaInterface.TYPE_BOOLEAN:
            return BooleanWritable.class;
        case ValueMetaInterface.TYPE_BINARY:
            return BytesWritable.class;
        default:
            return Text.class;
    }
}
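A minimal usage sketch of the mapping above, assuming Kettle's ValueMetaInteger implementation of ValueMetaInterface is on the classpath:

// Hypothetical: a Kettle integer field is handed back to Hadoop as a LongWritable.
ValueMetaInterface meta = new ValueMetaInteger("id");
Class<? extends Writable> writableClass = getWritableForKettleType(meta);
// writableClass == LongWritable.class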
/**
 * Prints a JSON representation of the ArrayWritable for easier debuggability.
 */
public static String arrayWritableToString(ArrayWritable writable) {
    if (writable == null) {
        return "null";
    }
    StringBuilder builder = new StringBuilder();
    Writable[] values = writable.get();
    builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
    int i = 0;
    for (Writable w : values) {
        if (w instanceof ArrayWritable) {
            builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
        } else {
            builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
            if (w == null) {
                builder.append("\"type" + i + "\":\"unknown\"").append(",");
            } else {
                builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
            }
        }
        i++;
    }
    builder.deleteCharAt(builder.length() - 1);
    builder.append("}");
    return builder.toString();
}
@Deprecated
public static void cloneWritableInto(Writable dst,
                                     Writable src) throws IOException {
    CopyInCopyOutBuffer buffer = cloneBuffers.get();
    buffer.outBuffer.reset();
    src.write(buffer.outBuffer);
    buffer.moveData();
    dst.readFields(buffer.inBuffer);
}
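A brief usage sketch of the deprecated helper above, with Text standing in for any Writable; in current Hadoop versions, org.apache.hadoop.util.ReflectionUtils.copy(conf, src, dst) is the usual replacement.

Text src = new Text("hello");
Text dst = new Text();
cloneWritableInto(dst, src);  // dst now holds a copy of src's serialized state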
public void broadcast(Writable writable) throws IOException {
    int numWorkers = getWorkerCount();
    for (int j = 0; j < numWorkers; ++j) {
        sendMessageToWorker(writable, j);
    }
}
public void reduce(IntWritable key, Iterator<Writable> values,
                   OutputCollector<IntWritable, Text> out,
                   Reporter reporter) throws IOException {
    int currentKey = key.get();
    // keys should be in descending order
    if (currentKey > lastKey) {
        fail("Keys not in sorted descending order");
    }
    lastKey = currentKey;
    out.collect(key, new Text("success"));
}
/**
 * Close all child RRs.
 */
public void close() throws IOException {
    if (kids != null) {
        for (RecordReader<K, ? extends Writable> rr : kids) {
            rr.close();
        }
    }
    if (jc != null) {
        jc.close();
    }
}
@Test
public void testGetValue() {
    BSONObject field = new BasicBSONObject("field", 1.312D);
    BSONObject entry = new BasicBSONObject("double", field);
    BSONWritable entity = new BSONWritable(entry);
    DoubleValueMapper mapper = new DoubleValueMapper("double.field");
    Writable value = mapper.getValue(entity);
    assertFalse(value instanceof NullWritable);
    assertTrue(value instanceof DoubleWritable);
    assertEquals(((DoubleWritable) value).get(), 1.312D, 0.05);
}
@Override
public Tuple2<Long, Writable> call(Tuple2<Row, Long> arg0)
        throws Exception
{
    long rowix = arg0._2() + 1;
    // process row data
    int off = _containsID ? 1 : 0;
    Object obj = _isVector ? arg0._1().get(off) : arg0._1();
    boolean sparse = (obj instanceof SparseVector);
    MatrixBlock mb = new MatrixBlock(1, (int) _clen, sparse);
    if( _isVector ) {
        Vector vect = (Vector) obj;
        if( vect instanceof SparseVector ) {
            SparseVector svect = (SparseVector) vect;
            int lnnz = svect.numNonzeros();
            for( int k=0; k<lnnz; k++ )
                mb.appendValue(0, svect.indices()[k], svect.values()[k]);
        }
        else { // dense
            for( int j=0; j<_clen; j++ )
                mb.appendValue(0, j, vect.apply(j));
        }
    }
    else { // row
        Row row = (Row) obj;
        for( int j=off; j<off+_clen; j++ )
            mb.appendValue(0, j-off, UtilFunctions.getDouble(row.get(j)));
    }
    mb.examSparsity();
    return new Tuple2<>(rowix, new PairWritableBlock(new MatrixIndexes(1,1), mb));
}
public static byte[] toBytes(Writable writable) {
    try {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bout);
        writable.write(out);
        out.close();
        bout.close();
        return bout.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
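A natural counterpart for round-tripping is a deserializer; the fromBytes helper below is a sketch and not part of the original source. The caller supplies an empty instance of the right class, and readFields() repopulates it.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Sketch of the inverse of toBytes: reads the serialized fields back into 'instance'.
public static <T extends Writable> T fromBytes(byte[] bytes, T instance) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
        instance.readFields(in);
        return instance;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}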
/**
 * Changes input into ObjectWritables.
 */
public void map(Text key, Writable value,
                OutputCollector<Text, ObjectWritable> output, Reporter reporter)
        throws IOException {
    ObjectWritable objWrite = new ObjectWritable();
    objWrite.set(value);
    output.collect(key, objWrite);
}
private void writeObject(ObjectOutputStream out) throws IOException {
    // serialize the parent fields and the final fields
    out.defaultWriteObject();
    // write the input split
    ((Writable) mapreduceInputSplit).write(out);
}
public Writable read() throws IOException {
    Type type = in.readType();
    if (type == null) {
        return null;
    }
    switch (type) {
        case BYTES:
            return readBytes();
        case BYTE:
            return readByte();
        case BOOL:
            return readBoolean();
        case INT:
            return readVInt();
        case LONG:
            return readVLong();
        case FLOAT:
            return readFloat();
        case DOUBLE:
            return readDouble();
        case STRING:
            return readText();
        case VECTOR:
            return readArray();
        case MAP:
            return readMap();
        case WRITABLE:
            return readWritable();
        default:
            throw new RuntimeException("unknown type");
    }
}
@Override
public Object deserialize(Writable writable) throws SerDeException {
    // Different segments could contain different schemas; in particular,
    // the column orders could differ. Here we re-map the column names to
    // the real column ids.
    SchemaWritable reader = (SchemaWritable) writable;
    if (this.projectCols != reader.columns) {
        // This is not needed on every call, only when the schema changes.
        mapColIndex(reader.columns);
        projectCols = reader.columns;
    }
    if (!isMapNeeded) {
        serdeSize = columnNames.size();
        return reader;
    } else {
        Writable[] projectWritables = reader.get();
        Writable[] writables = new Writable[columnNames.size()];
        for (int i = 0; i < validColIndexes.length; i++) {
            int colIndex = validColIndexes[i];
            int mapColId = validColMapIds[i];
            writables[colIndex] = projectWritables[mapColId];
        }
        serdeSize = validColIndexes.length;
        return new ArrayWritable(Writable.class, writables);
    }
}
public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
    super(splitNumber, (String) null);
    if (mapreduceInputSplit == null) {
        throw new NullPointerException("Hadoop input split must not be null");
    }
    if (!(mapreduceInputSplit instanceof Writable)) {
        throw new IllegalArgumentException("InputSplit must implement Writable interface.");
    }
    this.splitType = mapreduceInputSplit.getClass();
    this.mapreduceInputSplit = mapreduceInputSplit;
}
/**
 * Constructs a server listening on the named port and address. Parameters passed must
 * be of the named class. The <code>handlerCount</code> determines
 * the number of handler threads that will be used to process calls.
 */
protected Server(String bindAddress, int port,
                 Class<? extends Writable> paramClass, int handlerCount,
                 Configuration conf, String serverName)
        throws IOException {
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.port = port;
    this.paramClass = paramClass;
    this.handlerCount = handlerCount;
    this.socketSendBufferSize = 0;
    this.maxQueueSize = handlerCount * conf.getInt(
        IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
        MAX_QUEUE_SIZE_PER_HANDLER);
    this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
        IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
    this.readThreads = conf.getInt(IPC_SERVER_RPC_READ_THREADS_KEY,
        IPC_SERVER_RPC_READ_THREADS_DEFAULT);
    this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
    this.maxIdleTime = 2 * conf.getInt("ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
    // Start the listener here and let it bind to the port
    listener = new Listener();
    this.port = listener.getAddress().getPort();
    this.rpcMetrics = new RpcMetrics(serverName,
        Integer.toString(this.port), this);
    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
    // Create the responder here
    responder = new Responder();
}
@PublicEvolving
public WritableTypeInfo(Class<T> typeClass) {
    this.typeClass = checkNotNull(typeClass);
    checkArgument(
        Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
        "WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
}
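A hypothetical usage sketch of this Flink constructor, reusing the illustrative PointWritable class sketched at the top of this page:

// Wrap a custom Writable subclass in Flink type information.
TypeInformation<PointWritable> typeInfo = new WritableTypeInfo<>(PointWritable.class);

// Passing Writable.class itself would fail the checkArgument above:
// new WritableTypeInfo<>(Writable.class);  // throws IllegalArgumentException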