下面列出了 io.netty.util.internal.InternalThreadLocalMap#get() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Notifies the listeners, inline when possible and through the executor otherwise.
 *
 * <p>When the calling thread is in the executor's event loop and the per-thread listener
 * nesting depth is below {@code MAX_LISTENER_STACK_DEPTH}, {@code notifyListenersNow()} is
 * invoked directly. In every other case the notification is wrapped in a {@link Runnable}
 * and handed to {@code safeExecute}.
 */
private void notifyListeners() {
    final EventExecutor loop = executor();
    if (loop.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depth = locals.futureListenerStackDepth();
        if (depth < MAX_LISTENER_STACK_DEPTH) {
            // Record one more nesting level for the duration of the inline notification.
            locals.setFutureListenerStackDepth(depth + 1);
            try {
                // Notify the listeners right away on this thread.
                notifyListenersNow();
            } finally {
                // Put the previous depth back, even if the notification threw.
                locals.setFutureListenerStackDepth(depth);
            }
            return;
        }
    }

    // Not in the event loop, or nested too deeply: let the executor run the notification.
    final Runnable deferredNotification = new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    };
    safeExecute(loop, deferredNotification);
}
/**
 * Notifies a single listener about the given future.
 *
 * <p>The listener is invoked directly when the calling thread is in {@code eventExecutor}'s
 * event loop and the per-thread listener nesting depth is below
 * {@code MAX_LISTENER_STACK_DEPTH}. Otherwise the call is submitted through {@code execute}.
 *
 * @param eventExecutor the executor whose event loop decides between inline and deferred notification
 * @param future        the future passed to the listener
 * @param l             the listener to notify
 */
protected static void notifyListener(
        final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {
    if (eventExecutor.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depth = locals.futureListenerStackDepth();
        if (depth < MAX_LISTENER_STACK_DEPTH) {
            // One level deeper while the listener runs on this thread.
            locals.setFutureListenerStackDepth(depth + 1);
            try {
                notifyListener0(future, l);
            } finally {
                // Always restore the depth observed on entry.
                locals.setFutureListenerStackDepth(depth);
            }
            return;
        }
    }

    // Fallback: run the notification as a task on the executor.
    final Runnable deferredNotification = new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, l);
        }
    };
    execute(eventExecutor, deferredNotification);
}
/**
 * Begins a read by draining the inbound buffer into the pipeline.
 *
 * <p>Does nothing if a read is already in progress. If the buffer is empty, the read is
 * flagged as in progress and the method returns. Otherwise the buffered messages are fired
 * through the pipeline on the calling thread, as long as the per-thread reader nesting depth
 * is below {@code MAX_READER_STACK_DEPTH}; beyond that depth {@code readTask} is submitted to
 * the event loop instead.
 *
 * @throws Exception rethrows the failure from submitting {@code readTask}, after both this
 *                   channel and its peer have been closed
 */
@Override
protected void doBeginRead() throws Exception {
    if (readInProgress) {
        return;
    }

    ChannelPipeline pipeline = pipeline();
    Queue<Object> inboundBuffer = this.inboundBuffer;
    if (inboundBuffer.isEmpty()) {
        // Nothing buffered: just flag the read as in progress.
        readInProgress = true;
        return;
    }

    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    // Kept as a primitive: the depth is only compared and incremented, so boxing is pointless.
    final int stackDepth = threadLocals.localChannelReaderStackDepth();
    if (stackDepth < MAX_READER_STACK_DEPTH) {
        threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
        try {
            // Drain everything currently queued, then signal read completion once.
            Object received;
            while ((received = inboundBuffer.poll()) != null) {
                pipeline.fireChannelRead(received);
            }
            pipeline.fireChannelReadComplete();
        } finally {
            // Restore the depth observed on entry, even if a handler threw.
            threadLocals.setLocalChannelReaderStackDepth(stackDepth);
        }
    } else {
        // Nested too deeply on this thread: hand the read over to the event loop.
        try {
            eventLoop().execute(readTask);
        } catch (Throwable cause) {
            logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
            close();
            peer.close();
            PlatformDependent.throwException(cause);
        }
    }
}
/**
 * Returns the current value for the current thread.
 *
 * <p>If the slot at {@code index} still holds {@code InternalThreadLocalMap.UNSET}, the value
 * is produced by {@code initialize} and {@code registerCleaner} is invoked before returning.
 *
 * @return the value stored for the current thread, or the freshly initialized one
 */
@SuppressWarnings("unchecked")
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(index);
    if (current == InternalThreadLocalMap.UNSET) {
        // First access on this thread: create the value, then register the cleaner.
        final V fresh = initialize(map);
        registerCleaner(map);
        return fresh;
    }
    return (V) current;
}
/**
 * Sets the value for the current thread.
 *
 * <p>Passing {@code InternalThreadLocalMap.UNSET} is treated as a request to remove the
 * value. For any other value, {@code registerCleaner} is invoked only when
 * {@code setKnownNotUnset} returns {@code true}.
 *
 * @param value the value to store, or {@code InternalThreadLocalMap.UNSET} to remove it
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove();
        return;
    }

    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final boolean needsCleaner = setKnownNotUnset(map, value);
    if (needsCleaner) {
        registerCleaner(map);
    }
}
/**
 * The logic in this method should be identical to {@link #notifyListeners()} but
 * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
 * listener(s) may be changed and is protected by a synchronized operation.
 *
 * @param executor the executor whose event loop decides between inline and deferred notification
 * @param future   the future passed to the listener
 * @param listener the listener to notify
 */
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
                                                              final Future<?> future,
                                                              final GenericFutureListener<?> listener) {
    // Already in the event loop: use the current thread and avoid a context switch.
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depth = locals.futureListenerStackDepth();
        if (depth < MAX_LISTENER_STACK_DEPTH) {
            // Track the extra nesting level only while the listener is running.
            locals.setFutureListenerStackDepth(depth + 1);
            try {
                notifyListener0(future, listener);
            } finally {
                locals.setFutureListenerStackDepth(depth);
            }
            return;
        }
    }

    // Not in the event loop (or nested too deeply): have the executor run the notification.
    final Runnable deferredNotification = new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, listener);
        }
    };
    safeExecute(executor, deferredNotification);
}
/**
 * Begins a read by delegating to {@code readInbound()}.
 *
 * <p>Does nothing if a read is already in progress. If the inbound buffer is empty, the read
 * is flagged as in progress and the method returns. Otherwise {@code readInbound()} runs on
 * the calling thread, as long as the per-thread reader nesting depth is below
 * {@code MAX_READER_STACK_DEPTH}; beyond that depth {@code readTask} is submitted to the
 * event loop instead.
 *
 * @throws Exception rethrows the failure from submitting {@code readTask}, after both this
 *                   channel and {@code virtualConnection} have been closed
 */
@Override
protected void doBeginRead() throws Exception {
    if (readInProgress) {
        return;
    }

    Queue<Object> inboundBuffer = this.inboundBuffer;
    if (inboundBuffer.isEmpty()) {
        // Nothing buffered: just flag the read as in progress.
        readInProgress = true;
        return;
    }

    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    // Kept as a primitive: the depth is only compared and incremented, so boxing is pointless.
    final int stackDepth = threadLocals.localChannelReaderStackDepth();
    if (stackDepth < MAX_READER_STACK_DEPTH) {
        threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
        try {
            readInbound();
        } finally {
            // Restore the depth observed on entry, even if the read threw.
            threadLocals.setLocalChannelReaderStackDepth(stackDepth);
        }
    } else {
        // Nested too deeply on this thread: hand the read over to the event loop.
        try {
            eventLoop().execute(readTask);
        } catch (Throwable cause) {
            // Supply both placeholder arguments; previously the second "{}" was left unfilled.
            // The throwable stays last so the logger can report it as the cause.
            logger.warn("Closing Local channels {}-{} because exception occurred!",
                    this, virtualConnection, cause);
            close();
            virtualConnection.close();
            PlatformDependent.throwException(cause);
        }
    }
}
/**
 * Begins a read by draining the inbound buffer into the pipeline.
 *
 * <p>Does nothing if a read is already in progress. If the buffer is empty, the read is
 * flagged as in progress and the method returns. Otherwise the buffered messages are fired
 * through the pipeline on the calling thread, as long as the per-thread reader nesting depth
 * is below {@code MAX_READER_STACK_DEPTH}; beyond that depth {@code readTask} is submitted to
 * the event loop instead.
 *
 * @throws Exception declared by the overridden method
 */
@Override
protected void doBeginRead() throws Exception {
    if (readInProgress) {
        return;
    }

    ChannelPipeline pipeline = pipeline();
    Queue<Object> inboundBuffer = this.inboundBuffer;
    if (inboundBuffer.isEmpty()) {
        // Nothing buffered: just flag the read as in progress.
        readInProgress = true;
        return;
    }

    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    // Kept as a primitive: the depth is only compared and incremented, so boxing is pointless.
    final int stackDepth = threadLocals.localChannelReaderStackDepth();
    if (stackDepth < MAX_READER_STACK_DEPTH) {
        threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
        try {
            // Drain everything currently queued, then signal read completion once.
            Object received;
            while ((received = inboundBuffer.poll()) != null) {
                pipeline.fireChannelRead(received);
            }
            pipeline.fireChannelReadComplete();
        } finally {
            // Restore the depth observed on entry, even if a handler threw.
            threadLocals.setLocalChannelReaderStackDepth(stackDepth);
        }
    } else {
        // Nested too deeply on this thread: hand the read over to the event loop.
        eventLoop().execute(readTask);
    }
}
/**
 * Makes {@code toPush} the context stored for the current thread and returns the one it replaced.
 *
 * <p>Each call also increments the {@code pushCalled} counter.
 *
 * @param toPush the context to store; must not be {@code null}
 * @param <T>    the type the caller expects the previous context to have (unchecked cast)
 * @return the value previously held by {@code context} for this thread, as returned by its getter
 */
@Nullable
@Override
@SuppressWarnings("unchecked")
public <T extends RequestContext> T push(RequestContext toPush) {
    requireNonNull(toPush, "toPush");
    pushCalled.incrementAndGet();

    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    // Read the old context before overwriting it so it can be handed back to the caller.
    final RequestContext previous = context.get(threadLocals);
    context.set(threadLocals, toPush);
    return (T) previous;
}