下面列出了 java.util.concurrent.ScheduledFuture#isDone() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Cancels the given scheduled future and polls until it reports completion.
 *
 * <p>The wait is not aborted by thread interruption: an interrupt received while sleeping is
 * logged, polling continues, and the interrupt status is restored before this method exits so
 * that callers can still observe it.
 *
 * @param scheduledFuture the future to cancel and wait for; {@code null} is a no-op
 * @param timeout maximum time to wait, in milliseconds
 * @throws TimeoutException if the future is still not done after {@code timeout} milliseconds
 */
protected void stopAndWaitScheduledFeature(ScheduledFuture<?> scheduledFuture, long timeout) throws TimeoutException {
    if (scheduledFuture == null) {
        return;
    }
    // nanoTime is monotonic, so the timeout is immune to wall-clock adjustments.
    final long startNanos = System.nanoTime();
    boolean interrupted = false;
    try {
        while (!scheduledFuture.isDone()) {
            final long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
            if (elapsedMs > timeout) {
                throw new TimeoutException("Wait for async job timeout!");
            }
            scheduledFuture.cancel(true);
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Thread.sleep cleared the flag; remember it and keep waiting.
                interrupted = true;
                logger.warn("Exception: ", e);
            }
        }
    } finally {
        if (interrupted) {
            // Restore the interrupt status that was swallowed while waiting.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Schedules a fixed-delay task that always throws, with stop-on-failure enabled, and waits
 * for the returned future to report done.
 */
@Test
public void stopOnFailureTrue() throws Exception {
    Scheduler s = new Scheduler(new DefaultRegistry(), "test", 2);
    try {
        Scheduler.Options opts = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_DELAY, Duration.ofMillis(10))
            .withStopOnFailure(true);

        final CountDownLatch latch = new CountDownLatch(1);
        ScheduledFuture<?> f = s.schedule(opts, () -> {
            latch.countDown();
            throw new RuntimeException("stop");
        });

        Assertions.assertTrue(latch.await(60, TimeUnit.SECONDS));
        // This will be an endless loop if broken
        while (!f.isDone()) {
            // Give other threads a chance to run instead of spinning hot.
            Thread.yield();
        }
    } finally {
        // Always release the scheduler threads, even when the assertion above fails.
        s.shutdown();
    }
}
/**
 * Marks this instance as stopped and cancels, without interruption, the initial-value future
 * and any schedule/watch future that has not completed yet.
 */
@Override
public void close() {
    state.set(State.STOPPED);

    final boolean initialValuePending = !initialValueFuture.isDone();
    if (initialValuePending) {
        initialValueFuture.cancel(false);
    }

    // Cancel any scheduled operations. Each field is read once into a local snapshot.
    final ScheduledFuture<?> pendingSchedule = this.currentScheduleFuture;
    if (pendingSchedule != null) {
        if (!pendingSchedule.isDone()) {
            pendingSchedule.cancel(false);
        }
    }

    final CompletableFuture<?> pendingWatch = this.currentWatchFuture;
    if (pendingWatch != null) {
        if (!pendingWatch.isDone()) {
            pendingWatch.cancel(false);
        }
    }
}
/**
 * Clears the heartbeat task reference and cancels the task (without interruption) if it is
 * still pending. Any failure is logged at debug level and not propagated.
 */
protected void cancelHeartbeat() {
    try {
        final ScheduledFuture<?> pending = this.heartbeatTask;
        this.heartbeatTask = null;

        // Nothing to cancel when no task was set or it has already completed.
        if (pending == null || pending.isDone()) {
            return;
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Cancelling heartbeat in session " + getId());
        }
        pending.cancel(false);
    }
    catch (Throwable ex) {
        logger.debug("Failure while cancelling heartbeat in session " + getId(), ex);
    }
}
/**
 * Schedules {@code callable} to run after a delay unless the referenced future is still pending.
 *
 * <p>The delay is the remainder of {@code minDelayBetweenImmediateFlushMs} after subtracting the
 * time elapsed on {@code lastTransmit}, floored at zero. When the task runs it clears the
 * reference, invokes the callable, and records the outcome in {@code scheduledFlushException}
 * ({@code null} on success, the exception on failure).
 *
 * @param callable the flush action to invoke
 * @param scheduledFutureRef holder for the currently scheduled flush, if any
 */
void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
                                    final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
    final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));

    final ScheduledFuture<?> pending = scheduledFutureRef.get();
    if ((null != pending) && !pending.isDone()) {
        // A flush is already scheduled and has not completed yet.
        return;
    }

    // Kept as an anonymous class on purpose: synchronized (this) must lock the Runnable itself.
    final Runnable delayedFlush = new Runnable() {
        @Override
        public void run() {
            synchronized (this) {
                scheduledFutureRef.set(null);
                try {
                    callable.call();
                    // Flush was successful or wasn't needed, the exception should be unset.
                    scheduledFlushException.set(null);
                } catch (Exception exc) {
                    scheduledFlushException.set(exc);
                    LOG.error("Delayed flush failed", exc);
                }
            }
        }
    };
    scheduledFutureRef.set(scheduler.schedule(delayedFlush, delayMs, TimeUnit.MILLISECONDS));
}
/**
 * Schedules {@code callable} to run after a delay unless the referenced future is still pending.
 *
 * <p>The delay is the remainder of {@code minDelayBetweenImmediateFlushMs} after subtracting the
 * time elapsed on {@code lastTransmit}, floored at zero. When the task runs it clears the
 * reference, invokes the callable, and records the outcome in {@code scheduledFlushException}
 * ({@code null} on success, the exception on failure).
 *
 * @param callable the flush action to invoke
 * @param scheduledFutureRef holder for the currently scheduled flush, if any
 */
void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
                                    final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
    final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));

    final ScheduledFuture<?> pending = scheduledFutureRef.get();
    if ((null != pending) && !pending.isDone()) {
        // A flush is already scheduled and has not completed yet.
        return;
    }

    // Kept as an anonymous class on purpose: synchronized (this) must lock the Runnable itself.
    final Runnable delayedFlush = new Runnable() {
        @Override
        public void run() {
            synchronized (this) {
                scheduledFutureRef.set(null);
                try {
                    callable.call();
                    // Flush was successful or wasn't needed, the exception should be unset.
                    scheduledFlushException.set(null);
                } catch (Exception exc) {
                    scheduledFlushException.set(exc);
                    LOG.error("Delayed flush failed", exc);
                }
            }
        }
    };
    scheduledFutureRef.set(scheduler.schedule(delayedFlush, delayMs, TimeUnit.MILLISECONDS));
}
/**
 * Removes and releases the checker future and the ontology helper registered under the key
 * built from {@code ontologySettings}.
 *
 * <p>Each map is accessed with a single {@code remove} call and the result is null-checked,
 * rather than {@code containsKey} followed by {@code remove}, so a missing (or concurrently
 * removed) entry is skipped instead of causing a {@link NullPointerException}.
 */
private void disposeHelper() {
    String helperKey = buildHelperKey(ontologySettings);

    ScheduledFuture<?> checker = checkers.remove(helperKey);
    if (checker != null && !checker.isCancelled() && !checker.isDone()) {
        logger.debug("Cancelling ScheduledFuture for {}", helperKey);
        checker.cancel(false);
    }

    OntologyHelper helper = helpers.remove(helperKey);
    if (helper != null) {
        helper.dispose();
    }

    threadPool.stats();
    logger.debug("helpers.size() = {}; checkers.size() = {}", helpers.size(), checkers.size());
}
/**
 * Stops all jobs for this thing.
 *
 * <p>While holding {@code monitor}: cancels and clears the daily job when a cron scheduler is
 * present, cancels (with interruption) every scheduled future that is not done, and empties the
 * list of scheduled futures. Exceptions are logged and not propagated.
 */
private void stopJobs() {
    logger.debug("Stopping scheduled jobs for thing {}", getThing().getUID());
    monitor.lock();
    try {
        final boolean hasCronScheduler = cronScheduler != null;
        if (hasCronScheduler && dailyJob != null) {
            dailyJob.cancel(true);
        }
        if (hasCronScheduler) {
            dailyJob = null;
        }

        for (ScheduledFuture<?> pending : scheduledFutures) {
            if (pending.isDone()) {
                continue;
            }
            pending.cancel(true);
        }
        scheduledFutures.clear();
    } catch (Exception ex) {
        logger.error("{}", ex.getMessage(), ex);
    } finally {
        monitor.unlock();
    }
}
/**
 * Cancels the pending delivery (if any) and hands back everything accumulated so far, so the
 * returned objects are not delivered to the target consumer anymore.
 *
 * @return a new list holding the objects that were queued; the internal queue is emptied
 */
public List<TYPE> join() {
    final ScheduledFuture<?> pendingDelivery = this.future;
    if (pendingDelivery != null) {
        if (!pendingDelivery.isDone()) {
            pendingDelivery.cancel(false);
        }
    }

    final List<TYPE> drained;
    synchronized (queue) {
        // Snapshot and clear under the same lock so no element is lost or duplicated.
        drained = new ArrayList<>(queue);
        queue.clear();
    }
    return drained;
}
/**
 * Handles an NPC reaching a route node: advances the walk to the next node, blocks the walk,
 * broadcasts the node's chat line (if any) and schedules the arrival task.
 *
 * <p>Nothing happens when the NPC has no active route, when the current node id is outside the
 * route's node range, or when the NPC is not within a 2D radius of 10 of the current node.
 *
 * @param npc NPC to manage
 */
public void onArrived(Npc npc) {
    if (!_activeRoutes.containsKey(npc.getObjectId())) {
        return;
    }

    final WalkInfo walk = _activeRoutes.get(npc.getObjectId());
    // Opposite should not happen... but happens sometime
    if ((walk.getCurrentNodeId() < 0) || (walk.getCurrentNodeId() >= walk.getRoute().getNodesCount())) {
        return;
    }

    final NpcWalkerNode node = walk.getRoute().getNodeList().get(walk.getCurrentNodeId());
    if (!isInsideRadius2D(npc, node, 10)) {
        return;
    }

    walk.calculateNextNode(npc);
    walk.setBlocked(true); // prevents to be ran from walk check task, if there is delay in this node.

    if (node.getNpcString() != null) {
        npc.broadcastSay(ChatType.NPC_GENERAL, node.getNpcString());
    } else if (!node.getChatText().isEmpty()) {
        npc.broadcastSay(ChatType.NPC_GENERAL, node.getChatText());
    }

    // Only schedule a new arrival task when none is pending for this NPC.
    final ScheduledFuture<?> arriveTask = _arriveTasks.get(npc);
    final boolean needsScheduling = (arriveTask == null) || arriveTask.isCancelled() || arriveTask.isDone();
    if (needsScheduling) {
        _arriveTasks.put(npc, ThreadPool.schedule(new ArrivedTask(npc, walk), 100 + (node.getDelay() * 1000)));
    }
}
/**
 * Cancels the pending end-of-life task (logging a warning if cancellation reports failure),
 * clears the connection and end-of-life fields, and returns the connection that was held.
 *
 * @return the connection held before this call, possibly {@code null}
 */
public AbstractNettyChannel close() {
    final ScheduledFuture<?> expiryTask = endOfLife;
    if (expiryTask != null && !expiryTask.isDone()) {
        final boolean cancelled = expiryTask.cancel(false);
        if (!cancelled) {
            logger.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
        }
    }

    final AbstractNettyChannel released = connection;
    connection = null;
    endOfLife = null;
    return released;
}
/**
 * Starts a {@code PoliciesExecutor} with a single mocked policy and verifies that the policy's
 * stages are invoked in the order sensors, detectors, diagnosers, resolvers.
 */
@Test
public void verifyPolicyExecutionOrder() throws Exception {
    List<Measurement> measurements = new ArrayList<>();
    List<Symptom> symptoms = new ArrayList<>();
    List<Diagnosis> diagnosis = new ArrayList<>();
    List<Action> actions = new ArrayList<>();

    IHealthPolicy mockPolicy = mock(IHealthPolicy.class);
    when(mockPolicy.getDelay()).thenReturn(Duration.ZERO);
    when(mockPolicy.executeSensors()).thenReturn(measurements);
    when(mockPolicy.executeDetectors(measurements)).thenReturn(symptoms);
    when(mockPolicy.executeDiagnosers(symptoms)).thenReturn(diagnosis);
    when(mockPolicy.executeResolvers(diagnosis)).thenReturn(actions);

    List<IHealthPolicy> policies = Collections.singletonList(mockPolicy);
    PoliciesExecutor executor = new PoliciesExecutor(policies);
    ScheduledFuture<?> future = executor.start();
    try {
        try {
            verify(mockPolicy, timeout(200L).atLeastOnce()).executeResolvers(diagnosis);
        } catch (WantedButNotInvoked e) {
            // If the scheduled run already finished, print its result before failing the test.
            if (future.isDone()) {
                System.out.println(future.get());
            }
            throw e;
        }

        InOrder order = Mockito.inOrder(mockPolicy);
        order.verify(mockPolicy).executeSensors();
        order.verify(mockPolicy).executeDetectors(measurements);
        order.verify(mockPolicy).executeDiagnosers(symptoms);
        order.verify(mockPolicy).executeResolvers(diagnosis);
    } finally {
        // Release the executor even when a verification above fails.
        executor.destroy();
    }
}
/**
 * Shuts this task center down: cancels the monitoring task and every scheduled future that is
 * still pending, cancels all timers, stops the sliding window, and shuts down both executors.
 */
public void close() {
    System.out.println("TaskCenter closing");
    this.monitoring.cancel(true);

    // Stop all scheduled tasks. A cancelled future also reports isDone(), so checking
    // isDone() alone skips every future that has already finished or been cancelled.
    for (ScheduledFuture<?> sf : this.scheduledFutureList) {
        if (!sf.isDone()) {
            sf.cancel(true);
        }
    }
    this.scheduledFutureList.clear();

    for (Timer timer : this.timers.values()) {
        timer.cancel();
    }
    this.timers.clear();

    // Stop the sliding window
    this.slidingWindow.stop();

    // Shut down the thread pools
    this.mainExecutor.shutdown();
    this.scheduledExecutor.shutdown();
    System.out.println("TaskCenter closed");
}
/**
 * Sweeps {@code scheduledFutureList} and drops every future that is cancelled or done.
 */
@Override
public void run() {
    for (Iterator<ScheduledFuture<?>> it = scheduledFutureList.iterator(); it.hasNext();) {
        final ScheduledFuture<?> candidate = it.next();
        final boolean finished = candidate.isCancelled() || candidate.isDone();
        if (!finished) {
            continue;
        }
        // Remove tasks that have already finished
        it.remove();
    }
}
/**
 * Records a successful call: resets the consecutive failure counter and, if the breaker was
 * not already CLOSED, switches it to CLOSED, cancels any pending half-open timer and notifies
 * the on-close listeners. Listener notification happens outside the synchronized section.
 */
protected void processSuccessfulCall() {
    final boolean stateChanged;
    synchronized (this) {
        // Every success clears the failure streak, whether or not the state changes.
        consecutiveFailureCount = 0;

        stateChanged = !State.CLOSED.equals(currentState);
        if (stateChanged) {
            logger.info(
                "Setting circuit breaker state to CLOSED after successful call. "
                + "circuit_breaker_state_changed_to={}, circuit_breaker_id={}",
                State.CLOSED.name(), id
            );
            currentState = State.CLOSED;

            // Cancel any existing half-open scheduled timer.
            final ScheduledFuture<?> pendingHalfOpenCheck = halfOpenScheduledFuture.getAndSet(null);
            if (pendingHalfOpenCheck != null && !pendingHalfOpenCheck.isDone()) {
                logger.debug(
                    "Cancelling half-open timeout check now that the circuit is closed. circuit_breaker_id={}", id
                );
                pendingHalfOpenCheck.cancel(false);
            }
        }
    }

    if (stateChanged) {
        // This call moved the breaker from a non-CLOSED state to CLOSED. Notify listeners.
        notifyOnCloseListeners();
    }
}
/**
 * Removes every completed future from {@code scheduledFutures}, logging each removal at trace
 * level.
 */
private void tidyScheduledFutures() {
    final Iterator<ScheduledFuture<?>> it = scheduledFutures.iterator();
    while (it.hasNext()) {
        final ScheduledFuture<?> candidate = it.next();
        if (!candidate.isDone()) {
            continue;
        }
        logger.trace("Tidying up done future {}", candidate);
        it.remove();
    }
}
/**
 * Deliver queued items now to the target consumer: the pending scheduled delivery (if any) is
 * cancelled without interruption and {@code run()} is invoked directly on the calling thread.
 */
public void forceProcessNow() {
    final ScheduledFuture<?> pendingDelivery = this.future;
    if (pendingDelivery != null) {
        if (!pendingDelivery.isDone()) {
            pendingDelivery.cancel(false);
        }
    }
    run();
}
/**
 * Run a given task once in the background after a delay and manage task references and threads.
 *
 * <p>A dedicated single-thread scheduled executor is created for the task. When the task ends,
 * normally or by throwing, the thread counter is decreased, the task is removed from the
 * task map, and the executor is purged and shut down.
 *
 * @param userId ID of the user (not used by this method)
 * @param serviceId ID of the service, passed on to the returned {@code ServiceBackgroundTask}
 * @param delayMs delay before the task runs, in milliseconds
 * @param task the work to run
 * @return handle holding the service ID, the generated task ID, the scheduled future and a
 *         cancel function; the cancel function returns {@code true} if the task was already
 *         done or could be cancelled
 */
public static ServiceBackgroundTask runOnceInBackground(String userId, String serviceId, long delayMs, Runnable task){
    //TODO: introduce max. delay ?
    String taskId = getNewTaskId();
    int corePoolSize = 1;
    final ScheduledThreadPoolExecutor executor = ThreadManager.getNewScheduledThreadPool(corePoolSize);
    executor.setRemoveOnCancelPolicy(true);
    ScheduledFuture<?> future = executor.schedule(() -> {
        //run task and...
        try{
            increaseThreadCounter();
            task.run();
        }catch (Exception ignored){
            // Best effort, as before: a failing task must not prevent the cleanup below.
            // NOTE(review): the failure is not reported anywhere - consider logging it.
        }finally{
            // Runs for Errors too, so the counter and the executor thread are never leaked.
            decreaseThreadCounter();
            //... remove yourself from manager
            removeFromSbtMap(taskId);
            executor.purge();
            executor.shutdown();
        }
    }, delayMs, TimeUnit.MILLISECONDS);
    //other option (but does not support lambda expression):
    //Timer timer = new Timer();
    //timer.schedule(task, delayMs);
    BooleanSupplier cancelFun = () -> {
        // Done or successfully cancelled means the task is finished for good.
        boolean finished = future.isDone() || future.cancel(false);
        if (finished){
            removeFromSbtMap(taskId);
        }
        // The executor is released in either case.
        executor.purge();
        executor.shutdown();
        return finished;
    };
    ServiceBackgroundTask sbt = new ServiceBackgroundTask(serviceId, taskId, future, cancelFun);
    // NOTE(review): with a very short delay the task may call removeFromSbtMap before this
    // add happens, which would leave a stale entry - confirm whether this needs guarding.
    addToSbtMap(taskId, sbt);
    return sbt;
}
/**
 * Tells whether the given future is still waiting to complete.
 *
 * @param future the future to inspect; may be {@code null}
 * @return {@code true} only if {@code future} is non-null and is neither cancelled nor done
 */
public static boolean isWaiting(ScheduledFuture<?> future) {
    if (future == null) {
        return false;
    }
    return !(future.isCancelled() || future.isDone());
}
/**
 * Perform the HttpClient request and handle the results. A configurable list of status codes is accepted and the
 * response content (expected to be json) is parsed. The value returned when the response content is empty is
 * configurable. The rest config "timeout" setting is imposed in this method and will abort the request
 * if exceeded.
 *
 * <p>The abort timer is cancelled as soon as the request returns or fails, and the response is always closed
 * before this method exits so its underlying connection is released.
 *
 * @param restConfig configuration supplying the timeout, the allowed response codes and the empty-content override
 * @param httpRequestBase the request to execute
 * @return the parsed response, or the configured empty-content override when no parsed value is present
 * @throws IOException if the request is aborted by the timeout, fails, or returns a status code that is not allowed
 */
protected static Object executeRequest(RestConfig restConfig, HttpRequestBase httpRequestBase) throws IOException {
    URI uri = httpRequestBase.getURI();
    HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
    Optional<HttpHost> proxy = getProxy(restConfig);
    HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
    httpRequestBase.setConfig(getRequestConfig(restConfig, proxy));

    // Schedule a command to abort the request if the timeout is exceeded
    ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(httpRequestBase::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
    CloseableHttpResponse response;
    try {
        response = closeableHttpClient.execute(httpRequestBase, httpClientContext);
    } catch(Exception e) {
        // Report a timeout if the request was aborted. Otherwise rethrow exception.
        if (httpRequestBase.isAborted()) {
            throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpRequestBase.getURI().toString(), restConfig.getTimeout()), e);
        } else {
            throw e;
        }
    } finally {
        // Cancel the abort timer whether the request finished or failed, so it never lingers.
        if (!scheduledFuture.isDone()) {
            scheduledFuture.cancel(true);
        }
    }

    try {
        int statusCode = response.getStatusLine().getStatusCode();
        LOG.debug("request = {}; response = {}", httpRequestBase, response);
        if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
            HttpEntity httpEntity = response.getEntity();

            // Parse the response if present, return the empty value override if not
            Optional<Object> parsedResponse = parseResponse(restConfig, httpRequestBase, httpEntity);
            return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
        } else {
            throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
                    "failed with http status code %d: %s",
                    httpRequestBase.getURI().toString(),
                    restConfig.getResponseCodesAllowed().toString(),
                    statusCode,
                    EntityUtils.toString(response.getEntity())));
        }
    } finally {
        // Always release the response (and its connection), including on parse errors.
        response.close();
    }
}