下面列出了java.util.concurrent.ThreadPoolExecutor#getActiveCount ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* This test verifies that processing a beacon in a scan (which starts its own thread) does not
* affect the size of the available threads in the main Android AsyncTask.THREAD_POOL_EXECUTOR
* @throws Exception
*/
@Test
public void beaconScanCallbackTest() throws Exception {
final ServiceController<BeaconService> beaconServiceServiceController =
Robolectric.buildService(BeaconService.class);
// beaconServiceServiceController.attach();
BeaconService beaconService = beaconServiceServiceController.get();
beaconService.onCreate();
CycledLeScanCallback callback = beaconService.getCycledLeScanCallback();
ThreadPoolExecutor executor = (ThreadPoolExecutor) AsyncTask.THREAD_POOL_EXECUTOR;
int activeThreadCountBeforeScan = executor.getActiveCount();
byte[] scanRecord = new byte[1];
callback.onLeScan(null, -59, scanRecord, 123456L);
int activeThreadCountAfterScan = executor.getActiveCount();
assertEquals("The size of the Android thread pool should be unchanged by beacon scanning",
activeThreadCountBeforeScan, activeThreadCountAfterScan);
// Need to sleep here until the thread in the above method completes, otherwise an exception
// is thrown. Maybe we don't care about this exception, so we could remove this.
Thread.sleep(100);
}
@Override
public void destroy() throws Exception {
for (Map.Entry<String, ThreadPoolExecutor> entry : container.entrySet()) {
ThreadPoolExecutor executor = entry.getValue();
frameLog.info("{} is shutting down now", entry.getKey());
try {
// executor.shutdown();
while (executor.getActiveCount() != 0 && executor.getQueue().size() != 0) {
executor.awaitTermination(1, TimeUnit.SECONDS);
frameLog.info("{} 's activeCount:{}, queueLength:{}",entry.getKey(),executor.getActiveCount(),executor.getQueue().size());
}
} catch (InterruptedException e) {
frameLog.error("shutting ThreadPoolExecutor error :{}", entry.getKey(), e);
throw e;
}
frameLog.info("{} is shutted down", entry.getKey());
}
}
private void print(final ThreadPoolExecutor pool) {
System.out.println("==========================================");
final int activeCount = pool.getActiveCount();
System.out.println("activeCount = " + activeCount);
final int corePoolSize = pool.getCorePoolSize();
System.out.println("corePoolSize = " + corePoolSize);
final int largestPoolSize = pool.getLargestPoolSize();
System.out.println("largestPoolSize = " + largestPoolSize);
final int maximumPoolSize = pool.getMaximumPoolSize();
System.out.println("maximumPoolSize = " + maximumPoolSize);
final int poolSize = pool.getPoolSize();
System.out.println("poolSize = " + poolSize);
final int queueSize = pool.getQueue().size();
System.out.println("queueSize = " + queueSize);
final long taskCount = pool.getTaskCount();
System.out.println("taskCount = " + taskCount);
System.out.println("==========================================");
}
@Test
public void shouldDelegateToClientThread() throws ExecutionException, InterruptedException {
for (int i = 0; i < 50; i++) {
backend.put("myKey", "myVal" + i);
service.getStringInFuture(collector);
}
backend.clear();
final ThreadPoolExecutor executor = this.executor.getThreadPoolExecutor();
while (executor.getQueue().size() > 0 || executor.getActiveCount() > 0) {
Thread.sleep(500L);
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
assertThat(backend.copyToMap().entrySet(), is(empty()));
}
public void putExceed(Runnable e, ThreadPoolExecutor executor) {
mExceedQueue.offer(e);
final int activeCount = executor.getActiveCount();
if (activeCount <= 0) {
// In this case( the main queue is waiting for inserting or the active count is less
// than 0), we need to wake up the pool manually with the command from the head of
// exceed-queue.
final Runnable next = mExceedQueue.poll();
if (next != null) {
ThreadPoolLog.d(TAG, "putExceed and activeCount(%d), need to " +
"wake up the pool with next(%s)", activeCount, next);
executor.execute(next);
}
}
}
public void similarity() throws InterruptedException {
System.out.println("相似度");
ThreadPoolExecutor pool = new ThreadPoolExecutor(numThreads, numThreads, 1000,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(capacity ));
similarityMap = new TIntIntHashMap[docs.size()];
Iterator<Entry<String, TIntArrayList>> iterator =
locationMap.entrySet().iterator();
while(iterator.hasNext()) {
if(pool.getQueue().remainingCapacity()==0){
Thread.sleep(10);
continue;
}
Entry<String, TIntArrayList> entry = iterator.next();
TIntArrayList al = entry.getValue();
CalcSimilarity cs = new CalcSimilarity(al);
pool.execute(cs);
}
while(pool.getActiveCount()>0){
Thread.sleep(10);
}
pool.shutdown();
}
@Override
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for (Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if (!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if (msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true;
ThreadPoolExecutor ex = (ThreadPoolExecutor) executor;
this.executorName = ex.getClass().getSimpleName();
this.currentQueueSize = ex.getQueue().size();
this.activeThreads = ex.getActiveCount();
this.coreThreads = ex.getCorePoolSize();
this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize();
}
this.leasesOwned = leaseCoordinator.getAssignments().size();
}
private boolean completedBlockingTasks() {
if (!(blockingTaskExecutor instanceof ThreadPoolExecutor)) {
// Cannot determine if there's a blocking task.
return true;
}
final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) blockingTaskExecutor;
return threadPool.getQueue().isEmpty() && threadPool.getActiveCount() == 0;
}
public int activeCount() {
int size = 0;
for (ThreadPoolExecutor singleThreadExecutor : map.values()) {
//todo 这样循环迭代计算size之和的方式是否可以改为读取实时更新的缓存值?
size += singleThreadExecutor.getActiveCount();
}
return size;
}
@Override
protected Result check() throws Exception {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
for (Map.Entry<String, Object> entry : executors.entrySet()) {
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
int activeCount = tp.getActiveCount();
int maximumPoolSize = tp.getMaximumPoolSize();
boolean ok = maximumPoolSize - activeCount > 1;
if (ok) {
return Result.healthy();
} else {
int remainingCapacity = tp.getQueue().remainingCapacity();
ok = remainingCapacity > 1;
if (ok) {
return Result.healthy();
} else {
return Result.unhealthy("maximumPoolSize:%s,activeCount:%s,remainingCapacity:%s", maximumPoolSize, activeCount,
remainingCapacity);
}
}
}
}
return Result.healthy();
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
/**
* Get the number of idle threads
* @return The number; -1 if not supported
*/
public int getIdleThreads()
{
if (executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
return tpe.getPoolSize() - tpe.getActiveCount();
}
return -1;
}
public Status check() {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
StringBuilder msg = new StringBuilder();
Status.Level level = Status.Level.OK;
for(Map.Entry<String, Object> entry : executors.entrySet()) {
String port = entry.getKey();
ExecutorService executor = (ExecutorService) entry.getValue();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
Status.Level lvl = Status.Level.OK;
if(!ok) {
level = Status.Level.WARN;
lvl = Status.Level.WARN;
}
if(msg.length() > 0) {
msg.append(";");
}
msg.append("Pool status:" + lvl
+ ", max:" + tp.getMaximumPoolSize()
+ ", core:" + tp.getCorePoolSize()
+ ", largest:" + tp.getLargestPoolSize()
+ ", active:" + tp.getActiveCount()
+ ", task:" + tp.getTaskCount()
+ ", service port: " + port);
}
}
return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
this.connector.pause();
Executor executor = this.connector.getProtocolHandler().getExecutor();
if (executor instanceof ThreadPoolExecutor) {
try {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int nfsActiveRequestCount = threadPoolExecutor.getActiveCount();
while (nfsActiveRequestCount > 0) {
nfsActiveRequestCount = threadPoolExecutor.getActiveCount();
System.err.println("NFS Active Request Count in while: " + nfsActiveRequestCount);
try {
Thread.sleep(1000); // wait for 1s
} catch (InterruptedException e) {
e.printStackTrace();
}
}
threadPoolExecutor.shutdown();
if (!threadPoolExecutor.awaitTermination(shutdownTimeout, TimeUnit.SECONDS)) {
System.err.println(
"NFS Server thread pool did not shut down gracefully within "
+ shutdownTimeout
+ " seconds. Proceeding with forceful shutdown");
threadPoolExecutor.shutdownNow();
if (!threadPoolExecutor.awaitTermination(shutdownTimeout, TimeUnit.SECONDS)) {
System.err.println("NFS Server thread pool did not terminate");
}
} else {
System.err.println("*** NFS Server Shutdown ***");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
public int getIdleThreads()
{
if (_executor instanceof ThreadPoolExecutor)
{
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
return tpe.getPoolSize() - tpe.getActiveCount();
}
return -1;
}
/**
* @return {@code true} if the thread pool backing the (optional) executor service is inactive
*/
private boolean isCalmPeriod(@Nullable final NexusExecutorService executorService) {
if (executorService != null) {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService.getTargetExecutorService();
return threadPool.getQueue().isEmpty() && threadPool.getActiveCount() == 0;
}
else {
return true; // service not enabled, consider as calm
}
}
private void logMessageIfThereAreStillActiveThreads(ThreadPoolExecutor executor) {
if (executor.getActiveCount() > 0) {
LOG.warn("{} thread(s) still active, force shutdown", executor.getActiveCount());
}
}
/**
* @param node node which query pool to check.
* @return {@code True} if {@link GridIoPolicy#QUERY_POOL} is empty. This means no queries are currntly executed and
* no queries are executed at the moment; {@code false} otherwise.
*/
private boolean queryPoolIsEmpty(IgniteEx node) {
ThreadPoolExecutor qryPool = (ThreadPoolExecutor)node.context().getQueryExecutorService();
return qryPool.getQueue().isEmpty() && qryPool.getActiveCount() == 0;
}