下面列出了 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Schedule each of the given {@link ScheduledExecutorTask ScheduledExecutorTasks}
 * on the supplied {@link ScheduledExecutorService}.
 * <p>A one-time task is submitted through {@code schedule}; a repeating task is
 * submitted through {@code scheduleAtFixedRate} when it reports a fixed rate and
 * through {@code scheduleWithFixedDelay} otherwise.
 * @param tasks the specified ScheduledExecutorTasks (never empty)
 * @param executor the ScheduledExecutorService to register the tasks on.
 */
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
    for (int index = 0; index < tasks.length; index++) {
        ScheduledExecutorTask current = tasks[index];
        Runnable work = getRunnableToSchedule(current);

        // One-shot tasks have no period, so they are handled first.
        if (current.isOneTimeTask()) {
            executor.schedule(work, current.getDelay(), current.getTimeUnit());
            continue;
        }

        // Repeating task: choose between fixed-rate and fixed-delay semantics.
        if (current.isFixedRate()) {
            executor.scheduleAtFixedRate(work, current.getDelay(), current.getPeriod(), current.getTimeUnit());
        }
        else {
            executor.scheduleWithFixedDelay(work, current.getDelay(), current.getPeriod(), current.getTimeUnit());
        }
    }
}
/**
 * Creates the claim store, schedules a recurring cleanup of empty claim sets and
 * registers two gauges (number of channels, number of claims) with the metric registry.
 *
 * @param lifeCycle passed to {@code defaultScheduledExecutor} when creating the cleanup executor
 * @param metricsGroup metrics group name used for the executor and as prefix of the gauge names
 * @param metricRegistry registry that receives the two gauges
 */
@Inject
public DefaultClaimStore(LifeCycleRegistry lifeCycle, @MetricsGroupName String metricsGroup, MetricRegistry metricRegistry) {
    ScheduledExecutorService cleanupExecutor = defaultScheduledExecutor(lifeCycle, metricsGroup);

    // Periodically cleanup ClaimSets with no active claims:
    // first run after 15 seconds, then 15 seconds after each run finishes.
    Runnable emptyClaimSetSweeper = new Runnable() {
        @Override
        public void run() {
            removeEmptyClaimSets();
        }
    };
    cleanupExecutor.scheduleWithFixedDelay(emptyClaimSetSweeper, 15, 15, TimeUnit.SECONDS);

    // Expose gauges for the # of channels and # of claims.
    Gauge<Integer> channelGauge = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return getNumChannels();
        }
    };
    metricRegistry.register(MetricRegistry.name(metricsGroup, "DefaultClaimStore", "channels"), channelGauge);

    Gauge<Integer> claimGauge = new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return getNumClaims();
        }
    };
    metricRegistry.register(MetricRegistry.name(metricsGroup, "DefaultClaimStore", "claims"), claimGauge);
}
/**
 * Schedule each of the given {@link ScheduledExecutorTask ScheduledExecutorTasks}
 * on the supplied {@link ScheduledExecutorService}.
 * <p>A one-time task is submitted through {@code schedule}; a repeating task is
 * submitted through {@code scheduleAtFixedRate} when it reports a fixed rate and
 * through {@code scheduleWithFixedDelay} otherwise.
 * @param tasks the specified ScheduledExecutorTasks (never empty)
 * @param executor the ScheduledExecutorService to register the tasks on.
 */
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
    for (int index = 0; index < tasks.length; index++) {
        ScheduledExecutorTask current = tasks[index];
        Runnable work = getRunnableToSchedule(current);

        // One-shot tasks have no period, so they are handled first.
        if (current.isOneTimeTask()) {
            executor.schedule(work, current.getDelay(), current.getTimeUnit());
            continue;
        }

        // Repeating task: choose between fixed-rate and fixed-delay semantics.
        if (current.isFixedRate()) {
            executor.scheduleAtFixedRate(work, current.getDelay(), current.getPeriod(), current.getTimeUnit());
        }
        else {
            executor.scheduleWithFixedDelay(work, current.getDelay(), current.getPeriod(), current.getTimeUnit());
        }
    }
}
/**
 * Refreshes the config type cache once on the calling thread, then schedules a
 * background refresh that first runs after 3 minutes and repeats 3 minutes after
 * each run completes.
 * <p>Failures of the initial refresh propagate to the caller; failures of the
 * background refresh are logged and do not stop later runs.
 */
@PostConstruct
public void init() {
    freshConfigTypeCache();

    Runnable refreshJob = new Runnable() {
        @Override
        public void run() {
            // The worker thread is renamed on every run so it is recognizable in thread dumps.
            Thread.currentThread().setName("fresh-config-type-thread");
            try {
                freshConfigTypeCache();
            } catch (Throwable e) {
                // Swallowed on purpose: an escaping exception would cancel the periodic task.
                logger.error("fresh config type cache error", e);
            }
        }
    };

    Executors.newSingleThreadScheduledExecutor()
            .scheduleWithFixedDelay(refreshJob, 3, 3, TimeUnit.MINUTES);
}
/**
 * Need to be called during getQualifiedSlaveNames.
 * <p>Does nothing once {@code freshnessUpdatorRef} holds an executor. Otherwise,
 * while holding the {@code FreshnessReader.class} lock, it rebuilds
 * {@code freshnessCache} with one map per logic database that marks every
 * non-master connection string as {@code INVALID}. If a {@code reader} is
 * configured, it then starts a single-threaded scanner that runs immediately and
 * repeats with {@code interval} seconds between runs.
 */
private void initialize() {
    if (freshnessUpdatorRef.get() != null) {
        return;
    }

    synchronized (FreshnessReader.class) {
        // Re-check under the lock: another thread may have finished the setup meanwhile.
        if (freshnessUpdatorRef.get() != null) {
            return;
        }

        freshnessCache.clear();
        DalConfigure dalConfigure = DalClientFactory.getDalConfigure();
        for (String logicDbName : dalConfigure.getDatabaseSetNames()) {
            Map<String, Integer> slaveFreshness = new ConcurrentHashMap<>();
            freshnessCache.put(logicDbName, slaveFreshness);

            for (DataBase database : dalConfigure.getDatabaseSet(logicDbName).getDatabases().values()) {
                if (database.isMaster()) {
                    continue;
                }
                slaveFreshness.put(database.getConnectionString(), INVALID);
            }
        }

        // Without a reader there is nothing to scan; the reference stays unset.
        if (reader == null) {
            return;
        }

        // Init task
        ScheduledExecutorService scanner = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            AtomicInteger sequence = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                String threadName = "Dal-FreshnessScanner" + this.sequence.getAndIncrement();
                return new Thread(r, threadName);
            }
        });
        scanner.scheduleWithFixedDelay(new FreshnessScanner(reader), 0, interval, TimeUnit.SECONDS);
        freshnessUpdatorRef.set(scanner);
    }
}
public DemoServiceImpl() {
ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
scheduled.scheduleWithFixedDelay(() -> {
callbacks.forEach(callback -> {
try {
boolean res = callback.echo("callback time: " + System.currentTimeMillis());
logger.info("send callback is succeed! return " + res);
} catch (Exception e) {
logger.error("send callback is failed, cause by " + e.getMessage(), e);
}
});
}, 5000, 5000, TimeUnit.MILLISECONDS);
}
@VisibleForTesting
ConfigFileWatcher(int pollPeriodSeconds) {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ConfigFileWatcher-%d").build());
this.watcherTask = new WatcherTask();
service.scheduleWithFixedDelay(
watcherTask, pollPeriodSeconds, pollPeriodSeconds, TimeUnit.SECONDS);
}
/**
 * Schedules the task on the underlying scheduled executor with no initial delay
 * and the given delay (in milliseconds) between the end of one run and the start
 * of the next. The task is wrapped by {@code errorHandlingTask} before submission.
 *
 * @param task the Runnable to execute repeatedly
 * @param delay the delay between runs, in milliseconds
 * @return the future representing the scheduled task
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
        Runnable decorated = errorHandlingTask(task, true);
        return executor.scheduleWithFixedDelay(decorated, 0, delay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
        String reason = "Executor [" + executor + "] did not accept task: " + task;
        throw new TaskRejectedException(reason, ex);
    }
}
/**
 * Waits until the number of active pods reported by the Job equals the Job's
 * configured parallelism, the configured scale timeout elapses, or the calling
 * thread is interrupted.
 * <p>The Job is polled on a dedicated single-threaded scheduler every
 * {@code POLL_INTERVAL_MS} milliseconds, starting immediately. The scheduler is
 * always shut down before this method returns. On interruption the interrupt
 * flag is restored and the poller is cancelled.
 */
private void waitUntilJobIsScaled() {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    // Last Job snapshot seen by the poller; stays null until the first poll succeeds.
    final AtomicReference<Job> atomicJob = new AtomicReference<>();

    final Runnable jobPoller = () -> {
        try {
            Job job = getMandatory();
            atomicJob.set(job);
            Integer activeJobs = job.getStatus().getActive();
            if (activeJobs == null) {
                // A missing "active" count is treated as zero active pods.
                activeJobs = 0;
            }
            if (Objects.equals(job.getSpec().getParallelism(), activeJobs)) {
                countDownLatch.countDown();
            } else {
                LOG.debug("Only {}/{} pods scheduled for Job: {} in namespace: {} seconds so waiting...",
                    job.getStatus().getActive(), job.getSpec().getParallelism(), job.getMetadata().getName(), namespace);
            }
        } catch (Throwable t) {
            // Caught so that one failing poll does not cancel the periodic task.
            LOG.error("Error while waiting for Job to be scaled.", t);
        }
    };

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> poller = executor.scheduleWithFixedDelay(jobPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
    try {
        countDownLatch.await(getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        poller.cancel(true);
        // The interrupt may arrive before the first poll has stored a Job, so the
        // snapshot must be null-checked before it is used for the log message.
        Job lastSeen = atomicJob.get();
        if (lastSeen == null) {
            LOG.error("Interrupted before the Job could be polled in namespace: {} - giving up", namespace);
        } else {
            LOG.error("Only {}/{} pod(s) ready for Job: {} in namespace: {} - giving up",
                lastSeen.getStatus().getActive(), lastSeen.getSpec().getParallelism(), lastSeen.getMetadata().getName(), namespace);
        }
    } finally {
        // Runs on success, timeout and interruption alike so the polling thread never leaks.
        executor.shutdown();
    }
}
/**
 * Builds a service handle around a new {@code TasmoProcessingStats} instance.
 * <p>Starting the handle schedules periodic stats logging when the configured
 * interval is positive; stopping it shuts the scheduler down immediately.
 *
 * @param config supplies the stats logging interval in seconds
 * @return a handle exposing the stats object and controlling the logging scheduler
 */
public static TasmoServiceHandle<TasmoProcessingStats> initialize(final TasmoProcessingStatsConfig config) {
    final TasmoProcessingStats stats = new TasmoProcessingStats();
    final ScheduledExecutorService statsLogScheduler = Executors.newSingleThreadScheduledExecutor();

    return new TasmoServiceHandle<TasmoProcessingStats>() {
        @Override
        public TasmoProcessingStats getService() {
            return stats;
        }

        @Override
        public void start() throws Exception {
            final int periodSeconds = config.getLogStatsEveryNSeconds();
            if (periodSeconds <= 0) {
                // A non-positive interval means nothing is scheduled.
                return;
            }
            Runnable logTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        stats.logStats();
                    } catch (Exception x) {
                        LOG.error("Issue with logging stats. ", x);
                    }
                }
            };
            statsLogScheduler.scheduleWithFixedDelay(logTask, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        }

        @Override
        public void stop() throws Exception {
            statsLogScheduler.shutdownNow();
        }
    };
}
/**
 * Schedules a recurring call to {@code removeExpiredLogs} for the given queue.
 * The current time and the expiration in milliseconds are read anew on each run.
 *
 * @param delayInMs initial delay before the first run, in milliseconds
 * @param intervalInMs delay between the end of one run and the start of the next, in milliseconds
 * @param q the queue handed to {@code removeExpiredLogs}
 * @param scheduler the executor that runs the task
 * @param expiration expiration whose millisecond value is handed to {@code removeExpiredLogs}
 * @return the future representing the scheduled task
 */
static ScheduledFuture<?> scheduleTruncate(long delayInMs,
                                           long intervalInMs,
                                           Queue<? extends ContextLog> q,
                                           ScheduledExecutorService scheduler,
                                           TimeValue expiration) {
    Runnable purgeExpired = () -> {
        long now = System.currentTimeMillis();
        removeExpiredLogs(q, now, expiration.getMillis());
    };
    return scheduler.scheduleWithFixedDelay(purgeExpired, delayInMs, intervalInMs, TimeUnit.MILLISECONDS);
}
/**
 * Schedules the task on the underlying scheduled executor with no initial delay
 * and the given delay (in milliseconds) between the end of one run and the start
 * of the next. The task is wrapped by {@code errorHandlingTask} before submission.
 *
 * @param task the Runnable to execute repeatedly
 * @param delay the delay between runs, in milliseconds
 * @return the future representing the scheduled task
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
        Runnable decorated = errorHandlingTask(task, true);
        return executor.scheduleWithFixedDelay(decorated, 0, delay, TimeUnit.MILLISECONDS);
    }
    catch (RejectedExecutionException ex) {
        String reason = "Executor [" + executor + "] did not accept task: " + task;
        throw new TaskRejectedException(reason, ex);
    }
}
/**
 * Starts the flow service if it is not already running.
 * <p>Under the write lock: marks the service as running, creates a two-thread
 * {@code FlowEngine} that runs a {@code SaveReportingTask} every 500 ms starting
 * immediately, publishes that executor, and starts the sender listener when the
 * service is configured for clustering.
 *
 * @throws LifeCycleStartException if an {@link IOException} occurs during startup;
 *     a forced stop is attempted first
 */
@Override
public void start() throws LifeCycleStartException {
    writeLock.lock();
    try {
        if (isRunning()) {
            return;
        }

        running.set(true);

        final ScheduledExecutorService flowTasks = new FlowEngine(2, "Flow Service Tasks");
        flowTasks.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 500L, TimeUnit.MILLISECONDS);
        this.executor.set(flowTasks);

        if (configuredForClustering) {
            senderListener.start();
        }
    } catch (final IOException ioe) {
        try {
            stop(/* force */ true);
        } catch (final Exception ignored) {
            // Best-effort cleanup: the original IOException is the failure that gets reported.
        }
        final String reason = "Failed to start Flow Service due to: " + ioe;
        throw new LifeCycleStartException(reason, ioe);
    } finally {
        writeLock.unlock();
    }
}
/** @param lafManager IDEA look-and-feel manager for getting and setting the current theme */
public DarkModeSync(final LafManager lafManager) {
themes = ServiceManager.getService(DarkModeSyncThemes.class);
this.lafManager = lafManager;
// Checks if OS is Windows or MacOS
if (!(SystemInfo.isMacOSMojave || SystemInfo.isWin10OrNewer)) {
logger.error("Plugin only supports macOS Mojave and greater or Windows 10 and greater");
scheduledFuture = null;
return;
}
ScheduledExecutorService executor = JobScheduler.getScheduler();
scheduledFuture =
executor.scheduleWithFixedDelay(this::updateLafIfNecessary, 0, 3, TimeUnit.SECONDS);
}
/**
 * Determine if the machine we're running on has timing issues.
 * <p>A daemon thread runs a probe every 2000 ms and counts how often the gap
 * between two consecutive probes deviates from 2000 ms by more than 500 ms.
 * After 60 seconds the probe is stopped, and a warning is logged when fewer than
 * 25 probes ran or more than 15 of them were out of range.
 */
private void detectTimingIssues() {
    final int minRequiredOccurrences = 25;
    final int maxOccurrencesOutOfRange = 15;
    final AtomicLong previousProbeMillis = new AtomicLong(System.currentTimeMillis());

    final ScheduledExecutorService probeExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(final Runnable r) {
            final Thread probeThread = delegate.newThread(r);
            probeThread.setDaemon(true);
            probeThread.setName("Detect Timing Issues");
            return probeThread;
        }
    });

    final AtomicInteger lateProbes = new AtomicInteger(0);
    final AtomicInteger totalProbes = new AtomicInteger(0);

    final Runnable probe = new Runnable() {
        @Override
        public void run() {
            final long now = System.currentTimeMillis();
            final long elapsed = now - previousProbeMillis.get();
            totalProbes.incrementAndGet();
            // Anything more than half a second away from the 2 second schedule counts as late.
            if (Math.abs(elapsed - 2000L) > 500L) {
                lateProbes.incrementAndGet();
            }
            previousProbeMillis.set(now);
        }
    };
    final ScheduledFuture<?> probeFuture =
            probeExecutor.scheduleWithFixedDelay(probe, 2000L, 2000L, TimeUnit.MILLISECONDS);

    final TimerTask evaluation = new TimerTask() {
        @Override
        public void run() {
            probeFuture.cancel(true);
            probeExecutor.shutdownNow();

            final boolean tooFewProbes = totalProbes.get() < minRequiredOccurrences;
            final boolean tooManyLate = lateProbes.get() > maxOccurrencesOutOfRange;
            if (tooFewProbes || tooManyLate) {
                LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
                        + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
            }
        }
    };

    // Daemon timer: evaluate the collected samples once, one minute from now.
    new Timer(true).schedule(evaluation, 60000L);
}
/**
 * Waits until the DeploymentConfig reports the replica count declared in its spec
 * for an observed generation that is at least its metadata generation, or until
 * the configured scale timeout elapses.
 * <p>The resource is polled on a dedicated single-threaded scheduler every
 * {@code POLL_INTERVAL_MS} milliseconds, starting immediately. If the resource
 * has disappeared, the wait ends successfully when {@code count} is 0 and with an
 * {@link IllegalStateException} placed on the result queue otherwise. The poller
 * is cancelled and the scheduler shut down before this method returns.
 *
 * @param count the replica count being waited for; used for the "resource gone"
 *     decision and in log output
 */
private void waitUntilDeploymentConfigIsScaled(final int count) {
    final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
    final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
    final String name = checkName(getItem());
    final String namespace = checkNamespace(getItem());

    final Runnable deploymentPoller = () -> {
        try {
            DeploymentConfig deploymentConfig = get();
            //If the rs is gone, we shouldn't wait.
            if (deploymentConfig == null) {
                if (count == 0) {
                    queue.put(true);
                } else {
                    // Report the real namespace; the message previously printed the resource name twice.
                    queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + name + " in namespace: " + namespace + " to scale. Resource is no longer available."));
                }
                return;
            }
            replicasRef.set(deploymentConfig.getStatus().getReplicas());
            // A missing replica count in the status is treated as zero.
            int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0;
            if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
                queue.put(true);
            } else {
                LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...",
                    deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace);
            }
        } catch (Throwable t) {
            // Caught so that one failing poll does not cancel the periodic task.
            LOG.error("Error while waiting for Deployment to be scaled.", t);
        }
    };

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
    try {
        if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
            LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
                replicasRef.get(), count, name, namespace);
        } else {
            LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {} after waiting for {} seconds so giving up",
                replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
        }
    } finally {
        poller.cancel(true);
        executor.shutdown();
    }
}
/**
 * Builds the shared JobManager services from the given configuration.
 * <p>Creates the library cache manager, a scheduled executor sized to the number
 * of CPU cores, the stack trace sample coordinator and the back pressure stats
 * tracker, and schedules a recurring cleanup of the tracker's operator stats cache.
 *
 * @param config configuration to read class loading, timeout and back pressure options from
 * @param blobServer blob server used by the library cache manager and the returned services
 * @return the assembled shared services
 * @throws IllegalConfigurationException if the timeout cannot be parsed
 * @throws Exception declared by the signature; no further checked failure is visible in this method
 */
public static JobManagerSharedServices fromConfiguration(
        Configuration config,
        BlobServer blobServer) throws Exception {

    checkNotNull(config);
    checkNotNull(blobServer);

    final String[] parentFirstPatterns = CoreOptions.getParentFirstLoaderPatterns(config);
    final BlobLibraryCacheManager libraryCacheManager =
        new BlobLibraryCacheManager(
            blobServer,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(
                config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER)),
            parentFirstPatterns);

    final FiniteDuration timeout;
    try {
        timeout = AkkaUtils.getTimeout(config);
    } catch (NumberFormatException e) {
        throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
    }

    final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("jobmanager-future"));

    final StackTraceSampleCoordinator stackTraceSampleCoordinator =
        new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());

    // Back pressure settings, read in the order the tracker constructor expects them.
    final int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
    final int numSamples = config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES);
    final int refreshInterval = config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
    final int delayBetweenSamples = config.getInteger(WebOptions.BACKPRESSURE_DELAY);

    final BackPressureStatsTrackerImpl backPressureStatsTracker = new BackPressureStatsTrackerImpl(
        stackTraceSampleCoordinator,
        cleanUpInterval,
        numSamples,
        refreshInterval,
        Time.milliseconds(delayBetweenSamples));

    // The cleanup interval doubles as the initial delay of the recurring cache cleanup.
    futureExecutor.scheduleWithFixedDelay(
        () -> backPressureStatsTracker.cleanUpOperatorStatsCache(),
        cleanUpInterval,
        cleanUpInterval,
        TimeUnit.MILLISECONDS);

    return new JobManagerSharedServices(
        futureExecutor,
        libraryCacheManager,
        RestartStrategyFactory.createRestartStrategyFactory(config),
        stackTraceSampleCoordinator,
        backPressureStatsTracker,
        blobServer);
}
/**
 * Schedules the runnable on the given executor to start immediately and to
 * repeat with {@code RETRY_INTERVAL_MILLIS} milliseconds between the end of one
 * run and the start of the next.
 *
 * @param runnable the work to repeat
 * @param executorService the executor that runs it
 * @return the future representing the scheduled task
 */
@Override
protected ScheduledFuture<?> schedule(final Runnable runnable,
                                      final ScheduledExecutorService executorService) {
    final ScheduledFuture<?> retryFuture =
            executorService.scheduleWithFixedDelay(runnable, 0, RETRY_INTERVAL_MILLIS, MILLISECONDS);
    return retryFuture;
}
/**
 * Schedules the runnable with a fixed delay, using the same value for the initial
 * delay and for the delay between the end of one run and the start of the next.
 *
 * @param scheduledExecutorService the executor that runs the task
 * @param milliseconds initial delay and delay between runs, in milliseconds
 * @param runnable the work to repeat
 */
protected void schedule(ScheduledExecutorService scheduledExecutorService, long milliseconds, Runnable runnable) {
    final long initialDelay = milliseconds;
    final long delayBetweenRuns = milliseconds;
    scheduledExecutorService.scheduleWithFixedDelay(
            runnable, initialDelay, delayBetweenRuns, TimeUnit.MILLISECONDS);
}
/**
 * Demo entry point: runs a task on a three-thread scheduled pool that prints the
 * worker thread name, calls {@code downCount()} and sleeps for one second, with a
 * 10 ms delay between runs. The main thread waits on {@code monitor} until
 * {@code count} is no longer positive and then shuts the pool down.
 *
 * @param args unused
 * @throws Exception if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws Exception {
    ScheduledExecutorService readers = Executors.newScheduledThreadPool(3);
    readers.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
            downCount();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }, 0, 10, MILLISECONDS);

    try {
        synchronized (monitor) {
            // Loop rather than a single check: Object.wait() may return spuriously.
            // NOTE(review): assumes downCount() updates count and notifies while holding monitor - confirm.
            while (count > 0) {
                monitor.wait();
            }
        }
    } finally {
        // Always stop the non-daemon pool, even when the wait is interrupted,
        // otherwise its threads keep the JVM alive.
        readers.shutdown();
    }
}