下面列出了怎么用java.util.concurrent.CompletionService的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Checks that every worker registered through {@code registerHounds} observes the same
 * {@code LoggingRegistry} instance once they are all released by the start latch. Creating more
 * than 1000 threads can cause significant performance impact, so only a few workers are used.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for a worker
 * @throws ExecutionException if a worker task fails
 */
@Test( timeout = 30000 )
public void testLoggingRegistryConcurrentInitialization() throws InterruptedException, ExecutionException {
  CountDownLatch start = new CountDownLatch( 1 );
  int count = 10;
  CompletionService<LoggingRegistry> drover = registerHounds( count, start );
  // fire!
  start.countDown();
  Set<LoggingRegistry> distinct = new HashSet<LoggingRegistry>();
  for ( int i = 0; i < count; i++ ) {
    Future<LoggingRegistry> complete = drover.poll( 15, TimeUnit.SECONDS );
    // A timed poll returns null when nothing completed in time; report that explicitly
    // instead of failing with an anonymous NullPointerException on the next line.
    Assert.assertNotNull( "Worker result " + i + " was not available within 15 seconds", complete );
    distinct.add( complete.get() );
  }
  Assert.assertEquals( "Only one singlton instance ;)", 1, distinct.size() );
}
/**
 * Submits a mock {@link Runnable} with a fixed result through the decorator returned by
 * {@code StateCapture.capturingDecorator}, checks that the future yields that same result
 * instance, and then runs the shared capture verification against the mocks.
 */
@Test
public void testCompletionServiceRunnableCaptures() throws InterruptedException, Exception {
    // Setup
    CapturedState mockCapturedState = mock(CapturedState.class);
    Runnable mockRunnable = mock(Runnable.class);
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
        CompletionService<Object> delegate = new ExecutorCompletionService<>(executor);
        CompletionService<Object> cs = StateCapture.capturingDecorator(delegate);
        ThreadLocalStateCaptor.THREAD_LOCAL.set(mockCapturedState);
        Object result = new Object();
        Future<Object> futureResult = cs.submit(mockRunnable, result);
        assertThat("Expected the delegate response to return",
            result, sameInstance(futureResult.get()));
    } finally {
        // Release the pool even when the submission or the assertion above fails.
        executor.shutdown();
    }
    verifyStandardCaptures(mockCapturedState, mockRunnable);
}
/**
 * Waits for the {@link HiveSpec} collections produced by {@code computeSpecs} and registers
 * every spec through {@code hiveRegister}, also recording each one in
 * {@code allRegisteredPartitions}.
 *
 * @param states This is a collection of TaskState.
 * @throws IOException if a spec computation fails or the calling thread is interrupted while
 *         waiting; in the interrupted case the thread's interrupt flag is restored first
 */
@Override
public void publishData(Collection<? extends WorkUnitState> states)
    throws IOException {
  CompletionService<Collection<HiveSpec>> completionService =
      new ExecutorCompletionService<>(this.hivePolicyExecutor);
  int toRegisterPathCount = computeSpecs(states, completionService);
  for (int i = 0; i < toRegisterPathCount; i++) {
    try {
      for (HiveSpec spec : completionService.take().get()) {
        allRegisteredPartitions.add(spec);
        this.hiveRegister.register(spec);
      }
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        // Wrapping in IOException would otherwise hide the interruption from callers.
        Thread.currentThread().interrupt();
      }
      log.info("Failed to generate HiveSpec", e);
      throw new IOException(e);
    }
  }
  log.info("Finished registering all HiveSpecs");
}
/**
 * Submits the same {@code BatchInvoker} {@code totalRequestSize} times to a fixed pool of
 * {@code multiSize} threads, waits for every response, and logs the elapsed time and the
 * derived requests-per-second figure.
 *
 * @throws Exception if waiting for a response is interrupted or an invocation fails
 */
public void testPoolBatch() throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
            "127.0.0.1", 8088, 60000);
    int multiSize = 8;
    int totalRequestSize = 100;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<DemoBatchResponse> completionService = new ExecutorCompletionService<DemoBatchResponse>(
                pool);
        BatchInvoker invoker = new BatchInvoker(client);
        long time = System.currentTimeMillis();
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.submit(invoker);
        }
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.take().get();
        }
        long timetook = System.currentTimeMillis() - time;
        LOG.info("Total using " + timetook + "ms");
        LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
    } finally {
        // The pool's worker threads are non-daemon; without this they outlive the test.
        pool.shutdown();
    }
}
/**
 * Processes every element of {@code integers} on a ten-thread pool and collects the results
 * in completion order, then checks that the results match {@code integers} in any order.
 */
@Test
void example_parallel_completion_order() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
        CompletionService<Integer> completionService =
            new ExecutorCompletionService<>(executor);
        for (Integer integer : integers) {
            completionService.submit(() -> Utils.process(integer));
        }
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < integers.size(); i++) {
            // take() hands back futures as they finish, not in submission order
            results.add(completionService.take().get());
        }
        assertThat(results)
            .containsExactlyInAnyOrderElementsOf(integers);
    } finally {
        // Release the worker threads whether or not the assertion passed.
        executor.shutdown();
    }
}
/**
 * Issues one GET per URL on the executor and hands back the completion service that
 * collects them.
 *
 * @param urls absolute URLs to GET
 * @param timeoutMs timeout in milliseconds, applied both to the connection manager and as the
 *        socket timeout of each GET request
 * @return the completion service holding the submitted requests; results become available as
 *         they finish, which is NOT necessarily the order of {@code urls}
 */
public CompletionService<GetMethod> execute(List<String> urls, int timeoutMs) {
  HttpClientParams params = new HttpClientParams();
  params.setConnectionManagerTimeout(timeoutMs);
  HttpClient httpClient = new HttpClient(params, _connectionManager);

  CompletionService<GetMethod> pendingGets = new ExecutorCompletionService<>(_executor);
  for (String target : urls) {
    pendingGets.submit(() -> {
      try {
        GetMethod get = new GetMethod(target);
        get.getParams().setSoTimeout(timeoutMs);
        httpClient.executeMethod(get);
        return get;
      } catch (Exception failure) {
        // Keep the log short: exception type and message only, no stack trace
        LOGGER.warn("Caught '{}' while executing GET on URL: {}", failure.toString(), target);
        throw failure;
      }
    });
  }
  return pendingGets;
}
/**
 * Untimed poll eventually returns a non-null, completed future for a submitted task.
 */
public void testPoll1()
    throws InterruptedException, ExecutionException {
    CompletionService service = new ExecutorCompletionService(cachedThreadPool);
    // nothing submitted yet, so nothing can be polled
    assertNull(service.poll());
    service.submit(new StringTask());
    long begin = System.nanoTime();
    Future done = service.poll();
    while (done == null) {
        if (millisElapsedSince(begin) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
        done = service.poll();
    }
    assertTrue(done.isDone());
    assertSame(TEST_STRING, done.get());
}
/**
 * Timed poll eventually returns a non-null, completed future for a submitted task.
 */
public void testPoll2()
    throws InterruptedException, ExecutionException {
    CompletionService service = new ExecutorCompletionService(cachedThreadPool);
    // nothing submitted yet, so nothing can be polled
    assertNull(service.poll());
    service.submit(new StringTask());
    long begin = System.nanoTime();
    Future done;
    for (;;) {
        done = service.poll(SHORT_DELAY_MS, MILLISECONDS);
        if (done != null)
            break;
        if (millisElapsedSince(begin) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
    }
    assertTrue(done.isDone());
    assertSame(TEST_STRING, done.get());
}
/**
 * Every poll variant returns null while the only submitted task is still blocked; once the
 * task is released, take() delivers its result.
 */
public void testPollReturnsNull()
    throws InterruptedException, ExecutionException {
    CompletionService service = new ExecutorCompletionService(cachedThreadPool);
    final CountDownLatch gate = new CountDownLatch(1);
    service.submit(new Callable() {
        public String call() throws Exception {
            // stay unfinished until the test opens the gate
            gate.await();
            return TEST_STRING;
        }});
    assertNull(service.poll());
    assertNull(service.poll(0L, MILLISECONDS));
    assertNull(service.poll(Long.MIN_VALUE, MILLISECONDS));
    long begin = System.nanoTime();
    assertNull(service.poll(timeoutMillis(), MILLISECONDS));
    // the timed poll must have waited at least the requested timeout
    assertTrue(millisElapsedSince(begin) >= timeoutMillis());
    gate.countDown();
    assertSame(TEST_STRING, service.take().get());
}
/**
 * A mix of succeeding and failing tasks is fully drained: each round submits one succeeding
 * task and two failing ones, and every one of them is handed back by take().
 */
public void testTaskAssortment()
    throws InterruptedException, ExecutionException {
    final int rounds = 2;
    CompletionService service = new ExecutorCompletionService(cachedThreadPool);
    ArithmeticException failure = new ArithmeticException();
    int round = 0;
    while (round < rounds) {
        service.submit(new StringTask());
        service.submit(callableThrowing(failure));
        service.submit(runnableThrowing(failure), null);
        round++;
    }
    int succeeded = 0;
    int failed = 0;
    for (int remaining = 3 * rounds; remaining > 0; remaining--) {
        try {
            Object value = service.take().get();
            if (value == TEST_STRING)
                succeeded++;
        }
        catch (ExecutionException expected) {
            assertTrue(expected.getCause() instanceof ArithmeticException);
            failed++;
        }
    }
    assertEquals(rounds * 1, succeeded);
    assertEquals(rounds * 2, failed);
    // everything submitted has been consumed
    assertNull(service.poll());
}
/**
 * Runs all solvers on the executor and passes the first non-null result to {@code use};
 * solvers that fail are skipped. All submitted tasks are cancelled before returning, whether
 * or not a result was found.
 *
 * @param e executor that runs the solvers
 * @param solvers candidate computations
 * @throws InterruptedException if interrupted while waiting for a solver to finish
 */
void solveAny(Executor e,
              Collection<Callable<Integer>> solvers)
    throws InterruptedException {
    CompletionService<Integer> service = new ExecutorCompletionService<>(e);
    int total = solvers.size();
    List<Future<Integer>> submitted = new ArrayList<>(total);
    Integer answer = null;
    try {
        for (Callable<Integer> solver : solvers) {
            submitted.add(service.submit(solver));
        }
        // stop as soon as an answer is found or every solver has reported
        for (int remaining = total; remaining > 0 && answer == null; remaining--) {
            try {
                Integer candidate = service.take().get();
                if (candidate != null) {
                    answer = candidate;
                }
            } catch (ExecutionException ignore) {}
        }
    } finally {
        for (Future<Integer> pending : submitted) {
            pending.cancel(true);
        }
    }
    if (answer != null)
        use(answer);
}
/**
 * Drives {@code multithreadedLookupOrStartTest} with 1000 unique addresses, submitting one
 * {@code rawLookupOrStart} call per index through a completion service.
 */
@Test
public void testMultiThreadRawLookupOrStartFindsActorPreviouslyStartedWIthRawLookupOrStart() {
    final int size = 1000;
    List<Address> addresses =
        IntStream.range(0, size)
            .mapToObj(ignored -> world.addressFactory().unique())
            .collect(Collectors.toList());
    CompletionService<Actor> lookups = new ExecutorCompletionService<>(exec);
    final Definition definition =
        Definition.has(ParentInterfaceActor.class, ParentInterfaceActor::new);
    multithreadedLookupOrStartTest(
        index -> lookups.submit(() -> {
            return world.stage().rawLookupOrStart(definition, addresses.get(index));
        }),
        size);
}
/**
 * Takes the next finished request from {@code hedgedService} and returns its buffer, removing
 * that request's future from {@code futures}.
 *
 * @param hedgedService completion service the outstanding requests were submitted to
 * @param futures outstanding futures; the completed one is removed from this list
 * @return the buffer produced by the first request to complete successfully
 * @throws InterruptedException if {@code futures} is empty, if the completed request failed or
 *         was cancelled (signalling the caller to retry), or if the wait itself is interrupted
 */
private ByteBuffer getFirstToComplete(
    CompletionService<ByteBuffer> hedgedService,
    ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
  if (!futures.isEmpty()) {
    Future<ByteBuffer> completed = null;
    try {
      completed = hedgedService.take();
      ByteBuffer data = completed.get();
      futures.remove(completed);
      return data;
    } catch (ExecutionException | CancellationException alreadyLogged) {
      // already logged in the Callable
      futures.remove(completed);
    }
  }
  throw new InterruptedException("let's retry");
}
/**
 * Has four workers repeatedly resolve a property descriptor for {@code TestEntity::getNumber}
 * and clear the {@code PropertyUtils} cache, then waits for all of them; any worker failure
 * surfaces through {@code get()}.
 */
@Test
@Timeout(30)
void testConcurrentlyCreateProxyClasses() throws Exception {
    final int workers = 4;
    final int rounds = 100;
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    try {
        CompletionService<Void> completions = new ExecutorCompletionService<>(pool);
        int started = 0;
        while (started < workers) {
            completions.submit(() -> {
                int round = 0;
                while (round < rounds) {
                    PropertyUtils.getPropertyDescriptor(TestEntity.class, TestEntity::getNumber);
                    PropertyUtils.clearCache();
                    round++;
                }
                return null;
            });
            started++;
        }
        for (int pending = workers; pending > 0; pending--) {
            completions.take().get();
        }
    } finally {
        pool.shutdown();
    }
}
/**
 * Checks that {@code runLocalization} calls {@code closeFileSystems} in two scenarios: when
 * {@code localizeFiles} is stubbed to do nothing, and when it is stubbed to throw a
 * {@code YarnRuntimeException}. In both scenarios the test first verifies that
 * {@code closeFileSystems} has not been called before {@code runLocalization} runs.
 */
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
// verify filesystems are closed when localizer doesn't fail
FileContext fs = FileContext.getLocalFSFileContext();
spylfs = spy(fs.getDefaultFileSystem());
ContainerLocalizer localizer = setupContainerLocalizerForTest();
// stub the actual localization work so only the surrounding cleanup is exercised
doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
any(CompletionService.class), any(UserGroupInformation.class));
// baseline: nothing has been closed before localization runs
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
// fresh spy and localizer for the second scenario
spylfs = spy(fs.getDefaultFileSystem());
// verify filesystems are closed when localizer fails
localizer = setupContainerLocalizerForTest();
doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
any(LocalizationProtocol.class), any(CompletionService.class),
any(UserGroupInformation.class));
verify(localizer, never()).closeFileSystems(
any(UserGroupInformation.class));
// NOTE(review): the call below is not wrapped in try/catch, so the test presumably relies on
// runLocalization not propagating the forced failure — confirm against ContainerLocalizer
localizer.runLocalization(nmAddr);
verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
/**
 * Waits for the next hedged request to finish and returns its buffer. The finished future is
 * removed from {@code futures} whether it succeeded, failed or was cancelled.
 *
 * @param hedgedService completion service the outstanding requests were submitted to
 * @param futures outstanding futures; the completed one is removed from this list
 * @return the buffer produced by the first request to complete successfully
 * @throws InterruptedException if {@code futures} is empty, if the completed request failed or
 *         was cancelled (signalling the caller to retry), or if the wait itself is interrupted
 */
private ByteBuffer getFirstToComplete(
    CompletionService<ByteBuffer> hedgedService,
    ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
  if (!futures.isEmpty()) {
    Future<ByteBuffer> finished = null;
    try {
      finished = hedgedService.take();
      ByteBuffer buffer = finished.get();
      futures.remove(finished);
      return buffer;
    } catch (ExecutionException | CancellationException alreadyLogged) {
      // already logged in the Callable
      futures.remove(finished);
    }
  }
  throw new InterruptedException("let's retry");
}
/**
 * Checks that every worker registered through {@code registerHounds} observes the same
 * {@code LoggingRegistry} instance once they are all released by the start latch. Creating more
 * than 1000 threads can cause significant performance impact, so only a few workers are used.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for a worker
 * @throws ExecutionException if a worker task fails
 */
@Test( timeout = 30000 )
public void testLoggingRegistryConcurrentInitialization() throws InterruptedException, ExecutionException {
  CountDownLatch start = new CountDownLatch( 1 );
  int count = 10;
  CompletionService<LoggingRegistry> drover = registerHounds( count, start );
  // fire!
  start.countDown();
  Set<LoggingRegistry> distinct = new HashSet<LoggingRegistry>();
  for ( int i = 0; i < count; i++ ) {
    Future<LoggingRegistry> complete = drover.poll( 15, TimeUnit.SECONDS );
    // A timed poll returns null when nothing completed in time; report that explicitly
    // instead of failing with an anonymous NullPointerException on the next line.
    Assert.assertNotNull( "Worker result " + i + " was not available within 15 seconds", complete );
    distinct.add( complete.get() );
  }
  Assert.assertEquals( "Only one singlton instance ;)", 1, distinct.size() );
}
/**
 * Submits the same {@code Invoker} {@code totalRequestSize} times to a fixed pool of
 * {@code multiSize} threads, waits for every response, and prints the elapsed time and the
 * derived requests-per-second figure.
 *
 * @throws Exception if waiting for a response is interrupted or an invocation fails
 */
@Test
public void testDoSmth() throws Exception {
    Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder();
    req.setUserId(1);
    int multiSize = 12;
    int totalRequestSize = 10;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<Demo.DemoResponse> completionService = new ExecutorCompletionService<Demo.DemoResponse>(
                pool);
        Invoker invoker = new Invoker(req.build());
        long time = System.currentTimeMillis();
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.submit(invoker);
        }
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.take().get();
        }
        long timetook = System.currentTimeMillis() - time;
        System.out.println("Total using " + timetook + "ms");
        System.out.println("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
    } finally {
        // The pool's worker threads are non-daemon; without this they outlive the test.
        pool.shutdown();
    }
}
/**
 * Blocks until the next task of {@code completionService} finishes and returns its value,
 * rethrowing a task failure as an unchecked exception.
 *
 * @param completionService service to take the next completed task from
 * @return the value produced by that task
 * @throws InterruptedException if interrupted while waiting
 * @throws RuntimeException wrapping the {@link ExecutionException} when the task failed
 */
private static <T> T takeUnchecked(CompletionService<T> completionService)
        throws InterruptedException
{
    T value;
    try {
        value = completionService.take().get();
    }
    catch (ExecutionException taskFailure) {
        throw new RuntimeException(taskFailure);
    }
    return value;
}
/**
 * Fetches the partitions of {@code table} matching {@code expression}. With a single
 * configured segment the fetch is done directly; otherwise one fetch per segment is submitted
 * to the executor and the combined result is sorted with {@code PARTITION_COMPARATOR}.
 *
 * @throws PrestoException with {@code HIVE_METASTORE_ERROR} if a segment fetch fails or the
 *         wait is interrupted (the interrupt flag is restored in that case)
 */
private List<Partition> getPartitions(Table table, String expression)
{
    if (partitionSegments == 1) {
        return getPartitions(table, expression, null);
    }

    // Do parallel partition fetch.
    CompletionService<List<Partition>> segmentFetches = new ExecutorCompletionService<>(executor);
    int segmentNumber = 0;
    while (segmentNumber < partitionSegments) {
        Segment segment = new Segment().withSegmentNumber(segmentNumber).withTotalSegments(partitionSegments);
        segmentFetches.submit(() -> getPartitions(table, expression, segment));
        segmentNumber++;
    }

    List<Partition> merged = new ArrayList<>();
    try {
        for (int pending = 0; pending < partitionSegments; pending++) {
            Future<List<Partition>> finished = segmentFetches.take();
            merged.addAll(finished.get());
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
    }
    catch (ExecutionException e) {
        throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
    }

    merged.sort(PARTITION_COMPARATOR);
    return merged;
}
/**
 * Runs the given processors on a dedicated fixed-size pool of {@code parallelCount} threads
 * and reports the combined outcome from {@code executeByServiceThreadPool}.
 *
 * @param context value handed to every processor
 * @param processors processors to run; when empty, no pool is created
 * @return {@code true} when {@code processors} is empty, otherwise the result of
 *         {@code executeByServiceThreadPool}
 * @throws InterruptedException if interrupted while waiting for results or for pool termination
 * @throws ExecutionException if a processor task fails with an exception
 */
@Override
public Boolean execute(I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
    if (processors.isEmpty()) {
        return true;
    }
    ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.newFixedThreadPool(parallelCount);
    Boolean results;
    try {
        CompletionService<Boolean> cService = new ExecutorCompletionService<>(threadPoolExecutor);
        results = executeByServiceThreadPool(cService, context, processors);
    } finally {
        // Shut the pool down even when collecting results throws, so its threads are not leaked.
        threadPoolExecutor.shutdown();
    }
    // Only reached on the normal path: let already-submitted processors finish.
    threadPoolExecutor.awaitTermination(11, TimeUnit.HOURS);
    return results;
}
/**
 * Submits one task per processor to {@code cService} and collects the results in completion
 * order, stopping at the first {@code false}.
 *
 * <p>A disabled processor is logged and counts as {@code true}; an enabled processor whose
 * {@code process} call throws is logged and counts as {@code false}.
 *
 * @return {@code false} as soon as any collected result is false, otherwise {@code true}
 * @throws InterruptedException if interrupted while waiting for a result
 * @throws ExecutionException if a submitted task itself fails
 */
protected Boolean executeByServiceThreadPool(CompletionService<Boolean> cService, I context, Collection<IProcessor<I, Boolean>> processors) throws InterruptedException, ExecutionException {
    for (IProcessor<I, Boolean> processor : processors) {
        cService.submit(() -> {
            if (!processor.isEnabled()) {
                logger.info(String.format("processor:%s,%s", processor.getClass().getName(), processor.isEnabled()));
                return true;
            }
            try {
                return processor.process(context);
            } catch (Exception ex) {
                logger.error("executeByServiceThreadPool", ex);
                return false;
            }
        });
    }
    for (int collected = 0; collected < processors.size(); collected++) {
        if (!cService.take().get()) {
            // fail fast: remaining results are not awaited
            return false;
        }
    }
    return true;
}
/**
 * Runs every query concurrently against the source project in {@code ctx} and returns the
 * average of the durations reported by the query responses.
 *
 * <p>Each task sets the submitting thread's {@code Authentication} on the security context
 * before running its query.
 *
 * @param queries SQL statements to run; an empty list is rejected by
 *        {@code Executors.newFixedThreadPool}, which does not accept a thread count of zero
 * @param ctx supplies the source project name and the query service
 * @return sum of the reported query durations divided by the number of queries
 * @throws Exception if waiting is interrupted or any query task fails (including a
 *         {@code RuleValidationException} raised for a response flagged as an exception)
 */
private long executeQueries(final List<String> queries, final Context ctx) throws Exception {
    int maxThreads = KylinConfig.getInstanceFromEnv().getMigrationRuleQueryLatencyMaxThreads();
    int threadNum = Math.min(maxThreads, queries.size());
    ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
    try {
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(threadPool);
        final Authentication auth = SecurityContextHolder.getContext().getAuthentication();
        long start = System.currentTimeMillis();
        for (final String query : queries) {
            completionService.submit(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    SecurityContextHolder.getContext().setAuthentication(auth);
                    SQLRequest sqlRequest = new SQLRequest();
                    sqlRequest.setProject(ctx.getSrcProjectName());
                    sqlRequest.setSql(query);
                    SQLResponse sqlResponse = ctx.getQueryService().doQueryWithCache(sqlRequest, false);
                    if (sqlResponse.getIsException()) {
                        throw new RuleValidationException(sqlResponse.getExceptionMessage());
                    }
                    return sqlResponse.getDuration();
                }
            });
        }
        long timeCostSum = 0L;
        for (int i = 0; i < queries.size(); ++i) {
            timeCostSum += completionService.take().get();
        }
        long end = System.currentTimeMillis();
        logger.info("Execute" + queries.size() + " queries took " + (end - start) + " ms, query time cost sum "
                + timeCostSum + " ms.");
        return timeCostSum / queries.size();
    } finally {
        // On success every task has already finished, so this only releases the worker threads;
        // on failure it also cancels the queries that are still queued or running.
        threadPool.shutdownNow();
    }
}
/**
 * Submits the same {@code OwlOntologyProducer} twice, queues an ontology setup whose URL
 * points at localhost port 10000, and expects draining the completion service to surface an
 * {@link ExecutionException}.
 */
@Test(timeout = 11000)
public void fail_fast() throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService);
        BlockingQueue<OWLCompositeObject> queue = new LinkedBlockingQueue<OWLCompositeObject>();
        BlockingQueue<OntologySetup> ontologyQueue = new LinkedBlockingQueue<OntologySetup>();
        OwlOntologyProducer producer =
            new OwlOntologyProducer(queue, ontologyQueue, new AtomicInteger(), graph);
        OntologySetup ontologyConfig = new OntologySetup();
        ontologyConfig.setUrl("http://localhost:10000/foo.owl");
        List<Future<?>> futures = new ArrayList<>();
        futures.add(completionService.submit(producer));
        futures.add(completionService.submit(producer));
        Thread.sleep(1000);
        ontologyQueue.put(ontologyConfig);
        expectedException.expect(ExecutionException.class);
        while (futures.size() > 0) {
            Future<?> completedFuture = completionService.take();
            futures.remove(completedFuture);
            completedFuture.get();
        }
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    } finally {
        // The expected ExecutionException leaves the loop before the orderly shutdown above is
        // reached; stop any producer that is still running so its thread does not outlive the test.
        executorService.shutdownNow();
    }
}
/**
 * Submits twenty {@code Count10} tasks to {@code threadPool} and prints each result as soon
 * as its task completes.
 *
 * @throws Exception if waiting is interrupted or a task fails
 */
public void doWork() throws Exception{
    final int taskCount = 20;
    CompletionService<String> completions = new ExecutorCompletionService<String>(threadPool);
    List<Future<String>> submitted = new ArrayList<Future<String>>();
    int id = 0;
    while (id < taskCount) {
        submitted.add(completions.submit(new Count10(id)));
        id++;
    }
    for (int remaining = taskCount; remaining > 0; remaining--) {
        // completion order, not submission order
        Future<String> finished = completions.take();
        System.out.println(finished.get());
    }
}
/**
 * Stores the collaborators and settings of the strategy; performs no other work.
 *
 * @param dataFetcher fetcher stored in {@code this.dataFetcher}
 * @param executorService executor stored in {@code this.executorService}
 * @param retryGetRecordsInSeconds value stored in {@code this.retryGetRecordsInSeconds}
 * @param completionServiceSupplier supplier of completion services for {@code DataFetcherResult}
 * @param shardId identifier stored in {@code this.shardId}
 */
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
        int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
        String shardId) {
    this.shardId = shardId;
    this.dataFetcher = dataFetcher;
    this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
    this.executorService = executorService;
    this.completionServiceSupplier = completionServiceSupplier;
}
/**
 * Submits every solver to {@code executor} and waits, for at most ten seconds overall, until
 * {@code PARALLEL_THREADS} of them have completed; a solver failure surfaces through
 * {@code get()}.
 */
private void parallelExecute(Collection<Callable<KieBase>> solvers) throws Exception {
    CompletionService<KieBase> completions = new ExecutorCompletionService<KieBase>(executor);
    for (Callable<KieBase> solver : solvers) {
        completions.submit(solver);
    }
    assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
        int collected = 0;
        while (collected < PARALLEL_THREADS) {
            // only completion matters; the built KieBase itself is not inspected
            completions.take().get();
            ++collected;
        }
    });
}
/**
 * Submits every solver to {@code executor} and, within ten seconds overall, checks that each
 * of the first {@code PARALLEL_THREADS} completed solvers produced exactly five events.
 */
private void parallelExecute(Collection<Callable<List<String>>> solvers) throws Exception {
    CompletionService<List<String>> completions = new ExecutorCompletionService<List<String>>(executor);
    for (Callable<List<String>> solver : solvers) {
        completions.submit(solver);
    }
    assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
        int collected = 0;
        while (collected < PARALLEL_THREADS) {
            List<String> produced = completions.take().get();
            assertEquals(5, produced.size());
            ++collected;
        }
    });
}
/**
 * Splits the first {@code wireListSize} entries of {@code wireList} into one contiguous slice
 * per available processor, submits a {@code WiringExecutor} for each slice, and waits for all
 * of them. The last slice also takes whatever remains after the even division.
 *
 * @param wireListSize number of leading {@code wireList} entries to wire
 * @throws Exception if waiting is interrupted or a wiring task fails
 */
private void wireInParallel(int wireListSize) throws Exception {
    final int workers = Runtime.getRuntime().availableProcessors();
    final int chunk = wireListSize / workers;
    CompletionService<Boolean> completions = ExecutorProviderFactory.getExecutorProvider().getCompletionService();
    for (int worker = 0; worker < workers; worker++) {
        int from = worker * chunk;
        // the final worker absorbs the remainder of the integer division
        int to = (worker == workers - 1) ? wireListSize : from + chunk;
        List<String> slice = wireList.subList(from, to);
        completions.submit(new WiringExecutor(classLoader, invokerLookups, slice));
    }
    for (int pending = workers; pending > 0; pending--) {
        completions.take().get();
    }
}
/**
 * Creates a runner whose scheduler executes child statements on a fixed thread pool.
 *
 * <p>The pool size is the {@code threads()} value of the class's {@code Concurrent}
 * annotation when present, otherwise 1.5 times the number of available processors (truncated).
 * When the run finishes, the scheduler waits for every scheduled statement, cancels whatever
 * is still outstanding if the wait is interrupted, and shuts the pool down.
 *
 * @param klass test class to run
 * @throws InitializationError propagated from the superclass constructor
 */
public ConcurrentJunitRunner(final Class<?> klass) throws InitializationError {
    super(klass);
    final int poolSize;
    if (klass.isAnnotationPresent(Concurrent.class)) {
        poolSize = klass.getAnnotation(Concurrent.class).threads();
    } else {
        poolSize = (int) (Runtime.getRuntime().availableProcessors() * 1.5);
    }
    final String threadNamePrefix = klass.getSimpleName();
    setScheduler(new RunnerScheduler() {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(threadNamePrefix));
        CompletionService<Void> completions = new ExecutorCompletionService<Void>(pool);
        Queue<Future<Void>> outstanding = new LinkedList<Future<Void>>();

        @Override
        public void schedule(Runnable childStatement) {
            Future<Void> scheduled = completions.submit(childStatement, null);
            outstanding.offer(scheduled);
        }

        @Override
        public void finished() {
            try {
                while (!outstanding.isEmpty()) {
                    Future<Void> done = completions.take();
                    outstanding.remove(done);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // only non-empty when the wait above was cut short
                while (!outstanding.isEmpty()) {
                    outstanding.poll().cancel(true);
                }
                pool.shutdownNow();
            }
        }
    });
}