下面列出了java.util.concurrent.ThreadPoolExecutor#getPoolSize ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* allowCoreThreadTimeOut(true) causes idle threads to time out
*/
public void testAllowCoreThreadTimeOut_true() throws Exception {
long keepAliveTime = timeoutMillis();
final ThreadPoolExecutor p =
new ThreadPoolExecutor(2, 10,
keepAliveTime, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
final CountDownLatch threadStarted = new CountDownLatch(1);
p.allowCoreThreadTimeOut(true);
p.execute(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
assertEquals(1, p.getPoolSize());
}});
await(threadStarted);
delay(keepAliveTime);
long startTime = System.nanoTime();
while (p.getPoolSize() > 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, p.getPoolSize());
}
}
/**
* allowCoreThreadTimeOut(true) causes idle threads to time out
*/
public void testAllowCoreThreadTimeOut_true() throws Exception {
long keepAliveTime = timeoutMillis();
final ThreadPoolExecutor p =
new CustomTPE(2, 10,
keepAliveTime, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
final CountDownLatch threadStarted = new CountDownLatch(1);
p.allowCoreThreadTimeOut(true);
p.execute(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
assertEquals(1, p.getPoolSize());
}});
await(threadStarted);
delay(keepAliveTime);
long startTime = System.nanoTime();
while (p.getPoolSize() > 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, p.getPoolSize());
}
}
/**
* Create description of an executor service for logging.
*
* @param execSvcName Name of the service.
* @param execSvc Service to create a description for.
*/
private String createExecutorDescription(String execSvcName, ExecutorService execSvc) {
int poolActiveThreads = 0;
int poolIdleThreads = 0;
int poolQSize = 0;
if (execSvc instanceof ThreadPoolExecutor) {
ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
int poolSize = exec.getPoolSize();
poolActiveThreads = Math.min(poolSize, exec.getActiveCount());
poolIdleThreads = poolSize - poolActiveThreads;
poolQSize = exec.getQueue().size();
}
return execSvcName + " [active=" + poolActiveThreads + ", idle=" + poolIdleThreads + ", qSize=" + poolQSize + "]";
}
/**
* allowCoreThreadTimeOut(true) causes idle threads to time out
*/
public void testAllowCoreThreadTimeOut_true() throws Exception {
long keepAliveTime = timeoutMillis();
final ThreadPoolExecutor p =
new ThreadPoolExecutor(2, 10,
keepAliveTime, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
final CountDownLatch threadStarted = new CountDownLatch(1);
p.allowCoreThreadTimeOut(true);
p.execute(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
assertEquals(1, p.getPoolSize());
}});
await(threadStarted);
delay(keepAliveTime);
long startTime = System.nanoTime();
while (p.getPoolSize() > 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, p.getPoolSize());
}
}
/**
* allowCoreThreadTimeOut(true) causes idle threads to time out
*/
public void testAllowCoreThreadTimeOut_true() throws Exception {
long keepAliveTime = timeoutMillis();
final ThreadPoolExecutor p =
new CustomTPE(2, 10,
keepAliveTime, MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10));
try (PoolCleaner cleaner = cleaner(p)) {
final CountDownLatch threadStarted = new CountDownLatch(1);
p.allowCoreThreadTimeOut(true);
p.execute(new CheckedRunnable() {
public void realRun() {
threadStarted.countDown();
assertEquals(1, p.getPoolSize());
}});
await(threadStarted);
delay(keepAliveTime);
long startTime = System.nanoTime();
while (p.getPoolSize() > 0
&& millisElapsedSince(startTime) < LONG_DELAY_MS)
Thread.yield();
assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
assertEquals(0, p.getPoolSize());
}
}
private void print(final ThreadPoolExecutor pool) {
System.out.println("==========================================");
final int activeCount = pool.getActiveCount();
System.out.println("activeCount = " + activeCount);
final int corePoolSize = pool.getCorePoolSize();
System.out.println("corePoolSize = " + corePoolSize);
final int largestPoolSize = pool.getLargestPoolSize();
System.out.println("largestPoolSize = " + largestPoolSize);
final int maximumPoolSize = pool.getMaximumPoolSize();
System.out.println("maximumPoolSize = " + maximumPoolSize);
final int poolSize = pool.getPoolSize();
System.out.println("poolSize = " + poolSize);
final int queueSize = pool.getQueue().size();
System.out.println("queueSize = " + queueSize);
final long taskCount = pool.getTaskCount();
System.out.println("taskCount = " + taskCount);
System.out.println("==========================================");
}
public static int getPoolSize() {
ThreadPoolExecutor executer = (ThreadPoolExecutor)serviceRef.get();
if (serviceRef.get() == null) {
return 0;
}
return executer.getPoolSize();
}
private static void realMain(String[] args) throws Throwable {
final int n = 4;
final CyclicBarrier barrier = new CyclicBarrier(2*n+1);
final ThreadPoolExecutor pool
= new ThreadPoolExecutor(n, 2*n,
KEEPALIVE_MS, MILLISECONDS,
new SynchronousQueue<Runnable>());
final Runnable r = new Runnable() { public void run() {
try {
barrier.await();
barrier.await();
} catch (Throwable t) { unexpected(t); }}};
for (int i = 0; i < 2*n; i++)
pool.execute(r);
barrier.await();
checkPoolSizes(pool, 2*n, n, 2*n);
barrier.await();
long nap = KEEPALIVE_MS + (KEEPALIVE_MS >> 2);
for (long sleepyTime = 0L; pool.getPoolSize() > n; ) {
check((sleepyTime += nap) <= LONG_DELAY_MS);
Thread.sleep(nap);
}
checkPoolSizes(pool, n, n, 2*n);
Thread.sleep(nap);
checkPoolSizes(pool, n, n, 2*n);
pool.shutdown();
check(pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
}
public static int getPoolSize() {
ThreadPoolExecutor executer = (ThreadPoolExecutor)serviceRef.get();
if (serviceRef.get() == null)
return 0;
return executer.getPoolSize();
}
public int getIdleThreads()
{
if (_executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
return tpe.getPoolSize() - tpe.getActiveCount();
}
return -1;
}
public int getThreads()
{
if (_executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
return tpe.getPoolSize();
}
return -1;
}
public boolean isLowOnThreads()
{
if (_executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
// getActiveCount() locks the thread pool, so execute it last
return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
}
return false;
}
/**
* Description: 通过线程池来处理消息<br>
*
* @author 王伟<br>
* @taskId <br>
* @param channel
* @param message
* @throws InterruptedException <br>
*/
public static void execute(final String channel, final Runnable message) {
synchronized (channel) {
ThreadPoolExecutor executor = executorMap.get(channel);
if (executor == null) {
executor = createThreadPoolExecutor();
executorMap.put(channel, executor);
}
BlockingQueue<Runnable> bq = executor.getQueue();
// 当线程池中的队列出现阻塞后,暂停从redis中进行获取
try {
long count = 0;
while (bq.remainingCapacity() == 0 && executor.getMaximumPoolSize() == executor.getPoolSize()) {
if (count++ % NUM_100 == 0) {
LoggerUtil.debug("wait message[{0}] execute, current pool size is [{1}]", channel, bq.size());
}
Thread.sleep(NUM_100);
}
executor.execute(message);
}
catch (InterruptedException e) {
LoggerUtil.error(e);
}
}
}
private static void logExecutor(final String name, final ThreadPoolExecutor executor) {
final int corePoolSize = executor.getCorePoolSize();
final int poolSize = executor.getPoolSize();
final int activeCount = executor.getActiveCount();
final long taskCount = executor.getTaskCount();
final long completedCount = executor.getCompletedTaskCount();
final boolean isShutdown = executor.isShutdown();
final boolean isTerminated = executor.isTerminated();
Log.v(TAG, name + " CorePoolSize:" + corePoolSize + " PoolSize:" + poolSize);
Log.v(TAG, name + " isShutdown:" + isShutdown + " isTerminated:" + isTerminated);
Log.v(TAG, name + " activeCount:" + activeCount + " taskCount:" + taskCount
+ " completedCount:" + completedCount);
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.getPoolSize() >= executor.getCorePoolSize()) {
throw new RejectedExecutionException(
"Can't accept new asynchronous request. Too many asynchronous jobs in progress");
}
delegate.rejectedExecution(r, executor);
}
/**
* Get the number of idle threads
* @return The number; -1 if not supported
*/
public int getIdleThreads()
{
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getPoolSize() - tpe.getActiveCount();
}
return -1;
}
/**
* Get the number of threads
* @return The number; -1 if not supported
*/
public int getThreads()
{
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getPoolSize();
}
return -1;
}
static void awaitPoolSize(ThreadPoolExecutor pool, int n) {
while (pool.getPoolSize() != n) Thread.yield();
pass();
}
/**
* Core impl.
*
* @param service
* The service to be monitored.
* @param w
* The weight to be used by the {@link MovingAverageTask}s.
*/
public ThreadPoolExecutorBaseStatisticsTask(final ThreadPoolExecutor service,
final double w) {
if (service == null)
throw new IllegalArgumentException();
if (w <= 0d || w >= 1d)
throw new IllegalArgumentException();
this.service = service;
this.startNanos = System.nanoTime();
this.w = w;
queueSizeTask = new MovingAverageTask("queueSize",
new Callable<Integer>() {
public Integer call() {
return service.getQueue().size();
}
}, w);
activeCountTask = new MovingAverageTask("activeCount",
new Callable<Integer>() {
public Integer call() {
return service.getActiveCount();
}
}, w);
queueLengthTask = new MovingAverageTask("queueLength",
new Callable<Integer>() {
public Integer call() {
return service.getQueue().size()
+ service.getActiveCount();
}
}, w);
poolSizeTask = new MovingAverageTask("poolSize",
new Callable<Integer>() {
public Integer call() {
return service.getPoolSize();
}
}, w);
}