下面列出了怎么用io.netty.util.concurrent.FastThreadLocalThread的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testWithoutUseCacheForAllThreads() {
assertFalse(Thread.currentThread() instanceof FastThreadLocalThread);
PooledByteBufAllocator pool = new PooledByteBufAllocator(
/*preferDirect=*/ false,
/*nHeapArena=*/ 1,
/*nDirectArena=*/ 1,
/*pageSize=*/8192,
/*maxOrder=*/ 11,
/*tinyCacheSize=*/ 0,
/*smallCacheSize=*/ 0,
/*normalCacheSize=*/ 0,
/*useCacheForAllThreads=*/ false);
ByteBuf buf = pool.buffer(1);
buf.release();
}
public static void test1() {
int size = 10000;
FastThreadLocal<String> tls[] = new FastThreadLocal[size];
for (int i = 0; i < size; i++) {
tls[i] = new FastThreadLocal<String>();
}
new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
long starTime = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
tls[i].set("value" + i);
}
for (int i = 0; i < size; i++) {
for (int k = 0; k < 100000; k++) {
tls[i].get();
}
}
System.out.println(System.currentTimeMillis() - starTime + "ms");
}
}).start();
}
public static void test2() throws Exception {
CountDownLatch cdl = new CountDownLatch(10000);
FastThreadLocal<String> threadLocal = new FastThreadLocal<String>();
long starTime = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
threadLocal.set(Thread.currentThread().getName());
for (int k = 0; k < 100000; k++) {
threadLocal.get();
}
cdl.countDown();
}
}, "Thread" + (i + 1)).start();
}
cdl.await();
System.out.println(System.currentTimeMillis() - starTime);
}
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching so just use 0 as sizes.没有缓存,所以只使用0作为大小。
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
private static ThreadCache createNewThreadCache(final PooledByteBufAllocator allocator)
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch cacheLatch = new CountDownLatch(1);
final Thread t = new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
// Countdown the latch after we allocated a buffer. At this point the cache must exists.
cacheLatch.countDown();
buf.writeZero(buf.capacity());
try {
latch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
buf.release();
FastThreadLocal.removeAll();
}
});
t.start();
// Wait until we allocated a buffer and so be sure the thread was started and the cache exists.
cacheLatch.await();
return new ThreadCache() {
@Override
public void destroy() throws InterruptedException {
latch.countDown();
t.join();
}
};
}
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return ((FastThreadLocalThread) thread).threadLocalMap();
}
return slowThreadLocalMap.get();
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + threadNumber.getAndIncrement();
Thread t = new FastThreadLocalThread(group, r, name, 0);
t.setDaemon(true);
t.setPriority(Thread.NORM_PRIORITY);
log.info("create thread:{}", name);
return t;
}
public HDBClient(ClientConfiguration configuration, StatsLogger statsLogger) {
this.configuration = configuration;
this.statsLogger = statsLogger.scope("hdbclient");
int corePoolSize = configuration.getInt(ClientConfiguration.PROPERTY_CLIENT_CALLBACKS, ClientConfiguration.PROPERTY_CLIENT_CALLBACKS_DEFAULT);
this.maxOperationRetryCount = configuration.getInt(ClientConfiguration.PROPERTY_MAX_OPERATION_RETRY_COUNT, ClientConfiguration.PROPERTY_MAX_OPERATION_RETRY_COUNT_DEFAULT);
this.operationRetryDelay = configuration.getInt(ClientConfiguration.PROPERTY_OPERATION_RETRY_DELAY, ClientConfiguration.PROPERTY_OPERATION_RETRY_DELAY_DEFAULT);
this.thredpool = new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
120L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
(Runnable r) -> {
Thread t = new FastThreadLocalThread(r, "hdb-client");
t.setDaemon(true);
return t;
});
this.networkGroup = NetworkUtils.isEnableEpoolNative() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
this.localEventsGroup = new DefaultEventLoopGroup();
String mode = configuration.getString(ClientConfiguration.PROPERTY_MODE, ClientConfiguration.PROPERTY_MODE_LOCAL);
switch (mode) {
case ClientConfiguration.PROPERTY_MODE_LOCAL:
case ClientConfiguration.PROPERTY_MODE_STANDALONE:
this.clientSideMetadataProvider = new StaticClientSideMetadataProvider(
configuration.getString(ClientConfiguration.PROPERTY_SERVER_ADDRESS, ClientConfiguration.PROPERTY_SERVER_ADDRESS_DEFAULT),
configuration.getInt(ClientConfiguration.PROPERTY_SERVER_PORT, ClientConfiguration.PROPERTY_SERVER_PORT_DEFAULT),
configuration.getBoolean(ClientConfiguration.PROPERTY_SERVER_SSL, ClientConfiguration.PROPERTY_SERVER_SSL_DEFAULT)
);
break;
case ClientConfiguration.PROPERTY_MODE_CLUSTER:
this.clientSideMetadataProvider = new ZookeeperClientSideMetadataProvider(
configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS_DEFAULT),
configuration.getInt(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT),
configuration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, ClientConfiguration.PROPERTY_ZOOKEEPER_PATH_DEFAULT)
);
break;
default:
throw new IllegalStateException(mode);
}
}
@Override
public Thread newThread(Runnable r) {
Thread t = new FastThreadLocalThread(threadGroup, r,//
m_namePrefix + "-" + m_threadNumber.getAndIncrement());
t.setDaemon(m_daemon);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
private static ThreadFactory createThreadFactory(String name) {
return runnable -> {
FastThreadLocalThread thread = new FastThreadLocalThread(runnable, name);
thread.setDaemon(true);
return thread;
};
}
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
InternalThreadLocalMap threadLocalMap;
if (thread instanceof FastThreadLocalThread) {
threadLocalMap = ((FastThreadLocalThread) thread).threadLocalMap();
} else {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
if (slowThreadLocalMap == null) {
threadLocalMap = null;
} else {
threadLocalMap = slowThreadLocalMap.get();
}
}
return threadLocalMap;
}
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
if (slowThreadLocalMap != null) {
slowThreadLocalMap.remove();
}
}
}
@Test
void testNonEventLoopThreadFactory() {
final ThreadGroup nonEventLoopThreadGroup = new ThreadGroup("normal-group");
final Thread nonEventLoopThread = ThreadFactories.builder("normal-thread")
.threadGroup(nonEventLoopThreadGroup)
.build()
.newThread(() -> {});
assertThat(nonEventLoopThread.getClass()).isSameAs(FastThreadLocalThread.class);
assertThat(nonEventLoopThread.getName()).startsWith("normal-thread");
assertThat(nonEventLoopThread.getPriority()).isEqualTo(Thread.NORM_PRIORITY);
assertThat(nonEventLoopThread.isDaemon()).isFalse();
assertThat(nonEventLoopThread.getThreadGroup().getName()).isEqualTo("normal-group");
final ThreadGroup nonEventLoopCustomThreadGroup = new ThreadGroup("custom-group");
final Thread nonEventLoopCustomThread = ThreadFactories.builder("custom-thread")
.priority(Thread.MAX_PRIORITY)
.daemon(true)
.threadGroup(nonEventLoopCustomThreadGroup)
.build()
.newThread(() -> {});
assertThat(nonEventLoopCustomThread.getClass()).isSameAs(FastThreadLocalThread.class);
assertThat(nonEventLoopCustomThread.getName()).startsWith("custom-thread");
assertThat(nonEventLoopCustomThread.getPriority()).isEqualTo(Thread.MAX_PRIORITY);
assertThat(nonEventLoopCustomThread.isDaemon()).isTrue();
assertThat(nonEventLoopCustomThread.getThreadGroup().getName()).isEqualTo("custom-group");
}
/**
* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
* to the object anymore.
*
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
* anymore because it is not a cheap way to handle the cleanup.
* 注册一个给定的对象,一旦不再有对该对象的引用,Runnable将被执行。只有当对象无法访问时,才应该使用这种方法执行一些清理,因为这不是一种廉价的处理清理的方法。
*/
//
public static void register(Object object, Runnable cleanupTask) {
AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
//在访问CLEANER_RUNNING之前,将引用添加到LIVE_SET中,以确保正确
//多线程环境中的行为。
LIVE_SET.add(reference);
// Check if there is already a cleaner running.
//检查是否已经有一个吸尘器在运行。
if (CLEANER_RUNNING.compareAndSet(false, true)) {
final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
cleanupThread.setPriority(Thread.MIN_PRIORITY);
// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
// classloader.
//设置为null,通过保存对继承的强引用来确保不创建类加载器泄漏
//类加载器。
// See:
// - https://github.com/netty/netty/issues/7290
// - https://bugs.openjdk.java.net/browse/JDK-7008595
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
cleanupThread.setContextClassLoader(null);
return null;
}
});
cleanupThread.setName(CLEANER_THREAD_NAME);
// Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
// running.
//将其标记为守护进程线程,以确保如果这是惟一的线程,JVM可以退出
//运行。
cleanupThread.setDaemon(true);
cleanupThread.start();
}
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
@Override
public Thread newThread(Runnable r) {
return new FastThreadLocalThread(r, "herddb-srvcall-" + count.incrementAndGet());
}
@Override
public Thread newThread(Runnable r) {
return new FastThreadLocalThread(r, "db-dmlcall-" + count.incrementAndGet());
}
@Override
protected Thread wrapThread(ThreadGroup group, Runnable r, String name) {
return new FastThreadLocalThread(group, r, name);
}
@Override
Thread newThread(@Nullable ThreadGroup threadGroup, Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
public Thread newThread(final Runnable r) {
final FastThreadLocalThread t = new FastThreadLocalThread(r,
category + "-" + num++);
return t;
}