下面列出了io.netty.channel.EventLoop#execute ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
// 释放channel
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
// 关闭channel,发布promise失败事件
closeAndFail(channel, cause, promise);
}
return promise;
}
/**
* @deprecated Use {@link #clearReadPending()} if appropriate instead.
* No longer supported.
*/
@Deprecated
protected void setReadPending(final boolean readPending) {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
setReadPending0(readPending);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
setReadPending0(readPending);
}
});
}
} else {
// Best effort if we are not registered yet clear readPending.
// NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
// not set yet so it would produce an assertion failure.
this.readPending = readPending;
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
// 如果渠道已经注册了
if (isRegistered()) {
// 获取事件组
EventLoop eventLoop = eventLoop();
// 如果当前线程在事件组
if (eventLoop.inEventLoop()) {
clearReadPending0();
} else {
// 否则异步执行清除自动读
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
// NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
// not set yet so it would produce an assertion failure.
readPending = false;
}
}
final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise
if (isRegistered()) {
final EventLoop loop = eventLoop();
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
if (loop.inEventLoop()) {
unsafe.clearEpollIn0();
} else {
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
loop.execute(new OneTimeTask() {
@Override
public void run() {
if (!config().isAutoRead() && !unsafe.readPending) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
}
});
}
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~readFlag;
}
}
@Override
public void addTask(Runnable task) {
EventLoop eventLoop = channel.eventLoop();
while (!taskQueue.offer(task)) {
if (eventLoop.inEventLoop()) {
runAllTasks.run();
} else {
// TODO await?
eventLoop.execute(runAllTasks);
}
}
if (!taskQueue.isEmpty()) {
eventLoop.execute(runAllTasks);
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info(">>> channelUnregistered");
if (unrecognizedName) {
LOG.info(">>> unrecognizedName retry");
final EventLoop loop = ctx.channel().eventLoop();
loop.execute(new Runnable() {
@Override
public void run() {
try {
client.retry(loop);
} catch (InterruptedException e) {
LOG.info(">>> retry interrupted, shutdown");
client.stop();
}
}
});
} else {
LOG.info(">>> shutdown sucessfully");
client.stop();
}
}
final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise
if (isRegistered()) {
final EventLoop loop = eventLoop();
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
if (loop.inEventLoop()) {
unsafe.clearEpollIn0();
} else {
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
loop.execute(new Runnable() {
@Override
public void run() {
if (!unsafe.readPending && !config().isAutoRead()) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
}
});
}
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~readFlag;
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
return promise;
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
return promise;
}
void initiateGracefulClose(final Runnable whenInitiated) {
EventLoop eventLoop = channel.eventLoop();
if (eventLoop.inEventLoop()) {
doCloseAsyncGracefully0(whenInitiated);
} else {
eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated));
}
}
private void acquireConnectionAndExecute(ClientRequestContext ctx, Endpoint endpointWithPort,
String ipAddr, HttpRequest req, DecodedHttpResponse res,
ClientConnectionTimingsBuilder timingsBuilder) {
final EventLoop eventLoop = ctx.eventLoop();
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> acquireConnectionAndExecute(ctx, endpointWithPort, ipAddr,
req, res, timingsBuilder));
return;
}
final String host = extractHost(ctx, req, endpointWithPort);
final int port = endpointWithPort.port();
final SessionProtocol protocol = ctx.sessionProtocol();
final HttpChannelPool pool = factory.pool(ctx.eventLoop());
final PoolKey key = new PoolKey(host, ipAddr, port);
final PooledChannel pooledChannel = pool.acquireNow(protocol, key);
if (pooledChannel != null) {
logSession(ctx, pooledChannel, null);
doExecute(pooledChannel, ctx, req, res);
} else {
pool.acquireLater(protocol, key, timingsBuilder).handle((newPooledChannel, cause) -> {
logSession(ctx, newPooledChannel, timingsBuilder.build());
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
final UnprocessedRequestException wrapped = UnprocessedRequestException.of(cause);
handleEarlyRequestException(ctx, req, wrapped);
res.close(wrapped);
}
return null;
});
}
}
/**
* Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.尝试从池中检索健康通道(如果有的话)或创建新的通道。
* @param promise the promise to provide acquire result.
* @return future for acquiring a channel.
*/
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
try {
// 从deque中获取一个channel,这里是用双端队列存储的channel
final Channel ch = pollChannel();
if (ch == null) {
// No Channel left in the pool bootstrap a new Channel池中没有剩余通道引导新通道
Bootstrap bs = bootstrap.clone();
bs.attr(POOL_KEY, this);
// 如果channel不存在就创建一个
ChannelFuture f = connectChannel(bs);
if (f.isDone()) {
// promise发布连接成功事件
notifyConnect(f, promise);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
notifyConnect(future, promise);
}
});
}
return promise;
}
EventLoop loop = ch.eventLoop();
if (loop.inEventLoop()) {
doHealthCheck(ch, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
doHealthCheck(ch, promise);
}
});
}
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
@Override
public void subscribe(CoreSubscriber<? super Channel> actual) {
EventLoop eventLoop = channel.eventLoop();
if (eventLoop.inEventLoop()) {
_subscribe(actual);
}
else {
eventLoop.execute(() -> _subscribe(actual));
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
readPending = false;
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
readPending = false;
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
}
});
}
return promise;
}
@Override
public void send(WireCommand cmd) {
Channel c = getChannel();
// Work around for https://github.com/netty/netty/issues/3246
EventLoop eventLoop = c.eventLoop();
eventLoop.execute(() -> write(c, cmd));
}
@Override
protected void doClose() throws Exception {
final LocalChannel peer = this.peer;
State oldState = state;
try {
if (oldState != State.CLOSED) {
// Update all internal state before the closeFuture is notified.
if (localAddress != null) {
if (parent() == null) {
LocalChannelRegistry.unregister(localAddress);
}
localAddress = null;
}
// State change must happen before finishPeerRead to ensure writes are released either in doWrite or
// channelRead.
state = State.CLOSED;
// Preserve order of event and force a read operation now before the close operation is processed.
if (writeInProgress && peer != null) {
finishPeerRead(peer);
}
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
}
if (peer != null) {
this.peer = null;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelInActive
// event is triggered *after* this peer's channelInActive event
EventLoop peerEventLoop = peer.eventLoop();
final boolean peerIsActive = peer.isActive();
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
}
PlatformDependent.throwException(cause);
}
}
} finally {
// Release all buffers if the Channel was already registered in the past and if it was not closed before.
if (oldState != null && oldState != State.CLOSED) {
// We need to release all the buffers that may be put into our inbound queue since we closed the Channel
// to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
// means even if the promise was notified before its not really guaranteed that the "remote peer" will
// see the buffer at all.
releaseInboundBuffers();
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (state < 2) {
throw new NotYetConnectedException();
}
if (state > 2) {
throw new ClosedChannelException();
}
final LocalChannel peer = this.peer;
final ChannelPipeline peerPipeline = peer.pipeline();
final EventLoop peerLoop = peer.eventLoop();
if (peerLoop == eventLoop()) {
for (;;) {
Object msg = in.current();
if (msg == null) {
break;
}
peer.inboundBuffer.add(msg);
ReferenceCountUtil.retain(msg);
in.remove();
}
finishPeerRead(peer, peerPipeline);
} else {
// Use a copy because the original msgs will be recycled by AbstractChannel.
final Object[] msgsCopy = new Object[in.size()];
for (int i = 0; i < msgsCopy.length; i ++) {
msgsCopy[i] = ReferenceCountUtil.retain(in.current());
in.remove();
}
peerLoop.execute(new Runnable() {
@Override
public void run() {
Collections.addAll(peer.inboundBuffer, msgsCopy);
finishPeerRead(peer, peerPipeline);
}
});
}
}
@Override
protected void doClose() throws Exception {
active = false;
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
// socket which has not even been connected yet. This has been observed to block during unit tests.
inputClosedSeenErrorOnRead = true;
try {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
if (isRegistered()) {
// Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
// if SO_LINGER is used.
//
// See https://github.com/netty/netty/issues/7159
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
doDeregister();
} else {
loop.execute(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable cause) {
pipeline().fireExceptionCaught(cause);
}
}
});
}
}
} finally {
socket.close();
}
}