下面列出了 java.util.concurrent.ThreadPoolExecutor#execute() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * getActiveCount is zero before anything is submitted and exactly one
 * (never more) while a single submitted task is running.
 */
public void testGetActiveCount() throws InterruptedException {
    final CountDownLatch release = new CountDownLatch(1);
    final ThreadPoolExecutor pool = new CustomExecutor(2);
    // NOTE(review): cleaner(pool, release) presumably releases the latch and
    // tears the pool down when the try block exits -- confirm in the test base class.
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        // Nothing has been submitted yet.
        assertEquals(0, pool.getActiveCount());

        final CountDownLatch taskRunning = new CountDownLatch(1);
        final Runnable blockingTask = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                taskRunning.countDown();
                // Observed from inside the running task itself.
                assertEquals(1, pool.getActiveCount());
                // Stay active until the test lets go of the latch.
                await(release);
            }};
        pool.execute(blockingTask);

        // Observed from the test thread once the task is known to be running.
        await(taskRunning);
        assertEquals(1, pool.getActiveCount());
    }
}
/**
 * With allowCoreThreadTimeOut(true), the pool size falls back to zero once
 * the keep-alive time has passed without work.
 */
public void testAllowCoreThreadTimeOut_true() throws Exception {
    final long keepAliveMillis = timeoutMillis();
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 10,
                               keepAliveMillis, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool)) {
        final CountDownLatch taskRunning = new CountDownLatch(1);
        pool.allowCoreThreadTimeOut(true);

        final Runnable probe = new CheckedRunnable() {
            public void realRun() {
                taskRunning.countDown();
                assertEquals(1, pool.getPoolSize());
            }};
        pool.execute(probe);
        await(taskRunning);

        // Give the now-idle worker at least one keep-alive period to expire.
        delay(keepAliveMillis);

        // Then poll until the pool is empty or LONG_DELAY_MS has elapsed.
        final long pollStart = System.nanoTime();
        for (;;) {
            if (pool.getPoolSize() <= 0) {
                break;
            }
            if (millisElapsedSince(pollStart) >= LONG_DELAY_MS) {
                break;
            }
            Thread.yield();
        }
        assertTrue(millisElapsedSince(pollStart) < LONG_DELAY_MS);
        assertEquals(0, pool.getPoolSize());
    }
}
/**
 * getActiveCount goes from zero to exactly one when a single task starts
 * running on a two-thread pool; it must not report more than one.
 */
public void testGetActiveCount() throws InterruptedException {
    final CountDownLatch release = new CountDownLatch(1);
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(2, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        // Fresh pool: no task submitted, so nothing is active.
        assertEquals(0, pool.getActiveCount());

        final CountDownLatch taskRunning = new CountDownLatch(1);
        final Runnable blockingTask = new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                taskRunning.countDown();
                assertEquals(1, pool.getActiveCount());
                // Keep this worker busy until the release latch opens.
                await(release);
            }};
        pool.execute(blockingTask);

        await(taskRunning);
        assertEquals(1, pool.getActiveCount());
    }
}
/**
 * allowCoreThreadTimeOut(false) causes idle core threads not to time out:
 * after waiting twice the keep-alive time the pool still holds at least
 * one thread.
 */
public void testAllowCoreThreadTimeOut_false() throws Exception {
    long keepAliveTime = timeoutMillis();
    final ThreadPoolExecutor p =
        new CustomTPE(2, 10,
                      keepAliveTime, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        p.allowCoreThreadTimeOut(false);
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertTrue(p.getPoolSize() >= 1);
            }});
        // Wait for the task to actually start before measuring the idle
        // period, so the delay below is anchored to a running worker
        // (the latch was previously counted down but never awaited).
        await(threadStarted);
        delay(2 * keepAliveTime);
        assertTrue(p.getPoolSize() >= 1);
    }
}
/**
 * getLargestPoolSize starts at zero, reaches the number of concurrently
 * running tasks, and never reports more than that -- not while the tasks
 * run and not after the pool has been cleaned up.
 */
public void testGetLargestPoolSize() throws InterruptedException {
    final int poolThreads = 3;
    final CountDownLatch release = new CountDownLatch(1);
    final ThreadPoolExecutor pool =
        new CustomTPE(poolThreads, poolThreads,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        assertEquals(0, pool.getLargestPoolSize());

        final CountDownLatch allRunning = new CountDownLatch(poolThreads);
        // One blocking task per thread so every thread gets created.
        for (int submitted = 0; submitted < poolThreads; submitted++) {
            pool.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    allRunning.countDown();
                    await(release);
                    assertEquals(poolThreads, pool.getLargestPoolSize());
                }});
        }

        await(allRunning);
        assertEquals(poolThreads, pool.getLargestPoolSize());
    }
    // The high-water mark is still reported after the try block has exited.
    assertEquals(poolThreads, pool.getLargestPoolSize());
}
/**
 * getCompletedTaskCount increases, but doesn't overestimate,
 * when tasks complete: it is zero before and during the task and
 * becomes one shortly after the task has finished.
 */
public void testGetCompletedTaskCount() throws InterruptedException {
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(2, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        final CountDownLatch threadProceed = new CountDownLatch(1);
        final CountDownLatch threadDone = new CountDownLatch(1);
        assertEquals(0, p.getCompletedTaskCount());
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertEquals(0, p.getCompletedTaskCount());
                // Use the harness await(latch) helper rather than a raw
                // latch.await(), consistent with the other waits here.
                // NOTE(review): the helper is presumed to be a bounded wait
                // that fails the test on timeout -- confirm in the base class.
                await(threadProceed);
                threadDone.countDown();
            }});
        await(threadStarted);
        assertEquals(0, p.getCompletedTaskCount());
        threadProceed.countDown();
        // If the worker's assertion above fails, threadDone is never counted
        // down; a raw threadDone.await() would then block this thread forever.
        await(threadDone);
        // The completed count is updated after the task body returns, so
        // poll for it (bounded by LONG_DELAY_MS) instead of asserting at once.
        long startTime = System.nanoTime();
        while (p.getCompletedTaskCount() != 1) {
            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
                fail("timed out");
            Thread.yield();
        }
    }
}
/**
 * remove(task) takes a still-queued task out of the work queue, returns
 * false for a task that is already running, and returns false when asked
 * to remove the same task a second time.
 */
public void testRemove() throws InterruptedException {
    final CountDownLatch release = new CountDownLatch(1);
    final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               workQueue);
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        final CountDownLatch firstRunning = new CountDownLatch(1);
        final Runnable[] submitted = new Runnable[6];
        // Single-thread pool: the first task runs, the rest sit in the queue.
        for (int idx = 0; idx < submitted.length; idx++) {
            final Runnable blocker = new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    firstRunning.countDown();
                    await(release);
                }};
            submitted[idx] = blocker;
            pool.execute(blocker);
        }
        await(firstRunning);

        // The running task is no longer in the queue, so it cannot be removed.
        assertFalse(pool.remove(submitted[0]));

        assertTrue(workQueue.contains(submitted[4]));
        assertTrue(workQueue.contains(submitted[3]));

        // Removing a queued task succeeds once and only once.
        assertTrue(pool.remove(submitted[4]));
        assertFalse(pool.remove(submitted[4]));
        assertFalse(workQueue.contains(submitted[4]));

        // Its neighbour is untouched until removed explicitly.
        assertTrue(workQueue.contains(submitted[3]));
        assertTrue(pool.remove(submitted[3]));
        assertFalse(workQueue.contains(submitted[3]));
    }
}
/**
 * Hands {@code task} to the thread pool registered under {@code root}.
 * The method is synchronized on this object.
 *
 * @param root key used to look up the pool in {@code executors}
 * @param task work to submit to that pool
 * @throws RuntimeException if no pool is registered for {@code root}
 */
public synchronized void execute(String root, Runnable task) {
    final ThreadPoolExecutor pool = executors.get(root);
    if (pool != null) {
        pool.execute(task);
        return;
    }
    throw new RuntimeException("Cannot find root " + root
        + " for execution of task " + task);
}
/**
 * Posts runnables to {@code mHandler} from many pool threads at once and
 * then walks the handler's chained reference list, checking that every
 * entry found there is one of the runnables posted by the pool tasks and
 * that none of those is missing.
 */
@Test(timeout = 30000)
public void concurrentAdd() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
    try {
        // Parameterized instead of the raw HashSet used before.
        final Set<Runnable> added = Collections.synchronizedSet(new HashSet<Runnable>());
        final CountDownLatch latch = new CountDownLatch(999);
        // Adding 1000 Runnables from different threads
        // NOTE(review): with a queue bound of 100 and at most 50 threads, a
        // burst of 999 submissions could in principle be rejected -- confirm
        // whether that has ever been observed.
        mHandler.post(new SleepyRunnable(0));
        for (int i = 0; i < 999; ++i) {
            final SleepyRunnable sleepyRunnable = new SleepyRunnable(i + 1);
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    mHandler.post(sleepyRunnable);
                    added.add(sleepyRunnable);
                    latch.countDown();
                }
            });
        }
        // Wait until every pool task has posted its runnable.
        latch.await();
        ChainedRef ref = mHandler.mRunnables.next;
        while (ref != null) {
            assertTrue("Must remove runnable from chained list: " + ref.runnable, added.remove(ref.runnable));
            ref = ref.next;
        }
        assertTrue("All runnables should present in chain, however we still haven't found " + added, added.isEmpty());
    } finally {
        // The pool keeps 10 non-daemon core threads alive indefinitely unless
        // it is shut down; do so on success and on failure alike.
        executor.shutdownNow();
    }
}
/**
 * A task passed to execute on an already shut-down pool configured with
 * DiscardOldestPolicy is dropped: it is never run.
 */
public void testDiscardOldestOnShutdown() {
    final ThreadPoolExecutor pool =
        new CustomTPE(1, 1,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(1),
                      new CustomTPE.DiscardOldestPolicy());

    // Shut down first; if the security manager forbids it there is nothing to test.
    try {
        pool.shutdown();
    } catch (SecurityException ok) {
        return;
    }

    try (PoolCleaner cleaner = cleaner(pool)) {
        final TrackedNoOpRunnable tracked = new TrackedNoOpRunnable();
        pool.execute(tracked);
        assertFalse(tracked.done);
    }
}
/**
 * Looks up the thread pool associated with {@code root} in {@code executors}
 * and submits {@code task} to it, while holding this object's monitor.
 *
 * @param root name under which the target pool was registered
 * @param task runnable to be executed by that pool
 * @throws RuntimeException when {@code executors} has no entry for {@code root}
 */
public synchronized void execute(String root, Runnable task) {
    ThreadPoolExecutor target = executors.get(root);
    if (null == target) {
        final String message = "Cannot find root " + root
            + " for execution of task " + task;
        throw new RuntimeException(message);
    }
    target.execute(task);
}
/**
 * getTaskCount grows by exactly one per submitted task and never runs
 * ahead of the number of submissions; getCompletedTaskCount stays at zero
 * while the tasks are blocked and matches the task count after the try
 * block has exited.
 */
public void testGetTaskCount() throws InterruptedException {
    final int queuedTasks = 3;
    final int totalTasks = 1 + queuedTasks;
    final CountDownLatch release = new CountDownLatch(1);
    final ThreadPoolExecutor pool =
        new CustomTPE(1, 1,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        final CountDownLatch firstRunning = new CountDownLatch(1);
        assertEquals(0, pool.getTaskCount());
        assertEquals(0, pool.getCompletedTaskCount());

        // First task occupies the single worker thread.
        pool.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                firstRunning.countDown();
                await(release);
            }});
        await(firstRunning);
        assertEquals(1, pool.getTaskCount());
        assertEquals(0, pool.getCompletedTaskCount());

        // Further tasks queue up behind it; the count is checked before each submit.
        for (int alreadySubmitted = 1; alreadySubmitted <= queuedTasks; alreadySubmitted++) {
            assertEquals(alreadySubmitted, pool.getTaskCount());
            pool.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    firstRunning.countDown();
                    assertEquals(totalTasks, pool.getTaskCount());
                    await(release);
                }});
        }
        assertEquals(totalTasks, pool.getTaskCount());
        assertEquals(0, pool.getCompletedTaskCount());
    }
    assertEquals(totalTasks, pool.getTaskCount());
    assertEquals(totalTasks, pool.getCompletedTaskCount());
}
/**
 * Shards the message records into contiguous index ranges and dispatches
 * one sender task per range to a thread pool.
 *
 * <p>Each range covers {@code totalRecords / threadCount + 1} records, with
 * the last one clamped to {@code totalRecords}. If fewer ranges than
 * {@code PushData.threadCount} are needed, {@code PushData.threadCount} is
 * lowered to the number of ranges actually created. One row per task is
 * appended to the push-thread table.
 */
private static void shardingAndMsgThread() {
    PushForm pushForm = PushForm.getInstance();
    int maxThreadPoolSize = App.config.getMaxThreadPool();
    ThreadPoolExecutor threadPoolExecutor = ThreadUtil.newExecutor(maxThreadPoolSize, maxThreadPoolSize);
    try {
        // Number of records assigned to each thread.
        int perThread = (int) (PushData.totalRecords / PushData.threadCount) + 1;
        DefaultTableModel tableModel = (DefaultTableModel) pushForm.getPushThreadTable().getModel();
        BaseMsgThread.msgType = App.config.getMsgType();
        for (int i = 0; i < PushData.threadCount; i++) {
            int startIndex = i * perThread;
            if (startIndex > PushData.totalRecords - 1) {
                // No records left for this and later shards; record how many were used.
                PushData.threadCount = i;
                break;
            }
            // Clamp the end of the final shard to the record count.
            int endIndex = (int) Math.min(startIndex + perThread, PushData.totalRecords);

            IMsgSender msgSender = MsgSenderFactory.getMsgSender();
            MsgSendThread msgSendThread = new MsgSendThread(startIndex, endIndex, msgSender);
            msgSendThread.setTableRow(i);
            msgSendThread.setName("T-" + i);

            // Table row: name in column 0, index range in column 1, 0 in
            // column 5; columns 2-4 are left unset here.
            Object[] data = new Object[6];
            data[0] = msgSendThread.getName();
            data[1] = startIndex + "-" + endIndex;
            data[5] = 0;
            tableModel.addRow(data);

            threadPoolExecutor.execute(msgSendThread);
        }
    } finally {
        // The pool is reachable only from this method, so nobody else can
        // ever shut it down. shutdown() still lets already-submitted tasks
        // run to completion; without it the pool's threads would outlive
        // every call to this method.
        threadPoolExecutor.shutdown();
    }
    ConsoleUtil.consoleWithLog("所有线程宝宝启动完毕……");
}
/**
 * Test 'rsh' mode server: runs depot sync tasks on a cached thread pool
 * and waits, polling once per second, until all of them have terminated.
 * Any exception fails the test.
 */
@Test
public void testRshServer() {
    try {
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();

        // adjust number of threads as needed for testing
        final int taskCount = 2;
        for (int submitted = 0; submitted < taskCount; submitted++) {
            final String depotPath = "//depot/...";
            final SyncDepot syncTask = new SyncDepot(depotPath);
            System.out.println("A new task has been added to sync : " + depotPath);
            pool.execute(syncTask);
        }

        // No further tasks are accepted; poll until the running ones finish.
        pool.shutdown();
        while (true) {
            if (pool.isTerminated()) {
                break;
            }
            System.out.println("Threads are still running...");
            Thread.sleep(1000);
        }
        System.out.println("Finished all threads");
    } catch (Exception exc) {
        fail("Unexpected exception: " + exc.getLocalizedMessage());
    }
}
/**
 * A task passed to execute on an already shut-down pool configured with
 * CallerRunsPolicy is dropped: it is not run, not even by the caller.
 */
public void testCallerRunsOnShutdown() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(1),
                               new ThreadPoolExecutor.CallerRunsPolicy());

    // Shut down first; if the security manager forbids it there is nothing to test.
    try {
        pool.shutdown();
    } catch (SecurityException ok) {
        return;
    }

    try (PoolCleaner cleaner = cleaner(pool)) {
        final TrackedNoOpRunnable tracked = new TrackedNoOpRunnable();
        pool.execute(tracked);
        assertFalse(tracked.done);
    }
}
/**
 * Demo: submits one task to a 256-thread pool backed by a SynchronousQueue,
 * then prints a marker line once per second, ten times, on the calling
 * thread.
 *
 * @throws InterruptedException if the calling thread is interrupted while sleeping
 */
private static void MyThreadPoolExecutor() throws InterruptedException {
    final ThreadPoolExecutor asyncPool = new ThreadPoolExecutor(
        256, 256,
        0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        new NamedThreadFactory("async", true));
    asyncPool.execute(getThread());

    // With daemon set to true, the pool's threads end when the main thread ends.
    // NOTE(review): assumes the second NamedThreadFactory argument is the daemon flag -- confirm.
    int remaining = 10;
    while (remaining-- > 0) {
        System.out.println("结束");
        Thread.sleep(1000);
    }
}
/**
 * Creates a "multiunload" collection on the first Jetty instance and then
 * issues an unload request for each of its cores from separate pool
 * threads, with a short random pause between submissions.
 */
private void testUnloadLotsOfCores() throws Exception {
    final JettySolrRunner firstJetty = jettys.get(0);
    try (final HttpSolrClient adminClient = (HttpSolrClient) firstJetty.newClient(15000, 60000)) {
        final int replicaCount = atLeast(3);

        // Phase 1: create the cores.
        final ThreadPoolExecutor createPool = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
            5, TimeUnit.SECONDS, new SynchronousQueue<>(),
            new SolrNamedThreadFactory("testExecutor"));
        try {
            createCollectionInOneInstance(adminClient, firstJetty.getNodeName(), createPool, "multiunload", 2, replicaCount);
        } finally {
            ExecutorUtil.shutdownAndAwaitTermination(createPool);
        }

        // Phase 2: unload them concurrently on a fresh pool.
        final ThreadPoolExecutor unloadPool = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
            TimeUnit.SECONDS, new SynchronousQueue<>(),
            new SolrNamedThreadFactory("testExecutor"));
        try {
            for (int replica = 0; replica < replicaCount; replica++) {
                final String coreName = "multiunload" + replica;
                unloadPool.execute(() -> {
                    Unload unloadCmd = new Unload(true);
                    unloadCmd.setCoreName(coreName);
                    try {
                        adminClient.request(unloadCmd);
                    } catch (SolrServerException | IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                // Stagger the submissions by up to 50 ms.
                Thread.sleep(random().nextInt(50));
            }
        } finally {
            ExecutorUtil.shutdownAndAwaitTermination(unloadPool);
        }
    }
}
/**
 * purge removes cancelled tasks from the queue: after cancelling two
 * queued tasks, one purge drops both from the queue and from the task
 * count, and a second purge changes nothing.
 */
public void testPurge() throws InterruptedException {
    final CountDownLatch threadStarted = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(1);
    final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               q);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        // Parameterized types replace the raw FutureTask/Callable used
        // before, which compiled only with unchecked warnings.
        final FutureTask<?>[] tasks = new FutureTask<?>[5];
        for (int i = 0; i < tasks.length; i++) {
            Callable<Boolean> task = new CheckedCallable<Boolean>() {
                public Boolean realCall() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                    return Boolean.TRUE;
                }};
            tasks[i] = new FutureTask<Boolean>(task);
            p.execute(tasks[i]);
        }
        await(threadStarted);
        // Single-thread pool: one task is running, the others are queued.
        assertEquals(tasks.length, p.getTaskCount());
        assertEquals(tasks.length - 1, q.size());
        assertEquals(1L, p.getActiveCount());
        assertEquals(0L, p.getCompletedTaskCount());
        // Cancel two queued tasks, one with and one without interruption.
        tasks[4].cancel(true);
        tasks[3].cancel(false);
        p.purge();
        assertEquals(tasks.length - 3, q.size());
        assertEquals(tasks.length - 2, p.getTaskCount());
        p.purge();         // Nothing to do
        assertEquals(tasks.length - 3, q.size());
        assertEquals(tasks.length - 2, p.getTaskCount());
    }
}
/**
 * Initializes the thread pools of a fresh ServerLocatorImpl, floods the
 * global thread pool with {@code 3 * expectedMax} blocking runnables and
 * verifies that at least {@code expectedMax} of them start concurrently,
 * that all of them eventually run without error, and that the pool sizes
 * match the expected values.
 *
 * @param expectedMax expected maximum size of the global thread pool
 * @param expectedScheduled expected core size of the locator's scheduled pool
 */
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception {
    ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
    serverLocator.isUseGlobalPools();
    // setThreadPools is not accessible from here, so it is invoked reflectively.
    Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools");
    setThreadPools.setAccessible(true);
    setThreadPools.invoke(serverLocator);
    // TODO: I would get this from the ActiveMQClient
    Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
    Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");
    threadPoolField.setAccessible(true);
    scheduledThreadPoolField.setAccessible(true);
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();
    final CountDownLatch doneMax = new CountDownLatch(expectedMax);
    final CountDownLatch latch = new CountDownLatch(1);
    final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule 3 * max, so all runnables should execute
    final AtomicInteger errors = new AtomicInteger(0);
    // Set this to true if you need to debug why executions are not being performed.
    final boolean debugExecutions = false;
    for (int i = 0; i < expectedMax * 3; i++) {
        final int localI = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    if (debugExecutions) {
                        System.out.println("runnable " + localI);
                    }
                    doneMax.countDown();
                    latch.await();
                    latchTotal.countDown();
                } catch (Exception e) {
                    errors.incrementAndGet();
                } finally {
                    if (debugExecutions) {
                        System.out.println("done " + localI);
                    }
                }
            }
        });
    }
    try {
        Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS));
    } finally {
        // Always release the workers, even when the assertion above fails;
        // otherwise they would stay blocked on the global pool.
        latch.countDown();
    }
    boolean allExecuted = latchTotal.await(5, TimeUnit.SECONDS);
    // The error counter was collected but never checked before; report it
    // first so a failing runnable is distinguishable from a plain timeout.
    Assert.assertEquals("runnables that failed with an exception", 0, errors.get());
    Assert.assertTrue(allExecuted);
    ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);
    assertEquals(expectedMax, threadPool.getMaximumPoolSize());
    assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
}
/**
 * Starts the client, creates the worker thread pool, and launches the
 * dispatcher thread that takes container events off the queue and hands
 * each one to the pool. The pool's core size is grown as events from more
 * distinct nodes are seen, up to {@code maxThreadPoolSize}.
 */
@Override
protected void serviceStart() throws Exception {
    client.start();

    final String workerNameFormat = this.getClass().getName() + " #%d";
    ThreadFactory workerFactory = new ThreadFactoryBuilder()
        .setNameFormat(workerNameFormat)
        .setDaemon(true)
        .build();

    // Start with a default core-pool size and change it dynamically.
    int initialCoreSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
    threadPool = new ThreadPoolExecutor(initialCoreSize, Integer.MAX_VALUE, 1,
        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), workerFactory);

    eventDispatcherThread = new Thread() {
        @Override
        public void run() {
            // String form of every node id seen so far by this dispatcher.
            final Set<String> seenNodes = new HashSet<String>();

            while (!(stopped.get() || Thread.currentThread().isInterrupted())) {
                ContainerEvent event;
                try {
                    event = events.take();
                } catch (InterruptedException e) {
                    // Only an interrupt that is not part of a stop is logged as an error.
                    if (!stopped.get()) {
                        LOG.error("Returning, thread interrupted", e);
                    }
                    return;
                }

                seenNodes.add(event.getNodeId().toString());

                final int currentCoreSize = threadPool.getCorePoolSize();

                // We can increase the pool size only if haven't reached the maximum
                // limit yet.
                if (currentCoreSize != maxThreadPoolSize) {
                    // nodes where containers will run at *this* point of time. This is
                    // *not* the cluster size and doesn't need to be.
                    final int nodeNum = seenNodes.size();
                    final int idealSize = Math.min(maxThreadPoolSize, nodeNum);

                    if (currentCoreSize < idealSize) {
                        // Grow to the ideal size plus INITIAL_THREAD_POOL_SIZE as
                        // headroom, so the pool is not resized on every new node.
                        final int grownSize = Math.min(maxThreadPoolSize,
                            idealSize + INITIAL_THREAD_POOL_SIZE);
                        LOG.info("Set NMClientAsync thread pool size to " +
                            grownSize + " as the number of nodes to talk to is "
                            + nodeNum);
                        threadPool.setCorePoolSize(grownSize);
                    }
                }

                // the events from the queue are handled in parallel with a thread
                // pool
                threadPool.execute(getContainerEventProcessor(event));

                // TODO: Group launching of multiple containers to a single
                // NodeManager into a single connection
            }
        }
    };
    eventDispatcherThread.setName("Container Event Dispatcher");
    eventDispatcherThread.setDaemon(false);
    eventDispatcherThread.start();

    super.serviceStart();
}