下面列出了 io.netty.channel.socket.ChannelInputShutdownReadComplete 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
* Shuts down the input side of the channel.
*/
//
void shutdownInput(boolean rdHup) {
    // Case 1: the socket already reports its input side as shut down. There is nothing
    // left to shut down; unless the caller passed rdHup == true, record that the closed
    // input was observed and emit the "read complete" user event.
    if (socket.isInputShutdown()) {
        if (!rdHup) {
            inputClosedSeenErrorOnRead = true;
            pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
        return;
    }

    // Case 2: half-closure is not enabled in the channel config, so the whole channel
    // is closed instead of only its input side.
    if (!isAllowHalfClosure(config())) {
        close(voidPromise());
        return;
    }

    // Case 3: half-closure is enabled. Shut down the socket with (true, false) --
    // presumably (read, write), i.e. input side only; confirm against the socket type.
    try {
        socket.shutdown(true, false);
    } catch (IOException ignored) {
        // The shutdown attempt failed, which is treated as "input is already effectively
        // shut down": publish the event and close the channel.
        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
        return;
    } catch (NotYetConnectedException ignored) {
        // Same interpretation as above, but here we deliberately fall through and only
        // publish the event below without closing.
    }
    clearEpollIn();
    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
/**
 * Translates selected user events into notifications on the owning connection and then
 * always forwards the event to the next handler in the pipeline.
 *
 * @param ctx the handler context the event arrived on
 * @param evt the user event to inspect
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    if (CloseHandler.ProtocolPayloadEndEvent.OUTBOUND == evt) {
        connection.channelOutboundListener.channelOutboundClosed();
    } else if (ChannelOutputShutdownEvent.INSTANCE == evt) {
        // Close handler is informed before the outbound listener is failed.
        connection.closeHandler.channelClosedOutbound(ctx);
        connection.channelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(
                DefaultNettyConnection.class, "userEventTriggered(...)"));
    } else if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
        // The close handler goes first so that error reporting is more informative.
        connection.closeHandler.channelClosedInbound(ctx);
        // ChannelInputShutdownEvent is not fired in every case and may arrive before all
        // available data has been read. ChannelInputShutdownReadComplete appears (at least
        // with the current netty version) to be fired reliably at the right moment, so it
        // is the signal used to mark the inbound side closed.
        connection.nettyChannelPublisher.channelInboundClosed();
    } else if (evt instanceof SslHandshakeCompletionEvent) {
        final SslHandshakeCompletionEvent handshakeEvt = (SslHandshakeCompletionEvent) evt;
        connection.sslSession = extractSslSession(ctx.pipeline(), handshakeEvt, this::tryFailSubscriber);
        if (subscriber != null) {
            // A subscriber that is still pending here must have been waiting for the handshake.
            assert waitForSslHandshake;
            completeSubscriber();
        }
    }
    // Regardless of which branch ran (or none), the event continues down the pipeline.
    ctx.fireUserEventTriggered(evt);
}
/**
 * Consumes user events without forwarding them: an SSL handshake completion updates
 * {@code sslSession}, two known event types are silently accepted, and anything else
 * is logged as unexpected.
 *
 * @param ctx the handler context the event arrived on
 * @param evt the user event to inspect
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof SslHandshakeCompletionEvent) {
        // Take the session from the pipeline's SslHandler if one is installed; otherwise clear it.
        final SslHandler sslHandler = ctx.channel().pipeline().get(SslHandler.class);
        if (sslHandler == null) {
            sslSession = null;
        } else {
            sslSession = sslHandler.engine().getSession();
        }
        return;
    }

    final boolean expectedEvent = evt instanceof SslCloseCompletionEvent ||
                                  evt instanceof ChannelInputShutdownReadComplete;
    if (!expectedEvent) {
        logger.warn("{} Unexpected user event: {}", ctx.channel(), evt);
    }
}
/**
 * Reacts to the read side reaching its end: shuts down only the input when half-closure
 * is enabled, closes the channel otherwise, or emits the "read complete" event when the
 * input was already shut down.
 *
 * @param pipeline the pipeline on which the user events are fired
 */
private void closeOnRead(ChannelPipeline pipeline) {
    // Input already shut down: only the read-complete notification is emitted.
    if (isInputShutdown0()) {
        pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        return;
    }

    // Boolean.TRUE.equals(...) treats an unset (null) option the same as false.
    final Boolean allowHalfClosure = config().getOption(ChannelOption.ALLOW_HALF_CLOSURE);
    if (!Boolean.TRUE.equals(allowHalfClosure)) {
        close(voidPromise());
        return;
    }

    shutdownInput();
    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
/**
* Shuts down the input side of the channel.
*/
void shutdownInput(boolean readEOF) {
    // If EOF was read while the connect promise is still pending, finishConnect() must run
    // first. Otherwise the close() that may follow would close the FD before finishConnect()
    // gets a chance to run, and the connect promise would be failed with a
    // ClosedChannelException instead of the real connect failure. Calling it here ensures
    // the correct exception is observed.
    if (readEOF && connectPromise != null) {
        finishConnect();
    }

    // Input side already shut down: unless this call was caused by reading EOF, record that
    // the closed input was observed and emit the "read complete" user event.
    if (socket.isInputShutdown()) {
        if (!readEOF) {
            inputClosedSeenErrorOnRead = true;
            pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
        return;
    }

    // Half-closure not enabled: close the whole channel rather than just the input side.
    if (!isAllowHalfClosure(config())) {
        close(voidPromise());
        return;
    }

    // Half-closure enabled. shutdown(true, false) presumably means (read, write), i.e.
    // input side only; confirm against the socket type.
    try {
        socket.shutdown(true, false);
    } catch (IOException ignored) {
        // The shutdown attempt failed, which is treated as "input is already effectively
        // shut down": publish the event and close the channel.
        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
        return;
    } catch (NotYetConnectedException ignored) {
        // Same interpretation, but fall through and only publish the event below.
    }
    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
/**
 * Bootstraps a server on a single event loop taken from {@code S_CTX}'s I/O executor and
 * blocks until the bind to {@code localAddress(0)} has completed.
 *
 * <p>Every accepted child channel is stored in {@code sChannel} and gets a handler that
 * counts down the matching server-side latch for input-shutdown, input-shutdown-read-complete
 * and output-shutdown user events. Child channels are configured with auto-read on,
 * half-closure allowed and auto-close off.
 *
 * @return the bound server channel
 */
private ServerSocketChannel startServer() {
    EventLoopAwareNettyIoExecutor eventLoopAwareNettyIoExecutor =
            toEventLoopAwareNettyIoExecutor(S_CTX.ioExecutor());
    EventLoop loop = eventLoopAwareNettyIoExecutor.eventLoopGroup().next();
    ServerBootstrap bs = new ServerBootstrap();
    bs.group(loop);
    bs.channel(serverChannel(loop, InetSocketAddress.class));
    // Parameterized (instead of raw) ChannelInitializer: avoids the raw-type warning and makes
    // initChannel(Channel) a checked override of the generic method.
    bs.childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(final Channel ch) {
            sChannel = (SocketChannel) ch;
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
                    LOGGER.debug("Server Evt: {}", evt.getClass().getSimpleName());
                    if (evt == ChannelInputShutdownEvent.INSTANCE) {
                        serverInputShutdownLatch.countDown();
                    } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
                        serverInputShutdownReadCompleteLatch.countDown();
                    } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
                        serverOutputShutdownLatch.countDown();
                    }
                    // The event is consumed here and not forwarded further down the pipeline.
                    release(evt);
                }
            });
            // Signal "connected" from the child's event loop, after this initializer has been queued.
            ch.eventLoop().execute(connectedLatch::countDown);
        }
    });
    bs.childOption(AUTO_READ, true);
    bs.childOption(ALLOW_HALF_CLOSURE, true);
    bs.childOption(AUTO_CLOSE, false);
    return (ServerSocketChannel) bs.bind(localAddress(0))
            .syncUninterruptibly().channel();
}
/**
 * Bootstraps a client on a single event loop taken from {@code C_CTX}'s I/O executor and
 * blocks until the connect to {@code address} has completed.
 *
 * <p>The channel gets a handler that counts down the matching client-side latch for
 * input-shutdown, input-shutdown-read-complete and output-shutdown user events. The channel
 * is configured with auto-read on, half-closure allowed and auto-close off.
 *
 * @param address the remote address to connect to
 * @return the connected client channel
 */
private SocketChannel connectClient(InetSocketAddress address) {
    EventLoopAwareNettyIoExecutor eventLoopAwareNettyIoExecutor =
            toEventLoopAwareNettyIoExecutor(C_CTX.ioExecutor());
    EventLoop loop = eventLoopAwareNettyIoExecutor.eventLoopGroup().next();
    Bootstrap bs = new Bootstrap();
    bs.group(loop);
    bs.channel(socketChannel(loop, InetSocketAddress.class));
    // Parameterized (instead of raw) ChannelInitializer: avoids the raw-type warning and makes
    // initChannel(Channel) a checked override of the generic method.
    bs.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(final Channel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
                    LOGGER.debug("Client Evt: {}", evt.getClass().getSimpleName());
                    if (evt == ChannelInputShutdownEvent.INSTANCE) {
                        clientInputShutdownLatch.countDown();
                    } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
                        clientInputShutdownReadCompleteLatch.countDown();
                    } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
                        clientOutputShutdownLatch.countDown();
                    }
                    // The event is consumed here and not forwarded further down the pipeline.
                    release(evt);
                }
            });
        }
    });
    bs.option(AUTO_READ, true);
    bs.option(ALLOW_HALF_CLOSURE, true);
    bs.option(AUTO_CLOSE, false);
    return (SocketChannel) bs.connect(address).syncUninterruptibly().channel();
}