下面列出了io.netty.channel.ChannelOutboundBuffer#size ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
writeFilter(false);
// Return here so we don't set the WRITE flag.
return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
// then use our write quantum. In this case we no longer want to set the write filter because the socket is
// still writable (as far as we know). We will find out next time we attempt to write if the socket is
// writable and set the write filter if necessary.
writeFilter(false);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
// when it can accept more data.
writeFilter(true);
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
// Return here so we not set the EPOLLOUT flag.
return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
// our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the EPOLLOUT if necessary.
clearFlag(Native.EPOLLOUT);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
setFlag(Native.EPOLLOUT);
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
for (;;) {
final int msgCount = in.size();
if (msgCount == 0) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
break;
}
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in, writeSpinCount)) {
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in, writeSpinCount)) {
break;
}
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (!writeSelector.isOpen()) {
return;
}
final int size = in.size();
final int selectedKeys = writeSelector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
if (writableKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
int written = 0;
for (;;) {
if (written == size) {
// all written
return;
}
writableKeysIt.next();
writableKeysIt.remove();
SctpMessage packet = (SctpMessage) in.current();
if (packet == null) {
return;
}
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
ByteBuffer nioData;
if (data.nioBufferCount() != -1) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
mi.unordered(packet.isUnordered());
ch.send(nioData, mi);
written ++;
in.remove();
if (!writableKeysIt.hasNext()) {
return;
}
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
break;
}
try {
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
int cnt = array.count();
if (cnt >= 1) {
// Try to use gathering writes via sendmmsg(...) syscall.
int offset = 0;
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = Native.sendmmsg(socket.intValue(), packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
return;
}
for (int i = 0; i < send; i++) {
in.remove();
}
cnt -= send;
offset += send;
}
continue;
}
}
boolean done = false;
for (int i = config().getWriteSpinCount(); i > 0; --i) {
if (doWriteMessage(msg)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
break;
}
} catch (IOException e) {
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (state < 2) {
throw new NotYetConnectedException();
}
if (state > 2) {
throw new ClosedChannelException();
}
final LocalChannel peer = this.peer;
final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop();
if (peerLoop == eventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
peer.inboundBuffer.add(msg);
ReferenceCountUtil.retain(msg);
in.remove();
}
finishPeerRead(peer, peerPipeline);
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}
peerLoop.execute(new Runnable() {
@Override
public void run() {
Collections.addAll(peer.inboundBuffer, msgsCopy);
finishPeerRead(peer, peerPipeline);
}
});
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (!writeSelector.isOpen()) {
return;
}
final int size = in.size();
final int selectedKeys = writeSelector.select(SO_TIMEOUT);
if (selectedKeys > 0) {
final Set<SelectionKey> writableKeys = writeSelector.selectedKeys();
if (writableKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> writableKeysIt = writableKeys.iterator();
int written = 0;
for (;;) {
if (written == size) {
// all written
return;
}
writableKeysIt.next();
writableKeysIt.remove();
SctpMessage packet = (SctpMessage) in.current();
if (packet == null) {
return;
}
ByteBuf data = packet.content();
int dataLen = data.readableBytes();
ByteBuffer nioData;
if (data.nioBufferCount() != -1) {
nioData = data.nioBuffer();
} else {
nioData = ByteBuffer.allocate(dataLen);
data.getBytes(data.readerIndex(), nioData);
nioData.flip();
}
final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
mi.payloadProtocolID(packet.protocolIdentifier());
mi.streamNumber(packet.streamIdentifier());
ch.send(nioData, mi);
written ++;
in.remove();
if (!writableKeysIt.hasNext()) {
return;
}
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
break;
}
try {
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
int cnt = array.count();
if (cnt >= 1) {
// Try to use gathering writes via sendmmsg(...) syscall.
int offset = 0;
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
return;
}
for (int i = 0; i < send; i++) {
in.remove();
}
cnt -= send;
offset += send;
}
continue;
}
}
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
break;
}
} catch (IOException e) {
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
}