下面列出了怎么用org.apache.hadoop.io.WritableUtils的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Serializes this queue's information to the given output.
 *
 * <p>Layout, in order: queue name, queue state (as an enum), scheduling
 * information (or the literal {@code "N/A"} when none is set), the number of
 * job statuses followed by each status, and the number of child queues
 * followed by each child (written recursively through this same method).
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, queueName);
  WritableUtils.writeEnum(out, queueState);

  // A missing scheduling description is encoded as the placeholder "N/A".
  final String schedulingText = (schedulingInfo == null) ? "N/A" : schedulingInfo;
  Text.writeString(out, schedulingText);

  // Job statuses, prefixed by their count.
  out.writeInt(stats.length);
  for (JobStatus jobStatus : stats) {
    jobStatus.write(out);
  }

  // Child queues, prefixed by their count.
  out.writeInt(children.size());
  for (QueueInfo child : children) {
    child.write(out);
  }
}
/**
 * Appends {@code count} generated key/value records through {@code writer}
 * and reports how many raw bytes those records occupy.
 *
 * <p>Each key comes from {@code TestDTFileByteArrays.composeSortedKey("key", nx)}
 * and each value is {@code "value" + nx}. Both are encoded to bytes exactly
 * once per record, and the same byte array is used for the declared length,
 * the payload, and the size accounting, so the three can never disagree.
 *
 * @param count            number of records to append
 * @param knownKeyLength   when {@code true} the key's byte length is declared
 *                         up front; otherwise {@code -1} is passed
 * @param knownValueLength when {@code true} the value's byte length is
 *                         declared up front; otherwise {@code -1} is passed
 * @param close            whether to call {@code closeOutput()} after the
 *                         last record
 * @return the sum, over all records, of the key and value byte lengths plus
 *         the vint-encoded size of each of those lengths
 * @throws IOException if appending or closing fails
 */
private long writeRecords(int count, boolean knownKeyLength,
    boolean knownValueLength, boolean close) throws IOException {
  long rawDataSize = 0;
  for (int nx = 0; nx < count; nx++) {
    // Encode once: the declared length must describe the bytes actually
    // written, not the number of chars in the source string.
    final byte[] keyBytes =
        TestDTFileByteArrays.composeSortedKey("key", nx).getBytes();
    DataOutputStream outKey =
        writer.prepareAppendKey(knownKeyLength ? keyBytes.length : -1);
    outKey.write(keyBytes);
    outKey.close();

    final byte[] valueBytes = ("value" + nx).getBytes();
    DataOutputStream outValue =
        writer.prepareAppendValue(knownValueLength ? valueBytes.length : -1);
    outValue.write(valueBytes);
    outValue.close();

    rawDataSize +=
        WritableUtils.getVIntSize(keyBytes.length)
        + keyBytes.length
        + WritableUtils.getVIntSize(valueBytes.length)
        + valueBytes.length;
  }
  if (close) {
    closeOutput();
  }
  return rawDataSize;
}
/**
 * Restores this record from the given input.
 *
 * <p>The stream starts with a vint holding the record size; the payload is
 * that size minus the bytes the vint itself occupies. When the payload is
 * larger than one {@code long}, the seed is read directly and the rest of the
 * payload is skipped. Otherwise the payload is read into the zeroed
 * {@code literal} buffer and the seed is decoded from that buffer.
 *
 * @param in the source to read from
 * @throws EOFException if fewer payload bytes could be skipped than expected
 * @throws IOException  if any underlying read fails
 */
@Override
public void readFields(DataInput in) throws IOException {
  final int longBytes = Long.SIZE / Byte.SIZE;

  size = WritableUtils.readVInt(in);
  int remaining = size - WritableUtils.getVIntSize(size);

  if (remaining <= longBytes) {
    // Short payload: zero-fill the scratch buffer, load what is there and
    // decode the seed from the full buffer.
    Arrays.fill(literal, (byte) 0);
    in.readFully(literal, 0, remaining);
    dib.reset(literal, 0, literal.length);
    seed = dib.readLong();
    remaining = 0;
  } else {
    // Long payload: the seed leads, everything after it is skipped below.
    seed = in.readLong();
    remaining -= longBytes;
  }

  final int skipped = in.skipBytes(remaining);
  if (skipped != remaining) {
    throw new EOFException("Expected " + remaining + ", read " + skipped);
  }
}
/**
 * Serializes this task's state to the given output.
 *
 * <p>Fields are written in a fixed sequence; a reader has to consume them in
 * exactly the same order.
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
public void write(DataOutput out) throws IOException {
Text.writeString(out, jobFile);
taskId.write(out);
out.writeInt(partition);
out.writeInt(numSlotsRequired);
taskStatus.write(out);
skipRanges.write(out);
out.writeBoolean(skipping);
out.writeBoolean(jobCleanup);
// The cleanup run state is only present on the wire when jobCleanup is set.
if (jobCleanup) {
WritableUtils.writeEnum(out, jobRunStateForCleanup);
}
out.writeBoolean(jobSetup);
out.writeBoolean(writeSkipRecs);
out.writeBoolean(taskCleanup);
Text.writeString(out, user);
// The spill key's length is written first, then extraData, and only then
// the key bytes themselves.
out.writeInt(encryptedSpillKey.length);
extraData.write(out);
out.write(encryptedSpillKey);
}
/**
 * Appends one key/value record to the output.
 *
 * <p>Only the unread portion of each buffer (from its current position to its
 * length) is written. The record is laid out as: vint key length, vint value
 * length, key bytes, value bytes. The byte and record counters are updated
 * afterwards.
 *
 * @param key   buffer whose remaining bytes form the key
 * @param value buffer whose remaining bytes form the value
 * @throws IOException if either remaining length is negative or a write fails
 */
public void append(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
  final int keyStart = key.getPosition();
  final int keyLength = key.getLength() - keyStart;
  if (keyLength < 0) {
    throw new IOException("Negative key-length not allowed: " + keyLength +
                          " for " + key);
  }

  final int valueStart = value.getPosition();
  final int valueLength = value.getLength() - valueStart;
  if (valueLength < 0) {
    throw new IOException("Negative value-length not allowed: " +
                          valueLength + " for " + value);
  }

  // Header: both lengths first, then both payloads.
  WritableUtils.writeVInt(out, keyLength);
  WritableUtils.writeVInt(out, valueLength);
  out.write(key.getData(), keyStart, keyLength);
  out.write(value.getData(), valueStart, valueLength);

  // Account for the payload plus the two vint-encoded length prefixes.
  final int recordBytes = keyLength + valueLength
      + WritableUtils.getVIntSize(keyLength)
      + WritableUtils.getVIntSize(valueLength);
  decompressedBytesWritten += recordBytes;
  ++numRecordsWritten;
}
/**
 * {@inheritDoc}
 *
 * <p>Writes the superclass state first, then this class's own fields in a
 * fixed order: IPC port (as a short), capacity, DFS used, remaining, last
 * update, xceiver count, location, host name and finally the admin state.
 */
public void write(DataOutput out) throws IOException {
super.write(out);
//TODO: move it to DatanodeID once DatanodeID is not stored in FSImage
out.writeShort(ipcPort);
out.writeLong(capacity);
out.writeLong(dfsUsed);
out.writeLong(remaining);
out.writeLong(lastUpdate);
out.writeInt(xceiverCount);
Text.writeString(out, location);
// A null host name is serialized as the empty string.
Text.writeString(out, hostName == null? "": hostName);
WritableUtils.writeEnum(out, getAdminState());
}
/**
 * Serializes the per-scheme file system counters.
 *
 * <p>Grammar: {@code FileSystemGroup ::= #scheme (scheme #counter (key value)*)*}
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput out) throws IOException {
  // #scheme
  WritableUtils.writeVInt(out, map.size());
  for (Map.Entry<String, Object[]> schemeEntry : map.entrySet()) {
    final Object[] slots = schemeEntry.getValue();
    // scheme
    WritableUtils.writeString(out, schemeEntry.getKey());
    // #counter for the above scheme
    WritableUtils.writeVInt(out, numSetCounters(slots));
    for (Object slot : slots) {
      // Unset slots are not written at all.
      if (slot != null) {
        FSCounter fsCounter =
            (FSCounter) ((TezCounter) slot).getUnderlyingCounter();
        WritableUtils.writeVInt(out, fsCounter.key.ordinal()); // key
        WritableUtils.writeVLong(out, fsCounter.getValue());   // value
      }
    }
  }
}
/**
 * Restores this status from the given input.
 *
 * <p>Fields are consumed in a fixed sequence; the stream must have been
 * produced with the same field order.
 *
 * @param in the source to read from
 * @throws IOException if any underlying read fails
 */
public void readFields(DataInput in) throws IOException {
this.taskid.readFields(in);
this.progress = in.readFloat();
this.runState = WritableUtils.readEnum(in, State.class);
this.diagnosticInfo = Text.readString(in);
this.stateString = Text.readString(in);
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
// Counters are always replaced with a fresh instance; they are only
// populated from the stream when the includeCounters flag read below is set.
counters = new Counters();
this.includeCounters = in.readBoolean();
// The output size sits between the flag and the optional counter payload.
this.outputSize = in.readLong();
if (includeCounters) {
counters.readFields(in);
}
nextRecordRange.readFields(in);
this.runOnGPU = in.readBoolean();
}
/**
 * Aggregates the LinkNode objects for a given url into a single
 * {@code LinkNodes} value, keeping at most {@code maxInlinks} of them.
 *
 * <p>Each kept node is copied with {@code WritableUtils.clone} before being
 * stored (presumably because the incoming instance is reused between
 * iterations; verify against the framework contract). Iteration stops at the
 * first value seen after the cap has been reached.
 *
 * @param key      the url
 * @param values   the link nodes grouped under that url
 * @param output   receives the single aggregated record
 * @param reporter progress reporter (unused here)
 * @throws IOException if collecting the output fails
 */
public void reduce(Text key, Iterator<LinkNode> values,
    OutputCollector<Text, LinkNodes> output, Reporter reporter)
    throws IOException {

  final List<LinkNode> kept = new ArrayList<LinkNode>();
  int keptCount = 0;

  while (values.hasNext()) {
    final LinkNode incoming = values.next();
    if (keptCount >= maxInlinks) {
      // Cap reached: drop this value and everything after it.
      break;
    }
    kept.add((LinkNode) WritableUtils.clone(incoming, conf));
    keptCount++;
  }

  final LinkNode[] keptArray = kept.toArray(new LinkNode[kept.size()]);
  output.collect(key, new LinkNodes(keptArray));
}
/**
 * Writes every scheme's counters to the given output.
 *
 * <p>Grammar: {@code FileSystemGroup ::= #scheme (scheme #counter (key value)*)*}
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, map.size()); // #scheme
  for (Map.Entry<String, Object[]> perScheme : map.entrySet()) {
    final String scheme = perScheme.getKey();
    final Object[] counterSlots = perScheme.getValue();

    WritableUtils.writeString(out, scheme); // scheme
    // #counter for the above scheme
    WritableUtils.writeVInt(out, numSetCounters(counterSlots));

    for (Object candidate : counterSlots) {
      if (candidate == null) {
        // Slot never set for this scheme; nothing is written for it.
        continue;
      }
      @SuppressWarnings("unchecked")
      FSCounter resolved =
          (FSCounter) ((Counter) candidate).getUnderlyingCounter();
      WritableUtils.writeVInt(out, resolved.key.ordinal()); // key
      WritableUtils.writeVLong(out, resolved.getValue());   // value
    }
  }
}
/**
 * {@inheritDoc}
 *
 * <p>Reads the superclass state first, then this class's own fields in a
 * fixed order: IPC port, capacity, DFS used, remaining, last update, xceiver
 * count, location, host name and finally the admin state.
 */
public void readFields(DataInput in) throws IOException {
super.readFields(in);
//TODO: move it to DatanodeID once DatanodeID is not stored in FSImage
// The port is stored as a 16-bit short; masking keeps only the low 16 bits
// so the resulting int is never negative.
this.ipcPort = in.readShort() & 0x0000ffff;
this.capacity = in.readLong();
this.dfsUsed = in.readLong();
this.remaining = in.readLong();
this.lastUpdate = in.readLong();
this.xceiverCount = in.readInt();
this.location = Text.readString(in);
this.hostName = Text.readString(in);
setAdminState(WritableUtils.readEnum(in, AdminStates.class));
}
/**
 * Decodes a variable-length integer from the buffer that {@code ptr} points
 * at and advances the pointer's offset past the encoded bytes.
 *
 * <p>NOTE(review): the pointer's length is passed through unchanged while the
 * offset moves forward; confirm that callers expect this.
 *
 * @param ptr a pointer to a byte array buffer; repositioned on return
 * @return the decoded vint value as a long
 */
public static long vlongFromBytes(ImmutableBytesWritable ptr) {
  final byte[] bytes = ptr.get();
  final int start = ptr.getOffset();
  final byte head = bytes[start];
  final int encodedLen = WritableUtils.decodeVIntSize(head);

  // Single-byte encoding: the first byte is the value itself.
  if (encodedLen == 1) {
    ptr.set(bytes, start + 1, ptr.getLength());
    return head;
  }

  // Multi-byte encoding: fold the trailing bytes big-endian into a long.
  final int end = start + encodedLen;
  long magnitude = 0;
  for (int pos = start + 1; pos < end; pos++) {
    magnitude = (magnitude << 8) | (bytes[pos] & 0xFF);
  }

  ptr.set(bytes, end, ptr.getLength());
  if (WritableUtils.isNegativeVInt(head)) {
    return ~magnitude;
  }
  return magnitude;
}
/**
 * Advances to the next key/value pair and exposes it through the supplied
 * buffers.
 *
 * <p>The pair is staged in {@code dataOutputBuffer} as: int key length, key
 * bytes, int value length, value bytes. {@code key} is then reset to cover
 * the first length prefix plus the key bytes, and {@code value} to cover the
 * second length prefix plus the value bytes.
 *
 * @param key   buffer repositioned over the staged key region
 * @param value buffer repositioned over the staged value region
 * @return {@code true} if a pair was produced, {@code false} when the
 *         underlying iterator returned {@code null}
 * @throws IOException if staging the record fails
 */
public boolean next(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
  final MemoryBlockIndex current = keyValueIterator.next();
  if (current == null) {
    return false;
  }

  final int slot = current.getIndex();
  final MemoryBlock block = current.getMemoryBlock();
  final int start = block.offsets[slot];
  final int keyLen = block.keyLenArray[slot];
  final int valLen = block.valueLenArray[slot];

  // Stage "len | bytes" for the key, then for the value.
  dataOutputBuffer.reset();
  dataOutputBuffer.writeInt(keyLen);
  dataOutputBuffer.write(kvbuffer, start, keyLen);
  dataOutputBuffer.writeInt(valLen);
  dataOutputBuffer.write(kvbuffer, start + keyLen, valLen);

  final int keySpan = keyLen + WritableUtils.INT_LENGTH_BYTES;
  final int valueSpan = valLen + WritableUtils.INT_LENGTH_BYTES;
  key.reset(dataOutputBuffer.getData(), 0, keySpan);
  value.reset(dataOutputBuffer.getData(), keySpan, valueSpan);
  return true;
}
/**
 * Writes a single key/value record.
 *
 * <p>The bytes between each buffer's current position and its length are
 * treated as the payload. Output order is: vint key length, vint value
 * length, key bytes, value bytes. After writing, the running byte total and
 * record count are bumped.
 *
 * @param key   buffer whose remaining bytes form the key
 * @param value buffer whose remaining bytes form the value
 * @throws IOException if either remaining length is negative or a write fails
 */
public void append(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
  final int keyOffset = key.getPosition();
  final int keyLength = key.getLength() - keyOffset;
  if (keyLength < 0) {
    throw new IOException("Negative key-length not allowed: " + keyLength +
                          " for " + key);
  }

  final int valueOffset = value.getPosition();
  final int valueLength = value.getLength() - valueOffset;
  if (valueLength < 0) {
    throw new IOException("Negative value-length not allowed: " +
                          valueLength + " for " + value);
  }

  // Lengths go out ahead of the payloads.
  WritableUtils.writeVInt(out, keyLength);
  WritableUtils.writeVInt(out, valueLength);
  out.write(key.getData(), keyOffset, keyLength);
  out.write(value.getData(), valueOffset, valueLength);

  // Payload bytes plus the size of both vint length prefixes.
  final int written = keyLength + valueLength
      + WritableUtils.getVIntSize(keyLength)
      + WritableUtils.getVIntSize(valueLength);
  decompressedBytesWritten += written;
  ++numRecordsWritten;
}
/**
 * Restores this object's state from the given input.
 *
 * <p>Read order: the empty column family name (compressed byte array), the
 * tracked-column bit set (a vint word count followed by that many vlongs),
 * the ordinal of the qualifier encoding scheme, and finally a vint count
 * followed by that many compressed column family names.
 *
 * @param input the source to read from
 * @throws IOException if any underlying read fails
 */
@Override
public void readFields(DataInput input) throws IOException {
  this.emptyCFName = WritableUtils.readCompressedByteArray(input);

  // Bit set: word count, then the words themselves.
  final long[] words = new long[WritableUtils.readVInt(input)];
  for (int w = 0; w < words.length; w++) {
    words[w] = WritableUtils.readVLong(input);
  }
  this.trackedColumns = BitSet.valueOf(words);

  // The encoding scheme travels as its enum ordinal.
  final int schemeOrdinal = WritableUtils.readVInt(input);
  this.encodingScheme = QualifierEncodingScheme.values()[schemeOrdinal];

  final int familyCount = WritableUtils.readVInt(input);
  this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  for (int left = familyCount; left > 0; left--) {
    this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
  }
}
/**
 * {@inheritDoc}
 *
 * <p>The stream holds a vint split count, then one class name per split, then
 * each split's own serialized fields. The {@code splits} array is reused when
 * it already has the right length.
 *
 * @throws IOException If the child InputSplit cannot be read, typically
 *                     for failing access checks.
 */
@SuppressWarnings("unchecked") // Generic array assignment
public void readFields(DataInput in) throws IOException {
  final int card = WritableUtils.readVInt(in);
  final boolean reusable = splits != null && splits.length == card;
  if (!reusable) {
    splits = new InputSplit[card];
  }
  final Class<? extends InputSplit>[] splitTypes = new Class[card];
  try {
    // First pass: resolve every split class by name.
    for (int t = 0; t < card; ++t) {
      final String typeName = Text.readString(in);
      splitTypes[t] = Class.forName(typeName).asSubclass(InputSplit.class);
    }
    // Second pass: instantiate each split and let it read its own fields.
    for (int s = 0; s < card; ++s) {
      splits[s] = ReflectionUtils.newInstance(splitTypes[s], null);
      splits[s].readFields(in);
    }
  } catch (ClassNotFoundException e) {
    throw new IOException("Failed split init", e);
  }
}
/**
 * Serializes this report to the given output.
 *
 * <p>The fixed part is: task id, progress, state, start time, finish time,
 * the run-on-GPU flag, diagnostics, counters and the current status. What
 * follows depends on that status: for {@code RUNNING}, a vint count and each
 * running attempt id; for {@code COMPLETE}, the successful attempt id; for
 * anything else, nothing.
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
public void write(DataOutput out) throws IOException {
  taskid.write(out);
  out.writeFloat(progress);
  Text.writeString(out, state);
  out.writeLong(startTime);
  out.writeLong(finishTime);
  out.writeBoolean(runOnGPU);
  WritableUtils.writeStringArray(out, diagnostics);
  counters.write(out);
  WritableUtils.writeEnum(out, currentStatus);

  if (currentStatus == TIPStatus.RUNNING) {
    WritableUtils.writeVInt(out, runningAttempts.size());
    final TaskAttemptID[] attempts =
        runningAttempts.toArray(new TaskAttemptID[0]);
    for (TaskAttemptID attempt : attempts) {
      attempt.write(out);
    }
  } else if (currentStatus == TIPStatus.COMPLETE) {
    successfulAttempt.write(out);
  }
}
/**
 * Replaces this instance's content with what is read from the input.
 *
 * <p>The stream carries a vint byte count followed by that many bytes. The
 * backing buffer is replaced only when it is too small for the new content.
 *
 * @param in the source to read from
 * @throws IOException if any underlying read fails
 */
@Override
public void readFields(DataInput in)
    throws IOException {
  // Content length, as a variable-length int.
  final int byteCount = WritableUtils.readVInt(in);

  // Grow only when the current buffer cannot hold the incoming bytes.
  if (_text.capacity() < byteCount) {
    _text = ByteBuffer.allocate(byteCount);
  }

  // Fill the backing array directly rather than going through put().
  final byte[] backing = _text.array();
  in.readFully(backing, 0, byteCount);

  // The buffer's cursor was bypassed above, so frame the content by hand.
  _text.position(0);
  _text.limit(byteCount);

  // Drop the map so it is not left describing the previous content.
  _map = null;
}
/**
 * Restores this object's state from the given input.
 *
 * <p>The first vint packs several things together to save space: bit 0 is
 * the nullable flag, bit 1 marks that a scale follows, bit 2 marks that a
 * max length follows, and the remaining high bits are the type ordinal. A
 * byte size is read only for fixed-width types that report no byte size of
 * their own. The column modifier's system value always comes last.
 *
 * @param input the source to read from
 * @throws IOException if any underlying read fails
 */
@Override
public void readFields(DataInput input) throws IOException {
  final int packed = WritableUtils.readVInt(input);
  final boolean hasScale = (packed & 0x02) != 0;
  final boolean hasMaxLength = (packed & 0x04) != 0;

  isNullable = (packed & 0x01) != 0;
  if (hasScale) {
    scale = WritableUtils.readVInt(input);
  }
  if (hasMaxLength) {
    maxLength = WritableUtils.readVInt(input);
  }

  type = PDataType.values()[packed >>> 3];
  final boolean sizeOnWire = type.isFixedWidth() && type.getByteSize() == null;
  if (sizeOnWire) {
    byteSize = WritableUtils.readVInt(input);
  }

  final int modifierValue = WritableUtils.readVInt(input);
  columnModifier = ColumnModifier.fromSystemValue(modifierValue);
}
/**
 * Serializes this event to the given output.
 *
 * <p>Written in order: task id, id within the job (vint), the is-map flag,
 * status (enum), task tracker HTTP string, task run time (vint) and event id
 * (vint).
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
public void write(DataOutput out) throws IOException {
taskId.write(out);
WritableUtils.writeVInt(out, idWithinJob);
out.writeBoolean(isMap);
WritableUtils.writeEnum(out, status);
WritableUtils.writeString(out, taskTrackerHttp);
WritableUtils.writeVInt(out, taskRunTime);
WritableUtils.writeVInt(out, eventId);
}
/**
 * Serializes this key to the given output.
 *
 * <p>Written in order: key id (vint), expiry date (vlong), then the key
 * material as a vint length followed by the raw bytes. An absent key is
 * encoded as a length of {@code -1} with no bytes after it.
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput out) throws IOException {
  WritableUtils.writeVInt(out, keyId);
  WritableUtils.writeVLong(out, expiryDate);

  if (keyBytes != null) {
    WritableUtils.writeVInt(out, keyBytes.length);
    out.write(keyBytes);
    return;
  }
  // No key material: -1 stands in for the length.
  WritableUtils.writeVInt(out, -1);
}
/**
 * Serializes this segment to the given output: the segment header id, the
 * body length, then the body bytes taken from {@code recordLenBytes}.
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
public void write(DataOutput out) throws IOException {
// Write the SEGMENT_HEADER_ID to distinguish this from a LobRecord.
WritableUtils.writeVLong(out, SEGMENT_HEADER_ID);
// The length of the main body of the segment is the length of the
// data byte array.
int segmentBytesLen = recordLenBytes.getLength();
WritableUtils.writeVLong(out, segmentBytesLen);
// Write the body of the segment. Only the first segmentBytesLen bytes of
// the backing array are emitted.
out.write(recordLenBytes.getBytes(), 0, segmentBytesLen);
}
/**
 * Serializes the resource usage figures as five consecutive vlongs, in the
 * order: cumulative CPU, cumulative GPU, virtual memory, physical memory,
 * heap.
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput out) throws IOException {
//TODO Write resources version no too
WritableUtils.writeVLong(out, cumulativeCpuUsage); // long #1
WritableUtils.writeVLong(out, cumulativeGpuUsage); // long #2
WritableUtils.writeVLong(out, virtualMemoryUsage); // long #3
WritableUtils.writeVLong(out, physicalMemoryUsage); // long #4
WritableUtils.writeVLong(out, heapUsage); // long #5
}
/**
 * Serializes this value as three ints ({@code uvs}, {@code visits},
 * {@code bounceNumber}) followed by the {@code kpi} enum.
 *
 * @param dataOutput the sink to serialize into
 * @throws IOException if any underlying write fails
 */
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.uvs);
dataOutput.writeInt(this.visits);
dataOutput.writeInt(this.bounceNumber);
WritableUtils.writeEnum(dataOutput, this.kpi);
}
/**
 * Restores the {@code datum} array from the given input.
 *
 * <p>The stream holds a vint element count followed by that many serialized
 * {@code LogFieldWritable} values. {@code datum} is replaced with a new array
 * of that size before any element is read.
 *
 * @param in the source to read from
 * @throws IOException if any underlying read fails
 */
public void readFields(DataInput in) throws IOException {
  final int count = WritableUtils.readVInt(in);
  datum = new LogFieldWritable[count];

  int slot = 0;
  while (slot < count) {
    // Populate the element fully before storing it in the array.
    final LogFieldWritable field = new LogFieldWritable();
    field.readFields(in);
    datum[slot] = field;
    slot++;
  }
}
/**
 * Replaces the current contents with the entries read from the input.
 *
 * <p>Existing entries are cleared first. The stream holds a vint entry count
 * followed, per entry, by a key string, a value string and a compressed
 * string array of sources. The sources are recorded only when that array is
 * not {@code null}.
 *
 * @param in the source to read from
 * @throws IOException if any underlying read fails
 */
@Override
public void readFields(DataInput in) throws IOException {
  clear();
  final int entryCount = WritableUtils.readVInt(in);
  for (int remaining = entryCount; remaining > 0; remaining--) {
    final String name = org.apache.hadoop.io.Text.readString(in);
    final String setting = org.apache.hadoop.io.Text.readString(in);
    set(name, setting);

    final String[] origins = WritableUtils.readCompressedStringArray(in);
    if (origins == null) {
      continue;
    }
    putIntoUpdatingResource(name, origins);
  }
}
/**
 * Reads a list of splits from the given input and appends them to
 * {@code splits}.
 *
 * <p>The stream holds an int count followed, per split, by a class name and
 * that split's own serialized fields. Each split is instantiated from the
 * value that {@code AEG_SPLIT_LOADING_CACHE} returns for its class name.
 *
 * <p>NOTE(review): {@code splits} is not cleared here, so calling this on an
 * instance that already holds splits looks like it would accumulate them;
 * confirm whether that is intended.
 *
 * @param in the source to read from
 * @throws IOException if reading fails, or wrapping any other failure that
 *                     the propagate call below does not rethrow as-is
 */
@Override
public void readFields(DataInput in) throws IOException {
try {
int cnt = in.readInt();
for (int i = 0; i < cnt; i++) {
String className = WritableUtils.readString(in);
AegSplit split = AEG_SPLIT_LOADING_CACHE.get(className).newInstance();
split.readFields(in);
splits.add(split);
}
} catch (Throwable t) {
// Rethrow IOExceptions (and whatever else propagateIfPossible lets through,
// presumably unchecked exceptions and errors; verify against the Guava
// version in use) unchanged; everything else is wrapped below.
Throwables.propagateIfPossible(t, IOException.class);
throw new IOException("Unexpected exception", t);
}
}
/**
 * Serializes this event to the given output.
 *
 * <p>Written in order: task id, id within the job (vint), the is-map flag,
 * status (enum), task tracker HTTP string, task run time (vint) and event id
 * (vint).
 *
 * @param out the sink to serialize into
 * @throws IOException if any underlying write fails
 */
public void write(DataOutput out) throws IOException {
taskId.write(out);
WritableUtils.writeVInt(out, idWithinJob);
out.writeBoolean(isMap);
WritableUtils.writeEnum(out, status);
WritableUtils.writeString(out, taskTrackerHttp);
WritableUtils.writeVInt(out, taskRunTime);
WritableUtils.writeVInt(out, eventId);
}
/**
 * Reads a compressed entry and writes its uncompressed bytes into an array.
 *
 * <p>A leading status byte selects one of two paths. If it equals
 * {@code Dictionary.NOT_IN_DICTIONARY}, a vint length and that many raw bytes
 * follow; they are copied into {@code to} and registered with the dictionary.
 * Otherwise the status byte and the next byte together form a dictionary
 * index, and the entry stored under that index is copied into {@code to}.
 *
 * @param to the array to write into
 * @param offset array offset to start writing to
 * @param in the DataInput to read from
 * @param dict the dictionary to use for compression
 *
 * @return the length of the uncompressed data
 * @throws IOException if reading fails, the dictionary lookup throws, or the
 *                     dictionary has no entry for the decoded index
 */
@Deprecated
static int uncompressIntoArray(byte[] to, int offset, DataInput in,
    Dictionary dict) throws IOException {
  final byte status = in.readByte();

  if (status != Dictionary.NOT_IN_DICTIONARY) {
    // The status byte doubles as the higher order byte of the dictionary
    // index; the lower order byte follows on the stream.
    final short dictIdx = toShort(status, in.readByte());
    byte[] entry;
    try {
      entry = dict.getEntry(dictIdx);
    } catch (Exception ex) {
      throw new IOException("Unable to uncompress the log entry", ex);
    }
    if (entry == null) {
      throw new IOException("Missing dictionary entry for index "
          + dictIdx);
    }
    // Copy the dictionary's bytes out as the uncompressed value.
    Bytes.putBytes(to, offset, entry, 0, entry.length);
    return entry.length;
  }

  // Not in the dictionary yet: the raw bytes are inline, and once read they
  // are added to the dictionary.
  final int length = WritableUtils.readVInt(in);
  in.readFully(to, offset, length);
  dict.addEntry(to, offset, length);
  return length;
}
/**
 * Gets the split meta info for the task with a specific index.
 *
 * <p>Only one {@code SplitMetaInfo} instance is allocated: it is re-read
 * {@code index + 1} times from the stream so that it ends up holding the
 * record at position {@code index}. This avoids creating meta objects for
 * the tasks below that index. The stream is closed before returning, whether
 * or not reading succeeded.
 *
 * @param conf job configuration.
 * @param fs FileSystem.
 * @param index the index of the task.
 * @return split meta info object of the task.
 * @throws IOException if the stream cannot be read, or if {@code index} is
 *                     not below the number of splits recorded in the stream
 */
public static TaskSplitMetaInfo getSplitMetaInfo(Configuration conf,
    FileSystem fs, int index) throws IOException {
  FSDataInputStream in = null;
  try {
    in = getFSDataIS(conf, fs);
    final String jobSplitFile = MRJobConfig.JOB_SPLIT;
    final String basePath =
        conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR, ".");

    final int numSplits = WritableUtils.readVInt(in); // TODO: check for insane values
    if (index >= numSplits) {
      throw new IOException("Index is larger than the number of splits");
    }

    // Overwrite the same object until it holds the record at 'index'.
    final JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    for (int consumed = 0; consumed <= index; consumed++) {
      splitMetaInfo.readFields(in);
    }

    final String splitLocation =
        new Path(basePath, jobSplitFile).toUri().toString();
    final JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        splitLocation, splitMetaInfo.getStartOffset());
    return new JobSplit.TaskSplitMetaInfo(splitIndex,
        splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
  } finally {
    if (in != null) {
      in.close();
    }
  }
}