下面列出了io.netty.buffer.Unpooled#directBuffer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Setup
public void setup() {
// Use buffer sizes that will also allow to write UTF-8 without grow the buffer
buffer = Unpooled.directBuffer(512);
wrapped = Unpooled.unreleasableBuffer(Unpooled.directBuffer(512));
asciiSequence = new StringBuilder(128);
for (int i = 0; i < 128; i++) {
asciiSequence.append('a');
}
ascii = asciiSequence.toString();
// Generate some mixed UTF-8 String for benchmark
utf8Sequence = new StringBuilder(128);
char[] chars = "Some UTF-8 like äÄ∏ŒŒ".toCharArray();
for (int i = 0; i < 128; i++) {
utf8Sequence.append(chars[i % chars.length]);
}
utf8 = utf8Sequence.toString();
asciiSequence = utf8Sequence;
asciiBuffer = Unpooled.copiedBuffer(ascii, CharsetUtil.US_ASCII);
utf8Buffer = Unpooled.copiedBuffer(utf8, CharsetUtil.UTF_8);
}
@Test
public void testGetBytesToWritableBufferThatIsDirect() {
byte[] data = new byte[] { 0, 1, 2, 3, 4 };
ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
byteBuffer.writeBytes(data);
ProtonReadableBufferImpl buffer = new ProtonReadableBufferImpl(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
ProtonWritableBufferImpl target = new ProtonWritableBufferImpl(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], target.getBuffer().readByte());
}
}
@Test
public void testGetBytesToWritableBufferThatIsDirect() {
byte[] data = new byte[] {0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
byteBuffer.writeBytes(data);
NettyReadable buffer = new NettyReadable(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
NettyWritable target = new NettyWritable(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], target.getByteBuf().readByte());
}
}
@Test
public void testEncodeUnCompressed() {
String sample = randomString(MIN_BLOCK_SIZE);
EmbeddedChannel channel = new EmbeddedChannel(encoder);
ByteBuf byteBuf = Unpooled.directBuffer(MIN_BLOCK_SIZE);
byte[] sampleBytes = sample.getBytes();
byteBuf.ensureWritable(sampleBytes.length);
byteBuf.writeBytes(sampleBytes);
Assert.assertTrue(channel.writeOutbound(byteBuf));
ByteBuf target = channel.readOutbound();
Assert.assertNotNull(target);
target.readerIndex(HEADER_LENGTH);
Assert.assertEquals(sample, target.toString(Charset.defaultCharset()));
}
@Setup
public void setup() {
// Use buffer sizes that will also allow to write UTF-8 without grow the buffer
buffer = Unpooled.directBuffer(512);
wrapped = Unpooled.unreleasableBuffer(Unpooled.directBuffer(512));
asciiSequence = new StringBuilder(128);
for (int i = 0; i < 128; i++) {
asciiSequence.append('a');
}
ascii = asciiSequence.toString();
// Generate some mixed UTF-8 String for benchmark
utf8Sequence = new StringBuilder(128);
char[] chars = "Some UTF-8 like äÄ∏ŒŒ".toCharArray();
for (int i = 0; i < 128; i++) {
utf8Sequence.append(chars[i % chars.length]);
}
utf8 = utf8Sequence.toString();
asciiSequence = utf8Sequence;
}
@Test
public void testGetBytesToWritableBufferThatIsDirect() {
byte[] data = new byte[] { 0, 1, 2, 3, 4};
ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
byteBuffer.writeBytes(data);
AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer);
buffer.get(target);
assertFalse(buffer.hasRemaining());
for (int i = 0; i < data.length; i++) {
assertEquals(data[i], target.getBuffer().readByte());
}
}
@Setup
public void setup() {
byteBuffer = ByteBuffer.allocate(8);
directByteBuffer = ByteBuffer.allocateDirect(8);
buffer = Unpooled.buffer(8);
directBuffer = Unpooled.directBuffer(8);
directBufferPooled = PooledByteBufAllocator.DEFAULT.directBuffer(8);
}
@Setup
public void setup() {
swappedByteBuf = new SwappedByteBuf(Unpooled.directBuffer(8));
unsafeSwappedByteBuf = Unpooled.directBuffer(8).order(ByteOrder.LITTLE_ENDIAN);
if (unsafeSwappedByteBuf.getClass().equals(SwappedByteBuf.class)) {
throw new IllegalStateException("Should not use " + SwappedByteBuf.class.getSimpleName());
}
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(BYTES_LARGE);
ByteBuf direct = Unpooled.directBuffer(BYTES_LARGE.length);
direct.writeBytes(BYTES_LARGE);
return new ByteBuf[] {heap, direct};
}
@DataPoints("smallData")
public static ByteBuf[] smallData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesSmall);
ByteBuf direct = Unpooled.directBuffer(compressedBytesSmall.length);
direct.writeBytes(compressedBytesSmall);
return new ByteBuf[] {heap, direct};
}
@DataPoints("largeData")
public static ByteBuf[] largeData() {
ByteBuf heap = Unpooled.wrappedBuffer(compressedBytesLarge);
ByteBuf direct = Unpooled.directBuffer(compressedBytesLarge.length);
direct.writeBytes(compressedBytesLarge);
return new ByteBuf[] {heap, direct};
}
@Test
public void testArrayAccessWhenNoArray() {
ByteBuf byteBuffer = Unpooled.directBuffer();
ProtonReadableBufferImpl buffer = new ProtonReadableBufferImpl(byteBuffer);
assertFalse(buffer.hasArray());
}
/**
* Read data and return as a byte array.
* @param offset File offset to read from
* @param len Number of bytes to read
* @return A CompletableFuture that will be carry the byte[] result when the read is completed
*/
default CompletableFuture<byte[]> readFully(long offset, int len) {
final ByteBuf buf = Unpooled.directBuffer(len);
CompletableFuture<Void> innerFuture = readFully(offset, buf, 0, len);
return innerFuture.thenApply((v) -> {
byte[] bytes = new byte[len];
buf.getBytes(0, bytes, 0, len);
return bytes;
}).whenComplete((a,b) -> buf.release());
}
private ByteBuf getInputByteBuf(String sample) {
ByteBuf byteBuf = Unpooled.directBuffer(DEFAULT_BLOCK_SIZE);
byte[] sampleBytes = sample.getBytes();
byteBuf.ensureWritable(sampleBytes.length);
byteBuf.writeBytes(sampleBytes);
return byteBuf;
}
@Test
public void testArrayAccessWhenNoArray() {
ByteBuf byteBuffer = Unpooled.directBuffer();
NettyReadable buffer = new NettyReadable(byteBuffer);
assertFalse(buffer.hasArray());
}
@Test
public void createUnpooledDirectBufferTest() {
ByteBuf buf = Unpooled.directBuffer(11);
testBuffer(buf, true);
}
private void testSeekableStream(SeekableInputStream inputStream) throws IOException {
int streamPos = 0;
assertEquals(streamPos, inputStream.getPos());
// Read some bytes from the start
final byte[] buf = new byte[1000];
inputStream.readFully(buf, 0, 88);
compareData(buf, 0, streamPos, 88);
streamPos += 88;
assertEquals(streamPos, inputStream.getPos());
final byte[] shortBuf = new byte[17];
inputStream.readFully(shortBuf);
compareData(shortBuf, 0, streamPos, 17);
streamPos += 17;
assertEquals(streamPos, inputStream.getPos());
// test ByteBuffer interfaces
final ByteBuffer shortByteBuf = ByteBuffer.allocate(25);
inputStream.read(shortByteBuf);
compareData(shortByteBuf.array(), 0, streamPos, 25);
streamPos += 25;
assertEquals(streamPos, inputStream.getPos());
final ByteBuffer shortByteBuf2 = ByteBuffer.allocateDirect(71);
inputStream.read(shortByteBuf2);
final ByteBuf compareBuf = Unpooled.directBuffer(100);
shortByteBuf2.flip();
compareBuf.writeBytes(shortByteBuf2);
compareData(compareBuf, streamPos, 71);
streamPos += 71;
assertEquals(streamPos, inputStream.getPos());
final ByteBuffer shortByteBuf3 = ByteBuffer.allocate(66);
inputStream.readFully(shortByteBuf3);
compareData(shortByteBuf3.array(), 0, streamPos, 66);
streamPos += 66;
assertEquals(streamPos, inputStream.getPos());
// Test plain old read interface
buf[0] = (byte) inputStream.read();
buf[1] = (byte) inputStream.read();
buf[2] = (byte) inputStream.read();
compareData(buf, 0, streamPos, 3);
streamPos += 3;
assertEquals(streamPos, inputStream.getPos());
// Skip some, then read
streamPos += 50; // skip 50 bytes
inputStream.seek(streamPos);
inputStream.readFully(buf, 0, 37);
compareData(buf, 0, streamPos, 37);
streamPos += 37;
assertEquals(streamPos, inputStream.getPos());
// skip to near the end, then read
streamPos = TEST_DATA_SIZE - 100;
inputStream.seek(streamPos);
inputStream.readFully(buf, 0, 100);
compareData(buf, 0, streamPos,100);
streamPos += 100;
assertEquals(streamPos, inputStream.getPos());
}
@Test
public void createUnpooledDirectBufferTest() {
ByteBuf buf = Unpooled.directBuffer(11);
testBuffer(buf, true);
}
@Test
public void createUnpooledDirectBufferTest() {
ByteBuf buf = Unpooled.directBuffer(11);
testBuffer(buf, true);
}
private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
((ScheduledExecutorService) listenerExecutor).scheduleAtFixedRate(() -> {
removeExpireIncompleteChunkedMessages();
}, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS);
expireChunkMessageTaskScheduled = true;
}
if (msgMetadata.getChunkId() == 0) {
ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
msgMetadata.getTotalChunkMsgSize());
int totalChunks = msgMetadata.getNumChunksFromMsg();
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
(key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
pendingChunckedMessageCount++;
if (maxPendingChuckedMessage > 0 && pendingChunckedMessageCount > maxPendingChuckedMessage) {
removeOldestPendingChunkedMessage();
}
pendingChunckedMessageUuidQueue.add(msgMetadata.getUuid());
}
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)
|| msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) {
// means we lost the first chunk: should never happen
log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
(chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
msgMetadata.getTotalChunkMsgSize());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
}
chunkedMsgCtx.recycle();
}
chunkedMessagesMap.remove(msgMetadata.getUuid());
compressedPayload.release();
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() > (msgMetadata.getPublishTime()
+ expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
}
return null;
}
chunkedMsgCtx.chunkedMessageIds[msgMetadata.getChunkId()] = msgId;
// append the chunked payload and update lastChunkedMessage-id
chunkedMsgCtx.chunkedMsgBuffer.writeBytes(compressedPayload);
chunkedMsgCtx.lastChunkedMessageId = msgMetadata.getChunkId();
// if final chunk is not received yet then release payload and return
if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
compressedPayload.release();
increaseAvailablePermits(cnx);
return null;
}
// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
if (log.isDebugEnabled()) {
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId, msgMetadata.getSequenceId());
}
// remove buffer from the map, add chucked messageId to unack-message tracker, and reduce pending-chunked-message count
chunkedMessagesMap.remove(msgMetadata.getUuid());
unAckedChunckedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunckedMessageCount--;
compressedPayload.release();
compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
chunkedMsgCtx.recycle();
ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
compressedPayload.release();
return uncompressedPayload;
}