io.netty.channel.ChannelConfig#isAutoRead ( )源码实例Demo

下面列出了io.netty.channel.ChannelConfig#isAutoRead ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: arcusplatform   文件: ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig config = ctx.channel().config();
    if (config.isAutoRead()) {
        // stop accept new connections for 1 second to allow the channel to recover
        // See https://github.com/netty/netty/issues/1328
        config.setAutoRead(false);
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
               config.setAutoRead(true);
            }
        }, 1, TimeUnit.SECONDS);
    }
    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
}
 
源代码2 项目: netty-4.1.22   文件: AbstractKQueueChannel.java
final void readReadyFinally(ChannelConfig config) {
    maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    // See https://github.com/netty/netty/issues/2254
    if (!readPending && !config.isAutoRead()) {
        clearReadFilter0();
    } else if (readPending && maybeMoreDataToRead) {
        // trigger a read again as there may be something left to read and because of ET we
        // will not get notified again until we read everything from the socket
        //
        // It is possible the last fireChannelRead call could cause the user to call read() again, or if
        // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
        // to false before every read operation to prevent re-entry into readReady() we will not read from
        // the underlying OS again unless the user happens to call read again.
        executeReadReadyRunnable(config);
    }
}
 
源代码3 项目: netty-4.1.22   文件: AbstractEpollChannel.java
final void epollInFinally(ChannelConfig config) {
    maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    // See https://github.com/netty/netty/issues/2254
    if (!readPending && !config.isAutoRead()) {
        clearEpollIn();
    } else if (readPending && maybeMoreDataToRead) {
        // trigger a read again as there may be something left to read and because of epoll ET we
        // will not get notified again until we read everything from the socket
        //
        // It is possible the last fireChannelRead call could cause the user to call read() again, or if
        // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
        // to false before every read operation to prevent re-entry into epollInReady() we will not read from
        // the underlying OS again unless the user happens to call read again.
        executeEpollInReadyRunnable(config);
    }
}
 
源代码4 项目: netty4.0.27Learn   文件: ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig config = ctx.channel().config();
    if (config.isAutoRead()) {
        // stop accept new connections for 1 second to allow the channel to recover
        // See https://github.com/netty/netty/issues/1328
        config.setAutoRead(false);
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
               config.setAutoRead(true);
            }
        }, 1, TimeUnit.SECONDS);
    }
    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
}
 
@Override
public void run() {
    Channel channel = ctx.channel();
    ChannelConfig config = channel.config();
    if (!config.isAutoRead() && isHandlerActive(ctx)) {
        // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
        // Then Just reset the status
        if (logger.isDebugEnabled()) {
            logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
                    isHandlerActive(ctx));
        }
        channel.attr(READ_SUSPENDED).set(false);
    } else {
        // Anything else allows the handler to reset the AutoRead
        if (logger.isDebugEnabled()) {
            if (config.isAutoRead() && !isHandlerActive(ctx)) {
                logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
                        isHandlerActive(ctx));
            } else {
                logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
        }
        channel.attr(READ_SUSPENDED).set(false);
        config.setAutoRead(true);
        channel.read();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
                + isHandlerActive(ctx));
    }
}
 
源代码6 项目: netty-4.1.22   文件: ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig config = ctx.channel().config();
    if (config.isAutoRead()) {
        // stop accept new connections for 1 second to allow the channel to recover
        // See https://github.com/netty/netty/issues/1328
        config.setAutoRead(false);
        ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
    }
    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    ctx.fireExceptionCaught(cause);
}
 
源代码7 项目: reactor-netty   文件: ServerTransport.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	ChannelConfig config = ctx.channel().config();
	if (config.isAutoRead()) {
		// stop accept new connections for 1 second to allow the channel to recover
		// See https://github.com/netty/netty/issues/1328
		config.setAutoRead(false);
		ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
	}
	// still let the exceptionCaught event flow through the pipeline to give the user
	// a chance to do something with it
	ctx.fireExceptionCaught(cause);
}
 
@Override
public void run() {
    ChannelConfig config = ctx.channel().config();
    if (!config.isAutoRead() && isHandlerActive(ctx)) {
        // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
        // Then Just reset the status
        if (logger.isDebugEnabled()) {
            logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
                    isHandlerActive(ctx));
        }
        ctx.attr(READ_SUSPENDED).set(false);
    } else {
        // Anything else allows the handler to reset the AutoRead
        if (logger.isDebugEnabled()) {
            if (config.isAutoRead() && !isHandlerActive(ctx)) {
                logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
                        isHandlerActive(ctx));
            } else {
                logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
        }
        ctx.attr(READ_SUSPENDED).set(false);
        config.setAutoRead(true);
        ctx.channel().read();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
                + isHandlerActive(ctx));
    }
}
 
源代码9 项目: tchannel-java   文件: LoadControlHandler.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof Request) {
        ChannelConfig config = ctx.channel().config();
        if (++outstanding >= high && config.isAutoRead()) {
            config.setAutoRead(false);
        }
    }
    ctx.fireChannelRead(msg);
}
 
源代码10 项目: tchannel-java   文件: LoadControlHandler.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    if (msg instanceof Response) {
        ChannelConfig config = ctx.channel().config();
        if (--outstanding <= low && !config.isAutoRead()) {
            config.setAutoRead(true);
        }
    }
    ctx.write(msg, promise);
}
 
源代码11 项目: netty-4.1.22   文件: AbstractOioByteChannel.java
@Override
protected void doRead() {
    final ChannelConfig config = config();
    if (isInputShutdown() || !readPending) {
        // We have to check readPending here because the Runnable to read could have been scheduled and later
        // during the same read loop readPending was set to false.
        return;
    }
    // In OIO we should set readPending to false even if the read was not successful so we can schedule
    // another read on the event loop if no reads are done.
    readPending = false;

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    boolean readData = false;
    try {
        byteBuf = allocHandle.allocate(allocator);
        do {
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                }
                break;
            } else {
                readData = true;
            }

            final int available = available();
            if (available <= 0) {
                break;
            }

            // Oio collects consecutive read operations into 1 ByteBuf before propagating up the pipeline.
            if (!byteBuf.isWritable()) {
                final int capacity = byteBuf.capacity();
                final int maxCapacity = byteBuf.maxCapacity();
                if (capacity == maxCapacity) {
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = allocHandle.allocate(allocator);
                } else {
                    final int writerIndex = byteBuf.writerIndex();
                    if (writerIndex + available > maxCapacity) {
                        byteBuf.capacity(maxCapacity);
                    } else {
                        byteBuf.ensureWritable(available);
                    }
                }
            }
        } while (allocHandle.continueReading());

        if (byteBuf != null) {
            // It is possible we allocated a buffer because the previous one was not writable, but then didn't use
            // it because allocHandle.continueReading() returned false.
            if (byteBuf.isReadable()) {
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
            byteBuf = null;
        }

        if (readData) {
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        }

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (readPending || config.isAutoRead() || !readData && isActive()) {
            // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
            // should execute read() again because no data may have been read.
            read();
        }
    }
}
 
源代码12 项目: netty-4.1.22   文件: AbstractOioMessageChannel.java
@Override
protected void doRead() {
    if (!readPending) {
        // We have to check readPending here because the Runnable to read could have been scheduled and later
        // during the same read loop readPending was set to false.
        return;
    }
    // In OIO we should set readPending to false even if the read was not successful so we can schedule
    // another read on the event loop if no reads are done.
    readPending = false;

    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        do {
            // Perform a read.
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    } catch (Throwable t) {
        exception = t;
    }

    boolean readData = false;
    int size = readBuf.size();
    if (size > 0) {
        readData = true;
        for (int i = 0; i < size; i++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
    }

    if (exception != null) {
        if (exception instanceof IOException) {
            closed = true;
        }

        pipeline.fireExceptionCaught(exception);
    }

    if (closed) {
        if (isOpen()) {
            unsafe().close(unsafe().voidPromise());
        }
    } else if (readPending || config.isAutoRead() || !readData && isActive()) {
        // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
        // should execute read() again because no data may have been read.
        read();
    }
}
 
源代码13 项目: netty-4.1.22   文件: AbstractNioByteChannel.java
@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
 
@Override
protected void doRead() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();

    Throwable exception = null;
    int localRead = 0;
    int totalRead = 0;
    try {
        for (;;) {
            // Perform a read.
            localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            // Notify with the received messages and clear the buffer.
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();

            // Do not read beyond maxMessagesPerRead.
            // Do not continue reading if autoRead has been turned off.
            totalRead += localRead;
            if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
                break;
            }
        }
    } catch (Throwable t) {
        exception = t;
    }

    pipeline.fireChannelReadComplete();

    if (exception != null) {
        if (exception instanceof IOException) {
            closed = true;
        }

        pipeline().fireExceptionCaught(exception);
    }

    if (closed) {
        if (isOpen()) {
            unsafe().close(unsafe().voidPromise());
        }
    } else if (localRead == 0 && isActive()) {
        // If the read amount was 0 and the channel is still active we need to trigger a new read()
        // as otherwise we will never try to read again and the user will never know.
        // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
        // able to process the rest of the tasks in the queue first.
        //
        // See https://github.com/netty/netty/issues/2404
        read();
    }
}
 
源代码15 项目: netty4.0.27Learn   文件: AbstractNioByteChannel.java
@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                byteBuf = null;
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
 
@Override
void epollInReady() {
    final ChannelConfig config = config();
    boolean edgeTriggered = isFlagSet(Native.EPOLLET);

    if (!readPending && !edgeTriggered && !config.isAutoRead()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        clearEpollIn0();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        // if edgeTriggered is used we need to read all messages as we are not notified again otherwise.
        final int maxMessagesPerRead = edgeTriggered
                ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
        int messages = 0;
        int totalReadAmount = 0;
        do {
            // we use a direct buffer here as the native implementations only be able
            // to handle direct buffers.
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                allocHandle.record(totalReadAmount);

                // Avoid overflow.
                totalReadAmount = localReadAmount;
            } else {
                totalReadAmount += localReadAmount;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
            if (!edgeTriggered && !config.isAutoRead()) {
                // This is not using EPOLLET so we can stop reading
                // ASAP as we will get notified again later with
                // pending data
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        boolean closed = handleReadException(pipeline, byteBuf, t, close);
        if (!closed) {
            // trigger a read again as there may be something left to read and because of epoll ET we
            // will not get notified again until we read everything from the socket
            eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    epollInReady();
                }
            });
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            clearEpollIn0();
        }
    }
}
 
@Override
public void handleEvent(ConduitStreamSourceChannel channel) {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int byteBufCapacity = allocHandle.guess();
        int totalReadAmount = 0;
        do {
            byteBuf = allocator.ioBuffer(byteBufCapacity);
            int writable = byteBuf.writableBytes();
            int localReadAmount = byteBuf.writeBytes(channel, byteBuf.writableBytes());
            if (localReadAmount <= 0) {
                // not was read release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            ((AbstractXnioUnsafe) unsafe()).readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead();
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !((AbstractXnioUnsafe) unsafe()).readPending) {
            removeReadOp(channel);
        }
    }
}