下面列出了java.util.concurrent.ThreadPoolExecutor#getMaximumPoolSize ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 修改线程池
*
* @param executor
* @param name
* @param parametric
* @param coreKey
* @param maxKey
*/
public static void updateThreadPool(final ThreadPoolExecutor executor, final String name, final Parametric parametric,
final String coreKey, final String maxKey) {
if (executor == null) {
return;
}
Integer core = parametric.getInteger(coreKey);
if (core != null && core > 0 && core != executor.getCorePoolSize()) {
logger.info(String.format("Core pool size of %s is changed from %d to %d",
name, executor.getCorePoolSize(), core));
executor.setCorePoolSize(core);
}
Integer max = parametric.getInteger(maxKey);
if (max != null && max > 0 && max != executor.getMaximumPoolSize()) {
logger.info(String.format("Maximum pool size of %s is changed from %d to %d",
name, executor.getMaximumPoolSize(), max));
executor.setMaximumPoolSize(max);
}
}
@Override
public ListenableFuture<Long> count(Map<String, ? extends Collection<Integer>> indexShardMap,
final WhereClause whereClause) throws IOException, InterruptedException {
List<Callable<Long>> callableList = new ArrayList<>();
for (Map.Entry<String, ? extends Collection<Integer>> entry : indexShardMap.entrySet()) {
final String index = entry.getKey();
for (final Integer shardId : entry.getValue()) {
callableList.add(new Callable<Long>() {
@Override
public Long call() throws Exception {
return count(index, shardId, whereClause);
}
});
}
}
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.SEARCH);
int corePoolSize = executor.getMaximumPoolSize();
MergePartialCountFunction mergeFunction = new MergePartialCountFunction();
ListenableFuture<List<Long>> listListenableFuture = ThreadPools.runWithAvailableThreads(
executor, corePoolSize, callableList, mergeFunction);
return Futures.transform(listListenableFuture, mergeFunction);
}
/**
* Scan an {@link Executor} or {@link ExecutorService}, recognizing several special
* implementations. Unwraps some decorating schedulers, recognizes {@link Scannable}
* schedulers and delegates to their {@link Scannable#scanUnsafe(Scannable.Attr)}
* method, introspects {@link ThreadPoolExecutor} instances.
* <p>
* If no data can be extracted, defaults to the provided {@code orElse}
* {@link Scannable#scanUnsafe(Scannable.Attr) scanUnsafe}.
*
* @param executor the executor to introspect in a best effort manner.
* @param key the key to scan for. CAPACITY and BUFFERED mainly.
* @return an equivalent of {@link Scannable#scanUnsafe(Scannable.Attr)} but that can
* also work on some implementations of {@link Executor}
*/
@Nullable
static final Object scanExecutor(Executor executor, Scannable.Attr key) {
if (executor instanceof DelegateServiceScheduler.UnsupportedScheduledExecutorService) {
executor = ((DelegateServiceScheduler.UnsupportedScheduledExecutorService) executor).get();
}
if (executor instanceof Scannable) {
return ((Scannable) executor).scanUnsafe(key);
}
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
if (key == Scannable.Attr.TERMINATED) return service.isTerminated();
if (key == Scannable.Attr.CANCELLED) return service.isShutdown();
}
if (executor instanceof ThreadPoolExecutor) {
final ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) executor;
if (key == Scannable.Attr.CAPACITY) return poolExecutor.getMaximumPoolSize();
if (key == Scannable.Attr.BUFFERED) return ((Long) (poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount())).intValue();
if (key == Scannable.Attr.LARGE_BUFFERED) return poolExecutor.getTaskCount() - poolExecutor.getCompletedTaskCount();
}
return null;
}
private void print(final ThreadPoolExecutor pool) {
System.out.println("==========================================");
final int activeCount = pool.getActiveCount();
System.out.println("activeCount = " + activeCount);
final int corePoolSize = pool.getCorePoolSize();
System.out.println("corePoolSize = " + corePoolSize);
final int largestPoolSize = pool.getLargestPoolSize();
System.out.println("largestPoolSize = " + largestPoolSize);
final int maximumPoolSize = pool.getMaximumPoolSize();
System.out.println("maximumPoolSize = " + maximumPoolSize);
final int poolSize = pool.getPoolSize();
System.out.println("poolSize = " + poolSize);
final int queueSize = pool.getQueue().size();
System.out.println("queueSize = " + queueSize);
final long taskCount = pool.getTaskCount();
System.out.println("taskCount = " + taskCount);
System.out.println("==========================================");
}
@Override
public int blockForAvailableThreads() {
if (this.threadPoolExecutorUsed) {
final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.executor;
synchronized (this.threadAvailableLock) {
while (threadPoolExecutor.getMaximumPoolSize() - threadPoolExecutor.getActiveCount() < 1 && !threadPoolExecutor.isShutdown()) {
try {
this.threadAvailableLock.wait(500L);
} catch (final InterruptedException ignore) {
// no-op
}
}
return threadPoolExecutor.getMaximumPoolSize() - threadPoolExecutor.getActiveCount();
}
} else {
return 1;
}
}
@Override
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for (Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if (!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if (msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
private void resetThreadPoolSize() {
if (!ThreadPoolExecutor.class.isInstance(SumkThreadPool.executor())) {
return;
}
ThreadPoolExecutor pool = (ThreadPoolExecutor) SumkThreadPool.executor();
int size = AppInfo.getInt("sumk.core.threadpool.core", 0);
if (size > 0 && pool.getCorePoolSize() != size) {
logger.info("change ThreadPool size from {} to {}", pool.getCorePoolSize(), size);
pool.setCorePoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.max", 0);
if (size > 0 && pool.getMaximumPoolSize() != size) {
logger.info("change ThreadPool max size from {} to {}", pool.getMaximumPoolSize(), size);
pool.setMaximumPoolSize(size);
}
size = AppInfo.getInt("sumk.core.threadpool.aliveTime", 0);
if (size > 0 && pool.getKeepAliveTime(TimeUnit.MILLISECONDS) != size) {
logger.info("change ThreadPool keepalive time from {} to {}", pool.getKeepAliveTime(TimeUnit.MILLISECONDS),
size);
pool.setKeepAliveTime(size, TimeUnit.MILLISECONDS);
}
String v = AppInfo.get("sumk.core.threadpool.allowCoreThreadTimeOut", null);
if (v != null) {
boolean allowCoreTimeout = "1".equals(v) || "true".equalsIgnoreCase(v);
if (allowCoreTimeout != pool.allowsCoreThreadTimeOut()) {
logger.info("change ThreadPool allowsCoreThreadTimeOut from {} to {}", pool.allowsCoreThreadTimeOut(),
allowCoreTimeout);
pool.allowCoreThreadTimeOut(allowCoreTimeout);
}
}
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
public boolean isLowOnThreads()
{
if (_executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
// getActiveCount() locks the thread pool, so execute it last
return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
}
return false;
}
@Override
protected Result check() throws Exception {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
for (Map.Entry<String, Object> entry : executors.entrySet()) {
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
int activeCount = tp.getActiveCount();
int maximumPoolSize = tp.getMaximumPoolSize();
boolean ok = maximumPoolSize - activeCount > 1;
if (ok) {
return Result.healthy();
} else {
int remainingCapacity = tp.getQueue().remainingCapacity();
ok = remainingCapacity > 1;
if (ok) {
return Result.healthy();
} else {
return Result.unhealthy("maximumPoolSize:%s,activeCount:%s,remainingCapacity:%s", maximumPoolSize, activeCount,
remainingCapacity);
}
}
}
}
return Result.healthy();
}
/**
* Description: 通过线程池来处理消息<br>
*
* @author 王伟<br>
* @taskId <br>
* @param channel
* @param message
* @throws InterruptedException <br>
*/
public static void execute(final String channel, final Runnable message) {
synchronized (channel) {
ThreadPoolExecutor executor = executorMap.get(channel);
if (executor == null) {
executor = createThreadPoolExecutor();
executorMap.put(channel, executor);
}
BlockingQueue<Runnable> bq = executor.getQueue();
// 当线程池中的队列出现阻塞后,暂停从redis中进行获取
try {
long count = 0;
while (bq.remainingCapacity() == 0 && executor.getMaximumPoolSize() == executor.getPoolSize()) {
if (count++ % NUM_100 == 0) {
LoggerUtil.debug("wait message[{0}] execute, current pool size is [{1}]", channel, bq.size());
}
Thread.sleep(NUM_100);
}
executor.execute(message);
}
catch (InterruptedException e) {
LoggerUtil.error(e);
}
}
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
final long error = rejectedCount.incrementAndGet();
if ((error % logRate) == 0) {
final int maxPoolSize = executor != null ? executor.getMaximumPoolSize() : -1;
logger.warn("The executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. Check the maxPoolSize, queueCapacity, and HBase options in the configuration. maxPoolSize={}, rejectedCount={}", maxPoolSize, error);
}
}
/**
* Is the pool low on threads ?
* @return True if active threads >= maximum number of threads
*/
public boolean isLowOnThreads()
{
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getActiveCount() >= tpe.getMaximumPoolSize();
}
return false;
}
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true;
ThreadPoolExecutor ex = (ThreadPoolExecutor) executor;
this.executorName = ex.getClass().getSimpleName();
this.currentQueueSize = ex.getQueue().size();
this.activeThreads = ex.getActiveCount();
this.coreThreads = ex.getCorePoolSize();
this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize();
}
this.leasesOwned = leaseCoordinator.getAssignments().size();
}
@Override
public ExecutorService getUnorderedExecutor(String poolAlias, BlockingQueue<Runnable> queue) {
ThreadPoolExecutor executor = getThreadPoolExecutor(poolAlias);
return new PartitionedUnorderedExecutor(queue, executor, executor.getMaximumPoolSize());
}
/**
* Returns maximum number of tasks that can be submitted to given
* pool (with bounded queue) before saturation (when submission
* throws RejectedExecutionException).
*/
static final int saturatedSize(ThreadPoolExecutor pool) {
BlockingQueue<Runnable> q = pool.getQueue();
return pool.getMaximumPoolSize() + q.size() + q.remainingCapacity();
}