下面列出了怎么用java.util.concurrent.ScheduledFuture的API类实例代码及写法,或者点击链接到github查看源代码。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
if (!enabled) {
logger.debug("skip : {}", task);
return null;
}
ScheduledFuture<?> future = instance.schedule(task, trigger);
String runnableKey = findRunnableKey(task);
if (Boolean.FALSE.equals(skipMap.get(runnableKey))) {
future.cancel(true);
}
return future;
}
@Before
public void setUp() {
mockBus = createNiceMock(BuckEventBus.class);
mockFuture = createMock(ScheduledFuture.class);
mockClient = createNiceMock(OkHttpClient.class);
dispatcher = new Dispatcher(createMock(ExecutorService.class));
EasyMock.expect(mockClient.dispatcher()).andReturn(dispatcher).anyTimes();
mockScheduler = createMock(ScheduledExecutorService.class);
mockClock = createMock(Clock.class);
EasyMock.expect(mockClock.currentTimeMillis()).andReturn(42L).anyTimes();
config =
ImmutableClientSideSlbConfig.builder()
.setClock(mockClock)
.setServerPool(SERVERS)
.setEventBus(mockBus)
.setServerPoolName(SERVER_POOL_NAME)
.build();
}
@Override
public <T> T computeIfAbsent(final Class<T> expectedClass, final String key, final Predicate<Element> toRemove,
final long timeoutMs, final Supplier<T> value) {
final Integer maxSize = this.getConfigValue(CacheConfiguration::getDefaultMaxSize, -1);
if (maxSize > 0 && this.cache.size() >= maxSize) {
this.clean(); // clean before add one element.
if (this.cache.size() >= maxSize) {
synchronized (this.cache) {
while (this.cache.size() >= maxSize) {
final String keyToRemove = this.cache.keySet().iterator().next();
this.cache.remove(keyToRemove);
}
}
}
}
final ScheduledFuture<?> task = timeoutMs > 0 ? this.evictionTask(key, timeoutMs) : null;
final long endOfValidity = this.calcEndOfValidity(timeoutMs);
final ElementImpl element =
this.addToMap(key, () -> new ElementImpl(value, toRemove, endOfValidity, task, this.timer));
return element.getValue(expectedClass);
}
/**
* scheduleAtFixedRate executes series of tasks at given rate.
* Eventually, it must hold that:
* cycles - 1 <= elapsedMillis/delay < cycles
*/
public void testFixedRateSequence() throws InterruptedException {
final CustomExecutor p = new CustomExecutor(1);
try (PoolCleaner cleaner = cleaner(p)) {
for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
final long startTime = System.nanoTime();
final int cycles = 8;
final CountDownLatch done = new CountDownLatch(cycles);
final Runnable task = new CheckedRunnable() {
public void realRun() { done.countDown(); }};
final ScheduledFuture periodicTask =
p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
final int totalDelayMillis = (cycles - 1) * delay;
await(done, totalDelayMillis + LONG_DELAY_MS);
periodicTask.cancel(true);
final long elapsedMillis = millisElapsedSince(startTime);
assertTrue(elapsedMillis >= totalDelayMillis);
if (elapsedMillis <= cycles * delay)
return;
// else retry with longer delay
}
fail("unexpected execution rate");
}
}
/**
* Times the given future out after the timeout.
*
* @param future to time out
* @param timeout after which the given future is timed out
* @param timeUnit time unit of the timeout
* @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached
* @param <T> type of the given future
* @return The timeout enriched future
*/
public static <T> CompletableFuture<T> orTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit timeUnit,
Executor timeoutFailExecutor) {
if (!future.isDone()) {
final ScheduledFuture<?> timeoutFuture = Delayer.delay(
() -> timeoutFailExecutor.execute(new Timeout(future)), timeout, timeUnit);
future.whenComplete((T value, Throwable throwable) -> {
if (!timeoutFuture.isDone()) {
timeoutFuture.cancel(false);
}
});
}
return future;
}
@SuppressWarnings("unchecked")
@Test
public void startAndStopWithHeartbeatValue() {
ScheduledFuture future = mock(ScheduledFuture.class);
given(this.taskScheduler.scheduleWithFixedDelay(any(Runnable.class), eq(15000L))).willReturn(future);
this.messageHandler.setTaskScheduler(this.taskScheduler);
this.messageHandler.setHeartbeatValue(new long[] {15000, 16000});
this.messageHandler.start();
verify(this.taskScheduler).scheduleWithFixedDelay(any(Runnable.class), eq(15000L));
verifyNoMoreInteractions(this.taskScheduler, future);
this.messageHandler.stop();
verify(future).cancel(true);
verifyNoMoreInteractions(future);
}
private void connect(Collection<DataTreeModification<Node>> changes) {
changes.forEach((change) -> {
InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
DataObjectModification<Node> mod = change.getRootNode();
Node node = getCreated(mod);
if (node == null) {
return;
}
CONNECTED_NODES.put(key, node);
ScheduledFuture ft = TIMEOUT_FTS.remove(key);
if (ft != null) {
ft.cancel(false);
}
HwvtepGlobalAugmentation globalAugmentation = node.augmentation(HwvtepGlobalAugmentation.class);
if (globalAugmentation != null) {
ConnectionInfo connectionInfo = globalAugmentation.getConnectionInfo();
if (connectionInfo != null) {
NODE_CONNECTION_INFO.put(key, connectionInfo);
}
}
if (node != null) {
synchronized (HwvtepOperGlobalListener.class) {
NODE_DELET_WAITING_JOBS.putIfAbsent(key, new ArrayList<>());
}
}
});
}
/**
* 强制取消当前所有任务
* @author Frodez
* @date 2019-03-21
*/
public Result cancelAllTasks() {
int total = taskMap.size();
int alreadyCanceled = 0;
for (Entry<Long, ScheduledFuture<?>> entry : taskMap.entrySet()) {
if (entry.getValue().cancel(true)) {
taskMap.remove(entry.getKey());
taskInfoMap.remove(entry.getKey());
++alreadyCanceled;
}
}
return Result.success(StrUtil.concat("共计", Integer.valueOf(total).toString(), "个任务正在执行,已取消", Integer.valueOf(alreadyCanceled).toString(),
"个。"));
}
/**
* Sets the active state of the specified spawn.
*
* @param spawnInst
* @param isActive
*/
@SuppressWarnings("rawtypes")
public void setSpawnActive(AutoSpawnInstance spawnInst, boolean isActive)
{
if (spawnInst == null)
return;
int objectId = spawnInst._objectId;
if (isSpawnRegistered(objectId))
{
ScheduledFuture spawnTask = null;
if (isActive)
{
AutoSpawner rs = new AutoSpawner(objectId);
if (spawnInst._desDelay > 0)
spawnTask = ThreadPoolManager.getInstance().scheduleEffectAtFixedRate(rs,
spawnInst._initDelay, spawnInst._resDelay);
else
spawnTask = ThreadPoolManager.getInstance().scheduleEffect(rs, spawnInst._initDelay);
_runningSpawns.put(objectId, spawnTask);
} else
{
AutoDespawner rd = new AutoDespawner(objectId);
spawnTask = _runningSpawns.remove(objectId);
if (spawnTask != null)
spawnTask.cancel(false);
ThreadPoolManager.getInstance().scheduleEffect(rd, 0);
}
spawnInst.setSpawnActive(isActive);
}
}
@Override
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
/**
* remove(task) removes queued task, and fails to remove active task
*/
public void testRemove() throws InterruptedException {
final CountDownLatch done = new CountDownLatch(1);
final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
try (PoolCleaner cleaner = cleaner(p, done)) {
ScheduledFuture[] tasks = new ScheduledFuture[5];
final CountDownLatch threadStarted = new CountDownLatch(1);
for (int i = 0; i < tasks.length; i++) {
Runnable r = new CheckedRunnable() {
public void realRun() throws InterruptedException {
threadStarted.countDown();
await(done);
}};
tasks[i] = p.schedule(r, 1, MILLISECONDS);
}
await(threadStarted);
BlockingQueue<Runnable> q = p.getQueue();
assertFalse(p.remove((Runnable)tasks[0]));
assertTrue(q.contains((Runnable)tasks[4]));
assertTrue(q.contains((Runnable)tasks[3]));
assertTrue(p.remove((Runnable)tasks[4]));
assertFalse(p.remove((Runnable)tasks[4]));
assertFalse(q.contains((Runnable)tasks[4]));
assertTrue(q.contains((Runnable)tasks[3]));
assertTrue(p.remove((Runnable)tasks[3]));
assertFalse(q.contains((Runnable)tasks[3]));
}
}
private void aiPerform(ECSGame game) {
Set<Entity> ais = game.getEntitiesWithComponent(AIComponent.class);
logger.info("AI entities " + ais);
for (Entity entity : ais) {
AIComponent aiComp = ai.get(entity);
if (aiComp.hasWaitingAction()) {
logger.info(entity + " already has a waiting action");
continue;
}
if (aiComp.isPaused()) {
logger.info(entity + " AI is paused");
continue;
}
long delay = aiComp.getDelay();
ECSAction action = aiComp.getAI().getAction(entity);
if (action != null && !game.isGameOver()) {
logger.info(entity + " will perform " + action + " in " + delay + " milliseconds");
Runnable runnable = () -> this.perform(entity, action);
if (delay <= 0) {
runnable.run();
}
else {
ScheduledFuture<?> future = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
aiComp.future = future;
}
return;
}
else {
logger.info(entity + ": No actions available");
}
}
}
private Map<Path, @Nullable ScheduledFuture<?>> getKeyFutures(WatchKey key) {
Map<Path, @Nullable ScheduledFuture<?>> keyFutures = futures.get(key);
if (keyFutures == null) {
keyFutures = new ConcurrentHashMap<>();
futures.put(key, keyFutures);
}
return keyFutures;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable task,
Instant startTime,
Duration period) {
return this.delegate.scheduleAtFixedRate(TraceRunnable.wrap(task), startTime, period);
}
/**
* Stops the current execution of a task and schedules a runnable task for execution again.
* This uses a fixed size scheduler to limit thread execution.
*
* @param futureTask the {@link ScheduledFuture} for the current scheduled task
* @param runnableTask the {@link Runnable} to execute
* @param delay the delay in milliseconds before the task will be executed
* @return the {@link ScheduledFuture} for the scheduled task
*/
public ScheduledFuture<?> rescheduleTask(ScheduledFuture<?> futureTask, Runnable runnableTask, long delay) {
futureTask.cancel(false);
if (networkState != ZigBeeNetworkState.ONLINE) {
logger.debug("ZigBeeNetworkManager rescheduleTask: not scheduling task while {}", networkState);
return null;
}
return executorService.schedule(runnableTask, delay, TimeUnit.MILLISECONDS);
}
ScheduledFuture killSwitch() {
final Thread current = Thread.currentThread();
return LdapTimeoutTest.pool.schedule(new Callable<Void>() {
public Void call() throws Exception {
System.err.println("Fail: killSwitch()");
current.interrupt();
return null;
}
}, 5000, TimeUnit.MILLISECONDS);
}
@Override
public void scheduleJob( JobConfiguration jobConfiguration )
{
if ( ifJobInSystemStop( jobConfiguration.getUid() ) )
{
JobInstance jobInstance = new DefaultJobInstance( this, messageService, leaderManager );
if ( jobConfiguration.getUid() != null && !futures.containsKey( jobConfiguration.getUid() ) )
{
log.info( String.format( "Scheduling job: %s", jobConfiguration ) );
ScheduledFuture<?> future = null;
if ( jobConfiguration.getJobType().isCronSchedulingType() )
{
future = jobScheduler.schedule( () -> jobInstance.execute( jobConfiguration ),
new CronTrigger( jobConfiguration.getCronExpression() ) );
}
else if ( jobConfiguration.getJobType().isFixedDelaySchedulingType() )
{
future = jobScheduler.scheduleWithFixedDelay( () -> jobInstance.execute( jobConfiguration ),
Instant.now().plusSeconds( DEFAULT_INITIAL_DELAY_S ),
Duration.of( jobConfiguration.getDelay(), ChronoUnit.SECONDS ) );
}
futures.put( jobConfiguration.getUid(), future );
log.info( String.format( "Scheduled job: %s", jobConfiguration ) );
}
}
}
@Test
public void testUnsubscribeOnEventTypeRemoval() {
// Mock the task scheduler and capture the runnable
ArgumentCaptor<Runnable> runnableArg = ArgumentCaptor.forClass(Runnable.class);
when(taskScheduler.scheduleWithFixedDelay(runnableArg.capture(), anyLong())).thenReturn(Mockito.mock(ScheduledFuture.class));
// Mock the response to the subsribeContext
ArgumentCaptor<SuccessCallback> successArg = ArgumentCaptor.forClass(SuccessCallback.class);
ListenableFuture<SubscribeContextResponse> responseFuture = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture).addCallback(successArg.capture(), any());
// Return the mocked future on subscription
when(ngsiClient.subscribeContext(any(), any(), any())).thenReturn(responseFuture);
Configuration configuration = getBasicConf();
subscriptionManager.setConfiguration(configuration);
// Execute scheduled runnable
runnableArg.getValue().run();
// Return the SubscribeContextResponse
callSuccessCallback(successArg);
// Mock future for unsubscribeContext
ListenableFuture<UnsubscribeContextResponse> responseFuture2 = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture2).addCallback(successArg.capture(), any());
when(ngsiClient.unsubscribeContext(eq("http://iotAgent"), eq(null), eq("12345678"))).thenReturn(responseFuture2);
// Set a configuration without the eventType
Configuration emptyConfiguration = new Configuration();
emptyConfiguration.setEventTypeIns(Collections.emptyList());
subscriptionManager.setConfiguration(emptyConfiguration);
// Check that unsubsribe is called when a later configuration removed the event type
Assert.notNull(successArg.getValue());
}
public Future<?> schedule(String id, Runnable task, long delay, TimeUnit timeUnit) {
if (active.get()) {
ScheduledFuture<?> newFuture = executor.schedule(task, delay, timeUnit);
log.debug("Scheduled ", id, ": ", task);
Optional.ofNullable(tasks.put(id, newFuture)).ifPresent(oldFuture -> {
log.debug("Cancelling ", id, " interrupt=", mayInterrupt, ": ", oldFuture);
oldFuture.cancel(mayInterrupt);
log.debug("Cancelled ", id, " interrupt=", mayInterrupt, ": ", oldFuture);
});
return newFuture;
} else {
log.debug("Schedule ", id, " while inactive: ", task);
return Futures.immediateCancelledFuture();
}
}
/**
* Schedule statistics updater tasks for the given service/cartridge type.
*
* @param serviceName service name/cartridge type
*/
public void scheduleStatisticsUpdaterTasks(String serviceName) {
synchronized (MockHealthStatisticsGenerator.class) {
if (!statisticsUpdaterTasksScheduled(serviceName)) {
List<MockHealthStatisticsPattern> statisticsPatterns = MockIaasConfig.getInstance().
getMockHealthStatisticsConfig().getStatisticsPatterns();
Map<String, ScheduledFuture> taskList = serviceNameToTaskListMap.get(serviceName);
if (taskList == null) {
taskList = new ConcurrentHashMap<String, ScheduledFuture>();
serviceNameToTaskListMap.put(serviceName, taskList);
}
for (MockHealthStatisticsPattern statisticsPattern : statisticsPatterns) {
if (statisticsPattern.getCartridgeType().equals(serviceName) &&
(statisticsPattern.getSampleDuration() > 0)) {
MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern);
ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0,
statisticsPattern.getSampleDuration(), TimeUnit.SECONDS);
taskList.put(statisticsPattern.getFactor().toString(), task);
}
}
if (log.isInfoEnabled()) {
log.info(String.format("Mock statistics updaters scheduled: [service-name] %s", serviceName));
}
}
}
}
/**
* shutdownNow returns a list containing tasks that were not run,
* and those tasks are drained from the queue
*/
public void testShutdownNow_delayedTasks() throws InterruptedException {
final CustomExecutor p = new CustomExecutor(1);
List<ScheduledFuture> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Runnable r = new NoOpRunnable();
tasks.add(p.schedule(r, 9, SECONDS));
tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
}
if (testImplementationDetails)
assertEquals(new HashSet(tasks), new HashSet(p.getQueue()));
final List<Runnable> queuedTasks;
try {
queuedTasks = p.shutdownNow();
} catch (SecurityException ok) {
return; // Allowed in case test doesn't have privs
}
assertTrue(p.isShutdown());
assertTrue(p.getQueue().isEmpty());
if (testImplementationDetails)
assertEquals(new HashSet(tasks), new HashSet(queuedTasks));
assertEquals(tasks.size(), queuedTasks.size());
for (ScheduledFuture task : tasks) {
assertFalse(((CustomTask)task).ran);
assertFalse(task.isDone());
assertFalse(task.isCancelled());
}
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
assertTrue(p.isTerminated());
}
@Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
ScheduledTask task = new ScheduledTask(currentTimeNanos + unit.toNanos(delay), cmd);
if (delay > 0) {
scheduledTasks.add(task);
} else {
dueTasks.add(task);
}
return task;
}
@Override
public void close() throws Exception {
ScheduledFuture<?> future = this.future;
if(future != null) {
future.cancel(true);
}
}
private void receivedOrTimeout() {
final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
if (scheduledFuture != null) { // Cancel timeout
scheduledFuture.cancel(false);
this.scheduledFuture = null;
}
future.complete(null);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null,
toTimestamp(initialDelay, unit), unit.toMillis(delay));
schedule(task);
return task;
}
@Override
public ScheduledFuture<?> scheduleOnce(final Supplier<ActorMessage<?>> supplier, final ActorGroup group, long delay, TimeUnit unit) {
return timerExecuterService.schedule(new Runnable() {
@Override
public void run() {
ActorMessage<?> message = supplier.get();
for (UUID id : group) {
message.dest = id;
system.send(message);
}
}
}, delay, unit);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command instanceof FileCache.DeleteProcess) {
assertNull("Multiple delete process registered", lastDeleteProcess);
lastDeleteProcess = (FileCache.DeleteProcess) command;
lastDelayMillis = unit.toMillis(delay);
return super.schedule(() -> {}, delay, unit);
} else {
return super.schedule(command, delay, unit);
}
}
/**
* {@inheritDoc}
*
* @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
*
*/
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
ScheduledFuture<?> curr;
synchronized (this.triggerContextMonitor) {
curr = this.currentFuture;
}
return curr.get(timeout, unit);
}
public void close() {
System.out.println("TaskCenter closing");
this.monitoring.cancel(true);
// 停止所有定时任务
for (ScheduledFuture<?> sf : this.scheduledFutureList) {
if (!sf.isCancelled() || !sf.isDone()) {
sf.cancel(true);
}
}
this.scheduledFutureList.clear();
Iterator<Timer> iter = this.timers.values().iterator();
while (iter.hasNext()) {
Timer timer = iter.next();
timer.cancel();
}
this.timers.clear();
// 关闭滑动窗
this.slidingWindow.stop();
// 关闭线程池
this.mainExecutor.shutdown();
this.scheduledExecutor.shutdown();
System.out.println("TaskCenter closed");
}
/**
* Tests that the RPC service's scheduled executor service can execute runnable with a fixed
* delay.
*/
@Test(timeout = 60000)
public void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
final int tries = 4;
final long delay = 10L;
final CountDownLatch countDownLatch = new CountDownLatch(tries);
long currentTime = System.nanoTime();
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
countDownLatch::countDown,
delay,
delay,
TimeUnit.MILLISECONDS);
assertTrue(!future.isDone());
countDownLatch.await();
// the future should not complete since we have a periodic task
assertTrue(!future.isDone());
long finalTime = System.nanoTime() - currentTime;
// the processing should have taken at least delay times the number of count downs.
assertTrue(finalTime >= tries * delay);
future.cancel(true);
}