java.nio.ByteBuffer#reset() 源码实例 Demo

下面列出了 java.nio.ByteBuffer#reset() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: jdk8u-dev-jdk   文件: ISO2022.java
/**
 * Looks for the first designation sequence that appears at the current position of {@code in}.
 *
 * <p>Candidates are tried in array order; {@code null} entries and entries longer than the
 * remaining input are skipped. On a match the buffer is left positioned just past the matched
 * bytes. After a failed attempt the buffer is rewound to where that attempt started. The
 * buffer's mark is overwritten whenever a candidate is tried.
 *
 * @param in     input positioned where a designation may start
 * @param desigs candidate designation sequences; may be {@code null}
 * @return index of the first matching candidate, or -1 if none matches
 */
private int findDesigBuf(ByteBuffer in, byte[][] desigs) {
    if (desigs == null) {
        return -1;
    }
    for (int index = 0; index < desigs.length; index++) {
        final byte[] candidate = desigs[index];
        if (candidate == null || in.remaining() < candidate.length) {
            continue;
        }
        // Remember where this attempt starts so a mismatch can be undone.
        in.mark();
        int matched = 0;
        while (matched < candidate.length && in.get() == candidate[matched]) {
            matched++;
        }
        if (matched == candidate.length) {
            return index;
        }
        in.reset();
    }
    return -1;
}
 
源代码2 项目: lavaplayer   文件: OpusPacketRouter.java
/**
 * Determines the frame size of the packet at the buffer's current position and updates the
 * router's format and timecode state accordingly.
 *
 * <p>The buffer's position and limit are unchanged on return. When the buffer has no accessible
 * backing array (direct or read-only buffers), the leading {@code headerBytes.length} bytes are
 * copied out and the buffer's mark is overwritten in the process.
 *
 * @param buffer packet data, positioned at the start of the packet
 * @return the frame size reported by {@code OpusDecoder.getPacketFrameSize}, or 0 if it reported 0
 *         (in which case no state is updated)
 */
private int processFrameSize(ByteBuffer buffer) {
  int frameSize;

  if (buffer.hasArray()) {
    // Index into the backing array must include arrayOffset(): for a sliced or wrapped-with-offset
    // heap buffer, position() alone points at the wrong bytes.
    frameSize = OpusDecoder.getPacketFrameSize(inputFrequency, buffer.array(),
        buffer.arrayOffset() + buffer.position(), buffer.remaining());
  } else {
    // No accessible array (direct or read-only buffer): peek at the header bytes without
    // consuming them.
    buffer.mark();
    buffer.get(headerBytes);
    buffer.reset();

    frameSize = OpusDecoder.getPacketFrameSize(inputFrequency, headerBytes, 0, headerBytes.length);
  }

  if (frameSize == 0) {
    return 0;
  } else if (frameSize != lastFrameSize) {
    // Frame size changed: remember it and rebuild the input format to match.
    lastFrameSize = frameSize;
    inputFormat = new OpusAudioDataFormat(inputChannels, inputFrequency, frameSize);
  }

  currentFrameDuration = frameSize * 1000 / inputFrequency;
  currentTimecode += currentFrameDuration;
  return frameSize;
}
 
源代码3 项目: RDFS   文件: Text.java
/**
 * Decodes the code point that starts at the buffer's current position and advances the
 * position past it.
 *
 * <p>The lead byte is first peeked with mark/reset, so any mark previously set on the buffer
 * is overwritten by this method.
 *
 * @param bytes buffer positioned at the first byte of an encoded character
 * @return the decoded value, or -1 if the byte at the current position is a trailing byte
 *         (in which case the position is not advanced)
 */
public static int bytesToCodePoint(ByteBuffer bytes) {
  // Peek at the lead byte without consuming it.
  bytes.mark();
  final byte lead = bytes.get();
  bytes.reset();

  int extraBytesToRead = bytesFromUTF8[(lead & 0xFF)];
  if (extraBytesToRead < 0) {
    return -1; // trailing byte!
  }

  int codePoint = 0;
  // Lengths above 5 read nothing, exactly as a switch without a default case would.
  // Note that 4 and 5 extra bytes are illegal UTF-8 but are still accumulated here.
  if (extraBytesToRead <= 5) {
    for (int pending = extraBytesToRead; pending > 0; pending--) {
      codePoint += (bytes.get() & 0xFF);
      codePoint <<= 6;
    }
    codePoint += (bytes.get() & 0xFF);
  }
  codePoint -= offsetsFromUTF8[extraBytesToRead];

  return codePoint;
}
 
源代码4 项目: RipplePower   文件: WebSocketImpl.java
/**
 * Checks whether the request consists of exactly the Flash policy request bytes.
 *
 * <p>The buffer is marked on entry. On a byte mismatch it is reset to that mark; on a full
 * match the compared bytes stay consumed.
 *
 * @param request the received bytes
 * @return {@code MATCHED} if every remaining byte equals the policy request,
 *         {@code NOT_MATCHED} if the request is longer than the policy request or differs from it
 * @throws IncompleteHandshakeException if the buffer's limit is smaller than the policy request
 *         length; the exception is constructed with that length
 */
private HandshakeState isFlashEdgeCase( ByteBuffer request ) throws IncompleteHandshakeException {
	request.mark();
	if( request.limit() > Draft.FLASH_POLICY_REQUEST.length ) {
		return HandshakeState.NOT_MATCHED;
	}
	if( request.limit() < Draft.FLASH_POLICY_REQUEST.length ) {
		throw new IncompleteHandshakeException( Draft.FLASH_POLICY_REQUEST.length );
	}

	// Limit equals the expected length: compare byte by byte.
	int policyIndex = 0;
	while( request.hasRemaining() ) {
		if( Draft.FLASH_POLICY_REQUEST[policyIndex] != request.get() ) {
			request.reset();
			return HandshakeState.NOT_MATCHED;
		}
		policyIndex++;
	}
	return HandshakeState.MATCHED;
}
 
源代码5 项目: parquet-mr   文件: TestHadoop2ByteBufferReads.java
/**
 * Verifies that {@code readFully} into a direct buffer starting at a non-zero position fills the
 * buffer up to its limit, that a second call on the already-full buffer leaves it unchanged, and
 * that the bytes written from the marked position match the start of {@code TEST_ARRAY}.
 */
@Test
public void testDirectReadFullyPosition() throws Exception {
  // Start at position 3 and mark it so the filled region can be re-read at the end.
  final ByteBuffer target = ByteBuffer.allocateDirect(10);
  target.position(3);
  target.mark();

  final MockBufferReader reader = new MockBufferReader(
      new FSDataInputStream(new MockHadoopInputStream(2, 3, 3)));

  // First pass fills the buffer; second pass runs against the already-full buffer.
  for (int pass = 0; pass < 2; pass++) {
    H2SeekableInputStream.readFully(reader, target);
    Assert.assertEquals(10, target.position());
    Assert.assertEquals(10, target.limit());
  }

  target.reset();
  final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY, 0, 7);
  Assert.assertEquals("Buffer contents should match", expected, target);
}
 
源代码6 项目: hadoop   文件: Text.java
/**
 * Finds the first occurrence of <code>what</code> in the backing
 * buffer, starting at byte position <code>start</code>. Both the starting
 * position and the returned value are byte positions in the buffer, not
 * character indexes. The backing buffer is not converted to a string
 * for this operation.
 *
 * <p>NOTE(review): the first byte of the encoded search string is read
 * unconditionally, so a search string that encodes to zero bytes raises
 * a {@code BufferUnderflowException} — confirm whether callers rely on that.
 *
 * @param what the string to search for
 * @param start byte position at which the search begins
 * @return byte position of the first occurrence of the search
 *         string in the buffer or -1 if not found
 */
public int find(String what, int start) {
  try {
    ByteBuffer haystack = ByteBuffer.wrap(this.bytes, 0, this.length);
    ByteBuffer needle = encode(what);
    final byte firstByte = needle.get();
    haystack.position(start);

    while (haystack.hasRemaining()) {
      if (firstByte != haystack.get()) {
        continue; // first byte does not match here
      }

      // First byte matched: remember both positions, then compare the rest.
      final int candidate = haystack.position() - 1;
      haystack.mark();
      needle.mark();
      boolean matched = true;
      while (matched && needle.hasRemaining()) {
        // Fails either when the source runs out first or when a byte differs.
        matched = haystack.hasRemaining() && needle.get() == haystack.get();
      }
      if (matched) {
        return candidate;
      }

      // Rewind both buffers and keep scanning after the matched first byte.
      needle.reset();
      haystack.reset();
    }
    return -1; // not found
  } catch (CharacterCodingException e) {
    // can't get here
    e.printStackTrace();
    return -1;
  }
}
 
源代码7 项目: Kafdrop   文件: ByteUtils.java
/**
 * Copies {@code size} bytes starting at absolute index {@code offset} of the buffer into a new
 * array. The buffer's position, limit and mark are left untouched.
 *
 * @param buffer source buffer (heap, direct or read-only)
 * @param offset absolute index in the buffer of the first byte to copy
 * @param size   number of bytes to copy
 * @return a new array holding the copied bytes
 */
private static byte[] readBytes(ByteBuffer buffer, int offset, int size)
{
   byte[] dest = new byte[size];
   if (buffer.hasArray())
   {
      System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
   }
   else
   {
      // No accessible backing array: read through an independent view so that the copy starts
      // at 'offset' (not at the caller's current position) and the caller's buffer state is
      // not disturbed.
      ByteBuffer view = buffer.duplicate();
      view.position(offset);
      view.get(dest);
   }
   return dest;
}
 
/**
 * Decodes a serialized {@link JournalStoreQueryResult}.
 *
 * <p>Layout as read here: one byte {@code cmd}, one byte {@code code}, a long {@code index},
 * a short entry count followed by the entries, then a short boundary count followed by
 * (short key, long, long) triples.
 *
 * @param bytes the serialized form
 * @return the decoded result
 */
@Override
public JournalStoreQueryResult parse(byte[] bytes) {
    final ByteBuffer in = ByteBuffer.wrap(bytes);

    final int cmd = in.get();
    final int code = in.get();
    final long index = in.getLong();

    final int entryCount = in.getShort();
    final List<JournalEntry> entries = new ArrayList<>(entryCount);
    final byte[] headerScratch = new byte[journalEntryParser.headerLength()];
    for (int n = 0; n < entryCount; n++) {
        // Peek at the header to learn the entry length, then rewind so the
        // full entry (header included) is read in one piece.
        in.mark();
        in.get(headerScratch);
        in.reset();
        final int entryLength = journalEntryParser.parseHeader(headerScratch).getLength();

        final byte[] rawEntry = new byte[entryLength];
        in.get(rawEntry);
        entries.add(journalEntryParser.parse(rawEntry));
    }

    final int boundaryCount = in.getShort();
    final Map<Integer, JournalStoreQueryResult.Boundary> boundaries = new HashMap<>(boundaryCount);
    for (int n = 0; n < boundaryCount; n++) {
        final int key = in.getShort();
        final long first = in.getLong();
        final long second = in.getLong();
        boundaries.put(key, new JournalStoreQueryResult.Boundary(first, second));
    }

    return new JournalStoreQueryResult(entries, boundaries, cmd, index, code);
}
 
源代码9 项目: mochadoom   文件: DoomSaveGame.java
/**
 * Reads the save-game header fields from {@code buf}.
 *
 * <p>If the stored version string does not match {@code "version " + VERSION} (ignoring case),
 * {@code wrongversion} is set and nothing further is read. Otherwise the skill, episode, map,
 * per-player flags and level time are read, the last byte of the buffer is checked for the
 * {@code 0x1d} end marker, and the buffer is left positioned (and marked) right after the
 * level time so the game engine can unpack the remainder.
 *
 * @param buf buffer positioned at the start of the save-game header
 * @throws IOException declared by the interface; see {@code DoomBuffer.getNullTerminatedString}
 */
@Override
public void unpack(ByteBuffer buf) throws IOException {
    name = DoomBuffer.getNullTerminatedString(buf, SAVESTRINGSIZE);
    vcheck = DoomBuffer.getNullTerminatedString(buf, VERSIONSIZE);

    // No more unpacking on a version mismatch; the flag reports it.
    final String expectedVersion = "version " + VERSION;
    wrongversion = !expectedVersion.equalsIgnoreCase(vcheck);
    if (wrongversion) {
        return;
    }

    gameskill = buf.get();
    gameepisode = buf.get();
    gamemap = buf.get();

    for (int i = 0; i < MAXPLAYERS; i++) {
        playeringame[i] = buf.get() != 0;
    }

    // load a base level (this doesn't advance the pointer?)
    //G_InitNew (gameskill, gameepisode, gamemap);

    // get the times
    int a = C2JUtils.toUnsignedByte(buf.get());
    int b = C2JUtils.toUnsignedByte(buf.get());
    int c = C2JUtils.toUnsignedByte(buf.get());
    // Quite anomalous, leveltime is stored as a BIG ENDIAN, 24-bit unsigned integer :-S
    leveltime = (a << 16) + (b << 8) + c;

    // Mark this position, then inspect the final byte with an absolute read. Moving the
    // position to limit-1 and calling reset() afterwards would discard the mark (and make
    // reset() throw InvalidMarkException) whenever the header ends exactly at the limit.
    buf.mark();
    properend = buf.get(buf.limit() - 1) == 0x1d;

    // We've loaded whatever constitutes "header" info, the rest must be unpacked by proper
    // methods in the game engine itself.
}
 
源代码10 项目: apkfile   文件: XmlResourceMapChunk.java
/**
 * Reads the resource values stored after this chunk's header.
 *
 * <p>The number of values is {@code (original chunk size - header size) / RESOURCE_SIZE}; each is
 * read with {@code getInt()} starting at {@code this.offset + header size}. The buffer's position
 * is restored before returning, including when a read fails.
 *
 * @param buffer buffer containing the chunk
 * @return the values in the order they appear in the chunk
 */
private List<Integer> enumerateResources(ByteBuffer buffer) {
    int resourceCount = (getOriginalChunkSize() - getHeaderSize()) / RESOURCE_SIZE;
    List<Integer> result = new ArrayList<>(resourceCount);
    int offset = this.offset + getHeaderSize();

    // Save the position explicitly instead of using mark()/reset(): position(offset) discards
    // the mark whenever offset is below the marked position, which would make reset() throw
    // InvalidMarkException.
    final int savedPosition = buffer.position();
    buffer.position(offset);
    try {
        for (int i = 0; i < resourceCount; ++i) {
            result.add(buffer.getInt());
        }
    } finally {
        buffer.position(savedPosition);
    }
    return result;
}
 
/** A {@code null} value must survive a serialize/deserialize round trip unchanged. */
@Test
public void testNull() {
    final BigDecimal expected = null;

    // Mark the start so the buffer can be rewound to the serialized bytes before reading back.
    final ByteBuffer scratch = ByteBuffer.allocate(256);
    scratch.mark();
    bigDecimalSerializer.serialize(expected, scratch);
    scratch.reset();

    final BigDecimal actual = bigDecimalSerializer.deserialize(scratch);
    assertEquals(expected, actual);
}
 
源代码12 项目: Slyther   文件: Charsetfunctions.java
/**
 * Decodes the remaining bytes of {@code bytes} as UTF-8 without consuming them.
 *
 * <p>The buffer's position is restored to its value on entry whether decoding succeeds or
 * fails; its mark is overwritten. Malformed and unmappable input are handled according to
 * {@code codingErrorAction}.
 *
 * @param bytes the bytes to decode
 * @return the decoded string
 * @throws InvalidDataException with {@code CloseFrame.NO_UTF8} if the decoder reports a
 *         {@link CharacterCodingException}
 */
public static String stringUtf8( ByteBuffer bytes ) throws InvalidDataException {
	CharsetDecoder decode = Charset.forName( "UTF8" ).newDecoder();
	decode.onMalformedInput( codingErrorAction );
	decode.onUnmappableCharacter( codingErrorAction );
	bytes.mark();
	try {
		return decode.decode( bytes ).toString();
	} catch ( CharacterCodingException e ) {
		throw new InvalidDataException( CloseFrame.NO_UTF8, e );
	} finally {
		// Rewind on the failure path too, so the caller never sees a half-consumed buffer.
		bytes.reset();
	}
}
 
源代码13 项目: openjdk-jdk9   文件: SameBuffer.java
/**
 * Runs the same cipher operation twice on the text in {@code textBB} and checks that both runs
 * produce identical output: once into a freshly allocated direct buffer, and once into a
 * duplicate of {@code textBB} itself, so that input and output share the same content.
 *
 * @param mode       passed to {@code createCipher} and included in the failure message
 *                   (presumably a {@code Cipher} operation mode — confirm against createCipher)
 * @param buffer     data supplied to {@code updateAAD} for both ciphers; flipped after each use
 * @param textBB     buffer holding the input text starting at {@code txtOffset}
 * @param txtOffset  position in {@code textBB} where the text starts
 * @param dataLength input length used to size the output via {@code getOutputSize}
 * @param params     passed to {@code createCipher}
 * @throws RuntimeException if the two outputs differ
 * @throws Exception        anything thrown by cipher creation or processing
 */
private void runGCMWithSeparateBuffers(int mode, ByteBuffer buffer,
        ByteBuffer textBB, int txtOffset, int dataLength,
        AlgorithmParameters params) throws Exception {
    // take offset into account
    textBB.position(txtOffset);
    // remember the start of the text so the second run can read the same input
    textBB.mark();

    // first, generate the cipher text at an allocated buffer
    Cipher cipher = createCipher(mode, params);
    cipher.updateAAD(buffer);
    // flip so the AAD buffer can be supplied again to the second cipher
    buffer.flip();
    ByteBuffer outBB = ByteBuffer.allocateDirect(
            cipher.getOutputSize(dataLength));

    cipher.doFinal(textBB, outBB);// get cipher text in outBB
    // make the produced output readable for the comparison below
    outBB.flip();

    // restore positions
    textBB.reset();

    // next, generate cipher text again in a buffer that shares content
    Cipher anotherCipher = createCipher(mode, params);
    anotherCipher.updateAAD(buffer);
    buffer.flip();
    ByteBuffer buf2 = textBB.duplicate(); // buf2 shares textBB's content
    buf2.limit(txtOffset + anotherCipher.getOutputSize(dataLength));
    int dataProcessed2 = anotherCipher.doFinal(textBB, buf2);
    // narrow buf2 to exactly the bytes written by the second run
    buf2.position(txtOffset);
    buf2.limit(txtOffset + dataProcessed2);

    if (!buf2.equals(outBB)) {
        throw new RuntimeException(
                "Two results are not equal, mode:" + mode);
    }
}
 
源代码14 项目: alipay-sdk-java-all   文件: Charsetfunctions.java
/**
 * Decodes the remaining bytes of {@code bytes} as UTF-8 without consuming them.
 *
 * <p>The buffer's position is restored to its value on entry whether decoding succeeds or
 * fails; its mark is overwritten. Malformed and unmappable input are handled according to
 * {@code codingErrorAction}.
 *
 * @param bytes the bytes to decode
 * @return the decoded string
 * @throws InvalidDataException with {@code CloseFrame.NO_UTF8} if the decoder reports a
 *         {@link CharacterCodingException}
 */
public static String stringUtf8(ByteBuffer bytes) throws InvalidDataException {
    CharsetDecoder decode = Charset.forName("UTF8").newDecoder();
    decode.onMalformedInput(codingErrorAction);
    decode.onUnmappableCharacter(codingErrorAction);
    bytes.mark();
    try {
        return decode.decode(bytes).toString();
    } catch (CharacterCodingException e) {
        throw new InvalidDataException(CloseFrame.NO_UTF8, e);
    } finally {
        // Rewind on the failure path too, so the caller never sees a half-consumed buffer.
        bytes.reset();
    }
}
 
源代码15 项目: android-arscblamer   文件: XmlStartElementChunk.java
/**
 * Reads this element's attributes from the buffer.
 *
 * <p>Attributes start at {@code this.offset + header size + attributeStart}; one is created with
 * {@code XmlAttribute.create} for every {@code XmlAttribute.SIZE} bytes up to
 * {@code attributeCount} entries. The buffer's position is restored before returning, including
 * when a read fails.
 *
 * @param buffer buffer containing the chunk
 * @return the attributes in the order they appear in the chunk
 */
private List<XmlAttribute> enumerateAttributes(ByteBuffer buffer) {
  List<XmlAttribute> result = new ArrayList<>(attributeCount);
  int offset = this.offset + getHeaderSize() + attributeStart;
  int endOffset = offset + XmlAttribute.SIZE * attributeCount;

  // Save the position explicitly instead of using mark()/reset(): position(offset) discards
  // the mark whenever offset is below the marked position, which would make reset() throw
  // InvalidMarkException.
  final int savedPosition = buffer.position();
  buffer.position(offset);
  try {
    for (; offset < endOffset; offset += XmlAttribute.SIZE) {
      result.add(XmlAttribute.create(buffer, this));
    }
  } finally {
    buffer.position(savedPosition);
  }
  return result;
}
 
源代码16 项目: rocketmq-read   文件: CommitLog.java
/**
 * Appends an encoded message batch to {@code byteBuffer}.
 *
 * <p>The encoded batch is walked message by message. For each message the queue offset and the
 * physical offset are written into the encoded bytes (at message start + 20 and + 28), and a
 * message id is appended to a comma-separated list. If everything fits, the first
 * {@code totalMsgLen} bytes of the encoded batch are copied into {@code byteBuffer} and the
 * topic-queue table is advanced.
 *
 * @param fileFromOffset  added to the buffer's current position to obtain the physical offset
 * @param byteBuffer      destination buffer; marked on entry
 * @param maxBlank        space available; the batch plus {@code END_FILE_MIN_BLANK_LENGTH} must
 *                        not exceed it
 * @param messageExtBatch the batch, providing topic, queue id, encoded bytes and store host
 * @return {@code PUT_OK} with the message count on success; {@code MESSAGE_SIZE_EXCEEDED} if a
 *         message length is above {@code maxMessageSize}; {@code END_OF_FILE} if the batch does
 *         not fit, in which case an 8-byte (maxBlank, BLANK_MAGIC_CODE) record is written at the
 *         marked position instead
 */
@Override
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBatch messageExtBatch) {
    // Remember the write start so the END_OF_FILE path can rewind to it.
    byteBuffer.mark();
    //physical offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // Record ConsumeQueue information
    // Key is "<topic>-<queueId>"; a queue seen for the first time starts at offset 0.
    keyBuilder.setLength(0);
    keyBuilder.append(messageExtBatch.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(messageExtBatch.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    long beginQueueOffset = queueOffset;
    int totalMsgLen = 0;
    int msgNum = 0;
    msgIdBuilder.setLength(0);
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
    this.resetByteBuffer(hostHolder, 8);
    ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
    // Remember the read start of the encoded batch for the END_OF_FILE path.
    messagesByteBuff.mark();
    while (messagesByteBuff.hasRemaining()) {
        // 1 TOTALSIZE
        final int msgPos = messagesByteBuff.position();
        final int msgLen = messagesByteBuff.getInt();
        final int bodyLen = msgLen - 40; //only for log, just estimate it
        // Exceeds the maximum message
        // NOTE(review): this path returns without resetting either buffer.
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }
        totalMsgLen += msgLen;
        // Determines whether there is sufficient free space
        if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.resetByteBuffer(this.msgStoreItemMemory, 8);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            //ignore previous read
            messagesByteBuff.reset();
            // Here the length of the specially set maxBlank
            byteBuffer.reset(); //ignore the previous appended messages
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
                beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }
        //move to add queue offset and commitlog offset
        // Two consecutive longs: queue offset at msgPos + 20, physical offset at msgPos + 28.
        messagesByteBuff.position(msgPos + 20);
        messagesByteBuff.putLong(queueOffset);
        messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);

        storeHostBytes.rewind();
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
        if (msgIdBuilder.length() > 0) {
            msgIdBuilder.append(',').append(msgId);
        } else {
            msgIdBuilder.append(msgId);
        }
        queueOffset++;
        msgNum++;
        // Skip to the start of the next message.
        messagesByteBuff.position(msgPos + msgLen);
    }

    // Copy the first totalMsgLen bytes of the (now patched) batch into the destination.
    messagesByteBuff.position(0);
    messagesByteBuff.limit(totalMsgLen);
    byteBuffer.put(messagesByteBuff);
    messageExtBatch.setEncodedBuff(null);
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
        messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    result.setMsgNum(msgNum);
    // Publish the advanced queue offset only after the batch has been written.
    CommitLog.this.topicQueueTable.put(key, queueOffset);

    return result;
}
 
源代码17 项目: sis   文件: FirstKeywordPeek.java
/**
 * Returns {@link ProbeResult#SUPPORTED} if the given storage appears to begin with an expected keyword.
 * Returning {@code SUPPORTED} from this method does not guarantee that reading or writing will succeed,
 * only that there appears to be a reasonable chance of success based on a brief inspection of the storage
 * header.
 *
 * <p>The storage is read either through a {@link ByteBuffer} or, if none is available, through a
 * {@link Reader}. Whichever is used is marked before the inspection and reset afterwards, unless
 * an {@link IOException} interrupts the inspection.</p>
 *
 * @param  connector information about the storage (URL, stream, JDBC connection, <i>etc</i>).
 * @return {@link ProbeResult#SUPPORTED} if the given storage seems to be readable.
 * @throws DataStoreException if an I/O or SQL error occurred.
 */
@SuppressWarnings("null")
public final ProbeResult probeContent(final StorageConnector connector) throws DataStoreException {
    char[] keyword = null;
    int pos = 0;
    try {
        // Exactly one of 'buffer' and 'reader' is non-null past this block.
        final ByteBuffer buffer = connector.getStorageAs(ByteBuffer.class);
        final Reader reader;
        if (buffer != null) {
            buffer.mark();
            reader = null;
        } else {
            // User gave us explicitly a Reader (e.g. a StringReader wrapping a String instance).
            reader = connector.getStorageAs(Reader.class);
            if (reader == null) {
                return ProbeResult.UNSUPPORTED_STORAGE;
            }
            reader.mark(READ_AHEAD_LIMIT);
        }
        /*
         * Ignore leading spaces and comments if any, then get a keyword no longer than 'maxLength'.
         * That keyword shall be followed by [ or (, ignoring whitespaces.
         */
        int c;
        while ((c = nextAfterSpaces(buffer, reader)) == COMMENT) {
            toEndOfLine(buffer, reader);
        }
        int s;
        if ((s = isKeywordChar(c)) >= ACCEPT) {
            keyword = new char[maxLength];
            do {
                // Only characters classified exactly as ACCEPT are stored; others at or above
                // ACCEPT are consumed without being stored.
                if (s == ACCEPT) {
                    if (pos >= keyword.length) {
                        pos = 0;                // Keyword too long.
                        break;
                    }
                    keyword[pos++] = (char) c;
                }
                // Next character: read from the reader, or one byte cast to char from the
                // buffer; -1 once the buffer has no remaining bytes.
                c = (buffer == null) ? IOUtilities.readCodePoint(reader) : buffer.hasRemaining() ? (char) buffer.get() : -1;
            } while ((s = isKeywordChar(c)) >= ACCEPT);
            /*
             * At this point we finished to read and store the keyword.
             * Verify if the keyword is followed by a character that indicate a keyword end.
             */
            if (Character.isWhitespace(c)) {
                c = nextAfterSpaces(buffer, reader);
            }
            if (!isPostKeyword(c)) {
                pos = 0;
            }
        }
        // Rewind to the position marked at the beginning of the inspection.
        if (buffer != null) {
            buffer.reset();
        } else {
            reader.reset();
        }
        // A negative character means the input ran out before a decision could be made.
        if (c < 0) {
            return ProbeResult.INSUFFICIENT_BYTES;
        }
    } catch (IOException e) {
        throw new DataStoreException(e);
    }
    return forKeyword(keyword, pos);
}
 
源代码18 项目: hadoopcryptoledger   文件: BitcoinBlockReader.java
/**
* Parses Bitcoin transactions from a byte buffer, advancing the buffer past everything it reads.
*
* For each transaction the following is read in order: version, input counter, inputs,
* output counter, outputs, witness data (segwit transactions only) and lock time. An input
* counter of 0 followed by a non-zero byte is treated as the segwit marker and flag; the real
* input counter is then read after the flag.
*
* @param rawByteBuffer ByteBuffer from which the transactions have to be parsed
* @param noOfTransactions Number of expected transactions
*
* @return List of parsed transactions, in the order they appear in the buffer
*
*/
public List<BitcoinTransaction> parseTransactions(ByteBuffer rawByteBuffer,long noOfTransactions) {
	final List<BitcoinTransaction> transactions = new ArrayList<>((int)noOfTransactions);
	for (int txIndex=0;txIndex<noOfTransactions;txIndex++) {
		final int version=rawByteBuffer.getInt();

		// input counter; may turn out to be the segwit marker
		byte[] inCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
		long inputCount=BitcoinUtil.getVarInt(inCounterVarInt);

		boolean segwit=false;
		byte marker=1;
		byte flag=0;
		if (inputCount==0) {
			// possibly segwit: peek at the next byte, which would be the segwit flag
			rawByteBuffer.mark();
			final byte candidateFlag = rawByteBuffer.get();
			if (candidateFlag==0) {
				LOG.warn("It seems a block with 0 transaction inputs was found");
				rawByteBuffer.reset();
			} else {
				// segwit confirmed: the real input counter follows the flag
				segwit=true;
				marker=0;
				flag=candidateFlag;
				inCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
				inputCount=BitcoinUtil.getVarInt(inCounterVarInt);
			}
		}

		final List<BitcoinTransactionInput> inputs = parseTransactionInputs(rawByteBuffer, inputCount);

		final byte[] outCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
		final long outputCount=BitcoinUtil.getVarInt(outCounterVarInt);
		final List<BitcoinTransactionOutput> outputs = parseTransactionOutputs(rawByteBuffer,outputCount);

		// stays empty for non-segwit transactions
		final List<BitcoinScriptWitnessItem> witnessItems=new ArrayList<>();
		if (segwit) {
			// one witness item per transaction input
			for (int inputIndex=0;inputIndex<inputCount;inputIndex++) {
				final byte[] witnessCounterVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
				final int witnessCount=(int)BitcoinUtil.getVarInt(witnessCounterVarInt);
				final List<BitcoinScriptWitness> witnesses = new ArrayList<>(witnessCount);
				for (int witnessIndex=0;witnessIndex<witnessCount;witnessIndex++) {
					// script length (var int) followed by the script bytes
					final byte[] scriptLengthVarInt=BitcoinUtil.convertVarIntByteBufferToByteArray(rawByteBuffer);
					final int scriptSize=(int)BitcoinUtil.getVarInt(scriptLengthVarInt);
					final byte[] script=new byte[scriptSize];
					rawByteBuffer.get(script);
					witnesses.add(new BitcoinScriptWitness(scriptLengthVarInt,script));
				}
				witnessItems.add(new BitcoinScriptWitnessItem(witnessCounterVarInt,witnesses));
			}
		}

		final int lockTime = rawByteBuffer.getInt();
		transactions.add(new BitcoinTransaction(marker,flag,version,inCounterVarInt,inputs,outCounterVarInt,outputs,witnessItems,lockTime));
	}
	return transactions;
}
 
源代码19 项目: DDMQ   文件: CommitLog.java
/**
 * Appends an encoded message batch to {@code byteBuffer}.
 *
 * <p>Multi-topic batches are delegated to {@code doAppendMultiTopic}. Otherwise the encoded batch
 * is walked message by message. For each message the queue offset and the physical offset are
 * written into the encoded bytes (at message start + 20 and + 28), and a message id is appended
 * to a comma-separated list. If everything fits, the first {@code totalMsgLen} bytes of the
 * encoded batch are copied into {@code byteBuffer} and the topic-queue table is advanced.
 *
 * @param fileFromOffset  added to the buffer's current position to obtain the physical offset
 * @param byteBuffer      destination buffer; marked on entry for single-topic batches
 * @param maxBlank        space available; the batch plus {@code END_FILE_MIN_BLANK_LENGTH} must
 *                        not exceed it
 * @param messageExtBatch the batch, providing topic, queue id, encoded bytes and store host
 * @return {@code PUT_OK} with the message count on success; {@code MESSAGE_SIZE_EXCEEDED} if a
 *         message length is above {@code maxMessageSize}; {@code END_OF_FILE} if the batch does
 *         not fit, in which case an 8-byte (maxBlank, BLANK_MAGIC_CODE) record is written at the
 *         marked position instead
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBatch messageExtBatch) {
    if (messageExtBatch.isMultiTopic()) {
        return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch);
    }
    // Remember the write start so the END_OF_FILE path can rewind to it.
    byteBuffer.mark();
    //physical offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // Record ConsumeQueue information
    // Key is "<topic>-<queueId>"; a queue seen for the first time starts at offset 0.
    keyBuilder.setLength(0);
    keyBuilder.append(messageExtBatch.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(messageExtBatch.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    long beginQueueOffset = queueOffset;
    int totalMsgLen = 0;
    int msgNum = 0;
    msgIdBuilder.setLength(0);
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
    this.resetByteBuffer(hostHolder, 8);
    ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(hostHolder);
    // Remember the read start of the encoded batch for the END_OF_FILE path.
    messagesByteBuff.mark();
    while (messagesByteBuff.hasRemaining()) {
        // 1 TOTALSIZE
        final int msgPos = messagesByteBuff.position();
        final int msgLen = messagesByteBuff.getInt();
        final int bodyLen = msgLen - 40; //only for log, just estimate it
        // Exceeds the maximum message
        // NOTE(review): this path returns without resetting either buffer.
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLen
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }
        totalMsgLen += msgLen;
        // Determines whether there is sufficient free space
        if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.resetByteBuffer(this.msgStoreItemMemory, 8);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            //ignore previous read
            messagesByteBuff.reset();
            // Here the length of the specially set maxBlank
            byteBuffer.reset(); //ignore the previous appended messages
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(),
                beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }
        //move to add queue offset and commitlog offset
        // Two consecutive longs: queue offset at msgPos + 20, physical offset at msgPos + 28.
        messagesByteBuff.position(msgPos + 20);
        messagesByteBuff.putLong(queueOffset);
        messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen);

        storeHostBytes.rewind();
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen);
        if (msgIdBuilder.length() > 0) {
            msgIdBuilder.append(',').append(msgId);
        } else {
            msgIdBuilder.append(msgId);
        }
        queueOffset++;
        msgNum++;
        // Skip to the start of the next message.
        messagesByteBuff.position(msgPos + msgLen);
    }

    // Copy the first totalMsgLen bytes of the (now patched) batch into the destination.
    messagesByteBuff.position(0);
    messagesByteBuff.limit(totalMsgLen);
    byteBuffer.put(messagesByteBuff);
    messageExtBatch.setEncodedBuff(null);
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(),
        messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    result.setMsgNum(msgNum);
    // Publish the advanced queue offset only after the batch has been written.
    CommitLog.this.topicQueueTable.put(key, queueOffset);

    return result;
}
 
源代码20 项目: OpenRS   文件: FileStore.java
/**
 * Writes a file, making up to two attempts.
 *
 * The first attempt calls the four-argument {@code write} with {@code true}. If that reports
 * failure, the buffer is rewound to where it started and a second attempt is made with
 * {@code false}; the result of the second attempt is not checked.
 *
 * @param type
 *            The type of the file.
 * @param id
 *            The id of the file.
 * @param data
 *            A {@link ByteBuffer} containing the contents of the file. Its
 *            mark is overwritten.
 * @throws IOException
 *             if an I/O error occurs.
 */
public void write(int type, int id, ByteBuffer data) throws IOException {
	// Remember the start of the data so a second attempt can re-read it.
	data.mark();
	final boolean firstAttemptSucceeded = write(type, id, data, true);
	if (firstAttemptSucceeded) {
		return;
	}
	data.reset();
	write(type, id, data, false);
}