下面列出了 java.util.concurrent.Future#isDone() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Kicks off a new run of the {@code ChangeLoader} once the oldest merge task has finished.
 *
 * <p>Nothing happens while the head of {@code mergeResult} is absent or still running, or while
 * a previously submitted load is still in flight. When the previous load has finished, its
 * outcome is collected first so that a failure is reported before the next load is submitted.
 *
 * @throws BiremeException if the previous load terminated with an exception; the original
 *     failure is attached as the cause.
 * @throws InterruptedException if the thread is interrupted while collecting the result of
 *     the previous load.
 */
public void startLoad() throws BiremeException, InterruptedException {
    Future<LoadTask> oldestMerge = mergeResult.peek();
    if (oldestMerge == null || !oldestMerge.isDone()) {
        return;
    }

    if (loadResult != null) {
        if (!loadResult.isDone()) {
            // the previous load is still running; try again on the next call
            return;
        }
        // surface the outcome of the previous load before starting another one
        try {
            loadResult.get();
        } catch (ExecutionException e) {
            throw new BiremeException("Loader failed. ", e.getCause());
        }
    }

    loadResult = cxt.loaderPool.submit(loader);
}
/**
 * Scans the active mailbox slots for the first completed request and hands back its output.
 *
 * <p>The slot of the completed request is refilled with a new request while the input still
 * has elements; otherwise the last active slot is moved into it and the active count shrinks.
 *
 * @return the completed output wrapped in an {@code Optional} (empty when the output itself is
 *     {@code null}), or {@code null} when none of the pending requests has completed yet
 */
private Optional<Output> checkMailboxes() {
    for (int slot = 0; slot < pendingRequests; slot++) {
        @SuppressWarnings("unchecked")
        Future<Output> candidate = (Future<Output>) mailboxes[slot];
        if (!candidate.isDone()) {
            continue;
        }
        Output result = getFuture(candidate);
        if (iteratorHasNext && (iteratorHasNext = inputData.hasNext())) {
            // more input available: reuse the slot for the next request
            mailboxes[slot] = callback.apply(inputData.next(), slot);
        } else {
            // input exhausted: compact the array by moving the last active slot here
            int last = --pendingRequests;
            mailboxes[slot] = mailboxes[last];
            mailboxes[last] = null;
        }
        return Optional.ofNullable(result);
    }
    // A null return (as opposed to an empty Optional) signals "nothing finished yet".
    return null;
}
/**
 * (Re)starts the index task registered for the given entity class.
 *
 * <p>A task for the same entity that is still running is cancelled (with interruption) first.
 * The task is then submitted again, its progress entry is reset to zero and the new future is
 * recorded as the in-flight task for the task's domain class.
 *
 * @param entityClassName key under which the task and its in-flight future are looked up
 */
@Override
public void start(String entityClassName) {
    synchronized (this) {
        IndexTask<?> indexTask = this.tasksMap.get(entityClassName);
        Future<?> running = this.inFlightTasksMap.get(entityClassName);
        boolean stillRunning = running != null && !running.isDone();
        if (stillRunning && running.cancel(true)) {
            logger.info("Canceled task: " + indexTask);
        }

        Future<?> resubmitted = submit(indexTask);
        logger.debug("submitted task: " + indexTask);

        Class<?> domainClass = indexTask.getDomainClass();
        String key = domainClass.getCanonicalName();
        super.progress.put(key, new IndexProgress(domainClass, indexTask.count(), 0L));
        this.inFlightTasksMap.put(key, resubmitted);
    }
}
/**
 * Closes this instance: marks it closed, cancels an outstanding request and, on the first
 * call only, sends a delete to the remote node.
 *
 * <p>State is swapped under the instance lock; cancellation and the delete happen outside it.
 */
@Override
public void close()
{
    final boolean firstClose;
    final Future<?> outstanding;
    synchronized (this) {
        firstClose = !closed;
        closed = true;
        outstanding = this.future;
        this.future = null;
        lastUpdate = DateTime.now();
    }

    boolean cancellable = outstanding != null && !outstanding.isDone();
    if (cancellable) {
        outstanding.cancel(true);
    }

    // abort the output buffer on the remote node; response of delete is ignored
    if (firstClose) {
        sendDelete();
    }
}
/**
 * Reports whether this operation has finished, draining completed data operations first.
 *
 * <p>While the status is {@code RPC_PENDING}, every completed future at the head of
 * {@code pendingDataOps} is removed and its result aggregated. Once the queue is empty the
 * operation is completed. Any exception raised on the way switches the status to
 * {@code RPC_ERROR} and is remembered in {@code exception}.
 *
 * @return {@code true} when the status value is greater than zero
 */
public synchronized boolean isDone() {
    if (status.get() != RPC_PENDING) {
        return status.get() > 0;
    }

    try {
        // consume the queue strictly in order, stopping at the first unfinished future
        for (Future<R> head = pendingDataOps.peek();
                head != null && head.isDone();
                head = pendingDataOps.peek()) {
            R partial = pendingDataOps.poll().get();
            this.aggregate(partial);
        }
        if (pendingDataOps.isEmpty() && status.get() == RPC_PENDING) {
            completeOperation();
        }
    } catch (Exception e) {
        status.set(RPC_ERROR);
        this.exception = e;
    }
    return status.get() > 0;
}
/**
 * Marks this instance as closed, stops the executor and, when submitted work has not finished
 * yet, waits up to 20 seconds for it to terminate.
 *
 * <p>If the wait is interrupted, the interruption is reported on the session and the thread's
 * interrupt status is restored so that callers can still observe it.
 */
synchronized void close() {
    closed = true;
    executorService.shutdownNow();

    boolean terminated = true;
    for (Future<?> f : submited) {
        if (!f.isDone()) {
            terminated = false;
            break;
        }
    }
    if (terminated) {
        return;
    }

    // We need to wait in order to have all the layouts closed before the factory.
    // This should not exceed a few seconds: resolution stops as soon as it
    // detects that we are closed.
    pmSession.println("Awaiting termination of background resolution...");
    try {
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        pmSession.println("Interrupted");
        // Thread.interrupted() would only clear the (already cleared) flag;
        // re-assert it so the interruption is not lost.
        Thread.currentThread().interrupt();
    }
}
/**
 * (Re)starts the index task registered for the given entity class.
 *
 * <p>A task for the same entity that is still running is cancelled (with interruption) first;
 * the task is then submitted again and the new future is recorded under the canonical name of
 * the task's domain class.
 *
 * @param entityClassName key under which the task and its in-flight future are looked up
 */
@Override
public void start(String entityClassName) {
    synchronized (this) {
        IndexTask<?> indexTask = this.tasksMap.get(entityClassName);
        Future<?> previous = this.inFlightTasksMap.get(entityClassName);
        if (previous != null && !previous.isDone() && previous.cancel(true)) {
            logger.info("Canceled task: " + indexTask);
        }

        Future<?> resubmitted = submit(indexTask);
        this.inFlightTasksMap.put(indexTask.getDomainClass().getCanonicalName(), resubmitted);
        logger.debug("added task: " + indexTask);
    }
}
/**
 * Delays the given task if necessary - e.g. projects are currently opening - and reschedules
 * the task if indexing is running.
 * This method waits for projects to open and thus blocks the current thread. If the thread is
 * interrupted while waiting, the wait ends and the interrupt status is restored.
 *
 * @param task task to be delayed
 * @param logger logger that receives the "scanning in progress" message
 * @param logMessagePrefix prefix placed at the start of that message
 * @return true if the task was rescheduled
 */
public boolean isDelayed (RequestProcessor.Task task, Logger logger, String logMessagePrefix) {
    DelayedScan scan = getRegisteredScan(task);
    Future<Project[]> projectOpenTask = OpenProjects.getDefault().openProjects();
    if (!projectOpenTask.isDone()) {
        try {
            projectOpenTask.get();
        } catch (InterruptedException ex) {
            // carry on, but do not hide the interruption from the caller
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            // not interested: a failure while opening projects does not affect the decision below
        }
    }

    boolean mayKeepWaiting =
            BLOCK_INDEFINITELY || scan.waitingLoops * WAITING_PERIOD < MAX_WAITING_TIME;
    if (IndexingBridge.getInstance().isIndexingInProgress() && mayKeepWaiting) {
        // do not steal disk from opening projects and indexing tasks
        Level level = ++scan.waitingLoops < 10 ? Level.FINE : Level.INFO;
        logger.log(level, "{0}: Scanning in progress, trying again in {1}ms", new Object[]{logMessagePrefix, WAITING_PERIOD}); //NOI18N
        task.schedule(WAITING_PERIOD); // try again later
        return true;
    }

    scan.waitingLoops = 0;
    return false;
}
/**
 * Tells whether a usable result for the given argument is already present in the cache.
 *
 * @param arg the argument whose cached computation is looked up
 * @return {@code true} only if a future is cached for {@code arg}, it has completed and it was
 *     not cancelled
 */
public boolean isResultAvailable(final P arg) {
    Future<R> cached = cache.get(arg);
    return cached != null && cached.isDone() && !cached.isCancelled();
}
/**
 * Logs the failure of a finished task, if there is one.
 *
 * <p>When no throwable is supplied and the runnable is a {@link Future} that has completed,
 * its outcome is inspected: a cancellation or the cause of an execution failure becomes the
 * throwable to log. An interruption while inspecting restores the thread's interrupt status.
 *
 * @param r the task that was executed
 * @param t the throwable that ended the task, or {@code null} if none was reported
 */
public static void printException(Runnable r, Throwable t)
{
    Throwable failure = t;
    if (failure == null && r instanceof Future<?>)
    {
        Future<?> outcome = (Future<?>) r;
        try
        {
            if (outcome.isDone())
            {
                outcome.get();
            }
        }
        catch (CancellationException cancelled)
        {
            failure = cancelled;
        }
        catch (ExecutionException wrapped)
        {
            failure = wrapped.getCause();
        }
        catch (InterruptedException interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    if (failure == null)
    {
        return;
    }
    logger.error(failure.getMessage(), failure);
}
/**
 * Disposes this instance: flags it as disposed, cancels (without interrupting) every update
 * that has not completed yet and drops the reference to the update map.
 */
public void dispose() {
    disposed = true;
    for (Future<?> pending : updateStateMap.values()) {
        if (!pending.isDone()) {
            pending.cancel(false);
        }
    }
    updateStateMap = null;
}
/**
 * Returns the next slice that is ready, promoting a completed pending read if necessary.
 *
 * <p>If no slice is ready and no read is pending, a read is triggered for every free slice.
 * The head of the pending reads is then awaited (when {@code blocking}) or merely checked; a
 * completed read has its slice flipped and appended to the ready queue.
 *
 * @param blocking whether to wait for the head pending read to complete
 * @return a ready slice, or {@code null} if nothing is pending or the head read has not
 *     completed yet
 * @throws Exception if waiting for or triggering a read fails
 */
private CrailBuffer getSlice(boolean blocking) throws Exception {
    CrailBuffer ready = readySlices.peek();
    if (ready != null) {
        return ready;
    }

    Future<CrailResult> head = pendingFutures.peek();
    if (head == null) {
        // nothing in flight: issue a read for every free slice
        tmpSlices.clear();
        while (!freeSlices.isEmpty()) {
            tmpSlices.add(freeSlices.poll());
        }
        while (!tmpSlices.isEmpty()) {
            triggerRead(tmpSlices.poll());
        }
        head = pendingFutures.peek();
    }
    if (head == null) {
        return null;
    }

    statistics.incTotalOps();
    if (blocking) {
        head.get();
    }
    if (!head.isDone()) {
        return null;
    }

    // the head read completed: retire it and publish its slice
    pendingFutures.poll();
    statistics.incNonBlockingOps();
    CrailBuffer completed = pendingSlices.poll();
    completed.flip();
    readySlices.add(completed);
    return completed;
}
/**
 * Runs several asynchronous tasks and waits for all of them.
 *
 * <p>Starts {@code task3} and {@code task4}, logs both results by blocking on the futures,
 * then repeats the wait by polling {@code isDone()} / {@code isCancelled()} and logs the
 * results again when both futures completed.
 *
 * @return the literal {@code "run3 success"}
 * @throws Exception if obtaining a task result fails
 * @author wliduo[[email protected]]
 * @date 2020/5/20 10:26
 */
@GetMapping("/run3")
public String run3() throws Exception {
    logger.info("run3开始执行");
    long startedAt = System.currentTimeMillis();
    Future<String> future3 = asyncService.task3();
    Future<String> future4 = asyncService.task4();

    // Blocking on get() has the same effect as the polling loop below
    logger.info(future3.get());
    logger.info(future4.get());

    // Poll until both tasks are done, or stop early as soon as one of them was cancelled
    boolean run3Done;
    while (!(run3Done = future3.isDone() && future4.isDone())
            && !future3.isCancelled()
            && !future4.isCancelled()) {
        // keep polling
    }

    if (run3Done) {
        logger.info(future3.get());
        logger.info(future4.get());
    }
    // otherwise: other abnormal situations, nothing is logged

    long finishedAt = System.currentTimeMillis();
    logger.info("run3执行完成,执行时间: {}", finishedAt - startedAt);
    return "run3 success";
}
/**
 * Verifies that an asynchronous service which finishes faster than its timeout does not fail
 * with a timeout. The service is annotated with both @Asynchronous and @Timeout.
 *
 * serviceB is configured with a 2 second timeout but only sleeps for 0.5 seconds, so no
 * Timeout exception is expected here.
 */
@Test
public void testAsyncNoTimeout() {
    // serviceB is @Asynchronous, so the call must hand back a future immediately even though
    // the method itself sleeps for 0.5s
    final long startNanos = System.nanoTime();
    final Future<Connection> future;
    try {
        future = clientForAsyncTimeout.serviceB();
    }
    catch (InterruptedException e) {
        throw new AssertionError("testAsyncNoTimeout: unexpected InterruptedException calling serviceB");
    }
    final long endNanos = System.nanoTime();

    // the call should have returned almost instantly; taking TEST_FUTURE_THRESHOLD or longer
    // means something is wrong
    Duration callDuration = Duration.ofNanos(endNanos - startNanos);
    assertThat("Method did not return quickly enough", callDuration, lessThan(TEST_FUTURE_THRESHOLD));

    // serviceB needs about 0.5 seconds, so the future must not be complete yet
    if (future.isDone()) {
        throw new AssertionError("testAsyncNoTimeout: Future completed too fast");
    }

    // the service should complete normally, without a fault-tolerance TimeoutException
    try {
        future.get(TEST_TIME_UNIT.toMillis(), TimeUnit.MILLISECONDS);
    }
    catch (Exception t) {
        // Not Expected
        Assert.fail("serviceB should not throw an Exception in testAsyncNoTimeout");
    }
}
/**
 * Test consuming and producing via KafkaProducer and KafkaConsumer instances.
 *
 * <p>Produces a single record to a freshly created topic, then reads the topic from the
 * beginning and checks the key and value of every record that comes back.
 *
 * @throws Exception if producing the record fails or the test is interrupted
 */
@Test
public void testProducerAndConsumer() throws Exception {
    // Create a topic
    final String topicName = "ProducerAndConsumerTest" + System.currentTimeMillis();
    getKafkaTestUtils().createTopic(topicName, 1, (short) 1);
    final int partitionId = 0;

    // Define our message
    final String expectedKey = "my-key";
    final String expectedValue = "my test message";

    // Define the record we want to produce
    final ProducerRecord<String, String> producerRecord =
        new ProducerRecord<>(topicName, partitionId, expectedKey, expectedValue);

    // Create a new producer
    try (final KafkaProducer<String, String> producer =
        getKafkaTestUtils().getKafkaProducer(StringSerializer.class, StringSerializer.class)) {
        // Produce it & wait for it to complete.
        final Future<RecordMetadata> future = producer.send(producerRecord);
        producer.flush();
        // get() blocks until the send has finished and, unlike polling isDone(), rethrows a
        // failed send so the test cannot silently continue without a produced record.
        future.get();
        logger.info("Produce completed");
    }

    // Create consumer
    try (final KafkaConsumer<String, String> kafkaConsumer =
        getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class, StringDeserializer.class)) {
        final List<TopicPartition> topicPartitionList = new ArrayList<>();
        for (final PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topicName)) {
            topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        kafkaConsumer.assign(topicPartitionList);
        kafkaConsumer.seekToBeginning(topicPartitionList);

        // Pull records from kafka, keep polling until we get nothing back
        ConsumerRecords<String, String> records;
        do {
            records = kafkaConsumer.poll(2000L);
            logger.info("Found {} records in kafka", records.count());
            for (ConsumerRecord<String, String> record: records) {
                // Validate
                assertEquals("Key matches expected", expectedKey, record.key());
                assertEquals("value matches expected", expectedValue, record.value());
            }
        }
        while (!records.isEmpty());
    }
}
/**
 * Runs a meta cleanup function for every table found by the report function, one task per
 * table, and waits until all tasks have finished.
 *
 * <p>The thread pool is sized to the number of requested namespaces/tables, capped at the
 * number of available processors and never smaller than one thread. Tables that no longer
 * exist are skipped with a warning.
 *
 * @param reportFunction produces, for the given namespaces/tables, the per-table items to clean
 * @param execFunction performs the cleanup for the items of one table
 * @param nameSpaceOrTable namespaces or tables to process; may be {@code null}
 * @return the futures of all submitted cleanup tasks, each already finished; the caller
 *     obtains results and failures from them
 * @throws IOException if reporting or admin access fails, or (as
 *     {@code InterruptedIOException}) if the thread is interrupted while waiting
 */
List<Future<List<String>>> processRegionsMetaCleanup(
    ExecFunction<Map<TableName, List<T>>, List<String>> reportFunction,
    ExecFunction<List<String>, List<T>> execFunction,
    List<String> nameSpaceOrTable) throws IOException {
    final int cores = Runtime.getRuntime().availableProcessors();
    // newFixedThreadPool rejects a size of 0, which an empty list would otherwise produce
    final int poolSize = nameSpaceOrTable == null
        ? cores
        : Math.max(1, Math.min(cores, nameSpaceOrTable.size()));
    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    List<Future<List<String>>> futures =
        new ArrayList<>(nameSpaceOrTable == null ? 1 : nameSpaceOrTable.size());
    try {
        try(final Admin admin = conn.getAdmin()) {
            Map<TableName,List<T>> report = reportFunction.execute(nameSpaceOrTable);
            if(report.size() < 1) {
                LOG.info("\nNo mismatches found in meta. Worth using related reporting function " +
                    "first.\nYou are likely passing non-existent " +
                    "namespace or table. Note that table names should include the namespace " +
                    "portion even for tables in the default namespace. " +
                    "See also the command usage.\n");
            }
            for (TableName tableName : report.keySet()) {
                if(admin.tableExists(tableName)) {
                    futures.add(executorService.submit(new Callable<List<String>>() {
                        @Override
                        public List<String> call() throws Exception {
                            LOG.debug("running thread for {}", tableName.getNameWithNamespaceInclAsString());
                            return execFunction.execute(report.get(tableName));
                        }
                    }));
                } else {
                    LOG.warn("Table {} does not exist! Skipping...",
                        tableName.getNameWithNamespaceInclAsString());
                }
            }
            // Block on each task instead of spinning on isDone(), which burns a CPU core.
            for (Future<List<String>> f : futures) {
                try {
                    f.get();
                } catch (java.util.concurrent.ExecutionException
                    | java.util.concurrent.CancellationException ignored) {
                    // Only completion matters here; the caller inspects each future's outcome.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new java.io.InterruptedIOException(
                        "Interrupted while waiting for meta cleanup tasks to finish");
                }
            }
        }
    } finally {
        executorService.shutdown();
    }
    return futures;
}
/**
 * Submits every LDAP timeout scenario to {@code testPool}, waits for all of them and fails
 * with an {@link AssertionError} if any scenario reports {@code false}.
 *
 * <p>Both pools are shut down when the method exits, whether or not the scenarios passed.
 *
 * @param args unused
 * @throws Exception if waiting for a scenario is interrupted or a scenario itself throws
 */
public static void main(String[] args) throws Exception {
    List<Future<?>> results = new ArrayList<>();

    try {
        // run the DeadServerTest with no timeouts set
        // this should get stuck indefinitely, so we need to kill
        // it after a timeout
        System.out.println("Running connect timeout test with 20s kill switch");
        Hashtable env = createEnv();
        results.add(
                testPool.submit(new DeadServerNoTimeoutTest(env, killSwitchPool)));

        // run the ReadServerTest with connect timeout set
        // this should get stuck indefinitely so we need to kill
        // it after a timeout
        System.out.println("Running read timeout test with 10ms connect timeout & 20s kill switch");
        Hashtable env1 = createEnv();
        env1.put("com.sun.jndi.ldap.connect.timeout", "10");
        results.add(testPool.submit(
                new ReadServerNoTimeoutTest(env1, killSwitchPool)));

        // run the ReadServerTest with no timeouts set
        // this should get stuck indefinitely, so we need to kill
        // it after a timeout
        System.out.println("Running read timeout test with 20s kill switch");
        Hashtable env2 = createEnv();
        results.add(testPool.submit(
                new ReadServerNoTimeoutTest(env2, killSwitchPool)));

        // run the DeadServerTest with connect / read timeouts set
        // this should exit after the connect timeout expires
        System.out.println("Running connect timeout test with 10ms connect timeout, 3000ms read timeout");
        Hashtable env3 = createEnv();
        env3.put("com.sun.jndi.ldap.connect.timeout", "10");
        env3.put("com.sun.jndi.ldap.read.timeout", "3000");
        results.add(testPool.submit(new DeadServerTimeoutTest(env3)));

        // run the ReadServerTest with connect / read timeouts set
        // this should exit after the connect timeout expires
        //
        // NOTE: commenting this test out as it is failing intermittently.
        //
        // System.out.println("Running read timeout test with 10ms connect timeout, 3000ms read timeout");
        // Hashtable env4 = createEnv();
        // env4.put("com.sun.jndi.ldap.connect.timeout", "10");
        // env4.put("com.sun.jndi.ldap.read.timeout", "3000");
        // results.add(testPool.submit(new ReadServerTimeoutTest(env4)));

        // run the DeadServerTest with connect timeout set
        // this should exit after the connect timeout expires
        System.out.println("Running connect timeout test with 10ms connect timeout");
        Hashtable env5 = createEnv();
        env5.put("com.sun.jndi.ldap.connect.timeout", "10");
        results.add(testPool.submit(new DeadServerTimeoutTest(env5)));

        // 8000487: Java JNDI connection library on ldap conn is
        // not honoring configured timeout
        System.out.println("Running simple auth connection test");
        Hashtable env6 = createEnv();
        env6.put("com.sun.jndi.ldap.connect.timeout", "10");
        env6.put("com.sun.jndi.ldap.read.timeout", "3000");
        env6.put(Context.SECURITY_AUTHENTICATION, "simple");
        env6.put(Context.SECURITY_PRINCIPAL, "user");
        env6.put(Context.SECURITY_CREDENTIALS, "password");
        results.add(testPool.submit(new DeadServerTimeoutTest(env6)));

        boolean testFailed = false;
        for (Future<?> test : results) {
            // Always call get(): it blocks for scenarios that are still running and returns
            // at once for finished ones. Guarding it with !isDone() skipped the verdict of
            // every scenario that had already finished.
            if (!((Boolean) test.get())) {
                testFailed = true;
            }
        }

        if (testFailed) {
            throw new AssertionError("some tests failed");
        }
    } finally {
        LdapTimeoutTest.killSwitchPool.shutdown();
        LdapTimeoutTest.testPool.shutdown();
    }
}
/**
 * Runs the semantic analyzer on a test file and compares the annotated result with the
 * ".semantic" golden file.
 *
 * <p>Without test class paths the source is parsed directly; otherwise parsing is deferred
 * until scanning has finished and this method waits for it.
 *
 * @param relFilePath path of the test file, also used to locate the golden file
 * @param caretLine if not {@code null}, the caret is positioned according to this marker
 *     before analysis
 * @throws Exception if parsing, analysis or the comparison fails
 */
protected void checkSemantic(final String relFilePath, final String caretLine) throws Exception {
    Source testSource = getTestSource(getTestFile(relFilePath));

    if (caretLine != null) {
        String sourceText = testSource.createSnapshot().getText().toString();
        enforceCaretOffset(testSource, getCaretOffset(sourceText, caretLine));
    }

    UserTask task = new UserTask() {
        public @Override void run(ResultIterator resultIterator) throws Exception {
            Parser.Result parsed = resultIterator.getParserResult();
            assertTrue(parsed instanceof ParserResult);
            ParserResult pr = (ParserResult) parsed;

            SemanticAnalyzer analyzer = getSemanticAnalyzer();
            assertNotNull("getSemanticAnalyzer must be implemented", analyzer);
            analyzer.run(pr, null);

            // an analyzer that reports nothing is treated like one reporting no highlights
            Map<OffsetRange, Set<ColoringAttributes>> reported = analyzer.getHighlights();
            Map<OffsetRange, Set<ColoringAttributes>> highlights = reported != null
                    ? reported
                    : Collections.<OffsetRange, Set<ColoringAttributes>>emptyMap();

            Document doc = GsfUtilities.getDocument(pr.getSnapshot().getSource().getFileObject(), true);
            checkNoOverlaps(highlights.keySet(), doc);

            String annotatedSource = annotateSemanticResults(doc, highlights);
            assertDescriptionMatches(relFilePath, annotatedSource, false, ".semantic");
        }
    };

    Set<Source> sources = Collections.singleton(testSource);
    boolean noClassPaths = classPathsForTest == null || classPathsForTest.isEmpty();
    if (noClassPaths) {
        ParserManager.parse(sources, task);
        return;
    }

    Future<Void> scanFinished = ParserManager.parseWhenScanFinished(sources, task);
    if (!scanFinished.isDone()) {
        scanFinished.get();
    }
}
/**
 * Submits every LDAP timeout scenario to {@code testPool}, waits for all of them and fails
 * with an {@link AssertionError} if any scenario reports {@code false}.
 *
 * <p>Both pools are shut down when the method exits, whether or not the scenarios passed.
 *
 * @param args unused
 * @throws Exception if waiting for a scenario is interrupted or a scenario itself throws
 */
public static void main(String[] args) throws Exception {
    List<Future<?>> results = new ArrayList<>();

    try {
        // run the DeadServerTest with no timeouts set
        // this should get stuck indefinitely, so we need to kill
        // it after a timeout
        System.out.println("Running connect timeout test with 20s kill switch");
        Hashtable env = createEnv();
        results.add(
                testPool.submit(new DeadServerNoTimeoutTest(env, killSwitchPool)));

        // run the ReadServerTest with connect timeout set
        // this should get stuck indefinitely so we need to kill
        // it after a timeout
        System.out.println("Running read timeout test with 10ms connect timeout & 20s kill switch");
        Hashtable env1 = createEnv();
        env1.put("com.sun.jndi.ldap.connect.timeout", "10");
        results.add(testPool.submit(
                new ReadServerNoTimeoutTest(env1, killSwitchPool)));

        // run the ReadServerTest with no timeouts set
        // this should get stuck indefinitely, so we need to kill
        // it after a timeout
        System.out.println("Running read timeout test with 20s kill switch");
        Hashtable env2 = createEnv();
        results.add(testPool.submit(
                new ReadServerNoTimeoutTest(env2, killSwitchPool)));

        // run the DeadServerTest with connect / read timeouts set
        // this should exit after the connect timeout expires
        System.out.println("Running connect timeout test with 10ms connect timeout, 3000ms read timeout");
        Hashtable env3 = createEnv();
        env3.put("com.sun.jndi.ldap.connect.timeout", "10");
        env3.put("com.sun.jndi.ldap.read.timeout", "3000");
        results.add(testPool.submit(new DeadServerTimeoutTest(env3)));

        // run the ReadServerTest with connect / read timeouts set
        // this should exit after the connect timeout expires
        //
        // NOTE: commenting this test out as it is failing intermittently.
        //
        // System.out.println("Running read timeout test with 10ms connect timeout, 3000ms read timeout");
        // Hashtable env4 = createEnv();
        // env4.put("com.sun.jndi.ldap.connect.timeout", "10");
        // env4.put("com.sun.jndi.ldap.read.timeout", "3000");
        // results.add(testPool.submit(new ReadServerTimeoutTest(env4)));

        // run the DeadServerTest with connect timeout set
        // this should exit after the connect timeout expires
        System.out.println("Running connect timeout test with 10ms connect timeout");
        Hashtable env5 = createEnv();
        env5.put("com.sun.jndi.ldap.connect.timeout", "10");
        results.add(testPool.submit(new DeadServerTimeoutTest(env5)));

        // 8000487: Java JNDI connection library on ldap conn is
        // not honoring configured timeout
        System.out.println("Running simple auth connection test");
        Hashtable env6 = createEnv();
        env6.put("com.sun.jndi.ldap.connect.timeout", "10");
        env6.put("com.sun.jndi.ldap.read.timeout", "3000");
        env6.put(Context.SECURITY_AUTHENTICATION, "simple");
        env6.put(Context.SECURITY_PRINCIPAL, "user");
        env6.put(Context.SECURITY_CREDENTIALS, "password");
        results.add(testPool.submit(new DeadServerTimeoutTest(env6)));

        boolean testFailed = false;
        for (Future<?> test : results) {
            // Always call get(): it blocks for scenarios that are still running and returns
            // at once for finished ones. Guarding it with !isDone() skipped the verdict of
            // every scenario that had already finished.
            if (!((Boolean) test.get())) {
                testFailed = true;
            }
        }

        if (testFailed) {
            throw new AssertionError("some tests failed");
        }
    } finally {
        LdapTimeoutTest.killSwitchPool.shutdown();
        LdapTimeoutTest.testPool.shutdown();
    }
}
/**
 * Tells whether this command instance currently has an execution under way.
 *
 * <p>The task field is read once into a local so that the null check and the completion check
 * are made against the same reference.
 *
 * @return {@code true} if a task has been started and has not completed yet
 */
private boolean isExecutionInProgress() {
    final Future<LabeledResultSet> snapshot = this.currentTask;
    if (snapshot == null) {
        return false;
    }
    return !snapshot.isDone();
}