类java.util.concurrent.CompletionService源码实例Demo

下面列出了怎么用java.util.concurrent.CompletionService的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hop   文件: LoggingRegistrySingltonTest.java
/**
 * Test that LoggingRegistry is concurrent-sage initialized over multiple calls. Creating more than 1000 threads can
 * cause significant performance impact.
 *
 * @throws InterruptedException
 * @throws ExecutionException
 */
@Test( timeout = 30000 )
public void testLoggingRegistryConcurrentInitialization() throws InterruptedException, ExecutionException {
  CountDownLatch start = new CountDownLatch( 1 );

  int count = 10;
  CompletionService<LoggingRegistry> drover = registerHounds( count, start );
  // fire!
  start.countDown();

  Set<LoggingRegistry> distinct = new HashSet<LoggingRegistry>();

  int i = 0;
  while ( i < count ) {
    Future<LoggingRegistry> complete = drover.poll( 15, TimeUnit.SECONDS );
    LoggingRegistry instance = complete.get();
    distinct.add( instance );
    i++;
  }
  Assert.assertEquals( "Only one singlton instance ;)", 1, distinct.size() );
}
 
源代码2 项目: swage   文件: StateCaptureTest.java
@Test
public void testCompletionServiceRunnableCaptures() throws InterruptedException, Exception {
    // Setup
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletionService<Object> delegate = new ExecutorCompletionService<>(executor);
    CompletionService<Object> cs = StateCapture.capturingDecorator(delegate);

    CapturedState mockCapturedState = mock(CapturedState.class);
    Runnable mockRunnable = mock(Runnable.class);
    ThreadLocalStateCaptor.THREAD_LOCAL.set(mockCapturedState);
    Object result = new Object();
    Future<Object> futureResult = cs.submit(mockRunnable, result);
    assertThat("Expected the delegate response to return",
        result, sameInstance(futureResult.get()));
    executor.shutdown();

    verifyStandardCaptures(mockCapturedState, mockRunnable);
}
 
/**
 * @param states This is a collection of TaskState.
 */
@Override
public void publishData(Collection<? extends WorkUnitState> states)
    throws IOException {
  CompletionService<Collection<HiveSpec>> completionService =
      new ExecutorCompletionService<>(this.hivePolicyExecutor);

  int toRegisterPathCount = computeSpecs(states, completionService);
  for (int i = 0; i < toRegisterPathCount; i++) {
    try {
      for (HiveSpec spec : completionService.take().get()) {
        allRegisteredPartitions.add(spec);
        this.hiveRegister.register(spec);
      }
    } catch (InterruptedException | ExecutionException e) {
      log.info("Failed to generate HiveSpec", e);
      throw new IOException(e);
    }
  }
  log.info("Finished registering all HiveSpecs");
}
 
源代码4 项目: navi-pbrpc   文件: PooledPbrpcClientMainTest.java
public void testPoolBatch() throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
            "127.0.0.1", 8088, 60000);
    int multiSize = 8;
    int totalRequestSize = 100;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    CompletionService<DemoBatchResponse> completionService = new ExecutorCompletionService<DemoBatchResponse>(
            pool);

    BatchInvoker invoker = new BatchInvoker(client);
    long time = System.currentTimeMillis();
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.submit(invoker);
    }

    for (int i = 0; i < totalRequestSize; i++) {
        completionService.take().get();
    }

    long timetook = System.currentTimeMillis() - time;
    LOG.info("Total using " + timetook + "ms");
    LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
}
 
源代码5 项目: articles   文件: PreJava8Test.java
@Test
void example_parallel_completion_order() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);

    CompletionService<Integer> completionService =
      new ExecutorCompletionService<>(executor);

    for (Integer integer : integers) {
        completionService.submit(() -> Utils.process(integer));
    }

    List<Integer> results = new ArrayList<>();

    for (int i = 0; i < integers.size(); i++) {
        results.add(completionService.take().get());
    }

    assertThat(results)
      .containsExactlyInAnyOrderElementsOf(integers);
}
 
源代码6 项目: incubator-pinot   文件: MultiGetRequest.java
/**
 * GET urls in parallel using the executor service.
 * @param urls absolute URLs to GET
 * @param timeoutMs timeout in milliseconds for each GET request
 * @return instance of CompletionService. Completion service will provide
 *   results as they arrive. The order is NOT same as the order of URLs
 */
public CompletionService<GetMethod> execute(List<String> urls, int timeoutMs) {
  HttpClientParams clientParams = new HttpClientParams();
  clientParams.setConnectionManagerTimeout(timeoutMs);
  HttpClient client = new HttpClient(clientParams, _connectionManager);

  CompletionService<GetMethod> completionService = new ExecutorCompletionService<>(_executor);
  for (String url : urls) {
    completionService.submit(() -> {
      try {
        GetMethod getMethod = new GetMethod(url);
        getMethod.getParams().setSoTimeout(timeoutMs);
        client.executeMethod(getMethod);
        return getMethod;
      } catch (Exception e) {
        // Log only exception type and message instead of the whole stack trace
        LOGGER.warn("Caught '{}' while executing GET on URL: {}", e.toString(), url);
        throw e;
      }
    });
  }
  return completionService;
}
 
/**
 * poll returns non-null when the returned task is completed
 */
public void testPoll1()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    long startTime = System.nanoTime();
    Future f;
    while ((f = cs.poll()) == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
 
/**
 * timed poll returns non-null when the returned task is completed
 */
public void testPoll2()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    long startTime = System.nanoTime();
    Future f;
    while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
 
/**
 * poll returns null before the returned task is completed
 */
public void testPollReturnsNull()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    final CountDownLatch proceed = new CountDownLatch(1);
    cs.submit(new Callable() { public String call() throws Exception {
        proceed.await();
        return TEST_STRING;
    }});
    assertNull(cs.poll());
    assertNull(cs.poll(0L, MILLISECONDS));
    assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
    long startTime = System.nanoTime();
    assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
    assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    proceed.countDown();
    assertSame(TEST_STRING, cs.take().get());
}
 
/**
 * successful and failed tasks are both returned
 */
public void testTaskAssortment()
    throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    ArithmeticException ex = new ArithmeticException();
    for (int i = 0; i < 2; i++) {
        cs.submit(new StringTask());
        cs.submit(callableThrowing(ex));
        cs.submit(runnableThrowing(ex), null);
    }
    int normalCompletions = 0;
    int exceptionalCompletions = 0;
    for (int i = 0; i < 3 * 2; i++) {
        try {
            if (cs.take().get() == TEST_STRING)
                normalCompletions++;
        }
        catch (ExecutionException expected) {
            assertTrue(expected.getCause() instanceof ArithmeticException);
            exceptionalCompletions++;
        }
    }
    assertEquals(2 * 1, normalCompletions);
    assertEquals(2 * 2, exceptionalCompletions);
    assertNull(cs.poll());
}
 
void solveAny(Executor e,
              Collection<Callable<Integer>> solvers)
    throws InterruptedException {
    CompletionService<Integer> cs
        = new ExecutorCompletionService<>(e);
    int n = solvers.size();
    List<Future<Integer>> futures = new ArrayList<>(n);
    Integer result = null;
    try {
        solvers.forEach(solver -> futures.add(cs.submit(solver)));
        for (int i = n; i > 0; i--) {
            try {
                Integer r = cs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {}
        }
    } finally {
        futures.forEach(future -> future.cancel(true));
    }

    if (result != null)
        use(result);
}
 
源代码12 项目: vlingo-actors   文件: StageTest.java
@Test
public void testMultiThreadRawLookupOrStartFindsActorPreviouslyStartedWIthRawLookupOrStart() {
  final int size = 1000;

  List<Address> addresses = IntStream.range(0, size)
      .mapToObj((ignored) -> world.addressFactory().unique())
      .collect(Collectors.toList());

  CompletionService<Actor> completionService =
      new ExecutorCompletionService<>(exec);

  final Definition definition = Definition.has(ParentInterfaceActor.class,
      ParentInterfaceActor::new);

  multithreadedLookupOrStartTest(index ->
          completionService.submit(() ->
              world.stage()
                  .rawLookupOrStart(definition, addresses.get(index)))
      , size);
}
 
源代码13 项目: hadoop   文件: DFSInputStream.java
private ByteBuffer getFirstToComplete(
    CompletionService<ByteBuffer> hedgedService,
    ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
  if (futures.isEmpty()) {
    throw new InterruptedException("let's retry");
  }
  Future<ByteBuffer> future = null;
  try {
    future = hedgedService.take();
    ByteBuffer bb = future.get();
    futures.remove(future);
    return bb;
  } catch (ExecutionException e) {
    // already logged in the Callable
    futures.remove(future);
  } catch (CancellationException ce) {
    // already logged in the Callable
    futures.remove(future);
  }

  throw new InterruptedException("let's retry");
}
 
源代码14 项目: reflection-util   文件: PropertyUtilsTest.java
@Test
@Timeout(30)
void testConcurrentlyCreateProxyClasses() throws Exception {
	ExecutorService executorService = Executors.newFixedThreadPool(4);
	try {
		CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
		for (int i = 0; i < 4; i++) {
			completionService.submit(() -> {
				for (int r = 0; r < 100; r++) {
					PropertyUtils.getPropertyDescriptor(TestEntity.class, TestEntity::getNumber);
					PropertyUtils.clearCache();
				}
				return null;
			});
		}
		for (int i = 0; i < 4; i++) {
			completionService.take().get();
		}
	} finally {
		executorService.shutdown();
	}
}
 
源代码15 项目: big-c   文件: TestContainerLocalizer.java
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
  // verify filesystems are closed when localizer doesn't fail
  FileContext fs = FileContext.getLocalFSFileContext();
  spylfs = spy(fs.getDefaultFileSystem());
  ContainerLocalizer localizer = setupContainerLocalizerForTest();
  doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
      any(CompletionService.class), any(UserGroupInformation.class));
  verify(localizer, never()).closeFileSystems(
      any(UserGroupInformation.class));
  localizer.runLocalization(nmAddr);
  verify(localizer).closeFileSystems(any(UserGroupInformation.class));

  spylfs = spy(fs.getDefaultFileSystem());
  // verify filesystems are closed when localizer fails
  localizer = setupContainerLocalizerForTest();
  doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
      any(LocalizationProtocol.class), any(CompletionService.class),
      any(UserGroupInformation.class));
  verify(localizer, never()).closeFileSystems(
      any(UserGroupInformation.class));
  localizer.runLocalization(nmAddr);
  verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
 
源代码16 项目: big-c   文件: DFSInputStream.java
private ByteBuffer getFirstToComplete(
    CompletionService<ByteBuffer> hedgedService,
    ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
  if (futures.isEmpty()) {
    throw new InterruptedException("let's retry");
  }
  Future<ByteBuffer> future = null;
  try {
    future = hedgedService.take();
    ByteBuffer bb = future.get();
    futures.remove(future);
    return bb;
  } catch (ExecutionException e) {
    // already logged in the Callable
    futures.remove(future);
  } catch (CancellationException ce) {
    // already logged in the Callable
    futures.remove(future);
  }

  throw new InterruptedException("let's retry");
}
 
/**
 * Test that LoggingRegistry is concurrent-sage initialized over multiple calls. Creating more than 1000 threads can
 * cause significant performance impact.
 * 
 * @throws InterruptedException
 * @throws ExecutionException
 * 
 */
@Test( timeout = 30000 )
public void testLoggingRegistryConcurrentInitialization() throws InterruptedException, ExecutionException {
  CountDownLatch start = new CountDownLatch( 1 );

  int count = 10;
  CompletionService<LoggingRegistry> drover = registerHounds( count, start );
  // fire!
  start.countDown();

  Set<LoggingRegistry> distinct = new HashSet<LoggingRegistry>();

  int i = 0;
  while ( i < count ) {
    Future<LoggingRegistry> complete = drover.poll( 15, TimeUnit.SECONDS );
    LoggingRegistry instance = complete.get();
    distinct.add( instance );
    i++;
  }
  Assert.assertEquals( "Only one singlton instance ;)", 1, distinct.size() );
}
 
@Test
public void testDoSmth() throws Exception {
    Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder();
    req.setUserId(1);

    int multiSize = 12;
    int totalRequestSize = 10;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    CompletionService<Demo.DemoResponse> completionService = new ExecutorCompletionService<Demo.DemoResponse>(
            pool);

    Invoker invoker = new Invoker(req.build());
    long time = System.currentTimeMillis();
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.submit(invoker);
    }

    for (int i = 0; i < totalRequestSize; i++) {
        completionService.take().get();
    }

    long timetook = System.currentTimeMillis() - time;
    System.out.println("Total using " + timetook + "ms");
    System.out.println("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
}
 
源代码19 项目: presto   文件: Verifier.java
private static <T> T takeUnchecked(CompletionService<T> completionService)
        throws InterruptedException
{
    try {
        return completionService.take().get();
    }
    catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
}
 
源代码20 项目: presto   文件: GlueHiveMetastore.java
private List<Partition> getPartitions(Table table, String expression)
{
    if (partitionSegments == 1) {
        return getPartitions(table, expression, null);
    }

    // Do parallel partition fetch.
    CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
    for (int i = 0; i < partitionSegments; i++) {
        Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
        completionService.submit(() -> getPartitions(table, expression, segment));
    }

    List<Partition> partitions = new ArrayList<>();
    try {
        for (int i = 0; i < partitionSegments; i++) {
            Future<List<Partition>> futurePartitions = completionService.take();
            partitions.addAll(futurePartitions.get());
        }
    }
    catch (ExecutionException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
    }

    partitions.sort(PARTITION_COMPARATOR);
    return partitions;
}
 
源代码21 项目: stepchain   文件: StepAsyncExecutor.java
@Override
public Boolean execute(I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
	Boolean results = true;

	if (!processors.isEmpty()) {
		ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.newFixedThreadPool(parallelCount);
		CompletionService<Boolean> cService = new ExecutorCompletionService<>(threadPoolExecutor);
		results = executeByServiceThreadPool(cService, context, processors);
		threadPoolExecutor.shutdown();
		threadPoolExecutor.awaitTermination(11, TimeUnit.HOURS);
	}
	return results;
}
 
源代码22 项目: stepchain   文件: StepAsyncExecutor.java
protected Boolean executeByServiceThreadPool(CompletionService<Boolean> cService, I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
	Boolean results = true;
	for (IProcessor<I, Boolean> processor : processors) {
		cService.submit(new Callable<Boolean>() {
			@Override
			public Boolean call() throws Exception {
				if (processor.isEnabled()) {
					try {
						return processor.process(context);
					} catch (Exception ex) {
						logger.error("executeByServiceThreadPool", ex);
						return false;
					}
				} else {
					logger.info(String.format("processor:%s,%s", processor.getClass().getName(), processor.isEnabled()));
				}
				return true;
			}
		});
	}
	for (int i = 0; i < processors.size(); i++) {
		results = results && cService.take().get();
		if (!results) {
			break;
		}
	}
	return results;
}
 
源代码23 项目: kylin   文件: MigrationRuleSet.java
private long executeQueries(final List<String> queries, final Context ctx) throws Exception {
    int maxThreads = KylinConfig.getInstanceFromEnv().getMigrationRuleQueryLatencyMaxThreads();
    int threadNum = Math.min(maxThreads, queries.size());
    ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
    CompletionService<Long> completionService = new ExecutorCompletionService<Long>(threadPool);
    final Authentication auth = SecurityContextHolder.getContext().getAuthentication();
    long start = System.currentTimeMillis();
    for (final String query : queries) {
        completionService.submit(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                SecurityContextHolder.getContext().setAuthentication(auth);
                SQLRequest sqlRequest = new SQLRequest();
                sqlRequest.setProject(ctx.getSrcProjectName());
                sqlRequest.setSql(query);
                SQLResponse sqlResponse = ctx.getQueryService().doQueryWithCache(sqlRequest, false);
                if (sqlResponse.getIsException()) {
                    throw new RuleValidationException(sqlResponse.getExceptionMessage());
                }
                return sqlResponse.getDuration();
            }

        });
    }
    long timeCostSum = 0L;
    for (int i = 0; i < queries.size(); ++i) {
        try {
            timeCostSum += completionService.take().get();
        } catch (InterruptedException | ExecutionException e) {
            threadPool.shutdownNow();
            throw e;
        }
    }
    long end = System.currentTimeMillis();
    logger.info("Execute" + queries.size() + " queries took " + (end - start) + " ms, query time cost sum "
            + timeCostSum + " ms.");
    return timeCostSum / queries.size();
}
 
源代码24 项目: SciGraph   文件: OwlOntologyProducerFailFastTest.java
@Test(timeout = 11000)
public void fail_fast() throws InterruptedException, ExecutionException {
  ExecutorService executorService = Executors.newFixedThreadPool(2);
  CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService);

  BlockingQueue<OWLCompositeObject> queue = new LinkedBlockingQueue<OWLCompositeObject>();
  BlockingQueue<OntologySetup> ontologyQueue = new LinkedBlockingQueue<OntologySetup>();
  OwlOntologyProducer producer =
      new OwlOntologyProducer(queue, ontologyQueue, new AtomicInteger(), graph);
  OntologySetup ontologyConfig = new OntologySetup();

  ontologyConfig.setUrl("http://localhost:10000/foo.owl");

  List<Future<?>> futures = new ArrayList<>();
  futures.add(completionService.submit(producer));
  futures.add(completionService.submit(producer));
  Thread.sleep(1000);
  ontologyQueue.put(ontologyConfig);

  expectedException.expect(ExecutionException.class);
  while (futures.size() > 0) {
    Future<?> completedFuture = completionService.take();
    futures.remove(completedFuture);
    completedFuture.get();
  }

  executorService.shutdown();
  executorService.awaitTermination(10, TimeUnit.SECONDS);

}
 
源代码25 项目: interview   文件: ThreadPoolExample.java
public void doWork() throws Exception{
    CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);
    List<Future<String>> futureList = new ArrayList<Future<String>>();
    for(int i=0; i < 20; i++){
        futureList.add(completionService.submit(new Count10(i)));
    }
    for(int i=0; i < 20; i++){
        Future<String> future = completionService.take();
        System.out.println(future.get());
    }
}
 
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
        int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
        String shardId) {
    this.dataFetcher = dataFetcher;
    this.executorService = executorService;
    this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
    this.completionServiceSupplier = completionServiceSupplier;
    this.shardId = shardId;
}
 
源代码27 项目: kogito-runtimes   文件: ParallelCompilationTest.java
private void parallelExecute(Collection<Callable<KieBase>> solvers) throws Exception {
    CompletionService<KieBase> ecs = new ExecutorCompletionService<KieBase>(executor);
    for (Callable<KieBase> s : solvers) {
        ecs.submit(s);
    }
    assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
        for (int i = 0; i < PARALLEL_THREADS; ++i) {
            KieBase kbase = ecs.take().get();
        }
    });
}
 
源代码28 项目: kogito-runtimes   文件: DynamicRulesChangesTest.java
private void parallelExecute(Collection<Callable<List<String>>> solvers) throws Exception {
    CompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(executor);
    for (Callable<List<String>> s : solvers) {
        ecs.submit(s);
    }
    assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
        for (int i = 0; i < PARALLEL_THREADS; ++i) {
            List<String> events = ecs.take().get();
            assertEquals(5, events.size());
        }
    });
}
 
源代码29 项目: kogito-runtimes   文件: JavaDialectRuntimeData.java
private void wireInParallel(int wireListSize) throws Exception {
    final int parallelThread = Runtime.getRuntime().availableProcessors();
    CompletionService<Boolean> ecs = ExecutorProviderFactory.getExecutorProvider().getCompletionService();

    int size = wireListSize / parallelThread;
    for (int i = 1; i <= parallelThread; i++) {
        List<String> subList = wireList.subList((i-1) * size, i == parallelThread ? wireListSize : i * size);
        ecs.submit(new WiringExecutor(classLoader, invokerLookups, subList));
    }
    for (int i = 1; i <= parallelThread; i++) {
        ecs.take().get();
    }
}
 
源代码30 项目: Concurnas   文件: ConcurrentJunitRunner.java
public ConcurrentJunitRunner(final Class<?> klass) throws InitializationError {
	super(klass);
	setScheduler(new RunnerScheduler() {
		ExecutorService executorService = Executors.newFixedThreadPool(klass.isAnnotationPresent(Concurrent.class) ? klass.getAnnotation(Concurrent.class).threads() : (int) (Runtime.getRuntime().availableProcessors() * 1.5), new NamedThreadFactory(klass.getSimpleName()));
		CompletionService<Void> completionService = new ExecutorCompletionService<Void>(executorService);
		Queue<Future<Void>> tasks = new LinkedList<Future<Void>>();

		@Override
		public void schedule(Runnable childStatement) {
			tasks.offer(completionService.submit(childStatement, null));
		}

		@Override
		public void finished() {
			try {
				while (!tasks.isEmpty()) {
					tasks.remove(completionService.take());
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			} finally {
				while (!tasks.isEmpty()) {
					tasks.poll().cancel(true);
				}
				executorService.shutdownNow();
			}
		}
	});
}
 
 类所在包
 同包方法