下面列出了 java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Lazily prepares the queue and the executor used for asynchronous job execution.
 *
 * <p>A bounded queue of {@code queueSize} entries is created only when no queue has been
 * assigned yet; likewise an executor is created only when none has been assigned. The
 * keep-alive value is interpreted in milliseconds.
 */
protected void initAsyncJobExecutionThreadPool() {
    if (threadPoolQueue == null) {
        LOGGER.info("Creating thread pool queue of size {}", queueSize);
        threadPoolQueue = new ArrayBlockingQueue<>(queueSize);
    }

    // An executor was already supplied or created earlier: nothing left to do.
    if (executorService != null) {
        return;
    }

    LOGGER.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);
    BasicThreadFactory namedThreads = new BasicThreadFactory.Builder()
            .namingPattern(threadPoolNamingPattern)
            .build();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            threadPoolQueue,
            namedThreads);
    pool.allowCoreThreadTimeOut(allowCoreThreadTimeout);
    executorService = pool;
}
@Override
public void onCreate() {
    // A ThreadPoolExecutor keeps its core threads alive forever unless told otherwise, which
    // would keep the service from ever dying once there is no more work. Enabling
    // allowCoreThreadTimeOut lets the single worker exit after being idle for
    // WORKER_THREAD_TIMEOUT_SECONDS, so the system can reclaim the process afterwards if
    // nothing else is running in it.
    //
    // Executors#newSingleThreadExecutor is not an option here: it hands back a plain
    // ExecutorService, which does not expose #allowCoreThreadTimeOut.
    final LinkedBlockingQueue<Runnable> pendingWork = new LinkedBlockingQueue<Runnable>();
    mExecutor = new ThreadPoolExecutor(
            1,                              // corePoolSize
            1,                              // maximumPoolSize
            WORKER_THREAD_TIMEOUT_SECONDS,  // keepAliveTime
            TimeUnit.SECONDS,               // unit for keepAliveTime
            pendingWork);                   // workQueue
    mExecutor.allowCoreThreadTimeOut(true);
}
/**
* allowCoreThreadTimeOut(true) causes idle threads to time out.
*
* Runs one short task on a pool whose core threads are allowed to time out and
* then verifies that the pool size drops back to zero before LONG_DELAY_MS elapses.
*/
public void testAllowCoreThreadTimeOut_true() throws Exception {
long keepAliveTime = timeoutMillis();
// Pool with 2 core threads, at most 10 threads, and a bounded queue of 10 tasks.
final ThreadPoolExecutor p =
new CustomTPE(2, 10,
keepAliveTime, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
final CountDownLatch threadStarted = new CountDownLatch(1);
// From here on, idle core threads may be reclaimed after keepAliveTime.
p.allowCoreThreadTimeOut(true);
p.execute(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
// Only one task has been submitted, so exactly one worker is expected.
assertEquals(1, p.getPoolSize());
}});
await(threadStarted);
// NOTE(review): delay(...) is defined elsewhere; presumably it sleeps for the
// given number of milliseconds so the keep-alive period can expire — confirm.
delay(keepAliveTime);
long startTime = System.nanoTime();
// Spin until the pool is empty or the LONG_DELAY_MS deadline is reached.
while (p.getPoolSize() > 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
// Fails if the loop above exited because of the deadline.
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, p.getPoolSize());
}
}
/**
 * Builds the executor that serializes the ASTs of split functions in memory.
 *
 * <p>The pool has equal core and maximum sizes, taken from the {@code nashorn.serialize.threads}
 * property (default: half the available processors, never fewer than one). Work is queued in a
 * {@link LinkedBlockingDeque} created without a capacity limit. Core threads are allowed to time
 * out after one idle minute, so the pool can shrink to nothing when there is no work. Workers are
 * daemon threads running one step below normal priority: serialization is a memory-saving
 * measure that releases the AST, so it should not compete with the running program for CPU.
 *
 * @return an executor service with the characteristics described above.
 */
private static ExecutorService createAstSerializerExecutorService() {
    final int halfOfProcessors = Runtime.getRuntime().availableProcessors() / 2;
    final int poolSize = Math.max(1, Options.getIntProperty("nashorn.serialize.threads", halfOfProcessors));

    final ThreadFactory lowPriorityDaemons = new ThreadFactory() {
        @Override
        public Thread newThread(final Runnable r) {
            final Thread worker = new Thread(r, "Nashorn AST Serializer");
            worker.setDaemon(true);
            worker.setPriority(Thread.NORM_PRIORITY - 1);
            return worker;
        }
    };

    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            poolSize,
            poolSize,
            1L,
            TimeUnit.MINUTES,
            new LinkedBlockingDeque<Runnable>(),
            lowPriorityDaemons);
    pool.allowCoreThreadTimeOut(true);
    return pool;
}
/**
 * Creates the executor described by the DynamoDB client settings in {@code ns}.
 *
 * <p>The pool uses a bounded array queue, threads named {@code getDelegate-%d}, and a
 * caller-runs rejection policy. Core threads are never timed out and are all started
 * before the executor is returned. The keep-alive value is interpreted in milliseconds.
 *
 * @param ns configuration holding the executor settings
 * @return the configured, pre-started executor
 */
static ThreadPoolExecutor getPoolFromNs(final Configuration ns) {
    final int queueCapacity = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_QUEUE_MAX_LENGTH);
    final ThreadFactory namedThreads = new ThreadFactoryBuilder().setNameFormat("getDelegate-%d").build();

    // begin adaptation of constructor at
    // https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104
    final int maxThreads = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE);
    final int coreThreads = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE);
    final long keepAliveMillis = ns.get(Constants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE);
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveMillis,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            namedThreads,
            new ThreadPoolExecutor.CallerRunsPolicy());
    // end adaptation of constructor at
    // https://github.com/buka/titan/blob/master/src/main/java/com/thinkaurelius/titan/diskstorage/dynamodb/DynamoDBClient.java#L104

    pool.allowCoreThreadTimeOut(false);
    pool.prestartAllCoreThreads();
    return pool;
}
/**
 * Creates an AsyncDiskService for a set of volumes, each identified by its
 * root directory.
 *
 * One thread pool is created per volume to carry out that volume's
 * asynchronous disk operations.
 *
 * @param volumes The roots of the file system volumes.
 */
public AsyncDiskService(String[] volumes) {
    // Every worker thread is created inside the shared thread group.
    threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            return new Thread(threadGroup, task);
        }
    };

    for (String volume : volumes) {
        ThreadPoolExecutor volumePool = new ThreadPoolExecutor(
                CORE_THREADS_PER_VOLUME,
                MAXIMUM_THREADS_PER_VOLUME,
                THREADS_KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                threadFactory);
        // Letting core threads expire can reduce the number of running threads.
        volumePool.allowCoreThreadTimeOut(true);
        executors.put(volume, volumePool);
    }
}
/**
 * Creates the connection thread pool and starts all of its core threads.
 *
 * <p>The maximum size is the configured total user limit plus the exempt user limit; the core
 * size is a quarter of that, rounded. Tasks are handed off through a {@link SynchronousQueue}
 * and rejected with an {@link ThreadPoolExecutor.AbortPolicy} when no thread can take them.
 * Non-core threads are kept alive for three minutes; core threads never time out.
 */
public void createThreadPool() {
    int regularUserLimit = GlobalContext.getConfig().getMaxUsersTotal();
    int upperBound = regularUserLimit + GlobalContext.getConfig().getMaxUsersExempt();
    int lowerBound = (int) Math.round(upperBound * 0.25);

    _pool = new ThreadPoolExecutor(
            lowerBound,
            upperBound,
            3 * 60,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ConnectionThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    _pool.allowCoreThreadTimeOut(false);
    _pool.prestartAllCoreThreads();
}
/**
* Exercises allowCoreThreadTimeOut(true) on a pool whose core and maximum sizes are equal:
* one thread is expected per submitted task, and all threads are expected to
* disappear again once they have been idle.
*
* @param args not used by this method
* @throws Throwable if any check failed or the executor threw
*/
void test(String[] args) throws Throwable {
final int threadCount = 10;
final int timeoutMillis = 30;
BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(2*threadCount);
ThreadPoolExecutor tpe
= new ThreadPoolExecutor(threadCount, threadCount,
timeoutMillis, TimeUnit.MILLISECONDS,
q, new IdentifiableThreadFactory());
equal(tpe.getCorePoolSize(), threadCount);
// Core thread timeout is off by default and must be switched on explicitly.
check(! tpe.allowsCoreThreadTimeOut());
tpe.allowCoreThreadTimeOut(true);
check(tpe.allowsCoreThreadTimeOut());
// NOTE(review): countExecutorThreads() is defined elsewhere; presumably it counts
// live threads created by IdentifiableThreadFactory — confirm.
equal(countExecutorThreads(), 0);
long startTime = System.nanoTime();
for (int i = 0; i < threadCount; i++) {
tpe.submit(() -> {});
int count = countExecutorThreads();
// The exact count is only checked while still inside the keep-alive window;
// presumably earlier threads may already have expired after that.
if (millisElapsedSince(startTime) < timeoutMillis)
equal(count, i + 1);
}
// Spin until every pool thread is gone or the LONG_DELAY_MS deadline is reached.
while (countExecutorThreads() > 0 &&
millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
equal(countExecutorThreads(), 0);
// The threads must not have vanished before the keep-alive time had passed.
check(millisElapsedSince(startTime) >= timeoutMillis);
tpe.shutdown();
// The setting is expected to still read true after shutdown().
check(tpe.allowsCoreThreadTimeOut());
check(tpe.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new Exception("Some tests failed");
}
/**
 * Creates the pool used by the delegation token renewer.
 *
 * <p>The pool has equal core and maximum sizes, read from
 * {@code RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT}. It queues work in a
 * {@link LinkedBlockingQueue}, names its threads {@code DelegationTokenRenewer #%d}
 * and lets core threads expire after three idle seconds.
 *
 * @param conf configuration supplying the thread count
 * @return the newly created pool
 */
protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
    int renewerThreads = conf.getInt(
            YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);

    ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
    factoryBuilder.setNameFormat("DelegationTokenRenewer #%d");
    ThreadFactory namedThreads = factoryBuilder.build();

    ThreadPoolExecutor renewerPool = new ThreadPoolExecutor(
            renewerThreads,
            renewerThreads,
            3L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
    renewerPool.setThreadFactory(namedThreads);
    renewerPool.allowCoreThreadTimeOut(true);
    return renewerPool;
}
/**
 * Builds a pool that never holds more than one thread. Tasks are handed off through a
 * {@link SynchronousQueue}, so a second action submitted while the first is still running
 * cannot be queued and causes an exception. The single thread may time out after 60 idle
 * seconds.
 *
 * @return a pool that has one thread only at every time.
 */
private ExecutorService getPool() {
    final int singleThread = 1;
    final long idleSeconds = 60;
    ThreadPoolExecutor singleWorkerPool = new ThreadPoolExecutor(
            1,
            singleThread,
            idleSeconds,
            TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            Threads.newDaemonThreadFactory("hbase-table"));
    singleWorkerPool.allowCoreThreadTimeOut(true);
    return singleWorkerPool;
}
/**
 * Creates a thread pool executor and logs the settings it was created with.
 *
 * <p>The work queue comes from {@code createBlockingQueue()} and the last constructor
 * argument from {@code createRejectedPolicy()}.
 *
 * @param corePoolSize           number of core threads
 * @param maximumPoolSize        upper bound on the number of threads
 * @param keepAliveTime          idle time before a thread may be reclaimed, in milliseconds
 * @param allowCoreThreadTimeOut whether core threads may also be reclaimed when idle
 * @return the newly created executor
 */
public static ThreadPoolExecutor createThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, boolean allowCoreThreadTimeOut) {
    LOG.info("Thread pool executor is created, corePoolSize={}, maximumPoolSize={}, keepAliveTime={}, allowCoreThreadTimeOut={}", corePoolSize, maximumPoolSize, keepAliveTime, allowCoreThreadTimeOut);

    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS,
            createBlockingQueue(), createRejectedPolicy());
    pool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    return pool;
}
/**
 * Registers one thread pool per volume.
 *
 * <p>Each pool's threads are created in the shared thread group and named
 * {@code "Async disk worker #<n> for volume <vol>"}. The maximum pool size is read from
 * {@code dfs.datanode.max.deletion.threads.per.volume}.
 *
 * @param volumes the volumes to create executors for
 * @param conf    configuration supplying the per-volume maximum thread count
 */
void insertDisk(File[] volumes, Configuration conf) {
    for (final File vol : volumes) {
        ThreadFactory perVolumeFactory = new ThreadFactory() {
            // Next worker number for this volume; guarded by this factory's monitor.
            int counter = 0;

            @Override
            public Thread newThread(Runnable r) {
                final int workerIndex;
                synchronized (this) {
                    workerIndex = counter++;
                }
                Thread worker = new Thread(threadGroup, r);
                worker.setName("Async disk worker #" + workerIndex + " for volume " + vol);
                return worker;
            }
        };

        int maxThreads = conf.getInt("dfs.datanode.max.deletion.threads.per.volume",
                DEFAULT_MAXIMUM_THREADS_PER_VOLUME);
        ThreadPoolExecutor volumePool = new ThreadPoolExecutor(
                CORE_THREADS_PER_VOLUME,
                maxThreads,
                THREADS_KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                perVolumeFactory);
        // Letting core threads expire can reduce the number of running threads.
        volumePool.allowCoreThreadTimeOut(true);

        synchronized (this) {
            executors.put(vol, volumePool);
        }
    }
}
/**
 * Creates a single-threaded executor with a bounded queue whose one thread is allowed to
 * expire after five idle seconds.
 *
 * @param queueSize     capacity of the work queue
 * @param threadName    name passed to the fallback thread factory when {@code threadFactory} is null
 * @param threadFactory factory for the worker thread; may be null
 * @param policy        handler invoked when a task cannot be accepted
 * @return the newly created executor
 */
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy) {
    // Fall back to a DefaultThreadFactory when the caller did not supply one.
    final ThreadFactory effectiveFactory = (threadFactory != null)
            ? threadFactory
            : new DefaultThreadFactory(threadName, true);

    final LinkedBlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(queueSize);
    final ThreadPoolExecutor singleWorker =
            new ThreadPoolExecutor(1, 1, 5, SECONDS, boundedQueue, effectiveFactory, policy);
    singleWorker.allowCoreThreadTimeOut(true);
    return singleWorker;
}
/**
 * Creates the shared request executor once; later calls return without doing anything.
 *
 * <p>The pool has equal core and maximum sizes, queues work in a {@link LinkedBlockingQueue},
 * names its threads {@code DalRequestExecutor-Worker-<n>} and lets core threads time out.
 *
 * @param maxPoolSizeStr   pool size as a decimal string, or null to use {@code DEFAULT_MAX_POOL_SIZE}
 * @param keepAliveTimeStr keep-alive in seconds as a decimal string, or null to use
 *                         {@code DEFAULT_KEEP_ALIVE_TIME}
 * @throws NumberFormatException if a non-null argument is not a parsable integer
 */
public static void init(String maxPoolSizeStr, String keepAliveTimeStr){
    // Cheap check outside the lock for the already-initialized case.
    if (serviceRef.get() != null) {
        return;
    }

    synchronized (DalRequestExecutor.class) {
        // Re-check under the lock: another caller may have set it in the meantime.
        if (serviceRef.get() == null) {
            final int poolSize = (maxPoolSizeStr == null)
                    ? DEFAULT_MAX_POOL_SIZE
                    : Integer.parseInt(maxPoolSizeStr);
            final int keepAliveSeconds = (keepAliveTimeStr == null)
                    ? DEFAULT_KEEP_ALIVE_TIME
                    : Integer.parseInt(keepAliveTimeStr);

            final ThreadFactory numberedWorkers = new ThreadFactory() {
                AtomicInteger sequence = new AtomicInteger();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "DalRequestExecutor-Worker-" + this.sequence.getAndIncrement());
                }
            };

            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    poolSize,
                    poolSize,
                    keepAliveSeconds,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    numberedWorkers);
            pool.allowCoreThreadTimeOut(true);
            serviceRef.set(pool);
        }
    }
}
/**
 * Creates the service with a single thread pool whose workers are all created inside
 * the shared thread group. Core threads are allowed to time out when idle.
 */
public AsyncDataService() {
    threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            return new Thread(threadGroup, task);
        }
    };

    executor = new ThreadPoolExecutor(
            CORE_THREADS_PER_VOLUME,
            MAXIMUM_THREADS_PER_VOLUME,
            THREADS_KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    // Letting core threads expire can reduce the number of running threads.
    executor.allowCoreThreadTimeOut(true);
}
/**
 * Creates the worker pool with equal core and maximum sizes ({@code NUMBER_OF_CORES}) on top
 * of {@code threadQueue}, and lets its core threads expire after the configured keep-alive.
 */
private void initThreads() {
    threadPool = new ThreadPoolExecutor(
            NUMBER_OF_CORES,        // core pool size
            NUMBER_OF_CORES,        // maximum pool size
            KEEP_ALIVE_TIME,
            KEEP_ALIVE_TIME_UNIT,
            threadQueue);
    threadPool.allowCoreThreadTimeOut(true);
}
public ElasticJobExecutorService(final String namingPattern, final int threadSize) {
workQueue = new LinkedBlockingQueue<>();
threadPoolExecutor = new ThreadPoolExecutor(
threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue, new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
}
/**
 * Applies {@code allowCoreThreadTimeOut(value)} to the executor when the platform SDK level
 * is at least 9; on older levels the call is skipped and the executor is left untouched.
 *
 * @param executor executor to configure
 * @param value    whether core threads may time out
 */
@SuppressLint({"NewApi"})
public static void allowCoreThreadTimeout(ThreadPoolExecutor executor, boolean value) {
    // NOTE(review): presumably the method is unavailable below SDK level 9 — confirm.
    if (VERSION.SDK_INT < 9) {
        return;
    }
    executor.allowCoreThreadTimeOut(value);
}
/**
 * Builds a {@link ThreadPoolExecutor} from the options stored under {@code prefix}.
 *
 * <p>Options read, each with its fallback:
 * <ul>
 *   <li>{@code .CorePoolSize} - defaults to {@code size}; values below 1 are raised to 1</li>
 *   <li>{@code .MaximumPoolSize} - defaults to the core pool size and is never lower than it</li>
 *   <li>{@code .QueueSize} - defaults to the core pool size</li>
 *   <li>{@code .KeepAliveTime} - defaults to 60 seconds; a missing unit is treated as seconds</li>
 *   <li>{@code .AllowCoreThreadTimeOut} - defaults to {@code true}</li>
 *   <li>{@code .QueueType} - defaults to SYNCHRONOUS when the queue size is below 1, LINKED otherwise</li>
 *   <li>{@code .RejectedExecutionHandlerClass} and {@code .OfferTimeout} - consulted only when
 *       this object has no rejected-execution handler of its own</li>
 * </ul>
 * A {@code DaemonThreadFactory} is used when this object has no thread factory of its own.
 *
 * @param options source of the configuration values
 * @return the newly created executor
 * @throws OpenEJBRuntimeException if the configured handler class cannot be loaded,
 *         instantiated, or cast to {@link RejectedExecutionHandler}
 */
public ThreadPoolExecutor build(final Options options) {
    int corePoolSize = options.get(prefix + ".CorePoolSize", size);
    if (corePoolSize < 1) {
        corePoolSize = 1;
    }

    // Default setting is for a fixed pool size, MaximumPoolSize==CorePoolSize.
    // Math.max already guarantees maximumPoolSize >= corePoolSize, so no further clamping is needed.
    final int maximumPoolSize = Math.max(options.get(prefix + ".MaximumPoolSize", corePoolSize), corePoolSize);

    // Default QueueSize is bounded using the corePoolSize, else bounded pools will never grow
    final int qsize = options.get(prefix + ".QueueSize", corePoolSize);

    // Keep inactive threads alive for 60 seconds by default
    final Duration keepAliveTime = options.get(prefix + ".KeepAliveTime", new Duration(60, TimeUnit.SECONDS));

    // All threads can be timed out by default
    final boolean allowCoreThreadTimeout = options.get(prefix + ".AllowCoreThreadTimeOut", true);

    // If the user explicitly set the QueueSize to 0, we default QueueType to SYNCHRONOUS
    final QueueType defaultQueueType = qsize < 1 ? QueueType.SYNCHRONOUS : QueueType.LINKED;
    final BlockingQueue<Runnable> queue = options.get(prefix + ".QueueType", defaultQueueType).create(options, prefix, qsize);

    ThreadFactory factory = this.threadFactory;
    if (factory == null) {
        factory = new DaemonThreadFactory(prefix);
    }

    RejectedExecutionHandler handler = this.rejectedExecutionHandler;
    if (handler == null) {
        final String rejectedExecutionHandlerClass = options.get(prefix + ".RejectedExecutionHandlerClass", (String) null);
        if (rejectedExecutionHandlerClass == null) {
            // No custom handler configured: fall back to the offer-with-timeout handler.
            final Duration duration = options.get(prefix + ".OfferTimeout", new Duration(30, TimeUnit.SECONDS));
            handler = new OfferRejectedExecutionHandler(duration);
        } else {
            try {
                // The class is loaded through the current thread's context class loader.
                handler = RejectedExecutionHandler.class.cast(Thread.currentThread().getContextClassLoader().loadClass(rejectedExecutionHandlerClass).newInstance());
            } catch (final Exception e) {
                throw new OpenEJBRuntimeException(e);
            }
        }
    }

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize
        , maximumPoolSize
        , keepAliveTime.getTime()
        , keepAliveTime.getUnit() != null ? keepAliveTime.getUnit() : TimeUnit.SECONDS
        , queue
        , factory
        , handler
    );
    threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
    return threadPoolExecutor;
}
public ExecutorServiceObject(final String namingPattern, final int threadSize) {
workQueue = new LinkedBlockingQueue<>();
threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
}