下面列出了 java.util.concurrent.ThreadPoolExecutor#shutdownNow() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Stops the shared executor, if one is currently installed.
 * <p>
 * Running and queued jobs get {@link #WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT} seconds to finish;
 * after that the pool is cancelled and given the same amount of time again to react.
 * Does nothing when there is no instance or the instance has no executor.
 */
public static void shutdown() {
    if (instance == null || instance.executor == null) {
        return;
    }

    // Detach the pool from the instance before stopping it.
    final ThreadPoolExecutor detached = instance.executor;
    instance.executor = null;

    // Phase 1: refuse new work, let existing work drain.
    detached.shutdown();
    try {
        final boolean drained =
                detached.awaitTermination(WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        if (drained) {
            return;
        }

        // Phase 2: interrupt whatever is still running and wait once more.
        detached.shutdownNow();
        final boolean cancelled =
                detached.awaitTermination(WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        if (!cancelled) {
            logger.error("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // The waiting thread itself was interrupted: cancel and keep the interrupt flag set.
        detached.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
/**
 * Force-stops the given pool and blocks until it has terminated.
 * <p>
 * Tasks that were still queued are logged. The wait is not abandoned when the calling thread
 * is interrupted; instead the interrupt is remembered and re-asserted on the way out.
 *
 * @param alias    name of the pool, used in log messages only
 * @param executor the pool to destroy
 */
private static void destroyPool(String alias, ThreadPoolExecutor executor) {
    final List<Runnable> neverStarted = executor.shutdownNow();
    if (!neverStarted.isEmpty()) {
        LOGGER.warn("Tasks remaining in pool '{}' at shutdown: {}", alias, neverStarted);
    }

    boolean sawInterrupt = false;
    boolean terminated = false;
    try {
        while (!terminated) {
            try {
                terminated = executor.awaitTermination(30, SECONDS);
                if (!terminated) {
                    LOGGER.warn("Still waiting for termination of pool '{}'", alias);
                }
            } catch (InterruptedException e) {
                // Keep waiting; the flag is restored in the finally block.
                sawInterrupt = true;
            }
        }
    } finally {
        if (sawInterrupt) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Shuts down a thread pool, giving already submitted tasks time to complete.
 * <p>
 * The pool stops accepting new tasks immediately. The method then waits in 2-second slices;
 * if the pool has still not terminated after 100 slices it is force-stopped with
 * {@link ThreadPoolExecutor#shutdownNow()}. If the calling thread is interrupted while
 * waiting, the pool is force-stopped right away and the interrupt status is restored.
 *
 * @param executor the thread pool to shut down; {@code null} is ignored
 * @author fanxb
 * @date 2018/10/12 13:45
 */
public static void shutdown(ThreadPoolExecutor executor) {
    if (executor == null) {
        return;
    }
    executor.shutdown();
    try {
        final int timeOut = 2;
        final int maxRounds = 100;
        int count = 0;
        // awaitTermination returns true once the pool HAS terminated, so keep looping
        // only while it reports false.
        while (!executor.awaitTermination(timeOut, TimeUnit.SECONDS)) {
            count++;
            if (count == maxRounds) {
                executor.shutdownNow();
                break;
            }
        }
    } catch (InterruptedException e) {
        // Stop waiting, cancel outstanding work and let the caller observe the interrupt.
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
/**
 * Polls the executor every 500 ms until it has terminated or this job is cancelled,
 * publishing progress derived from the completed task count on each round.
 *
 * @param executor the pool whose termination is awaited
 * @return {@code true} if the pool terminated on its own, {@code false} if the job was
 *         cancelled and the pool was force-stopped
 * @throws InterruptedException if the calling thread is interrupted while sleeping or waiting
 */
private boolean waitTermination(ThreadPoolExecutor executor) throws InterruptedException {
    while (!executor.isTerminated()) {
        if (isCancelled()) {
            executor.shutdownNow();
            progressPane.changeLabel(this, task.getTitle() + " (Canceling)… ");
            progressPane.changeIndeterminate(this, true);
            // Give interrupted workers up to five seconds; the outcome is not inspected.
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return false;
        }
        final long completed = executor.getCompletedTaskCount();
        setProgress(calcProgress(completed));
        Thread.sleep(500);
    }
    return true;
}
/**
 * Creates a {@link ThreadPoolExecutor} configured for the given target and wraps it in a
 * {@link CloseableExecutor}.
 * <p>
 * Pool sizes, work queue, thread factory and rejection policy come from the corresponding
 * helper methods; the keep-alive time is fixed at 120 seconds.
 *
 * @param target the target the pool is configured for
 * @param name   name passed to the thread factory and the rejection policy
 * @return a facade whose {@code execute} delegates to the pool and whose {@code shutdown}
 *         stops the pool immediately via {@code shutdownNow()}
 */
@Override
public CloseableExecutor newExecutor(Target target, String name) {
    final ThreadPoolExecutor backingPool = new ThreadPoolExecutor(
            coreWorkers(target),
            maxWorkers(target),
            120L,
            TimeUnit.SECONDS,
            workQueue(target),
            threadFactory(name),
            createRejectedPolicy(target, name, new RejectedTaskPolicyWithReport(name, "jupiter")));

    return new CloseableExecutor() {

        @Override
        public void execute(Runnable task) {
            backingPool.execute(task);
        }

        @Override
        public void shutdown() {
            logger.warn("ThreadPoolExecutorFactory#{} shutdown.", backingPool);
            // Immediate stop: queued tasks are dropped, running ones are interrupted.
            backingPool.shutdownNow();
        }
    };
}
/**
 * Closes the handler.
 * <p>
 * The executor is detached from this handler and told to stop accepting new tasks. Calling
 * this method when no executor is present (for example a second time) is a no-op.
 *
 * @param join if the method should wait for the thread to finish before returning. When
 *             {@code true}, waits up to {@code WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT} seconds for
 *             running jobs, then cancels them and waits the same amount of time again.
 */
public void shutdown(boolean join) {
    ThreadPoolExecutor pool = executor;
    if (pool == null) {
        // Nothing to stop; avoids a NullPointerException on repeated calls.
        return;
    }
    executor = null;
    pool.shutdown(); // Disable new tasks from being submitted
    if (join) {
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(WAIT_FOR_JOBS_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                    logger.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Shuts down the connector's thread pool, waiting for in-flight work before forcing it.
 * <p>
 * Only acts when the connector's protocol handler exposes a {@link ThreadPoolExecutor} that
 * has not already terminated. The pool is given the configured wait time (seconds) to drain;
 * if it does not, it is force-stopped. The elapsed time is logged afterwards.
 *
 * @param connector the connector whose executor should be stopped; {@code null} is only
 *                  logged and otherwise ignored
 */
@Override
public void customize(Connector connector) {
    if (connector == null) {
        log.info("We are running unit test");
        return;
    }
    final Executor executor = connector.getProtocolHandler().getExecutor();
    if (!(executor instanceof ThreadPoolExecutor)) {
        return;
    }
    log.info("executor is ThreadPoolExecutor");
    final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
    if (threadPoolExecutor.isTerminated()) {
        log.info("thread pool executor has terminated");
        return;
    }

    final LocalDateTime startShutdown = LocalDateTime.now();
    try {
        threadPoolExecutor.shutdown();
        if (!threadPoolExecutor
                .awaitTermination(tomcatGracefulShutdownProperties.getWaitTime(), TimeUnit.SECONDS)) {
            log.warn("Tomcat thread pool did not shut down gracefully within "
                    + tomcatGracefulShutdownProperties.getWaitTime()
                    + " second(s). Proceeding with force shutdown");
            threadPoolExecutor.shutdownNow();
        } else {
            log.info("Tomcat thread pool is empty,we stop now");
        }
    } catch (InterruptedException e) {
        log.error("The await termination has been interrupted : " + e.getMessage());
        // Only a genuine interrupt should re-assert the interrupt flag.
        Thread.currentThread().interrupt();
    } catch (RuntimeException e) {
        // Still swallowed rather than propagated, but without marking the thread interrupted.
        log.error("Tomcat thread pool shutdown failed : " + e.getMessage());
    } finally {
        // Take the end timestamp here, after the shutdown attempt, so the duration is real.
        final LocalDateTime stopShutdown = LocalDateTime.now();
        final long seconds = Duration.between(startShutdown, stopShutdown).getSeconds();
        log.info("Shutdown performed in " + seconds + " second(s)");
    }
}
/**
 * Test teardown: stops the governor's schedule, then force-stops and forgets every pool
 * held in its private static {@code registry} map (accessed reflectively).
 *
 * @throws Exception if the reflective field access fails
 */
@After
@SuppressWarnings("unchecked")
public void clearUp() throws Exception {
    ThreadPoolGovernor.stopSchedule();

    // The field is read with a null receiver, i.e. as a static field.
    Field registryField = ThreadPoolGovernor.class.getDeclaredField("registry");
    registryField.setAccessible(true);
    Map<String, ThreadPoolExecutor> registeredPools =
            (Map<String, ThreadPoolExecutor>) registryField.get(null);

    for (Map.Entry<String, ThreadPoolExecutor> registered : registeredPools.entrySet()) {
        registered.getValue().shutdownNow();
    }
    registeredPools.clear();
}
/**
 * Retires a thread pool that is no longer in use.
 * <p>
 * The pool first stops accepting new tasks, then gets up to five minutes to finish what it
 * already has. If it is still running after that, or if the calling thread is interrupted
 * while waiting, the remaining tasks are cancelled with {@code shutdownNow()}.
 *
 * @param old the pool to shut down
 */
private void shutdownPoolExecutor(ThreadPoolExecutor old) {
    // shutdown() must come first: awaitTermination on a pool that was never shut down
    // simply blocks for the whole timeout.
    old.shutdown();
    try {
        if (!old.awaitTermination(5, TimeUnit.MINUTES)) {
            old.shutdownNow();
        }
    } catch (InterruptedException e) {
        old.shutdownNow();
        LOGGER.error("Shutdown Zuul Thread Pool:", e);
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
    }
}
/**
 * Starts several process-runner threads concurrently and verifies that, once the pool has
 * stopped, no process instance is left active and (at history level ACTIVITY or above) all
 * processes and tasks are recorded as finished. Skipped for the "h2" and "db2" database types.
 */
@Deployment
public void testConcurrentUsage() throws Exception {
    if (processEngineConfiguration.getDatabaseType().equals("h2")
            || processEngineConfiguration.getDatabaseType().equals("db2")) {
        return;
    }

    final int threadCount = 5;
    final int processesPerThread = 5;
    final int expectedTaskCount = 2 * threadCount * processesPerThread;

    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            10, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(threadCount));
    for (int worker = 0; worker < threadCount; worker++) {
        pool.execute(new ConcurrentProcessRunnerRunnable(processesPerThread, "kermit" + worker));
    }

    // Let submitted work finish, but give up after 20 seconds.
    pool.shutdown();
    boolean finishedInTime = pool.awaitTermination(20000, TimeUnit.MILLISECONDS);
    if (!finishedInTime) {
        log.error("Executor was not shut down after timeout, not al tasks have been executed");
        pool.shutdownNow();
    }
    assertEquals(0, pool.getActiveCount());

    // No process instance may still be running.
    assertEquals(0, runtimeService.createProcessInstanceQuery().count());

    if (processEngineConfiguration.getHistoryLevel().isAtLeast(HistoryLevel.ACTIVITY)) {
        // Every process and every task must show up as finished in history.
        assertEquals(processesPerThread * threadCount,
                historyService.createHistoricProcessInstanceQuery().finished().count());
        assertEquals(expectedTaskCount,
                historyService.createHistoricTaskInstanceQuery().finished().count());
    }
}
/**
 * Blocks until the given pool has terminated.
 * <p>
 * Waits in 60-second slices; each time a slice elapses without termination, or the wait is
 * interrupted, the pool is told to cancel its tasks via {@code shutdownNow()} and the wait
 * resumes. An interrupt does not abort the wait, but the calling thread's interrupt status
 * is restored before returning.
 *
 * @param t    the pool to wait for
 * @param name label for the pool, used in log messages only
 */
private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    boolean interrupted = false;
    try {
        while (!done) {
            try {
                done = t.awaitTermination(60, TimeUnit.SECONDS);
                if (!done) {
                    // Only report "waiting" when there really is something left to wait for.
                    LOG.info("Waiting for " + name + " to finish...");
                    t.shutdownNow();
                }
            } catch (InterruptedException ie) {
                interrupted = true;
                LOG.warn("Interrupted waiting for " + name + " to finish...");
                t.shutdownNow();
            }
        }
    } finally {
        if (interrupted) {
            // The catch above cleared the flag; hand it back to the caller.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * shutdownNow hands back the tasks that never started running and leaves the work queue
 * empty; tasks that were already running are interrupted and still complete.
 */
public void testShutdownNow() throws InterruptedException {
    final int poolSize = 2;
    final int count = 5;
    final AtomicInteger finishedRuns = new AtomicInteger(0);
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(poolSize, poolSize,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    final CountDownLatch threadsStarted = new CountDownLatch(poolSize);

    // Each task signals that it has started, sleeps until interrupted, then records its run.
    Runnable sleeper = new CheckedRunnable() {
        public void realRun() {
            threadsStarted.countDown();
            try {
                MILLISECONDS.sleep(2 * LONG_DELAY_MS);
            } catch (InterruptedException success) {}
            finishedRuns.getAndIncrement();
        }
    };
    for (int submitted = 0; submitted < count; submitted++) {
        pool.execute(sleeper);
    }

    await(threadsStarted);
    assertEquals(poolSize, pool.getActiveCount());
    assertEquals(0, pool.getCompletedTaskCount());

    final List<Runnable> drained;
    try {
        drained = pool.shutdownNow();
    } catch (SecurityException ok) {
        return; // Allowed in case test doesn't have privs
    }

    assertTrue(pool.isShutdown());
    assertTrue(pool.getQueue().isEmpty());
    assertEquals(count - poolSize, drained.size());
    assertTrue(pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
    assertTrue(pool.isTerminated());
    // Only the tasks that were already running ever executed.
    assertEquals(poolSize, finishedRuns.get());
    assertEquals(poolSize, pool.getCompletedTaskCount());
}
/**
 * Stops the pool immediately and notifies every queued {@link DiskStoreTask} that it was
 * cancelled before it could run.
 *
 * @param pool the pool to stop
 */
private void shutdownPool(ThreadPoolExecutor pool) {
    // All the regions have already been closed
    // so this pool shouldn't be doing anything.
    List<Runnable> pending = pool.shutdownNow();
    for (Runnable runnable : pending) {
        // Test the element, not the list: the list itself is never a DiskStoreTask.
        if (runnable instanceof DiskStoreTask) {
            ((DiskStoreTask) runnable).taskCancelled();
        }
    }
}
/**
 * Runs several process-runner threads in parallel and checks that afterwards nothing is
 * left active and, at history level ACTIVITY or above, that all processes and tasks are
 * reported as finished. Not executed for the "h2" and "db2" database types.
 */
@Deployment
public void testConcurrentUsage() throws Exception {
    if (processEngineConfiguration.getDatabaseType().equals("h2")
            || processEngineConfiguration.getDatabaseType().equals("db2")) {
        return;
    }

    final int runnerThreads = 5;
    final int processesPerRunner = 5;
    final int expectedFinishedTasks = 2 * runnerThreads * processesPerRunner;

    ThreadPoolExecutor runnerPool = new ThreadPoolExecutor(
            10, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(runnerThreads));
    for (int idx = 0; idx < runnerThreads; idx++) {
        runnerPool.execute(new ConcurrentProcessRunnerRunnable(processesPerRunner, "kermit" + idx));
    }

    // Wait for the runners to finish, bounded by a 20 second timeout.
    runnerPool.shutdown();
    boolean endedInTime = runnerPool.awaitTermination(20000, TimeUnit.MILLISECONDS);
    if (!endedInTime) {
        LOGGER.error("Executor was not shut down after timeout, not al tasks have been executed");
        runnerPool.shutdownNow();
    }
    assertEquals(0, runnerPool.getActiveCount());

    // There must be no running process instances left.
    assertEquals(0, runtimeService.createProcessInstanceQuery().count());

    if (processEngineConfiguration.getHistoryLevel().isAtLeast(HistoryLevel.ACTIVITY)) {
        // History must list all processes and tasks as finished.
        assertEquals(processesPerRunner * runnerThreads,
                historyService.createHistoricProcessInstanceQuery().finished().count());
        assertEquals(expectedFinishedTasks,
                historyService.createHistoricTaskInstanceQuery().finished().count());
    }
}
/**
 * Submits one worker, waits ten seconds, then submits four more and expects all five to
 * have counted down the done-latch within 60 seconds.
 * <p>
 * The executor is stopped in a {@code finally} block so its threads do not outlive the test
 * when a step throws.
 */
@Test
public void testThrottleContinuations() throws Exception {
    QName serviceName = new QName("http://cxf.apache.org/systest/jaxws", "HelloContinuationService");
    URL wsdlURL = getClass().getClassLoader().getResource(WSDL_PATH);
    HelloContinuationService service = new HelloContinuationService(wsdlURL, serviceName);
    final HelloContinuation helloPort = markForClose(service.getPort(HelloContinuation.class, cff));
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10));
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch helloDoneSignal = new CountDownLatch(5);
    try {
        executor.execute(new HelloWorker(helloPort, "Fred", "", startSignal, helloDoneSignal));
        // Open the start gate; it stays open for the workers submitted later.
        startSignal.countDown();
        Thread.sleep(10000);
        executor.execute(new HelloWorker(helloPort, "Barry", "Jameson", startSignal, helloDoneSignal));
        executor.execute(new HelloWorker(helloPort, "Harry", "", startSignal, helloDoneSignal));
        executor.execute(new HelloWorker(helloPort, "Rob", "Davidson", startSignal, helloDoneSignal));
        executor.execute(new HelloWorker(helloPort, "James", "ServiceMix", startSignal, helloDoneSignal));
        // The timeout result is deliberately ignored; the latch count is asserted below.
        helloDoneSignal.await(60, TimeUnit.SECONDS);
    } finally {
        executor.shutdownNow();
    }
    Assert.assertEquals("Some invocations are still running", 0, helloDoneSignal.getCount());
}
/**
 * Verifies that a listener throwing an {@link Error} does not stop later notifications:
 * the first notification raises the error (observed through a latch in a wrapping
 * executor), the second one must still be delivered, and the executor must have no queued
 * tasks left when it is shut down.
 */
@Test(timeout = 10000)
public void testNotificationsWithListenerJVMError() {
    final CountDownLatch errorCaughtLatch = new CountDownLatch(1);

    // Single-threaded executor that wraps each command so an Error trips the latch.
    queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
        @Override
        @SuppressWarnings("checkstyle:illegalCatch")
        public void execute(final Runnable command) {
            super.execute(() -> {
                try {
                    command.run();
                } catch (Error e) {
                    errorCaughtLatch.countDown();
                }
            });
        }
    };

    NotificationManager<TestListener<Integer>, Integer> notificationManager =
            QueuedNotificationManager.create(queueExecutor, new TestNotifier<>(), 10, "TestMgr");

    TestListener<Integer> failingListener = new TestListener<>(2, 1);
    failingListener.jvmError = mock(Error.class);

    notificationManager.submitNotification(failingListener, 1);
    assertTrue("JVM Error caught",
            Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS));

    notificationManager.submitNotification(failingListener, 2);
    failingListener.verifyNotifications();

    List<Runnable> leftover = queueExecutor.shutdownNow();
    assertTrue(leftover.isEmpty());
}
/**
 * shutdownNow returns the list of tasks that were queued but never run, and removes those
 * tasks from the queue; the tasks already executing are interrupted and finish.
 */
public void testShutdownNow() throws InterruptedException {
    final int poolSize = 2;
    final int count = 5;
    final AtomicInteger runCounter = new AtomicInteger(0);
    final ThreadPoolExecutor executor =
        new ThreadPoolExecutor(poolSize, poolSize,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    final CountDownLatch threadsStarted = new CountDownLatch(poolSize);

    // Announces its start, blocks in sleep until interrupted, then counts itself as run.
    Runnable blocker = new CheckedRunnable() {
        public void realRun() {
            threadsStarted.countDown();
            try {
                MILLISECONDS.sleep(2 * LONG_DELAY_MS);
            } catch (InterruptedException success) {}
            runCounter.getAndIncrement();
        }
    };
    for (int n = 0; n < count; n++) {
        executor.execute(blocker);
    }

    await(threadsStarted);
    assertEquals(poolSize, executor.getActiveCount());
    assertEquals(0, executor.getCompletedTaskCount());

    final List<Runnable> unstarted;
    try {
        unstarted = executor.shutdownNow();
    } catch (SecurityException ok) {
        return; // Allowed in case test doesn't have privs
    }

    assertTrue(executor.isShutdown());
    assertTrue(executor.getQueue().isEmpty());
    assertEquals(count - poolSize, unstarted.size());
    assertTrue(executor.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
    assertTrue(executor.isTerminated());
    // Exactly the tasks that had a worker thread got to run.
    assertEquals(poolSize, runCounter.get());
    assertEquals(poolSize, executor.getCompletedTaskCount());
}
/**
 * Checks the execution order of a two-thread priority-based pool: the first two LOW tasks
 * start immediately, and as running tasks are cancelled the queued ones start in the order
 * HIGH, NORMAL, then the remaining LOW tasks.
 * <p>
 * The pool is stopped in a {@code finally} block so its worker threads are released even
 * when an assertion fails or the JUnit timeout interrupts one of the polling sleeps.
 */
@Test (timeout = 120000)
public void testPriorityBasedExecutor() throws Exception {
    List<Priority> priorityList = Collections.synchronizedList(new ArrayList<Priority>());
    ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("priority-exec-thread-%d").build();
    ThreadPoolExecutor threadPoolExecutor = new PriorityBasedThreadPoolExecutor(2, 2, 0L,
            TimeUnit.MILLISECONDS, factory);
    try {
        Future<?> future1 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.LOW), Priority.LOW));
        Future<?> future2 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.LOW), Priority.LOW));
        Future<?> future3 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.LOW), Priority.LOW));
        Future<?> future4 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.LOW), Priority.LOW));
        Future<?> future5 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.NORMAL), Priority.NORMAL));
        Future<?> future6 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.NORMAL), Priority.NORMAL));
        Future<?> future7 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.HIGH), Priority.HIGH));
        Future<?> future8 = threadPoolExecutor.submit(
                RunnableWithPriority.get(new DummyRunnable(priorityList, Priority.HIGH), Priority.HIGH));

        // The first two LOW tasks occupy both threads before anything else is queued.
        List<Priority> expectedPriorityList = Lists.newArrayList(Priority.LOW, Priority.LOW);
        while (priorityList.size() < 2) {
            Thread.sleep(100);
        }
        Assert.assertEquals(expectedPriorityList, priorityList);

        // Cancelling the running pair frees the threads for the HIGH tasks.
        future1.cancel(true);
        future2.cancel(true);
        expectedPriorityList.addAll(Lists.newArrayList(Priority.HIGH, Priority.HIGH));
        while (priorityList.size() < 4) {
            Thread.sleep(100);
        }
        Assert.assertEquals(expectedPriorityList, priorityList);

        // Next in line are the NORMAL tasks.
        future7.cancel(true);
        future8.cancel(true);
        expectedPriorityList.addAll(Lists.newArrayList(Priority.NORMAL, Priority.NORMAL));
        while (priorityList.size() < 6) {
            Thread.sleep(100);
        }
        Assert.assertEquals(expectedPriorityList, priorityList);

        // Finally the two LOW tasks that had been waiting in the queue.
        future5.cancel(true);
        future6.cancel(true);
        expectedPriorityList.addAll(Lists.newArrayList(Priority.LOW, Priority.LOW));
        while (priorityList.size() < 8) {
            Thread.sleep(100);
        }
        Assert.assertEquals(expectedPriorityList, priorityList);
        future3.cancel(true);
        future4.cancel(true);
    } finally {
        threadPoolExecutor.shutdownNow();
    }
}
/**
 * Connects five archive clients concurrently from a pre-started fixed thread pool and
 * expects every one of them to have connected within twice the archive's connect timeout.
 * Connected clients are closed, and the pool and both directories are cleaned up in all cases.
 */
@Test
public void shouldAllowMultipleConnectionsInParallel() throws InterruptedException
{
    final int clientCount = 5;
    final long connectTimeoutNs = TimeUnit.SECONDS.toNanos(10);
    final CountDownLatch connected = new CountDownLatch(clientCount);
    final ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(clientCount);
    final ManyToOneConcurrentLinkedQueue<AeronArchive> connectedClients =
        new ManyToOneConcurrentLinkedQueue<>();

    final MediaDriver.Context driverCtx = new MediaDriver.Context()
        .errorHandler(Tests::onError)
        .clientLivenessTimeoutNs(connectTimeoutNs)
        .dirDeleteOnStart(true)
        .publicationUnblockTimeoutNs(connectTimeoutNs * 2)
        .threadingMode(ThreadingMode.SHARED);
    final Context archiveCtx = new Context()
        .threadingMode(SHARED)
        .connectTimeoutNs(connectTimeoutNs);

    // Start all worker threads up front.
    executor.prestartAllCoreThreads();

    try (ArchivingMediaDriver driver = ArchivingMediaDriver.launch(driverCtx, archiveCtx))
    {
        for (int submitted = 0; submitted < clientCount; submitted++)
        {
            executor.execute(
                () ->
                {
                    final AeronArchive.Context ctx =
                        new AeronArchive.Context().messageTimeoutNs(connectTimeoutNs);
                    final AeronArchive archive = AeronArchive.connect(ctx);
                    connectedClients.add(archive);
                    connected.countDown();
                });
        }

        final long awaitNs = driver.archive().context().connectTimeoutNs() * 2;
        connected.await(awaitNs, TimeUnit.NANOSECONDS);

        // Close whatever did connect before asserting, so nothing is left open on failure.
        for (AeronArchive client = connectedClients.poll(); null != client; client = connectedClients.poll())
        {
            client.close();
        }

        assertEquals(0L, connected.getCount());
    }
    finally
    {
        executor.shutdownNow();
        archiveCtx.deleteDirectory();
        driverCtx.deleteDirectory();
    }
}
/**
 * Starts the server using sizing information taken from the supplied executor.
 * <p>
 * Only {@link ThreadPoolExecutor} is supported. Its core pool size is forwarded to the
 * five-argument {@code start} overload together with twice the number of available
 * processors; after that call returns the supplied executor is stopped with
 * {@code shutdownNow()}.
 *
 * @param executor  must be a {@link ThreadPoolExecutor}
 * @param port      forwarded to the delegated {@code start} call
 * @param backlog   forwarded to the delegated {@code start} call
 * @param forceExit forwarded to the delegated {@code start} call
 * @throws RuntimeException if {@code executor} is {@code null} or not a {@link ThreadPoolExecutor}
 */
@Override
public void start(Executor executor, int port, int backlog, boolean forceExit) {
    // instanceof is false for null, so a null executor gets the descriptive exception
    // instead of a bare NullPointerException from getClass().
    if (!(executor instanceof ThreadPoolExecutor)) {
        throw new RuntimeException("No Supportive Executor Exception: only support ThreadPoolExecutor");
    }
    ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
    start(port, backlog, Runtime.getRuntime().availableProcessors() * 2, tpe.getCorePoolSize(), forceExit);
    tpe.shutdownNow();
}