下面列出了怎么用java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy的API类实例代码及写法,或者点击链接到github查看源代码。
/**
*
* @return
* @author tanyaowu
*/
public static SynThreadPoolExecutor getTioExecutor() {
if (tioExecutor != null) {
return tioExecutor;
}
synchronized (Threads.class) {
if (tioExecutor != null) {
return tioExecutor;
}
LinkedBlockingQueue<Runnable> runnableQueue = new LinkedBlockingQueue<>();
// ArrayBlockingQueue<Runnable> tioQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
String threadName = "tio-worker";
DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(threadName, Thread.MAX_PRIORITY);
CallerRunsPolicy callerRunsPolicy = new TioCallerRunsPolicy();
tioExecutor = new SynThreadPoolExecutor(MAX_POOL_SIZE_FOR_TIO, MAX_POOL_SIZE_FOR_TIO, KEEP_ALIVE_TIME, runnableQueue, defaultThreadFactory, threadName,
callerRunsPolicy);
// tioExecutor = new SynThreadPoolExecutor(AVAILABLE_PROCESSORS * 2, Integer.MAX_VALUE, 60, new SynchronousQueue<Runnable>(), defaultThreadFactory, tioThreadName);
tioExecutor.prestartCoreThread();
// tioExecutor.prestartAllCoreThreads();
return tioExecutor;
}
}
/**
* 初始化默认的线程池。在我们的默认线程池配置中,我们做了以下几件事,以保障吞吐量:
* 1. 默认预发分配32个的核心线程,以保障数据较小时(Channel数较少时)的实时吞吐。
* 2. 工作队列的大小适当调小,这样在用户数据量比较大(Channel数较多)时,可以更快的触发线程池新建线程的策略,及时的弹起更多的计算资源。
* 3. 设置了默认的线程保活时间(默认60s),当数据量下去后,可以及时回收线程资源。
*
* @param threadPrefix: 线程名称的前缀标识
* @return
*/
public static ThreadPoolExecutor newDefaultThreadPool(final String threadPrefix) {
return new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_CORE_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_SIZE),
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
String threadName = threadPrefix + counter.getAndIncrement();
LOG.info("TunnelWorkerConfig new thread: " + threadName);
return new Thread(r, threadName);
}
},
new CallerRunsPolicy());
}
@Test
void boundedQueueElasticSize() {
final ThreadPoolExecutor executor = ThreadPoolExecutors.builder()
.boundedQueue(1)
.elasticSize(0, 1)
.keepAlive(Duration.ofMinutes(1))
.threadFactory(Executors.defaultThreadFactory())
.handler(new CallerRunsPolicy())
.build();
assertThat(executor.getCorePoolSize(), is(0));
assertThat(executor.getMaximumPoolSize(), is(1));
assertThat(executor.getQueue(), is(instanceOf(ArrayBlockingQueue.class)));
assertThat(executor.allowsCoreThreadTimeOut(), is(false));
}
/**
* @return the executor for UI background processes.
*/
@Bean(name = "uiExecutor")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(20);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS,
blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build());
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
}
/**
* Creates and configures a new {@link ScheduledExecutorService} with a
* timeout value of {@code seconds}. If the timeout value is below or equal
* to zero then the returned executor will never timeout.
*
* @return the newly created and configured executor service.
*/
private static ScheduledExecutorService createServiceExecutor(long seconds) {
Preconditions.checkArgument(seconds >= 0, "The timeout value must be equal to or greater than 0!");
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("ServiceQueueThread").build());
if (seconds > 0) {
executor.setKeepAliveTime(seconds, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
}
return executor;
}
@Bean
ListeningScheduledExecutorService cloudApiListeningScheduledExecutorService() {
return MoreExecutors
.listeningDecorator(new MDCCleanerScheduledExecutor(executorServicePoolSize,
new ThreadFactoryBuilder().setNameFormat("cloud-api-%d").build(),
new CallerRunsPolicy()));
}
@Ignore
@Test
public void givenCallerRunsPolicy_WhenSaturated_ThenTheCallerThreadRunsTheTask() {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new CallerRunsPolicy());
executor.execute(() -> waitFor(250));
long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;
assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
}
/**
* Creates and configures the update service for this game sync executor.
* The returned executor is <b>unconfigurable</b> meaning it's configuration
* can no longer be modified.
*
* @param nThreads
* the amount of threads to create this service.
* @return the newly created and configured service.
*/
private ExecutorService create(int nThreads) {
if (nThreads <= 1)
return null;
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
return Executors.unconfigurableExecutorService(executor);
}