下面列出了 java.nio.ByteBuffer#reset() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Looks for the first designator in {@code desigs} that matches the bytes at the
 * current position of {@code in}.
 *
 * <p>On a match the buffer is left positioned just past the matched bytes; on a
 * failed attempt the position is restored. The buffer's mark is overwritten
 * whenever a candidate is tried.
 *
 * @param in     buffer to read from
 * @param desigs candidate designators; {@code null} entries are skipped
 * @return index of the first matching designator, or -1 if none matches or
 *         {@code desigs} is {@code null}
 */
private int findDesigBuf(ByteBuffer in, byte[][] desigs) {
    if (desigs == null) {
        return -1;
    }
    for (int idx = 0; idx < desigs.length; idx++) {
        final byte[] candidate = desigs[idx];
        // Skip missing candidates and those longer than what is left to read.
        if (candidate == null || in.remaining() < candidate.length) {
            continue;
        }
        in.mark();
        int matched = 0;
        while (matched < candidate.length && in.get() == candidate[matched]) {
            matched++;
        }
        if (matched == candidate.length) {
            return idx;
        }
        // Partial match only: rewind so the next candidate starts at the same place.
        in.reset();
    }
    return -1;
}
/**
 * Determines the frame size of the Opus packet at the current position of
 * {@code buffer} and updates the derived decoder state.
 *
 * <p>The buffer's position is left unchanged. When the frame size differs from
 * the previously seen one, {@code lastFrameSize} and {@code inputFormat} are
 * refreshed. {@code currentFrameDuration} and {@code currentTimecode} are
 * advanced for every non-zero frame size.
 *
 * @param buffer buffer positioned at the start of a packet
 * @return the frame size reported by {@code OpusDecoder.getPacketFrameSize}, or 0
 *         if it reported 0 (in which case no state is updated)
 */
private int processFrameSize(ByteBuffer buffer) {
    int frameSize;

    if (buffer.hasArray()) {
        // The backing array may be shared (slice / wrap with offset), so the buffer
        // position must be translated through arrayOffset() to get the array index.
        frameSize = OpusDecoder.getPacketFrameSize(inputFrequency, buffer.array(),
                buffer.arrayOffset() + buffer.position(), buffer.remaining());
    } else {
        // No accessible array (direct or read-only buffer): peek the header bytes
        // into the scratch array and restore the position afterwards.
        buffer.mark();
        buffer.get(headerBytes);
        buffer.reset();
        frameSize = OpusDecoder.getPacketFrameSize(inputFrequency, headerBytes, 0, headerBytes.length);
    }

    if (frameSize == 0) {
        return 0;
    } else if (frameSize != lastFrameSize) {
        lastFrameSize = frameSize;
        inputFormat = new OpusAudioDataFormat(inputChannels, inputFrequency, frameSize);
    }

    currentFrameDuration = frameSize * 1000 / inputFrequency;
    currentTimecode += currentFrameDuration;
    return frameSize;
}
/**
 * Decodes the code point that starts at the buffer's current position and
 * advances the position past it.
 * Note that this method overwrites any mark previously set on the buffer.
 *
 * @param bytes buffer positioned at the first byte of an encoded character
 * @return the decoded value, or -1 if the byte at the current position is a
 *         trailing byte (in which case the position is not advanced)
 */
public static int bytesToCodePoint(ByteBuffer bytes) {
    // Peek at the lead byte without consuming it.
    bytes.mark();
    final byte lead = bytes.get();
    bytes.reset();

    final int extraBytesToRead = bytesFromUTF8[(lead & 0xFF)];
    if (extraBytesToRead < 0) {
        return -1; // trailing byte!
    }

    int ch = 0;
    // Sequences with 4 or 5 extra bytes are illegal UTF-8 but are still consumed;
    // larger table values read nothing at all.
    if (extraBytesToRead <= 5) {
        for (int pending = extraBytesToRead; pending > 0; pending--) {
            ch += (bytes.get() & 0xFF);
            ch <<= 6;
        }
        ch += (bytes.get() & 0xFF);
    }
    ch -= offsetsFromUTF8[extraBytesToRead];
    return ch;
}
/**
 * Checks whether {@code request} is exactly the Flash policy request.
 *
 * <p>The buffer is marked on entry. On a byte mismatch the position is reset to
 * that mark; on a full match the buffer is left fully consumed.
 *
 * @param request buffer holding the received bytes
 * @return {@code MATCHED} if every remaining byte equals the policy request,
 *         {@code NOT_MATCHED} if the limit is larger than the policy request or a
 *         byte differs
 * @throws IncompleteHandshakeException if the buffer's limit is smaller than the
 *         policy request length
 */
private HandshakeState isFlashEdgeCase( ByteBuffer request ) throws IncompleteHandshakeException {
    request.mark();
    final int expectedLength = Draft.FLASH_POLICY_REQUEST.length;
    final int limit = request.limit();
    if( limit > expectedLength ) {
        return HandshakeState.NOT_MATCHED;
    }
    if( limit < expectedLength ) {
        throw new IncompleteHandshakeException( expectedLength );
    }
    int idx = 0;
    while( request.hasRemaining() ) {
        if( Draft.FLASH_POLICY_REQUEST[idx] != request.get() ) {
            request.reset();
            return HandshakeState.NOT_MATCHED;
        }
        idx++;
    }
    return HandshakeState.MATCHED;
}
/**
 * Reads into a direct buffer whose position starts at 3 and checks that the
 * buffer is filled to its limit, that a second read changes nothing, and that
 * the bytes written after the mark match the first 7 bytes of the test array.
 */
@Test
public void testDirectReadFullyPosition() throws Exception {
    final ByteBuffer target = ByteBuffer.allocateDirect(10);
    target.position(3);
    target.mark();

    final MockHadoopInputStream source = new MockHadoopInputStream(2, 3, 3);
    final FSDataInputStream stream = new FSDataInputStream(source);
    final MockBufferReader bufferReader = new MockBufferReader(stream);

    // The second pass runs against an already-full buffer and must leave it untouched.
    for (int pass = 0; pass < 2; pass++) {
        H2SeekableInputStream.readFully(bufferReader, target);
        Assert.assertEquals(10, target.position());
        Assert.assertEquals(10, target.limit());
    }

    target.reset();
    final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY, 0, 7);
    Assert.assertEquals("Buffer contents should match", expected, target);
}
/**
 * Searches the backing byte buffer for the encoded form of <code>what</code>,
 * beginning at byte position <code>start</code>. Both the starting position and
 * the result are byte offsets into the buffer; the buffer is never converted to
 * a string for the search.
 *
 * @param what  the string to look for
 * @param start byte position at which the search begins
 * @return byte position of the first occurrence of the search string in the
 *         buffer, or -1 if it is not found
 */
public int find(String what, int start) {
    try {
        ByteBuffer haystack = ByteBuffer.wrap(this.bytes, 0, this.length);
        ByteBuffer needle = encode(what);
        final byte first = needle.get();
        haystack.position(start);

        while (haystack.hasRemaining()) {
            if (first != haystack.get()) {
                continue;
            }
            // First byte matched: remember where to resume if the rest does not.
            haystack.mark();
            needle.mark();
            final int candidate = haystack.position() - 1;

            boolean matched = true;
            while (matched && needle.hasRemaining()) {
                // Either the source ran out first or the next bytes differ.
                if (!haystack.hasRemaining() || needle.get() != haystack.get()) {
                    needle.reset();
                    haystack.reset();
                    matched = false;
                }
            }
            if (matched) {
                return candidate;
            }
        }
        return -1; // not found
    } catch (CharacterCodingException e) {
        // can't get here
        e.printStackTrace();
        return -1;
    }
}
/**
 * Copies {@code size} bytes starting at absolute index {@code offset} of
 * {@code buffer} into a new array.
 *
 * <p>Both the array-backed and the non-array-backed paths read from the same
 * absolute index. The buffer's position, limit and mark are left untouched.
 *
 * @param buffer source buffer
 * @param offset absolute index in {@code buffer} of the first byte to copy
 * @param size   number of bytes to copy
 * @return a new array of length {@code size} holding the copied bytes
 */
private static byte[] readBytes(ByteBuffer buffer, int offset, int size)
{
    byte[] dest = new byte[size];
    if (buffer.hasArray())
    {
        System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
    }
    else
    {
        // Read through an independent view so that 'offset' is honoured and the
        // caller's position and mark are not disturbed.
        ByteBuffer view = buffer.duplicate();
        view.position(offset);
        view.get(dest);
    }
    return dest;
}
/**
 * Decodes a serialized query result: a command byte, a code byte, a long index,
 * a short-counted list of journal entries and a short-counted list of
 * boundaries (each a short key followed by two longs).
 *
 * @param bytes the serialized form
 * @return the decoded result
 */
@Override
public JournalStoreQueryResult parse(byte[] bytes) {
    final ByteBuffer in = ByteBuffer.wrap(bytes);
    final int cmd = in.get();
    final int code = in.get();
    final long index = in.getLong();

    final int entryCount = in.getShort();
    final List<JournalEntry> entries = new ArrayList<>(entryCount);
    final byte[] headerScratch = new byte[journalEntryParser.headerLength()];
    for (int n = 0; n < entryCount; n++) {
        // Peek at the header to learn the entry length, then rewind so the full
        // entry (header included) is read as one raw block.
        in.mark();
        in.get(headerScratch);
        in.reset();

        final JournalEntry header = journalEntryParser.parseHeader(headerScratch);
        final byte[] raw = new byte[header.getLength()];
        in.get(raw);
        entries.add(journalEntryParser.parse(raw));
    }

    final int boundaryCount = in.getShort();
    final Map<Integer, JournalStoreQueryResult.Boundary> boundaries = new HashMap<>(boundaryCount);
    for (int n = 0; n < boundaryCount; n++) {
        final int key = in.getShort();
        final long first = in.getLong();
        final long second = in.getLong();
        boundaries.put(key, new JournalStoreQueryResult.Boundary(first, second));
    }

    return new JournalStoreQueryResult(entries, boundaries, cmd, index, code);
}
/**
 * Reads the header portion of a saved game from {@code buf}: name, version
 * string, skill/episode/map, the players-in-game flags and the level time.
 *
 * <p>If the version string does not match, {@code wrongversion} is set and
 * nothing further is read. Otherwise the last byte of the buffer is inspected to
 * set {@code properend}, and the position is restored to just after the header.
 *
 * @param buf buffer positioned at the start of the header
 * @throws IOException declared by the interface
 */
@Override
public void unpack(ByteBuffer buf) throws IOException {
    name = DoomBuffer.getNullTerminatedString(buf, SAVESTRINGSIZE);
    vcheck = DoomBuffer.getNullTerminatedString(buf, VERSIONSIZE);

    final String expectedVersion = ("version " + VERSION);
    wrongversion = !(expectedVersion.equalsIgnoreCase(vcheck));
    if (wrongversion) {
        // No more unpacking; the flag reports the mismatch.
        return;
    }

    gameskill = buf.get();
    gameepisode = buf.get();
    gamemap = buf.get();
    for (int player = 0; player < MAXPLAYERS; player++) {
        playeringame[player] = buf.get() != 0;
    }

    // The level time is stored as a big-endian, 24-bit unsigned integer.
    final int high = C2JUtils.toUnsignedByte(buf.get());
    final int middle = C2JUtils.toUnsignedByte(buf.get());
    final int low = C2JUtils.toUnsignedByte(buf.get());
    leveltime = (high << 16) + (middle << 8) + low;

    // Peek at the very last byte, then come back to where the header ended.
    buf.mark();
    buf.position(buf.limit() - 1);
    properend = (buf.get() == 0x1d);
    buf.reset();

    // Everything after the header is unpacked by the game engine itself.
}
/**
 * Reads the int entries that follow this chunk's header in {@code buffer}.
 *
 * <p>The buffer's position is restored before returning; its mark is overwritten.
 *
 * @param buffer buffer containing the chunk
 * @return the entries, in the order they appear
 */
private List<Integer> enumerateResources(ByteBuffer buffer) {
    final int count = (getOriginalChunkSize() - getHeaderSize()) / RESOURCE_SIZE;
    final List<Integer> resources = new ArrayList<>(count);
    final int start = this.offset + getHeaderSize();

    buffer.mark();
    buffer.position(start);
    int left = count;
    while (left > 0) {
        resources.add(buffer.getInt());
        left--;
    }
    buffer.reset();

    return resources;
}
/** Round-trips a {@code null} value through the serializer using one shared buffer. */
@Test
public void testNull() {
    final BigDecimal expected = null;

    final ByteBuffer scratch = ByteBuffer.allocate(256);
    scratch.mark();
    bigDecimalSerializer.serialize(expected, scratch);
    // Rewind to the start of what was just written before reading it back.
    scratch.reset();

    final BigDecimal actual = bigDecimalSerializer.deserialize(scratch);
    assertEquals(expected, actual);
}
/**
 * Decodes the remaining bytes of {@code bytes} as UTF-8.
 *
 * <p>The buffer's position is restored to its value on entry whether decoding
 * succeeds or fails; its mark is overwritten.
 *
 * @param bytes buffer whose remaining content is decoded
 * @return the decoded string
 * @throws InvalidDataException if the decoder reports a coding error
 */
public static String stringUtf8( ByteBuffer bytes ) throws InvalidDataException {
    CharsetDecoder decode = Charset.forName( "UTF8" ).newDecoder();
    decode.onMalformedInput( codingErrorAction );
    decode.onUnmappableCharacter( codingErrorAction );
    bytes.mark();
    try {
        return decode.decode( bytes ).toString();
    } catch ( CharacterCodingException e ) {
        throw new InvalidDataException( CloseFrame.NO_UTF8, e );
    } finally {
        // Rewind on the failure path too, so the caller never sees a buffer left
        // at the decoder's error position.
        bytes.reset();
    }
}
/**
 * Runs the cipher twice over the same text: once into a freshly allocated direct
 * output buffer, and once into a duplicate of the text buffer (so input and
 * output share content), then verifies that both outputs are equal.
 *
 * @param mode       cipher mode passed to {@code createCipher}
 * @param buffer     AAD buffer; flipped after each use so it can be read again
 * @param textBB     buffer holding the text to process
 * @param txtOffset  position in {@code textBB} at which the text starts
 * @param dataLength length used to size the outputs
 * @param params     algorithm parameters passed to {@code createCipher}
 * @throws Exception if a cipher operation fails
 * @throws RuntimeException if the two outputs differ
 */
private void runGCMWithSeparateBuffers(int mode, ByteBuffer buffer,
        ByteBuffer textBB, int txtOffset, int dataLength,
        AlgorithmParameters params) throws Exception {
    // Start at the text offset and remember it for the second pass.
    textBB.position(txtOffset);
    textBB.mark();

    // Pass 1: output goes to a separately allocated buffer.
    final Cipher firstCipher = createCipher(mode, params);
    firstCipher.updateAAD(buffer);
    buffer.flip();
    final ByteBuffer expected = ByteBuffer.allocateDirect(
            firstCipher.getOutputSize(dataLength));
    firstCipher.doFinal(textBB, expected);
    expected.flip();

    // Rewind the text for the second pass.
    textBB.reset();

    // Pass 2: output goes to a buffer that shares content with the text buffer.
    final Cipher secondCipher = createCipher(mode, params);
    secondCipher.updateAAD(buffer);
    buffer.flip();
    final ByteBuffer sharedOut = textBB.duplicate();
    sharedOut.limit(txtOffset + secondCipher.getOutputSize(dataLength));
    final int written = secondCipher.doFinal(textBB, sharedOut);
    sharedOut.position(txtOffset);
    sharedOut.limit(txtOffset + written);

    if (sharedOut.equals(expected)) {
        return;
    }
    throw new RuntimeException(
            "Two results are not equal, mode:" + mode);
}
/**
 * Decodes the remaining bytes of {@code bytes} as UTF-8.
 *
 * <p>The buffer's position is restored to its value on entry whether decoding
 * succeeds or fails; its mark is overwritten.
 *
 * @param bytes buffer whose remaining content is decoded
 * @return the decoded string
 * @throws InvalidDataException if the decoder reports a coding error
 */
public static String stringUtf8(ByteBuffer bytes) throws InvalidDataException {
    CharsetDecoder decode = Charset.forName("UTF8").newDecoder();
    decode.onMalformedInput(codingErrorAction);
    decode.onUnmappableCharacter(codingErrorAction);
    bytes.mark();
    try {
        return decode.decode(bytes).toString();
    } catch (CharacterCodingException e) {
        throw new InvalidDataException(CloseFrame.NO_UTF8, e);
    } finally {
        // Rewind on the failure path too, so the caller never sees a buffer left
        // at the decoder's error position.
        bytes.reset();
    }
}
/**
 * Creates the attributes stored after this chunk's header in {@code buffer}.
 *
 * <p>The buffer's position is restored before returning; its mark is overwritten.
 *
 * @param buffer buffer containing the chunk
 * @return the attributes, in the order they appear
 */
private List<XmlAttribute> enumerateAttributes(ByteBuffer buffer) {
    final List<XmlAttribute> attributes = new ArrayList<>(attributeCount);
    final int start = this.offset + getHeaderSize() + attributeStart;
    final int end = start + XmlAttribute.SIZE * attributeCount;

    buffer.mark();
    buffer.position(start);
    for (int cursor = start; cursor < end; cursor += XmlAttribute.SIZE) {
        attributes.add(XmlAttribute.create(buffer, this));
    }
    buffer.reset();

    return attributes;
}
/**
 * Appends a batch of already-encoded messages to {@code byteBuffer}.
 *
 * <p>Each message in the batch's encoded buffer is visited in turn: its length is
 * checked against {@code maxMessageSize}, the queue offset and physical offset are
 * written into it at {@code msgPos + 20}, and a message id is generated and
 * appended to a comma-separated list. If the accumulated length plus
 * {@code END_FILE_MIN_BLANK_LENGTH} exceeds {@code maxBlank}, an 8-byte blank marker
 * (total size + magic code) is written at the position {@code byteBuffer} had on
 * entry and {@code END_OF_FILE} is returned. Otherwise the first
 * {@code totalMsgLen} bytes of the encoded buffer are copied into {@code byteBuffer},
 * the batch's encoded buffer is cleared, and the topic-queue table is updated.
 *
 * @param fileFromOffset offset added to the buffer position to obtain the physical offset
 * @param byteBuffer destination buffer; its mark is overwritten
 * @param maxBlank space available for this append
 * @param messageExtBatch the batch to append
 * @return the append result ({@code PUT_OK}, {@code END_OF_FILE} or
 *         {@code MESSAGE_SIZE_EXCEEDED})
 */
@Override
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch) {
// Remember the starting position so it can be restored on END_OF_FILE.
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
msgIdBuilder.setLength(0);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
this.resetByteBuffer(hostHolder, 8);
ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
// Remember where the encoded batch starts so the read can be undone on END_OF_FILE.
messagesByteBuff.mark();
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
final int bodyLen = msgLen - 40; //only for log, just estimate it
// Exceeds the maximum message
// NOTE(review): this early return does not reset either buffer — confirm callers expect that.
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, 8);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
messagesByteBuff.position(msgPos + 20);
messagesByteBuff.putLong(queueOffset);
// Physical offset of this message: batch start plus the lengths of the messages before it.
messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
storeHostBytes.rewind();
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
queueOffset++;
msgNum++;
// Skip to the start of the next message.
messagesByteBuff.position(msgPos + msgLen);
}
// Expose exactly the processed bytes and copy them into the destination in one go.
messagesByteBuff.position(0);
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
}
/**
 * Returns {@link ProbeResult#SUPPORTED} if the given storage appears to begin with an expected keyword.
 * Returning {@code SUPPORTED} from this method does not guarantee that reading or writing will succeed,
 * only that there appears to be a reasonable chance of success based on a brief inspection of the storage
 * header.
 *
 * <p>The storage is read either through a {@link ByteBuffer} or, if none is available, through a
 * {@link Reader}. Whichever is used is marked before the inspection and reset afterwards.</p>
 *
 * @param connector information about the storage (URL, stream, JDBC connection, <i>etc</i>).
 * @return {@link ProbeResult#SUPPORTED} if the given storage seems to be readable.
 * @throws DataStoreException if an I/O or SQL error occurred.
 */
@SuppressWarnings("null")
public final ProbeResult probeContent(final StorageConnector connector) throws DataStoreException {
char[] keyword = null;
int pos = 0;
try {
// Exactly one of 'buffer' and 'reader' is non-null from here on.
final ByteBuffer buffer = connector.getStorageAs(ByteBuffer.class);
final Reader reader;
if (buffer != null) {
buffer.mark();
reader = null;
} else {
// User gave us explicitly a Reader (e.g. a StringReader wrapping a String instance).
reader = connector.getStorageAs(Reader.class);
if (reader == null) {
return ProbeResult.UNSUPPORTED_STORAGE;
}
reader.mark(READ_AHEAD_LIMIT);
}
/*
 * Ignore leading spaces and comments if any, then get a keyword no longer than 'maxLength'.
 * That keyword shall be followed by [ or (, ignoring whitespaces.
 */
int c;
while ((c = nextAfterSpaces(buffer, reader)) == COMMENT) {
toEndOfLine(buffer, reader);
}
int s;
if ((s = isKeywordChar(c)) >= ACCEPT) {
keyword = new char[maxLength];
do {
if (s == ACCEPT) {
if (pos >= keyword.length) {
pos = 0; // Keyword too long.
break;
}
keyword[pos++] = (char) c;
}
// Next character from whichever source is in use; -1 when the buffer has no bytes left.
c = (buffer == null) ? IOUtilities.readCodePoint(reader) : buffer.hasRemaining() ? (char) buffer.get() : -1;
} while ((s = isKeywordChar(c)) >= ACCEPT);
/*
 * At this point we finished to read and store the keyword.
 * Verify if the keyword is followed by a character that indicate a keyword end.
 */
if (Character.isWhitespace(c)) {
c = nextAfterSpaces(buffer, reader);
}
if (!isPostKeyword(c)) {
pos = 0;
}
}
// Restore the source to where it was before the inspection.
if (buffer != null) {
buffer.reset();
} else {
reader.reset();
}
// A negative value means the input ended before a decision could be made.
if (c < 0) {
return ProbeResult.INSUFFICIENT_BYTES;
}
} catch (IOException e) {
throw new DataStoreException(e);
}
return forKeyword(keyword, pos);
}
/**
* Parses the Bitcoin transactions in a byte buffer.
*
* For each transaction the following is read in order: version (int), input counter (var int),
* optionally a segwit flag and the real input counter, the inputs, output counter (var int),
* the outputs, optionally the witness data (one witness item per input) and the lock time (int).
*
* @param rawByteBuffer ByteBuffer from which the transactions have to be parsed; its position is advanced and its mark may be overwritten
* @param noOfTransactions Number of expected transactions
*
* @return List of transactions
*
*
*/
public List<BitcoinTransaction> parseTransactions(ByteBuffer rawByteBuffer,long noOfTransactions) {
// NOTE(review): the long count is narrowed to int for the initial capacity - confirm it cannot exceed the int range
ArrayList<BitcoinTransaction> resultTransactions = new ArrayList<>((int)noOfTransactions);
// read all transactions from ByteBuffer
for (int k=0;k<noOfTransactions;k++) {
// read version
int currentVersion=rawByteBuffer.getInt();
// read inCounter
byte[] currentInCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
long currentNoOfInputs=BitcoinUtil.getVarInt(currentInCounterVarInt);
boolean segwit=false;
// defaults used when the transaction is not segwit
byte marker=1;
byte flag=0;
// check segwit marker
if (currentNoOfInputs==0) {
// this seems to be segwit - lets be sure
// check segwit flag
// mark so that the flag byte can be un-read if this is not segwit after all
rawByteBuffer.mark();
byte segwitFlag = rawByteBuffer.get();
if (segwitFlag!=0) {
// load the real number of inputs
segwit=true;
marker=0;
flag=segwitFlag;
currentInCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
currentNoOfInputs=BitcoinUtil.getVarInt(currentInCounterVarInt);
} else {
LOG.warn("It seems a block with 0 transaction inputs was found");
rawByteBuffer.reset();
}
}
// read inputs
List<BitcoinTransactionInput> currentTransactionInput = parseTransactionInputs(rawByteBuffer, currentNoOfInputs);
// read outCounter
byte[] currentOutCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
long currentNoOfOutput=BitcoinUtil.getVarInt(currentOutCounterVarInt);
// read outputs
List<BitcoinTransactionOutput> currentTransactionOutput = parseTransactionOutputs(rawByteBuffer,currentNoOfOutput);
List<BitcoinScriptWitnessItem> currentListOfTransactionSegwits;
if (segwit) {
// read segwit data
// for each transaction input there is at least some segwit data item
// read scriptWitness size
currentListOfTransactionSegwits=new ArrayList<>();
for (int i=0;i<currentNoOfInputs;i++) {
// get no of witness items for input
byte[] currentWitnessCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
long currentNoOfWitnesses=BitcoinUtil.getVarInt(currentWitnessCounterVarInt);
List<BitcoinScriptWitness> currentTransactionSegwit = new ArrayList<>((int)currentNoOfWitnesses);
for (int j=0;j<(int)currentNoOfWitnesses;j++) {
// read size of segwit script
byte[] currentTransactionSegwitScriptLength=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
long currentTransactionSegwitScriptSize=BitcoinUtil.getVarInt(currentTransactionSegwitScriptLength);
int currentTransactionSegwitScriptSizeInt= (int)currentTransactionSegwitScriptSize;
// read segwit script
byte[] currentTransactionInSegwitScript=new byte[currentTransactionSegwitScriptSizeInt];
rawByteBuffer.get(currentTransactionInSegwitScript,0,currentTransactionSegwitScriptSizeInt);
// add segwit
currentTransactionSegwit.add(new BitcoinScriptWitness(currentTransactionSegwitScriptLength,currentTransactionInSegwitScript));
}
currentListOfTransactionSegwits.add(new BitcoinScriptWitnessItem(currentWitnessCounterVarInt,currentTransactionSegwit));
}
} else {
// no witness data: use an empty list
currentListOfTransactionSegwits=new ArrayList<>();
}
// lock_time
int currentTransactionLockTime = rawByteBuffer.getInt();
// add transaction
resultTransactions.add(new BitcoinTransaction(marker,flag,currentVersion,currentInCounterVarInt,currentTransactionInput,currentOutCounterVarInt,currentTransactionOutput,currentListOfTransactionSegwits,currentTransactionLockTime));
}
return resultTransactions;
}
/**
 * Appends a batch of already-encoded messages to {@code byteBuffer}.
 *
 * <p>Multi-topic batches are delegated to {@code doAppendMultiTopic}. For other
 * batches, each message in the encoded buffer is visited in turn: its length is
 * checked against {@code maxMessageSize}, the queue offset and physical offset are
 * written into it at {@code msgPos + 20}, and a message id is generated and
 * appended to a comma-separated list. If the accumulated length plus
 * {@code END_FILE_MIN_BLANK_LENGTH} exceeds {@code maxBlank}, an 8-byte blank marker
 * (total size + magic code) is written at the position {@code byteBuffer} had on
 * entry and {@code END_OF_FILE} is returned. Otherwise the first
 * {@code totalMsgLen} bytes of the encoded buffer are copied into {@code byteBuffer},
 * the batch's encoded buffer is cleared, and the topic-queue table is updated.
 *
 * @param fileFromOffset offset added to the buffer position to obtain the physical offset
 * @param byteBuffer destination buffer; its mark is overwritten
 * @param maxBlank space available for this append
 * @param messageExtBatch the batch to append
 * @return the append result ({@code PUT_OK}, {@code END_OF_FILE} or
 *         {@code MESSAGE_SIZE_EXCEEDED}), or whatever {@code doAppendMultiTopic} returns
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch) {
if (messageExtBatch.isMultiTopic()) {
return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch);
}
// Remember the starting position so it can be restored on END_OF_FILE.
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
long beginQueueOffset = queueOffset;
int totalMsgLen = 0;
int msgNum = 0;
msgIdBuilder.setLength(0);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
this.resetByteBuffer(hostHolder, 8);
ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
// Remember where the encoded batch starts so the read can be undone on END_OF_FILE.
messagesByteBuff.mark();
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
final int bodyLen = msgLen - 40; //only for log, just estimate it
// Exceeds the maximum message
// NOTE(review): this early return does not reset either buffer — confirm callers expect that.
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
totalMsgLen += msgLen;
// Determines whether there is sufficient free space
if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, 8);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
byteBuffer.reset(); //ignore the previous appended messages
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//move to add queue offset and commitlog offset
messagesByteBuff.position(msgPos + 20);
messagesByteBuff.putLong(queueOffset);
// Physical offset of this message: batch start plus the lengths of the messages before it.
messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);
storeHostBytes.rewind();
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
if (msgIdBuilder.length() > 0) {
msgIdBuilder.append(',').append(msgId);
} else {
msgIdBuilder.append(msgId);
}
queueOffset++;
msgNum++;
// Skip to the start of the next message.
messagesByteBuff.position(msgPos + msgLen);
}
// Expose exactly the processed bytes and copy them into the destination in one go.
messagesByteBuff.position(0);
messagesByteBuff.limit(totalMsgLen);
byteBuffer.put(messagesByteBuff);
messageExtBatch.setEncodedBuff(null);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
result.setMsgNum(msgNum);
CommitLog.this.topicQueueTable.put(key, queueOffset);
return result;
}
/**
 * Writes a file.
 *
 * <p>A first write is attempted with the flag set to {@code true}; if that
 * attempt reports failure, the buffer is rewound to its position on entry and
 * the write is repeated with the flag set to {@code false}. The buffer's mark is
 * overwritten.
 *
 * @param type
 *            The type of the file.
 * @param id
 *            The id of the file.
 * @param data
 *            A {@link ByteBuffer} containing the contents of the file.
 * @throws IOException
 *             if an I/O error occurs.
 */
public void write(int type, int id, ByteBuffer data) throws IOException {
    data.mark();
    final boolean written = write(type, id, data, true);
    if (written) {
        return;
    }
    // The first attempt may have consumed part of the buffer; start over.
    data.reset();
    write(type, id, data, false);
}