The following examples show how to use the org.apache.hadoop.io.DataOutputBuffer API class, illustrating common usage patterns; you can also follow the links to view the full source code on GitHub.
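Before the collected snippets, here is a minimal round-trip sketch (not taken from any particular project; the class and variable names are illustrative) of the pattern most of the examples below rely on: serialize into a DataOutputBuffer, then wrap the backing array in a DataInputBuffer to read the same bytes back.
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

public class DataOutputBufferRoundTrip {
  public static void main(String[] args) throws Exception {
    DataOutputBuffer out = new DataOutputBuffer();
    new Text("hello").write(out);              // serialize a Writable into the buffer
    // getData() returns the whole backing array; only the first getLength() bytes are valid
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), 0, out.getLength());
    Text copy = new Text();
    copy.readFields(in);                       // deserialize from the same bytes
    System.out.println(copy);                  // prints "hello"
    out.reset();                               // reuse the buffer without reallocating
  }
}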
public KeyValueWriter(Configuration conf, OutputStream output,
Class<K> kyClass, Class<V> valClass
) throws IOException {
keyClass = kyClass;
valueClass = valClass;
dataBuffer = new DataOutputBuffer();
SerializationFactory serializationFactory
= new SerializationFactory(conf);
keySerializer
= (Serializer<K>)serializationFactory.getSerializer(keyClass);
keySerializer.open(dataBuffer);
valueSerializer
= (Serializer<V>)serializationFactory.getSerializer(valueClass);
valueSerializer.open(dataBuffer);
outputStream = new DataOutputStream(output);
}
private <E> E makeCopyForPassByValue(Serialization<E> serialization,
E obj) throws IOException {
Serializer<E> ser =
serialization.getSerializer(GenericsUtil.getClass(obj));
Deserializer<E> deser =
serialization.getDeserializer(GenericsUtil.getClass(obj));
DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
dof.reset();
ser.open(dof);
ser.serialize(obj);
ser.close();
obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
getChainJobConf());
ByteArrayInputStream bais =
new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
deser.open(bais);
deser.deserialize(obj);
deser.close();
return obj;
}
@Test
public void testTermQuery() throws IOException {
TermQuery query = new TermQuery(new Term("field", "value"));
QueryWritable queryWritable = new QueryWritable();
queryWritable.setQuery(query);
DataOutputBuffer out = new DataOutputBuffer();
queryWritable.write(out);
byte[] data = out.getData();
int length = out.getLength();
DataInputBuffer in = new DataInputBuffer();
in.reset(data, length);
QueryWritable newQueryWritable = new QueryWritable();
newQueryWritable.readFields(in);
Query termQuery = newQueryWritable.getQuery();
assertEquals(query, termQuery);
}
@Test(timeout=10000)
public void testLocalizerStatusSerDe() throws Exception {
LocalizerStatus rsrcS = createLocalizerStatus();
assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
LocalizerStatusPBImpl rsrcPb = (LocalizerStatusPBImpl) rsrcS;
DataOutputBuffer out = new DataOutputBuffer();
rsrcPb.getProto().writeDelimitedTo(out);
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), 0, out.getLength());
LocalizerStatusProto rsrcPbD =
LocalizerStatusProto.parseDelimitedFrom(in);
assertNotNull(rsrcPbD);
LocalizerStatus rsrcD =
new LocalizerStatusPBImpl(rsrcPbD);
assertEquals(rsrcS, rsrcD);
assertEquals("localizer0", rsrcS.getLocalizerId());
assertEquals("localizer0", rsrcD.getLocalizerId());
assertEquals(createLocalResourceStatus(), rsrcS.getResourceStatus(0));
assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
}
@Before
public void setUp() throws IOException {
// Generate data
final int seed = new Random().nextInt();
final DataOutputBuffer dataBuf = new DataOutputBuffer();
final RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for(int i = 0; i < count; ++i) {
generator.next();
final RandomDatum key = generator.getKey();
final RandomDatum value = generator.getValue();
key.write(dataBuf);
value.write(dataBuf);
}
LOG.info("Generated " + count + " records");
data = dataBuf.getData();
dataLen = dataBuf.getLength();
}
public synchronized boolean next(Text key, Text value) throws IOException {
numNext++;
if (pos_ >= end_) {
return false;
}
DataOutputBuffer buf = new DataOutputBuffer();
if (!readUntilMatchBegin()) {
return false;
}
if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
return false;
}
// There is only one elem..key/value splitting is not done here.
byte[] record = new byte[buf.getLength()];
System.arraycopy(buf.getData(), 0, record, 0, record.length);
numRecStats(record, 0, record.length);
key.set(record);
value.set("");
return true;
}
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
JournalInfo journalInfo) // active name-node
throws IOException {
super();
this.bnRegistration = bnReg;
this.journalInfo = journalInfo;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
try {
this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(),
bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy();
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
}
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
private <K> K serDeser(K conf) throws Exception {
SerializationFactory factory = new SerializationFactory(CONF);
Serializer<K> serializer =
factory.getSerializer(GenericsUtil.getClass(conf));
Deserializer<K> deserializer =
factory.getDeserializer(GenericsUtil.getClass(conf));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(conf);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
@Test
public void testReadWriteReplicaState() {
try {
DataOutputBuffer out = new DataOutputBuffer();
DataInputBuffer in = new DataInputBuffer();
for (HdfsServerConstants.ReplicaState repState : HdfsServerConstants.ReplicaState
.values()) {
repState.write(out);
in.reset(out.getData(), out.getLength());
HdfsServerConstants.ReplicaState result = HdfsServerConstants.ReplicaState
.read(in);
assertTrue("testReadWrite error !!!", repState == result);
out.reset();
in.reset();
}
} catch (Exception ex) {
fail("testReadWrite ex error ReplicaState");
}
}
public synchronized boolean next(WritableComparable key, Writable value) throws IOException {
numNext++;
if (pos_ >= end_) {
return false;
}
DataOutputBuffer buf = new DataOutputBuffer();
if (!readUntilMatchBegin()) {
return false;
}
if (!readUntilMatchEnd(buf)) {
return false;
}
// There is only one elem..key/value splitting is not done here.
byte[] record = new byte[buf.getLength()];
System.arraycopy(buf.getData(), 0, record, 0, record.length);
numRecStats(record, 0, record.length);
((Text) key).set(record);
((Text) value).set("");
return true;
}
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
@Deprecated
synchronized int next(DataOutputBuffer buffer) throws IOException {
// Unsupported for block-compressed sequence files
if (blockCompressed) {
throw new IOException("Unsupported call for block-compressed" +
" SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
}
try {
int length = readRecordLength();
if (length == -1) {
return -1;
}
int keyLength = in.readInt();
buffer.write(in, length);
return keyLength;
} catch (ChecksumException e) { // checksum failure
handleChecksumException(e);
return next(buffer);
}
}
public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token " + id + " with " + token);
credentials.addToken(id, token);
}
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
if (LOG.isDebugEnabled()) {
LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
}
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
}
@Override
protected OutputStream getOutputStream(int bufferSize, byte[] key, byte[] iv)
throws IOException {
DataOutputBuffer out = new DataOutputBuffer() {
@Override
public void flush() throws IOException {
buf = getData();
bufLen = getLength();
}
@Override
public void close() throws IOException {
buf = getData();
bufLen = getLength();
}
};
return new CryptoOutputStream(new FakeOutputStream(out),
codec, bufferSize, key, iv);
}
private static byte[] toByteArray(Writable... writables) {
final DataOutputBuffer out = new DataOutputBuffer();
try {
for(Writable w : writables) {
w.write(out);
}
out.close();
} catch (IOException e) {
throw new RuntimeException("Fail to convert writables to a byte array",e);
}
byte[] bytes = out.getData();
if (bytes.length == out.getLength()) {
return bytes;
}
byte[] result = new byte[out.getLength()];
System.arraycopy(bytes, 0, result, 0, out.getLength());
return result;
}
public void testIFileStream() throws Exception {
final int DLEN = 100;
DataOutputBuffer dob = new DataOutputBuffer(DLEN + 4);
IFileOutputStream ifos = new IFileOutputStream(dob);
for (int i = 0; i < DLEN; ++i) {
ifos.write(i);
}
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
for (int i = 0; i < DLEN; ++i) {
assertEquals(i, ifis.read());
}
ifis.close();
}
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for(Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token "+id+" with "+token);
credentials.addToken(id, token);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testWritableComparatorJavaSerialization() throws Exception {
Serialization ser = new JavaSerialization();
Serializer<TestWC> serializer = ser.getSerializer(TestWC.class);
DataOutputBuffer dob = new DataOutputBuffer();
serializer.open(dob);
TestWC orig = new TestWC(0);
serializer.serialize(orig);
serializer.close();
Deserializer<TestWC> deserializer = ser.getDeserializer(TestWC.class);
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), 0, dob.getLength());
deserializer.open(dib);
TestWC deser = deserializer.deserialize(null);
deserializer.close();
assertEquals(orig, deser);
}
/**
* A utility that tests serialization/deserialization.
* @param conf configuration to use, "io.serializations" is read to
* determine the serialization
* @param <K> the class of the item
* @param before item to (de)serialize
* @return deserialized item
*/
public static <K> K testSerialization(Configuration conf, K before)
throws Exception {
SerializationFactory factory = new SerializationFactory(conf);
Serializer<K> serializer
= factory.getSerializer(GenericsUtil.getClass(before));
Deserializer<K> deserializer
= factory.getDeserializer(GenericsUtil.getClass(before));
DataOutputBuffer out = new DataOutputBuffer();
serializer.open(out);
serializer.serialize(before);
serializer.close();
DataInputBuffer in = new DataInputBuffer();
in.reset(out.getData(), out.getLength());
deserializer.open(in);
K after = deserializer.deserialize(null);
deserializer.close();
return after;
}
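As a hedged usage sketch for the utility above (not part of the original example): with a default Configuration, Hadoop's io.serializations setting includes WritableSerialization, so a Writable such as Text round-trips directly.
Configuration conf = new Configuration();        // default io.serializations includes WritableSerialization
Text before = new Text("example");
Text after = testSerialization(conf, before);    // serialize, then deserialize into a new instance
assertEquals(before, after);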
/** Read a compressed buffer */
private synchronized void readBuffer(DataInputBuffer buffer,
CompressionInputStream filter) throws IOException {
// Read data into a temporary buffer
DataOutputBuffer dataBuffer = new DataOutputBuffer();
try {
int dataBufferLength = WritableUtils.readVInt(in);
dataBuffer.write(in, dataBufferLength);
// Set up 'buffer' connected to the input-stream
buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
} finally {
dataBuffer.close();
}
// Reset the codec
filter.resetState();
}
@Override
public void handle(Channel channel, Token<DelegationTokenIdentifier> token,
String serviceUrl) throws IOException {
Assert.assertEquals(testToken, token);
Credentials creds = new Credentials();
creds.addToken(new Text(serviceUrl), token);
DataOutputBuffer out = new DataOutputBuffer();
creds.write(out);
int fileLength = out.getData().length;
ChannelBuffer cbuffer = ChannelBuffers.buffer(fileLength);
cbuffer.writeBytes(out.getData());
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(fileLength));
response.setContent(cbuffer);
channel.write(response).addListener(ChannelFutureListener.CLOSE);
}
public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
DataOutputBuffer buf = new DataOutputBuffer();
FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf);
for (long txid = startTxn; txid < startTxn + numTxns; txid++) {
FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
op.setTransactionId(txid);
writer.writeOp(op);
}
return Arrays.copyOf(buf.getData(), buf.getLength());
}
/**
* Read an IndexSegment from an existing file.
*/
public IndexSegment(IndexTableEntry tableEntry, DataInput in)
throws IOException {
this.recordLenBytes = new BytesWritable();
this.outputBuffer = new DataOutputBuffer(10);
this.tableEntry = tableEntry;
readFields(in);
}
public static void copy(DataInputBuffer src, DataOutputBuffer dst) throws IOException {
byte[] b1 = src.getData();
int s1 = src.getPosition();
int l1 = src.getLength();
dst.reset();
dst.write(b1, s1, l1 - s1);
}
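A hypothetical call to the copy() helper above (assuming it is reachable as a static method; the values are illustrative): only the bytes that have not yet been read from the source buffer are transferred.
DataOutputBuffer original = new DataOutputBuffer();
original.writeInt(42);
original.writeUTF("payload");
DataInputBuffer src = new DataInputBuffer();
src.reset(original.getData(), original.getLength());
src.readInt();                                   // consume the int; position now points at "payload"
DataOutputBuffer dst = new DataOutputBuffer();
copy(src, dst);                                  // dst now holds only the unread "payload" bytes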
static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
final Random r = new Random();
final long s = r.nextLong();
r.setSeed(s);
LOG.info("spec: " + s);
final DataInputBuffer in = new DataInputBuffer();
final DataOutputBuffer out = new DataOutputBuffer();
a.setType(GridmixKey.REDUCE_SPEC);
b.setType(GridmixKey.REDUCE_SPEC);
for (int i = 0; i < 100; ++i) {
final int in_rec = r.nextInt(Integer.MAX_VALUE);
a.setReduceInputRecords(in_rec);
final int out_rec = r.nextInt(Integer.MAX_VALUE);
a.setReduceOutputRecords(out_rec);
final int out_bytes = r.nextInt(Integer.MAX_VALUE);
a.setReduceOutputBytes(out_bytes);
final int min = WritableUtils.getVIntSize(in_rec)
+ WritableUtils.getVIntSize(out_rec)
+ WritableUtils.getVIntSize(out_bytes);
assertEquals(min + 2, a.fixedBytes()); // meta + vint min
final int size = r.nextInt(1024) + a.fixedBytes() + 1;
setSerialize(a, r.nextLong(), size, out);
assertEquals(size, out.getLength());
assertTrue(a.equals(a));
assertEquals(0, a.compareTo(a));
in.reset(out.getData(), 0, out.getLength());
b.readFields(in);
assertEquals(size, b.getSize());
assertEquals(in_rec, b.getReduceInputRecords());
assertEquals(out_rec, b.getReduceOutputRecords());
assertEquals(out_bytes, b.getReduceOutputBytes());
assertTrue(a.equals(b));
assertEquals(0, a.compareTo(b));
assertEquals(a.hashCode(), b.hashCode());
}
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
if (logAggregationContext != null) {
builder.setLogAggregationContext((
(LogAggregationContextPBImpl)logAggregationContext).getProto());
}
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
try {
credentials.writeTokenStorageToStream(dob);
builder.setCredentials(ByteString.copyFrom(dob.getData()));
} catch (IOException e) {
// should not occur
LOG.error("Cannot serialize credentials", e);
}
}
builder.clearAcls();
if (appAcls != null) {
for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
.setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
.setAcl(acl.getValue())
.build();
builder.addAcls(p);
}
}
return builder.build();
}
private void writeHeader() throws IOException {
// Write out the header and version
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
// Write out the payload length
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
}
private void setupTokens(
ContainerLaunchContext container, ContainerId containerID)
throws IOException {
Map<String, String> environment = container.getEnvironment();
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
application.getWebProxyBase());
// Set AppSubmitTime and MaxAppAttempts to be consumable by the AM.
ApplicationId applicationId =
application.getAppAttemptId().getApplicationId();
environment.put(
ApplicationConstants.APP_SUBMIT_TIME_ENV,
String.valueOf(rmContext.getRMApps()
.get(applicationId)
.getSubmitTime()));
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
if (container.getTokens() != null) {
// TODO: Don't do this kind of checks everywhere.
dibb.reset(container.getTokens());
credentials.readTokenStorageStream(dibb);
}
// Add AMRMToken
Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken();
if (amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}