下面列出了 java.util.concurrent.ThreadPoolExecutor#getCorePoolSize() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Applies the configured core and maximum pool sizes to a running thread pool.
 * <p>
 * Both sizes are looked up in {@code parametric}. A value that is missing, not positive, or equal
 * to the executor's current setting is ignored. Every applied change is logged at info level.
 * <p>
 * The setters are invoked in an order that keeps {@code corePoolSize <= maximumPoolSize} at each
 * step: a growing maximum is raised before the core size is touched, while an unchanged or
 * shrinking maximum is applied after the core size. This matters because
 * {@link ThreadPoolExecutor#setMaximumPoolSize(int)} rejects a maximum below the core size and,
 * on JDK 9 and later, {@link ThreadPoolExecutor#setCorePoolSize(int)} rejects a core size above
 * the maximum.
 *
 * @param executor   the pool to reconfigure; the call is a no-op when {@code null}
 * @param name       label identifying the pool in the log messages
 * @param parametric source of the configured values
 * @param coreKey    key under which the core pool size is looked up
 * @param maxKey     key under which the maximum pool size is looked up
 * @throws IllegalArgumentException if the executor's setters reject the requested sizes, e.g.
 *                                  because the configured core size exceeds the resulting maximum
 */
public static void updateThreadPool(final ThreadPoolExecutor executor, final String name, final Parametric parametric,
                                    final String coreKey, final String maxKey) {
    if (executor == null) {
        return;
    }
    final Integer core = parametric.getInteger(coreKey);
    final Integer max = parametric.getInteger(maxKey);
    final boolean coreChanged = core != null && core > 0 && core != executor.getCorePoolSize();
    final boolean maxChanged = max != null && max > 0 && max != executor.getMaximumPoolSize();
    final boolean maxGrows = maxChanged && max > executor.getMaximumPoolSize();

    // Growing: widen the upper bound first so the new core size fits underneath it.
    if (maxGrows) {
        logger.info(String.format("Maximum pool size of %s is changed from %d to %d",
                name, executor.getMaximumPoolSize(), max));
        executor.setMaximumPoolSize(max);
    }
    if (coreChanged) {
        logger.info(String.format("Core pool size of %s is changed from %d to %d",
                name, executor.getCorePoolSize(), core));
        executor.setCorePoolSize(core);
    }
    // Shrinking: lower the upper bound only after the core size has been reduced.
    if (maxChanged && !maxGrows) {
        logger.info(String.format("Maximum pool size of %s is changed from %d to %d",
                name, executor.getMaximumPoolSize(), max));
        executor.setMaximumPoolSize(max);
    }
}
/**
 * Prints the pool's current counters to standard output, one {@code name = value} line per
 * counter, framed by a separator line above and below.
 *
 * @param pool the executor whose state is printed
 */
private void print(final ThreadPoolExecutor pool) {
    final String separator = "==========================================";
    System.out.println(separator);
    // Each value is read immediately before it is printed, in this fixed order.
    System.out.println("activeCount = " + pool.getActiveCount());
    System.out.println("corePoolSize = " + pool.getCorePoolSize());
    System.out.println("largestPoolSize = " + pool.getLargestPoolSize());
    System.out.println("maximumPoolSize = " + pool.getMaximumPoolSize());
    System.out.println("poolSize = " + pool.getPoolSize());
    System.out.println("queueSize = " + pool.getQueue().size());
    System.out.println("taskCount = " + pool.getTaskCount());
    System.out.println(separator);
}
/**
 * Re-applies the thread pool settings found in the application configuration to the executor
 * returned by {@code SumkThreadPool.executor()}.
 * <p>
 * Nothing happens unless that executor is a {@link ThreadPoolExecutor}. Core size, maximum size
 * and keep-alive time (milliseconds) are applied only when the configured value is positive and
 * differs from the current one. {@code allowCoreThreadTimeOut} is applied only when the key is
 * present and its value differs from the current flag. Every applied change is logged.
 * <p>
 * A growing maximum is raised before the core size and a shrinking one is lowered after it, so
 * that {@code corePoolSize <= maximumPoolSize} holds at each step; the executor's setters throw
 * {@link IllegalArgumentException} otherwise (for {@code setCorePoolSize} on JDK 9 and later).
 */
private void resetThreadPoolSize() {
    // Fetch once so the type check and the cast are guaranteed to see the same object.
    final Object shared = SumkThreadPool.executor();
    if (!(shared instanceof ThreadPoolExecutor)) {
        return;
    }
    final ThreadPoolExecutor pool = (ThreadPoolExecutor) shared;

    final int core = AppInfo.getInt("sumk.core.threadpool.core", 0);
    final int max = AppInfo.getInt("sumk.core.threadpool.max", 0);
    final boolean maxChanged = max > 0 && pool.getMaximumPoolSize() != max;
    final boolean maxGrows = maxChanged && max > pool.getMaximumPoolSize();

    // Growing: widen the upper bound first so the new core size fits underneath it.
    if (maxGrows) {
        logger.info("change ThreadPool max size from {} to {}", pool.getMaximumPoolSize(), max);
        pool.setMaximumPoolSize(max);
    }
    if (core > 0 && pool.getCorePoolSize() != core) {
        logger.info("change ThreadPool size from {} to {}", pool.getCorePoolSize(), core);
        pool.setCorePoolSize(core);
    }
    // Shrinking: lower the upper bound only after the core size has been reduced.
    if (maxChanged && !maxGrows) {
        logger.info("change ThreadPool max size from {} to {}", pool.getMaximumPoolSize(), max);
        pool.setMaximumPoolSize(max);
    }

    final int aliveTime = AppInfo.getInt("sumk.core.threadpool.aliveTime", 0);
    if (aliveTime > 0 && pool.getKeepAliveTime(TimeUnit.MILLISECONDS) != aliveTime) {
        logger.info("change ThreadPool keepalive time from {} to {}", pool.getKeepAliveTime(TimeUnit.MILLISECONDS),
                aliveTime);
        pool.setKeepAliveTime(aliveTime, TimeUnit.MILLISECONDS);
    }

    final String v = AppInfo.get("sumk.core.threadpool.allowCoreThreadTimeOut", null);
    if (v != null) {
        // "1" and any casing of "true" enable the flag; every other value disables it.
        final boolean allowCoreTimeout = "1".equals(v) || "true".equalsIgnoreCase(v);
        if (allowCoreTimeout != pool.allowsCoreThreadTimeOut()) {
            logger.info("change ThreadPool allowsCoreThreadTimeOut from {} to {}", pool.allowsCoreThreadTimeOut(),
                    allowCoreTimeout);
            pool.allowCoreThreadTimeOut(allowCoreTimeout);
        }
    }
}
/**
 * Logs three verbose lines describing the executor: its pool sizes, its lifecycle flags and its
 * task counters. Every line is prefixed with {@code name}.
 *
 * @param name     label placed at the start of each log line
 * @param executor the pool whose state is logged
 */
private static void logExecutor(final String name, final ThreadPoolExecutor executor) {
    // All values are sampled up front, before anything is logged.
    final int core = executor.getCorePoolSize();
    final int current = executor.getPoolSize();
    final int active = executor.getActiveCount();
    final long submitted = executor.getTaskCount();
    final long completed = executor.getCompletedTaskCount();
    final boolean shutdown = executor.isShutdown();
    final boolean terminated = executor.isTerminated();

    final String[] lines = {
        name + " CorePoolSize:" + core + " PoolSize:" + current,
        name + " isShutdown:" + shutdown + " isTerminated:" + terminated,
        name + " activeCount:" + active + " taskCount:" + submitted
            + " completedCount:" + completed,
    };
    for (final String line : lines) {
        Log.v(TAG, line);
    }
}
/**
 * Handles a task the executor could not accept.
 * <p>
 * While the pool holds fewer threads than its core size the rejection is forwarded to
 * {@code delegate}; otherwise the task is refused with a {@link RejectedExecutionException}.
 *
 * @param r        the rejected task
 * @param executor the executor that rejected it
 * @throws RejectedExecutionException if the pool size has reached the core pool size
 */
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    final boolean belowCoreSize = executor.getPoolSize() < executor.getCorePoolSize();
    if (belowCoreSize) {
        delegate.rejectedExecution(r, executor);
        return;
    }
    throw new RejectedExecutionException(
        "Can't accept new asynchronous request. Too many asynchronous jobs in progress");
}
/**
 * Records a snapshot of the executor's state together with the number of leases currently
 * assigned according to the lease coordinator.
 * <p>
 * The executor-specific fields are filled in only when {@code executor} is a
 * {@link ThreadPoolExecutor}; for any other implementation they are left untouched.
 *
 * @param executor         the executor service to inspect
 * @param leaseCoordinator source of the current lease assignments
 */
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
    if (executor instanceof ThreadPoolExecutor) {
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
        isThreadPoolExecutor = true;
        executorName = pool.getClass().getSimpleName();
        currentQueueSize = pool.getQueue().size();
        activeThreads = pool.getActiveCount();
        coreThreads = pool.getCorePoolSize();
        largestPoolSize = pool.getLargestPoolSize();
        maximumPoolSize = pool.getMaximumPoolSize();
    }
    leasesOwned = leaseCoordinator.getAssignments().size();
}
/**
 * Starts the client, creates the worker pool and launches the dispatcher thread.
 * <p>
 * The dispatcher takes container events off the {@code events} queue, remembers the node of every
 * event it has seen, grows the pool's core size as the number of distinct nodes rises (never
 * beyond {@code maxThreadPoolSize}) and submits each event to the pool for processing.
 */
@Override
protected void serviceStart() throws Exception {
  client.start();

  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat(this.getClass().getName() + " #%d")
      .setDaemon(true)
      .build();

  // The pool begins at a default core size; the dispatcher adjusts it dynamically.
  final int startingSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
  threadPool = new ThreadPoolExecutor(startingSize, Integer.MAX_VALUE, 1,
      TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), workerFactory);

  eventDispatcherThread = new Thread() {
    @Override
    public void run() {
      final Set<String> seenNodes = new HashSet<String>();
      while (!(stopped.get() || Thread.currentThread().isInterrupted())) {
        final ContainerEvent event;
        try {
          event = events.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when it was not caused by a stop.
          if (!stopped.get()) {
            LOG.error("Returning, thread interrupted", e);
          }
          return;
        }
        seenNodes.add(event.getNodeId().toString());
        growPool(seenNodes.size());
        // Events taken from the queue are handled in parallel by the pool.
        threadPool.execute(getContainerEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the maximum and the number of nodes seen.
     * {@code nodeNum} counts the nodes containers run on at this point in time; it is not the
     * cluster size and does not need to be.
     */
    private void growPool(final int nodeNum) {
      final int current = threadPool.getCorePoolSize();
      // The size can only be increased while the maximum has not been reached.
      if (current == maxThreadPoolSize) {
        return;
      }
      final int ideal = Math.min(maxThreadPoolSize, nodeNum);
      if (current >= ideal) {
        return;
      }
      // INITIAL_THREAD_POOL_SIZE is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(maxThreadPoolSize, ideal + INITIAL_THREAD_POOL_SIZE);
      LOG.info("Set NMClientAsync thread pool size to " +
          grown + " as the number of nodes to talk to is "
          + nodeNum);
      threadPool.setCorePoolSize(grown);
    }
  };
  eventDispatcherThread.setName("Container Event Dispatcher");
  eventDispatcherThread.setDaemon(false);
  eventDispatcherThread.start();

  super.serviceStart();
}
/**
 * Creates the launcher pool and starts the thread that feeds it.
 * <p>
 * The event handling thread takes launcher events off {@code eventQueue}, remembers the
 * container-manager address of every event it has seen, grows the pool's core size as the number
 * of distinct addresses rises (never beyond {@code limitOnPoolSize}) and submits each event to
 * the pool for processing.
 */
protected void serviceStart() throws Exception {
  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat("ContainerLauncher #%d")
      .setDaemon(true)
      .build();

  // The pool begins at initialPoolSize core threads; the handler thread adjusts it dynamically.
  launcherPool = new ThreadPoolExecutor(initialPoolSize,
      Integer.MAX_VALUE, 1, TimeUnit.HOURS,
      new LinkedBlockingQueue<Runnable>(),
      workerFactory);

  eventHandlingThread = new Thread() {
    @Override
    public void run() {
      final Set<String> seenNodes = new HashSet<String>();
      while (!(stopped.get() || Thread.currentThread().isInterrupted())) {
        final ContainerLauncherEvent event;
        try {
          event = eventQueue.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when it was not caused by a stop.
          if (!stopped.get()) {
            LOG.error("Returning, interrupted : " + e);
          }
          return;
        }
        seenNodes.add(event.getContainerMgrAddress());
        growPool(seenNodes.size());
        // Events taken from the queue are handled in parallel by the pool.
        launcherPool.execute(createEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the limit and the number of nodes seen.
     * {@code numNodes} counts the nodes containers run on at this point in time; it is not the
     * cluster size and does not need to be.
     */
    private void growPool(final int numNodes) {
      final int current = launcherPool.getCorePoolSize();
      // The size can only be increased while the limit has not been reached.
      if (current == limitOnPoolSize) {
        return;
      }
      final int ideal = Math.min(limitOnPoolSize, numNodes);
      if (current >= ideal) {
        return;
      }
      // initialPoolSize is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(limitOnPoolSize, ideal
          + initialPoolSize);
      LOG.info("Setting ContainerLauncher pool size to " + grown
          + " as number-of-nodes to talk to is " + numNodes);
      launcherPool.setCorePoolSize(grown);
    }
  };
  eventHandlingThread.setName("ContainerLauncher Event Handler");
  eventHandlingThread.start();

  super.serviceStart();
}
/**
 * Starts the client, creates the worker pool and launches the dispatcher thread.
 * <p>
 * The dispatcher takes container events off the {@code events} queue, remembers the node of every
 * event it has seen, grows the pool's core size as the number of distinct nodes rises (never
 * beyond {@code maxThreadPoolSize}) and submits each event to the pool for processing.
 */
@Override
protected void serviceStart() throws Exception {
  client.start();

  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat(this.getClass().getName() + " #%d")
      .setDaemon(true)
      .build();

  // The pool begins at a default core size; the dispatcher adjusts it dynamically.
  final int startingSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
  threadPool = new ThreadPoolExecutor(startingSize, Integer.MAX_VALUE, 1,
      TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), workerFactory);

  eventDispatcherThread = new Thread() {
    @Override
    public void run() {
      final Set<String> seenNodes = new HashSet<String>();
      while (!(stopped.get() || Thread.currentThread().isInterrupted())) {
        final ContainerEvent event;
        try {
          event = events.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when it was not caused by a stop.
          if (!stopped.get()) {
            LOG.error("Returning, thread interrupted", e);
          }
          return;
        }
        seenNodes.add(event.getNodeId().toString());
        growPool(seenNodes.size());
        // Events taken from the queue are handled in parallel by the pool.
        threadPool.execute(getContainerEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the maximum and the number of nodes seen.
     * {@code nodeNum} counts the nodes containers run on at this point in time; it is not the
     * cluster size and does not need to be.
     */
    private void growPool(final int nodeNum) {
      final int current = threadPool.getCorePoolSize();
      // The size can only be increased while the maximum has not been reached.
      if (current == maxThreadPoolSize) {
        return;
      }
      final int ideal = Math.min(maxThreadPoolSize, nodeNum);
      if (current >= ideal) {
        return;
      }
      // INITIAL_THREAD_POOL_SIZE is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(maxThreadPoolSize, ideal + INITIAL_THREAD_POOL_SIZE);
      LOG.info("Set NMClientAsync thread pool size to " +
          grown + " as the number of nodes to talk to is "
          + nodeNum);
      threadPool.setCorePoolSize(grown);
    }
  };
  eventDispatcherThread.setName("Container Event Dispatcher");
  eventDispatcherThread.setDaemon(false);
  eventDispatcherThread.start();

  super.serviceStart();
}
/**
 * Creates the launcher pool and starts the thread that feeds it.
 * <p>
 * The event handling thread takes launcher events off {@code eventQueue}, remembers the
 * container-manager address of every event it has seen, grows the pool's core size as the number
 * of distinct addresses rises (never beyond {@code limitOnPoolSize}) and submits each event to
 * the pool for processing.
 */
protected void serviceStart() throws Exception {
  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat("ContainerLauncher #%d")
      .setDaemon(true)
      .build();

  // The pool begins at initialPoolSize core threads; the handler thread adjusts it dynamically.
  launcherPool = new ThreadPoolExecutor(initialPoolSize,
      Integer.MAX_VALUE, 1, TimeUnit.HOURS,
      new LinkedBlockingQueue<Runnable>(),
      workerFactory);

  eventHandlingThread = new Thread() {
    @Override
    public void run() {
      final Set<String> seenNodes = new HashSet<String>();
      while (!(stopped.get() || Thread.currentThread().isInterrupted())) {
        final ContainerLauncherEvent event;
        try {
          event = eventQueue.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when it was not caused by a stop.
          if (!stopped.get()) {
            LOG.error("Returning, interrupted : " + e);
          }
          return;
        }
        seenNodes.add(event.getContainerMgrAddress());
        growPool(seenNodes.size());
        // Events taken from the queue are handled in parallel by the pool.
        launcherPool.execute(createEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the limit and the number of nodes seen.
     * {@code numNodes} counts the nodes containers run on at this point in time; it is not the
     * cluster size and does not need to be.
     */
    private void growPool(final int numNodes) {
      final int current = launcherPool.getCorePoolSize();
      // The size can only be increased while the limit has not been reached.
      if (current == limitOnPoolSize) {
        return;
      }
      final int ideal = Math.min(limitOnPoolSize, numNodes);
      if (current >= ideal) {
        return;
      }
      // initialPoolSize is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(limitOnPoolSize, ideal
          + initialPoolSize);
      LOG.info("Setting ContainerLauncher pool size to " + grown
          + " as number-of-nodes to talk to is " + numNodes);
      launcherPool.setCorePoolSize(grown);
    }
  };
  eventHandlingThread.setName("ContainerLauncher Event Handler");
  eventHandlingThread.start();

  super.serviceStart();
}
/**
 * Creates the container-management proxy and the launcher pool, then starts the thread that
 * feeds the pool.
 * <p>
 * The event handling thread takes events off {@code eventQueue}, grows the pool's core size
 * towards the node count reported by {@code context} (never beyond {@code limitOnPoolSize}) and
 * submits each event to the pool for processing. It runs until it is interrupted.
 */
@Override
public void serviceStart() {
  cmProxy = new ContainerManagementProtocolProxy(getConfig());

  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat("ContainerLauncher #%d")
      .setDaemon(true)
      .build();

  // The pool begins at INITIAL_POOL_SIZE core threads; the handler thread adjusts it dynamically.
  launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
      Integer.MAX_VALUE, 1, TimeUnit.HOURS,
      new LinkedBlockingQueue<Runnable>(),
      workerFactory);

  eventHandlingThread = new Thread() {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        final NMCommunicatorEvent event;
        try {
          event = eventQueue.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when the service was not stopped.
          if (!serviceStopped.get()) {
            LOG.error("Returning, interrupted : " + e);
          }
          return;
        }
        growPool();
        // Events taken from the queue are handled in parallel by the pool.
        launcherPool.execute(createEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the limit and the current node count.
     * The node count is only queried while the limit has not been reached; it covers the nodes
     * containers run on at this point in time, not the cluster size.
     */
    private void growPool() {
      final int current = launcherPool.getCorePoolSize();
      if (current == limitOnPoolSize) {
        return;
      }
      final int numNodes = context.getAllNodes().size();
      final int ideal = Math.min(limitOnPoolSize, numNodes);
      if (current >= ideal) {
        return;
      }
      // INITIAL_POOL_SIZE is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(limitOnPoolSize, ideal
          + INITIAL_POOL_SIZE);
      LOG.info("Setting ContainerLauncher pool size to " + grown
          + " as number-of-nodes to talk to is " + numNodes);
      launcherPool.setCorePoolSize(grown);
    }
  };
  eventHandlingThread.setName("ContainerLauncher Event Handler");
  eventHandlingThread.start();
}
/**
 * Creates the container-management proxy and the launcher pool, starts the thread that feeds the
 * pool and, when DAG cleanup on completion is enabled, instantiates the deletion tracker.
 * <p>
 * The event handling thread takes container operations off {@code eventQueue}, grows the pool's
 * core size towards the node count reported by the context (never beyond
 * {@code limitOnPoolSize}) and submits each operation to the pool. It runs until interrupted.
 *
 * @throws TezException declared by the signature; presumably raised by the reflective creation
 *                      of the deletion tracker - confirm against {@code ReflectionUtils}
 */
@Override
public void start() throws TezException {
  // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
  cmProxy = new ContainerManagementProtocolProxy(conf);

  final ThreadFactory workerFactory = new ThreadFactoryBuilder()
      .setNameFormat("ContainerLauncher #%d")
      .setDaemon(true)
      .build();

  // The pool begins at INITIAL_POOL_SIZE core threads; the handler thread adjusts it dynamically.
  launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
      Integer.MAX_VALUE, 1, TimeUnit.HOURS,
      new LinkedBlockingQueue<Runnable>(),
      workerFactory, new CustomizedRejectedExecutionHandler());

  eventHandlingThread = new Thread() {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        final ContainerOp event;
        try {
          event = eventQueue.take();
        } catch (InterruptedException e) {
          // An interrupt is only worth reporting when the service was not stopped.
          if (!serviceStopped.get()) {
            LOG.error("Returning, interrupted : " + e);
          }
          return;
        }
        growPool();
        // Events taken from the queue are handled in parallel by the pool.
        launcherPool.execute(createEventProcessor(event));
        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }

    /**
     * Raises the core pool size when it is below both the limit and the current node count.
     * The node count is only queried while the limit has not been reached; it covers the nodes
     * containers run on at this point in time, not the cluster size.
     */
    private void growPool() {
      final int current = launcherPool.getCorePoolSize();
      if (current == limitOnPoolSize) {
        return;
      }
      final int numNodes =
          getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
      final int ideal = Math.min(limitOnPoolSize, numNodes);
      if (current >= ideal) {
        return;
      }
      // INITIAL_POOL_SIZE is added as a buffer so the size is not bumped on every new node.
      final int grown = Math.min(limitOnPoolSize, ideal
          + INITIAL_POOL_SIZE);
      LOG.info("Setting ContainerLauncher pool size to " + grown
          + " as number-of-nodes to talk to is " + numNodes);
      launcherPool.setCorePoolSize(grown);
    }
  };
  eventHandlingThread.setName("ContainerLauncher Event Handler");
  eventHandlingThread.start();

  // The cleanup setting is consulted only when the Tez shuffle handler is in use.
  final boolean cleanupDagDataOnComplete = ShuffleUtils.isTezShuffleHandler(conf)
      && conf.getBoolean(TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION,
      TezConfiguration.TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT);
  if (!cleanupDagDataOnComplete) {
    return;
  }
  final String trackerClassName = conf.get(TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS,
      TezConfiguration.TEZ_AM_DELETION_TRACKER_CLASS_DEFAULT);
  deletionTracker = ReflectionUtils.createClazzInstance(
      trackerClassName, new Class[]{Configuration.class}, new Object[]{conf});
}
/**
 * Builds a statistics snapshot by copying counters and state out of the given heap cache, its
 * eviction metrics and its common metrics, and by deriving a few aggregate counts from them.
 *
 * @param _heapCache the cache implementation the values are read from
 * @param _userCache the cache instance stored in the {@code cache} field
 * @param now        stored as the creation time of this snapshot
 */
public CacheBaseInfo(HeapCache _heapCache, InternalCache _userCache, long now) {
infoCreatedTime = now;
cache = _userCache;
heapCache = _heapCache;
metrics = _heapCache.metrics;
// Counters maintained by the eviction component.
EvictionMetrics em = _heapCache.eviction.getMetrics();
newEntryCnt = em.getNewEntryCount();
expiredRemoveCnt = em.getExpiredRemovedCount();
evictedCnt = em.getEvictedCount();
maxSize = em.getMaxSize();
maxWeight = em.getMaxWeight();
currentWeight = em.getCurrentWeight();
// Counters kept directly on the heap cache, interleaved with further eviction metrics.
clearedTime = _heapCache.clearedTime;
keyMutationCnt = _heapCache.keyMutationCnt;
removedCnt = em.getRemovedCount();
clearRemovedCnt = _heapCache.clearRemovedCnt;
clearCnt = _heapCache.clearCnt;
internalExceptionCnt = _heapCache.internalExceptionCnt;
evictionRunningCnt = em.getEvictionRunningCount();
integrityState = _heapCache.getIntegrityState();
// The hash fills the freshly created collision info object.
collisionInfo = new CollisionInfo();
_heapCache.hash.calcHashCollisionInfo(collisionInfo);
extraStatistics = em.getExtraStatistics();
// Drop a leading ", " separator from the extra statistics text, if there is one.
if (extraStatistics.startsWith(", ")) {
extraStatistics = extraStatistics.substring(2);
}
size = heapCache.getLocalSize();
// Misses are the sum of read-through loads, explicit loads, non-fresh peek hits and peek misses.
missCnt = metrics.getReadThroughCount() + metrics.getExplicitLoadCount() +
metrics.getPeekHitNotFreshCount() + metrics.getPeekMissCount();
hitCnt = em.getHitCount();
// Puts are the sum of puts that created a new entry and puts that hit an existing one.
correctedPutCnt = metrics.getPutNewEntryCount() + metrics.getPutHitCount();
// Loader thread figures are filled in only when the loader executor is an ExclusiveExecutor,
// whose underlying thread pool is then queried.
if (_heapCache.loaderExecutor instanceof ExclusiveExecutor) {
ThreadPoolExecutor ex = ((ExclusiveExecutor) _heapCache.loaderExecutor).getThreadPoolExecutor();
asyncLoadsInFlight = ex.getActiveCount();
asyncLoadsStarted = ex.getTaskCount();
loaderThreadsLimit = ex.getCorePoolSize();
loaderThreadsMaxActive = ex.getLargestPoolSize();
}
// Total loads are the sum of read-through loads, explicit loads and refreshes.
totalLoadCnt = metrics.getReadThroughCount() + metrics.getExplicitLoadCount() +
metrics.getRefreshCount();
}