下面列出了io.netty.channel.Channel#eventLoop ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
// 释放channel
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
// 关闭channel,发布promise失败事件
closeAndFail(channel, cause, promise);
}
return promise;
}
private void write(Append cmd) throws ConnectionFailedException {
Channel channel = nettyHandler.getChannel();
EventLoop eventLoop = channel.eventLoop();
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
throttle.release(cmd.getDataLength());
nettyHandler.setRecentMessage();
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
});
// Work around for https://github.com/netty/netty/issues/3246
eventLoop.execute(() -> {
try {
if (!closed.get()) {
channel.write(cmd, promise);
}
} catch (Exception e) {
channel.pipeline().fireExceptionCaught(e);
}
});
Exceptions.handleInterrupted(() -> throttle.acquire(cmd.getDataLength()));
}
private void write(WireCommand cmd) throws ConnectionFailedException {
Channel channel = nettyHandler.getChannel();
EventLoop eventLoop = channel.eventLoop();
ChannelPromise promise = channel.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
nettyHandler.setRecentMessage();
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
});
// Work around for https://github.com/netty/netty/issues/3246
eventLoop.execute(() -> {
try {
if (!closed.get()) {
channel.write(cmd, promise);
}
} catch (Exception e) {
channel.pipeline().fireExceptionCaught(e);
}
});
}
/**
* Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.尝试从池中检索健康通道(如果有的话)或创建新的通道。
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
// 从deque中获取一个channel,这里是用双端队列存储的channel
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel池中没有剩余通道引导新通道
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
// 如果channel不存在就创建一个
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
// promise发布连接成功事件
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
if(!channel.isActive()) {
logger.warn("local addr: {}, remote addr {} , channel not active, closed..", channel.localAddress(), channel.remoteAddress());
}
return channel.isActive() ? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
}
public XrpcRequest(
FullHttpRequest request,
ServerContext connectionContext,
Map<String, String> groups,
Channel channel) {
this.h1Request = request;
this.h2Headers = null;
this.connectionContext = connectionContext;
this.groups = groups;
this.upstreamChannel = channel;
this.alloc = channel.alloc();
this.eventLoop = channel.eventLoop();
this.h2Data = null;
}
public XrpcRequest(
Http2Headers headers,
ServerContext connectionContext,
Map<String, String> groups,
Channel channel) {
this.h1Request = null;
this.h2Headers = headers;
this.connectionContext = connectionContext;
this.groups = groups;
this.upstreamChannel = channel;
this.alloc = channel.alloc();
this.eventLoop = channel.eventLoop();
this.h2Data = alloc.compositeBuffer();
}
@Override
public Future<Void> release(Channel channel) {
if (shouldRelease(channel)) {
return delegate.release(channel);
} else {
return new SucceededFuture<>(channel.eventLoop(), null);
}
}
@Override
public void send(WireCommand cmd) {
Channel c = getChannel();
// Work around for https://github.com/netty/netty/issues/3246
EventLoop eventLoop = c.eventLoop();
eventLoop.execute(() -> write(c, cmd));
}
/**
* Returns the current execution context or creates one.
*/
private ThreadContext getOrCreateContext(Channel channel) {
ThreadContext context = ThreadContext.currentContext();
if (context != null) {
return context;
}
return new SingleThreadContext(Thread.currentThread(), channel.eventLoop(), this.context.serializer().clone());
}
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
if (channel.isActive()) {
passedHealthCheckCount.incrementAndGet();
return loop.newSucceededFuture(Boolean.TRUE);
} else {
failedHealthCheckCount.incrementAndGet();
return loop.newSucceededFuture(Boolean.FALSE);
}
}
@Override
public Future<Boolean> isHealthy(Channel channel) {
EventLoop loop = channel.eventLoop();
return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
}
public BatchSender(Channel channel) {
this.channel = channel;
this.voidPromise = channel.voidPromise();
this.eventLoop = channel.eventLoop();
}
NettyChannelPublisher(Channel channel, Predicate<T> terminalSignalPredicate, CloseHandler closeHandler) {
this.eventLoop = channel.eventLoop();
this.channel = channel;
this.closeHandler = closeHandler;
this.terminalSignalPredicate = requireNonNull(terminalSignalPredicate);
}
public boolean isSameEventLoop(Channel channel) {
return initialEventLoop == channel.eventLoop();
}
public BatchFlusher(final Channel channel, final int maxPending) {
this.channel = channel;
this.maxPending = maxPending;
this.eventLoop = channel.eventLoop();
}