io.netty.channel.ChannelOutboundBuffer#size() 源码实例 Demo

下面列出了 io.netty.channel.ChannelOutboundBuffer#size() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Writes the entries of {@code in} until the buffer reports no entries or the write-spin
 * budget obtained from {@code config().getWriteSpinCount()} is used up.
 *
 * <p>Each pass subtracts the value returned by {@code doWriteMultiple} or
 * {@code doWriteSingle} from the remaining budget. Afterwards the write filter is either
 * cleared (buffer empty, or budget exactly exhausted) or set (budget went negative).
 *
 * @param in the outbound buffer to drain
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int remainingSpins = config().getWriteSpinCount();
    do {
        final int pending = in.size();
        if (pending == 0) {
            // Everything has been written; clear the filter and leave without touching the
            // WRITE flag any further.
            writeFilter(false);
            return;
        }

        // Use a gathering write when the buffer holds more than one entry and the first one
        // is a ByteBuf; otherwise write a single entry.
        final boolean gathering = pending > 1 && in.current() instanceof ByteBuf;
        remainingSpins -= gathering ? doWriteMultiple(in) : doWriteSingle(in);

        // The loop is intentionally not left when the buffer was flushed completely: a user
        // might have triggered another write and flush while his or her listeners were notified.
    } while (remainingSpins > 0);

    if (remainingSpins != 0) {
        // The underlying descriptor can not accept all data currently, so set the WRITE flag
        // to be woken up when it can accept more data.
        writeFilter(true);
        return;
    }

    // It is possible that the write filter was set, we were woken up by KQUEUE because the
    // socket is writable, and then used up our write quantum. The socket is still writable as
    // far as we know, so the filter is cleared; the next write attempt will set it if needed.
    writeFilter(false);

    // The writeSpin quantum is used up; try to write again later.
    eventLoop().execute(flushTask);
}
 
源代码2 项目: netty-4.1.22   文件: AbstractEpollStreamChannel.java
/**
 * Writes the entries of {@code in} until the buffer reports no entries or the write-spin
 * budget obtained from {@code config().getWriteSpinCount()} is used up.
 *
 * <p>Each pass subtracts the value returned by {@code doWriteMultiple} or
 * {@code doWriteSingle} from the remaining budget. Afterwards {@code Native.EPOLLOUT} is
 * either cleared (buffer empty, or budget exactly exhausted) or set (budget went negative).
 *
 * @param in the outbound buffer to drain
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int remainingSpins = config().getWriteSpinCount();
    do {
        final int pending = in.size();
        if (pending == 0) {
            // Everything has been written; clear EPOLLOUT and return so that the flag is
            // not set again below.
            clearFlag(Native.EPOLLOUT);
            return;
        }

        // Use a gathering write when the buffer holds more than one entry and the first one
        // is a ByteBuf; otherwise write a single entry.
        final boolean gathering = pending > 1 && in.current() instanceof ByteBuf;
        remainingSpins -= gathering ? doWriteMultiple(in) : doWriteSingle(in);

        // The loop is intentionally not left when the buffer was flushed completely: a user
        // might have triggered another write and flush while his or her listeners were notified.
    } while (remainingSpins > 0);

    if (remainingSpins != 0) {
        // The underlying descriptor can not accept all data currently, so set the EPOLLOUT
        // flag to be woken up when it can accept more data.
        setFlag(Native.EPOLLOUT);
        return;
    }

    // It is possible that EPOLLOUT was set, we were woken up by EPOLL because the socket is
    // writable, and then used up our write quantum. The socket is still writable as far as we
    // know, so the flag is cleared; the next write attempt will set it again if necessary.
    clearFlag(Native.EPOLLOUT);

    // The writeSpin quantum is used up; try to write again later.
    eventLoop().execute(flushTask);
}
 
/**
 * Writes the entries of {@code in} until the buffer reports no entries or one of the write
 * helpers returns {@code false}.
 *
 * <p>When the buffer is empty, {@code Native.EPOLLOUT} is cleared before returning. What a
 * {@code false} result from the helpers means is not visible here (presumably that the socket
 * could not take all data — confirm against {@code doWriteMultiple}/{@code doWriteSingle}).
 *
 * @param in the outbound buffer to drain
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final int spinBudget = config().getWriteSpinCount();
    boolean keepWriting = true;
    while (keepWriting) {
        final int pending = in.size();
        if (pending == 0) {
            // Wrote all messages.
            clearFlag(Native.EPOLLOUT);
            return;
        }

        // Do a gathering write if the outbound buffer entries start with more than one ByteBuf;
        // otherwise write a single entry. The loop is not left just because the buffer was
        // flushed completely, because a user might have triggered another write and flush
        // while his or her listeners were notified.
        keepWriting = pending > 1 && in.current() instanceof ByteBuf
                ? doWriteMultiple(in, spinBudget)
                : doWriteSingle(in, spinBudget);
    }
}
 
源代码4 项目: netty-4.1.22   文件: OioSctpChannel.java
/**
 * Sends queued {@link SctpMessage}s through {@code ch}, consuming one selected key of
 * {@code writeSelector} per message.
 *
 * <p>Returns without sending anything when the selector is closed, when
 * {@code select(SO_TIMEOUT)} reports no keys, or when the selected-key set is empty. Sending
 * stops once as many messages were sent as {@code in.size()} reported on entry, once
 * {@code in.current()} returns {@code null}, or once the selected keys run out.
 *
 * @param in the outbound buffer holding the messages to send
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    if (!writeSelector.isOpen()) {
        return;
    }

    // The entry count is read before selecting; it caps how many messages this call sends.
    final int pending = in.size();
    if (writeSelector.select(SO_TIMEOUT) <= 0) {
        return;
    }

    final Set<SelectionKey> readyKeys = writeSelector.selectedKeys();
    if (readyKeys.isEmpty()) {
        return;
    }

    final Iterator<SelectionKey> keyIt = readyKeys.iterator();
    for (int sent = 0; sent != pending; sent++) {
        // Consume one selected key for the message about to be sent.
        keyIt.next();
        keyIt.remove();

        final SctpMessage message = (SctpMessage) in.current();
        if (message == null) {
            return;
        }

        final ByteBuf content = message.content();
        final int length = content.readableBytes();
        final ByteBuffer payload;
        if (content.nioBufferCount() == -1) {
            // No NIO buffer is exposed by the content: copy the readable bytes into a fresh one.
            payload = ByteBuffer.allocate(length);
            content.getBytes(content.readerIndex(), payload);
            payload.flip();
        } else {
            payload = content.nioBuffer();
        }

        final MessageInfo info =
                MessageInfo.createOutgoing(association(), null, message.streamIdentifier());
        info.payloadProtocolID(message.protocolIdentifier());
        info.streamNumber(message.streamIdentifier());
        info.unordered(message.isUnordered());

        ch.send(payload, info);
        in.remove();

        if (!keyIt.hasNext()) {
            return;
        }
    }
    // Reaching this point means every message counted on entry has been sent.
}
 
源代码5 项目: netty-4.1.22   文件: EpollDatagramChannel.java
/**
 * Writes the messages of {@code in} one batch or one message at a time until
 * {@code in.current()} returns {@code null} or a write cannot be completed.
 *
 * <p>When {@code Native.IS_SUPPORTING_SENDMMSG} is set and the buffer reports more than one
 * entry, the packets collected by {@link NativeDatagramPacketArray} are sent through
 * {@code Native.sendmmsg}. Otherwise the current message is attempted up to
 * {@code config().getWriteSpinCount()} times via {@code doWriteMessage}. An
 * {@link IOException} raised while handling a message removes that message with the
 * exception as cause, and the loop carries on.
 *
 * @param in the outbound buffer to drain
 * @throws Exception propagated unchanged from the calls made here (other than the
 *         {@link IOException}s handled as described above)
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    while ((msg = in.current()) != null) {
        try {
            // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
            if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
                final NativeDatagramPacketArray batch = NativeDatagramPacketArray.getInstance(in);
                int remaining = batch.count();

                if (remaining >= 1) {
                    // Try to use gathering writes via sendmmsg(...) syscall.
                    final NativeDatagramPacketArray.NativeDatagramPacket[] packets = batch.packets();
                    int start = 0;

                    while (remaining > 0) {
                        final int sentNow = Native.sendmmsg(socket.intValue(), packets, start, remaining);
                        if (sentNow == 0) {
                            // Did not write all messages.
                            setFlag(Native.EPOLLOUT);
                            return;
                        }
                        // Drop one buffer entry for every packet reported as sent.
                        for (int k = sentNow; k > 0; k--) {
                            in.remove();
                        }
                        remaining -= sentNow;
                        start += sentNow;
                    }
                    continue;
                }
            }

            // Single-message path: retry the current message within the write-spin budget.
            boolean written = false;
            int spins = config().getWriteSpinCount();
            while (!written && spins-- > 0) {
                written = doWriteMessage(msg);
            }

            if (!written) {
                // Did not write all messages.
                setFlag(Native.EPOLLOUT);
                return;
            }
            in.remove();
        } catch (IOException cause) {
            // Continue on write error as a DatagramChannel can write to multiple remote peers
            //
            // See https://github.com/netty/netty/issues/2665
            in.remove(cause);
        }
    }

    // Wrote all messages.
    clearFlag(Native.EPOLLOUT);
}
 
源代码6 项目: netty4.0.27Learn   文件: LocalChannel.java
/**
 * Hands every message of {@code in} over to the peer channel's {@code inboundBuffer} and then
 * calls {@code finishPeerRead} for the peer.
 *
 * <p>If the peer uses the same event loop as this channel, the messages are added directly.
 * Otherwise they are copied into an array first and added from a task executed on the peer's
 * event loop. Every message is retained via {@code ReferenceCountUtil.retain} before it is
 * removed from {@code in}.
 *
 * @param in the outbound buffer to drain
 * @throws NotYetConnectedException if {@code state} is below 2
 * @throws ClosedChannelException if {@code state} is above 2
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    if (state < 2) {
        throw new NotYetConnectedException();
    }
    if (state > 2) {
        throw new ClosedChannelException();
    }

    // Work on a local reference to the peer for the remainder of this call.
    final LocalChannel remote = this.peer;
    final ChannelPipeline remotePipeline = remote.pipeline();
    final EventLoop remoteLoop = remote.eventLoop();

    if (remoteLoop != eventLoop()) {
        // Use a copy because the original msgs will be recycled by AbstractChannel.
        final int count = in.size();
        final Object[] snapshot = new Object[count];
        int idx = 0;
        while (idx < count) {
            snapshot[idx++] = ReferenceCountUtil.retain(in.current());
            in.remove();
        }

        remoteLoop.execute(new Runnable() {
            @Override
            public void run() {
                Collections.addAll(remote.inboundBuffer, snapshot);
                finishPeerRead(remote, remotePipeline);
            }
        });
        return;
    }

    // Same event loop: move the messages straight into the peer's inbound buffer.
    for (Object msg = in.current(); msg != null; msg = in.current()) {
        remote.inboundBuffer.add(msg);
        ReferenceCountUtil.retain(msg);
        in.remove();
    }
    finishPeerRead(remote, remotePipeline);
}
 
源代码7 项目: netty4.0.27Learn   文件: OioSctpChannel.java
/**
 * Sends queued {@link SctpMessage}s through {@code ch}, consuming one selected key of
 * {@code writeSelector} per message.
 *
 * <p>Returns without sending anything when the selector is closed, when
 * {@code select(SO_TIMEOUT)} reports no keys, or when the selected-key set is empty. Sending
 * stops once as many messages were sent as {@code in.size()} reported on entry, once
 * {@code in.current()} returns {@code null}, or once the selected keys run out.
 *
 * @param in the outbound buffer holding the messages to send
 * @throws Exception propagated unchanged from the calls made here
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    if (!writeSelector.isOpen()) {
        return;
    }

    // The entry count is read before selecting; it caps how many messages this call sends.
    final int pending = in.size();
    if (writeSelector.select(SO_TIMEOUT) <= 0) {
        return;
    }

    final Set<SelectionKey> readyKeys = writeSelector.selectedKeys();
    if (readyKeys.isEmpty()) {
        return;
    }

    final Iterator<SelectionKey> keyIt = readyKeys.iterator();
    for (int sent = 0; sent != pending; sent++) {
        // Consume one selected key for the message about to be sent.
        keyIt.next();
        keyIt.remove();

        final SctpMessage message = (SctpMessage) in.current();
        if (message == null) {
            return;
        }

        final ByteBuf content = message.content();
        final int length = content.readableBytes();
        final ByteBuffer payload;
        if (content.nioBufferCount() == -1) {
            // No NIO buffer is exposed by the content: copy the readable bytes into a fresh one.
            payload = ByteBuffer.allocate(length);
            content.getBytes(content.readerIndex(), payload);
            payload.flip();
        } else {
            payload = content.nioBuffer();
        }

        final MessageInfo info =
                MessageInfo.createOutgoing(association(), null, message.streamIdentifier());
        info.payloadProtocolID(message.protocolIdentifier());
        info.streamNumber(message.streamIdentifier());

        ch.send(payload, info);
        in.remove();

        if (!keyIt.hasNext()) {
            return;
        }
    }
    // Reaching this point means every message counted on entry has been sent.
}
 
源代码8 项目: netty4.0.27Learn   文件: EpollDatagramChannel.java
/**
 * Writes the messages of {@code in} one batch or one message at a time until
 * {@code in.current()} returns {@code null} or a write cannot be completed.
 *
 * <p>When {@code Native.IS_SUPPORTING_SENDMMSG} is set and the buffer reports more than one
 * entry, the packets collected by {@link NativeDatagramPacketArray} are sent through
 * {@code Native.sendmmsg}. Otherwise the current message is attempted up to
 * {@code config().getWriteSpinCount()} times via {@code doWriteMessage}. An
 * {@link IOException} raised while handling a message removes that message with the
 * exception as cause, and the loop carries on.
 *
 * @param in the outbound buffer to drain
 * @throws Exception propagated unchanged from the calls made here (other than the
 *         {@link IOException}s handled as described above)
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object msg;
    while ((msg = in.current()) != null) {
        try {
            // Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
            if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
                final NativeDatagramPacketArray batch = NativeDatagramPacketArray.getInstance(in);
                int remaining = batch.count();

                if (remaining >= 1) {
                    // Try to use gathering writes via sendmmsg(...) syscall.
                    final NativeDatagramPacketArray.NativeDatagramPacket[] packets = batch.packets();
                    int start = 0;

                    while (remaining > 0) {
                        final int sentNow = Native.sendmmsg(fd().intValue(), packets, start, remaining);
                        if (sentNow == 0) {
                            // Did not write all messages.
                            setFlag(Native.EPOLLOUT);
                            return;
                        }
                        // Drop one buffer entry for every packet reported as sent.
                        for (int k = sentNow; k > 0; k--) {
                            in.remove();
                        }
                        remaining -= sentNow;
                        start += sentNow;
                    }
                    continue;
                }
            }

            // Single-message path: retry the current message within the write-spin budget.
            boolean written = false;
            int spins = config().getWriteSpinCount();
            while (!written && spins-- > 0) {
                written = doWriteMessage(msg);
            }

            if (!written) {
                // Did not write all messages.
                setFlag(Native.EPOLLOUT);
                return;
            }
            in.remove();
        } catch (IOException cause) {
            // Continue on write error as a DatagramChannel can write to multiple remote peers
            //
            // See https://github.com/netty/netty/issues/2665
            in.remove(cause);
        }
    }

    // Wrote all messages.
    clearFlag(Native.EPOLLOUT);
}