下面列出了怎么用io.netty.channel.ChannelOutboundBuffer的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param in the collection which contains objects to write.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
*/
private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
return 0;
}
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
return doWriteBytes(in, buf);
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
config().getMaxBytesPerGatheringWrite());
}
}
/**
* Write a {@link DefaultFileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
final long regionCount = region.count();
if (region.transferred() >= regionCount) {
in.remove();
return 0;
}
final long offset = region.transferred();
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
if (flushedAmount > 0) {
in.progress(flushedAmount);
if (region.transferred() >= regionCount) {
in.remove();
}
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
}
/**
* Write a {@link FileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link FileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
*/
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
if (byteChannel == null) {
byteChannel = new KQueueSocketWritableByteChannel();
}
final long flushedAmount = region.transferTo(byteChannel, region.transferred());
if (flushedAmount > 0) {
in.progress(flushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
}
/**
* Attempt to write multiple {@link ByteBuf} objects.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) {
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
}
}
// cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
return 0;
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param in the collection which contains objects to write.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/
private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
return 0;
}
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
return doWriteBytes(in, buf);
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
config().getMaxBytesPerGatheringWrite());
}
}
/**
* Write a {@link DefaultFileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/
private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
final long regionCount = region.count();
if (region.transferred() >= regionCount) {
in.remove();
return 0;
}
final long offset = region.transferred();
final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
if (flushedAmount > 0) {
in.progress(flushedAmount);
if (region.transferred() >= regionCount) {
in.remove();
}
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
}
/**
* Write a {@link FileRegion}
* @param in the collection which contains objects to write.
* @param region the {@link FileRegion} from which the bytes should be written
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
*/
private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
if (byteChannel == null) {
byteChannel = new EpollSocketWritableByteChannel();
}
final long flushedAmount = region.transferTo(byteChannel, region.transferred());
if (flushedAmount > 0) {
in.progress(flushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
return WRITE_STATUS_SNDBUF_FULL;
}
/**
* Attempt to write a single object.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
return writeBytes(in, (ByteBuf) msg);
} else if (msg instanceof DefaultFileRegion) {
return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
} else if (msg instanceof FileRegion) {
return writeFileRegion(in, (FileRegion) msg);
} else if (msg instanceof SpliceOutTask) {
if (!((SpliceOutTask) msg).spliceOut()) {
return WRITE_STATUS_SNDBUF_FULL;
}
in.remove();
return 1;
} else {
// Should never reach here.
throw new Error();
}
}
/**
* Attempt to write multiple {@link ByteBuf} objects.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
* no data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
}
}
// cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
return 0;
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
UkcpPacket packet = (UkcpPacket) msg;
InetSocketAddress remoteAddress = packet.remoteAddress();
ByteBuf data = packet.content();
final int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
final int writtenBytes;
writtenBytes = javaChannel().send(nioData, remoteAddress);
return writtenBytes > 0;
}
@Override
protected void doWrite(ChannelOutboundBuffer buffer) throws Exception {
final RecyclableArrayList list = RecyclableArrayList.newInstance();
boolean freeList = true;
try {
ByteBuf buf = null;
while ((buf = (ByteBuf) buffer.current()) != null) {
list.add(buf.retain());
buffer.remove();
}
freeList = false;
} finally {
if (freeList) {
for (Object obj : list) {
ReferenceCountUtil.safeRelease(obj);
}
list.recycle();
}
}
serverChannel.doWrite(list, remote);
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
*/
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
return true;
}
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf, writeSpinCount);
in.removeBytes(writtenBytes);
return writtenBytes == readableBytes;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
}
}
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else {
// Should never reach here.
throw new Error();
}
return true;
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
// expects a message
final UdtMessage message = (UdtMessage) msg;
final ByteBuf byteBuf = message.content();
final int messageSize = byteBuf.readableBytes();
if (messageSize == 0) {
return true;
}
final long writtenBytes;
if (byteBuf.nioBufferCount() == 1) {
writtenBytes = javaChannel().write(byteBuf.nioBuffer());
} else {
writtenBytes = javaChannel().write(byteBuf.nioBuffers());
}
// wrote message completely
if (writtenBytes > 0 && writtenBytes != messageSize) {
throw new Error(
"Provider error: failed to write message. Provider library should be upgraded.");
}
return writtenBytes > 0;
}
/**
* @see #hasOutputChanged(ChannelHandlerContext, boolean)
*/
private void initOutputChanged(ChannelHandlerContext ctx) {
if (observeOutput) {
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
}
}
}
/**
* Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
* with {@link #observeOutput} enabled and there has been an observed change in the
* {@link ChannelOutboundBuffer} between two consecutive calls of this method.
* 如果且仅当IdleStateHandler被构造为启用了observeOutput并且该方法的两个连续调用之间的ChannelOutboundBuffer中出现了观察到的更改时,返回true。
*
* https://github.com/netty/netty/issues/6150
*/
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
}
}
return false;
}
public Object consume() {
ChannelOutboundBuffer buf = unsafe().outboundBuffer();
if (buf != null) {
Object msg = buf.current();
if (msg != null) {
ReferenceCountUtil.retain(msg);
buf.remove();
return msg;
}
}
return null;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
ReferenceCountUtil.retain(msg);
handleOutboundMessage(msg);
in.remove();
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// nothing left to write
break;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
while (readableBytes > 0) {
doWriteBytes(buf);
int newReadableBytes = buf.readableBytes();
in.progress(readableBytes - newReadableBytes);
readableBytes = newReadableBytes;
}
in.remove();
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
long transferred = region.transferred();
doWriteFileRegion(region);
in.progress(region.transferred() - transferred);
in.remove();
} else {
in.remove(new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg)));
}
}
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
SctpMessage packet = (SctpMessage) msg;
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
ByteBufAllocator alloc = alloc();
boolean needsCopy = data.nioBufferCount() != 1;
if (!needsCopy) {
if (!data.isDirect() && alloc.isDirectBufferPooled()) {
needsCopy = true;
}
}
ByteBuffer nioData;
if (!needsCopy) {
nioData = data.nioBuffer();
} else {
data = alloc.directBuffer(dataLen).writeBytes(data);
nioData = data.nioBuffer();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
mi.unordered(packet.isUnordered());
final int writtenBytes = javaChannel().send(nioData, mi);
return writtenBytes > 0;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
writeFilter(false);
// Return here so we don't set the WRITE flag.
return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
// then use our write quantum. In this case we no longer want to set the write filter because the socket is
// still writable (as far as we know). We will find out next time we attempt to write if the socket is
// writable and set the write filter if necessary.
writeFilter(false);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
// when it can accept more data.
writeFilter(true);
}
}
/**
* Attempt to write a single object.
* @param in the collection which contains objects to write.
* @return The value that should be decremented from the write quantum which starts at
* {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
* <ul>
* <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
* is encountered</li>
* <li>1 - if a single call to write data was made to the OS</li>
* <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
* data was accepted</li>
* </ul>
* @throws Exception If an I/O error occurs.
*/
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
return writeBytes(in, (ByteBuf) msg);
} else if (msg instanceof DefaultFileRegion) {
return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
} else if (msg instanceof FileRegion) {
return writeFileRegion(in, (FileRegion) msg);
} else {
// Should never reach here.
throw new Error();
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
writeFilter(false);
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount(); i > 0; --i) {
if (doWriteMessage(msg)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
writeFilter(true);
break;
}
} catch (IOException e) {
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
}
@Override
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it.
in.remove();
return 1;
}
return super.doWriteSingle(in);
}
/**
* Returns a {@link NativeDatagramPacketArray} which is filled with the flushed messages of
* {@link ChannelOutboundBuffer}.
*/
static NativeDatagramPacketArray getInstance(ChannelOutboundBuffer buffer) throws Exception {
NativeDatagramPacketArray array = ARRAY.get();
array.count = 0;
buffer.forEachFlushedMessage(array);
return array;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
// Return here so we not set the EPOLLOUT flag.
return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
// our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the EPOLLOUT if necessary.
clearFlag(Native.EPOLLOUT);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
setFlag(Native.EPOLLOUT);
}
}
@Override
protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it.
in.remove();
return 1;
}
return super.doWriteSingle(in);
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
boolean sent = false;
for (; ; ) {
Object msg = in.current();
if (msg == null) {
flushPending = false;
break;
}
try {
boolean done = false;
if (kcpSend((ByteBuf) msg)) {
done = true;
sent = true;
}
if (done) {
in.remove();
} else {
flushPending = true;
break;
}
} catch (IOException e) {
throw e; // throw exception and close channel
}
}
if (sent) {
// update kcp
if (ukcp.isFastFlush()) {
updateKcp();
} else {
kcpTsUpdate(-1);
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
boolean sent = false;
for (; ; ) {
Object msg = in.current();
if (msg == null) {
flushPending = false;
break;
}
try {
boolean done = false;
if (kcpSend((ByteBuf) msg)) {
done = true;
sent = true;
}
if (done) {
in.remove();
} else {
flushPending = true;
break;
}
} catch (IOException e) {
throw e; // throw exception and close channel
}
}
if (sent) {
// update kcp
if (ukcp.isFastFlush()) {
parent().updateChildKcp(this);
} else {
kcpTsUpdate(-1);
}
}
}