下面列出了 java.util.concurrent.ThreadPoolExecutor#isTerminated() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Demo entry point: submits 1000 {@code RequestHandler} tasks to a bounded thread pool,
 * pausing 50 ms between submissions, then shuts the pool down and blocks until every
 * accepted task has finished.
 *
 * <p>The pool must be shut down before waiting: {@code isTerminated()} can only become
 * {@code true} after {@code shutdown()} or {@code shutdownNow()} has been called.
 *
 * @param args unused
 * @throws InterruptedException if the calling thread is interrupted while pausing between
 *                              submissions or while waiting for the pool to terminate
 */
public static void main(String[] args) throws InterruptedException {
    int corePoolSize = 5;
    int maximumPoolSize = 10;
    int queueCapacity = 100;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, // core pool size
            maximumPoolSize, // maximum pool size
            10, // keep-alive time for an idle thread
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueCapacity)); // bounded queue in which requests wait
    try {
        // 1000 requests, submitted 50 ms apart
        for (int i = 0; i < 1000; i++) {
            executor.execute(new RequestHandler("request-" + i));
            Thread.sleep(50);
        }
    } finally {
        // Stop accepting new tasks; also runs when a submission fails, so the pool's
        // worker threads do not keep the JVM alive.
        executor.shutdown();
    }
    // Block (instead of spinning) until the requests still waiting in the queue are done.
    while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
        // still draining
    }
}
/**
 * Waits until the pool has no queued tasks, no active threads, and has completed at least
 * {@code workUnits} tasks, or until the pool reports itself terminated, whichever comes first.
 *
 * <p>The pool's counters are polled with a short sleep between samples so the calling thread
 * does not spin at full CPU. If the caller is interrupted while waiting, the wait continues
 * and the interrupt status is restored before returning. A progress line is logged on the
 * first pass and then at most once every 10 seconds.
 *
 * @param log       logger for progress messages; may be {@code null} to disable logging
 * @param executor  the pool to watch
 * @param type      label used as the prefix of the log messages
 * @param poolSize  pool size, used only in the log messages
 * @param workUnits number of completed tasks to wait for
 * @param start     reference timestamp in milliseconds (same clock as
 *                  {@link System#currentTimeMillis()}) from which elapsed time is measured
 * @return milliseconds elapsed between {@code start} and the moment the wait ended
 */
public static long waitForThreads(Logger log, ThreadPoolExecutor executor, String type, int poolSize, long workUnits, long start) {
    final long logIntervalMs = 1000L * 10L;
    final long pollIntervalMs = 25L;

    long cur = System.currentTimeMillis();
    int active = executor.getActiveCount();
    int qSize = executor.getQueue().size();
    long compl = executor.getCompletedTaskCount();
    long time = 0;
    boolean interrupted = false;

    while (((qSize > 0) || (active > 0) || (compl < workUnits)) && !executor.isTerminated()) {
        if (log != null && (time < (System.currentTimeMillis() - logIntervalMs))) {
            log.info(type + " running, T: " + active + "/" + poolSize + ", Completed: " + compl + "/" + workUnits
                    + ", Remaining: " + qSize + ", " + (cur - start) + " ms elapsed");
            time = System.currentTimeMillis();
        }
        try {
            // Yield the CPU between samples instead of busy-spinning.
            Thread.sleep(pollIntervalMs);
        } catch (InterruptedException e) {
            // Keep waiting (the contract is to wait for completion) but remember the request.
            interrupted = true;
        }
        cur = System.currentTimeMillis();
        active = executor.getActiveCount();
        qSize = executor.getQueue().size();
        compl = executor.getCompletedTaskCount();
    }

    if (log != null) {
        log.info("Finished Waiting for " + type + " running, T: " + active + "/" + poolSize + ", Completed: " + compl + "/" + workUnits
                + ", Remaining: " + qSize + ", " + (cur - start) + " ms elapsed");
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    long stop = System.currentTimeMillis();
    return (stop - start);
}
/**
 * Demo entry point: runs ten {@code MyRunnable} tasks on an explicitly configured
 * {@link ThreadPoolExecutor}, then shuts the pool down and blocks until all tasks finish.
 *
 * <p>If the calling thread is interrupted while waiting, the wait continues and the interrupt
 * status is restored before the final message is printed.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Create the pool through the ThreadPoolExecutor constructor with explicit parameters
    // (the creation style recommended by the Alibaba Java guidelines).
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy());
    for (int i = 0; i < 10; i++) {
        // Create the task and hand it to the pool
        Runnable worker = new MyRunnable("" + i);
        executor.execute(worker);
    }
    // Stop accepting new tasks, then block until the submitted ones are done
    executor.shutdown();
    boolean interrupted = false;
    while (!executor.isTerminated()) {
        try {
            // Parks the thread instead of spinning on isTerminated()
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    System.out.println("Finished all threads");
}
/**
 * Runs one {@code DataMigrateRunner} per data-node detail of every table in
 * {@code migrateTables} that is not flagged as errored, on a fixed-size pool of
 * {@code margs.getThreadCount()} threads, and blocks until all of them have finished.
 *
 * <p>An interrupt received while waiting is logged and the wait continues; the interrupt
 * status is restored before returning so callers can still observe it.
 *
 * @throws SQLException declared by the original signature
 */
private void migrateData() throws SQLException{
    executor = new ThreadPoolExecutor(margs.getThreadCount(), margs.getThreadCount(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
    for (TableMigrateInfo table : migrateTables) {
        if (!table.isError()) { // skip sharded tables that have already failed
            List<DataNodeMigrateInfo> detailList = table.getDataNodesDetail();
            for (DataNodeMigrateInfo info : detailList) {
                LOGGER.info("{}", info);
                executor.execute(new DataMigrateRunner(table, info.getSrc(), info.getTarget(), table.getTableName(), info.getTempFile()));
            }
        }
    }
    executor.shutdown();
    boolean interrupted = false;
    while (!executor.isTerminated()) {
        try {
            // Returns as soon as the pool terminates, or after 200 ms at the latest
            executor.awaitTermination(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("error", e);
            interrupted = true;
        }
    }
    if (interrupted) {
        // Re-assert the interrupt that was consumed while waiting
        Thread.currentThread().interrupt();
    }
}
/**
 * Polls the executor every 500 ms until it has terminated, publishing progress derived from
 * the completed-task count on each pass. If cancellation is detected first, the executor is
 * stopped with {@code shutdownNow()}, the progress pane is switched to an indeterminate
 * "Canceling" state, and up to 5 seconds are allowed for the pool to wind down.
 *
 * @param executor the pool being waited on
 * @return {@code true} if the pool terminated on its own, {@code false} if the wait ended
 *         because of cancellation
 * @throws InterruptedException if the calling thread is interrupted while sleeping or while
 *                              awaiting termination
 */
private boolean waitTermination(ThreadPoolExecutor executor) throws InterruptedException {
    while (!executor.isTerminated()) {
        if (!isCancelled()) {
            // Normal path: report progress and check again shortly
            setProgress(calcProgress(executor.getCompletedTaskCount()));
            Thread.sleep(500);
            continue;
        }

        // Cancellation path: stop the pool and give it a bounded time to finish
        executor.shutdownNow();
        progressPane.changeLabel(this, task.getTitle() + " (Canceling)… ");
        progressPane.changeIndeterminate(this, true);
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return false;
    }
    return true;
}
/**
 * Shuts down the connector's thread pool gracefully.
 *
 * <p>If the connector's protocol-handler executor is a {@link ThreadPoolExecutor} that has not
 * already terminated, it is asked to shut down and given
 * {@code tomcatGracefulShutdownProperties.getWaitTime()} seconds to finish; after that it is
 * stopped with {@code shutdownNow()}. The elapsed time is logged in every case. A
 * {@code null} connector (logged as the unit-test situation) and executors of other types are
 * left untouched.
 *
 * @param connector the connector whose executor should be shut down; may be {@code null}
 */
@Override
public void customize(Connector connector) {
    if (connector == null) {
        log.info("We are running unit test");
        return;
    }
    final Executor executor = connector.getProtocolHandler().getExecutor();
    if (!(executor instanceof ThreadPoolExecutor)) {
        return;
    }
    log.info("executor is ThreadPoolExecutor");
    final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
    if (threadPoolExecutor.isTerminated()) {
        log.info("thread pool executor has terminated");
        return;
    }

    final LocalDateTime startShutdown = LocalDateTime.now();
    try {
        threadPoolExecutor.shutdown();
        if (!threadPoolExecutor
                .awaitTermination(tomcatGracefulShutdownProperties.getWaitTime(), TimeUnit.SECONDS)) {
            log.warn("Tomcat thread pool did not shut down gracefully within "
                    + tomcatGracefulShutdownProperties.getWaitTime() + " second(s). Proceeding with force shutdown");
            threadPoolExecutor.shutdownNow();
        } else {
            log.info("Tomcat thread pool is empty,we stop now");
        }
    } catch (InterruptedException e) {
        log.error("The await termination has been interrupted : " + e.getMessage());
        // Only a real interruption should re-assert the interrupt status
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        log.error("Tomcat thread pool shutdown failed : " + e.getMessage(), e);
    } finally {
        // Take the end timestamp here, after the shutdown attempt, so the duration is real
        final LocalDateTime stopShutdown = LocalDateTime.now();
        final long seconds = Duration.between(startShutdown, stopShutdown).getSeconds();
        log.info("Shutdown performed in " + seconds + " second(s)");
    }
}
/*******************
 * Enumeration mode main function.
 *
 * Submits one EnumerationTask per target to a fixed-size thread pool sized by
 * the thread-count option, blocks until every task has finished, then asks
 * RMIAttackFactory to clean up.
 *
 * If the calling thread is interrupted while waiting, the wait continues and
 * the interrupt status is restored before the completion message is printed.
 ******************/
public void run() {
    ThreadPoolExecutor tpe = (ThreadPoolExecutor)Executors.newFixedThreadPool(this._opts.getThreadCount());
    ArrayList<TCPEndpoint> targets = this._opts.getTargets();
    RMIEnumerator rmie = new RMIEnumerator(this._opts);

    //Initialise the list of known attacks with the current program options
    RMIAttackFactory.setProgramOptions(this._opts);

    //Status
    System.out.println("Scanning " + targets.size() + " target(s) for objects exposed via an RMI registry...");
    System.out.println("");

    //Pass all tasks to the thread pool executor
    for(TCPEndpoint t: targets) {
        tpe.execute(new EnumerationTask(t, rmie, this._opts));
    }

    //Shutdown the thread pool and block (rather than spin) until the tasks have finished
    tpe.shutdown();
    boolean interrupted = false;
    while(!tpe.isTerminated()) {
        try {
            tpe.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS);
        } catch(InterruptedException ie) {
            interrupted = true;
        }
    }
    if(interrupted) {
        Thread.currentThread().interrupt();
    }

    //Done
    System.out.println("Successfully scanned " + targets.size() + " target(s) for objects exposed via RMI.");

    //Clean up all attacks (e.g. stop proxies that were started to enumerate endpoints)
    RMIAttackFactory.cleanUp();
}
/**
 * Submits ten tasks to a pool with 2 core threads, at most 4 threads, and a queue of
 * capacity 6, then shuts the pool down and blocks until all tasks have finished.
 *
 * <p>If the test thread is interrupted while waiting, the wait continues and the interrupt
 * status is restored before the final message is printed.
 */
@Test
public void testPoolSize() {
    ThreadPoolExecutor executorService
            = new ThreadPoolExecutor(2, 4, 1L,
            TimeUnit.MINUTES, new LinkedBlockingQueue<>(6));
    for (int i = 0; i < 10; i++) {
        executorService.execute(new Task("Task-" + i));
    }
    executorService.shutdown();
    boolean interrupted = false;
    while (!executorService.isTerminated()) {
        try {
            // Parks the thread instead of spinning on isTerminated()
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    System.out.println("Finish all thread...");
}
/**
 * Runs {@code ttl_jobs} {@code Verify} jobs concurrently over two collection-bearing POJO
 * classes and checks that every job completed, none was rejected, and each job's future
 * yields {@code null}.
 *
 * @throws ExecutionException   if any submitted job failed
 * @throws InterruptedException if the test thread is interrupted while waiting for the pool
 *                              to terminate or for a job result
 */
@Test
public void shouldCreateGenericCollection() throws ExecutionException, InterruptedException {
    List<PojoClass> pojoClasses = new ArrayList<PojoClass>(2);
    pojoClasses.add(PojoClassFactory.getPojoClass(AClassWithGenericCollection.class));
    pojoClasses.add(PojoClassFactory.getPojoClass(AClassWithExhaustiveCollection.class));
    Assert.assertEquals(2, pojoClasses.size());

    RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(ttl_jobs / per_thread, ttl_jobs / per_thread, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(ttl_jobs),
            Executors.defaultThreadFactory(), rejectionHandler);

    List<Future<?>> tasksStatus = new ArrayList<Future<?>>(ttl_jobs);
    for (int i = 0; i < ttl_jobs; i++) {
        tasksStatus.add(executorPool.submit(new Verify(pojoClasses)));
    }

    executorPool.shutdown();
    // Returns as soon as the pool terminates; an interrupt propagates and fails the test
    // instead of being silently discarded.
    while (!executorPool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
        // still running
    }

    Assert.assertEquals(ttl_jobs, executorPool.getCompletedTaskCount());
    Assert.assertEquals(0, rejectionHandler.getCount());
    for (Future<?> f : tasksStatus) {
        Assert.assertNull(f.get());
    }
}
/**
 * Test 'rsh' mode server.
 *
 * <p>Runs two {@code SyncDepot} tasks for {@code //depot/...} on a cached thread pool and
 * waits for them to finish, printing a status line for every full second the pool is still
 * running. Any exception fails the test; an interruption also restores the thread's
 * interrupt status first.
 */
@Test
public void testRshServer() {
    try {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        //adjust number of threads as needed for testing
        for (int i = 1; i <= 2; i++) {
            String depotPath = "//depot/...";
            SyncDepot task = new SyncDepot(depotPath);
            System.out.println("A new task has been added to sync : " + depotPath);
            executor.execute(task);
        }
        executor.shutdown();
        // awaitTermination returns as soon as the pool is done, so the test does not
        // sleep a full second past completion.
        while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("Threads are still running...");
        }
        System.out.println("Finished all threads");
    } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    } catch (Exception exc) {
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    }
}
/**
 * Test 'rsh' mode server.
 *
 * <p>Runs two {@code SyncDepot} tasks for {@code //depot/...} on a cached thread pool and
 * waits for them to finish, printing a status line for every full second the pool is still
 * running. Any exception fails the test; an interruption also restores the thread's
 * interrupt status first.
 */
@Test
public void testRshServer() {
    try {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        //adjust number of threads as needed for testing
        for (int i = 1; i <= 2; i++) {
            String depotPath = "//depot/...";
            SyncDepot task = new SyncDepot(depotPath);
            System.out.println("A new task has been added to sync : " + depotPath);
            executor.execute(task);
        }
        executor.shutdown();
        // awaitTermination returns as soon as the pool is done, so the test does not
        // sleep a full second past completion.
        while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("Threads are still running...");
        }
        System.out.println("Finished all threads");
    } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    } catch (Exception exc) {
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    }
}
/**
 * Test 'rsh' mode server.
 *
 * <p>Runs two {@code SyncDepot} tasks for {@code //depot/...} on a cached thread pool and
 * waits for them to finish, printing a status line for every full second the pool is still
 * running. Any exception fails the test; an interruption also restores the thread's
 * interrupt status first.
 */
@Test
public void testRshServer() {
    try {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        //adjust number of threads as needed for testing
        for (int i = 1; i <= 2; i++) {
            String depotPath = "//depot/...";
            SyncDepot task = new SyncDepot(depotPath);
            System.out.println("A new task has been added to sync : " + depotPath);
            executor.execute(task);
        }
        executor.shutdown();
        // awaitTermination returns as soon as the pool is done, so the test does not
        // sleep a full second past completion.
        while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("Threads are still running...");
        }
        System.out.println("Finished all threads");
    } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    } catch (Exception exc) {
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    }
}
/**
 * Test 'rsh' mode server.
 *
 * <p>Runs two {@code SyncDepot} tasks for {@code //depot/...} on a cached thread pool and
 * waits for them to finish, printing a status line for every full second the pool is still
 * running. Any exception fails the test; an interruption also restores the thread's
 * interrupt status first.
 */
@Test
public void testRshServer() {
    try {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        //adjust number of threads as needed for testing
        for (int i = 1; i <= 2; i++) {
            String depotPath = "//depot/...";
            SyncDepot task = new SyncDepot(depotPath);
            System.out.println("A new task has been added to sync : " + depotPath);
            executor.execute(task);
        }
        executor.shutdown();
        // awaitTermination returns as soon as the pool is done, so the test does not
        // sleep a full second past completion.
        while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            System.out.println("Threads are still running...");
        }
        System.out.println("Finished all threads");
    } catch (InterruptedException exc) {
        Thread.currentThread().interrupt();
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    } catch (Exception exc) {
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    }
}
/**
 * Writes three verbose log lines describing the executor: its core and current pool sizes,
 * its shutdown/terminated flags, and its active, total and completed task counts. All values
 * are sampled before anything is logged.
 *
 * @param name     label placed at the start of every line
 * @param executor the pool to describe
 */
private static void logExecutor(final String name, final ThreadPoolExecutor executor) {
    // Sample every value up front so the three lines describe the same moment as closely
    // as the executor's API allows.
    final int core = executor.getCorePoolSize();
    final int current = executor.getPoolSize();
    final int running = executor.getActiveCount();
    final long submitted = executor.getTaskCount();
    final long finished = executor.getCompletedTaskCount();
    final boolean shutdown = executor.isShutdown();
    final boolean terminated = executor.isTerminated();

    final String sizeLine = name + " CorePoolSize:" + core + " PoolSize:" + current;
    final String stateLine = name + " isShutdown:" + shutdown + " isTerminated:" + terminated;
    final String countLine = name + " activeCount:" + running + " taskCount:" + submitted
            + " completedCount:" + finished;

    Log.v(TAG, sizeLine);
    Log.v(TAG, stateLine);
    Log.v(TAG, countLine);
}
/**
 * Rejection handler that waits up to {@code timeout} (in the unit held by {@code seconds})
 * for room in the pool's work queue.
 *
 * <p>The call is a silent no-op when the task or pool is {@code null}, or when the pool is
 * shut down, terminating or terminated.
 *
 * @param r   the rejected task
 * @param tpe the pool that rejected it
 * @throws RejectedExecutionException if no queue slot became free within the timeout, or if
 *                                    the calling thread was interrupted while waiting (the
 *                                    interrupt status is restored and the
 *                                    {@code InterruptedException} is attached as the cause)
 */
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
    if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
        return;
    }
    try {
        if (!tpe.getQueue().offer(r, timeout, seconds)) {
            // Locale.ROOT keeps the unit name stable regardless of the default locale
            throw new RejectedExecutionException("Timeout waiting for executor slot: waited " + timeout + " "
                    + seconds.toString().toLowerCase(java.util.Locale.ROOT));
        }
    } catch (final InterruptedException e) {
        // Preserve the interrupt for callers further up the stack
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Interrupted waiting for executor slot", e);
    }
}
/**
 * Shuts the executor down and blocks until every previously submitted task has finished.
 *
 * <p>The wait is not abandoned on interruption, but the interrupt is not lost either: if the
 * calling thread is interrupted while waiting, its interrupt status is restored before this
 * method returns.
 *
 * @param executor the pool to shut down
 */
private void shutdown(ThreadPoolExecutor executor) {
    executor.shutdown();
    boolean interrupted = false;
    while (!executor.isTerminated()) {
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (final InterruptedException e) {
            // Keep waiting, but remember that an interrupt was requested
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Synchronises the followers (openids) of a WeChat official account.
 *
 * <p>Pages through the follower-list endpoint (at most 2000 requests), hands each non-empty
 * page of openids to a 10-thread pool as a {@code SyncFansInfo} job, then waits for all jobs
 * to finish. Nothing is fetched when {@code next_openid} is empty or no access token is
 * available. Failures are printed and do not propagate; the pool is always shut down, even
 * when paging fails part-way.
 *
 * <p>NOTE(review): the first request is built with an empty {@code NEXT_OPENID}; the incoming
 * {@code next_openid} only gates whether paging starts. Confirm this is intended.
 *
 * @param next_openid paging cursor; must be non-empty for any request to be made
 * @param jwid        identifier of the official account, used to look up the access token
 * @return a status message ending with the follower total reported by the last successful
 *         page (0 if none)
 */
@Override
public String getFansListTask(String next_openid, String jwid) {
    // Sync the followers of the WeChat official account (openid sync)
    int total = 0;
    try {
        // Fetch the access token
        String accessToken = WeiXinHttpUtil.getRedisWeixinToken(jwid);
        if (StringUtils.isNotEmpty(accessToken)) {
            // Process the pages on multiple threads
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            try {
                List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(2000);
                int k = 0;
                // URL of the first follower-list page
                String requestUrl = user_List_url.replace("NEXT_OPENID", "").replace("ACCESS_TOKEN", accessToken);
                while (oConvertUtils.isNotEmpty(next_openid) && k < 2000) {
                    k++;
                    // Call the API for one page of followers (original note: up to 10000 per call)
                    JSONObject jsonObj = WeixinUtil.httpRequest(requestUrl, "GET", "");
                    next_openid = null; // guards against an endless loop
                    if (jsonObj == null) {
                        continue;
                    }
                    if (!jsonObj.containsKey("errmsg")) {
                        total = jsonObj.getInt("total");
                        int count = jsonObj.getInt("count");
                        if (count != 0) {
                            // openids of the followers on this page
                            JSONArray openIdArr = jsonObj.getJSONObject("data").getJSONArray("openid");
                            // Store the follower information in the database
                            futures.add(executor.submit(new SyncFansInfo(jwid, openIdArr)));
                        }
                        // Continue with the next page using the returned cursor
                        next_openid = jsonObj.getString("next_openid");
                        requestUrl = user_List_url.replace("ACCESS_TOKEN", accessToken).replace("NEXT_OPENID", next_openid);
                    }
                }
            } finally {
                // Always release the pool; otherwise its core threads outlive a failed sync
                executor.shutdown();
            }
            // Wait for the submitted jobs; returns as soon as the pool terminates
            while (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) {
                // still running
            }
        }
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "同步任务已启动,请稍候刷新。公众号粉丝总数:"+total;
}
/**
 * Rejection handler that reports resource exhaustion to the submitter.
 *
 * <p>A rejection from a pool that is shut down or terminated is ignored; a rejection from a
 * running pool is raised as an {@link IllegalStateException}.
 *
 * @param runnable the rejected task (not inspected)
 * @param executor the pool that rejected it
 * @throws IllegalStateException if the pool is neither shut down nor terminated
 */
@Override
public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) {
    final boolean poolStopped = executor.isShutdown() || executor.isTerminated();
    if (poolStopped) {
        return;
    }
    throw new IllegalStateException("Message cannot be sent due to current system resource limitations.");
}