下面列出了io.netty.channel.ChannelConfig#setAutoRead ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();
// 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
// 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
if (!ch.isWritable()) {
// 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
if (logger.isWarnEnabled()) {
logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(false);
} else {
// 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
if (logger.isWarnEnabled()) {
logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
ch, config.getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());
}
config.setAutoRead(true);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
@Override
public void run() {
Channel channel = ctx.channel();
ChannelConfig config = channel.config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
channel.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
channel.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
channel.read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
@Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
ctx.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Request) {
ChannelConfig config = ctx.channel().config();
if (++outstanding >= high && config.isAutoRead()) {
config.setAutoRead(false);
}
}
ctx.fireChannelRead(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Response) {
ChannelConfig config = ctx.channel().config();
if (--outstanding <= low && !config.isAutoRead()) {
config.setAutoRead(true);
}
}
ctx.write(msg, promise);
}