类java.util.concurrent.ScheduledFuture源码实例Demo

下面列出了怎么用java.util.concurrent.ScheduledFuture的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: lemon   文件: ProxyTaskScheduler.java
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    if (!enabled) {
        logger.debug("skip : {}", task);

        return null;
    }

    ScheduledFuture<?> future = instance.schedule(task, trigger);
    String runnableKey = findRunnableKey(task);

    if (Boolean.FALSE.equals(skipMap.get(runnableKey))) {
        future.cancel(true);
    }

    return future;
}
 
源代码2 项目: buck   文件: ClientSideSlbTest.java
@Before
public void setUp() {
  mockBus = createNiceMock(BuckEventBus.class);
  mockFuture = createMock(ScheduledFuture.class);
  mockClient = createNiceMock(OkHttpClient.class);
  dispatcher = new Dispatcher(createMock(ExecutorService.class));
  EasyMock.expect(mockClient.dispatcher()).andReturn(dispatcher).anyTimes();
  mockScheduler = createMock(ScheduledExecutorService.class);
  mockClock = createMock(Clock.class);
  EasyMock.expect(mockClock.currentTimeMillis()).andReturn(42L).anyTimes();

  config =
      ImmutableClientSideSlbConfig.builder()
          .setClock(mockClock)
          .setServerPool(SERVERS)
          .setEventBus(mockBus)
          .setServerPoolName(SERVER_POOL_NAME)
          .build();
}
 
源代码3 项目: component-runtime   文件: LocalCacheService.java
@Override
public <T> T computeIfAbsent(final Class<T> expectedClass, final String key, final Predicate<Element> toRemove,
        final long timeoutMs, final Supplier<T> value) {

    final Integer maxSize = this.getConfigValue(CacheConfiguration::getDefaultMaxSize, -1);
    if (maxSize > 0 && this.cache.size() >= maxSize) {
        this.clean(); // clean before add one element.
        if (this.cache.size() >= maxSize) {
            synchronized (this.cache) {
                while (this.cache.size() >= maxSize) {
                    final String keyToRemove = this.cache.keySet().iterator().next();
                    this.cache.remove(keyToRemove);
                }
            }
        }
    }

    final ScheduledFuture<?> task = timeoutMs > 0 ? this.evictionTask(key, timeoutMs) : null;

    final long endOfValidity = this.calcEndOfValidity(timeoutMs);
    final ElementImpl element =
            this.addToMap(key, () -> new ElementImpl(value, toRemove, endOfValidity, task, this.timer));

    return element.getValue(expectedClass);
}
 
源代码4 项目: j2objc   文件: ScheduledExecutorSubclassTest.java
/**
 * scheduleAtFixedRate executes series of tasks at given rate.
 * Eventually, it must hold that:
 *   cycles - 1 <= elapsedMillis/delay < cycles
 */
public void testFixedRateSequence() throws InterruptedException {
    final CustomExecutor p = new CustomExecutor(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
            final long startTime = System.nanoTime();
            final int cycles = 8;
            final CountDownLatch done = new CountDownLatch(cycles);
            final Runnable task = new CheckedRunnable() {
                public void realRun() { done.countDown(); }};
            final ScheduledFuture periodicTask =
                p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
            final int totalDelayMillis = (cycles - 1) * delay;
            await(done, totalDelayMillis + LONG_DELAY_MS);
            periodicTask.cancel(true);
            final long elapsedMillis = millisElapsedSince(startTime);
            assertTrue(elapsedMillis >= totalDelayMillis);
            if (elapsedMillis <= cycles * delay)
                return;
            // else retry with longer delay
        }
        fail("unexpected execution rate");
    }
}
 
源代码5 项目: flink   文件: FutureUtils.java
/**
 * Times the given future out after the timeout.
 *
 * @param future to time out
 * @param timeout after which the given future is timed out
 * @param timeUnit time unit of the timeout
 * @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached
 * @param <T> type of the given future
 * @return The timeout enriched future
 */
public static <T> CompletableFuture<T> orTimeout(
	CompletableFuture<T> future,
	long timeout,
	TimeUnit timeUnit,
	Executor timeoutFailExecutor) {

	if (!future.isDone()) {
		final ScheduledFuture<?> timeoutFuture = Delayer.delay(
			() -> timeoutFailExecutor.execute(new Timeout(future)), timeout, timeUnit);

		future.whenComplete((T value, Throwable throwable) -> {
			if (!timeoutFuture.isDone()) {
				timeoutFuture.cancel(false);
			}
		});
	}

	return future;
}
 
@SuppressWarnings("unchecked")
@Test
public void startAndStopWithHeartbeatValue() {
	ScheduledFuture future = mock(ScheduledFuture.class);
	given(this.taskScheduler.scheduleWithFixedDelay(any(Runnable.class), eq(15000L))).willReturn(future);

	this.messageHandler.setTaskScheduler(this.taskScheduler);
	this.messageHandler.setHeartbeatValue(new long[] {15000, 16000});
	this.messageHandler.start();

	verify(this.taskScheduler).scheduleWithFixedDelay(any(Runnable.class), eq(15000L));
	verifyNoMoreInteractions(this.taskScheduler, future);

	this.messageHandler.stop();

	verify(future).cancel(true);
	verifyNoMoreInteractions(future);
}
 
源代码7 项目: ovsdb   文件: HwvtepOperGlobalListener.java
private void connect(Collection<DataTreeModification<Node>> changes) {
    changes.forEach((change) -> {
        InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
        DataObjectModification<Node> mod = change.getRootNode();
        Node node = getCreated(mod);
        if (node == null) {
            return;
        }
        CONNECTED_NODES.put(key, node);
        ScheduledFuture ft = TIMEOUT_FTS.remove(key);
        if (ft != null) {
            ft.cancel(false);
        }
        HwvtepGlobalAugmentation globalAugmentation = node.augmentation(HwvtepGlobalAugmentation.class);
        if (globalAugmentation != null) {
            ConnectionInfo connectionInfo = globalAugmentation.getConnectionInfo();
            if (connectionInfo != null) {
                NODE_CONNECTION_INFO.put(key, connectionInfo);
            }
        }
        if (node != null) {
            synchronized (HwvtepOperGlobalListener.class) {
                NODE_DELET_WAITING_JOBS.putIfAbsent(key, new ArrayList<>());
            }
        }
    });
}
 
源代码8 项目: BlogManagePlatform   文件: BaseTaskService.java
/**
 * 强制取消当前所有任务
 * @author Frodez
 * @date 2019-03-21
 */
public Result cancelAllTasks() {
	int total = taskMap.size();
	int alreadyCanceled = 0;
	for (Entry<Long, ScheduledFuture<?>> entry : taskMap.entrySet()) {
		if (entry.getValue().cancel(true)) {
			taskMap.remove(entry.getKey());
			taskInfoMap.remove(entry.getKey());
			++alreadyCanceled;
		}
	}
	return Result.success(StrUtil.concat("共计", Integer.valueOf(total).toString(), "个任务正在执行,已取消", Integer.valueOf(alreadyCanceled).toString(),
		"个。"));
}
 
源代码9 项目: L2jBrasil   文件: AutoSpawnHandler.java
/**
 * Sets the active state of the specified spawn.
 *
 * @param spawnInst
 * @param isActive
 */
@SuppressWarnings("rawtypes")
public void setSpawnActive(AutoSpawnInstance spawnInst, boolean isActive)
{
	if (spawnInst == null)
		return;

	int objectId = spawnInst._objectId;

	if (isSpawnRegistered(objectId))
	{
		ScheduledFuture spawnTask = null;

		if (isActive)
		{
			AutoSpawner rs = new AutoSpawner(objectId);

			if (spawnInst._desDelay > 0)
				spawnTask = ThreadPoolManager.getInstance().scheduleEffectAtFixedRate(rs,
						spawnInst._initDelay, spawnInst._resDelay);
			else
				spawnTask = ThreadPoolManager.getInstance().scheduleEffect(rs, spawnInst._initDelay);

			_runningSpawns.put(objectId, spawnTask);
		} else
		{
			AutoDespawner rd = new AutoDespawner(objectId);
			spawnTask = _runningSpawns.remove(objectId);

			if (spawnTask != null)
				spawnTask.cancel(false);

			ThreadPoolManager.getInstance().scheduleEffect(rd, 0);
		}

		spawnInst.setSpawnActive(isActive);
	}
}
 
源代码10 项目: lams   文件: ThreadPoolTaskScheduler.java
@Override
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
	ScheduledExecutorService executor = getScheduledExecutor();
	try {
		ErrorHandler errorHandler =
				(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
		return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
	}
	catch (RejectedExecutionException ex) {
		throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
	}
}
 
源代码11 项目: j2objc   文件: ScheduledExecutorSubclassTest.java
/**
 * remove(task) removes queued task, and fails to remove active task
 */
public void testRemove() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final ScheduledThreadPoolExecutor p = new CustomExecutor(1);
    try (PoolCleaner cleaner = cleaner(p, done)) {
        ScheduledFuture[] tasks = new ScheduledFuture[5];
        final CountDownLatch threadStarted = new CountDownLatch(1);
        for (int i = 0; i < tasks.length; i++) {
            Runnable r = new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadStarted.countDown();
                    await(done);
                }};
            tasks[i] = p.schedule(r, 1, MILLISECONDS);
        }
        await(threadStarted);
        BlockingQueue<Runnable> q = p.getQueue();
        assertFalse(p.remove((Runnable)tasks[0]));
        assertTrue(q.contains((Runnable)tasks[4]));
        assertTrue(q.contains((Runnable)tasks[3]));
        assertTrue(p.remove((Runnable)tasks[4]));
        assertFalse(p.remove((Runnable)tasks[4]));
        assertFalse(q.contains((Runnable)tasks[4]));
        assertTrue(q.contains((Runnable)tasks[3]));
        assertTrue(p.remove((Runnable)tasks[3]));
        assertFalse(q.contains((Runnable)tasks[3]));
    }
}
 
源代码12 项目: Cardshifter   文件: AISystem.java
private void aiPerform(ECSGame game) {
	Set<Entity> ais = game.getEntitiesWithComponent(AIComponent.class);
	
	logger.info("AI entities " + ais);
	for (Entity entity : ais) {
		AIComponent aiComp = ai.get(entity);
		if (aiComp.hasWaitingAction()) {
			logger.info(entity + " already has a waiting action");
			continue;
		}
		if (aiComp.isPaused()) {
			logger.info(entity + " AI is paused");
			continue;
		}
		
		long delay = aiComp.getDelay();
		ECSAction action = aiComp.getAI().getAction(entity);
		if (action != null && !game.isGameOver()) {
			logger.info(entity + " will perform " + action + " in " + delay + " milliseconds");
			Runnable runnable = () -> this.perform(entity, action);
			if (delay <= 0) {
				runnable.run();
			}
			else {
				ScheduledFuture<?> future = executor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
				aiComp.future = future;
			}
			return;
		}
		else {
			logger.info(entity + ": No actions available");
		}
	}
}
 
源代码13 项目: openhab-core   文件: WatchQueueReader.java
private Map<Path, @Nullable ScheduledFuture<?>> getKeyFutures(WatchKey key) {
    Map<Path, @Nullable ScheduledFuture<?>> keyFutures = futures.get(key);
    if (keyFutures == null) {
        keyFutures = new ConcurrentHashMap<>();
        futures.put(key, keyFutures);
    }
    return keyFutures;
}
 
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
        Runnable task,
        Instant startTime,
        Duration period) {
    return this.delegate.scheduleAtFixedRate(TraceRunnable.wrap(task), startTime, period);
}
 
/**
 * Stops the current execution of a task and schedules a runnable task for execution again.
 * This uses a fixed size scheduler to limit thread execution.
 *
 * @param futureTask the {@link ScheduledFuture} for the current scheduled task
 * @param runnableTask the {@link Runnable} to execute
 * @param delay the delay in milliseconds before the task will be executed
 * @return the {@link ScheduledFuture} for the scheduled task
 */
public ScheduledFuture<?> rescheduleTask(ScheduledFuture<?> futureTask, Runnable runnableTask, long delay) {
    futureTask.cancel(false);
    if (networkState != ZigBeeNetworkState.ONLINE) {
        logger.debug("ZigBeeNetworkManager rescheduleTask: not scheduling task while {}", networkState);
        return null;
    }

    return executorService.schedule(runnableTask, delay, TimeUnit.MILLISECONDS);
}
 
源代码16 项目: openjdk-8-source   文件: LdapTimeoutTest.java
ScheduledFuture killSwitch() {
    final Thread current = Thread.currentThread();
    return LdapTimeoutTest.pool.schedule(new Callable<Void>() {
        public Void call() throws Exception {
            System.err.println("Fail: killSwitch()");
            current.interrupt();
            return null;
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
 
源代码17 项目: dhis2-core   文件: DefaultSchedulingManager.java
@Override
public void scheduleJob( JobConfiguration jobConfiguration )
{
    if ( ifJobInSystemStop( jobConfiguration.getUid() ) )
    {
        JobInstance jobInstance = new DefaultJobInstance( this, messageService, leaderManager );

        if ( jobConfiguration.getUid() != null && !futures.containsKey( jobConfiguration.getUid() ) )
        {
            log.info( String.format( "Scheduling job: %s", jobConfiguration ) );

            ScheduledFuture<?> future = null;

            if ( jobConfiguration.getJobType().isCronSchedulingType() )
            {
                future = jobScheduler.schedule( () -> jobInstance.execute( jobConfiguration ),
                    new CronTrigger( jobConfiguration.getCronExpression() ) );
            }
            else if ( jobConfiguration.getJobType().isFixedDelaySchedulingType() )
            {
                future = jobScheduler.scheduleWithFixedDelay( () -> jobInstance.execute( jobConfiguration ),
                    Instant.now().plusSeconds( DEFAULT_INITIAL_DELAY_S ),
                    Duration.of( jobConfiguration.getDelay(), ChronoUnit.SECONDS ) );
            }

            futures.put( jobConfiguration.getUid(), future );

            log.info( String.format( "Scheduled job: %s", jobConfiguration ) );
        }
    }
}
 
源代码18 项目: fiware-cepheus   文件: SubscriptionManagerTest.java
@Test
public void testUnsubscribeOnEventTypeRemoval() {

    // Mock the task scheduler and capture the runnable
    ArgumentCaptor<Runnable> runnableArg = ArgumentCaptor.forClass(Runnable.class);
    when(taskScheduler.scheduleWithFixedDelay(runnableArg.capture(), anyLong())).thenReturn(Mockito.mock(ScheduledFuture.class));

    // Mock the response to the subsribeContext
    ArgumentCaptor<SuccessCallback> successArg = ArgumentCaptor.forClass(SuccessCallback.class);
    ListenableFuture<SubscribeContextResponse> responseFuture = Mockito.mock(ListenableFuture.class);
    doNothing().when(responseFuture).addCallback(successArg.capture(), any());

    // Return the mocked future on subscription
    when(ngsiClient.subscribeContext(any(), any(), any())).thenReturn(responseFuture);

    Configuration configuration = getBasicConf();
    subscriptionManager.setConfiguration(configuration);

    // Execute scheduled runnable
    runnableArg.getValue().run();

    // Return the SubscribeContextResponse
    callSuccessCallback(successArg);

    // Mock future for unsubscribeContext
    ListenableFuture<UnsubscribeContextResponse> responseFuture2 = Mockito.mock(ListenableFuture.class);
    doNothing().when(responseFuture2).addCallback(successArg.capture(), any());
    when(ngsiClient.unsubscribeContext(eq("http://iotAgent"), eq(null), eq("12345678"))).thenReturn(responseFuture2);

    // Set a configuration without the eventType
    Configuration emptyConfiguration = new Configuration();
    emptyConfiguration.setEventTypeIns(Collections.emptyList());
    subscriptionManager.setConfiguration(emptyConfiguration);

    // Check that unsubsribe is called when a later configuration removed the event type
    Assert.notNull(successArg.getValue());
}
 
源代码19 项目: GitToolBox   文件: ReschedulingExecutor.java
public Future<?> schedule(String id, Runnable task, long delay, TimeUnit timeUnit) {
  if (active.get()) {
    ScheduledFuture<?> newFuture = executor.schedule(task, delay, timeUnit);
    log.debug("Scheduled ", id, ": ", task);
    Optional.ofNullable(tasks.put(id, newFuture)).ifPresent(oldFuture -> {
      log.debug("Cancelling ", id, " interrupt=", mayInterrupt, ": ", oldFuture);
      oldFuture.cancel(mayInterrupt);
      log.debug("Cancelled ", id, " interrupt=", mayInterrupt, ": ", oldFuture);
    });
    return newFuture;
  } else {
    log.debug("Schedule ", id, " while inactive: ", task);
    return Futures.immediateCancelledFuture();
  }
}
 
/**
 * Schedule statistics updater tasks for the given service/cartridge type.
 *
 * @param serviceName service name/cartridge type
 */
public void scheduleStatisticsUpdaterTasks(String serviceName) {
    synchronized (MockHealthStatisticsGenerator.class) {
        if (!statisticsUpdaterTasksScheduled(serviceName)) {
            List<MockHealthStatisticsPattern> statisticsPatterns = MockIaasConfig.getInstance().
                    getMockHealthStatisticsConfig().getStatisticsPatterns();

            Map<String, ScheduledFuture> taskList = serviceNameToTaskListMap.get(serviceName);
            if (taskList == null) {
                taskList = new ConcurrentHashMap<String, ScheduledFuture>();
                serviceNameToTaskListMap.put(serviceName, taskList);
            }

            for (MockHealthStatisticsPattern statisticsPattern : statisticsPatterns) {
                if (statisticsPattern.getCartridgeType().equals(serviceName) &&
                        (statisticsPattern.getSampleDuration() > 0)) {
                    MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern);
                    ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0,
                            statisticsPattern.getSampleDuration(), TimeUnit.SECONDS);
                    taskList.put(statisticsPattern.getFactor().toString(), task);
                }
            }

            if (log.isInfoEnabled()) {
                log.info(String.format("Mock statistics updaters scheduled: [service-name] %s", serviceName));
            }
        }
    }
}
 
源代码21 项目: j2objc   文件: ScheduledExecutorSubclassTest.java
/**
 * shutdownNow returns a list containing tasks that were not run,
 * and those tasks are drained from the queue
 */
public void testShutdownNow_delayedTasks() throws InterruptedException {
    final CustomExecutor p = new CustomExecutor(1);
    List<ScheduledFuture> tasks = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        Runnable r = new NoOpRunnable();
        tasks.add(p.schedule(r, 9, SECONDS));
        tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
        tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
    }
    if (testImplementationDetails)
        assertEquals(new HashSet(tasks), new HashSet(p.getQueue()));
    final List<Runnable> queuedTasks;
    try {
        queuedTasks = p.shutdownNow();
    } catch (SecurityException ok) {
        return; // Allowed in case test doesn't have privs
    }
    assertTrue(p.isShutdown());
    assertTrue(p.getQueue().isEmpty());
    if (testImplementationDetails)
        assertEquals(new HashSet(tasks), new HashSet(queuedTasks));
    assertEquals(tasks.size(), queuedTasks.size());
    for (ScheduledFuture task : tasks) {
        assertFalse(((CustomTask)task).ran);
        assertFalse(task.isDone());
        assertFalse(task.isCancelled());
    }
    assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
    assertTrue(p.isTerminated());
}
 
源代码22 项目: grpc-nebula-java   文件: FakeClock.java
@Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
  ScheduledTask task = new ScheduledTask(currentTimeNanos + unit.toNanos(delay), cmd);
  if (delay > 0) {
    scheduledTasks.add(task);
  } else {
    dueTasks.add(task);
  }
  return task;
}
 
源代码23 项目: haven-platform   文件: SearchIndex.java
@Override
public void close() throws Exception {
    ScheduledFuture<?> future = this.future;
    if(future != null) {
        future.cancel(true);
    }
}
 
源代码24 项目: smarthome   文件: ChannelState.java
private void receivedOrTimeout() {
    final ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
    if (scheduledFuture != null) { // Cancel timeout
        scheduledFuture.cancel(false);
        this.scheduledFuture = null;
    }
    future.complete(null);
}
 
源代码25 项目: tez   文件: ControlledScheduledExecutorService.java
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null,
      toTimestamp(initialDelay, unit), unit.toMillis(delay));
  schedule(task);
  return task;
}
 
源代码26 项目: actor4j-core   文件: ActorTimerExecuterService.java
@Override
public ScheduledFuture<?> scheduleOnce(final Supplier<ActorMessage<?>> supplier, final ActorGroup group, long delay, TimeUnit unit) {
	return timerExecuterService.schedule(new Runnable() {
		@Override
		public void run() {
			ActorMessage<?> message = supplier.get();
			for (UUID id : group) {
				message.dest = id;
				system.send(message);
			}
		}
	}, delay, unit); 
}
 
源代码27 项目: Flink-CEPplus   文件: FileCacheDirectoriesTest.java
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

	if (command instanceof FileCache.DeleteProcess) {
		assertNull("Multiple delete process registered", lastDeleteProcess);
		lastDeleteProcess = (FileCache.DeleteProcess) command;
		lastDelayMillis = unit.toMillis(delay);
		return super.schedule(() -> {}, delay, unit);
	} else {
		return super.schedule(command, delay, unit);
	}
}
 
源代码28 项目: audit4j-core   文件: ReschedulingRunnable.java
/**
 * {@inheritDoc}
 * 
 * @see java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)
 *
 */
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ScheduledFuture<?> curr;
    synchronized (this.triggerContextMonitor) {
        curr = this.currentFuture;
    }
    return curr.get(timeout, unit);
}
 
源代码29 项目: util4j   文件: TaskCenter.java
public void close() {
	System.out.println("TaskCenter closing");
	this.monitoring.cancel(true);

	// 停止所有定时任务
	for (ScheduledFuture<?> sf : this.scheduledFutureList) {
		if (!sf.isCancelled() || !sf.isDone()) {
			sf.cancel(true);
		}
	}
	this.scheduledFutureList.clear();

	Iterator<Timer> iter = this.timers.values().iterator();
	while (iter.hasNext()) {
		Timer timer = iter.next();
		timer.cancel();
	}
	this.timers.clear();

	// 关闭滑动窗
	this.slidingWindow.stop();

	// 关闭线程池
	this.mainExecutor.shutdown();
	this.scheduledExecutor.shutdown();
	System.out.println("TaskCenter closed");
}
 
源代码30 项目: flink   文件: AkkaRpcServiceTest.java
/**
 * Tests that the RPC service's scheduled executor service can execute runnable with a fixed
 * delay.
 */
@Test(timeout = 60000)
public void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
	ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();

	final int tries = 4;
	final long delay = 10L;
	final CountDownLatch countDownLatch = new CountDownLatch(tries);

	long currentTime = System.nanoTime();

	ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
		countDownLatch::countDown,
		delay,
		delay,
		TimeUnit.MILLISECONDS);

	assertTrue(!future.isDone());

	countDownLatch.await();

	// the future should not complete since we have a periodic task
	assertTrue(!future.isDone());

	long finalTime = System.nanoTime() - currentTime;

	// the processing should have taken at least delay times the number of count downs.
	assertTrue(finalTime >= tries * delay);

	future.cancel(true);
}
 
 类所在包
 同包方法