下面列出了 java.util.concurrent.ScheduledExecutorService 这个 API 类的用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Performs the one-time start-up of the markdown manager.
 *
 * <p>Does nothing when {@code managerRef} already holds a value. Otherwise the
 * check is repeated while holding the {@code MarkdownManager} class lock, the
 * detector list is published through {@code detectorsRef}, and a single-thread
 * scheduler running {@code CollectExceptionTask} at a fixed rate is stored in
 * {@code managerRef}.
 */
public static void init() {
    if (managerRef.get() == null) {
        synchronized (MarkdownManager.class) {
            // Re-check under the lock: another thread may have finished the set-up meanwhile.
            if (managerRef.get() == null) {
                // We currently only have Timeout case
                ArrayList<ErrorDetector> errorDetectors = new ArrayList<ErrorDetector>();
                errorDetectors.add(new TimeoutDetector());
                detectorsRef.set(errorDetectors);

                ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
                // NOTE(review): the delay and period are interpreted as MICROSECONDS here;
                // confirm that this is the unit `durations` is meant to be expressed in.
                scheduler.scheduleAtFixedRate(
                        new CollectExceptionTask(), durations, durations, TimeUnit.MICROSECONDS);
                managerRef.set(scheduler);
            }
        }
    }
}
/**
 * Schedules the given aggregator on its own single-thread scheduled executor,
 * unless the aggregator reports itself as disabled.
 *
 * <p>The executor is registered in {@code scheduledExecutors} under the
 * aggregator's name. The aggregator is run at a fixed rate, starting
 * immediately, with {@link TimelineMetricAggregator#getSleepIntervalMillis()}
 * as the period in milliseconds.
 *
 * @param aggregator the aggregator to schedule
 */
private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) {
    if (aggregator.isDisabled()) {
        LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
        return;
    }

    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                // The worker thread is named via the ACTUAL_AGGREGATOR_NAMES lookup.
                return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName()));
            }
        }
    );
    scheduledExecutors.put(aggregator.getName(), executorService);

    // Initial delay of zero: the first run is submitted right away.
    executorService.scheduleAtFixedRate(aggregator,
        0L,
        aggregator.getSleepIntervalMillis(),
        TimeUnit.MILLISECONDS);

    // The original concatenation contained a stray unary '+' ("every " + + ...); removed.
    LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every "
        + aggregator.getSleepIntervalMillis() + " milliseconds.");
}
/**
 * Creates the storage, starts its periodic heartbeat and registers it as an
 * OSGi service.
 *
 * @param file passed to the super constructor
 * @param context bundle context used to register this instance as a {@code StorageHistoricalItem} service
 * @param pool passed to the super constructor
 * @param queryExecutor passed to the super constructor
 * @param updateExecutor executor that runs the heartbeat job; also stored in this instance
 * @param eventExecutor passed to the super constructor
 * @throws Exception declared by the constructor; the visible body does not show which call raises it
 */
public StorageImpl ( final File file, final BundleContext context, final DataFilePool pool, final ScheduledExecutorService queryExecutor, final ScheduledExecutorService updateExecutor, final ScheduledExecutorService eventExecutor ) throws Exception
{
    super ( file, pool, queryExecutor, eventExecutor );

    this.updateExecutor = updateExecutor;

    // heartbeat: first run without delay, then once per heartbeat period
    final Runnable heartbeatTask = new Runnable () {
        @Override
        public void run ()
        {
            heartbeat ();
        }
    };
    this.heartbeatJob = updateExecutor.scheduleAtFixedRate ( heartbeatTask, 0, getHeartbeatPeriod (), TimeUnit.MILLISECONDS );

    // register with OSGi
    final Dictionary<String, Object> serviceProperties = new Hashtable<String, Object> ( 2 );
    serviceProperties.put ( Constants.SERVICE_VENDOR, "Eclipse SCADA Project" );
    serviceProperties.put ( Constants.SERVICE_PID, this.id );
    this.handle = context.registerService ( StorageHistoricalItem.class, this, serviceProperties );
}
/**
 * Demo entry point: computes the delay until the next matching point in time
 * and schedules {@code job1} at a fixed rate from then on.
 *
 * @param args unused
 */
public static void main(String[] args) {
    SchedulerExecutorTest2 job = new SchedulerExecutorTest2("job1");

    // Capture the current time
    Calendar now = Calendar.getInstance();
    long nowMillis = now.getTimeInMillis();
    System.out.println("Current Date = " + now.getTime());

    // Compute the nearest execution time that satisfies the condition
    Calendar firstRun = job.getEarliestDate(now, 3, 16, 38, 10);
    long firstRunMillis = firstRun.getTimeInMillis();
    System.out.println("Earliest Date = " + firstRun.getTime());

    // Interval between now and that first execution time
    long delay = firstRunMillis - nowMillis;

    // Period between runs: 1000 ms here
    // (a one-week period would be 7 * 24 * 60 * 60 * 1000 ms)
    long period = 1000;

    // Starting `delay` milliseconds from now, run job1 once every `period` milliseconds
    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(job, delay, period, TimeUnit.MILLISECONDS);
}
/**
 * Runs the intercepted invocation through the given time limiter and returns
 * the resulting future.
 *
 * @param invocation the method invocation to proceed with
 * @param fallbackMethod recovery function handed to {@code completeFailedFuture}
 * @param timeLimiter the resilience4j time limiter that wraps the invocation
 * @return the {@link CompletableFuture} obtained from the time limiter
 */
@SuppressWarnings("unchecked")
public Object invokeForCompletionStage(MethodInvocation invocation,
RecoveryFunction<?> fallbackMethod,
io.github.resilience4j.timelimiter.TimeLimiter timeLimiter) {
// The scheduler comes from the controller of the current execution.
ScheduledExecutorService scheduler = Execution.current().getController().getExecutor();
CompletableFuture<?> future = timeLimiter.executeCompletionStage(scheduler, () -> {
try {
return (CompletionStage) proceed(invocation);
} catch (Throwable t) {
// Turn a failure thrown by proceed() into an exceptionally completed stage
// so the supplier itself never throws.
final CompletableFuture<?> promise = new CompletableFuture<>();
promise.completeExceptionally(t);
return (CompletionStage) promise;
}
}).toCompletableFuture();
// NOTE(review): presumably wires the fallback for timeout failures onto the future;
// completeFailedFuture is not visible here - confirm its contract.
completeFailedFuture(new TimeoutException(), fallbackMethod, future);
return future;
}
/**
 * Submits a blocking task, cancels its future from a second thread after
 * {@code DELAY_TIME}, and expects {@code get()} to fail with a
 * {@link CancellationException}.
 */
@Test
public void futureCancelTest() throws InterruptedException, ExecutionException {
    ScheduledExecutorService scheduler = makeScheduler(2);
    BlockingTestRunnable blockingTask = new BlockingTestRunnable();
    try {
        final Future<?> pending = scheduler.submit(blockingTask);

        // Cancel from another thread while this thread is waiting in get().
        Thread canceller = new Thread(new Runnable() {
            @Override
            public void run() {
                TestUtils.sleep(DELAY_TIME);
                pending.cancel(true);
            }
        });
        canceller.start();

        try {
            pending.get();
            fail("exception should have been thrown");
        } catch (CancellationException expected) {
            // expected
        }
    } finally {
        // Always release the task and stop the scheduler, even on failure.
        blockingTask.unblock();
        scheduler.shutdownNow();
    }
}
/**
 * A future whose task throws must surface to the subscriber as an
 * {@link ExecutionException} caused by a {@link RuntimeException}, with no
 * values and no completion signal.
 */
@Test(timeout = 2000)
public void futureThrows() {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    try {
        // The task fails 500 ms after being scheduled.
        Future<Integer> failing = scheduler.schedule(() -> { throw new RuntimeException("forced failure"); }, 500, TimeUnit.MILLISECONDS);

        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        new PublisherFuture<>(failing).subscribe(subscriber);

        subscriber.await();

        subscriber.assertNoValues()
            .assertError(ExecutionException.class)
            .assertErrorCause(RuntimeException.class)
            .assertNotComplete();
    } finally {
        scheduler.shutdown();
    }
}
/**
 * Checks acquire() when the semaphore has no limit configured. A worker
 * thread acquires the semaphore many times in a row; the test waits on a
 * latch sized to that number of acquisitions, so it only finishes if the
 * thread gets through all of them without the period having to end.
 */
@Test
public void testAcquireNoLimit() throws InterruptedException {
    final ScheduledExecutorService executorMock =
            EasyMock.createMock(ScheduledExecutorService.class);
    final ScheduledFuture<?> futureMock = EasyMock.createMock(ScheduledFuture.class);
    prepareStartTimer(executorMock, futureMock);
    EasyMock.replay(executorMock, futureMock);

    final TimedSemaphoreTestImpl semaphore =
            new TimedSemaphoreTestImpl(executorMock, PERIOD, UNIT, TimedSemaphore.NO_LIMIT);

    final int acquireCount = 1000;
    final CountDownLatch done = new CountDownLatch(acquireCount);
    final SemaphoreThread worker =
            new SemaphoreThread(semaphore, done, acquireCount, acquireCount);
    worker.start();

    // Wait until the latch has been counted down to zero.
    done.await();
    EasyMock.verify(executorMock, futureMock);
}
/**
 * Builds the {@code TaskGroups} instance under test on top of mocked
 * collaborators, a fake clock and a fake stats provider.
 */
@Before
public void setUp() throws Exception {
    storageUtil = new StorageTestUtil(this);
    storageUtil.expectOperations();

    // The fake clock is derived from the mocked executor handed to TaskGroups.
    ScheduledExecutorService mockExecutor = createMock(ScheduledExecutorService.class);
    clock = FakeScheduledExecutor.fromScheduledExecutorService(mockExecutor);

    backoffStrategy = createMock(BackoffStrategy.class);
    taskScheduler = createMock(TaskScheduler.class);
    rateLimiter = createMock(RateLimiter.class);
    rescheduleCalculator = createMock(RescheduleCalculator.class);
    batchWorker = createMock(TaskGroupBatchWorker.class);
    statsProvider = new FakeStatsProvider();

    TaskGroupsSettings settings =
        new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter, 2);
    taskGroups = new TaskGroups(
        mockExecutor,
        settings,
        taskScheduler,
        rescheduleCalculator,
        batchWorker,
        statsProvider);
}
/**
 * Test-visible constructor.
 *
 * @param segmentHelper stored in this instance
 * @param authHelper stored in this instance; also supplies the initial master token
 * @param executor stored in this instance
 * @param numOfRetries stored in this instance
 */
@VisibleForTesting
PravegaTablesStoreHelper(SegmentHelper segmentHelper, GrpcAuthHelper authHelper, ScheduledExecutorService executor, int numOfRetries) {
    this.segmentHelper = segmentHelper;
    this.executor = executor;
    this.numOfRetries = numOfRetries;

    // There can be multiple tables, so the cache is keyed by `table+key`.
    // The loader fetches the entry and wraps its object and version.
    this.cache = new Cache(cacheKey -> {
        TableCacheKey<?> tableKey = (TableCacheKey<?>) cacheKey;
        return getEntry(tableKey.getTable(), tableKey.getKey(), tableKey.fromBytesFunc)
                .thenApply(entry -> new VersionedMetadata<>(entry.getObject(), entry.getVersion()));
    });

    this.authHelper = authHelper;
    this.authToken = new AtomicReference<>(authHelper.retrieveMasterToken());
}
/**
 * Builds a sequence of increasing integers (starting at 1) whose emission is
 * scheduled by the given cron expression on the given executor.
 *
 * @param cron cron expression passed to {@code schedule}
 * @param exec executor passed to {@code schedule}
 * @return the reactive stream of the subscriber that receives the scheduled values
 */
static ReactiveSeq<Integer> interval(String cron,ScheduledExecutorService exec) {
    ReactiveSubscriber<Integer> subscriber = reactiveSubscriber();
    AtomicBoolean open = new AtomicBoolean(true);
    // One-element holder: the anonymous Subscription below needs to reach the
    // upstream subscription, which is only created afterwards.
    Subscription[] upstream = {null};

    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            upstream[0].request(n);
        }

        @Override
        public void cancel() {
            // Cancelling flips the flag that takeWhile checks.
            open.set(false);
        }
    });

    upstream[0] = ReactiveSeq.iterate(1, i -> i + 1)
                             .takeWhile(ignored -> open.get())
                             .schedule(cron, exec)
                             .connect()
                             .forEach(1, item -> subscriber.onNext(item));

    return subscriber.reactiveStream();
}
/**
 * Starts a process input for {@code sleep 3}, stops and disposes it shortly
 * afterwards, then shuts the executor down and dumps what the listener saw.
 */
@Test
public void test2 () throws Exception
{
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor ();

    final ProcessBuilder sleepCommand = new ProcessBuilder ( "sleep", "3" ); // FIXME: works only on unix
    final AbstractScheduledInput input = new ProcessInput ( executor, sleepCommand, Charset.forName ( "UTF-8" ), 1000 );

    final TestListener recorder = new TestListener ();
    input.addInputListener ( recorder );

    logger.debug ( "test2 - start" );
    input.start ();

    // stop well before the 3 second sleep could finish
    Thread.sleep ( 100 );

    logger.debug ( "test2 - stop" );
    input.stop ();

    logger.debug ( "test2 - dispose" );
    input.dispose ();

    logger.debug ( "test2 - shutdown" );
    executor.shutdown ();

    logger.debug ( "test2 - wait" );
    executor.awaitTermination ( Long.MAX_VALUE, TimeUnit.MINUTES );

    logger.debug ( "test2 - done" );
    dumpData ( recorder );

    // TODO: test
}
/**
 * Creates a client: sets up the serializers and obtains the raft client from
 * a {@code BootStrap} built with the given servers, executors and properties.
 *
 * @param servers server URIs passed to the bootstrap
 * @param journalEntryParser parser handed to the query result serializer
 * @param asyncExecutor executor passed to the bootstrap
 * @param scheduledExecutor scheduled executor passed to the bootstrap
 * @param properties properties passed to the bootstrap
 */
public JournalStoreClient(List<URI> servers, JournalEntryParser journalEntryParser, ExecutorService asyncExecutor, ScheduledExecutorService scheduledExecutor, Properties properties) {
    this.appendResultSerializer = new LongSerializer();
    this.querySerializer = new JournalStoreQuerySerializer();
    this.queryResultSerializer = new JournalStoreQueryResultSerializer(journalEntryParser);

    final BootStrap clientBootStrap =
            new BootStrap(servers, asyncExecutor, scheduledExecutor, properties);
    this.raftClient = clientBootStrap.getClient();
}
/**
 * Two tasks are queued on the fake clock's executor; a filter that accepts
 * only one of them must narrow the pending-task views, while
 * {@code runDueTasks()} still runs both.
 */
@Test
public void testTaskFilter() {
    FakeClock clock = new FakeClock();
    ScheduledExecutorService executor = clock.getScheduledExecutorService();

    final AtomicBoolean selectedDone = new AtomicBoolean();
    final AtomicBoolean ignoredDone = new AtomicBoolean();

    final Runnable selectedRunnable = new Runnable() {
        @Override
        public void run() {
            selectedDone.set(true);
        }
    };
    Runnable ignoredRunnable = new Runnable() {
        @Override
        public void run() {
            ignoredDone.set(true);
        }
    };

    // Accepts the selected runnable only, compared by identity.
    FakeClock.TaskFilter onlySelected = new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable runnable) {
            return runnable == selectedRunnable;
        }
    };

    executor.execute(selectedRunnable);
    executor.execute(ignoredRunnable);

    // Unfiltered views report both tasks, filtered views only the selected one.
    assertEquals(2, clock.numPendingTasks());
    assertEquals(1, clock.numPendingTasks(onlySelected));
    assertEquals(2, clock.getPendingTasks().size());
    assertEquals(1, clock.getPendingTasks(onlySelected).size());
    assertSame(selectedRunnable, clock.getPendingTasks(onlySelected).iterator().next().command);

    // Running due tasks executes both of them.
    assertEquals(2, clock.runDueTasks());
    assertTrue(selectedDone.get());
    assertTrue(ignoredDone.get());
}
/**
 * Creates a {@link Scheduler} whose tasks are submitted through
 * {@link ScheduledExecutorService#scheduleAtFixedRate}.
 *
 * @param initialDelay how long to wait before the first execution
 * @param period the time between the starts of successive executions; must be positive
 * @param unit the time unit in which {@code initialDelay} and {@code period} are expressed
 */
public static Scheduler newFixedRateSchedule(
    final long initialDelay, final long period, final TimeUnit unit) {
  // Validate eagerly, before any Scheduler is created.
  checkNotNull(unit);
  checkArgument(period > 0, "period must be > 0, found %s", period);

  final Scheduler fixedRate =
      new Scheduler() {
        @Override
        public Future<?> schedule(
            AbstractService service, ScheduledExecutorService executor, Runnable task) {
          return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
        }
      };
  return fixedRate;
}
/**
 * Creates a scheduler that stores the supplied executors as given.
 *
 * @param syncWorkerExecutor stored as the sync worker executor
 * @param scheduler stored as the scheduled executor
 * @param txWorkerExecutor stored as the transaction worker executor
 * @param servicesExecutor stored as the services executor
 * @param computationExecutor stored as the computation executor
 */
protected EthScheduler(
    final ExecutorService syncWorkerExecutor,
    final ScheduledExecutorService scheduler,
    final ExecutorService txWorkerExecutor,
    final ExecutorService servicesExecutor,
    final ExecutorService computationExecutor) {
  // Plain executors first, the scheduled one last.
  this.syncWorkerExecutor = syncWorkerExecutor;
  this.txWorkerExecutor = txWorkerExecutor;
  this.servicesExecutor = servicesExecutor;
  this.computationExecutor = computationExecutor;
  this.scheduler = scheduler;
}
/**
 * Creates a runner for a single job.
 *
 * @param jobId stored as the job's id
 * @param jobRunnable the job to run; its job definition's type determines the job marker
 * @param eventPublisher stored in this instance
 * @param executorService stored in this instance
 */
private JobRunner(final String jobId,
                  final JobRunnable jobRunnable,
                  final ApplicationEventPublisher eventPublisher,
                  final ScheduledExecutorService executorService) {
    this.executorService = executorService;
    this.eventPublisher = eventPublisher;
    this.jobRunnable = jobRunnable;
    this.jobId = jobId;
    // The marker is derived from the job type of the runnable's definition.
    this.jobMarker = JobMarker.jobMarker(jobRunnable.getJobDefinition().jobType());
}
/**
 * A runnable scheduled with zero delay must report no remaining delay,
 * yield {@code null} from {@code get()} and be done afterwards.
 */
@Test
public void scheduleRunnableTest() throws InterruptedException, ExecutionException {
    ScheduledExecutorService scheduler = makeScheduler(1);
    try {
        TestRunnable task = new TestRunnable();
        ScheduledFuture<?> scheduled = scheduler.schedule(task, 0, TimeUnit.MILLISECONDS);

        long remainingMillis = scheduled.getDelay(TimeUnit.MILLISECONDS);
        assertTrue(remainingMillis <= 0);

        // get() blocks until the task has run.
        assertNull(scheduled.get());
        assertTrue(scheduled.isDone());
    } finally {
        scheduler.shutdownNow();
    }
}
/**
 * Creates an idle killer from its three collaborators; the constructor only
 * stores them.
 *
 * @param scheduledExecutorService stored in this instance
 * @param idleKillDelay stored in this instance
 * @param killTask stored in this instance
 */
IdleKiller(
    ScheduledExecutorService scheduledExecutorService,
    Duration idleKillDelay,
    Runnable killTask) {
  this.killTask = killTask;
  this.idleKillDelay = idleKillDelay;
  this.scheduledExecutorService = scheduledExecutorService;
}
/**
 * Factory method: wraps the given lease lock in an
 * {@code ActiveMQScheduledLeaseLock}, forwarding every argument unchanged.
 *
 * @param scheduledExecutorService forwarded to the implementation
 * @param executor forwarded to the implementation
 * @param lockName forwarded to the implementation
 * @param lock forwarded to the implementation
 * @param renewPeriodMillis forwarded to the implementation
 * @param ioCriticalErrorListener forwarded to the implementation
 * @return the newly created scheduled lease lock
 */
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
                             ArtemisExecutor executor,
                             String lockName,
                             LeaseLock lock,
                             long renewPeriodMillis,
                             IOCriticalErrorListener ioCriticalErrorListener) {
   final ScheduledLeaseLock scheduledLock = new ActiveMQScheduledLeaseLock(
      scheduledExecutorService,
      executor,
      lockName,
      lock,
      renewPeriodMillis,
      ioCriticalErrorListener);
   return scheduledLock;
}
/**
 * Creates the manager from injected values.
 *
 * @param service executor injected from the {@code contextWorkerService} expression
 * @param metrics metric set stored in this instance
 * @param origin value of the {@code originName} property
 * @param messageTimeout message timeout in seconds ({@code messageTimeoutSeconds}, default 60)
 * @param payloadSize value of {@code messagePayloadSize} (default 4096)
 */
public TestMessageContextManager(
    @Value("#{contextWorkerService}") ScheduledExecutorService service,
    MessagesMetricSet metrics,
    @Value("${originName}") String origin,
    @Value("${messageTimeoutSeconds:60}") long messageTimeout,
    @Value("${messagePayloadSize:4096}") int payloadSize) {
  this.service = service;
  this.metrics = metrics;
  this.origin = origin;
  this.payloadSize = payloadSize;
  // The timeout arrives as a number of seconds and is kept as a Duration.
  this.messageTimeout = Duration.ofSeconds(messageTimeout);
  // One slot per group.
  this.partitionPreviousContext = new TestMessageContext[TOTAL_GROUPS];
}
/**
 * Starts the demo server, warms up the circuit breaker and then schedules a
 * steady ping plus two one-off batches of failing requests.
 *
 * @param args unused
 */
public static void main(String[] args) {
    HttpHandler rootHandler =
        CustomHandlers.exception(CIRCUIT_BREAKER_HANDLER)
                      .addExceptionHandler(Throwable.class, FailsafeWebserver::serverError);
    SimpleServer webServer = SimpleServer.simpleServer(rootHandler);
    webServer.start();

    // Warm-up the circuit breaker it needs to hit at least max executions
    // Before it will reject anything. This will make that easier.
    for (int attempt = 0; attempt < 10; attempt++) {
        request("warmup", false, false);
    }

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    // A simple request that should always succeed, repeated every 500 ms
    scheduler.scheduleAtFixedRate(() -> request("ping", false, false), 0, 500, TimeUnit.MILLISECONDS);

    // After 1 second: a batch of 15 bad requests to trigger the circuit breaker
    Runnable badRequestBatch = () -> {
        log.info("Start: Executing bad requests!");
        for (int sent = 0; sent < 15; sent++) {
            request("bad request", true, false);
        }
        log.info("End: Executing bad requests!");
    };
    scheduler.schedule(badRequestBatch, 1, TimeUnit.SECONDS);

    // After 5 seconds: a batch of 15 requests that throw exceptions
    Runnable exceptionBatch = () -> {
        log.info("Start: Executing requests that throw exceptions!");
        for (int sent = 0; sent < 15; sent++) {
            request("exception request", false, true);
        }
        log.info("End: Executing requests that throw exceptions!");
    };
    scheduler.schedule(exceptionBatch, 5, TimeUnit.SECONDS);
}
/**
 * Runs an already-completed stage through Failsafe with a fallback and a
 * circuit breaker, and checks that the stage's own value (223) is returned
 * rather than the fallback value (999).
 *
 * <p>The scheduler created for the test is shut down in a {@code finally}
 * block so its worker thread does not outlive the test, whether the
 * assertion passes or fails.
 *
 * @throws Exception if waiting for the asynchronous result fails
 */
@Test
public void testThatFailSafeIsBrokenWithFallback() throws Exception {
    CircuitBreaker<Integer> breaker = new CircuitBreaker<Integer>()
        .withFailureThreshold(10, 100)
        .withSuccessThreshold(2)
        .withDelay(Duration.ofMillis(100));
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    try {
        int result = Failsafe.with(Fallback.of(e -> 999), breaker)
            .with(service)
            .getStageAsync(() -> CompletableFuture.completedFuture(223))
            .get();
        Assert.assertEquals(result, 223);
    } finally {
        // Previously the executor was never shut down, leaking its thread.
        service.shutdownNow();
    }
}
/**
 * Starts a heartbeat for this job on a dedicated daemon thread.
 *
 * <p>On every tick the JobHeartbeat extension point is called; once the job
 * reports itself as finished, the heartbeat is handed to
 * {@code shutdownHeartbeat} instead.
 *
 * @param intervalInSeconds both the initial delay and the period between ticks, in seconds
 * @return the executor that runs the heartbeat
 */
protected ExecutorService startHeartbeat( final long intervalInSeconds ) {

  final ThreadFactory daemonFactory = new ThreadFactory() {
    @Override
    public Thread newThread( Runnable r ) {
      Thread heartbeatThread = new Thread( r, "Job Heartbeat Thread for: " + getName() );
      heartbeatThread.setDaemon( true );
      return heartbeatThread;
    }
  };
  final ScheduledExecutorService heartbeat = Executors.newSingleThreadScheduledExecutor( daemonFactory );

  final Runnable tick = new Runnable() {
    @Override
    public void run() {
      if ( !Job.this.isFinished() ) {
        try {
          log.logDebug( "Triggering heartbeat signal for " + jobMeta.getName() + " at every " + intervalInSeconds
            + " seconds" );
          ExtensionPointHandler.callExtensionPoint( log, KettleExtensionPoint.JobHeartbeat.id, Job.this );
        } catch ( KettleException e ) {
          log.logError( e.getMessage(), e );
        }
      } else {
        // Job is over: stop signalling.
        log.logBasic( "Shutting down heartbeat signal for " + jobMeta.getName() );
        shutdownHeartbeat( heartbeat );
      }
    }
  };

  heartbeat.scheduleAtFixedRate( tick, intervalInSeconds /* initial delay */,
    intervalInSeconds /* interval delay */, TimeUnit.SECONDS );

  return heartbeat;
}
/**
 * Exposes the scheduler bean: a scheduled executor produced by a
 * {@code ScheduledExecutorFactoryBean} with the configured pool size and the
 * thread name prefix {@code NextRTCConfig}.
 *
 * @return the executor obtained from the factory bean
 */
@Bean(name = Names.SCHEDULER_NAME)
public ScheduledExecutorService scheduler() {
    ScheduledExecutorFactoryBean executorFactory = new ScheduledExecutorFactoryBean();
    executorFactory.setPoolSize(properties.getSchedulerPoolSize());
    executorFactory.setThreadNamePrefix("NextRTCConfig");
    // The factory is initialised by hand here before its object is fetched.
    executorFactory.afterPropertiesSet();
    ScheduledExecutorService executor = executorFactory.getObject();
    return executor;
}
/**
 * Shuts down the {@link #cleanupExecutor} of the current instance via
 * {@code shutdownNow()}. Does nothing when there is no instance or the
 * instance has no executor.
 */
public static synchronized void shutdown() {
    CloverJMX current = cloverJMX;
    if (current == null) {
        return;
    }
    ScheduledExecutorService cleanup = current.cleanupExecutor;
    if (cleanup == null) {
        return;
    }
    cleanup.shutdownNow();
}
/**
 * Closing a {@code RestGet} must close the shared HTTP client exactly once
 * and shut the shared scheduled executor down exactly once.
 */
@Test
public void restGetShouldClose() throws Exception {
    RestFunctions.RestGet function = new RestFunctions.RestGet();

    // Install mocks for the collaborators RestFunctions holds statically.
    CloseableHttpClient mockClient = mock(CloseableHttpClient.class);
    ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
    RestFunctions.setCloseableHttpClient(mockClient);
    RestFunctions.setScheduledExecutorService(mockExecutor);

    function.close();

    verify(mockClient, times(1)).close();
    verify(mockExecutor, times(1)).shutdown();
    verifyNoMoreInteractions(mockClient);
}
/**
 * Provides the singleton scheduled executor bound to {@code @ForAsyncHttp}.
 *
 * @param config supplies the number of threads via {@code getHttpTimeoutThreads()}
 * @return a scheduled pool whose thread factory comes from
 *         {@code daemonThreadsNamed("async-http-timeout-%s")}
 */
@Provides
@Singleton
@ForAsyncHttp
public static ScheduledExecutorService createAsyncHttpTimeoutExecutor(TaskManagerConfig config)
{
    int timeoutThreads = config.getHttpTimeoutThreads();
    return newScheduledThreadPool(timeoutThreads, daemonThreadsNamed("async-http-timeout-%s"));
}
/**
 * Stores the identifiers and collaborators the algorithm is created with.
 *
 * @param clusterId stored as the cluster id
 * @param shardId stored as the shard id
 * @param dcMetaCache stored in this instance
 * @param currentMetaManager stored in this instance
 * @param scheduled stored in this instance
 */
public AbstractKeeperMasterChooserAlgorithm(String clusterId, String shardId, DcMetaCache dcMetaCache,
        CurrentMetaManager currentMetaManager, ScheduledExecutorService scheduled) {
    // identifiers
    this.clusterId = clusterId;
    this.shardId = shardId;
    // collaborators
    this.dcMetaCache = dcMetaCache;
    this.currentMetaManager = currentMetaManager;
    this.scheduled = scheduled;
}
/**
 * Builds an execution graph for an empty job named "test" that has
 * checkpointing settings attached.
 *
 * @param configuration configuration passed to the graph builder
 * @return the graph produced by {@code ExecutionGraphBuilder.buildGraph}
 * @throws Exception declared by the method; presumably propagated from the builder
 */
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
	final ScheduledExecutorService executor = TestingUtils.defaultExecutor();

	final JobGraph jobGraph = new JobGraph(new JobID(), "test");

	final CheckpointCoordinatorConfiguration coordinatorConfiguration =
		new CheckpointCoordinatorConfiguration(
			100,
			10 * 60 * 1000,
			0,
			1,
			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
			false,
			false,
			0);

	// All three vertex lists are left empty.
	final JobCheckpointingSettings checkpointingSettings =
		new JobCheckpointingSettings(
			Collections.<JobVertexID>emptyList(),
			Collections.<JobVertexID>emptyList(),
			Collections.<JobVertexID>emptyList(),
			coordinatorConfiguration,
			null);
	jobGraph.setSnapshotSettings(checkpointingSettings);

	// The same timeout and the same executor are each passed twice below.
	final Time timeout = Time.seconds(10L);

	return ExecutionGraphBuilder.buildGraph(
		null,
		jobGraph,
		configuration,
		executor,
		executor,
		new ProgrammedSlotProvider(1),
		getClass().getClassLoader(),
		new StandaloneCheckpointRecoveryFactory(),
		timeout,
		new NoRestartStrategy(),
		new UnregisteredMetricsGroup(),
		blobWriter,
		timeout,
		LoggerFactory.getLogger(getClass()),
		NettyShuffleMaster.INSTANCE,
		NoOpPartitionTracker.INSTANCE);
}