下面列出了java.util.concurrent.ScheduledExecutorService#shutdownNow ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Pool start() {
ScheduledExecutorService scheduledExecutorService = this.scheduler.get();
boolean createdSES = scheduledExecutorService == null;
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory());
if (!this.scheduler.compareAndSet(null, scheduledExecutorService)) {
scheduledExecutorService.shutdownNow();
scheduledExecutorService = this.scheduler.get();
createdSES = false;
}
}
final ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS);
if (!this.future.compareAndSet(null, scheduledFuture)) {
scheduledFuture.cancel(true);
}
if (!createdSES) {
// we don't want to shutdown it, we'll just stop the task
this.scheduler.set(null);
}
return this;
}
@Test
public void invokeAllTimeoutTest() throws InterruptedException {
ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
try {
int runTime = 1000 * 10;
int timeoutTime = 5;
List<TestCallable> toInvoke = new ArrayList<>(TEST_QTY);
for (int i = 0; i < TEST_QTY; i++) {
toInvoke.add(new TestCallable(runTime));
}
long startTime = Clock.accurateForwardProgressingMillis();
List<Future<Object>> result = scheduler.invokeAll(toInvoke, timeoutTime, TimeUnit.MILLISECONDS);
long endTime = Clock.accurateForwardProgressingMillis();
assertEquals(toInvoke.size(), result.size());
assertTrue(endTime - startTime >= timeoutTime);
assertTrue(endTime - startTime < timeoutTime + (SLOW_MACHINE ? 5000 : 500));
} finally {
scheduler.shutdownNow();
}
}
@Test
public void isTerminatedLongTest() {
final ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
try {
final int sleepTime = 100;
assertFalse(scheduler.isTerminated());
TestRunnable tr = new TestRunnable(sleepTime);
scheduler.execute(tr);
tr.blockTillStarted();
scheduler.shutdownNow();
tr.blockTillFinished();
new TestCondition(() -> scheduler.isTerminated()).blockTillTrue(1000);
} finally {
scheduler.shutdownNow();
}
}
public static void killTasks(final String name) {
final ScheduledExecutorService executorService = getBackgroundExecutor(name);
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Log.wtf(TAG, "Failed to shut down: " + name);
}
if (executorService == sExecutorServiceForTests) {
// Don't do anything to the test service.
return;
}
switch (name) {
case KEYBOARD:
sKeyboardExecutorService = newExecutorService(KEYBOARD);
break;
case SPELLING:
sSpellingExecutorService = newExecutorService(SPELLING);
break;
default:
throw new IllegalArgumentException("Invalid executor: " + name);
}
}
@Test
public void awaitTerminationTest() throws InterruptedException {
ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
try {
assertFalse(scheduler.isTerminated());
TestRunnable tr = new TestRunnable(DELAY_TIME * 2);
long start = Clock.accurateForwardProgressingMillis();
scheduler.execute(tr);
tr.blockTillStarted();
scheduler.shutdown();
scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS);
long stop = Clock.accurateForwardProgressingMillis();
assertTrue(stop - start >= (DELAY_TIME * 2) - 10);
} finally {
scheduler.shutdownNow();
}
}
@Test
public void submitWithResultTest() throws InterruptedException, ExecutionException {
ScheduledExecutorService scheduler = makeScheduler(1);
try {
Object expectedResult = new Object();
Future<Object> f = scheduler.submit(DoNothingRunnable.instance(), expectedResult);
assertTrue(f.get() == expectedResult);
} finally {
scheduler.shutdownNow();
}
}
@Override
public Statement apply(final Statement base, final Description description) {
if (System.getProperty("os.name", "unknown").toLowerCase().startsWith("windows")) {
return base;
}
return new Statement() {
@Override
public void evaluate() throws Throwable {
final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(ThreadStackRule.class.getSimpleName() + "-"));
final ScheduledFuture<?> task = ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
String pid = bean.getName();
if (pid.contains("@")) {
pid = pid.substring(0, pid.indexOf("@"));
}
try {
Pipe.pipe(Runtime.getRuntime().exec("kill -3 " + pid));
} catch (final Exception exception) {
exception.printStackTrace();
}
}
}, 2, 2, TimeUnit.MINUTES);
try {
base.evaluate();
} finally {
task.cancel(true);
ses.shutdownNow();
}
}
};
}
@Test (expected = NullPointerException.class)
public void scheduleCallableFail() {
ScheduledExecutorService scheduler = makeScheduler(1);
try {
scheduler.schedule((Callable<?>)null, 10, TimeUnit.MILLISECONDS);
} finally {
scheduler.shutdownNow();
}
}
@Test
public void invokeAllExceptionTest() throws InterruptedException, ExecutionException {
ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
try {
int exceptionCallableIndex = TEST_QTY / 2;
List<TestCallable> toInvoke = new ArrayList<>(TEST_QTY);
for (int i = 0; i < TEST_QTY; i++) {
TestCallable tc;
if (i == exceptionCallableIndex) {
tc = new TestCallable(0) {
@Override
protected void handleCallStart() {
throw new StackSuppressedRuntimeException();
}
};
} else {
tc = new TestCallable(0);
}
toInvoke.add(tc);
}
List<Future<Object>> result = scheduler.invokeAll(toInvoke);
assertEquals(toInvoke.size(), result.size());
Iterator<TestCallable> it = toInvoke.iterator();
Iterator<Future<Object>> resultIt = result.iterator();
for (int i = 0; i < TEST_QTY; i++) {
if (i != exceptionCallableIndex) {
assertTrue(resultIt.next().get() == it.next().getReturnedResult());
} else {
// skip fail entry
resultIt.next();
it.next();
}
}
} finally {
scheduler.shutdownNow();
}
}
@Test
public void testNoSpoolDirWithWaiting() throws Exception{
DirectorySpooler.Builder builder = initializeAndGetBuilder()
.setMaxSpoolFiles(1)
.waitForPathAppearance(true);
final DirectorySpooler spooler = builder.build();
spooler.init("x2");
ScheduledExecutorService schedService = new SafeScheduledExecutorService(1, "One Time pooler");
boolean test_passed = false;
try {
Callable<Boolean> task = new Callable<Boolean>(){
public Boolean call() {
try {
return (spooler.poolForFile(intervalMillis, TimeUnit.MILLISECONDS) != null);
}
catch (InterruptedException e) {
//Task Interrupted as it did not finish the task.
}
return false;
}
};
ScheduledFuture<Boolean> test_status = schedService.schedule(task, 0, TimeUnit.MILLISECONDS);
assertTrue(spoolDir.mkdirs());
File logFile = new File(spoolDir, "x2.log").getAbsoluteFile();
new FileWriter(logFile).close();
//Wait for 10 secs at max and then report false;
test_passed = test_status.get(10000, TimeUnit.MILLISECONDS);
} finally {
spooler.destroy();
schedService.shutdownNow();
}
assertTrue("Test did not pass, Spooler did not find files", test_passed);
}
@Test
public void testWriteWithClose()
throws Exception {
URI baseUri = URI.create("s3://test-bucket/scan");
ScheduledExecutorService uploadService = Executors.newScheduledThreadPool(2);
try {
PutObjectResult putObjectResult = new PutObjectResult();
putObjectResult.setETag("dummy-etag");
AmazonS3 amazonS3 = mock(AmazonS3.class);
when(amazonS3.putObject(argThat(putsIntoBucket("test-bucket"))))
.thenReturn(putObjectResult);
AmazonS3Provider amazonS3Provider = mock(AmazonS3Provider.class);
when(amazonS3Provider.getS3ClientForBucket("test-bucket")).thenReturn(amazonS3);
S3ScanWriter scanWriter = new S3ScanWriter(1, baseUri, Optional.of(2), new MetricRegistry(), amazonS3Provider, uploadService, new ObjectMapper());
ScanDestinationWriter scanDestinationWriters[] = new ScanDestinationWriter[2];
for (int i=0; i < 2; i++) {
scanDestinationWriters[i] = scanWriter.writeShardRows("table" + i, "p0", 0, i);
scanDestinationWriters[i].writeDocument(ImmutableMap.of("type", "review", "rating", i));
}
// Simulate closing shardWriter[0] but not shardWriter[1]
scanDestinationWriters[0].closeAndTransferAsync(Optional.of(1));
scanWriter.close();
verifyAllTransfersComplete(scanWriter, uploadService);
} finally {
uploadService.shutdownNow();
}
}
@Test
public void testFindAndForgetExternalExecutor() throws Exception {
ScheduledExecutorService executor = new SafeScheduledExecutorService(1, "FileFinder");
try {
testFindAndForget(null);
} finally {
executor.shutdownNow();
}
}
public static void shutdown(ScheduledExecutorService executor, long timeoutMillis) {
if (executor == null) {
return;
}
executor.shutdownNow();
boolean cleanlyTerminated;
try {
cleanlyTerminated = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
cleanlyTerminated = executor.isTerminated();
}
if (!cleanlyTerminated) {
String threadpoolName;
if (executor instanceof ScheduledThreadPoolExecutor) {
ThreadFactory factory = ((ScheduledThreadPoolExecutor) executor).getThreadFactory();
if (factory instanceof NamingThreadFactory) {
NamingThreadFactory namingFactory = (NamingThreadFactory) factory;
threadpoolName = namingFactory.getPrefix();
} else {
threadpoolName = "unknown[" + factory.getClass().getSimpleName() + "]";
}
} else {
threadpoolName = "unknown[" + executor.getClass().getSimpleName() + "]";
}
LOG.error("executor did not terminate in the specified time: {}", threadpoolName);
}
}
public static TasmoServiceHandle<TasmoProcessingStats> initialize(final TasmoProcessingStatsConfig config) {
final TasmoProcessingStats processingStats = new TasmoProcessingStats();
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
return new TasmoServiceHandle<TasmoProcessingStats>() {
@Override
public TasmoProcessingStats getService() {
return processingStats;
}
@Override
public void start() throws Exception {
int logStatsEveryNSeconds = config.getLogStatsEveryNSeconds();
if (logStatsEveryNSeconds > 0) {
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
processingStats.logStats();
} catch (Exception x) {
LOG.error("Issue with logging stats. ", x);
}
}
}, logStatsEveryNSeconds, logStatsEveryNSeconds, TimeUnit.SECONDS);
}
}
@Override
public void stop() throws Exception {
scheduledExecutorService.shutdownNow();
}
};
}
@Test (expected = TimeoutException.class)
public void futureGetTimeoutFail() throws InterruptedException, ExecutionException, TimeoutException {
ScheduledExecutorService scheduler = makeScheduler(1);
try {
TestCallable tc = new TestCallable(100);
Future<Object> f = scheduler.submit(tc);
f.get(1, TimeUnit.MILLISECONDS);
fail("Exception should have been thrown");
} finally {
scheduler.shutdownNow();
}
}
public synchronized void watch(WatchContext context, BuildService.BuildContext buildContext, List<ImageConfiguration> images) throws DockerAccessException,
MojoExecutionException {
// Important to be be a single threaded scheduler since watch jobs must run serialized
ScheduledExecutorService executor = null;
try {
executor = Executors.newSingleThreadScheduledExecutor();
for (StartOrderResolver.Resolvable resolvable : runService.getImagesConfigsInOrder(queryService, images)) {
final ImageConfiguration imageConfig = (ImageConfiguration) resolvable;
String imageId = queryService.getImageId(imageConfig.getName());
String containerId = runService.lookupContainer(imageConfig.getName());
ImageWatcher watcher = new ImageWatcher(imageConfig, context, imageId, containerId);
long interval = watcher.getInterval();
WatchMode watchMode = watcher.getWatchMode(imageConfig);
log.info("Watching " + imageConfig.getName() + (watchMode != null ? " using " + watchMode.getDescription() : ""));
ArrayList<String> tasks = new ArrayList<>();
if (imageConfig.getBuildConfiguration() != null &&
imageConfig.getBuildConfiguration().getAssemblyConfiguration() != null) {
if (watcher.isCopy()) {
String containerBaseDir = imageConfig.getBuildConfiguration().getAssemblyConfiguration().getTargetDir();
schedule(executor, createCopyWatchTask(watcher, context.getMojoParameters(), containerBaseDir), interval);
tasks.add("copying artifacts");
}
if (watcher.isBuild()) {
schedule(executor, createBuildWatchTask(watcher, context.getMojoParameters(), watchMode == WatchMode.both, buildContext), interval);
tasks.add("rebuilding");
}
}
if (watcher.isRun() && watcher.getContainerId() != null) {
schedule(executor, createRestartWatchTask(watcher), interval);
tasks.add("restarting");
}
if (tasks.size() > 0) {
log.info("%s: Watch for %s", imageConfig.getDescription(), StringUtils.join(tasks.toArray(), " and "));
}
}
log.info("Waiting ...");
if (!context.isKeepRunning()) {
runService.addShutdownHookForStoppingContainers(context.isKeepContainer(), context.isRemoveVolumes(), context.isAutoCreateCustomNetworks());
}
wait();
} catch (InterruptedException e) {
log.warn("Interrupted");
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}
/**
* Determine if the machine we're running on has timing issues.
*/
private void detectTimingIssues() {
final int minRequiredOccurrences = 25;
final int maxOccurrencesOutOfRange = 15;
final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
t.setDaemon(true);
t.setName("Detect Timing Issues");
return t;
}
});
final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
final AtomicInteger occurrences = new AtomicInteger(0);
final Runnable command = new Runnable() {
@Override
public void run() {
final long curMillis = System.currentTimeMillis();
final long difference = curMillis - lastTriggerMillis.get();
final long millisOff = Math.abs(difference - 2000L);
occurrences.incrementAndGet();
if (millisOff > 500L) {
occurrencesOutOfRange.incrementAndGet();
}
lastTriggerMillis.set(curMillis);
}
};
final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
final TimerTask timerTask = new TimerTask() {
@Override
public void run() {
future.cancel(true);
service.shutdownNow();
if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
+ "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
}
}
};
final Timer timer = new Timer(true);
timer.schedule(timerTask, 60000L);
}
public static TasmoServiceHandle<TasmoViewModel> initialize(
ViewsProvider viewsProvider,
ViewPathKeyProvider viewPathKeyProvider,
final TasmoViewModelConfig config) {
final TenantId masterTenantId = new TenantId(config.getModelMasterTenantId());
final TasmoViewModel tasmoViewModel = new TasmoViewModel(
masterTenantId,
viewsProvider,
viewPathKeyProvider);
tasmoViewModel.loadModel(masterTenantId); // Move to start method?
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
return new TasmoServiceHandle<TasmoViewModel>() {
@Override
public TasmoViewModel getService() {
return tasmoViewModel;
}
@Override
public void start() throws Exception {
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
tasmoViewModel.reloadModels();
} catch (Exception x) {
LOG.error("Scheduled reloading of tasmo view model failed. ", x);
}
}
}, config.getPollForModelChangesEveryNSeconds(), config.getPollForModelChangesEveryNSeconds(), TimeUnit.SECONDS);
}
@Override
public void stop() throws Exception {
scheduledExecutorService.shutdownNow();
}
};
}
@Test
public void scanSupportBuffered() throws InterruptedException {
Executor plain = Runnable::run;
ExecutorService plainService = Executors.newSingleThreadExecutor();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
DelegateServiceScheduler.UnsupportedScheduledExecutorService unsupportedScheduledExecutorService =
new DelegateServiceScheduler.UnsupportedScheduledExecutorService(threadPool);
try {
assertThat(Schedulers.scanExecutor(plain, Scannable.Attr.BUFFERED))
.as("plain").isEqualTo(null);
assertThat(Schedulers.scanExecutor(plainService, Scannable.Attr.BUFFERED))
.as("plainService").isEqualTo(null);
scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
Thread.sleep(50); //give some leeway for the pool to have consistent accounting
assertThat(Schedulers.scanExecutor(scheduledThreadPool, Scannable.Attr.BUFFERED))
.as("scheduledThreadPool").isEqualTo(2);
threadPool.submit(() -> {
try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
});
assertThat(Schedulers.scanExecutor(threadPool, Scannable.Attr.BUFFERED))
.as("threadPool").isEqualTo(1);
assertThat(Schedulers.scanExecutor(unsupportedScheduledExecutorService, Scannable.Attr.BUFFERED))
.as("unwrapped").isEqualTo(1);
Thread.sleep(400);
assertThat(Schedulers.scanExecutor(unsupportedScheduledExecutorService, Scannable.Attr.BUFFERED))
.as("unwrapped after task").isEqualTo(0);
}
finally {
plainService.shutdownNow();
unsupportedScheduledExecutorService.shutdownNow();
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
}
}
private void testLongPoll(boolean includeTags) throws Exception {
// Resource tests don't support asynchronous requests, so do the next best thing and use a DatabusResourcePoller
// directly.
ScheduledExecutorService keepAliveService = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService pollService = Executors.newSingleThreadScheduledExecutor();
try {
DatabusResourcePoller poller = new DatabusResourcePoller(
Optional.of(new LongPollingExecutorServices(pollService, keepAliveService)), new MetricRegistry());
SubjectDatabus databus = mock(SubjectDatabus.class);
List<Event> pollResults = ImmutableList.of(
new Event("id-1", ImmutableMap.of("key-1", "value-1"), ImmutableList.<List<String>>of(ImmutableList.<String>of("tag-1"))),
new Event("id-2", ImmutableMap.of("key-2", "value-2"), ImmutableList.<List<String>>of(ImmutableList.<String>of("tag-2"))));
//noinspection unchecked
when(databus.poll(isSubject(), eq("queue-name"), eq(Duration.ofSeconds(10)), eq(100)))
.thenReturn(new PollResult(Iterators.emptyIterator(), 0, false))
.thenReturn(new PollResult(pollResults.iterator(), 2, true));
List<Event> expected;
Class<? extends EventViews.ContentOnly> view;
if (includeTags) {
// This is the default poll behavior
expected = pollResults;
view = EventViews.WithTags.class;
} else {
// Tags won't be returned
expected = ImmutableList.of(
new Event("id-1", ImmutableMap.of("key-1", "value-1"), ImmutableList.<List<String>>of()),
new Event("id-2", ImmutableMap.of("key-2", "value-2"), ImmutableList.<List<String>>of()));
view = EventViews.ContentOnly.class;
}
final StringWriter out = new StringWriter();
final AtomicBoolean complete = new AtomicBoolean(false);
HttpServletRequest request = setupLongPollingTest(out, complete);
PeekOrPollResponseHelper helper = new PeekOrPollResponseHelper(view);
poller.poll(createSubject(), databus, "queue-name", Duration.ofSeconds(10), 100, request, false, helper);
long failTime = System.currentTimeMillis() + Duration.ofSeconds(10).toMillis();
while (!complete.get() && System.currentTimeMillis() < failTime) {
Thread.sleep(100);
}
assertTrue(complete.get());
List<Event> actual = JsonHelper.convert(
JsonHelper.fromJson(out.toString(), List.class), new TypeReference<List<Event>>() {
});
assertEquals(actual, expected);
verify(request).startAsync();
verify(request.getAsyncContext()).complete();
} finally {
keepAliveService.shutdownNow();
pollService.shutdownNow();
}
}