下面列出了 io.netty.buffer.UnpooledByteBufAllocator#DEFAULT 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Reverses escaping on the readable region of {@code source}.
 * <p>
 * The region between the reader and writer index is scanned for the byte {@code 0x7d}.
 * When none is present the input is returned untouched. Otherwise the region is cut into
 * segments, each running from the current position through the byte that follows a
 * {@code 0x7d}, and each segment is handed to the {@code slice(source, index, length)}
 * helper (presumably the helper collapses the two-byte escape pair - confirm against its
 * definition). Whatever follows the last marker is appended as a plain slice.
 *
 * @param source buffer to un-escape
 * @return {@code source} itself when it holds no {@code 0x7d}, otherwise a heap-based
 *         composite buffer over the collected segments
 */
@Override
public ByteBuf unEscape(ByteBuf source) {
    int start = source.readerIndex();
    final int end = source.writerIndex();
    int escapeAt = source.indexOf(start, end, (byte) 0x7d);
    if (escapeAt == -1) {
        return source;
    }

    final List<ByteBuf> segments = new ArrayList<>(3);
    while (true) {
        // Take everything up to and including the byte right after the marker.
        final int span = escapeAt + 2 - start;
        segments.add(slice(source, start, span));
        start += span;
        escapeAt = source.indexOf(start, end, (byte) 0x7d);
        if (escapeAt <= 0) {
            break;
        }
    }
    // Tail after the last marker.
    segments.add(source.slice(start, end - start));
    return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, segments.size(), segments);
}
/**
 * Demo entry point: encodes a sample {@code FullMessage} with the example binary serializer,
 * prints the encoded length and the original message, decodes it again and prints the
 * decoded object together with the result of comparing it to the original.
 *
 * @param args ignored
 * @throws Exception if encoding or decoding fails
 */
public static void main(String[] args) throws Exception {
    BinarySerializer codec = ExampleConstants.BINARY_SERIALIZER;
    ByteBufAllocator byteBufAllocator = UnpooledByteBufAllocator.DEFAULT;
    // Initialize NetUtils up front so its own output does not get mixed into the results below.
    System.out.println(NetUtils.getOuterIp());
    ExampleMessages.FullMessage fullMessage = newFullMessage();
    ByteBuf encodeResult = codec.writeObject(byteBufAllocator, fullMessage);
    try {
        System.out.println("--------------------encode length-------------------");
        System.out.println(encodeResult.readableBytes());
        System.out.println("-----------------------origin---------------------");
        System.out.println(fullMessage);
        Object decodeResult = codec.readObject(encodeResult);
        System.out.println("-----------------------decode--------------------");
        System.out.println(decodeResult);
        System.out.println("equals = " + fullMessage.equals(decodeResult));
    } finally {
        // Release the encoded buffer even when decoding or printing throws.
        encodeResult.release();
    }
}
/**
 * Demo entry point: builds a sample {@code p_helloworld} protobuf message, encodes it with the
 * example binary serializer, decodes it again and prints the decoded object together with the
 * result of comparing it to the original.
 *
 * @param args ignored
 * @throws Exception if encoding or decoding fails
 */
public static void main(String[] args) throws Exception {
    BinarySerializer codec = ExampleConstants.BINARY_SERIALIZER;
    ByteBufAllocator byteBufAllocator = UnpooledByteBufAllocator.DEFAULT;
    final p_test.p_helloworld hello = p_test.p_helloworld.newBuilder()
            .setA(1)
            .setB(5506665554142L)
            .addAllC(Arrays.asList(0, 1, 2, 3, 4, 5))
            .setE("hello")
            .setF(true)
            .setG(1.1f)
            .setH(2.0d)
            .setK(p_test.ERole.AGE)
            .build();
    ByteBuf encodeResult = codec.writeObject(byteBufAllocator, hello);
    try {
        Object decodeResult = codec.readObject(encodeResult);
        System.out.println(decodeResult);
        System.out.println("equals = " + hello.equals(decodeResult));
    } finally {
        // Release the encoded buffer even when decoding or printing throws.
        encodeResult.release();
    }
}
/**
 * Trial-level fixture: fills {@code content} with 256 zero bytes, wraps a read-only,
 * unreleasable view of it into {@code arraySize} bulk-string messages, and prepares the
 * Redis array message, the encoder and the handler context used by the benchmark.
 */
@Setup(Level.Trial)
public void setup() {
    final byte[] zeroes = new byte[256];
    content = Unpooled.buffer(zeroes.length);
    content.writeBytes(zeroes);
    // Every array element shares the same payload view; it is wrapped so releases are no-ops.
    final ByteBuf sharedPayload = Unpooled.unreleasableBuffer(content.asReadOnly());

    final List<RedisMessage> elements = new ArrayList<>(arraySize);
    for (int remaining = arraySize; remaining > 0; remaining--) {
        elements.add(new FullBulkStringRedisMessage(sharedPayload));
    }
    redisArray = new ArrayRedisMessage(elements);

    encoder = new RedisEncoder();
    context = new EmbeddedChannelWriteReleaseHandlerContext(
            pooledAllocator ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT,
            encoder) {
        @Override
        protected void handleException(Throwable cause) {
            handleUnexpectedException(cause);
        }
    };
}
/**
 * Builds the wire buffer for a single request.
 * <p>
 * An 8192-byte buffer is taken from the pooled or unpooled default allocator (selected by
 * {@code SoaSystemEnvProperties.SOA_POOLED_BYTEBUF}) and filled by {@code SoaMessageBuilder}
 * with the header, the serialized body and the sequence id.
 *
 * @param service           service name placed in the header
 * @param version           service version placed in the header
 * @param method            method name placed in the header
 * @param seqid             sequence id of the request
 * @param request           request payload
 * @param requestSerializer serializer used to write {@code request}
 * @param <REQ>             request payload type
 * @return the buffer produced by the message builder
 * @throws SoaException if building the message fails; the allocated buffer is released first
 */
private static <REQ> ByteBuf buildRequestBuf(String service, String version, String method, int seqid, REQ request, BeanSerializer<REQ> requestSerializer) throws SoaException {
    AbstractByteBufAllocator allocator =
            SoaSystemEnvProperties.SOA_POOLED_BYTEBUF ?
                    PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
    final ByteBuf requestBuf = allocator.buffer(8192);
    SoaMessageBuilder<REQ> builder = new SoaMessageBuilder<>();
    try {
        SoaHeader header = SoaHeaderHelper.buildHeader(service, version, method);
        return builder.buffer(requestBuf)
                .header(header)
                .body(request, requestSerializer)
                .seqid(seqid)
                .build();
    } catch (TException e) {
        // The buffer never reaches the caller on failure, so give it back here instead of leaking it.
        requestBuf.release();
        // Surface the failure rather than returning null; keep the original exception as-is or as cause.
        if (e instanceof SoaException) {
            throw (SoaException) e;
        }
        throw new SoaException(e);
    }
}
/**
 * Creates and configures the client {@code Bootstrap}: NIO socket channel on
 * {@code workerGroup}, keep-alive enabled, pooled or unpooled default allocator (selected by
 * {@code SoaSystemEnvProperties.SOA_POOLED_BYTEBUF}), and a pipeline made of the idle-state
 * handler, the frame decoder, the idle handler and the client handler.
 *
 * @return the configured bootstrap, which is also stored in the {@code bootstrap} field
 */
protected Bootstrap initBootstrap() {
    AbstractByteBufAllocator byteBufAllocator = SoaSystemEnvProperties.SOA_POOLED_BYTEBUF
            ? PooledByteBufAllocator.DEFAULT
            : UnpooledByteBufAllocator.DEFAULT;

    bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.ALLOCATOR, byteBufAllocator)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds),
                            // Reassembles frames from coalesced or split TCP packets.
                            new SoaFrameDecoder(),
                            new SoaIdleHandler(),
                            new SoaClientHandler(callBack));
                }
            });
    return bootstrap;
}
/**
 * With a {@code NettyDataBufferFactory}, a {@code PooledHttpData} converts to a
 * {@code NettyDataBuffer} and back to a {@code PooledHttpData} over the same native buffer,
 * with the reference count staying at one on both sides.
 */
@Test
public void usingNettyDataBufferFactory_PooledHttpData() {
    final NettyDataBufferFactory nettyFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT);
    final DataBufferFactoryWrapper<?> wrapper = new DataBufferFactoryWrapper<>(nettyFactory);

    final PooledHttpData original = PooledHttpData.wrap(Unpooled.wrappedBuffer("abc".getBytes()));

    // HttpData -> DataBuffer
    final DataBuffer buffer = wrapper.toDataBuffer(original);
    assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
    final NettyDataBuffer nettyBuffer = (NettyDataBuffer) buffer;
    assertThat(nettyBuffer.getNativeBuffer().refCnt()).isOne();

    // DataBuffer -> HttpData
    final HttpData roundTripped = wrapper.toHttpData(buffer);
    assertThat(roundTripped).isInstanceOf(PooledHttpData.class);
    final PooledHttpData pooled = (PooledHttpData) roundTripped;
    assertThat(pooled.content()).isEqualTo(nettyBuffer.getNativeBuffer());
    assertThat(pooled.refCnt()).isOne();
}
/**
 * Trial-level fixture: fills {@code content} with 256 zero bytes and builds, on top of a
 * read-only unreleasable view of it, the full request, the content-length request, the
 * chunked request and the last-content object, then prepares the encoder and the handler
 * context used by the benchmark.
 */
@Setup(Level.Trial)
public void setup() {
    final byte[] zeroes = new byte[256];
    content = Unpooled.buffer(zeroes.length);
    content.writeBytes(zeroes);
    // Shared payload view; wrapped so that releases performed while encoding are no-ops.
    final ByteBuf body = Unpooled.unreleasableBuffer(content.asReadOnly());

    final HttpHeaders chunkedHeaders = new DefaultHttpHeaders(false);
    chunkedHeaders.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
    final HttpHeaders lengthHeaders = new DefaultHttpHeaders(false);
    lengthHeaders.add(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());

    final String uri = "/index";
    fullRequest = new DefaultFullHttpRequest(
            HttpVersion.HTTP_1_1, HttpMethod.POST, uri, body, lengthHeaders, EmptyHttpHeaders.INSTANCE);
    contentLengthRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri, lengthHeaders);
    chunkedRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri, chunkedHeaders);
    lastContent = new DefaultLastHttpContent(body, false);

    encoder = new HttpRequestEncoder();
    context = new EmbeddedChannelWriteReleaseHandlerContext(
            pooledAllocator ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT,
            encoder) {
        @Override
        protected void handleException(Throwable cause) {
            handleUnexpectedException(cause);
        }
    };
}
/**
 * Trial-level fixture: creates both frame writers, a zero-filled payload of
 * {@code payloadSize} bytes (pooled or unpooled depending on {@code pooled}) and the
 * handler context used by the benchmark.
 */
@Setup(Level.Trial)
public void setup() {
    writer = new DefaultHttp2FrameWriter();
    oldWriter = new OldDefaultHttp2FrameWriter();

    if (pooled) {
        payload = PooledByteBufAllocator.DEFAULT.buffer(payloadSize);
    } else {
        payload = Unpooled.buffer(payloadSize);
    }
    payload.writeZero(payloadSize);

    ctx = new EmbeddedChannelWriteReleaseHandlerContext(
            pooled ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT,
            new ChannelInboundHandlerAdapter()) {
        @Override
        protected void handleException(Throwable cause) {
            handleUnexpectedException(cause);
        }
    };
}
/**
 * Creates an unpooled buffer allocator.
 * <p>
 * The {@code BUFFER_PREFER_DIRECT_KEY} setting of the URL is compared case-insensitively:
 * {@code "true"} or {@code "false"} yields a new allocator constructed with that flag; any
 * other value (including a missing one) yields {@link UnpooledByteBufAllocator#DEFAULT}.
 *
 * @param url URL carrying the allocator settings
 * @return the unpooled allocator matching the setting
 */
protected static ByteBufAllocator createUnPooled(final URL url) {
    final String setting = url.getString(BUFFER_PREFER_DIRECT_KEY);
    if ("true".equalsIgnoreCase(setting)) {
        return new UnpooledByteBufAllocator(true);
    }
    if ("false".equalsIgnoreCase(setting)) {
        return new UnpooledByteBufAllocator(false);
    }
    return UnpooledByteBufAllocator.DEFAULT;
}
/**
 * A {@code ByteBufHolder} message is rejected with an {@code IllegalArgumentException},
 * is not forwarded down the pipeline, and its buffer is released.
 */
@Test
public void doesNotProcessByteBufHolder() {
    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    ByteBuf buf = mock(ByteBuf.class);
    CopyByteBufHandler handler = new CopyByteBufHandler(UnpooledByteBufAllocator.DEFAULT);

    IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> {
        // DatagramPacket serves as a convenient ByteBufHolder implementation.
        handler.channelRead(ctx, new DatagramPacket(buf, mock(InetSocketAddress.class)));
    });

    assertThat(thrown.getMessage(), startsWith("Unexpected ReferenceCounted msg"));
    verify(ctx, never()).fireChannelRead(any());
    verify(buf).release();
}
/**
 * A pooled buffer read by the handler is released, and a distinct buffer from the handler's
 * own (unpooled) allocator carrying the same content is forwarded instead.
 */
@Test
public void copiesAndReleasesPooledByteBuf() {
    ByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;
    ByteBufAllocator unpooled = UnpooledByteBufAllocator.DEFAULT;
    CopyByteBufHandler handler = new CopyByteBufHandler(unpooled);

    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    ArgumentCaptor<ByteBuf> forwarded = ArgumentCaptor.forClass(ByteBuf.class);
    doReturn(ctx).when(ctx).fireChannelRead(forwarded.capture());

    ByteBuf input = pooled.buffer(4);
    assertThat(input.alloc(), is(pooled));
    try {
        assertThat(writeAscii(input, "test"), is(4));

        handler.channelRead(ctx, input);

        assertThat(input.refCnt(), is(0));
        ByteBuf copy = forwarded.getValue();
        assertThat(copy, is(not(sameInstance(input))));
        assertThat(copy.alloc(), is(unpooled));
        assertThat(copy.toString(US_ASCII), equalTo("test"));
    } finally {
        // Only clean up when the handler did not already release the input.
        if (input.refCnt() > 0) {
            input.release();
        }
    }
}
/**
 * A message that is not a buffer (here a {@code String}) is passed on to
 * {@code fireChannelRead} with an equal value.
 */
@Test
public void forwardsOtherTypes() {
    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    ArgumentCaptor<String> forwarded = ArgumentCaptor.forClass(String.class);
    doReturn(ctx).when(ctx).fireChannelRead(forwarded.capture());
    CopyByteBufHandler handler = new CopyByteBufHandler(UnpooledByteBufAllocator.DEFAULT);

    handler.channelRead(ctx, "test");

    assertThat(forwarded.getValue(), equalTo("test"));
}
/**
 * Converts this {@link DynamicCompositeByteBuf} to a Netty {@link ByteBuf}.
 * The reference counts of the underlying buffers are not increased.
 * <p>
 * Returns the shared empty buffer when nothing is readable, the single held buffer when
 * there is exactly one (it is popped from {@code buffers}), and otherwise a heap-based
 * composite over all held buffers.
 *
 * @return the Netty ByteBuf
 */
public ByteBuf nettyByteBuf() {
    if (readableBytes == 0) {
        return Unpooled.EMPTY_BUFFER;
    }
    final int componentCount = buffers.size();
    if (componentCount != 1) {
        final ByteBuf[] components = buffers.toArray(new ByteBuf[0]);
        return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, componentCount, components);
    }
    // NOTE(review): pop() removes the buffer from this container - confirm that is intended.
    return buffers.pop();
}
/**
 * Builds the wire buffer for a single request.
 * <p>
 * An 8192-byte buffer is taken from the pooled or unpooled default allocator (selected by
 * {@code SoaSystemEnvProperties.SOA_POOLED_BYTEBUF}) and filled by {@code SoaMessageBuilder}
 * with the header, the serialized body and the sequence id. On failure the error is logged,
 * the buffer is released, and the failure is rethrown as a {@code SoaException}.
 *
 * @param service           service name placed in the header
 * @param version           service version placed in the header
 * @param method            method name placed in the header
 * @param seqid             sequence id of the request
 * @param request           request payload
 * @param requestSerializer serializer used to write {@code request}
 * @param <REQ>             request payload type
 * @return the buffer produced by the message builder
 * @throws SoaException if building the message fails
 */
@Override
protected <REQ> ByteBuf buildRequestBuf(String service, String version, String method, int seqid, REQ request, BeanSerializer<REQ> requestSerializer) throws SoaException {
    AbstractByteBufAllocator byteBufAllocator = SoaSystemEnvProperties.SOA_POOLED_BYTEBUF
            ? PooledByteBufAllocator.DEFAULT
            : UnpooledByteBufAllocator.DEFAULT;
    final ByteBuf requestBuf = byteBufAllocator.buffer(8192);
    SoaMessageBuilder<REQ> messageBuilder = new SoaMessageBuilder<>();
    try {
        SoaHeader soaHeader = SoaHeaderHelper.buildHeader(service, version, method);
        return messageBuilder.buffer(requestBuf)
                .header(soaHeader)
                .body(request, requestSerializer)
                .seqid(seqid)
                .build();
    } catch (TException e) {
        LOGGER.error(e.getMessage(), e);
        // The buffer never reaches the caller on failure, so release it here.
        requestBuf.release();
        throw (e instanceof SoaException) ? (SoaException) e : new SoaException(e);
    }
}
/**
 * The created request's entity reports the content length of the serialized source buffer,
 * and its content stream has nothing available once the source buffer has been fully read.
 */
@Test
public void createsEntityUsingGivenSource() throws IOException {

    // given
    HCRequestFactory requestFactory = createDefaultTestObject();

    String url = UUID.randomUUID().toString();
    Request request = createDefaultMockRequest(url, "POST");

    byte[] payload = UUID.randomUUID().toString().getBytes();
    ByteBuf buffer = new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, 2);
    buffer.writeBytes(payload);

    ItemSource<ByteBuf> serialized = mock(ItemSource.class);
    when(serialized.getSource()).thenReturn(buffer);
    when(request.serialize()).thenReturn(serialized);

    // when
    HttpEntityEnclosingRequest created = (HttpEntityEnclosingRequest) requestFactory.create(url, request);

    // then
    assertEquals(payload.length, created.getEntity().getContentLength());

    // Drain the source buffer before inspecting the entity's stream.
    ByteBuf source = (ByteBuf) request.serialize().getSource();
    byte[] drained = new byte[source.writerIndex()];
    source.readBytes(drained);

    InputStream entityStream = created.getEntity().getContent();
    assertEquals(0, entityStream.available());
}
/**
 * Assembles the buffer to write: a prefix formatted from {@code ASTERISK_BYTE} and the
 * number of payload elements followed by CRLF, then one component per payload element as
 * produced by {@code ParserManager.parse}.
 *
 * @return a heap-based composite buffer whose readable region covers all components
 */
@Override
protected ByteBuf getWriteByteBuf() {
    final int elementCount = payload.length;
    final CompositeByteBuf composite =
            new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, payload.length + 1);

    final String header = String.format("%c%d\r\n", ASTERISK_BYTE, elementCount);
    composite.addComponent(Unpooled.wrappedBuffer(header.getBytes()));

    for (Object element : payload) {
        composite.addComponent(ParserManager.parse(element));
    }

    // addComponent(buf) does not move the writer index, so expose everything explicitly.
    composite.setIndex(0, composite.capacity());
    return composite;
}
/**
 * Starts the Netty server on a daemon thread named {@code NettyContainer-Thread}.
 * <p>
 * The thread builds a {@code ServerBootstrap} with the pooled or unpooled default allocator
 * (selected by {@code SoaSystemEnvProperties.SOA_POOLED_BYTEBUF}), installs the handler
 * pipeline on every accepted channel, binds to {@code port} and then blocks until the server
 * channel is closed. Both event loop groups are shut down when the thread leaves that block.
 */
@Override
public void start() {
    LOGGER.warn("Plugin::" + getClass().getSimpleName() + "::start");
    LOGGER.info("Bind Local Port {} [Netty]", port);
    LOGGER.info("ByteBufAllocator:{}", SoaSystemEnvProperties.SOA_POOLED_BYTEBUF ? "pooled" : "unpooled");

    Thread serverThread = new Thread("NettyContainer-Thread") {
        @Override
        public void run() {
            try {
                bootstrap = new ServerBootstrap();

                AbstractByteBufAllocator byteBufAllocator = SoaSystemEnvProperties.SOA_POOLED_BYTEBUF
                        ? PooledByteBufAllocator.DEFAULT
                        : UnpooledByteBufAllocator.DEFAULT;

                // Link state control
                ChannelHandler linkStateHandler = new SoaLinkStateHandler();
                // Codecs
                ChannelHandler msgDecoder = new SoaMsgDecoder(container);
                ChannelHandler msgEncoder = new SoaMsgEncoder(container);
                // Business logic handler
                ChannelHandler serverHandler = new SoaServerHandler(container);
                ChannelHandler invokeCounter = MONITOR_ENABLE ? new SoaInvokeCounter() : null;
                // Rate-limiting handler
                SoaFreqHandler freqLimiter = FREQ_LIMIT_ENABLE ? new SoaFreqHandler() : null;

                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                // Idle timeout
                                .addLast(HandlerConstants.IDLE_STATE_HANDLER, new IdleStateHandler(20, 0, 0))
                                // Frame reassembly for coalesced or split packets
                                .addLast(HandlerConstants.SOA_FRAME_DECODER_HANDLER, new SoaFrameDecoder())
                                // Link monitoring
                                .addLast(HandlerConstants.SOA_IDLE_HANDLER, linkStateHandler)
                                .addLast(HandlerConstants.SOA_MSG_ENCODER_HANDLER, msgEncoder)
                                .addLast(HandlerConstants.SOA_MSG_DECODER_HANDLER, msgDecoder);

                        if (FREQ_LIMIT_ENABLE) {
                            // Service rate limiting
                            ch.pipeline().addLast(HandlerConstants.SOA_FREQ_HANDLER, freqLimiter);
                        }

                        if (MONITOR_ENABLE) {
                            // Service invocation statistics
                            ch.pipeline().addLast(HandlerConstants.SOA_INVOKE_COUNTER_HANDLER, invokeCounter);
                        }

                        ch.pipeline().addLast(HandlerConstants.SOA_SERVER_HANDLER, serverHandler);
                    }
                };

                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(childInitializer)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.ALLOCATOR, byteBufAllocator)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childOption(ChannelOption.ALLOCATOR, byteBufAllocator);

                // Start the server, then wait until the server channel is closed.
                bootstrap.bind(port).sync()
                        .channel().closeFuture().sync();
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    };
    serverThread.setDaemon(true);
    serverThread.start();
}
/**
 * Creates a fresh {@code ArmeriaMessageFramer} before each test, using the unpooled
 * default allocator.
 */
@BeforeEach
void setUp() {
// NOTE(review): 1024 is presumably the framer's maximum message size - confirm against ArmeriaMessageFramer.
framer = new ArmeriaMessageFramer(UnpooledByteBufAllocator.DEFAULT, 1024);
}
/**
* Creates a new CacheClient with the safest defaults.
* Use {@link #newBuilder() } in order to have full control.
* <p>
* Delegates to the five-argument constructor, passing {@code true} as the fourth argument
* and {@link UnpooledByteBufAllocator#DEFAULT} as the buffer allocator.
* NOTE(review): the meaning of the {@code true} flag is not visible here - confirm against
* the delegated constructor.
*
* @param clientId identifier of this client, forwarded unchanged
* @param sharedSecret shared secret, forwarded unchanged
* @param brokerLocator locator forwarded unchanged (presumably used to find the server - confirm)
*/
public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator) {
this(clientId, sharedSecret, brokerLocator, true, UnpooledByteBufAllocator.DEFAULT);
}