下面列出了java.util.concurrent.CompletionService#take ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private ByteBuffer getFirstToComplete(
CompletionService<ByteBuffer> hedgedService,
ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
if (futures.isEmpty()) {
throw new InterruptedException("let's retry");
}
Future<ByteBuffer> future = null;
try {
future = hedgedService.take();
ByteBuffer bb = future.get();
futures.remove(future);
return bb;
} catch (ExecutionException e) {
// already logged in the Callable
futures.remove(future);
} catch (CancellationException ce) {
// already logged in the Callable
futures.remove(future);
}
throw new InterruptedException("let's retry");
}
private ByteBuffer getFirstToComplete(
CompletionService<ByteBuffer> hedgedService,
ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
if (futures.isEmpty()) {
throw new InterruptedException("let's retry");
}
Future<ByteBuffer> future = null;
try {
future = hedgedService.take();
ByteBuffer bb = future.get();
futures.remove(future);
return bb;
} catch (ExecutionException e) {
// already logged in the Callable
futures.remove(future);
} catch (CancellationException ce) {
// already logged in the Callable
futures.remove(future);
}
throw new InterruptedException("let's retry");
}
@Test
public void testUpsertSelectSameBatchConcurrently() throws Exception {
try (Connection conn = driver.connect(url, props)) {
int numUpsertSelectRunners = 5;
ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
// run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
// run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
for (int i = 0; i < 100; i += 25) {
futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
}
int received = 0;
while (received < futures.size()) {
Future<Boolean> resultFuture = completionService.take();
Boolean result = resultFuture.get();
received++;
assertTrue(result);
}
exec.shutdownNow();
}
}
private List<Partition> getPartitions(Table table, String expression)
{
if (partitionSegments == 1) {
return getPartitions(table, expression, null);
}
// Do parallel partition fetch.
CompletionService<List<Partition>> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < partitionSegments; i++) {
Segment segment = new Segment().withSegmentNumber(i).withTotalSegments(partitionSegments);
completionService.submit(() -> getPartitions(table, expression, segment));
}
List<Partition> partitions = new ArrayList<>();
try {
for (int i = 0; i < partitionSegments; i++) {
Future<List<Partition>> futurePartitions = completionService.take();
partitions.addAll(futurePartitions.get());
}
}
catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new PrestoException(HIVE_METASTORE_ERROR, "Failed to fetch partitions from Glue Data Catalog", e);
}
partitions.sort(PARTITION_COMPARATOR);
return partitions;
}
@Override
@NonNull
public Pair<Boolean,Collection<? extends URL>> execute(
@NonNull final Function<URL,Boolean> fnc,
@NonNull final Callable<Boolean> cancel,
@NonNull final Collection<? extends URL> binaries) {
final CompletionService<URL> cs = new ExecutorCompletionService<URL>(RP);
int submitted = 0;
for (URL binary : binaries) {
cs.submit(new Task(binary,fnc, cancel));
submitted++;
}
final Collection<URL> result = new ArrayDeque<URL>();
//Don't break the cycle when is canceled,
//rather wait for all submitted task, they should die fast.
//The break will cause logging of wrong number of scanned roots.
for (int i=0; i< submitted; i++) {
try {
final Future<URL> becomeURL = cs.take();
final URL url = becomeURL.get();
if (url != null) {
result.add(url);
}
} catch (Exception ex) {
Exceptions.printStackTrace(ex);
}
}
boolean success;
try {
success = !cancel.call();
} catch (Exception e) {
Exceptions.printStackTrace(e);
success = false;
}
LOG.log(Level.FINER, "Canceled: {0}", !success); //NOI18N
return Pair.<Boolean,Collection<? extends URL>>of(success,result);
}
/**
* A taken submitted task is completed
*/
public void testTake()
throws InterruptedException, ExecutionException {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
cs.submit(new StringTask());
Future f = cs.take();
assertTrue(f.isDone());
assertSame(TEST_STRING, f.get());
}
/**
* Take returns the same future object returned by submit
*/
public void testTake2() throws InterruptedException {
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
Future f1 = cs.submit(new StringTask());
Future f2 = cs.take();
assertSame(f1, f2);
}
void configReloaded(ApplicationId applicationId) {
List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId);
String logPre = TenantRepository.logPre(applicationId);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue");
}
int responsesSent = 0;
CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executorService);
while (!responses.isEmpty()) {
DelayedConfigResponses.DelayedConfigResponse delayedConfigResponse = responses.remove(0);
// Discard the ones that we have already answered
// Doing cancel here deals with the case where the timer is already running or has not run, so
// there is no need for any extra check.
if (delayedConfigResponse.cancel()) {
if (log.isLoggable(Level.FINE)) {
logRequestDebug(Level.FINE, logPre + "Timer cancelled for ", delayedConfigResponse.request);
}
// Do not wait for this request if we were unable to execute
if (addToRequestQueue(delayedConfigResponse.request, false, completionService)) {
responsesSent++;
}
} else {
log.log(Level.FINE, logPre + "Timer already cancelled or finished or never scheduled");
}
}
for (int i = 0; i < responsesSent; i++) {
try {
completionService.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.log(Level.FINE, logPre + "Finished reloading " + responsesSent + " requests");
}
@Test(timeout = 11000)
public void fail_fast() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService);
BlockingQueue<OWLCompositeObject> queue = new LinkedBlockingQueue<OWLCompositeObject>();
BlockingQueue<OntologySetup> ontologyQueue = new LinkedBlockingQueue<OntologySetup>();
OwlOntologyProducer producer =
new OwlOntologyProducer(queue, ontologyQueue, new AtomicInteger(), graph);
OntologySetup ontologyConfig = new OntologySetup();
ontologyConfig.setUrl("http://localhost:10000/foo.owl");
List<Future<?>> futures = new ArrayList<>();
futures.add(completionService.submit(producer));
futures.add(completionService.submit(producer));
Thread.sleep(1000);
ontologyQueue.put(ontologyConfig);
expectedException.expect(ExecutionException.class);
while (futures.size() > 0) {
Future<?> completedFuture = completionService.take();
futures.remove(completedFuture);
completedFuture.get();
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
public void doWork() throws Exception{
CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);
List<Future<String>> futureList = new ArrayList<Future<String>>();
for(int i=0; i < 20; i++){
futureList.add(completionService.submit(new Count10(i)));
}
for(int i=0; i < 20; i++){
Future<String> future = completionService.take();
System.out.println(future.get());
}
}
private Stream<LabeledResultSet> executeAsync(int numberOfThreads,
Stream<Callable<LabeledResultSet>> callables,
MultiShardExecutionPolicy executionPolicy) throws SQLException, MultiShardException {
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
try {
// CompletionService allows to terminate the parallel execution if one of the treads throws
// an exception
CompletionService<LabeledResultSet> completionService = new ExecutorCompletionService<>(executorService);
List<Future<LabeledResultSet>> futures = callables.map(completionService::submit).collect(Collectors.toList());
// Looping over the futures in order of completion: the first future to
// complete (or fail) is returned first by .take()
List<LabeledResultSet> resultSets = new ArrayList<>();
for (int i = 0; i < futures.size(); ++i) {
try {
this.currentTask = completionService.take();
resultSets.add(this.currentTask.get());
}
catch (Exception e) {
if (e.getCause() instanceof MultiShardException) {
MultiShardException ex = (MultiShardException) e.getCause();
ShardLocation loc = ex.getShardLocation();
if (this.currentTask.isCancelled()) {
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Cancelled;");
// Raise the shardExecutionCanceled event.
this.onShardExecutionCanceled(loc);
}
else {
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Failed");
// Raise the shardExecutionFaulted event.
this.onShardExecutionFaulted(loc, (Exception) e.getCause());
}
if (executionPolicy.equals(MultiShardExecutionPolicy.CompleteResults)) {
// In case one callable fails, cancel all pending and executing operations.
futures.forEach(f -> f.cancel(true));
throw ex;
}
resultSets.add(new LabeledResultSet(ex, loc, getConnectionForLocation(loc).prepareStatement(this.commandText)));
}
}
}
return resultSets.stream();
}
finally {
executorService.shutdown();
}
}
/**
* Generate a number of threads to fetch random numbers of certain bits
* generated through a shared SecureRandom instance.
* @param mech Mechanism name
* @param byteLen Number of bytes of random number to produce
* @param reSeed Call reseed() before generating random numbers
* @throws NoSuchAlgorithmException
* @throws InterruptedException
* @throws ExecutionException
*/
private static void forEachMech(String mech, int byteLen, SEED reSeed)
throws NoSuchAlgorithmException, InterruptedException,
ExecutionException {
if ("SHA1PRNG".equals(mech) && SEED.RESEED.equals(reSeed)) {
System.out.printf(
"%nreseed() api is not supported for '%s'", mech);
return;
}
System.out.printf("%nTest SecureRandom mechanism: '%s' with support of"
+ " reseed: '%s'", mech, reSeed);
int threadCount = (int) pow(2, 8 * byteLen);
System.out.printf("%nCreating %s number of threads to generate secure "
+ "random numbers concurrently.", threadCount);
ExecutorService executor
= Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory()
.newThread(r);
t.setDaemon(true);
return t;
}
});
CompletionService<Integer> completionService
= new ExecutorCompletionService<Integer>(executor);
CountDownLatch latch = new CountDownLatch(1);
SecureRandom rnd = null;
if (!mech.contains("_DRBG")) {
rnd = SecureRandom.getInstance(mech);
} else {
Security.setProperty(DRBG_CONFIG, mech);
rnd = SecureRandom.getInstance("DRBG");
}
try {
for (int i = 0; i < threadCount; i++) {
completionService.submit(new Task(rnd, latch, byteLen, reSeed));
}
latch.countDown();
for (int i = 0; i < threadCount; i++) {
completionService.take();
}
} finally {
executor.shutdown();
}
System.out.printf("%nCompleted Test for algorithm '%s' with thread "
+ "counts to '%s' using reseeding '%s'",
mech, threadCount, reSeed);
}
/**
* @param finalCheck
* If the internal nodePtrTbl should be restored for a subsequent
* liveness check. If this is the final/last check, it's pointless
* to re-create the nodePtrTable.
*/
protected int check0(final ITool tool, final boolean finalCheck) throws InterruptedException, IOException {
final long startTime = System.currentTimeMillis();
// Sum up the number of nodes in all disk graphs to indicate the amount
// of work to be done by liveness checking.
long sum = 0L;
for (int i = 0; i < checker.length; i++) {
sum += checker[i].getDiskGraph().size();
}
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS, new String[] { finalCheck ? "complete" : "current",
Long.toString(sum), checker.length == 1 ? "" : checker.length + " branches of " });
// Copy the array of checkers into a concurrent-enabled queue
// that allows LiveWorker threads to easily get the next
// LiveChecker to work on. We don't really need the FIFO
// ordering of the BlockingQueue, just its support for removing
// elements concurrently.
//
// Logically the queue is the unit of work the group of LiveWorkers
// has to complete. Once the queue is empty, all work is done and
// the LiveWorker threads will terminate.
//
// An alternative implementation could partition the array of
// LiveChecker a-priori and assign one partition to each thread.
// However, that assumes the work in all partitions is evenly
// distributed, which is not necessarily true.
final BlockingQueue<ILiveChecker> queue = new ArrayBlockingQueue<ILiveChecker>(checker.length);
queue.addAll(Arrays.asList(checker));
/*
* A LiveWorker below can either complete a unit of work a) without finding a
* liveness violation, b) finds a violation, or c) fails to check because of an
* exception/error (such as going out of memory). In case an LW fails to check,
* we still wait for all other LWs to complete. A subset of the LWs might have
* found a violation. In other words, the OOM of an LW has lower precedence than
* a violation found by another LW. However, if any LW fails to check, we terminate
* model checking after all LWs completed.
*/
final int wNum = TLCGlobals.doSequentialLiveness() ? 1 : Math.min(checker.length, TLCGlobals.getNumWorkers());
final ExecutorService pool = Executors.newFixedThreadPool(wNum);
// CS is really just a container around the set of Futures returned by the pool. It saves us from
// creating a low-level array.
final CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(pool);
for (int i = 0; i < wNum; i++) {
completionService.submit(new LiveWorker(tool, i, wNum, this, queue, finalCheck));
}
// Wait for all LWs to complete.
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); // wait forever
// Check if any one of the LWs found a violation (ignore failures for now).
ExecutionException ee = null;
for (int i = 0; i < wNum; i++) {
try {
final Future<Boolean> future = completionService.take();
if (future.get()) {
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS_END,
TLC.convertRuntimeToHumanReadable(System.currentTimeMillis() - startTime));
return EC.TLC_TEMPORAL_PROPERTY_VIOLATED;
}
} catch (final ExecutionException e) {
// handled below!
ee = e;
}
}
// Terminate if any one of the LWs failed c)
if (ee != null) {
final Throwable cause = ee.getCause();
if (cause instanceof OutOfMemoryError) {
MP.printError(EC.SYSTEM_OUT_OF_MEMORY_LIVENESS, cause);
} else if (cause instanceof StackOverflowError) {
MP.printError(EC.SYSTEM_STACK_OVERFLOW, cause);
} else if (cause != null) {
MP.printError(EC.GENERAL, cause);
} else {
MP.printError(EC.GENERAL, ee);
}
System.exit(1);
}
// Reset after checking unless it's the final check:
if (finalCheck == false) {
for (int i = 0; i < checker.length; i++) {
checker[i].getDiskGraph().makeNodePtrTbl();
}
}
MP.printMessage(EC.TLC_CHECKING_TEMPORAL_PROPS_END, TLC.convertRuntimeToHumanReadable(System.currentTimeMillis() - startTime));
return EC.NO_ERROR;
}
public void loadOntology() throws InterruptedException, ExecutionException {
CompletionService<Long> completionService = new ExecutorCompletionService<Long>(exec);
Set<Future<?>> futures = new HashSet<>();
if (!ontologies.isEmpty()) {
for (int i = 0; i < numConsumers; i++) {
futures.add(completionService.submit(consumerProvider.get()));
}
for (int i = 0; i < numProducers; i++) {
futures.add(completionService.submit(producerProvider.get()));
}
for (OntologySetup ontology : ontologies) {
urlQueue.offer(ontology);
}
for (int i = 0; i < numProducers; i++) {
urlQueue.offer(POISON_STR);
}
}
while (futures.size() > 0) {
Future<?> completedFuture = completionService.take();
futures.remove(completedFuture);
try {
completedFuture.get();
} catch (ExecutionException e) {
logger.log(Level.SEVERE, "Stopping batchLoading due to: " + e.getMessage(), e);
e.printStackTrace();
exec.shutdownNow();
throw new InterruptedException(e.getCause().getMessage());
}
}
exec.shutdown();
exec.awaitTermination(10, TimeUnit.DAYS);
graph.shutdown();
logger.info("Postprocessing...");
postprocessorProvider.get().postprocess();
if (anonymousNodeProperty.isPresent()) {
postprocessorProvider.runAnonymousNodeTagger(anonymousNodeProperty.get());
}
if (cliqueConfiguration.isPresent()) {
postprocessorProvider.runCliquePostprocessor(cliqueConfiguration.get());
}
if (addEdgeLabel.orElse(false)) {
postprocessorProvider.runEdgeLabelerPostprocessor();
}
if (allNodesLabel.isPresent()) {
postprocessorProvider.runAllNodesLabeler(allNodesLabel.get());
}
postprocessorProvider.shutdown();
}
@Test
public void testCompletionServiceTake() throws InterruptedException {
Executor x = Executors.newSingleThreadExecutor();
CompletionService<String> s = new ExecutorCompletionService<String>(x);
Callable<String> c1 = new WithInstrumentation();
Callable<String> c2 = new WithoutInstrumentation();
assertTrue(c1 instanceof InstrumentedExecution);
assertFalse(c2 instanceof InstrumentedExecution);
Future<String> f1 = s.submit(c1);
assertTrue(f1 instanceof WrappedFuture);
assertEquals(((InstrumentedExecution) c1).observeExecutionRunContext(), ((WrappedFuture) f1).instrumented);
Future<String> f2 = s.take();
assertTrue(f2 instanceof WrappedFuture);
assertTrue(f1 == f2);
assertEquals(f1, f2);
assertEquals(f1.hashCode(), f2.hashCode());
assertEquals(((InstrumentedExecution) c1).observeExecutionRunContext(), ((WrappedFuture) f2).instrumented);
Future<String> f3 = s.submit(c2);
assertFalse(f3 instanceof WrappedFuture);
Future<String> f4 = s.take();
assertFalse(f4 instanceof WrappedFuture);
}