下面列出了java.util.concurrent.ThreadPoolExecutor#setCorePoolSize ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void init() throws ServletException {
reNewThreadPool();
Runnable c = new Runnable() {
@Override
public void run() {
ThreadPoolExecutor p = poolExecutorRef.get();
p.setCorePoolSize(coreSize.get());
p.setMaximumPoolSize(maximumSize.get());
p.setKeepAliveTime(aliveTime.get(),TimeUnit.MILLISECONDS);
}
};
coreSize.addCallback(c);
maximumSize.addCallback(c);
aliveTime.addCallback(c);
// TODO metrics reporting
}
/**
* prestartCoreThread starts a thread if under corePoolSize, else doesn't
*/
public void testPrestartCoreThread() {
final ThreadPoolExecutor p =
new ThreadPoolExecutor(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(1, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
assertTrue(p.prestartCoreThread());
assertEquals(3, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
}
}
/**
* prestartCoreThread starts a thread if under corePoolSize, else doesn't
*/
public void testPrestartCoreThread() {
final ThreadPoolExecutor p =
new CustomTPE(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(1, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
assertTrue(p.prestartCoreThread());
assertEquals(3, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
}
}
/**
* prestartAllCoreThreads starts all corePoolSize threads
*/
public void testPrestartAllCoreThreads() {
final ThreadPoolExecutor p =
new ThreadPoolExecutor(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
}
}
/**
* prestartCoreThread starts a thread if under corePoolSize, else doesn't
*/
public void testPrestartCoreThread() {
final ThreadPoolExecutor p =
new CustomTPE(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(1, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
assertTrue(p.prestartCoreThread());
assertEquals(3, p.getPoolSize());
assertTrue(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
assertFalse(p.prestartCoreThread());
assertEquals(4, p.getPoolSize());
}
}
/**
* prestartAllCoreThreads starts all corePoolSize threads
*/
public void testPrestartAllCoreThreads() {
final ThreadPoolExecutor p =
new CustomTPE(2, 6,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
assertEquals(0, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(2, p.getPoolSize());
p.setCorePoolSize(4);
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
p.prestartAllCoreThreads();
assertEquals(4, p.getPoolSize());
}
}
@Override
protected void doInitialize() {
// .触发SPI
logger.info("tConsole -> trigger TelStartContextListener.onStart");
this.spiTrigger.notifySpiWithoutResult(TelStartContextListener.class, listener -> {
listener.onStart(AbstractTelService.this);
});
//
logger.info("tConsole -> applyCommand.");
this.applyCommand();
this.addCommand(new String[] { "get", "set" }, new GetSetExecutor());
this.addCommand(new String[] { "quit", "exit" }, new QuitExecutor());
this.addCommand(new String[] { "help" }, new HelpExecutor());
//
// .执行线程池
String shortName = "tConsole-Work";
int workSize = 2;
this.executor = Executors.newScheduledThreadPool(workSize, new NameThreadFactory(shortName, this.classLoader));
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) this.executor;
threadPool.setCorePoolSize(workSize);
threadPool.setMaximumPoolSize(workSize);
logger.info("tConsole -> create TelnetHandler , threadShortName={} , workThreadSize = {}.", shortName, workSize);
}
public void testCorePoolSizeGreaterThanMax() {
ThreadPoolExecutor tp = new ThreadPoolExecutor(
1 /* core pool size */, 1 /* max pool size */,
1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
// It should be illegal to set a core pool size that's larger than the max
// pool size but apps have been allowed to get away with it so far. The pattern
// below occurs in a commonly used library. Note that the executor is in a sane
// state at the end of both method calls.
tp.setCorePoolSize(5);
tp.setMaximumPoolSize(5);
}
protected void modified(Map<String, Object> properties) {
for (Entry<String, Object> entry : properties.entrySet()) {
if (entry.getKey().equals("service.pid") || entry.getKey().equals("component.id")
|| entry.getKey().equals("component.name")) {
continue;
}
String poolName = entry.getKey();
Object config = entry.getValue();
if (config == null) {
configs.remove(poolName);
}
if (config instanceof String) {
try {
Integer poolSize = Integer.valueOf((String) config);
configs.put(poolName, poolSize);
ThreadPoolExecutor pool = (ThreadPoolExecutor) pools.get(poolName);
if (pool instanceof ScheduledThreadPoolExecutor) {
pool.setCorePoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to size {}",
new Object[] { poolName, poolSize });
} else if (pool instanceof QueueingThreadPoolExecutor) {
pool.setMaximumPoolSize(poolSize);
LOGGER.debug("Updated queuing thread pool '{}' to size {}",
new Object[] { poolName, poolSize });
}
} catch (NumberFormatException e) {
LOGGER.warn("Ignoring invalid configuration for pool '{}': {} - value must be an integer",
new Object[] { poolName, config });
continue;
}
}
}
}
public KanboardAPI(String serverURL, final String username, final String password) throws IOException {
Authenticator.setDefault(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password.toCharArray());
}
});
kanboardURL = KanboardAPI.sanitizeURL(serverURL.trim());
Log.i(Constants.TAG, String.format("Host uses %s", kanboardURL.getProtocol()));
// threadPoolExecutor = new ThreadPoolExecutor(12, 12, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(256));
threadPoolExecutor = (ThreadPoolExecutor) AsyncTask.THREAD_POOL_EXECUTOR;
threadPoolExecutor.setCorePoolSize(12);
threadPoolExecutor.setMaximumPoolSize(12);
}
/**
* setCorePoolSize of negative value throws IllegalArgumentException
*/
public void testCorePoolSizeIllegalArgumentException() {
final ThreadPoolExecutor p =
new ThreadPoolExecutor(1, 2,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
try {
p.setCorePoolSize(-1);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
}
/**
* setCorePoolSize of negative value throws IllegalArgumentException
*/
public void testCorePoolSizeIllegalArgumentException() {
final ThreadPoolExecutor p =
new CustomTPE(1, 2,
LONG_DELAY_MS, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
try {
p.setCorePoolSize(-1);
shouldThrow();
} catch (IllegalArgumentException success) {}
}
}
private void resetThreadPoolSize() {
if (!ThreadPoolExecutor.class.isInstance(SumkThreadPool.executor())) {
return;
}
ThreadPoolExecutor pool = (ThreadPoolExecutor) SumkThreadPool.executor();
int size = AppInfo.getInt("sumk.core.threadpool.core", 0);
if (size > 0 && pool.getCorePoolSize() != size) {
logger.info("change ThreadPool size from {} to {}", pool.getCorePoolSize(), size);
pool.setCorePoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.max", 0);
if (size > 0 && pool.getMaximumPoolSize() != size) {
logger.info("change ThreadPool max size from {} to {}", pool.getMaximumPoolSize(), size);
pool.setMaximumPoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.aliveTime", 0);
if (size > 0 && pool.getKeepAliveTime(TimeUnit.MILLISECONDS) != size) {
logger.info("change ThreadPool keepalive time from {} to {}", pool.getKeepAliveTime(TimeUnit.MILLISECONDS),
size);
pool.setKeepAliveTime(size, TimeUnit.MILLISECONDS);
}
String v = AppInfo.get("sumk.core.threadpool.allowCoreThreadTimeOut", null);
if (v != null) {
boolean allowCoreTimeout = "1".equals(v) || "true".equalsIgnoreCase(v);
if (allowCoreTimeout != pool.allowsCoreThreadTimeOut()) {
logger.info("change ThreadPool allowsCoreThreadTimeOut from {} to {}", pool.allowsCoreThreadTimeOut(),
allowCoreTimeout);
pool.allowCoreThreadTimeOut(allowCoreTimeout);
}
}
}
public KanboardAPI(String serverURL, final String username, final String password) throws IOException {
Authenticator.setDefault(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password.toCharArray());
}
});
kanboardURL = KanboardAPI.sanitizeURL(serverURL.trim());
Log.i(Constants.TAG, String.format("Host uses %s", kanboardURL.getProtocol()));
// threadPoolExecutor = new ThreadPoolExecutor(12, 12, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(256));
threadPoolExecutor = (ThreadPoolExecutor) AsyncTask.THREAD_POOL_EXECUTOR;
threadPoolExecutor.setCorePoolSize(12);
threadPoolExecutor.setMaximumPoolSize(12);
}
protected void modified(Map<String, Object> properties) {
for (Entry<String, Object> entry : properties.entrySet()) {
if (Constants.SERVICE_PID.equals(entry.getKey()) || ComponentConstants.COMPONENT_ID.equals(entry.getKey())
|| ComponentConstants.COMPONENT_NAME.equals(entry.getKey())) {
continue;
}
String poolName = entry.getKey();
Object config = entry.getValue();
if (config == null) {
configs.remove(poolName);
}
if (config instanceof String) {
try {
Integer poolSize = Integer.valueOf((String) config);
configs.put(poolName, poolSize);
ThreadPoolExecutor pool = (ThreadPoolExecutor) pools.get(poolName);
if (pool instanceof ScheduledThreadPoolExecutor) {
pool.setCorePoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to size {}", poolName, poolSize);
} else if (pool instanceof QueueingThreadPoolExecutor) {
pool.setMaximumPoolSize(poolSize);
LOGGER.debug("Updated queuing thread pool '{}' to size {}", poolName, poolSize);
}
} catch (NumberFormatException e) {
LOGGER.warn("Ignoring invalid configuration for pool '{}': {} - value must be an integer", poolName,
config);
continue;
}
}
}
}
static void setCorePoolSize(ThreadPoolExecutor pool, int n) {
pool.setCorePoolSize(n);
equal(pool.getCorePoolSize(), n);
awaitPoolSize(pool, n);
}
@Override
protected void serviceStart() throws Exception {
client.start();
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
this.getClass().getName() + " #%d").setDaemon(true).build();
// Start with a default core-pool size and change it dynamically.
int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventDispatcherThread = new Thread() {
@Override
public void run() {
ContainerEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = events.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, thread interrupted", e);
}
return;
}
allNodes.add(event.getNodeId().toString());
int threadPoolSize = threadPool.getCorePoolSize();
// We can increase the pool size only if haven't reached the maximum
// limit yet.
if (threadPoolSize != maxThreadPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int nodeNum = allNodes.size();
int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
if (threadPoolSize < idealThreadPoolSize) {
// Bump up the pool size to idealThreadPoolSize +
// INITIAL_POOL_SIZE, the later is just a buffer so we are not
// always increasing the pool-size
int newThreadPoolSize = Math.min(maxThreadPoolSize,
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
LOG.info("Set NMClientAsync thread pool size to " +
newThreadPoolSize + " as the number of nodes to talk to is "
+ nodeNum);
threadPool.setCorePoolSize(newThreadPoolSize);
}
}
// the events from the queue are handled in parallel with a thread
// pool
threadPool.execute(getContainerEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventDispatcherThread.setName("Container Event Dispatcher");
eventDispatcherThread.setDaemon(false);
eventDispatcherThread.start();
super.serviceStart();
}
protected void serviceStart() throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically.
launcherPool = new ThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(),
tf);
eventHandlingThread = new Thread() {
@Override
public void run() {
ContainerLauncherEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
allNodes.add(event.getContainerMgrAddress());
int poolSize = launcherPool.getCorePoolSize();
// See if we need up the pool size only if haven't reached the
// maximum limit yet.
if (poolSize != limitOnPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int numNodes = allNodes.size();
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) {
// Bump up the pool size to idealPoolSize+initialPoolSize, the
// later is just a buffer so we are not always increasing the
// pool-size
int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+ initialPoolSize);
LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+ " as number-of-nodes to talk to is " + numNodes);
launcherPool.setCorePoolSize(newPoolSize);
}
}
// the events from the queue are handled in parallel
// using a thread pool
launcherPool.execute(createEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventHandlingThread.setName("ContainerLauncher Event Handler");
eventHandlingThread.start();
super.serviceStart();
}
@Override
protected void serviceStart() throws Exception {
client.start();
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
this.getClass().getName() + " #%d").setDaemon(true).build();
// Start with a default core-pool size and change it dynamically.
int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventDispatcherThread = new Thread() {
@Override
public void run() {
ContainerEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = events.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, thread interrupted", e);
}
return;
}
allNodes.add(event.getNodeId().toString());
int threadPoolSize = threadPool.getCorePoolSize();
// We can increase the pool size only if haven't reached the maximum
// limit yet.
if (threadPoolSize != maxThreadPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int nodeNum = allNodes.size();
int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
if (threadPoolSize < idealThreadPoolSize) {
// Bump up the pool size to idealThreadPoolSize +
// INITIAL_POOL_SIZE, the later is just a buffer so we are not
// always increasing the pool-size
int newThreadPoolSize = Math.min(maxThreadPoolSize,
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
LOG.info("Set NMClientAsync thread pool size to " +
newThreadPoolSize + " as the number of nodes to talk to is "
+ nodeNum);
threadPool.setCorePoolSize(newThreadPoolSize);
}
}
// the events from the queue are handled in parallel with a thread
// pool
threadPool.execute(getContainerEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventDispatcherThread.setName("Container Event Dispatcher");
eventDispatcherThread.setDaemon(false);
eventDispatcherThread.start();
super.serviceStart();
}
protected void serviceStart() throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically.
launcherPool = new ThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(),
tf);
eventHandlingThread = new Thread() {
@Override
public void run() {
ContainerLauncherEvent event = null;
Set<String> allNodes = new HashSet<String>();
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
allNodes.add(event.getContainerMgrAddress());
int poolSize = launcherPool.getCorePoolSize();
// See if we need up the pool size only if haven't reached the
// maximum limit yet.
if (poolSize != limitOnPoolSize) {
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
int numNodes = allNodes.size();
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) {
// Bump up the pool size to idealPoolSize+initialPoolSize, the
// later is just a buffer so we are not always increasing the
// pool-size
int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+ initialPoolSize);
LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+ " as number-of-nodes to talk to is " + numNodes);
launcherPool.setCorePoolSize(newPoolSize);
}
}
// the events from the queue are handled in parallel
// using a thread pool
launcherPool.execute(createEventProcessor(event));
// TODO: Group launching of multiple containers to a single
// NodeManager into a single connection
}
}
};
eventHandlingThread.setName("ContainerLauncher Event Handler");
eventHandlingThread.start();
super.serviceStart();
}