下面列出了怎么用io.netty.util.internal.InternalThreadLocalMap的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Reports whether this implementation carries the {@link Sharable} annotation and can
 * therefore be added to more than one {@link ChannelPipeline}.
 *
 * @return {@code true} if this handler's class is annotated with {@link Sharable}
 */
public boolean isSharable() {
    // The annotation lookup result is cached per handler class in a map obtained from the
    // calling thread's InternalThreadLocalMap. Per the original note, a thread-local
    // WeakHashMap is used to avoid volatile reads/writes; one map per thread is good enough
    // because the number of threads is quite limited anyway.
    //
    // See https://github.com/netty/netty/issues/2289
    final Class<?> handlerType = getClass();
    final Map<Class<?>, Boolean> perThreadCache = InternalThreadLocalMap.get().handlerSharableCache();
    final Boolean cached = perThreadCache.get(handlerType);
    if (cached != null) {
        return cached;
    }
    // First lookup for this class on this thread: inspect the annotation and remember the answer.
    final boolean annotated = handlerType.isAnnotationPresent(Sharable.class);
    perThreadCache.put(handlerType, annotated);
    return annotated;
}
/**
 * Removes all {@link FastThreadLocal} variables bound to the current thread. This operation is
 * useful when you are in a container environment and you don't want to leave the thread local
 * variables in the threads you do not manage.
 */
public static void removeAll() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.getIfSet();
    if (map == null) {
        // No map was ever created for this thread, so there is nothing to clean up.
        return;
    }
    try {
        final Object registered = map.indexedVariable(variablesToRemoveIndex);
        if (registered == null || registered == InternalThreadLocalMap.UNSET) {
            return;
        }
        @SuppressWarnings("unchecked")
        final Set<FastThreadLocal<?>> tracked = (Set<FastThreadLocal<?>>) registered;
        // Work on a snapshot: remove(...) also unregisters the variable from this very set.
        final FastThreadLocal<?>[] snapshot = tracked.toArray(new FastThreadLocal[tracked.size()]);
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].remove(map);
        }
    } finally {
        // Always drop the map itself, even if one of the removals threw.
        InternalThreadLocalMap.remove();
    }
}
/**
 * Sets the value to uninitialized for the specified thread local map;
 * a subsequent call to get() will trigger a call to initialValue().
 * The specified thread local map must be for the current thread.
 *
 * @param threadLocalMap the map to clear this variable from; {@code null} is a no-op
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    final Object previous = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);
    if (previous == InternalThreadLocalMap.UNSET) {
        // Nothing was stored for this variable, so there is no value to pass to onRemoval(...).
        return;
    }
    try {
        onRemoval((V) previous);
    } catch (Exception cause) {
        PlatformDependent.throwException(cause);
    }
}
/**
 * Notifies the listeners, either inline or through the executor.
 *
 * When the caller is already on the executor's event loop and the per-thread listener stack
 * depth is still below {@code MAX_LISTENER_STACK_DEPTH}, the listeners are notified immediately
 * with the depth counter incremented for the duration of the call. In every other case the
 * notification is submitted to the executor via {@code safeExecute}.
 */
private void notifyListeners() {
    final EventExecutor loop = executor();
    if (loop.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depth = locals.futureListenerStackDepth();
        final boolean mayNotifyInline = depth < MAX_LISTENER_STACK_DEPTH;
        if (mayNotifyInline) {
            locals.setFutureListenerStackDepth(depth + 1);
            try {
                // Notify the listeners right away on the current thread.
                notifyListenersNow();
            } finally {
                // Restore the depth observed on entry, even if a listener threw.
                locals.setFutureListenerStackDepth(depth);
            }
            return;
        }
    }

    final Runnable deferredNotification = new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    };
    safeExecute(loop, deferredNotification);
}
/**
 * Benchmark: reads indexed slot 1024 of the fast thread's thread-local map, lazily installs a
 * {@code ConcurrentIntToObjectArrayMap} there when the slot is still unset, and then queries
 * that map for key 1024 (falling back to the {@code value} field).
 */
@Benchmark
@BenchmarkMode({ Mode.Throughput })
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@SuppressWarnings("unchecked")
public Integer fastThreadWithArrayMap() {
    final int slot = 1024;
    final Object stored = fastThread.threadLocalMap().indexedVariable(slot);
    final ConcurrentIntToObjectArrayMap<Integer> arrayMap;
    if (stored == InternalThreadLocalMap.UNSET) {
        // First access: create the map and publish it in the slot.
        arrayMap = new ConcurrentIntToObjectArrayMap<>();
        fastThread.threadLocalMap().setIndexedVariable(slot, arrayMap);
    } else {
        arrayMap = (ConcurrentIntToObjectArrayMap<Integer>) stored;
    }
    return arrayMap.getOrUpdate(slot, () -> value);
}
/**
 * Creates a factory from the given fingerprint strings.
 *
 * The strings are converted with {@code toFingerprintArray}; processing stops at the first
 * {@code null} entry of the converted array. Every accepted fingerprint is stored as a
 * defensive copy.
 *
 * @param fp the fingerprints to accept
 * @throws IllegalArgumentException if a converted fingerprint is not {@code Wally.SHA256_LEN} bytes long
 */
public FingerprintTrustManagerFactorySHA256(final String... fp) {
    final byte[][] parsed = toFingerprintArray(Arrays.asList(fp));
    final List<byte[]> accepted = InternalThreadLocalMap.get().arrayList();
    // A null entry terminates the scan; anything after it is ignored.
    for (int i = 0; i < parsed.length && parsed[i] != null; i++) {
        final byte[] candidate = parsed[i];
        if (candidate.length != Wally.SHA256_LEN) {
            throw new IllegalArgumentException("malformed fingerprint: " +
                    Wally.hex_from_bytes(candidate) + " (expected: SHA256)");
        }
        accepted.add(candidate.clone());
    }
    this.fingerprints = accepted.toArray(new byte[accepted.size()][]);
}
/**
 * Tells whether this implementation is {@link Sharable}, i.e. whether it may be added
 * to different {@link ChannelPipeline}s.
 *
 * @return {@code true} if the {@link Sharable} annotation is present on this handler's class
 */
public boolean isSharable() {
    /*
     * The result of the Sharable annotation detection is cached. Per the original note, a
     * ThreadLocal and a WeakHashMap are used to eliminate volatile writes/reads; a separate
     * map instance per thread is good enough since the number of threads is quite limited.
     *
     * See https://github.com/netty/netty/issues/2289
     */
    final Class<?> self = getClass();
    final Map<Class<?>, Boolean> sharableByClass = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean answer = sharableByClass.get(self);
    if (answer != null) {
        return answer;
    }
    // Cache miss: compute once and store for later calls on this thread.
    answer = self.isAnnotationPresent(Sharable.class);
    sharableByClass.put(self, answer);
    return answer;
}
/**
 * Clears every {@link FastThreadLocal} variable bound to the current thread and then removes
 * the thread's {@code InternalThreadLocalMap}. Useful in container environments where thread
 * local variables must not be left behind in threads you do not manage.
 */
public static void removeAll() {
    final InternalThreadLocalMap current = InternalThreadLocalMap.getIfSet();
    if (current != null) {
        try {
            final Object slot = current.indexedVariable(variablesToRemoveIndex);
            final boolean populated = slot != null && slot != InternalThreadLocalMap.UNSET;
            if (populated) {
                @SuppressWarnings("unchecked")
                final Set<FastThreadLocal<?>> bound = (Set<FastThreadLocal<?>>) slot;
                final int count = bound.size();
                // Copy to an array first; removing a variable also mutates the set being walked.
                final FastThreadLocal<?>[] copy = bound.toArray(new FastThreadLocal[count]);
                for (final FastThreadLocal<?> local : copy) {
                    local.remove(current);
                }
            }
        } finally {
            // Runs whether or not the individual removals succeeded.
            InternalThreadLocalMap.remove();
        }
    }
}
/**
 * Resets this variable to uninitialized in the specified thread local map, so that a
 * subsequent call to get() triggers initialValue() again.
 * The specified thread local map must be for the current thread.
 *
 * @param threadLocalMap the map to clear this variable from; nothing happens when {@code null}
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap != null) {
        final Object old = threadLocalMap.removeIndexedVariable(index);
        removeFromVariablesToRemove(threadLocalMap, this);
        // onRemoval(...) is only invoked when a value had actually been stored.
        final boolean hadValue = old != InternalThreadLocalMap.UNSET;
        if (hadValue) {
            try {
                onRemoval((V) old);
            } catch (Exception failure) {
                PlatformDependent.throwException(failure);
            }
        }
    }
}
/**
 * Notifies a single listener about the given future.
 *
 * If the caller runs in {@code eventExecutor}'s event loop and the per-thread listener stack
 * depth is below {@code MAX_LISTENER_STACK_DEPTH}, the listener is invoked inline with the
 * depth counter incremented for the duration of the call. Otherwise the notification is
 * handed to {@code execute(...)} as a task.
 *
 * @param eventExecutor the executor whose event loop is checked and which receives the deferred task
 * @param future        the future passed to the listener
 * @param l             the listener to notify
 */
protected static void notifyListener(
        final EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> l) {
    if (eventExecutor.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depthOnEntry = locals.futureListenerStackDepth();
        final boolean belowLimit = depthOnEntry < MAX_LISTENER_STACK_DEPTH;
        if (belowLimit) {
            locals.setFutureListenerStackDepth(depthOnEntry + 1);
            try {
                notifyListener0(future, l);
            } finally {
                // Put the counter back to what it was, regardless of the listener's outcome.
                locals.setFutureListenerStackDepth(depthOnEntry);
            }
            return;
        }
    }

    final Runnable task = new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, l);
        }
    };
    execute(eventExecutor, task);
}
/**
 * Returns a cached thread-local {@link CharsetEncoder} for the specified {@code charset}.
 * The returned encoder replaces malformed input and unmappable characters; a cached encoder
 * is reset before it is handed out.
 *
 * @param charset the charset to obtain an encoder for
 * @return the encoder for {@code charset}
 * @throws NullPointerException if {@code charset} is {@code null}
 */
public static CharsetEncoder getEncoder(Charset charset) {
    if (charset == null) {
        throw new NullPointerException("charset");
    }

    final Map<Charset, CharsetEncoder> cache = InternalThreadLocalMap.get().charsetEncoderCache();
    final CharsetEncoder cached = cache.get(charset);
    if (cached == null) {
        // Miss: build a configured encoder and remember it.
        final CharsetEncoder created = charset.newEncoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        cache.put(charset, created);
        return created;
    }

    // Hit: clear leftover state and re-apply the error actions before reuse.
    cached.reset()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
    return cached;
}
/**
 * Returns a cached thread-local {@link CharsetDecoder} for the specified {@code charset}.
 * The returned decoder replaces malformed input and unmappable characters; a cached decoder
 * is reset before it is handed out.
 *
 * @param charset the charset to obtain a decoder for
 * @return the decoder for {@code charset}
 * @throws NullPointerException if {@code charset} is {@code null}
 */
public static CharsetDecoder getDecoder(Charset charset) {
    if (charset == null) {
        throw new NullPointerException("charset");
    }

    final Map<Charset, CharsetDecoder> cache = InternalThreadLocalMap.get().charsetDecoderCache();
    final CharsetDecoder cached = cache.get(charset);
    if (cached == null) {
        // Miss: build a configured decoder and remember it.
        final CharsetDecoder created = charset.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        cache.put(charset, created);
        return created;
    }

    // Hit: clear leftover state and re-apply the error actions before reuse.
    cached.reset()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
    return cached;
}
/**
 * Encodes the specified cookies into a single Cookie header value.
 *
 * In non-strict mode the cookies are encoded in iteration order. In strict mode, when more
 * than one cookie is supplied, they are first sorted with {@code COOKIE_COMPARATOR}.
 *
 * @param cookies some cookies
 * @return a Rfc6265 style Cookie header value, null if no cookies are passed.
 */
public String encode(Iterable<? extends Cookie> cookies) {
    final Iterator<? extends Cookie> remaining = checkNotNull(cookies, "cookies").iterator();
    if (!remaining.hasNext()) {
        return null;
    }

    final StringBuilder out = stringBuilder();
    if (!strict) {
        while (remaining.hasNext()) {
            encode(out, remaining.next());
        }
        return stripTrailingSeparatorOrNull(out);
    }

    final Cookie head = remaining.next();
    if (!remaining.hasNext()) {
        // A single cookie needs no sorting.
        encode(out, head);
        return stripTrailingSeparatorOrNull(out);
    }

    // Several cookies: collect them all, sort, then encode in sorted order.
    final List<Cookie> collected = InternalThreadLocalMap.get().arrayList();
    collected.add(head);
    while (remaining.hasNext()) {
        collected.add(remaining.next());
    }
    final Cookie[] ordered = collected.toArray(new Cookie[collected.size()]);
    Arrays.sort(ordered, COOKIE_COMPARATOR);
    for (int i = 0; i < ordered.length; i++) {
        encode(out, ordered[i]);
    }
    return stripTrailingSeparatorOrNull(out);
}
/**
 * Splits one multipart header value at every {@code ';'} that is not inside a double-quoted
 * section. Inside quotes, a backslash escapes the following character, so an escaped quote
 * does not end the quoted section.
 *
 * @param svalue the raw header value
 * @return the segments that were separated by ';' (always at least one element)
 */
private static String[] splitMultipartHeaderValues(String svalue) {
    final List<String> segments = InternalThreadLocalMap.get().arrayList(1);
    final int end = svalue.length();
    boolean quoted = false;
    boolean escaped = false;
    int segmentStart = 0;
    for (int pos = 0; pos < end; pos++) {
        final char ch = svalue.charAt(pos);
        if (!quoted) {
            if (ch == '"') {
                quoted = true;
            } else if (ch == ';') {
                segments.add(svalue.substring(segmentStart, pos));
                segmentStart = pos + 1;
            }
            continue;
        }
        // Inside a quoted section from here on.
        if (escaped) {
            // The character after a backslash is taken literally.
            escaped = false;
        } else if (ch == '\\') {
            escaped = true;
        } else if (ch == '"') {
            quoted = false;
        }
    }
    // Whatever follows the last separator (possibly the empty string) is the final segment.
    segments.add(svalue.substring(segmentStart));
    return segments.toArray(new String[segments.size()]);
}
/**
 * Starts a read on this channel.
 *
 * Does nothing if a read is already in progress. If the inbound buffer is empty, the read is
 * only marked as in progress. Otherwise the buffered messages are fired through the pipeline,
 * either inline (while the per-thread reader stack depth is below
 * {@code MAX_READER_STACK_DEPTH}) or by submitting {@code readTask} to the event loop.
 *
 * @throws Exception if submitting {@code readTask} fails; both this channel and its peer are
 *                   closed before the failure is rethrown
 */
@Override
protected void doBeginRead() throws Exception {
    if (readInProgress) {
        return;
    }
    ChannelPipeline pipeline = pipeline();
    Queue<Object> inboundBuffer = this.inboundBuffer;
    if (inboundBuffer.isEmpty()) {
        readInProgress = true;
        return;
    }
    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    // Kept as a primitive: the depth is only compared, incremented and written back, so a
    // boxed Integer would just add boxing/unboxing on every use.
    final int stackDepth = threadLocals.localChannelReaderStackDepth();
    if (stackDepth < MAX_READER_STACK_DEPTH) {
        threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
        try {
            // Drain everything that is currently buffered.
            for (;;) {
                Object received = inboundBuffer.poll();
                if (received == null) {
                    break;
                }
                pipeline.fireChannelRead(received);
            }
            pipeline.fireChannelReadComplete();
        } finally {
            // Restore the depth observed on entry, even if a handler threw.
            threadLocals.setLocalChannelReaderStackDepth(stackDepth);
        }
    } else {
        // Depth limit reached: defer the read to the event loop instead of recursing further.
        try {
            eventLoop().execute(readTask);
        } catch (Throwable cause) {
            logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
            close();
            peer.close();
            PlatformDependent.throwException(cause);
        }
    }
}
/**
 * Splits this string around occurrences of the specified delimiter.
 *
 * Empty tokens between delimiters are kept (as {@code EMPTY_STRING}), trailing empty tokens
 * are dropped, and a string that does not contain the delimiter yields a single-element array
 * holding this instance.
 *
 * @param delim the delimiter character
 * @return the resulting tokens
 */
public AsciiString[] split(char delim) {
    final List<AsciiString> tokens = InternalThreadLocalMap.get().arrayList();
    final int end = length();
    int tokenStart = 0;
    for (int pos = 0; pos < end; pos++) {
        if (charAt(pos) != delim) {
            continue;
        }
        if (tokenStart == pos) {
            tokens.add(EMPTY_STRING);
        } else {
            tokens.add(new AsciiString(value, tokenStart + arrayOffset(), pos - tokenStart, false));
        }
        tokenStart = pos + 1;
    }

    if (tokenStart == 0) {
        // No delimiter was found in the value.
        tokens.add(this);
    } else if (tokenStart != end) {
        // Add the last element since it is not empty.
        tokens.add(new AsciiString(value, tokenStart + arrayOffset(), end - tokenStart, false));
    } else {
        // The value ended with a delimiter: truncate trailing empty elements.
        int last = tokens.size() - 1;
        while (last >= 0 && tokens.get(last).isEmpty()) {
            tokens.remove(last);
            last--;
        }
    }
    return tokens.toArray(new AsciiString[tokens.size()]);
}
/**
 * Returns the number of thread local variables bound to the current thread.
 *
 * @return the size reported by the current thread's map, or {@code 0} when no map is set
 */
public static int size() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.getIfSet();
    return map == null ? 0 : map.size();
}
/**
 * Registers {@code variable} in the set stored at {@code variablesToRemoveIndex} of the given
 * map, creating and installing an identity-based set first when the slot is unset or null.
 *
 * @param threadLocalMap the map that holds the bookkeeping set
 * @param variable       the variable to register
 */
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object existing = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    if (existing != InternalThreadLocalMap.UNSET && existing != null) {
        ((Set<FastThreadLocal<?>>) existing).add(variable);
        return;
    }
    // No set yet: create one keyed by identity and publish it before adding.
    final Set<FastThreadLocal<?>> created =
            Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
    threadLocalMap.setIndexedVariable(variablesToRemoveIndex, created);
    created.add(variable);
}
/**
 * Unregisters {@code variable} from the set stored at {@code variablesToRemoveIndex} of the
 * given map. Does nothing when that slot is unset or null.
 *
 * @param threadLocalMap the map that holds the bookkeeping set
 * @param variable       the variable to unregister
 */
private static void removeFromVariablesToRemove(
        InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object existing = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    final boolean hasSet = existing != InternalThreadLocalMap.UNSET && existing != null;
    if (hasSet) {
        @SuppressWarnings("unchecked")
        final Set<FastThreadLocal<?>> tracked = (Set<FastThreadLocal<?>>) existing;
        tracked.remove(variable);
    }
}
/**
 * Returns the current value for the current thread. When no value is stored yet, the value
 * is initialized and a cleaner is registered for the thread's map.
 *
 * @return the stored or freshly initialized value
 */
@SuppressWarnings("unchecked")
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(index);
    if (current == InternalThreadLocalMap.UNSET) {
        final V created = initialize(map);
        registerCleaner(map);
        return created;
    }
    return (V) current;
}
/**
 * Registers a cleanup task with {@code ObjectCleaner} for the current thread, unless the
 * thread reports that it cleans up its fast thread locals itself or the cleaner flag is
 * already set in the given map.
 *
 * @param threadLocalMap the current thread's map; the cleanup task calls {@code remove(threadLocalMap)}
 */
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    final Thread owner = Thread.currentThread();
    if (FastThreadLocalThread.willCleanupFastThreadLocals(owner)) {
        return;
    }
    if (threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
        return;
    }

    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the final cleanup is tied
    // to the lifetime of the thread, and this Object will be discarded if the associated thread
    // is GCed.
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);

    // We need to ensure remove(InternalThreadLocalMap) is triggered so everything is released
    // and FastThreadLocal.onRemoval(...) is called.
    final Runnable cleanup = new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);
            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be
            // triggered once the Thread is collected by GC. In that case the ThreadLocal will
            // be gone already.
        }
    };
    ObjectCleaner.register(owner, cleanup);
}
/**
 * Returns the current value for the specified thread local map, initializing it first when
 * no value is stored yet.
 * The specified thread local map must be for the current thread.
 *
 * @param threadLocalMap the map to read from
 * @return the stored or freshly initialized value
 */
@SuppressWarnings("unchecked")
public final V get(InternalThreadLocalMap threadLocalMap) {
    final Object current = threadLocalMap.indexedVariable(index);
    return current != InternalThreadLocalMap.UNSET ? (V) current : initialize(threadLocalMap);
}
/**
 * Computes the initial value via {@code initialValue()}, stores it in the given map at this
 * variable's index and registers this variable for later removal.
 *
 * @param threadLocalMap the map to store the initial value in
 * @return the value that was stored
 */
private V initialize(InternalThreadLocalMap threadLocalMap) {
    V initial;
    try {
        initial = initialValue();
    } catch (Exception cause) {
        PlatformDependent.throwException(cause);
        // Only reached if throwException(...) returns normally.
        initial = null;
    }

    threadLocalMap.setIndexedVariable(index, initial);
    addToVariablesToRemove(threadLocalMap, this);
    return initial;
}
/**
 * Set the value for the current thread. Passing {@code InternalThreadLocalMap.UNSET} removes
 * the variable instead.
 *
 * @param value the value to store
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove();
        return;
    }
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    // A cleaner is only registered when the store also registered this variable for removal.
    final boolean registered = setKnownNotUnset(map, value);
    if (registered) {
        registerCleaner(map);
    }
}
/**
 * Set the value for the specified thread local map. The specified thread local map must be
 * for the current thread. Passing {@code InternalThreadLocalMap.UNSET} removes the variable
 * from that map instead.
 *
 * @param threadLocalMap the map to store the value in
 * @param value          the value to store
 */
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove(threadLocalMap);
        return;
    }
    setKnownNotUnset(threadLocalMap, value);
}
/**
 * Stores {@code value} at this variable's index and, when the map reports {@code true} for
 * the store, registers this variable for later removal.
 *
 * @param threadLocalMap the map to store the value in
 * @param value          the value to store
 * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
 */
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    final boolean result = threadLocalMap.setIndexedVariable(index, value);
    if (result) {
        addToVariablesToRemove(threadLocalMap, this);
    }
    return result;
}
/**
 * The logic in this method should be identical to {@link #notifyListeners()} but
 * cannot share code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since the
 * listener(s) may be changed and is protected by a synchronized operation.
 *
 * @param executor the executor whose event loop is checked and which receives the deferred task
 * @param future   the future passed to the listener
 * @param listener the listener to notify
 */
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
                                                              final Future<?> future,
                                                              final GenericFutureListener<?> listener) {
    // When already in the event loop, notify on the current thread to avoid a hand-off.
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap locals = InternalThreadLocalMap.get();
        final int depthOnEntry = locals.futureListenerStackDepth();
        final boolean belowLimit = depthOnEntry < MAX_LISTENER_STACK_DEPTH;
        if (belowLimit) {
            locals.setFutureListenerStackDepth(depthOnEntry + 1);
            try {
                notifyListener0(future, listener);
            } finally {
                // Restore the depth observed on entry, even if the listener threw.
                locals.setFutureListenerStackDepth(depthOnEntry);
            }
            return;
        }
    }

    // Not in the event loop (or the depth limit was reached): submit the notification to the executor.
    final Runnable task = new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, listener);
        }
    };
    safeExecute(executor, task);
}
/**
 * Returns a cached thread-local {@link CharsetEncoder} for the specified {@link Charset}.
 * A cached encoder is reset and re-configured to replace malformed input and unmappable
 * characters before it is returned.
 *
 * @param charset The specified charset
 * @return The encoder for the specified {@code charset}
 */
public static CharsetEncoder encoder(Charset charset) {
    checkNotNull(charset, "charset");

    final Map<Charset, CharsetEncoder> cache = InternalThreadLocalMap.get().charsetEncoderCache();
    final CharsetEncoder cached = cache.get(charset);
    if (cached == null) {
        // Miss: create an encoder with REPLACE actions and remember it.
        final CharsetEncoder fresh = encoder(charset, CodingErrorAction.REPLACE, CodingErrorAction.REPLACE);
        cache.put(charset, fresh);
        return fresh;
    }

    cached.reset();
    cached.onMalformedInput(CodingErrorAction.REPLACE);
    cached.onUnmappableCharacter(CodingErrorAction.REPLACE);
    return cached;
}
/**
 * Returns a cached thread-local {@link CharsetDecoder} for the specified {@link Charset}.
 * A cached decoder is reset and re-configured to replace malformed input and unmappable
 * characters before it is returned.
 *
 * @param charset The specified charset
 * @return The decoder for the specified {@code charset}
 */
public static CharsetDecoder decoder(Charset charset) {
    checkNotNull(charset, "charset");

    final Map<Charset, CharsetDecoder> cache = InternalThreadLocalMap.get().charsetDecoderCache();
    final CharsetDecoder cached = cache.get(charset);
    if (cached == null) {
        // Miss: create a decoder with REPLACE actions and remember it.
        final CharsetDecoder fresh = decoder(charset, CodingErrorAction.REPLACE, CodingErrorAction.REPLACE);
        cache.put(charset, fresh);
        return fresh;
    }

    cached.reset();
    cached.onMalformedInput(CodingErrorAction.REPLACE);
    cached.onUnmappableCharacter(CodingErrorAction.REPLACE);
    return cached;
}
/**
 * Benchmark: reads indexed slot 256 of the fast thread's thread-local map and returns the
 * stored value; when the slot is still unset, stores 100 there and returns it.
 */
@Benchmark
@BenchmarkMode({ Mode.Throughput })
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public Integer fastThread() {
    final Object stored = fastThread.threadLocalMap().indexedVariable(256);
    if (stored == InternalThreadLocalMap.UNSET) {
        // First access: seed the slot.
        final Integer initial = 100;
        fastThread.threadLocalMap().setIndexedVariable(256, initial);
        return initial;
    }
    return (Integer) stored;
}