java.util.concurrent.ThreadPoolExecutor#execute ( )源码实例Demo

下面列出了java.util.concurrent.ThreadPoolExecutor#execute ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * getActiveCount increases but doesn't overestimate, when a
 * thread becomes active
 */
public void testGetActiveCount() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final ThreadPoolExecutor p = new CustomExecutor(2);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        assertEquals(0, p.getActiveCount());
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertEquals(1, p.getActiveCount());
                await(done);
            }});
        await(threadStarted);
        assertEquals(1, p.getActiveCount());
    }
}
 
源代码2 项目: openjdk-jdk9   文件: ThreadPoolExecutorTest.java
/**
 * allowCoreThreadTimeOut(true) causes idle threads to time out
 */
public void testAllowCoreThreadTimeOut_true() throws Exception {
    long keepAliveTime = timeoutMillis();
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(2, 10,
                               keepAliveTime, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        p.allowCoreThreadTimeOut(true);
        p.execute(new CheckedRunnable() {
            public void realRun() {
                threadStarted.countDown();
                assertEquals(1, p.getPoolSize());
            }});
        await(threadStarted);
        delay(keepAliveTime);
        long startTime = System.nanoTime();
        while (p.getPoolSize() > 0
               && millisElapsedSince(startTime) < LONG_DELAY_MS)
            Thread.yield();
        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
        assertEquals(0, p.getPoolSize());
    }
}
 
源代码3 项目: j2objc   文件: ThreadPoolExecutorTest.java
/**
 * getActiveCount increases but doesn't overestimate, when a
 * thread becomes active
 */
public void testGetActiveCount() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(2, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p, done)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        assertEquals(0, p.getActiveCount());
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertEquals(1, p.getActiveCount());
                await(done);
            }});
        await(threadStarted);
        assertEquals(1, p.getActiveCount());
    }
}
 
/**
 * allowCoreThreadTimeOut(false) causes idle threads not to time out
 */
public void testAllowCoreThreadTimeOut_false() throws Exception {
    long keepAliveTime = timeoutMillis();
    final ThreadPoolExecutor p =
        new CustomTPE(2, 10,
                      keepAliveTime, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        p.allowCoreThreadTimeOut(false);
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertTrue(p.getPoolSize() >= 1);
            }});
        delay(2 * keepAliveTime);
        assertTrue(p.getPoolSize() >= 1);
    }
}
 
源代码5 项目: j2objc   文件: ThreadPoolExecutorSubclassTest.java
/**
 * getLargestPoolSize increases, but doesn't overestimate, when
 * multiple threads active
 */
public void testGetLargestPoolSize() throws InterruptedException {
    final int THREADS = 3;
    final CountDownLatch done = new CountDownLatch(1);
    final ThreadPoolExecutor p =
        new CustomTPE(THREADS, THREADS,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p, done)) {
        assertEquals(0, p.getLargestPoolSize());
        final CountDownLatch threadsStarted = new CountDownLatch(THREADS);
        for (int i = 0; i < THREADS; i++)
            p.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadsStarted.countDown();
                    await(done);
                    assertEquals(THREADS, p.getLargestPoolSize());
                }});
        await(threadsStarted);
        assertEquals(THREADS, p.getLargestPoolSize());
    }
    assertEquals(THREADS, p.getLargestPoolSize());
}
 
源代码6 项目: j2objc   文件: ThreadPoolExecutorTest.java
/**
 * getCompletedTaskCount increases, but doesn't overestimate,
 * when tasks complete
 */
public void testGetCompletedTaskCount() throws InterruptedException {
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(2, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        final CountDownLatch threadProceed = new CountDownLatch(1);
        final CountDownLatch threadDone = new CountDownLatch(1);
        assertEquals(0, p.getCompletedTaskCount());
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                assertEquals(0, p.getCompletedTaskCount());
                threadProceed.await();
                threadDone.countDown();
            }});
        await(threadStarted);
        assertEquals(0, p.getCompletedTaskCount());
        threadProceed.countDown();
        threadDone.await();
        long startTime = System.nanoTime();
        while (p.getCompletedTaskCount() != 1) {
            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
                fail("timed out");
            Thread.yield();
        }
    }
}
 
源代码7 项目: j2objc   文件: ThreadPoolExecutorTest.java
/**
 * remove(task) removes queued task, and fails to remove active task
 */
public void testRemove() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               q);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        Runnable[] tasks = new Runnable[6];
        final CountDownLatch threadStarted = new CountDownLatch(1);
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                }};
            p.execute(tasks[i]);
        }
        await(threadStarted);
        assertFalse(p.remove(tasks[0]));
        assertTrue(q.contains(tasks[4]));
        assertTrue(q.contains(tasks[3]));
        assertTrue(p.remove(tasks[4]));
        assertFalse(p.remove(tasks[4]));
        assertFalse(q.contains(tasks[4]));
        assertTrue(q.contains(tasks[3]));
        assertTrue(p.remove(tasks[3]));
        assertFalse(q.contains(tasks[3]));
    }
}
 
源代码8 项目: hadoop   文件: AsyncDiskService.java
/**
 * Execute the task sometime in the future, using ThreadPools.
 */
public synchronized void execute(String root, Runnable task) {
  ThreadPoolExecutor executor = executors.get(root);
  if (executor == null) {
    throw new RuntimeException("Cannot find root " + root
        + " for execution of task " + task);
  } else {
    executor.execute(task);
  }
}
 
源代码9 项目: weakHandler   文件: WeakHandlerTest.java
@Test(timeout = 30000)
public void concurrentAdd() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
    final Set<Runnable> added = Collections.synchronizedSet(new HashSet());
    final CountDownLatch latch = new CountDownLatch(999);
    // Adding 1000 Runnables from different threads
    mHandler.post(new SleepyRunnable(0));
    for (int i = 0; i < 999; ++i) {
        final SleepyRunnable sleepyRunnable = new SleepyRunnable(i+1);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                mHandler.post(sleepyRunnable);
                added.add(sleepyRunnable);
                latch.countDown();
            }
        });
    }

    // Waiting until all runnables added
    // Notified by #Notify1
    latch.await();

    ChainedRef ref = mHandler.mRunnables.next;
    while (ref != null) {
        assertTrue("Must remove runnable from chained list: " + ref.runnable, added.remove(ref.runnable));
        ref = ref.next;
    }

    assertTrue("All runnables should present in chain, however we still haven't found " + added, added.isEmpty());
}
 
源代码10 项目: j2objc   文件: ThreadPoolExecutorSubclassTest.java
/**
 * execute using DiscardOldestPolicy drops task on shutdown
 */
public void testDiscardOldestOnShutdown() {
    final ThreadPoolExecutor p =
        new CustomTPE(1, 1,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(1),
                      new CustomTPE.DiscardOldestPolicy());

    try { p.shutdown(); } catch (SecurityException ok) { return; }
    try (PoolCleaner cleaner = cleaner(p)) {
        TrackedNoOpRunnable r = new TrackedNoOpRunnable();
        p.execute(r);
        assertFalse(r.done);
    }
}
 
源代码11 项目: RDFS   文件: AsyncDiskService.java
/**
 * Execute the task sometime in the future, using ThreadPools.
 */
public synchronized void execute(String root, Runnable task) {
  ThreadPoolExecutor executor = executors.get(root);
  if (executor == null) {
    throw new RuntimeException("Cannot find root " + root
        + " for execution of task " + task);
  } else {
    executor.execute(task);
  }
}
 
源代码12 项目: j2objc   文件: ThreadPoolExecutorSubclassTest.java
/**
 * getTaskCount increases, but doesn't overestimate, when tasks submitted
 */
public void testGetTaskCount() throws InterruptedException {
    final int TASKS = 3;
    final CountDownLatch done = new CountDownLatch(1);
    final ThreadPoolExecutor p =
        new CustomTPE(1, 1,
                      LONG_DELAY_MS, MILLISECONDS,
                      new ArrayBlockingQueue<Runnable>(10));
    try (PoolCleaner cleaner = cleaner(p, done)) {
        final CountDownLatch threadStarted = new CountDownLatch(1);
        assertEquals(0, p.getTaskCount());
        assertEquals(0, p.getCompletedTaskCount());
        p.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                await(done);
            }});
        await(threadStarted);
        assertEquals(1, p.getTaskCount());
        assertEquals(0, p.getCompletedTaskCount());
        for (int i = 0; i < TASKS; i++) {
            assertEquals(1 + i, p.getTaskCount());
            p.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadStarted.countDown();
                    assertEquals(1 + TASKS, p.getTaskCount());
                    await(done);
                }});
        }
        assertEquals(1 + TASKS, p.getTaskCount());
        assertEquals(0, p.getCompletedTaskCount());
    }
    assertEquals(1 + TASKS, p.getTaskCount());
    assertEquals(1 + TASKS, p.getCompletedTaskCount());
}
 
源代码13 项目: WePush   文件: PushRunThread.java
/**
 * 消息数据分片以及线程纷发
 */
private static void shardingAndMsgThread() {
    PushForm pushForm = PushForm.getInstance();
    Object[] data;

    int maxThreadPoolSize = App.config.getMaxThreadPool();
    ThreadPoolExecutor threadPoolExecutor = ThreadUtil.newExecutor(maxThreadPoolSize, maxThreadPoolSize);
    MsgSendThread msgSendThread;
    // 每个线程分配
    int perThread = (int) (PushData.totalRecords / PushData.threadCount) + 1;
    DefaultTableModel tableModel = (DefaultTableModel) pushForm.getPushThreadTable().getModel();
    BaseMsgThread.msgType = App.config.getMsgType();
    for (int i = 0; i < PushData.threadCount; i++) {
        int startIndex = i * perThread;
        if (startIndex > PushData.totalRecords - 1) {
            PushData.threadCount = i;
            break;
        }
        int endIndex = i * perThread + perThread;
        if (endIndex > PushData.totalRecords - 1) {
            endIndex = (int) (PushData.totalRecords);
        }

        IMsgSender msgSender = MsgSenderFactory.getMsgSender();
        msgSendThread = new MsgSendThread(startIndex, endIndex, msgSender);

        msgSendThread.setTableRow(i);
        msgSendThread.setName("T-" + i);

        data = new Object[6];
        data[0] = msgSendThread.getName();
        data[1] = startIndex + "-" + endIndex;
        data[5] = 0;
        tableModel.addRow(data);

        threadPoolExecutor.execute(msgSendThread);
    }
    ConsoleUtil.consoleWithLog("所有线程宝宝启动完毕……");
}
 
源代码14 项目: p4ic4idea   文件: RshServerTestThreaded.java
/**
 * Test 'rsh' mode server.
 */
@Test
public void testRshServer() {

	try{        
		ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
		//adjust number of threads as needed for testing		
		for (int i = 1; i <= 2; i++)
		{
			String depotPath = "//depot/...";
			SyncDepot task = new SyncDepot(depotPath);
			System.out.println("A new task has been added to sync : " + depotPath);
			executor.execute(task);
		}
		executor.shutdown();

		while (!executor.isTerminated()) {
			System.out.println("Threads are still running...");
			Thread.sleep(1000);
		}

		System.out.println("Finished all threads");

	} catch (Exception exc) {                            
		fail("Unexpected exception: " + exc.getLocalizedMessage());
	}
}
 
源代码15 项目: openjdk-jdk9   文件: ThreadPoolExecutorTest.java
/**
 * execute using CallerRunsPolicy drops task on shutdown
 */
public void testCallerRunsOnShutdown() {
    RejectedExecutionHandler h = new ThreadPoolExecutor.CallerRunsPolicy();
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(1), h);

    try { p.shutdown(); } catch (SecurityException ok) { return; }
    try (PoolCleaner cleaner = cleaner(p)) {
        TrackedNoOpRunnable r = new TrackedNoOpRunnable();
        p.execute(r);
        assertFalse(r.done);
    }
}
 
源代码16 项目: javabase   文件: ExecutorReview.java
private static void MyThreadPoolExecutor() throws InterruptedException {
	ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(256, 256, 0L, TimeUnit.MILLISECONDS,
			new SynchronousQueue<Runnable>(), new NamedThreadFactory("async", true));
	defaultThreadPool.execute(getThread());
       //Daemon为true 主线程结束 ,多线程也会结束
	for(int i = 0; i < 10; i++) {
		System.out.println("结束");
		Thread.sleep(1000);
	}
}
 
源代码17 项目: lucene-solr   文件: UnloadDistributedZkTest.java
private void testUnloadLotsOfCores() throws Exception {
  JettySolrRunner jetty = jettys.get(0);
  try (final HttpSolrClient adminClient = (HttpSolrClient) jetty.newClient(15000, 60000)) {
    int numReplicas = atLeast(3);
    ThreadPoolExecutor executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE,
        5, TimeUnit.SECONDS, new SynchronousQueue<>(),
        new SolrNamedThreadFactory("testExecutor"));
    try {
      // create the cores
      createCollectionInOneInstance(adminClient, jetty.getNodeName(), executor, "multiunload", 2, numReplicas);
    } finally {
      ExecutorUtil.shutdownAndAwaitTermination(executor);
    }

    executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
        TimeUnit.SECONDS, new SynchronousQueue<>(),
        new SolrNamedThreadFactory("testExecutor"));
    try {
      for (int j = 0; j < numReplicas; j++) {
        final int freezeJ = j;
        executor.execute(() -> {
          Unload unloadCmd = new Unload(true);
          unloadCmd.setCoreName("multiunload" + freezeJ);
          try {
            adminClient.request(unloadCmd);
          } catch (SolrServerException | IOException e) {
            throw new RuntimeException(e);
          }
        });
        Thread.sleep(random().nextInt(50));
      }
    } finally {
      ExecutorUtil.shutdownAndAwaitTermination(executor);
    }
  }
}
 
源代码18 项目: j2objc   文件: ThreadPoolExecutorTest.java
/**
 * purge removes cancelled tasks from the queue
 */
public void testPurge() throws InterruptedException {
    final CountDownLatch threadStarted = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(1);
    final BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
    final ThreadPoolExecutor p =
        new ThreadPoolExecutor(1, 1,
                               LONG_DELAY_MS, MILLISECONDS,
                               q);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        FutureTask[] tasks = new FutureTask[5];
        for (int i = 0; i < tasks.length; i++) {
            Callable task = new CheckedCallable<Boolean>() {
                public Boolean realCall() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                    return Boolean.TRUE;
                }};
            tasks[i] = new FutureTask(task);
            p.execute(tasks[i]);
        }
        await(threadStarted);
        assertEquals(tasks.length, p.getTaskCount());
        assertEquals(tasks.length - 1, q.size());
        assertEquals(1L, p.getActiveCount());
        assertEquals(0L, p.getCompletedTaskCount());
        tasks[4].cancel(true);
        tasks[3].cancel(false);
        p.purge();
        assertEquals(tasks.length - 3, q.size());
        assertEquals(tasks.length - 2, p.getTaskCount());
        p.purge();         // Nothing to do
        assertEquals(tasks.length - 3, q.size());
        assertEquals(tasks.length - 2, p.getTaskCount());
    }
}
 
源代码19 项目: activemq-artemis   文件: ClientThreadPoolsTest.java
private void testSystemPropertiesThreadPoolSettings(int expectedMax, int expectedScheduled) throws Exception {
   ServerLocatorImpl serverLocator = new ServerLocatorImpl(false);
   serverLocator.isUseGlobalPools();

   Method setThreadPools = ServerLocatorImpl.class.getDeclaredMethod("setThreadPools");
   setThreadPools.setAccessible(true);
   setThreadPools.invoke(serverLocator);

   // TODO: I would get this from the ActiveMQClient
   Field threadPoolField = ServerLocatorImpl.class.getDeclaredField("threadPool");
   Field scheduledThreadPoolField = ServerLocatorImpl.class.getDeclaredField("scheduledThreadPool");

   threadPoolField.setAccessible(true);
   scheduledThreadPoolField.setAccessible(true);

   ThreadPoolExecutor threadPool = (ThreadPoolExecutor) ActiveMQClient.getGlobalThreadPool();

   final CountDownLatch doneMax = new CountDownLatch(expectedMax);
   final CountDownLatch latch = new CountDownLatch(1);
   final CountDownLatch latchTotal = new CountDownLatch(expectedMax * 3); // we will schedule 3 * max, so all runnables should execute
   final AtomicInteger errors = new AtomicInteger(0);

   // Set this to true if you need to debug why executions are not being performed.
   final boolean debugExecutions = false;

   for (int i = 0; i < expectedMax * 3; i++) {
      final int localI = i;
      threadPool.execute(new Runnable() {
         @Override
         public void run() {
            try {

               if (debugExecutions) {
                  System.out.println("runnable " + localI);
               }
               doneMax.countDown();
               latch.await();
               latchTotal.countDown();
            } catch (Exception e) {
               errors.incrementAndGet();
            } finally {
               if (debugExecutions) {
                  System.out.println("done " + localI);
               }
            }
         }
      });
   }

   Assert.assertTrue(doneMax.await(5, TimeUnit.SECONDS));
   latch.countDown();
   Assert.assertTrue(latchTotal.await(5, TimeUnit.SECONDS));

   ScheduledThreadPoolExecutor scheduledThreadPool = (ScheduledThreadPoolExecutor) scheduledThreadPoolField.get(serverLocator);

   assertEquals(expectedMax, threadPool.getMaximumPoolSize());
   assertEquals(expectedScheduled, scheduledThreadPool.getCorePoolSize());
}
 
源代码20 项目: hadoop   文件: NMClientAsyncImpl.java
@Override
protected void serviceStart() throws Exception {
  client.start();

  ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
      this.getClass().getName() + " #%d").setDaemon(true).build();

  // Start with a default core-pool size and change it dynamically.
  int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
  threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
      TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);

  eventDispatcherThread = new Thread() {
    @Override
    public void run() {
      ContainerEvent event = null;
      Set<String> allNodes = new HashSet<String>();

      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          event = events.take();
        } catch (InterruptedException e) {
          if (!stopped.get()) {
            LOG.error("Returning, thread interrupted", e);
          }
          return;
        }

        allNodes.add(event.getNodeId().toString());

        int threadPoolSize = threadPool.getCorePoolSize();

        // We can increase the pool size only if haven't reached the maximum
        // limit yet.
        if (threadPoolSize != maxThreadPoolSize) {

          // nodes where containers will run at *this* point of time. This is
          // *not* the cluster size and doesn't need to be.
          int nodeNum = allNodes.size();
          int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);

          if (threadPoolSize < idealThreadPoolSize) {
            // Bump up the pool size to idealThreadPoolSize +
            // INITIAL_POOL_SIZE, the later is just a buffer so we are not
            // always increasing the pool-size
            int newThreadPoolSize = Math.min(maxThreadPoolSize,
                idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
            LOG.info("Set NMClientAsync thread pool size to " +
                newThreadPoolSize + " as the number of nodes to talk to is "
                + nodeNum);
            threadPool.setCorePoolSize(newThreadPoolSize);
          }
        }

        // the events from the queue are handled in parallel with a thread
        // pool
        threadPool.execute(getContainerEventProcessor(event));

        // TODO: Group launching of multiple containers to a single
        // NodeManager into a single connection
      }
    }
  };
  eventDispatcherThread.setName("Container  Event Dispatcher");
  eventDispatcherThread.setDaemon(false);
  eventDispatcherThread.start();

  super.serviceStart();
}