java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay ( )源码实例Demo

下面列出了java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Register the specified {@link ScheduledExecutorTask ScheduledExecutorTasks}
 * on the given {@link ScheduledExecutorService}.
 * @param tasks the specified ScheduledExecutorTasks (never empty)
 * @param executor the ScheduledExecutorService to register the tasks on.
 */
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
	for (ScheduledExecutorTask task : tasks) {
		Runnable runnable = getRunnableToSchedule(task);
		if (task.isOneTimeTask()) {
			executor.schedule(runnable, task.getDelay(), task.getTimeUnit());
		}
		else {
			if (task.isFixedRate()) {
				executor.scheduleAtFixedRate(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
			}
			else {
				executor.scheduleWithFixedDelay(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
			}
		}
	}
}
 
源代码2 项目: emodb   文件: DefaultClaimStore.java
@Inject
public DefaultClaimStore(LifeCycleRegistry lifeCycle, @MetricsGroupName String metricsGroup, MetricRegistry metricRegistry) {
    ScheduledExecutorService scheduledExecutor = defaultScheduledExecutor(lifeCycle, metricsGroup);

    // Periodically cleanup ClaimSets with no active claims.
    scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            removeEmptyClaimSets();
        }
    }, 15, 15, TimeUnit.SECONDS);

    // Expose gauges for the # of channels and # of claims.
    metricRegistry.register(MetricRegistry.name(metricsGroup, "DefaultClaimStore", "channels"), new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return getNumChannels();
        }
    });
    metricRegistry.register(MetricRegistry.name(metricsGroup, "DefaultClaimStore", "claims"), new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return getNumClaims();
        }
    });
}
 
/**
 * Register the specified {@link ScheduledExecutorTask ScheduledExecutorTasks}
 * on the given {@link ScheduledExecutorService}.
 * @param tasks the specified ScheduledExecutorTasks (never empty)
 * @param executor the ScheduledExecutorService to register the tasks on.
 */
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
	for (ScheduledExecutorTask task : tasks) {
		Runnable runnable = getRunnableToSchedule(task);
		if (task.isOneTimeTask()) {
			executor.schedule(runnable, task.getDelay(), task.getTimeUnit());
		}
		else {
			if (task.isFixedRate()) {
				executor.scheduleAtFixedRate(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
			}
			else {
				executor.scheduleWithFixedDelay(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
			}
		}
	}
}
 
源代码4 项目: qconfig   文件: CacheConfigTypeServiceImpl.java
@PostConstruct
public void init() {
    freshConfigTypeCache();

    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            Thread.currentThread().setName("fresh-config-type-thread");
            try {
                freshConfigTypeCache();
            } catch (Throwable e) {
                logger.error("fresh config type cache error", e);
            }
        }
    }, 3, 3, TimeUnit.MINUTES);
}
 
源代码5 项目: dal   文件: FreshnessSelector.java
/**
 * Need to be called during getQualifiedSlaveNames
 *
 */
private void initialize() {
    if (freshnessUpdatorRef.get() != null)
        return;

    synchronized (FreshnessReader.class) {
        if (freshnessUpdatorRef.get() != null)
            return;

        freshnessCache.clear();
        DalConfigure configure = DalClientFactory.getDalConfigure();
        for (String logicDbName : configure.getDatabaseSetNames()) {
            Map<String, Integer> logicDbFreshnessMap = new ConcurrentHashMap<>();
            freshnessCache.put(logicDbName, logicDbFreshnessMap);

            DatabaseSet dbSet = configure.getDatabaseSet(logicDbName);
            for (Map.Entry<String, DataBase> dbEntry : dbSet.getDatabases().entrySet()) {
                if (!dbEntry.getValue().isMaster())
                    logicDbFreshnessMap.put(dbEntry.getValue().getConnectionString(), INVALID);
            }
        }

        if (reader == null)
            return;

        // Init task
        ScheduledExecutorService executer = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            AtomicInteger atomic = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Dal-FreshnessScanner" + this.atomic.getAndIncrement());
            }
        });
        executer.scheduleWithFixedDelay(new FreshnessScanner(reader), 0, interval, TimeUnit.SECONDS);
        freshnessUpdatorRef.set(executer);
    }
}
 
源代码6 项目: joyrpc   文件: DemoServiceImpl.java
public DemoServiceImpl() {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    scheduled.scheduleWithFixedDelay(() -> {
        callbacks.forEach(callback -> {
            try {
                boolean res = callback.echo("callback time: " + System.currentTimeMillis());
                logger.info("send callback is succeed! return " + res);
            } catch (Exception e) {
                logger.error("send callback is failed, cause by " + e.getMessage(), e);
            }
        });
    }, 5000, 5000, TimeUnit.MILLISECONDS);
}
 
源代码7 项目: singer   文件: ConfigFileWatcher.java
@VisibleForTesting
ConfigFileWatcher(int pollPeriodSeconds) {
  ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ConfigFileWatcher-%d").build());
  this.watcherTask = new WatcherTask();
  service.scheduleWithFixedDelay(
      watcherTask, pollPeriodSeconds, pollPeriodSeconds, TimeUnit.SECONDS);
}
 
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
	ScheduledExecutorService executor = getScheduledExecutor();
	try {
		return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
	}
	catch (RejectedExecutionException ex) {
		throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
	}
}
 
源代码9 项目: kubernetes-client   文件: JobOperationsImpl.java
/**
 * Lets wait until there are enough Ready pods of the given Job
 */
private void waitUntilJobIsScaled() {
  final CountDownLatch countDownLatch = new CountDownLatch(1);

  final AtomicReference<Job> atomicJob = new AtomicReference<>();

  final Runnable jobPoller = () -> {
    try {
      Job job = getMandatory();
      atomicJob.set(job);
      Integer activeJobs = job.getStatus().getActive();
      if (activeJobs == null) {
        activeJobs = 0;
      }
      if (Objects.equals(job.getSpec().getParallelism(), activeJobs)) {
        countDownLatch.countDown();
      } else {
        LOG.debug("Only {}/{} pods scheduled for Job: {} in namespace: {} seconds so waiting...",
          job.getStatus().getActive(), job.getSpec().getParallelism(), job.getMetadata().getName(), namespace);
      }
    } catch (Throwable t) {
      LOG.error("Error while waiting for Job to be scaled.", t);
    }
  };

  ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  ScheduledFuture poller = executor.scheduleWithFixedDelay(jobPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
  try {
    countDownLatch.await(getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
    executor.shutdown();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    poller.cancel(true);
    executor.shutdown();
    LOG.error("Only {}/{} pod(s) ready for Job: {} in namespace: {} - giving up",
      atomicJob.get().getStatus().getActive(), atomicJob.get().getSpec().getParallelism(), atomicJob.get().getMetadata().getName(), namespace);
  }
}
 
源代码10 项目: tasmo   文件: TasmoProcessingStatsInitializer.java
public static TasmoServiceHandle<TasmoProcessingStats> initialize(final TasmoProcessingStatsConfig config) {
    final TasmoProcessingStats processingStats = new TasmoProcessingStats();
    final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    return new TasmoServiceHandle<TasmoProcessingStats>() {

        @Override
        public TasmoProcessingStats getService() {
            return processingStats;
        }

        @Override
        public void start() throws Exception {
            int logStatsEveryNSeconds = config.getLogStatsEveryNSeconds();
            if (logStatsEveryNSeconds > 0) {
                scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processingStats.logStats();
                        } catch (Exception x) {
                            LOG.error("Issue with logging stats. ", x);
                        }
                    }
                }, logStatsEveryNSeconds, logStatsEveryNSeconds, TimeUnit.SECONDS);
            }
        }

        @Override
        public void stop() throws Exception {
            scheduledExecutorService.shutdownNow();
        }
    };
}
 
源代码11 项目: crate   文件: TimeBasedQEviction.java
static ScheduledFuture<?> scheduleTruncate(long delayInMs,
                                           long intervalInMs,
                                           Queue<? extends ContextLog> q,
                                           ScheduledExecutorService scheduler,
                                           TimeValue expiration) {
    return scheduler.scheduleWithFixedDelay(
        () -> removeExpiredLogs(q, System.currentTimeMillis(), expiration.getMillis()),
        delayInMs,
        intervalInMs,
        TimeUnit.MILLISECONDS);
}
 
源代码12 项目: lams   文件: ThreadPoolTaskScheduler.java
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
	ScheduledExecutorService executor = getScheduledExecutor();
	try {
		return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
	}
	catch (RejectedExecutionException ex) {
		throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
	}
}
 
源代码13 项目: localization_nifi   文件: StandardFlowService.java
@Override
public void start() throws LifeCycleStartException {
    writeLock.lock();
    try {
        if (isRunning()) {
            return;
        }

        running.set(true);

        final ScheduledExecutorService newExecutor = new FlowEngine(2, "Flow Service Tasks");
        newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 500L, TimeUnit.MILLISECONDS);
        this.executor.set(newExecutor);

        if (configuredForClustering) {
            senderListener.start();
        }

    } catch (final IOException ioe) {
        try {
            stop(/* force */true);
        } catch (final Exception e) {
        }

        throw new LifeCycleStartException("Failed to start Flow Service due to: " + ioe, ioe);
    } finally {
        writeLock.unlock();
    }
}
 
源代码14 项目: dark-mode-sync-plugin   文件: DarkModeSync.java
/** @param lafManager IDEA look-and-feel manager for getting and setting the current theme */
public DarkModeSync(final LafManager lafManager) {
  themes = ServiceManager.getService(DarkModeSyncThemes.class);
  this.lafManager = lafManager;
  // Checks if OS is Windows or MacOS
  if (!(SystemInfo.isMacOSMojave || SystemInfo.isWin10OrNewer)) {
    logger.error("Plugin only supports macOS Mojave and greater or Windows 10 and greater");
    scheduledFuture = null;
    return;
  }
  ScheduledExecutorService executor = JobScheduler.getScheduler();
  scheduledFuture =
      executor.scheduleWithFixedDelay(this::updateLafIfNecessary, 0, 3, TimeUnit.SECONDS);
}
 
源代码15 项目: localization_nifi   文件: NiFi.java
/**
 * Determine if the machine we're running on has timing issues.
 */
private void detectTimingIssues() {
    final int minRequiredOccurrences = 25;
    final int maxOccurrencesOutOfRange = 15;
    final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());

    final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(final Runnable r) {
            final Thread t = defaultFactory.newThread(r);
            t.setDaemon(true);
            t.setName("Detect Timing Issues");
            return t;
        }
    });

    final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
    final AtomicInteger occurrences = new AtomicInteger(0);
    final Runnable command = new Runnable() {
        @Override
        public void run() {
            final long curMillis = System.currentTimeMillis();
            final long difference = curMillis - lastTriggerMillis.get();
            final long millisOff = Math.abs(difference - 2000L);
            occurrences.incrementAndGet();
            if (millisOff > 500L) {
                occurrencesOutOfRange.incrementAndGet();
            }
            lastTriggerMillis.set(curMillis);
        }
    };

    final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);

    final TimerTask timerTask = new TimerTask() {
        @Override
        public void run() {
            future.cancel(true);
            service.shutdownNow();

            if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
                LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
                        + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
            }
        }
    };
    final Timer timer = new Timer(true);
    timer.schedule(timerTask, 60000L);
}
 
/**
 * Lets wait until there are enough Ready pods of the given Deployment
 */
private void waitUntilDeploymentConfigIsScaled(final int count) {
  final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
  final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);

  final String name = checkName(getItem());
  final String namespace = checkNamespace(getItem());

  final Runnable deploymentPoller = () -> {
    try {
      DeploymentConfig deploymentConfig = get();
      //If the rs is gone, we shouldn't wait.
      if (deploymentConfig == null) {
        if (count == 0) {
          queue.put(true);
          return;
        } else {
          queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
          return;
        }
      }
      replicasRef.set(deploymentConfig.getStatus().getReplicas());
      int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0;
      if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
        queue.put(true);
      } else {
        LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...",
          deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace);
      }
    } catch (Throwable t) {
      LOG.error("Error while waiting for Deployment to be scaled.", t);
    }
  };

    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
    try {
      if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
        LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
          replicasRef.get(), count, name, namespace);
      } else {
        LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}  after waiting for {} seconds so giving up",
          replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
      }
    } finally {
      poller.cancel(true);
      executor.shutdown();
    }
}
 
源代码17 项目: Flink-CEPplus   文件: JobManagerSharedServices.java
public static JobManagerSharedServices fromConfiguration(
		Configuration config,
		BlobServer blobServer) throws Exception {

	checkNotNull(config);
	checkNotNull(blobServer);

	final String classLoaderResolveOrder =
		config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

	final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(config);

	final BlobLibraryCacheManager libraryCacheManager =
		new BlobLibraryCacheManager(
			blobServer,
			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
			alwaysParentFirstLoaderPatterns);

	final FiniteDuration timeout;
	try {
		timeout = AkkaUtils.getTimeout(config);
	} catch (NumberFormatException e) {
		throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
	}

	final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
			Hardware.getNumberCPUCores(),
			new ExecutorThreadFactory("jobmanager-future"));

	final StackTraceSampleCoordinator stackTraceSampleCoordinator =
		new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());
	final int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
	final BackPressureStatsTrackerImpl backPressureStatsTracker = new BackPressureStatsTrackerImpl(
		stackTraceSampleCoordinator,
		cleanUpInterval,
		config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
		config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
		Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));

	futureExecutor.scheduleWithFixedDelay(
		backPressureStatsTracker::cleanUpOperatorStatsCache,
		cleanUpInterval,
		cleanUpInterval,
		TimeUnit.MILLISECONDS);

	return new JobManagerSharedServices(
		futureExecutor,
		libraryCacheManager,
		RestartStrategyFactory.createRestartStrategyFactory(config),
		stackTraceSampleCoordinator,
		backPressureStatsTracker,
		blobServer);
}
 
源代码18 项目: helios   文件: EnvironmentVariableReporter.java
@Override
protected ScheduledFuture<?> schedule(final Runnable runnable,
                                      final ScheduledExecutorService executorService) {
  return executorService.scheduleWithFixedDelay(runnable, 0, RETRY_INTERVAL_MILLIS, MILLISECONDS);
}
 
源代码19 项目: stepchain   文件: ScheduledFixedDelayProcessor.java
protected void schedule(ScheduledExecutorService scheduledExecutorService,long milliseconds,Runnable runnable) {
 scheduledExecutorService.scheduleWithFixedDelay(runnable,  milliseconds,  milliseconds,
			TimeUnit.MILLISECONDS);
}
 
public static void main(String[] args) throws Exception {

		ScheduledExecutorService readers = Executors.newScheduledThreadPool(3);
		readers.scheduleWithFixedDelay(new Runnable() {
			
			@Override
			public void run() {
				
				System.out.println(Thread.currentThread().getName());
				downCount();
				
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
				
				
			}
		}, 0, 10, MILLISECONDS);
		
		
		synchronized (monitor) {
			if(count > 0) monitor.wait();
			readers.shutdown();
		}
		
	}