下面列出了 io.netty.channel.Channel#read() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Task body that clears the {@code READ_SUSPENDED} attribute on the channel and, where allowed,
 * re-enables auto-read and requests a read.
 *
 * <p>Auto-read is left untouched only when it is currently off while {@code isHandlerActive(ctx)}
 * is true; the existing comments attribute that state to the user having called
 * {@code setAutoRead(false)} directly. In every other case auto-read is switched on and
 * {@code channel.read()} is invoked.
 */
@Override
public void run() {
    final Channel ch = ctx.channel();
    final ChannelConfig cfg = ch.config();

    // AutoRead is false and the handler is active: the user disabled auto-read directly,
    // so only the suspension flag is reset and auto-read stays off.
    final boolean leaveAutoReadOff = !cfg.isAutoRead() && isHandlerActive(ctx);

    if (logger.isDebugEnabled()) {
        final String prefix;
        if (leaveAutoReadOff) {
            prefix = "Not unsuspend: ";
        } else if (cfg.isAutoRead() && !isHandlerActive(ctx)) {
            prefix = "Unsuspend: ";
        } else {
            prefix = "Normal unsuspend: ";
        }
        logger.debug(prefix + cfg.isAutoRead() + ':' + isHandlerActive(ctx));
    }

    // The suspension flag is cleared on every path.
    ch.attr(READ_SUSPENDED).set(false);

    if (!leaveAutoReadOff) {
        // Anything else allows the handler to reset AutoRead and trigger a read.
        cfg.setAutoRead(true);
        ch.read();
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Unsuspend final status => " + cfg.isAutoRead() + ':' + isHandlerActive(ctx));
    }
}
/**
 * Calls {@code openChannelAutoRead(channel)} and then requests a read on the channel, but only
 * when the channel is non-null and its config reports auto-read as disabled.
 *
 * @param channel the channel to resume reading on; {@code null} is tolerated and results in a no-op
 */
public static void triggerChannelAutoRead(Channel channel) {
    final boolean autoReadIsOff = channel != null && !channel.config().isAutoRead();
    if (autoReadIsOff) {
        openChannelAutoRead(channel);
        channel.read();
    }
}
/**
 * Writes {@code zuulRequest} to the channel of the given pooled connection, flushes it, and then
 * requests a read on both that channel and {@code channelCtx}.
 *
 * <p>Before writing, the method registers the channel and connection in {@code context}, stores the
 * read timeout as a channel attribute, and installs the origin response receiver in the channel's
 * pipeline ahead of the handler named {@code "connectionPoolHandler"}.
 *
 * @param conn        pooled connection whose channel the request is written to
 * @param readTimeout value stored on the channel under
 *                    {@code ClientTimeoutHandler.ORIGIN_RESPONSE_READ_TIMEOUT}
 *                    (NOTE(review): time unit is not visible here — confirm against the handler)
 */
private void writeClientRequestToOrigin(final PooledConnection conn, int readTimeout) {
final Channel ch = conn.getChannel();
passport.setOnChannel(ch);
// set read timeout on origin channel
ch.attr(ClientTimeoutHandler.ORIGIN_RESPONSE_READ_TIMEOUT).set(readTimeout);
// expose the origin channel and its pooled connection through the context
context.set(ORIGIN_CHANNEL, ch);
context.set(POOLED_ORIGIN_CONNECTION_KEY, conn);
preWriteToOrigin(chosenServer.get(), zuulRequest);
final ChannelPipeline pipeline = ch.pipeline();
// install the response receiver before anything is written to the channel
originResponseReceiver = getOriginResponseReceiver();
pipeline.addBefore("connectionPoolHandler", OriginResponseReceiver.CHANNEL_HANDLER_NAME, originResponseReceiver);
// check if body needs to be repopulated for retry
repopulateRetryBody();
// write the request, then whatever writeBufferedBodyContent adds, and flush once at the end
ch.write(zuulRequest);
writeBufferedBodyContent(zuulRequest, ch);
ch.flush();
//Get ready to read origin's response
ch.read();
// remember the connection on this instance, then request a read on channelCtx as well
originConn = conn;
channelCtx.read();
}
/**
 * Connects a client to a freshly bound server with auto-read disabled on both the accepted child
 * channel and the client channel, sends 4 bytes in each direction, issues one explicit
 * {@code read()} per side and asserts through the read-pending handlers that everything was read.
 *
 * <p>Both channels are closed on exit; the client channel is closed even if closing the server
 * channel fails.
 *
 * @param sb server bootstrap to configure and bind
 * @param cb client bootstrap to configure and connect
 * @throws Throwable if binding, connecting, waiting on the init latch or any assertion fails
 */
public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
    Channel serverChannel = null;
    Channel clientChannel = null;
    try {
        ReadPendingInitializer serverInitializer = new ReadPendingInitializer();
        ReadPendingInitializer clientInitializer = new ReadPendingInitializer();
        sb.option(ChannelOption.SO_BACKLOG, 1024)
          .option(ChannelOption.AUTO_READ, true)
          .childOption(ChannelOption.AUTO_READ, false)
          // We intend to do 2 reads per read loop wakeup
          .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
          .childHandler(serverInitializer);

        serverChannel = sb.bind().syncUninterruptibly().channel();

        cb.option(ChannelOption.AUTO_READ, false)
          // We intend to do 2 reads per read loop wakeup
          .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
          .handler(clientInitializer);

        clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();

        // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
        clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));

        // Wait for the server-side initializer before touching serverInitializer.channel.
        assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS));

        // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
        serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));

        // Auto-read is off on the child channel, so the read has to be requested explicitly.
        serverInitializer.channel.read();
        serverInitializer.readPendingHandler.assertAllRead();

        // Same for the client channel.
        clientChannel.read();
        clientInitializer.readPendingHandler.assertAllRead();
    } finally {
        // Nested try/finally: a failure while closing the server channel must not
        // prevent the client channel from being closed.
        try {
            if (serverChannel != null) {
                serverChannel.close().syncUninterruptibly();
            }
        } finally {
            if (clientChannel != null) {
                clientChannel.close().syncUninterruptibly();
            }
        }
    }
}
/**
 * Connects a client to a freshly bound server with auto-read enabled on both sides, sends 3 bytes
 * in each direction and asserts through the auto-read handlers that only a single read happened
 * per side, first right after the write and again on a second try.
 *
 * <p>Both channels are closed on exit; the server channel is closed even if closing the client
 * channel fails or is interrupted.
 *
 * @param readOutsideEventLoopThread when {@code true}, this method itself calls {@code read()} on
 *                                   each channel before the second-try assertion; the negated
 *                                   value is what gets passed to each {@code AutoReadInitializer}
 * @param sb server bootstrap to configure and bind
 * @param cb client bootstrap to configure and connect
 * @throws Throwable if binding, connecting, closing or any assertion fails
 */
private static void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
                                                              ServerBootstrap sb, Bootstrap cb) throws Throwable {
    Channel serverChannel = null;
    Channel clientChannel = null;
    try {
        AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
        AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
        sb.option(ChannelOption.SO_BACKLOG, 1024)
          .option(ChannelOption.AUTO_READ, true)
          .childOption(ChannelOption.AUTO_READ, true)
          // We want to ensure that we attempt multiple individual read operations per read loop so we can
          // test the auto read feature being turned off when data is first read.
          .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
          .childHandler(serverInitializer);

        serverChannel = sb.bind().syncUninterruptibly().channel();

        cb.option(ChannelOption.AUTO_READ, true)
          // We want to ensure that we attempt multiple individual read operations per read loop so we can
          // test the auto read feature being turned off when data is first read.
          .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
          .handler(clientInitializer);

        clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();

        // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
        clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
        serverInitializer.autoReadHandler.assertSingleRead();

        // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
        serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
        clientInitializer.autoReadHandler.assertSingleRead();

        if (readOutsideEventLoopThread) {
            serverInitializer.channel.read();
        }
        serverInitializer.autoReadHandler.assertSingleReadSecondTry();

        if (readOutsideEventLoopThread) {
            clientChannel.read();
        }
        clientInitializer.autoReadHandler.assertSingleReadSecondTry();
    } finally {
        // Nested try/finally: sync() may throw (close failure or interruption), and that
        // must not prevent the server channel from being closed.
        try {
            if (clientChannel != null) {
                clientChannel.close().sync();
            }
        } finally {
            if (serverChannel != null) {
                serverChannel.close().sync();
            }
        }
    }
}