下面列出了io.netty.channel.EventLoop#inEventLoop ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
checkNotNull(channel, "channel");
checkNotNull(promise, "promise");
try {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
doReleaseChannel(channel, promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
// 释放channel
doReleaseChannel(channel, promise);
}
});
}
} catch (Throwable cause) {
// 关闭channel,发布promise失败事件
closeAndFail(channel, cause, promise);
}
return promise;
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise future) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try {
socket.shutdownOutput();
future.setSuccess();
} catch (Throwable t) {
future.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput(future);
}
});
}
return future;
}
/**
* @deprecated Use {@link #clearReadPending()} if appropriate instead.
* No longer supported.
*/
@Deprecated
protected void setReadPending(final boolean readPending) {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
setReadPending0(readPending);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
setReadPending0(readPending);
}
});
}
} else {
// Best effort if we are not registered yet clear readPending.
// NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
// not set yet so it would produce an assertion failure.
this.readPending = readPending;
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
// 如果渠道已经注册了
if (isRegistered()) {
// 获取事件组
EventLoop eventLoop = eventLoop();
// 如果当前线程在事件组
if (eventLoop.inEventLoop()) {
clearReadPending0();
} else {
// 否则异步执行清除自动读
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
// NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
// not set yet so it would produce an assertion failure.
readPending = false;
}
}
public void setTimeoutController(TimeoutController timeoutController, EventLoop eventLoop) {
requireNonNull(timeoutController, "timeoutController");
requireNonNull(eventLoop, "eventLoop");
checkState(this.timeoutController == null, "timeoutController is set already.");
this.timeoutController = timeoutController;
this.eventLoop = eventLoop;
final Consumer<TimeoutController> pendingTimeoutTask = this.pendingTimeoutTask;
if (pendingTimeoutTask != null) {
if (eventLoop.inEventLoop()) {
pendingTimeoutTask.accept(timeoutController);
} else {
eventLoop.execute(() -> pendingTimeoutTask.accept(timeoutController));
}
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
}
return promise;
}
final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise
if (isRegistered()) {
final EventLoop loop = eventLoop();
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
if (loop.inEventLoop()) {
unsafe.clearEpollIn0();
} else {
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
loop.execute(new OneTimeTask() {
@Override
public void run() {
if (!config().isAutoRead() && !unsafe.readPending) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
}
});
}
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~readFlag;
}
}
@Override
protected void doClose() throws Exception {
if (state <= 2) {
// Update all internal state before the closeFuture is notified.
if (localAddress != null) {
if (parent() == null) {
LocalChannelRegistry.unregister(localAddress);
}
localAddress = null;
}
state = 3;
}
final LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) {
// Need to execute the close in the correct EventLoop
// See https://github.com/netty/netty/issues/1777
EventLoop eventLoop = peer.eventLoop();
// Also check if the registration was not done yet. In this case we submit the close to the EventLoop
// to make sure it is run after the registration completes.
//
// See https://github.com/netty/netty/issues/2144
if (eventLoop.inEventLoop() && !registerInProgress) {
peer.unsafe().close(unsafe().voidPromise());
} else {
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
peer.unsafe().close(unsafe().voidPromise());
}
});
}
this.peer = null;
}
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
return promise;
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
final EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
}
});
}
return promise;
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
return promise;
}
@Override
protected void doCloseAsyncGracefully() {
EventLoop eventLoop = channel().eventLoop();
if (eventLoop.inEventLoop()) {
invokeUserCloseHandler();
} else {
eventLoop.execute(DefaultNettyConnection.this::invokeUserCloseHandler);
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
readPending = false;
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
readPending = false;
}
}
@Override
protected void doClose() throws Exception {
active = false;
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
// socket which has not even been connected yet. This has been observed to block during unit tests.
inputClosedSeenErrorOnRead = true;
try {
if (isRegistered()) {
// The FD will be closed, which should take care of deleting any associated events from kqueue, but
// since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for
// all events which are pending in kqueue to avoid referencing a deleted pointer at a later time.
// Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
// if SO_LINGER is used.
//
// See https://github.com/netty/netty/issues/7159
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
doDeregister();
} else {
loop.execute(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable cause) {
pipeline().fireExceptionCaught(cause);
}
}
});
}
}
} finally {
socket.close();
}
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
}
});
}
return promise;
}
void initiateGracefulClose(final Runnable whenInitiated) {
EventLoop eventLoop = channel.eventLoop();
if (eventLoop.inEventLoop()) {
doCloseAsyncGracefully0(whenInitiated);
} else {
eventLoop.execute(() -> doCloseAsyncGracefully0(whenInitiated));
}
}
@Override
protected void doClose() throws Exception {
final LocalChannel peer = this.peer;
State oldState = state;
try {
if (oldState != State.CLOSED) {
// Update all internal state before the closeFuture is notified.
if (localAddress != null) {
if (parent() == null) {
LocalChannelRegistry.unregister(localAddress);
}
localAddress = null;
}
// State change must happen before finishPeerRead to ensure writes are released either in doWrite or
// channelRead.
state = State.CLOSED;
// Preserve order of event and force a read operation now before the close operation is processed.
if (writeInProgress && peer != null) {
finishPeerRead(peer);
}
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
}
if (peer != null) {
this.peer = null;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelInActive
// event is triggered *after* this peer's channelInActive event
EventLoop peerEventLoop = peer.eventLoop();
final boolean peerIsActive = peer.isActive();
try {
peerEventLoop.execute(new Runnable() {
@Override
public void run() {
peer.tryClose(peerIsActive);
}
});
} catch (Throwable cause) {
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
this, peer, cause);
if (peerEventLoop.inEventLoop()) {
peer.releaseInboundBuffers();
} else {
// inboundBuffers is a SPSC so we may leak if the event loop is shutdown prematurely or
// rejects the close Runnable but give a best effort.
peer.close();
}
PlatformDependent.throwException(cause);
}
}
} finally {
// Release all buffers if the Channel was already registered in the past and if it was not closed before.
if (oldState != null && oldState != State.CLOSED) {
// We need to release all the buffers that may be put into our inbound queue since we closed the Channel
// to ensure we not leak any memory. This is fine as it basically gives the same guarantees as TCP which
// means even if the promise was notified before its not really guaranteed that the "remote peer" will
// see the buffer at all.
releaseInboundBuffers();
}
}
}
private boolean isAllowedToBlock() {
final EventLoop eventLoop = channel.eventLoop();
final boolean inEventLoop = eventLoop.inEventLoop();
return !inEventLoop;
}