下面列出了java.util.concurrent.ScheduledExecutorService#shutdown ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void shouldNotExhaustThreads() throws Exception {
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2, testingThreadFactory);
final GremlinExecutor gremlinExecutor = GremlinExecutor.build()
.executorService(executorService)
.scheduledExecutorService(executorService).create();
final AtomicInteger count = new AtomicInteger(0);
assertTrue(IntStream.range(0, 1000).mapToObj(i -> gremlinExecutor.eval("1+1")).allMatch(f -> {
try {
return (Integer) f.get() == 2;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
count.incrementAndGet();
}
}));
assertEquals(1000, count.intValue());
executorService.shutdown();
executorService.awaitTermination(30000, TimeUnit.MILLISECONDS);
}
public void stop() {
final ScheduledFuture<?> future = this.future.getAndSet(null);
if (future != null
&& !future.isDone() && !future.isCancelled()
&& !future.cancel(false)) {
Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler task termination timeout expired");
}
final ScheduledExecutorService scheduler = this.scheduler.getAndSet(null);
if (scheduler != null) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(10, SECONDS)) { // should last something like 0s max since we killed the task
Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler termination timeout expired");
}
} catch (final InterruptedException e) {
//Ignore
}
}
}
public static void main(String[] args) {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
Metrics.globalRegistry.add(meterRegistry);
MeterMapCleanerTask task = new MeterMapCleanerTask(Metrics.globalRegistry);
task.start("0/2 * * * * ?");
ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor();
s.scheduleAtFixedRate(() -> {
meterRegistry.counter(UUID.randomUUID().toString()).increment();
System.out.println(meterRegistry.getMeters().size());
}, 0, 100, TimeUnit.MILLISECONDS);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.shutdown();
task.stop();
}
@Test
public void testMonitorRestart() throws InterruptedException {
FlumeTimelineMetricsSink flumeTimelineMetricsSink = new FlumeTimelineMetricsSink();
TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(flumeTimelineMetricsSink);
flumeTimelineMetricsSink.setPollFrequency(1);
mockStatic(Executors.class);
ScheduledExecutorService executor = createNiceMock(ScheduledExecutorService.class);
expect(Executors.newSingleThreadScheduledExecutor()).andReturn(executor);
FlumeTimelineMetricsSink.TimelineMetricsCollector collector = anyObject();
TimeUnit unit = anyObject();
expect(executor.scheduleWithFixedDelay(collector, eq(0), eq(1), unit)).andReturn(null);
executor.shutdown();
replay(timelineMetricsCache, Executors.class, executor);
flumeTimelineMetricsSink.start();
flumeTimelineMetricsSink.stop();
verifyAll();
}
@Test
public void subscribeToShard_ReceivesAllData() {
List<SdkBytes> producedData = new ArrayList<>();
ScheduledExecutorService producer = Executors.newScheduledThreadPool(1);
// Delay it a bit to allow us to subscribe first
producer.scheduleAtFixedRate(() -> putRecord().ifPresent(producedData::add), 10, 1, TimeUnit.SECONDS);
List<SdkBytes> receivedData = new ArrayList<>();
// Add every event's data to the receivedData list
Consumer<SubscribeToShardEvent> eventConsumer = s -> receivedData.addAll(
s.records().stream()
.map(Record::data)
.collect(Collectors.toList()));
asyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(s -> s.type(ShardIteratorType.LATEST)),
SubscribeToShardResponseHandler.builder()
.onEventStream(p -> p.filter(SubscribeToShardEvent.class)
.subscribe(eventConsumer))
.onResponse(this::verifyHttpMetadata)
.build())
.join();
producer.shutdown();
// Make sure we all the data we received was data we published, we may have published more
// if the producer isn't shutdown immediately after we finish subscribing.
assertThat(producedData).containsSequence(receivedData);
}
@Override
protected void shutDown() {
// Delay stopping ZooKeeper server to ensure that clients close first during service shutdown.
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0);
executorService.schedule(() -> testServer.stop(), 1000, TimeUnit.MILLISECONDS);
executorService.shutdown();
}
@Test(expected = SafeScheduledExecutorServiceRethrowsException.class)
public void testScheduleReturnFutureThrowsException() throws Throwable {
ScheduledExecutorService executorService = new SafeScheduledExecutorService(1, "test");
Future<?> future = executorService.schedule(new RunnableWhichThrows(), 0, TimeUnit.DAYS);
try {
future.get();
} catch (ExecutionException e) {
throw e.getCause();
}
executorService.shutdown();
}
@Override
public void onDisable() {
debugMessage(ChatColor.RED + "Saving updates to database!");
// Schedule task to update database for the last time.
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.execute(new UpdateDatabaseTask(this));
scheduler.shutdown();
this.getLogger().info(this.getDescription().getFullName() + " has been disabled!");
this.getLogsManager().writeToLogFile("Disabled Statz!");
}
/**
* Dispose controller forcibly
*
* @param disconnect
* if <code>true</code> the connection will also be disconnected
*/
public void dispose ( final boolean disconnect )
{
logger.debug ( "Disposing - disconnect: {}", disconnect );
final ScheduledExecutorService executor;
synchronized ( this )
{
executor = this.executor;
if ( this.executor != null )
{
if ( disconnect )
{
disconnect ();
}
this.executor = null;
}
}
if ( this.zombieJob != null )
{
this.zombieJob.cancel ( false );
this.zombieJob = null;
}
if ( executor != null )
{
// shutdown outside of sync lock
executor.shutdown ();
}
}
/**
* Notifies subscribed IO sample listeners that a new IO sample packet has
* been received.
*
* @param ioSample The received IO sample.
* @param remoteDevice The remote XBee device that sent the sample.
*
* @see com.digi.xbee.api.RemoteXBeeDevice
* @see com.digi.xbee.api.io.IOSample
*/
private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
logger.debug(connectionInterface.toString() + "IO sample received.");
try {
synchronized (ioSampleReceiveListeners) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
ioSampleReceiveListeners.size()));
for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
// Ensure that the reader is running to avoid a RejectedExecutionException.
if (!running)
break;
executor.execute(new Runnable() {
/*
* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// Synchronize the listener so it is not called
// twice. That is, let the listener to finish its job.
synchronized (listener) {
listener.ioSampleReceived(remoteDevice, ioSample);
}
}
});
}
executor.shutdown();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* Test to make sure we reschedule the task for execution
* if the new requested execution is earlier than the previous one
*/
public void testRescheduleForEarlierTime() throws InterruptedException {
ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
MyConflationListener listener = new MyConflationListener();
OneTaskOnlyExecutor decorator = new OneTaskOnlyExecutor(ex, listener);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
Runnable increment = new Runnable() {
public void run() {
counter.incrementAndGet();
}
};
decorator.schedule(increment, 120, TimeUnit.SECONDS);
decorator.schedule(increment, 10, TimeUnit.MILLISECONDS);
long start = System.nanoTime();
ex.shutdown();
ex.awaitTermination(60, TimeUnit.SECONDS);
long elapsed = System.nanoTime() - start;
assertEquals(1, counter.get());
assertEquals(1, listener.getDropCount());
assertTrue(elapsed < TimeUnit.SECONDS.toNanos(120));
}
@Test
public void testNewScheduledThreadPoolWithThreadFactory() throws Exception {
final ScheduledExecutorService executorService = InstrumentedExecutors.newScheduledThreadPool(2, defaultThreadFactory, registry);
executorService.schedule(new NoopRunnable(), 0, TimeUnit.SECONDS);
final Field delegateField = InstrumentedScheduledExecutorService.class.getDeclaredField("delegate");
delegateField.setAccessible(true);
final ScheduledThreadPoolExecutor delegate = (ScheduledThreadPoolExecutor) delegateField.get(executorService);
assertThat(delegate.getCorePoolSize()).isEqualTo(2);
assertThat(delegate.getThreadFactory()).isSameAs(defaultThreadFactory);
executorService.shutdown();
}
@Test(expected = NdbcException.class)
public void cancellation() throws Throwable {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
try {
final Future<List<Row>> f = ds.query("SELECT pg_sleep(999)");
f.raise(new Exception(""));
f.get(timeout);
} finally {
scheduler.shutdown();
}
}
@Test
public void testPollFromAny() throws InterruptedException {
final RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("queue:pollany");
assertThat(queue1.trySetCapacity(10)).isTrue();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> {
RBoundedBlockingQueue<Integer> queue2 = redisson.getBoundedBlockingQueue("queue:pollany1");
assertThat(queue2.trySetCapacity(10)).isTrue();
RBoundedBlockingQueue<Integer> queue3 = redisson.getBoundedBlockingQueue("queue:pollany2");
assertThat(queue3.trySetCapacity(10)).isTrue();
try {
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (Exception e) {
Assert.fail();
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollFromAny(40, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
executor.shutdown();
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void close(ScheduledExecutorService instance) {
try {
if (instance instanceof EpollEventLoopGroup) {
((EpollEventLoopGroup)instance).shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
} else {
instance.shutdown();
}
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Interrupted during shutdown", e);
Thread.currentThread().interrupt();
}
}
/**
* This tests verifies that {{@link ZooKeeperDataCache} invalidates the cache if the get-operation time-out on that
* path.
*
* @throws Exception
*/
@Test
public void testTimedOutZKCacheRequestInvalidates() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()));
String path = "test";
doNothing().when(zkSession).getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
zkClient.create("/test", new byte[0], null, null);
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkSession, 1, executor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
return new String(content);
}
};
// try to do get on the path which will time-out and async-cache will have non-completed Future
try {
zkCache.get(path);
} catch (Exception e) {
// Ok
}
retryStrategically((test) -> {
return zkCacheService.dataCache.getIfPresent(path) == null;
}, 5, 1000);
assertNull(zkCacheService.dataCache.getIfPresent(path));
executor.shutdown();
zkExecutor.shutdown();
scheduledExecutor.shutdown();
}
@Test
public void testNewSingleThreadScheduledExecutor() throws Exception {
final ScheduledExecutorService executorService = InstrumentedExecutors.newSingleThreadScheduledExecutor(registry);
executorService.schedule(new NoopRunnable(), 0, TimeUnit.SECONDS);
executorService.shutdown();
}
public void doTest(MediaProfileSpecType mediaProfileSpecType, String expectedVideoCodec,
String expectedAudioCodec, String extension) throws Exception {
final CountDownLatch recorderLatch = new CountDownLatch(1);
// Media Pipeline #1
MediaPipeline mp = kurentoClient.createMediaPipeline();
PlayerEndpoint playerEp =
new PlayerEndpoint.Builder(mp, getPlayerUrl("/video/10sec/green.webm")).build();
WebRtcEndpoint webRtcEp1 = new WebRtcEndpoint.Builder(mp).build();
String recordingFile = getRecordUrl(extension);
final RecorderEndpoint recorderEp = new RecorderEndpoint.Builder(mp, recordingFile)
.withMediaProfile(mediaProfileSpecType).build();
playerEp.connect(webRtcEp1);
playerEp.connect(recorderEp);
final CountDownLatch eosLatch = new CountDownLatch(1);
playerEp.addEndOfStreamListener(new EventListener<EndOfStreamEvent>() {
@Override
public void onEvent(EndOfStreamEvent event) {
eosLatch.countDown();
}
});
// Test execution #1. Play the video while it is recorded
launchBrowser(mp, webRtcEp1, playerEp, recorderEp, expectedVideoCodec, expectedAudioCodec,
recordingFile, EXPECTED_COLOR, 0, 0, PLAYTIME);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(new Runnable() {
@Override
public void run() {
recorderEp.stopAndWait(new Continuation<Void>() {
@Override
public void onSuccess(Void result) throws Exception {
recorderLatch.countDown();
}
@Override
public void onError(Throwable cause) throws Exception {
recorderLatch.countDown();
}
});
}
}, PLAYTIME / 2, TimeUnit.SECONDS);
// Wait for EOS
Assert.assertTrue("No EOS event", eosLatch.await(getPage().getTimeout(), TimeUnit.SECONDS));
Assert.assertTrue("Not stop properly",
recorderLatch.await(getPage().getTimeout(), TimeUnit.SECONDS));
// Release Media Pipeline #1
mp.release();
// Wait until file exists
waitForFileExists(recordingFile);
// Reloading browser
getPage().reload();
// Media Pipeline #2
MediaPipeline mp2 = kurentoClient.createMediaPipeline();
PlayerEndpoint playerEp2 = new PlayerEndpoint.Builder(mp2, recordingFile).build();
WebRtcEndpoint webRtcEp2 = new WebRtcEndpoint.Builder(mp2).build();
playerEp2.connect(webRtcEp2);
// Playing the recording
launchBrowser(null, webRtcEp2, playerEp2, null, expectedVideoCodec, expectedAudioCodec,
recordingFile, EXPECTED_COLOR, 0, 0, PLAYTIME / 2);
// Release Media Pipeline #2
mp2.release();
executor.shutdown();
success = true;
}
/**
* Lets wait until there are enough Ready pods of the given Deployment
*/
private void waitUntilDeploymentConfigIsScaled(final int count) {
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
final String name = checkName(getItem());
final String namespace = checkNamespace(getItem());
final Runnable deploymentPoller = () -> {
try {
DeploymentConfig deploymentConfig = get();
//If the rs is gone, we shouldn't wait.
if (deploymentConfig == null) {
if (count == 0) {
queue.put(true);
return;
} else {
queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
return;
}
}
replicasRef.set(deploymentConfig.getStatus().getReplicas());
int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0;
if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
queue.put(true);
} else {
LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...",
deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace);
}
} catch (Throwable t) {
LOG.error("Error while waiting for Deployment to be scaled.", t);
}
};
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
try {
if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
replicasRef.get(), count, name, namespace);
} else {
LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {} after waiting for {} seconds so giving up",
replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
}
} finally {
poller.cancel(true);
executor.shutdown();
}
}
@Override
public void stop(final boolean force) {
writeLock.lock();
try {
if (!isRunning()) {
return;
}
running.set(false);
if (clusterCoordinator != null) {
final Thread shutdownClusterCoordinator = new Thread(new Runnable() {
@Override
public void run() {
clusterCoordinator.shutdown();
}
});
shutdownClusterCoordinator.setDaemon(true);
shutdownClusterCoordinator.setName("Shutdown Cluster Coordinator");
shutdownClusterCoordinator.start();
}
if (!controller.isTerminated()) {
controller.shutdown(force);
}
if (configuredForClustering && senderListener != null) {
try {
senderListener.stop();
} catch (final IOException ioe) {
logger.warn("Protocol sender/listener did not stop gracefully due to: " + ioe);
}
}
final ScheduledExecutorService executorService = executor.get();
if (executorService != null) {
if (force) {
executorService.shutdownNow();
} else {
executorService.shutdown();
}
boolean graceful;
try {
graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
graceful = false;
}
if (!graceful) {
logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window");
}
}
} finally {
writeLock.unlock();
}
}