下面列出了 io.netty.util.internal.RecyclableArrayList 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Drains every pending {@link ByteBuf} from the outbound buffer into a pooled list and passes
 * that list, together with {@code remote}, to {@code serverChannel.doWrite}.
 *
 * <p>Each buffer is retained before it is removed from {@code buffer}. If draining fails part
 * way through, every buffer collected so far is released and the list is recycled before the
 * failure propagates. On the success path the list is not recycled here.
 *
 * @param buffer the outbound buffer to drain
 * @throws Exception if draining the outbound buffer fails
 */
@Override
protected void doWrite(ChannelOutboundBuffer buffer) throws Exception {
    final RecyclableArrayList pending = RecyclableArrayList.newInstance();
    boolean drained = false;
    try {
        for (ByteBuf next = (ByteBuf) buffer.current();
             next != null;
             next = (ByteBuf) buffer.current()) {
            // Retain first: the buffer is about to be removed from the outbound buffer.
            pending.add(next.retain());
            buffer.remove();
        }
        drained = true;
    } finally {
        if (!drained) {
            // Draining blew up: give back everything collected so far.
            for (Object collected : pending) {
                ReferenceCountUtil.safeRelease(collected);
            }
            pending.recycle();
        }
    }
    serverChannel.doWrite(pending, remote);
}
/**
 * Write messages to the outbound of this {@link Channel}.
 *
 * <p>Messages are written in array order; writing stops at the first {@code null} element.
 * The outbound is then flushed, and each write future either has its outcome recorded right
 * away (when already done) or gets a listener attached that records it later.
 *
 * @param msgs the messages to be written
 * @return bufferReadable returns {@code true} if the write operation did add something to the
 *         outbound buffer
 */
public boolean writeOutbound(Object... msgs) {
    ensureOpen();
    if (msgs.length == 0) {
        return isNotEmpty(outboundMessages);
    }

    RecyclableArrayList writeFutures = RecyclableArrayList.newInstance(msgs.length);
    try {
        int next = 0;
        while (next < msgs.length) {
            Object message = msgs[next++];
            if (message == null) {
                // A null element terminates the batch.
                break;
            }
            writeFutures.add(write(message));
        }

        flushOutbound0();

        for (int i = 0, total = writeFutures.size(); i < total; i++) {
            ChannelFuture writeFuture = (ChannelFuture) writeFutures.get(i);
            if (!writeFuture.isDone()) {
                // The write may be delayed to run later by runPendingTasks()
                writeFuture.addListener(recordExceptionListener);
            } else {
                recordException(writeFuture);
            }
        }

        checkException();
        return isNotEmpty(outboundMessages);
    } finally {
        writeFutures.recycle();
    }
}
/**
 * Gathers the NIO buffers of the components that cover {@code length} bytes starting at
 * {@code index}, walking component by component from the one that contains {@code index}.
 *
 * @param index  absolute start index of the requested range
 * @param length number of bytes requested; {@code 0} yields the shared empty array
 * @return the collected NIO buffers, in component order
 * @throws UnsupportedOperationException if a visited component reports an NIO buffer count of 0
 */
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
    checkIndex(index, length);
    if (length == 0) {
        return EmptyArrays.EMPTY_BYTE_BUFFERS;
    }

    RecyclableArrayList collected = RecyclableArrayList.newInstance(buffers.length);
    try {
        Component start = findComponent(index);
        int componentIndex = start.index;
        int componentOffset = start.offset;
        ByteBuf current = start.buf;

        int position = index;
        int remaining = length;
        while (true) {
            // Position of the requested range relative to the current component.
            int relative = position - componentOffset;
            int chunk = Math.min(remaining, current.readableBytes() - relative);

            int nioCount = current.nioBufferCount();
            if (nioCount == 0) {
                throw new UnsupportedOperationException();
            } else if (nioCount == 1) {
                collected.add(current.nioBuffer(relative, chunk));
            } else {
                Collections.addAll(collected, current.nioBuffers(relative, chunk));
            }

            position += chunk;
            remaining -= chunk;
            componentOffset += current.readableBytes();
            if (remaining <= 0) {
                break;
            }
            current = buffer(++componentIndex);
        }

        return collected.toArray(new ByteBuffer[collected.size()]);
    } finally {
        collected.recycle();
    }
}
/**
 * Schedules the buffers in {@code list} to be written as {@link DatagramPacket}s addressed to
 * {@code remote} on one of the {@code ioChannels}, then flushed.
 *
 * <p>The channel is picked by masking {@code remote.hashCode()} with
 * {@code ioChannels.size() - 1}.
 * NOTE(review): this mask only spreads evenly when the number of channels is a power of two;
 * confirm that against wherever {@code ioChannels} is populated.
 *
 * <p>The write runs as a task on the chosen channel's event loop, and the task recycles
 * {@code list} when it finishes. If the event loop rejects the task, it never runs; in that
 * case the buffers are released and the list is recycled here before the rejection is
 * rethrown, so nothing is leaked.
 *
 * @param list   the buffers to send; every element is cast to {@link ByteBuf}
 * @param remote the destination address
 */
protected void doWrite(RecyclableArrayList list, InetSocketAddress remote) {
    Channel ioChannel = ioChannels.get(remote.hashCode() & (ioChannels.size() - 1));
    try {
        ioChannel.eventLoop().execute(() -> {
            try {
                for (Object buf : list) {
                    ioChannel.write(new DatagramPacket((ByteBuf) buf, remote));
                }
                ioChannel.flush();
            } finally {
                list.recycle();
            }
        });
    } catch (java.util.concurrent.RejectedExecutionException rejected) {
        // The task was not accepted, so its cleanup will never happen: do it here instead.
        try {
            for (Object buf : list) {
                ((ByteBuf) buf).release();
            }
        } finally {
            list.recycle();
        }
        throw rejected;
    }
}
/**
 * Accumulates incoming {@link ByteBuf} data, runs the decoder over the accumulated bytes and
 * forwards every decoded message down the pipeline. Anything that is not a {@link ByteBuf} is
 * forwarded untouched.
 *
 * <p>The cleanup section runs whether or not decoding succeeded: a fully consumed cumulation
 * is released and cleared, the messages decoded so far are fired one by one, and the pooled
 * output list is recycled.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception a {@link DecoderException}, either rethrown as-is or wrapping any other
 *                   failure raised while decoding
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    RecyclableArrayList decoded = RecyclableArrayList.newInstance();
    try {
        ByteBuf incoming = (ByteBuf) msg;
        first = cumulation == null;
        // First chunk becomes the cumulation itself; later chunks are merged into it.
        cumulation = first ? incoming : cumulator.cumulate(ctx.alloc(), cumulation, incoming);
        callDecode(ctx, cumulation, decoded);
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    } finally {
        if (cumulation != null && !cumulation.isReadable()) {
            cumulation.release();
            cumulation = null;
        }

        final int count = decoded.size();
        int i = 0;
        while (i < count) {
            ctx.fireChannelRead(decoded.get(i));
            i++;
        }
        decoded.recycle();
    }
}
/**
 * Benchmark body: obtains a {@link RecyclableArrayList} via {@code newInstance(size)} and
 * recycles it straight away within the same call.
 */
@Benchmark
public void recycleSameThread() {
    RecyclableArrayList.newInstance(size).recycle();
}
/**
 * Creates fresh list instances for the benchmark; annotated to run at
 * {@code Level.Invocation}. The two sized lists are both created with the value 16.
 */
@Setup(Level.Invocation)
public void setup() {
    final int initialCapacity = 16;

    codecOutputList = CodecOutputList.newInstance();
    recycleableArrayList = RecyclableArrayList.newInstance(initialCapacity);
    arrayList = new ArrayList<Object>(initialCapacity);
}
/**
 * Obtains a {@link RecyclableArrayList} via {@code newInstance(minCapacity)} and returns it
 * viewed as a {@code List<T>}.
 *
 * <p>The cast is unchecked: nothing here verifies that the elements later stored in the list
 * are of type {@code T}.
 * NOTE(review): the {@link List} interface has no {@code recycle()} method, so callers would
 * need to cast back to recycle the instance; confirm how call sites handle that.
 *
 * @param minCapacity the value passed to {@code RecyclableArrayList.newInstance}
 * @param <T>         the element type the caller wants to see
 * @return the list instance, typed as {@code List<T>}
 */
@SuppressWarnings("unchecked")
public static <T> List<T> newRecyclableList(int minCapacity) {
    return (List<T>) RecyclableArrayList.newInstance(minCapacity);
}
/**
 * Accumulates incoming {@link ByteBuf} data and decodes it, then submits the decoded messages
 * to the pipeline according to how many were produced: none sets {@code decodeWasNull},
 * exactly one is fired as-is, and several are copied into a single {@link ArrayList} that is
 * fired as one message. Anything that is not a {@link ByteBuf} is forwarded untouched.
 *
 * <p>The cleanup section runs whether or not decoding succeeded. A fully consumed cumulation
 * is released and the read counter reset; otherwise, once the read counter reaches
 * {@code discardAfterReads}, it is reset and {@code discardSomeReadBytes()} is called.
 *
 * @param ctx channel handler context
 * @param msg data
 * @throws Exception a {@link DecoderException}, either rethrown as-is or wrapping any other
 *                   failure raised while decoding
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    RecyclableArrayList decoded = RecyclableArrayList.newInstance();
    try {
        ByteBuf incoming = (ByteBuf) msg;
        first = cumulation == null;
        // First chunk becomes the cumulation itself; later chunks are merged into it.
        cumulation = first ? incoming : cumulator.cumulate(ctx.alloc(), cumulation, incoming);
        callDecode(ctx, cumulation, decoded);
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    } finally {
        if (cumulation != null && !cumulation.isReadable()) {
            numReads = 0;
            cumulation.release();
            cumulation = null;
        } else if (++numReads >= discardAfterReads) {
            // We did enough reads already try to discard some bytes so we not risk to see a OOME.
            // See https://github.com/netty/netty/issues/4275
            numReads = 0;
            discardSomeReadBytes();
        }

        switch (decoded.size()) {
            case 0:
                decodeWasNull = true;
                break;
            case 1:
                ctx.fireChannelRead(decoded.get(0));
                break;
            default:
                // More than one message: hand them over together as a standalone copy,
                // because the pooled list is recycled right below.
                ctx.fireChannelRead(new ArrayList<Object>(decoded));
                break;
        }
        decoded.recycle();
    }
}
/**
 * Benchmark body: requests a {@link RecyclableArrayList} with {@code size} and immediately
 * recycles it, all inside this one method call.
 */
@Benchmark
public void recycleSameThread() {
    final RecyclableArrayList pooled = RecyclableArrayList.newInstance(size);
    pooled.recycle();
}
/**
 * Encodes an accepted outbound message into one or more messages and writes them to the
 * context; messages that are not accepted are written through unchanged.
 *
 * <p>The input message is released once {@code encode} returns or throws. An encoder that
 * produces nothing causes an {@link EncoderException}. When several messages are produced,
 * only the last one is written with the caller's {@code promise}; the earlier ones use the
 * void promise if the caller passed it, or a fresh promise each otherwise. The cleanup
 * section writes whatever is in the output list even when encoding failed part way.
 *
 * @param ctx     the handler context
 * @param msg     the outbound message
 * @param promise the promise associated with this write
 * @throws Exception an {@link EncoderException}, either rethrown as-is or wrapping any other
 *                   failure
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    RecyclableArrayList encoded = null;
    try {
        if (!acceptOutboundMessage(msg)) {
            ctx.write(msg, promise);
        } else {
            encoded = RecyclableArrayList.newInstance();
            @SuppressWarnings("unchecked")
            I typed = (I) msg;
            try {
                encode(ctx, typed, encoded);
            } finally {
                ReferenceCountUtil.release(typed);
            }

            if (encoded.isEmpty()) {
                // Recycle and clear first so the cleanup section below has nothing to write.
                encoded.recycle();
                encoded = null;

                throw new EncoderException(
                        StringUtil.simpleClassName(this) + " must produce at least one message.");
            }
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new EncoderException(cause);
    } finally {
        if (encoded != null) {
            final int lastIndex = encoded.size() - 1;
            if (lastIndex == 0) {
                ctx.write(encoded.get(0), promise);
            } else if (lastIndex > 0) {
                // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                // See https://github.com/netty/netty/issues/2525
                final ChannelPromise voidPromise = ctx.voidPromise();
                final boolean callerUsesVoid = promise == voidPromise;
                for (int i = 0; i < lastIndex; i++) {
                    ChannelPromise intermediate = callerUsesVoid ? voidPromise : ctx.newPromise();
                    ctx.write(encoded.get(i), intermediate);
                }
                // Only the final message carries the caller's promise.
                ctx.write(encoded.get(lastIndex), promise);
            }
            encoded.recycle();
        }
    }
}
/**
 * Encodes an accepted outbound message into zero or more messages and writes them to the
 * context; messages that are not accepted are written through unchanged.
 *
 * <p>The input message is released once {@code encode} returns or throws. If the encoder
 * produces nothing, {@code channelPromise} is marked successful and nothing is written. When
 * several messages are produced, only the last one is written with {@code channelPromise};
 * the earlier ones use the void promise if the caller passed it, or a fresh promise each
 * otherwise. The cleanup section writes whatever is in the output list even when encoding
 * failed part way.
 *
 * @param channelHandlerContext the handler context
 * @param o                     the outbound message
 * @param channelPromise        the promise associated with this write
 * @throws EncoderException rethrown as-is, or wrapping any other failure
 */
@Override
@SuppressWarnings("unchecked")
public void write(final ChannelHandlerContext channelHandlerContext, final Object o, final ChannelPromise channelPromise) {
    RecyclableArrayList encoded = null;
    try {
        if (!this.acceptOutboundMessage(o)) {
            channelHandlerContext.write(o, channelPromise);
        } else {
            encoded = RecyclableArrayList.newInstance();
            try {
                this.encode(channelHandlerContext, (I) o, (List<Object>) encoded);
            } finally {
                ReferenceCountUtil.release(o);
            }

            if (encoded.isEmpty()) {
                // Nothing to write: recycle now and complete the caller's promise directly.
                encoded.recycle();
                encoded = null;
                channelPromise.setSuccess();
            }
        }
    } catch (EncoderException ex) {
        throw ex;
    } catch (Throwable cause) {
        throw new EncoderException(cause);
    } finally {
        if (encoded != null) {
            final int lastIndex = encoded.size() - 1;
            if (lastIndex == 0) {
                channelHandlerContext.write(encoded.get(0), channelPromise);
            } else if (lastIndex > 0) {
                final ChannelPromise voidPromise = channelHandlerContext.voidPromise();
                final boolean callerUsesVoid = channelPromise == voidPromise;
                for (int i = 0; i < lastIndex; ++i) {
                    ChannelPromise intermediate =
                            callerUsesVoid ? voidPromise : channelHandlerContext.newPromise();
                    channelHandlerContext.write(encoded.get(i), intermediate);
                }
                // Only the final message carries the caller's promise.
                channelHandlerContext.write(encoded.get(lastIndex), channelPromise);
            }
            encoded.recycle();
        }
    }
}