下面列出了java.util.concurrent.ThreadPoolExecutor#allowsCoreThreadTimeOut ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void resetThreadPoolSize() {
if (!ThreadPoolExecutor.class.isInstance(SumkThreadPool.executor())) {
return;
}
ThreadPoolExecutor pool = (ThreadPoolExecutor) SumkThreadPool.executor();
int size = AppInfo.getInt("sumk.core.threadpool.core", 0);
if (size > 0 && pool.getCorePoolSize() != size) {
logger.info("change ThreadPool size from {} to {}", pool.getCorePoolSize(), size);
pool.setCorePoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.max", 0);
if (size > 0 && pool.getMaximumPoolSize() != size) {
logger.info("change ThreadPool max size from {} to {}", pool.getMaximumPoolSize(), size);
pool.setMaximumPoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.aliveTime", 0);
if (size > 0 && pool.getKeepAliveTime(TimeUnit.MILLISECONDS) != size) {
logger.info("change ThreadPool keepalive time from {} to {}", pool.getKeepAliveTime(TimeUnit.MILLISECONDS),
size);
pool.setKeepAliveTime(size, TimeUnit.MILLISECONDS);
}
String v = AppInfo.get("sumk.core.threadpool.allowCoreThreadTimeOut", null);
if (v != null) {
boolean allowCoreTimeout = "1".equals(v) || "true".equalsIgnoreCase(v);
if (allowCoreTimeout != pool.allowsCoreThreadTimeOut()) {
logger.info("change ThreadPool allowsCoreThreadTimeOut from {} to {}", pool.allowsCoreThreadTimeOut(),
allowCoreTimeout);
pool.allowCoreThreadTimeOut(allowCoreTimeout);
}
}
}
@Test
public void createShard5() throws Exception {
int taskNum = 10;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(taskNum, taskNum, 1L, TimeUnit.SECONDS,
new SynchronousQueue<>());
executorService.allowsCoreThreadTimeOut();
for(int i = 0; i < taskNum; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
List<RedisCreateInfo> createInfo = createInfo(Lists.newArrayList("192.168.0.1:6379", "192.168.0.1:6380"),
Lists.newArrayList("192.168.0.2:6379", "192.168.0.2:6380"));
RetMessage result = metaUpdate.createShard(clusterName, shardName, createInfo);
logger.info("{}", result);
}
});
}
waitConditionUntilTimeOut(()->executorService.getCompletedTaskCount() == taskNum, 5000);
System.out.println("=========================");
List<ShardTbl> shards = shardService.findAllByClusterName(clusterName);
logger.info("{}", shards);
Assert.assertEquals(1, shards.size());
List<RedisTbl> redisTbls = redisService.findRedisesByDcClusterShard(activeDC, clusterName, shardName);
logger.info("{}", redisTbls);
Assert.assertEquals(2, redisTbls.size());
List<RedisTbl> keepers = redisService.findKeepersByDcClusterShard(activeDC, clusterName, shardName);
logger.info("{}", keepers);
Assert.assertEquals(2, keepers.size());
}
@Test
public void createShard6() throws Exception {
int taskNum = 3;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(taskNum, taskNum, 1L, TimeUnit.SECONDS,
new SynchronousQueue<>());
executorService.allowsCoreThreadTimeOut();
for(int i = 0; i < taskNum; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
ShardCreateInfo shardCreateInfo = new ShardCreateInfo();
shardCreateInfo.setShardMonitorName(shardName);
shardCreateInfo.setShardName(shardName);
RetMessage result = metaUpdate.createShards(clusterName, Lists.newArrayList(shardCreateInfo));
logger.info("{}", result);
}
});
}
waitConditionUntilTimeOut(()->executorService.getCompletedTaskCount() == taskNum, 2000);
System.out.println("=========================");
List<ShardTbl> shards = shardService.findAllByClusterName(clusterName);
logger.info("{}", shards);
Assert.assertEquals(1, shards.size());
}
private ThreadTaskManager() {
mExePool = new ThreadPoolExecutor(CORE_POOL_NUM, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
mExePool.allowsCoreThreadTimeOut();
}