下面列出了 io.netty.channel.Channel#isRegistered() 的实例代码；您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Flags the channel as no longer in use and asks {@code removeIfExists} to drop the
 * per-request handlers from its pipeline.
 *
 * <p>The removal is attempted only while the channel is still open or still registered.
 *
 * @param channel the channel whose IN_USE attribute is cleared and whose pipeline is cleaned
 */
private void removePerRequestHandlers(Channel channel) {
    channel.attr(IN_USE).set(false);

    // A channel that is both closed and unregistered is skipped: according to the
    // original note, DefaultChannelPipeline removes the handlers itself in that state
    // (see DefaultChannelPipeline.java#L1403).
    if (!channel.isOpen() && !channel.isRegistered()) {
        return;
    }

    removeIfExists(
            channel.pipeline(),
            HttpStreamsClientHandler.class,
            LastHttpContentHandler.class,
            FlushOnReadHandler.class,
            ResponseHandler.class,
            ReadTimeoutHandler.class,
            WriteTimeoutHandler.class);
}
/**
 * Builds a single-line description of a channel for log messages.
 *
 * <p>The description contains the channel's {@code toString()}, its active / open /
 * registered / writable flags, its id, and the passport obtained from
 * {@code CurrentPassport.fromChannel}.
 *
 * @param ch the channel to describe; may be {@code null}
 * @return the literal {@code "null"} when {@code ch} is {@code null}, otherwise the
 *         description prefixed with {@code "Channel: "}
 */
public static String channelInfoForLogging(Channel ch) {
    if (ch == null) {
        return "null";
    }

    // Channel state is collected first, in the same order it is printed.
    StringBuilder description = new StringBuilder("Channel: ")
            .append(ch.toString())
            .append(", active=").append(ch.isActive())
            .append(", open=").append(ch.isOpen())
            .append(", registered=").append(ch.isRegistered())
            .append(", writable=").append(ch.isWritable())
            .append(", id=").append(ch.id());

    // The passport is looked up only after the channel state has been captured.
    CurrentPassport passport = CurrentPassport.fromChannel(ch);
    return description
            .append(", Passport: ")
            .append(String.valueOf(passport))
            .toString();
}
/**
 * Creates a channel through the channel factory, initializes it, and registers it with
 * the event loop group.
 *
 * @return the future returned by {@code group().register(...)}, or an already-failed
 *         promise when creating or initializing the channel threw
 */
final ChannelFuture initAndRegister() {
    Channel ch = null;
    try {
        ch = channelFactory().newChannel();
        init(ch);
    } catch (Throwable failure) {
        // ch is still null when newChannel() itself threw
        // (e.g. SocketException("too many open files")); then there is nothing to close.
        if (ch != null) {
            ch.unsafe().closeForcibly();
        }
        // The channel is not registered yet, so the promise is forced onto the
        // GlobalEventExecutor.
        // NOTE(review): ch may be null here - confirm that DefaultChannelPromise accepts
        // a null channel in the Netty version this code is built against.
        return new DefaultChannelPromise(ch, GlobalEventExecutor.INSTANCE).setFailure(failure);
    }

    ChannelFuture registration = group().register(ch);
    if (registration.cause() == null) {
        // The promise has not failed, so one of the following holds:
        // 1) Registration was attempted from the event loop and has completed by now,
        //    so bind() or connect() is safe because the channel is registered.
        // 2) Registration was attempted from another thread and the request has been
        //    added to the event loop's task queue. bind() or connect() is still safe:
        //    they run *after* the queued registration task, because register(), bind(),
        //    and connect() are all bound to the same thread.
        return registration;
    }

    // Registration has already failed: close through the normal path if the channel did
    // get registered, otherwise close it forcibly.
    if (ch.isRegistered()) {
        ch.close();
    } else {
        ch.unsafe().closeForcibly();
    }
    return registration;
}
/**
 * Closes the child channel when its registration future did not succeed.
 *
 * <p>Only errors that happened on the local thread while registering are handled here.
 * Failures that occur later are handled by the channel, which closes the child channel.
 *
 * @param future the registration future of the child channel
 */
private static void registerDone(ChannelFuture future) {
    if (future.isSuccess()) {
        return;
    }

    Channel child = future.channel();
    if (child.isRegistered()) {
        child.close();
    } else {
        // Not registered: use the forcible close on the channel's unsafe.
        child.unsafe().closeForcibly();
    }
}
/**
 * Substituted variant of {@code initAndRegister()}: creates a channel, initializes it,
 * and registers it with the configured event loop group.
 *
 * <p>If creating or initializing the channel throws, the stack trace is printed before
 * the failed promise is returned.
 *
 * @return the future returned by {@code config().group().register(...)}, or an
 *         already-failed promise when creating or initializing the channel threw
 */
@Substitute
final ChannelFuture initAndRegister() {
    Channel ch = null;
    try {
        ch = channelFactory.newChannel();
        init(ch);
    } catch (Throwable failure) {
        // Marked "THE FIX IS HERE" in the original: the failure is printed so that it
        // is visible, in addition to being carried by the returned promise.
        failure.printStackTrace();
        // ch is still null when newChannel() itself threw
        // (e.g. SocketException("too many open files")); then there is nothing to close.
        if (ch != null) {
            ch.unsafe().closeForcibly();
        }
        // The channel is not registered yet, so the promise is forced onto the
        // GlobalEventExecutor.
        // NOTE(review): ch may be null here - confirm that DefaultChannelPromise accepts
        // a null channel in the Netty version this substitution targets.
        return new DefaultChannelPromise(ch, GlobalEventExecutor.INSTANCE).setFailure(failure);
    }

    ChannelFuture registration = config().group().register(ch);
    if (registration.cause() == null) {
        // The promise has not failed, so one of the following holds:
        // 1) Registration was attempted from the event loop and has completed by now,
        //    so bind() or connect() is safe because the channel is registered.
        // 2) Registration was attempted from another thread and the request has been
        //    added to the event loop's task queue. bind() or connect() is still safe:
        //    they run *after* the queued registration task, because register(), bind(),
        //    and connect() are all bound to the same thread.
        return registration;
    }

    // Registration has already failed: pick the close path based on whether the channel
    // ended up registered.
    if (ch.isRegistered()) {
        ch.close();
    } else {
        ch.unsafe().closeForcibly();
    }
    return registration;
}
/**
 * Initializes this handler immediately when it is added to a channel that is already
 * active and registered.
 *
 * @param ctx the context this handler was added with
 * @throws Exception declared by the overridden method
 */
@SuppressWarnings("StatementWithEmptyBody")
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    boolean alreadyActive = channel.isActive() && channel.isRegistered();

    if (alreadyActive) {
        // The channelActive() event has already been fired, so this.channelActive()
        // will not be invoked; initialization has to happen here instead.
        initialize(ctx);
    } else {
        // The channelActive() event has not been fired yet; this.channelActive() will
        // be invoked later and initialization will occur there.
    }
}
/**
 * Creates a channel through the channel factory, initializes it, and registers it with
 * the event loop group.
 *
 * <p>The channel is created outside the {@code try} block, so anything thrown by
 * {@code newChannel()} propagates to the caller; only a failure of {@code init} is turned
 * into a failed promise.
 *
 * @return the future returned by {@code group().register(...)}, or an already-failed
 *         promise when {@code init} threw
 */
final ChannelFuture initAndRegister() {
    final Channel ch = channelFactory().newChannel();
    try {
        init(ch);
    } catch (Throwable failure) {
        ch.unsafe().closeForcibly();
        // The channel is not registered yet, so the promise is forced onto the
        // GlobalEventExecutor.
        return new DefaultChannelPromise(ch, GlobalEventExecutor.INSTANCE).setFailure(failure);
    }

    ChannelFuture registration = group().register(ch);
    if (registration.cause() == null) {
        // The promise has not failed, so one of the following holds:
        // 1) Registration was attempted from the event loop and has completed by now,
        //    so bind() or connect() is safe because the channel is registered.
        // 2) Registration was attempted from another thread and the request has been
        //    added to the event loop's task queue. bind() or connect() is still safe:
        //    they run *after* the queued registration task, because register(), bind(),
        //    and connect() are all bound to the same thread.
        return registration;
    }

    // Registration has already failed: close normally if the channel is registered,
    // forcibly otherwise.
    if (ch.isRegistered()) {
        ch.close();
    } else {
        ch.unsafe().closeForcibly();
    }
    return registration;
}
/**
 * Creates a channel through the channel factory, initializes it, and registers it with
 * the event loop group taken from the bootstrap configuration.
 *
 * @return the future returned by {@code config().group().register(...)}, or an
 *         already-failed promise when creating or initializing the channel threw
 */
final ChannelFuture initAndRegister() {
    Channel ch = null;
    try {
        // Create the channel through the factory.
        ch = channelFactory.newChannel();
        // Initialize the channel.
        init(ch);
    } catch (Throwable failure) {
        if (ch == null) {
            // newChannel() itself threw (e.g. SocketException("too many open files")),
            // so there is no channel to close. A FailedChannel stands in for it, and
            // because nothing is registered yet the promise is forced onto the
            // GlobalEventExecutor.
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)
                    .setFailure(failure);
        }
        // Initialization failed: close the channel forcibly.
        ch.unsafe().closeForcibly();
        // The channel is not registered yet, so the promise is forced onto the
        // GlobalEventExecutor.
        return new DefaultChannelPromise(ch, GlobalEventExecutor.INSTANCE).setFailure(failure);
    }

    // Take the event loop group from the configuration and register the channel with it.
    ChannelFuture registration = config().group().register(ch);
    if (registration.cause() == null) {
        // The promise has not failed, so one of the following holds:
        // 1) Registration was attempted from the event loop and has completed by now,
        //    so bind() or connect() is safe because the channel is registered.
        // 2) Registration was attempted from another thread and the request has been
        //    added to the event loop's task queue. bind() or connect() is still safe:
        //    they run *after* the queued registration task, because register(), bind(),
        //    and connect() are all bound to the same thread.
        return registration;
    }

    // Registration has already failed.
    if (ch.isRegistered()) {
        // The channel did get registered: close it through the normal path.
        ch.close();
    } else {
        ch.unsafe().closeForcibly();
    }
    return registration;
}
/**
 * Creates a channel from the transport configuration, installs the initializer, applies
 * the configured options and attributes, and registers the channel on an event loop.
 *
 * @param config the transport configuration supplying the event loop group, the channel
 *        factory, the options and the attributes
 * @param channelInitializer the initializer appended to the new channel's pipeline
 * @param isDomainSocket passed through to the connection factory and to the option setup
 * @return {@code Mono.error(...)} when creating or configuring the channel threw,
 *         otherwise the promise handed to the registration call
 */
@SuppressWarnings("FutureReturnValueIgnored")
static Mono<Channel> doInitAndRegister(
        TransportConfig config,
        ChannelInitializer<Channel> channelInitializer,
        boolean isDomainSocket) {
    EventLoopGroup loopGroup = config.eventLoopGroup();
    ChannelFactory<? extends Channel> factory = config.connectionFactory(loopGroup, isDomainSocket);

    Channel ch = null;
    try {
        ch = factory.newChannel();
        if (channelInitializer instanceof ServerTransport.AcceptorInitializer) {
            ServerTransport.AcceptorInitializer acceptorInitializer =
                    (ServerTransport.AcceptorInitializer) channelInitializer;
            acceptorInitializer.acceptor.enableAutoReadTask(ch);
        }
        ch.pipeline().addLast(channelInitializer);
        setChannelOptions(ch, config.options, isDomainSocket);
        setAttributes(ch, config.attrs);
    }
    catch (Throwable failure) {
        // ch is still null when newChannel() itself threw; otherwise release the
        // partially configured channel before reporting the error.
        if (ch != null) {
            ch.unsafe().closeForcibly();
        }
        return Mono.error(failure);
    }

    MonoChannelPromise registration = new MonoChannelPromise(ch);
    ch.unsafe().register(loopGroup.next(), registration);

    if (registration.cause() == null) {
        return registration;
    }

    // Registration has already failed: close normally if the channel is registered,
    // forcibly otherwise.
    if (ch.isRegistered()) {
        // "FutureReturnValueIgnored": ignoring the returned future is deliberate.
        ch.close();
    }
    else {
        ch.unsafe().closeForcibly();
    }
    return registration;
}