下面列出了 org.apache.hadoop.io.DataOutputBuffer#write() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Serializes the first key and every entry of {@code index} to {@code out}.
 *
 * <p>When {@code firstKey} is {@code null} a single zero is written with
 * {@code Utils.writeVInt} and nothing else. Otherwise each section is first
 * staged in an in-memory buffer so that its byte count can be written to
 * {@code out} (via {@code Utils.writeVInt}) immediately before its bytes.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} or to the staging buffer fails
 */
public void write(DataOutput out) throws IOException {
  // No first key recorded: emit a lone zero and stop.
  if (firstKey == null) {
    Utils.writeVInt(out, 0);
    return;
  }

  // Scratch space reused for every section that follows.
  final DataOutputBuffer scratch = new DataOutputBuffer();

  // First section: the key's size followed by the key's buffer.
  Utils.writeVInt(scratch, firstKey.size());
  scratch.write(firstKey.buffer());
  Utils.writeVInt(out, scratch.size());
  out.write(scratch.getData(), 0, scratch.getLength());

  // Remaining sections: one length-prefixed record per index entry.
  for (final TFileIndexEntry entry : index) {
    scratch.reset();
    entry.write(scratch);
    final int entryLength = scratch.getLength();
    Utils.writeVInt(out, entryLength);
    out.write(scratch.getData(), 0, entryLength);
  }
}
/**
 * Read a compressed buffer.
 *
 * <p>Reads a length (via {@code WritableUtils.readVInt}) from {@code in},
 * copies that many bytes from {@code in} into a temporary buffer, points
 * {@code buffer} at the copied bytes and finally resets {@code filter}.
 *
 * @param buffer input buffer that is reset onto the bytes just read
 * @param filter codec stream whose state is reset afterwards
 * @throws IOException if reading from {@code in} fails
 */
private synchronized void readBuffer(DataInputBuffer buffer,
    CompressionInputStream filter) throws IOException {
  // Temporary holding area for the bytes pulled from the stream.
  final DataOutputBuffer staging = new DataOutputBuffer();
  try {
    // The chunk is preceded by its byte count.
    final int chunkLength = WritableUtils.readVInt(in);
    staging.write(in, chunkLength);

    // Connect the caller's buffer to the staged bytes.
    final byte[] chunk = staging.getData();
    buffer.reset(chunk, 0, staging.getLength());
  } finally {
    staging.close();
  }

  // Reset the codec
  filter.resetState();
}
/**
 * Reads the next record from {@code in} and appends its bytes to
 * {@code buffer}.
 *
 * <p>The record length comes from {@code readRecordLength()}; the key length
 * is then read as an int from {@code in}, and {@code length} bytes are copied
 * from {@code in} into {@code buffer}. If a {@link ChecksumException} is
 * raised, it is passed to {@code handleChecksumException}; when that call
 * returns normally the read is attempted again.
 *
 * @param buffer destination that receives the record bytes
 * @return the key length read from the stream, or -1 when
 *         {@code readRecordLength()} returns -1 (presumably end of file)
 * @throws IOException if the file is block-compressed or reading fails
 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
 */
@Deprecated
synchronized int next(DataOutputBuffer buffer) throws IOException {
  // Unsupported for block-compressed sequence files
  if (blockCompressed) {
    // Name the same replacement the @deprecated tag points to.
    throw new IOException("Unsupported call for block-compressed" +
        " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
  }
  // Retry in a loop rather than by self-recursion so that a run of
  // checksum failures does not deepen the call stack.
  while (true) {
    try {
      int length = readRecordLength();
      if (length == -1) {
        return -1;
      }
      int keyLength = in.readInt();
      buffer.write(in, length);
      return keyLength;
    } catch (ChecksumException e) { // checksum failure
      handleChecksumException(e);
    }
  }
}
/**
 * Serializes the first key and every entry of {@code index} to {@code out}.
 *
 * <p>When {@code firstKey} is {@code null} a single zero is written with
 * {@code Utils.writeVInt} and nothing else. Otherwise each section is first
 * staged in an in-memory buffer so that its byte count can be written to
 * {@code out} (via {@code Utils.writeVInt}) immediately before its bytes.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} or to the staging buffer fails
 */
public void write(DataOutput out) throws IOException {
  // No first key recorded: emit a lone zero and stop.
  if (firstKey == null) {
    Utils.writeVInt(out, 0);
    return;
  }

  // Scratch space reused for every section that follows.
  final DataOutputBuffer scratch = new DataOutputBuffer();

  // First section: the key's size followed by the key's buffer.
  Utils.writeVInt(scratch, firstKey.size());
  scratch.write(firstKey.buffer());
  Utils.writeVInt(out, scratch.size());
  out.write(scratch.getData(), 0, scratch.getLength());

  // Remaining sections: one length-prefixed record per index entry.
  for (final TFileIndexEntry entry : index) {
    scratch.reset();
    entry.write(scratch);
    final int entryLength = scratch.getLength();
    Utils.writeVInt(out, entryLength);
    out.write(scratch.getData(), 0, entryLength);
  }
}
/**
 * Read a compressed buffer.
 *
 * <p>Reads a length (via {@code WritableUtils.readVInt}) from {@code in},
 * copies that many bytes from {@code in} into a temporary buffer, points
 * {@code buffer} at the copied bytes and finally resets {@code filter}.
 *
 * @param buffer input buffer that is reset onto the bytes just read
 * @param filter codec stream whose state is reset afterwards
 * @throws IOException if reading from {@code in} fails
 */
private synchronized void readBuffer(DataInputBuffer buffer,
    CompressionInputStream filter) throws IOException {
  // Temporary holding area for the bytes pulled from the stream.
  final DataOutputBuffer staging = new DataOutputBuffer();
  try {
    // The chunk is preceded by its byte count.
    final int chunkLength = WritableUtils.readVInt(in);
    staging.write(in, chunkLength);

    // Connect the caller's buffer to the staged bytes.
    final byte[] chunk = staging.getData();
    buffer.reset(chunk, 0, staging.getLength());
  } finally {
    staging.close();
  }

  // Reset the codec
  filter.resetState();
}
/**
 * Reads the next record from {@code in} and appends its bytes to
 * {@code buffer}.
 *
 * <p>The record length comes from {@code readRecordLength()}; the key length
 * is then read as an int from {@code in}, and {@code length} bytes are copied
 * from {@code in} into {@code buffer}. If a {@link ChecksumException} is
 * raised, it is passed to {@code handleChecksumException}; when that call
 * returns normally the read is attempted again.
 *
 * @param buffer destination that receives the record bytes
 * @return the key length read from the stream, or -1 when
 *         {@code readRecordLength()} returns -1 (presumably end of file)
 * @throws IOException if the file is block-compressed or reading fails
 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
 */
@Deprecated
synchronized int next(DataOutputBuffer buffer) throws IOException {
  // Unsupported for block-compressed sequence files
  if (blockCompressed) {
    // Name the same replacement the @deprecated tag points to.
    throw new IOException("Unsupported call for block-compressed" +
        " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
  }
  // Retry in a loop rather than by self-recursion so that a run of
  // checksum failures does not deepen the call stack.
  while (true) {
    try {
      int length = readRecordLength();
      if (length == -1) {
        return -1;
      }
      int keyLength = in.readInt();
      buffer.write(in, length);
      return keyLength;
    } catch (ChecksumException e) { // checksum failure
      handleChecksumException(e);
    }
  }
}
/**
 * Serializes the first key and every entry of {@code index} to {@code out}.
 *
 * <p>When {@code firstKey} is {@code null} a single zero is written with
 * {@code Utils.writeVInt} and nothing else. Otherwise each section is first
 * staged in an in-memory buffer so that its byte count can be written to
 * {@code out} (via {@code Utils.writeVInt}) immediately before its bytes.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} or to the staging buffer fails
 */
public void write(DataOutput out) throws IOException {
  // No first key recorded: emit a lone zero and stop.
  if (firstKey == null) {
    Utils.writeVInt(out, 0);
    return;
  }

  // Scratch space reused for every section that follows.
  final DataOutputBuffer scratch = new DataOutputBuffer();

  // First section: the key's size followed by the key's buffer.
  Utils.writeVInt(scratch, firstKey.size());
  scratch.write(firstKey.buffer());
  Utils.writeVInt(out, scratch.size());
  out.write(scratch.getData(), 0, scratch.getLength());

  // Remaining sections: one length-prefixed record per index entry.
  for (final TFileIndexEntry entry : index) {
    scratch.reset();
    entry.write(scratch);
    final int entryLength = scratch.getLength();
    Utils.writeVInt(out, entryLength);
    out.write(scratch.getData(), 0, entryLength);
  }
}
/**
 * Serializes the first key and every entry of {@code index} to {@code out}.
 *
 * <p>When {@code firstKey} is {@code null} a single zero is written with
 * {@code Utils.writeVInt} and nothing else. Otherwise each section is first
 * staged in an in-memory buffer so that its byte count can be written to
 * {@code out} (via {@code Utils.writeVInt}) immediately before its bytes.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} or to the staging buffer fails
 */
public void write(DataOutput out) throws IOException {
  // No first key recorded: emit a lone zero and stop.
  if (firstKey == null) {
    Utils.writeVInt(out, 0);
    return;
  }

  // Scratch space reused for every section that follows.
  final DataOutputBuffer scratch = new DataOutputBuffer();

  // First section: the key's size followed by the key's buffer.
  Utils.writeVInt(scratch, firstKey.size());
  scratch.write(firstKey.buffer());
  Utils.writeVInt(out, scratch.size());
  out.write(scratch.getData(), 0, scratch.getLength());

  // Remaining sections: one length-prefixed record per index entry.
  for (final TFileIndexEntry entry : index) {
    scratch.reset();
    entry.write(scratch);
    final int entryLength = scratch.getLength();
    Utils.writeVInt(out, entryLength);
    out.write(scratch.getData(), 0, entryLength);
  }
}
/**
 * Serializes a {@code TMessageSet} into a byte array.
 *
 * <p>Fields are written in this order: app (via {@code writeUTF}), number of
 * messages (int), compression (byte), CRC (long), length of the message
 * bytes (int), then the message bytes themselves.
 *
 * @param payload the message set to serialize
 * @return a new array holding exactly the bytes written, with no trailing
 *         padding from the internal buffer
 * @throws RuntimeException wrapping any exception raised while serializing
 */
@Override
public byte[] serialize(TMessageSet payload) {
  DataOutputBuffer outBuffer = new DataOutputBuffer();
  try {
    outBuffer.writeUTF(payload.getApp());
    outBuffer.writeInt(payload.getNumMessages());
    outBuffer.writeByte(payload.getCompression());
    outBuffer.writeLong(payload.getCrc());
    outBuffer.writeInt(payload.getMessages().length);
    outBuffer.write(payload.getMessages());
    // getData() exposes the buffer's whole backing array, of which only the
    // first getLength() bytes are valid. ByteBuffer.wrap(...).array() would
    // hand back that same oversized array, so copy out just the valid prefix.
    return java.util.Arrays.copyOf(outBuffer.getData(), outBuffer.getLength());
  } catch (Exception e) {
    throw new RuntimeException("Failed to serialize TMessageSet: "+e.getMessage(), e);
  } finally {
    Closeables.closeQuietly(outBuffer);
  }
}
/**
 * Serializes the first key and every entry of {@code index} to {@code out}.
 *
 * <p>When {@code firstKey} is {@code null} a single zero is written with
 * {@code Utils.writeVInt} and nothing else. Otherwise each section is first
 * staged in an in-memory buffer so that its byte count can be written to
 * {@code out} (via {@code Utils.writeVInt}) immediately before its bytes.
 *
 * @param out destination of the serialized form
 * @throws IOException if writing to {@code out} or to the staging buffer fails
 */
public void write(DataOutput out) throws IOException {
  // No first key recorded: emit a lone zero and stop.
  if (firstKey == null) {
    Utils.writeVInt(out, 0);
    return;
  }

  // Scratch space reused for every section that follows.
  final DataOutputBuffer scratch = new DataOutputBuffer();

  // First section: the key's size followed by the key's buffer.
  Utils.writeVInt(scratch, firstKey.size());
  scratch.write(firstKey.buffer());
  Utils.writeVInt(out, scratch.size());
  out.write(scratch.getData(), 0, scratch.getLength());

  // Remaining sections: one length-prefixed record per index entry.
  for (final TFileIndexEntry entry : index) {
    scratch.reset();
    entry.write(scratch);
    final int entryLength = scratch.getLength();
    Utils.writeVInt(out, entryLength);
    out.write(scratch.getData(), 0, entryLength);
  }
}
/**
 * Reads bytes from {@code bin_} one at a time until the UTF-8 bytes of
 * {@code textPat} have been seen in sequence or the stream returns -1.
 *
 * <p>Each byte that breaks the current match, together with the partially
 * matched prefix before it, is appended to {@code outBufOrNull} when that
 * buffer is non-null, and {@code pos_} is advanced by the number of bytes
 * skipped. The stream is re-marked after every such byte, so on a match the
 * mark sits just before the matched pattern.
 *
 * <p>After the loop: if a match was found and {@code includePat} is false,
 * {@code bin_} is reset to that mark. Otherwise, when {@code outBufOrNull}
 * is non-null, the pattern bytes are appended to it and {@code pos_} is
 * advanced by the pattern length. Note that this second branch also runs
 * when the stream ended without a match.
 *
 * <p>NOTE: a byte that breaks a partial match is not re-tested as a possible
 * start of the pattern.
 *
 * @param textPat pattern to search for
 * @param includePat whether a found pattern stays consumed (true) or the
 *        stream is rewound to its start (false)
 * @param outBufOrNull receives the bytes read, or {@code null} to discard
 * @return true if the pattern was found, false if the stream ended first
 * @throws IOException if reading, marking or resetting {@code bin_} fails
 */
boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  final byte[] pattern = textPat.getBytes("UTF-8");
  final int patternLength = pattern.length;
  final int markLimit = 120000 * 10; // large read-ahead limit for mark()
  int matched = 0;
  boolean found = false;

  bin_.mark(markLimit);
  for (int next = bin_.read(); next != -1; next = bin_.read()) {
    final byte current = (byte) next; // eight-bit matching; OK with UTF-8
    if (current != pattern[matched]) {
      // Move the mark past this byte so a later reset() lands on the
      // start of whatever match follows.
      bin_.mark(markLimit);
      if (outBufOrNull != null) {
        outBufOrNull.write(pattern, 0, matched);
        outBufOrNull.write(current);
      }
      pos_ += matched + 1; // the abandoned prefix plus the current byte
      matched = 0;
      continue;
    }
    matched++;
    if (matched == patternLength) {
      found = true;
      break;
    }
  }

  if (found && !includePat) {
    bin_.reset();
  } else if (outBufOrNull != null) {
    outBufOrNull.write(pattern);
    pos_ += patternLength;
  }
  return found;
}
/**
 * Checks that several gzip members written back to back decompress to the
 * concatenation of their original payloads.
 *
 * <p>Between 3 and 6 random slices of a random byte array are each written
 * through their own {@link GZIPOutputStream} into one buffer, while the
 * uncompressed slices are collected in a second buffer. The compressed bytes
 * are then read through a {@code GzipCodec} input stream and the result is
 * compared with the collected slices. The random seed is logged.
 *
 * @param conf configuration used to instantiate the codec
 * @param decomClass decompressor class the codec is expected to create
 * @throws IOException if compression or decompression fails
 */
void GzipConcatTest(Configuration conf,
    Class<? extends Decompressor> decomClass) throws IOException {
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.info(decomClass + " seed: " + seed);

  final int CONCAT = r.nextInt(4) + 3;
  final int BUFLEN = 128 * 1024;
  DataOutputBuffer dflbuf = new DataOutputBuffer();
  DataOutputBuffer chkbuf = new DataOutputBuffer();
  byte[] b = new byte[BUFLEN];
  for (int i = 0; i < CONCAT; ++i) {
    // One gzip member per iteration, all appended to dflbuf.
    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
    r.nextBytes(b);
    int len = r.nextInt(BUFLEN);
    int off = r.nextInt(BUFLEN - len);
    chkbuf.write(b, off, len);
    gzout.write(b, off, len);
    gzout.close();
  }
  final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());

  CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
  Decompressor decom = codec.createDecompressor();
  assertNotNull(decom);
  assertEquals(decomClass, decom.getClass());
  DataInputBuffer gzbuf = new DataInputBuffer();
  gzbuf.reset(dflbuf.getData(), dflbuf.getLength());

  InputStream gzin = codec.createInputStream(gzbuf, decom);
  // Decompress into a separate buffer. gzbuf was reset onto the array
  // returned by dflbuf.getData(); writing the output back into dflbuf could
  // overwrite compressed bytes not yet consumed, if (as it appears) gzbuf
  // reads that array without copying it.
  DataOutputBuffer outbuf = new DataOutputBuffer();
  IOUtils.copyBytes(gzin, outbuf, 4096);
  final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
  assertArrayEquals(chk, dflchk);
}
/**
 * Read the next key/value pair in the file into <code>buffer</code>.
 *
 * <p>The length of the value may be computed by calling buffer.getLength()
 * before and after calls to this method. When a {@link ChecksumException}
 * is raised it is passed to {@code handleChecksumException}; if that call
 * returns normally the read is attempted again.
 *
 * @param buffer destination that receives the record bytes
 * @return the length of the key read, or -1 if at end of file
 * @throws IOException if reading fails
 */
synchronized int next(DataOutputBuffer buffer) throws IOException {
  for (;;) {
    try {
      final int recordLength = readRecordLength();
      if (recordLength == -1) {
        return -1;
      }
      final int keyLength = in.readInt();
      buffer.write(in, recordLength);
      return keyLength;
    } catch (ChecksumException badChecksum) { // checksum failure
      handleChecksumException(badChecksum);
      // Handler returned normally: go round again and retry the read.
    }
  }
}
/**
 * Reads bytes from {@code bin_} one at a time until the UTF-8 bytes of
 * {@code textPat} have been seen in sequence or the stream returns -1.
 *
 * <p>Each byte that breaks the current match, together with the partially
 * matched prefix before it, is appended to {@code outBufOrNull} when that
 * buffer is non-null, and {@code pos_} is advanced by the number of bytes
 * skipped. The stream is re-marked after every such byte, so on a match the
 * mark sits just before the matched pattern.
 *
 * <p>After the loop: if a match was found and {@code includePat} is false,
 * {@code bin_} is reset to that mark. Otherwise, when {@code outBufOrNull}
 * is non-null, the pattern bytes are appended to it and {@code pos_} is
 * advanced by the pattern length. Note that this second branch also runs
 * when the stream ended without a match.
 *
 * <p>NOTE: a byte that breaks a partial match is not re-tested as a possible
 * start of the pattern.
 *
 * @param textPat pattern to search for
 * @param includePat whether a found pattern stays consumed (true) or the
 *        stream is rewound to its start (false)
 * @param outBufOrNull receives the bytes read, or {@code null} to discard
 * @return true if the pattern was found, false if the stream ended first
 * @throws IOException if reading, marking or resetting {@code bin_} fails
 */
boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  final byte[] pattern = textPat.getBytes("UTF-8");
  final int patternLength = pattern.length;
  final int markLimit = 120000 * 10; // large read-ahead limit for mark()
  int matched = 0;
  boolean found = false;

  bin_.mark(markLimit);
  for (int next = bin_.read(); next != -1; next = bin_.read()) {
    final byte current = (byte) next; // eight-bit matching; OK with UTF-8
    if (current != pattern[matched]) {
      // Move the mark past this byte so a later reset() lands on the
      // start of whatever match follows.
      bin_.mark(markLimit);
      if (outBufOrNull != null) {
        outBufOrNull.write(pattern, 0, matched);
        outBufOrNull.write(current);
      }
      pos_ += matched + 1; // the abandoned prefix plus the current byte
      matched = 0;
      continue;
    }
    matched++;
    if (matched == patternLength) {
      found = true;
      break;
    }
  }

  if (found && !includePat) {
    bin_.reset();
  } else if (outBufOrNull != null) {
    outBufOrNull.write(pattern);
    pos_ += patternLength;
  }
  return found;
}
/**
 * Checks that several gzip members written back to back decompress to the
 * concatenation of their original payloads.
 *
 * <p>Between 3 and 6 random slices of a random byte array are each written
 * through their own {@link GZIPOutputStream} into one buffer, while the
 * uncompressed slices are collected in a second buffer. The compressed bytes
 * are then read through a {@code GzipCodec} input stream and the result is
 * compared with the collected slices. The random seed is logged.
 *
 * @param conf configuration used to instantiate the codec
 * @param decomClass decompressor class the codec is expected to create
 * @throws IOException if compression or decompression fails
 */
void GzipConcatTest(Configuration conf,
    Class<? extends Decompressor> decomClass) throws IOException {
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.info(decomClass + " seed: " + seed);

  final int CONCAT = r.nextInt(4) + 3;
  final int BUFLEN = 128 * 1024;
  DataOutputBuffer dflbuf = new DataOutputBuffer();
  DataOutputBuffer chkbuf = new DataOutputBuffer();
  byte[] b = new byte[BUFLEN];
  for (int i = 0; i < CONCAT; ++i) {
    // One gzip member per iteration, all appended to dflbuf.
    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
    r.nextBytes(b);
    int len = r.nextInt(BUFLEN);
    int off = r.nextInt(BUFLEN - len);
    chkbuf.write(b, off, len);
    gzout.write(b, off, len);
    gzout.close();
  }
  final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());

  CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
  Decompressor decom = codec.createDecompressor();
  assertNotNull(decom);
  assertEquals(decomClass, decom.getClass());
  DataInputBuffer gzbuf = new DataInputBuffer();
  gzbuf.reset(dflbuf.getData(), dflbuf.getLength());

  InputStream gzin = codec.createInputStream(gzbuf, decom);
  // Decompress into a separate buffer. gzbuf was reset onto the array
  // returned by dflbuf.getData(); writing the output back into dflbuf could
  // overwrite compressed bytes not yet consumed, if (as it appears) gzbuf
  // reads that array without copying it.
  DataOutputBuffer outbuf = new DataOutputBuffer();
  IOUtils.copyBytes(gzin, outbuf, 4096);
  final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
  assertArrayEquals(chk, dflchk);
}
/**
 * Reads bytes from {@code bin_} one at a time until the UTF-8 bytes of
 * {@code textPat} have been seen in sequence or the stream returns -1.
 *
 * <p>Each byte that breaks the current match, together with the partially
 * matched prefix before it, is appended to {@code outBufOrNull} when that
 * buffer is non-null. {@code pos_} is advanced by the number of bytes
 * skipped whether or not an output buffer was supplied. The stream is
 * re-marked after every such byte, so on a match the mark sits just before
 * the matched pattern.
 *
 * <p>After the loop: if a match was found and {@code includePat} is false,
 * {@code bin_} is reset to that mark. Otherwise, when {@code outBufOrNull}
 * is non-null, the pattern bytes are appended to it and {@code pos_} is
 * advanced by the pattern length. Note that this second branch also runs
 * when the stream ended without a match.
 *
 * <p>NOTE: a byte that breaks a partial match is not re-tested as a possible
 * start of the pattern.
 *
 * @param textPat pattern to search for
 * @param includePat whether a found pattern stays consumed (true) or the
 *        stream is rewound to its start (false)
 * @param outBufOrNull receives the bytes read, or {@code null} to discard
 * @return true if the pattern was found, false if the stream ended first
 * @throws IOException if reading, marking or resetting {@code bin_} fails
 */
boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  byte[] cpat = textPat.getBytes("UTF-8");
  int m = 0;
  boolean match = false;
  int msup = cpat.length;
  int LL = 120000 * 10;

  bin_.mark(LL); // large read-ahead limit so the mark stays valid
  while (true) {
    int b = bin_.read();
    if (b == -1) break;

    byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
    if (c == cpat[m]) {
      m++;
      if (m == msup) {
        match = true;
        break;
      }
    } else {
      bin_.mark(LL); // reset mark so we can jump back if we find a match
      if (outBufOrNull != null) {
        outBufOrNull.write(cpat, 0, m);
        outBufOrNull.write(c);
      }
      // These bytes were consumed from the stream whether or not they were
      // copied out, so always count them: m abandoned prefix bytes, +1 for 'c'.
      pos_ += m + 1;
      m = 0;
    }
  }
  if (!includePat && match) {
    bin_.reset();
  } else if (outBufOrNull != null) {
    outBufOrNull.write(cpat);
    pos_ += msup;
  }
  return match;
}
/**
 * Reads bytes from {@code bin_} one at a time until the UTF-8 bytes of
 * {@code textPat} have been seen in sequence or the stream returns -1.
 *
 * <p>Each byte that breaks the current match, together with the partially
 * matched prefix before it, is appended to {@code outBufOrNull} when that
 * buffer is non-null. {@code pos_} is advanced by the number of bytes
 * skipped whether or not an output buffer was supplied. The stream is
 * re-marked after every such byte, so on a match the mark sits just before
 * the matched pattern.
 *
 * <p>After the loop: if a match was found and {@code includePat} is false,
 * {@code bin_} is reset to that mark. Otherwise, when {@code outBufOrNull}
 * is non-null, the pattern bytes are appended to it and {@code pos_} is
 * advanced by the pattern length. Note that this second branch also runs
 * when the stream ended without a match.
 *
 * <p>NOTE: a byte that breaks a partial match is not re-tested as a possible
 * start of the pattern.
 *
 * @param textPat pattern to search for
 * @param includePat whether a found pattern stays consumed (true) or the
 *        stream is rewound to its start (false)
 * @param outBufOrNull receives the bytes read, or {@code null} to discard
 * @return true if the pattern was found, false if the stream ended first
 * @throws IOException if reading, marking or resetting {@code bin_} fails
 */
boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  byte[] cpat = textPat.getBytes("UTF-8");
  int m = 0;
  boolean match = false;
  int msup = cpat.length;
  int LL = 120000 * 10;

  bin_.mark(LL); // large read-ahead limit so the mark stays valid
  while (true) {
    int b = bin_.read();
    if (b == -1) break;

    byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
    if (c == cpat[m]) {
      m++;
      if (m == msup) {
        match = true;
        break;
      }
    } else {
      bin_.mark(LL); // reset mark so we can jump back if we find a match
      if (outBufOrNull != null) {
        outBufOrNull.write(cpat, 0, m);
        outBufOrNull.write(c);
      }
      // These bytes were consumed from the stream whether or not they were
      // copied out, so always count them: m abandoned prefix bytes, +1 for 'c'.
      pos_ += m + 1;
      m = 0;
    }
  }
  if (!includePat && match) {
    bin_.reset();
  } else if (outBufOrNull != null) {
    outBufOrNull.write(cpat);
    pos_ += msup;
  }
  return match;
}
/**
 * Replaces the contents of {@code dst} with bytes taken from {@code src}.
 *
 * <p>{@code dst} is reset, then {@code src.getLength() - src.getPosition()}
 * bytes are written to it from {@code src.getData()}, starting at
 * {@code src.getPosition()}. Only getters are called on {@code src}.
 *
 * @param src buffer to copy from
 * @param dst buffer that is reset and then filled
 * @throws IOException if writing to {@code dst} fails
 */
public static void copy(DataInputBuffer src, DataOutputBuffer dst) throws IOException {
  final byte[] backing = src.getData();
  final int start = src.getPosition();
  final int remaining = src.getLength() - start;

  dst.reset();
  dst.write(backing, start, remaining);
}
/**
 * Replaces the contents of {@code dst} with the contents of {@code src}.
 *
 * <p>The data array and length of {@code src} are captured first; then
 * {@code dst} is reset and the first {@code src.getLength()} bytes of
 * {@code src.getData()} are written to it.
 *
 * @param src buffer to copy from
 * @param dst buffer that is reset and then filled
 * @throws IOException if writing to {@code dst} fails
 */
public static void copy(DataOutputBuffer src, DataOutputBuffer dst) throws IOException {
  // Capture the source state before touching dst.
  final byte[] backing = src.getData();
  final int count = src.getLength();

  dst.reset();
  dst.write(backing, 0, count);
}
/**
 * Reads bytes from {@code bin_} one at a time until the UTF-8 bytes of
 * {@code textPat} have been seen in sequence or the stream returns -1.
 *
 * <p>Each byte that breaks the current match, together with the partially
 * matched prefix before it, is appended to {@code outBufOrNull} when that
 * buffer is non-null. {@code pos_} is advanced by the number of bytes
 * skipped whether or not an output buffer was supplied. The stream is
 * re-marked after every such byte, so on a match the mark sits just before
 * the matched pattern.
 *
 * <p>After the loop: if a match was found and {@code includePat} is false,
 * {@code bin_} is reset to that mark. Otherwise, when {@code outBufOrNull}
 * is non-null, the pattern bytes are appended to it and {@code pos_} is
 * advanced by the pattern length. Note that this second branch also runs
 * when the stream ended without a match.
 *
 * <p>NOTE: a byte that breaks a partial match is not re-tested as a possible
 * start of the pattern.
 *
 * @param textPat pattern to search for
 * @param includePat whether a found pattern stays consumed (true) or the
 *        stream is rewound to its start (false)
 * @param outBufOrNull receives the bytes read, or {@code null} to discard
 * @return true if the pattern was found, false if the stream ended first
 * @throws IOException if reading, marking or resetting {@code bin_} fails
 */
boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
  byte[] cpat = textPat.getBytes("UTF-8");
  int m = 0;
  boolean match = false;
  int msup = cpat.length;
  int LL = 120000 * 10;

  bin_.mark(LL); // large read-ahead limit so the mark stays valid
  while (true) {
    int b = bin_.read();
    if (b == -1) break;

    byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
    if (c == cpat[m]) {
      m++;
      if (m == msup) {
        match = true;
        break;
      }
    } else {
      bin_.mark(LL); // reset mark so we can jump back if we find a match
      if (outBufOrNull != null) {
        outBufOrNull.write(cpat, 0, m);
        outBufOrNull.write(c);
      }
      // These bytes were consumed from the stream whether or not they were
      // copied out, so always count them: m abandoned prefix bytes, +1 for 'c'.
      pos_ += m + 1;
      m = 0;
    }
  }
  if (!includePat && match) {
    bin_.reset();
  } else if (outBufOrNull != null) {
    outBufOrNull.write(cpat);
    pos_ += msup;
  }
  return match;
}