下面列出了java.util.concurrent.ThreadPoolExecutor#awaitTermination ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void test_threadPool() {
int total = 4;
ThreadPoolExecutor executor = new ThreadPoolExecutor(total, total, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < total; i++) {
executor.submit(new Task(i));
}
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (Exception e) {
//
}
stopped = true;
executor.shutdown();
}
@Test
public void priortiy() throws Exception {
ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
MCRProcessableFactory.newPriorityBlockingQueue());
TaskConsumer callback = new TaskConsumer();
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(1), 1), es).thenAccept(callback);
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(2), 2), es).thenAccept(callback);
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(3), 3), es).thenAccept(callback);
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(4), 4), es).thenAccept(callback);
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(5), 5), es).thenAccept(callback);
CompletableFuture.supplyAsync(new MCRPrioritySupplier<>(new Task(10), 10), es).thenAccept(callback);
es.awaitTermination(1, TimeUnit.SECONDS);
assertEquals("all threads should be executed after termination", 6, TaskConsumer.COUNTER);
assertArrayEquals("threads should be executed in order: " + Arrays.toString(EXCPECTED), EXCPECTED,
TaskConsumer.ORDER);
}
/**
* Description: 关闭一个连接池,等待已有任务完成
*
* @param executor 被关闭线程池对象
* @return void
* @author fanxb
* @date 2018/10/12 13:45
*/
public static void shutdown(ThreadPoolExecutor executor) {
if (executor == null) {
return;
}
executor.shutdown();
try {
int count = 0;
int timeOut = 2;
while (executor.awaitTermination(timeOut, TimeUnit.SECONDS)) {
count++;
if (count == 100) {
executor.shutdownNow();
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
ApplicationState.getInstance().runningProperty().set(true);
executor = new ThreadPoolExecutor(concurrency, concurrency, 1, TimeUnit.DAYS, tasks);
executor.allowCoreThreadTimeOut(true);
result.start();
buildWorkerPool.run();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
result.stop();
} catch (InterruptedException ex) {
Logger.getLogger(BatchRunner.class.getName()).log(Level.SEVERE, null, ex);
if (!executor.isShutdown()) {
executor.getQueue().clear();
}
}
result.stop();
}
@Test
public void testSysOperationsIsClearedIfNothingToFetch() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
try {
JobsLogs jobsLogs = new JobsLogs(() -> true);
NodeFetchOperation fetchOperation = new NodeFetchOperation(
threadPoolExecutor,
2,
jobsLogs,
new TasksService(clusterService, jobsLogs),
new NoopCircuitBreaker("dummy"));
fetchOperation.fetch(UUID.randomUUID(), 1, null, true).get(5, TimeUnit.SECONDS);
assertThat(Iterables.size(jobsLogs.activeOperations()), is(0));
} finally {
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(2, TimeUnit.SECONDS);
}
}
/**
* Scrap metadata for the current section. The metadata will be downloaded in
* form of a JSON file and saved in a database located at the base of the
* working direction.
*
* <b>Implnote:</b> abortIfAnEntryAlreadyExists was inserted because users will
* most likely forget to uncomment this method when running the example a second
* time.
*
* @param sectionEndpoint The name of the category or section to
* scrap
* @param abortIfAnEntryAlreadyExists skip this method call if the database
* already contains at least 1 entry to this
* section.
* <p>
* Pass false if the section shall still be
* parsed. Duplicate entries by id are still
* ignored.
* @throws SQLException if an SQL error occurs during database access
*/
public void scrapMetadata(String sectionEndpoint, boolean abortIfAnEntryAlreadyExists) throws SQLException {
LOGGER.info("Begin scrapping metadata");
if (abortIfAnEntryAlreadyExists && dbManager.containsPostItemFromSection(sectionEndpoint)) {
LOGGER.info(
"It appears the section was already scrapped. Ignore request. If you want to reparse the section either clear the databse"
+ " or call the scrapMetadata with abortIfAnEntryAlreadyExists = false");
return;
}
boundCachedThreadPoolExecutor = new ThreadPoolExecutor(0, scrappingParallelisationLevel, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
scrap9GagContent(sectionEndpoint.toLowerCase(), "", 0);
try {
boundCachedThreadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void shutdownTomcatThreadpool() {
connector.pause();
StandardLogger logger = LoggerUtil.getStandardLogger();
logger.info("Waiting for Tomcat threads to finish processing...");
int timeout = PropertyManager.getIntegerProperty(PropertyNames.MDW_THREADPOOL_TERMINATION_TIMEOUT, 30);
Executor executor = this.connector.getProtocolHandler().getExecutor();
if (executor instanceof ThreadPoolExecutor) {
try {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
threadPoolExecutor.shutdown();
if (!threadPoolExecutor.awaitTermination(timeout, TimeUnit.SECONDS)) {
LoggerUtil.getStandardLogger().error("Thread pool fails to terminate after " + timeout + " seconds");
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
/**
* Wait for the termination of the thread pools.
*
* @param milliseconds The number of milliseconds to wait
* @return true if all thread pools are terminated without time limit
* @throws InterruptedException
*/
public synchronized boolean awaitTermination(long milliseconds)
throws InterruptedException {
long end = Time.now() + milliseconds;
for (Map.Entry<String, ThreadPoolExecutor> e:
executors.entrySet()) {
ThreadPoolExecutor executor = e.getValue();
if (!executor.awaitTermination(
Math.max(end - Time.now(), 0),
TimeUnit.MILLISECONDS)) {
LOG.warn("AsyncDiskService awaitTermination timeout.");
return false;
}
}
LOG.info("All AsyncDiskService threads are terminated.");
return true;
}
/**
* @return {@code true} if the thread pool backing the (optional) executor service is shutdown
*/
private void shutdown(@Nullable final NexusExecutorService executorService) {
if (executorService != null) {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService.getTargetExecutorService();
threadPool.shutdown();
try {
threadPool.awaitTermination(5L, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
log.debug("Interrupted while waiting for termination", e);
}
}
}
private void start() {
threadPoolExecutor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.DAYS, new ArrayBlockingQueue<Runnable>(100000));
client = LocalDevSetup.setupJson("http://localhost:8080");
loadDirectoryRecursive(path, null);
threadPoolExecutor.shutdown();
try {
threadPoolExecutor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void customize(Connector connector) {
if (connector == null) {
log.info("We are running unit test");
return;
}
final Executor executor = connector.getProtocolHandler().getExecutor();
if (executor instanceof ThreadPoolExecutor) {
log.info("executor is ThreadPoolExecutor");
final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
if (threadPoolExecutor.isTerminated()) {
log.info("thread pool executor has terminated");
} else {
LocalDateTime startShutdown = LocalDateTime.now();
LocalDateTime stopShutdown = LocalDateTime.now();
try {
threadPoolExecutor.shutdown();
if (!threadPoolExecutor
.awaitTermination(tomcatGracefulShutdownProperties.getWaitTime(), TimeUnit.SECONDS)) {
log.warn("Tomcat thread pool did not shut down gracefully within"
+ tomcatGracefulShutdownProperties.getWaitTime() + " second(s). Proceeding with force shutdown");
threadPoolExecutor.shutdownNow();
} else {
log.info("Tomcat thread pool is empty,we stop now");
}
} catch (Exception e) {
log.error("The await termination has been interrupted : " + e.getMessage());
Thread.currentThread().interrupt();
;
} finally {
final long seconds = Duration.between(startShutdown, stopShutdown).getSeconds();
log.info("Shutdown performed in " + seconds + " second(s)");
}
}
}
}
/**
* Tests {@link com.sonymobile.tools.gerrit.gerritevents.GerritHandler#addListener(GerritEventListener)}.
* With 10000 listeners added by 10 threads at the same time.
*
* @throws Exception if so.
*/
@Test
public void testAddListenerManyAtTheSameTime() throws Exception {
final int nrOfListeners = 100000;
BlockingQueue<Runnable> listeners = new LinkedBlockingQueue<Runnable>(nrOfListeners);
System.out.print("Creating Listeners");
for (int i = 0; i < nrOfListeners; i++) {
listeners.add(new Runnable() {
GerritEventListener listener = new ListenerMock();
@Override
public void run() {
handler.addListener(listener);
}
});
if (i % 1000 == 0) {
System.out.print(".");
}
}
System.out.println(".Done!");
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, listeners);
executor.prestartAllCoreThreads();
executor.shutdown();
do {
System.out.printf("Waiting for listeners to be added...Running#: %5d Left#: %5d Count#: %5d\n",
executor.getActiveCount(), listeners.size(), handler.getEventListenersCount());
} while (!executor.awaitTermination(1, TimeUnit.SECONDS));
System.out.printf(" Listeners are added...Running#: %5d Left#: %5d Count#: %5d\n",
executor.getActiveCount(), listeners.size(), handler.getEventListenersCount());
assertEquals(nrOfListeners, handler.getEventListenersCount());
}
private void shutdown(ThreadPoolExecutor executor) {
executor.shutdown();
try {
if ( ! executor.awaitTermination(30, TimeUnit.SECONDS))
throw new RuntimeException("Rendering thread pool did not shutdown in 30 seconds");
}
catch (InterruptedException e) {
// return
}
}
private void shutdownPoolExecutor(ThreadPoolExecutor old) {
try {
old.awaitTermination(5, TimeUnit.MINUTES);
old.shutdown();
} catch (InterruptedException e) {
old.shutdownNow();
LOGGER.error("Shutdown Zuul Thread Pool:", e);
}
}
public void init(boolean suppressOutput) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(5));
PrintStream oldErr = System.err;
PrintStream oldOut = System.out;
if (suppressOutput) {
PrintStream nop = new PrintStream(new ByteArrayOutputStream());
System.setErr(nop);
System.setOut(nop);
}
executor.submit(new PackageLoader(this, Ifc2x3tc1Package.eINSTANCE, Schema.IFC2X3TC1));
executor.submit(new PackageLoader(this, Ifc4Package.eINSTANCE, Schema.IFC4));
executor.submit(new PackageLoader(this, GeometryPackage.eINSTANCE, Schema.GEOMETRY));
executor.submit(new PackageLoader(this, StorePackage.eINSTANCE, Schema.STORE));
executor.submit(new PackageLoader(this, LogPackage.eINSTANCE, Schema.LOG));
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
LOGGER.error("", e);
}
if (suppressOutput) {
System.setErr(oldErr);
System.setOut(oldOut);
}
initDependencies();
}
@Deployment
public void testConcurrentUsage() throws Exception {
if(!processEngineConfiguration.getDatabaseType().equals("h2") && !processEngineConfiguration.getDatabaseType().equals("db2")) {
int numberOfThreads = 5;
int numberOfProcessesPerThread = 5;
int totalNumberOfTasks = 2 * numberOfThreads * numberOfProcessesPerThread;
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(numberOfThreads));
for(int i=0; i< numberOfThreads; i++) {
executor.execute(new ConcurrentProcessRunnerRunnable(numberOfProcessesPerThread, "kermit" + i));
}
// Wait for termination or timeout and check if all tasks are complete
executor.shutdown();
boolean isEnded = executor.awaitTermination(20000, TimeUnit.MILLISECONDS);
if(!isEnded) {
log.error("Executor was not shut down after timeout, not al tasks have been executed");
executor.shutdownNow();
}
assertEquals(0, executor.getActiveCount());
// Check there are no processes active anymore
assertEquals(0, runtimeService.createProcessInstanceQuery().count());
if(processEngineConfiguration.getHistoryLevel().isAtLeast(HistoryLevel.ACTIVITY)) {
// Check if all processes and tasks are complete
assertEquals(numberOfProcessesPerThread * numberOfThreads, historyService.createHistoricProcessInstanceQuery()
.finished().count());
assertEquals(totalNumberOfTasks, historyService.createHistoricTaskInstanceQuery()
.finished().count());
}
}
}
public static void awaitThreadPoolTermination(final String executorName, final ThreadPoolExecutor executorService,
final Duration timeBetweenChecks) {
try {
while (!executorService.awaitTermination(timeBetweenChecks.getSeconds(), TimeUnit.SECONDS)) {
log.info(String.format("%s waiting for job completion. Finished jobs - %d : Running jobs - %d : Queued jobs - %d",
executorName, executorService.getCompletedTaskCount(), executorService.getActiveCount(),
executorService.getQueue().size()));
}
} catch (final InterruptedException e) {
log.error("Interrupted exception caught: ", e);
}
}
public void testResubmittingRunnable()
{
final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
Runnable r = new Runnable()
{
public void run()
{
// do nothing... used to delay the executor;
sleep(100);
}
};
final int[]ran = new int[1];
final boolean[]bool = new boolean[1];
Runnable r2 = new Runnable()
{
public void run()
{
ran[0]++;
if (ran[0] < 5)
{
if (executor.isShutdown())
{
bool[0] = true;
}
executor.getQueue().add(this);
}
}
};
for(int i=0;i<10;i++)
{
executor.submit(r);
}
executor.submit(r2);
executor.shutdown();
try
{
executor.awaitTermination(100, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
fail();
}
assertTrue(bool[0]);
assertEquals(5, ran[0]);
}
@Test
public void testFixedTimingOutThreadPool() throws Exception {
int numMaxThreads = 5;
long timeoutMillis = 2;
ThreadPoolExecutor es = JavaUtils.newFixedTimingOutThreadPool(
numMaxThreads,
timeoutMillis,
"test"
);
Thread.sleep(timeoutMillis + 100);
Assertions.assertTrue(es.getPoolSize() == 0);
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch cdl = new CountDownLatch(numMaxThreads);
for (int i = 0; i < numMaxThreads; i++) {
es.submit(
new Runnable() {
@Override
public void run() {
try {
cdl.countDown();
cdl.await();
block.await();
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
}
);
}
cdl.await();
// all threads are running:
Assertions.assertTrue(es.getPoolSize() == numMaxThreads);
block.countDown();
Thread.sleep(timeoutMillis + 100);
Assertions.assertTrue(es.getMaximumPoolSize() == numMaxThreads);
Assertions.assertTrue(es.getPoolSize() == 0);
es.shutdown();
es.awaitTermination(timeoutMillis + 1, TimeUnit.MILLISECONDS);
}
public void producerConsumer(final Producer<T> producer, final Consumer<T> consumer)
{
boolean isOuter = startParallelProcess(this);
//final Timer tim = new Timer("Parallel.producerConsumer");
if(getNumProcessors(this)>1) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
getNumProcessors(this),
getNumProcessors(this),
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "Parallel.producerConsumer thread");
}
});
//ThreadPoolExecutor pool = getExecutor(getNumProcessors());
producer.setConsumer(consumer);
producer.setPool(pool);
RunnableProgressReporter rpr = new RunnableProgressReporter();
rpr.setPool(pool);
//p.setTimer(timerToReport);
if(isOuter)
rpr.start();
// start the producer thread
ITask producerTask = new Task() {
@Override
public void execute() {
//Timer tp = new Timer("Producer", tim);
producer.execute();
//tp.stop();
}
};
run(producerTask);
// wait for the producer thread to finish
join(producerTask);
// try {
// //p.join();
//
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//if(isOuter)
//System.out.println("Producer finished.");
// wait for the consumer threads to finish
pool.shutdown();
try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpr.stop();
} else {
// run single-threaded
producer.setRunSingleThreaded(true);
producer.setConsumer(consumer);
producer.execute();
}
endParallelProcess(this);
}