java.util.concurrent.ScheduledExecutorService#shutdownNow() 源码实例 Demo

下面列出了 java.util.concurrent.ScheduledExecutorService#shutdownNow() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: tomee   文件: Pool.java
/**
 * Schedules the periodic sweeper task for this pool and returns the pool itself.
 *
 * <p>The scheduler currently held in {@code this.scheduler} is used when there is one; otherwise
 * a single-threaded scheduler is created and installed with a compare-and-set.
 *
 * @return this pool, so calls can be chained
 */
public Pool start() {
    ScheduledExecutorService scheduledExecutorService = this.scheduler.get();
    // Tracks whether the scheduler in use ends up being the one created by this call.
    boolean createdSES = scheduledExecutorService == null;
    if (scheduledExecutorService == null) {
        scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory());
        if (!this.scheduler.compareAndSet(null, scheduledExecutorService)) {
            // A scheduler was installed between the read above and the compare-and-set:
            // discard the one just created and use the installed one instead.
            scheduledExecutorService.shutdownNow();
            scheduledExecutorService = this.scheduler.get();
            createdSES = false;
        }
    }
    // The sweeper runs immediately (initial delay 0) and then at a fixed rate of sweepInterval.
    final ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS);
    if (!this.future.compareAndSet(null, scheduledFuture)) {
        // A sweeper future is already registered, so cancel the duplicate that was just scheduled.
        scheduledFuture.cancel(true);
    }
    if (!createdSES) {
        // we don't want to shutdown it, we'll just stop the task
        // NOTE(review): presumably the stop path only shuts down a non-null scheduler reference,
        // which is why the reference is cleared here - confirm against stop().
        this.scheduler.set(null);
    }
    return this;
}
 
源代码2 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Checks that a timed {@code invokeAll} hands back one future per submitted task and returns
 * no earlier than the timeout, and within a bounded overrun of it, although every task is
 * configured with a far longer run time.
 */
@Test
public void invokeAllTimeoutTest() throws InterruptedException {
  final int taskRunMillis = 10 * 1000;
  final int timeoutMillis = 5;
  // Slow machines get a much more generous upper bound.
  final int allowedOverrunMillis = SLOW_MACHINE ? 5000 : 500;

  ScheduledExecutorService executor = makeScheduler(THREAD_COUNT);
  try {
    List<TestCallable> tasks = new ArrayList<>(TEST_QTY);
    while (tasks.size() < TEST_QTY) {
      tasks.add(new TestCallable(taskRunMillis));
    }

    long before = Clock.accurateForwardProgressingMillis();
    List<Future<Object>> futures = executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
    long elapsed = Clock.accurateForwardProgressingMillis() - before;

    assertEquals(tasks.size(), futures.size());

    assertTrue(elapsed >= timeoutMillis);
    assertTrue(elapsed < timeoutMillis + allowedOverrunMillis);
  } finally {
    executor.shutdownNow();
  }
}
 
源代码3 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Checks that the scheduler reports itself terminated after {@code shutdownNow()} is called
 * while a task is running and that task has finished.
 */
@Test
public void isTerminatedLongTest() {
  final int taskRunMillis = 100;
  final ScheduledExecutorService executor = makeScheduler(THREAD_COUNT);
  try {
    assertFalse(executor.isTerminated());

    TestRunnable task = new TestRunnable(taskRunMillis);
    executor.execute(task);

    // Only request the shutdown once the task is actually executing.
    task.blockTillStarted();
    executor.shutdownNow();

    task.blockTillFinished();
    TestCondition terminated = new TestCondition(executor::isTerminated);
    terminated.blockTillTrue(1000);
  } finally {
    executor.shutdownNow();
  }
}
 
源代码4 项目: AOSP-Kayboard-7.1.2   文件: ExecutorUtils.java
/**
 * Forcibly stops the background executor registered under {@code name} and, unless that
 * executor is the one installed for tests, replaces it with a freshly created executor.
 *
 * <p>After {@code shutdownNow()} the method waits up to five seconds for termination. If the
 * wait is interrupted, the interrupt status of the calling thread is restored and the failure
 * is reported together with the causing exception.
 *
 * @param name the executor to kill; {@code KEYBOARD} or {@code SPELLING}
 * @throws IllegalArgumentException if {@code name} is neither of the known executor names and
 *         the resolved executor is not the test executor
 */
public static void killTasks(final String name) {
    final ScheduledExecutorService executorService = getBackgroundExecutor(name);
    executorService.shutdownNow();
    try {
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Do not swallow the interrupt: keep it observable for the caller and attach the
        // exception so the report carries the stack trace.
        Thread.currentThread().interrupt();
        Log.wtf(TAG, "Failed to shut down: " + name, e);
    }
    if (executorService == sExecutorServiceForTests) {
        // Don't do anything to the test service.
        return;
    }
    switch (name) {
        case KEYBOARD:
            sKeyboardExecutorService = newExecutorService(KEYBOARD);
            break;
        case SPELLING:
            sSpellingExecutorService = newExecutorService(SPELLING);
            break;
        default:
            throw new IllegalArgumentException("Invalid executor: " + name);
    }
}
 
源代码5 项目: threadly   文件: ScheduledExecutorServiceTest.java
@Test
public void awaitTerminationTest() throws InterruptedException {
  ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
  try {
    assertFalse(scheduler.isTerminated());
    
    TestRunnable tr = new TestRunnable(DELAY_TIME * 2);
    long start = Clock.accurateForwardProgressingMillis();
    scheduler.execute(tr);
    
    tr.blockTillStarted();
    scheduler.shutdown();

    scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS);
    long stop = Clock.accurateForwardProgressingMillis();
    
    assertTrue(stop - start >= (DELAY_TIME * 2) - 10);
  } finally {
    scheduler.shutdownNow();
  }
}
 
源代码6 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Checks that {@code submit(Runnable, result)} yields a future whose value is the very same
 * object instance that was passed in as the result.
 */
@Test
public void submitWithResultTest() throws InterruptedException, ExecutionException {
  final Object marker = new Object();
  ScheduledExecutorService executor = makeScheduler(1);
  try {
    Future<Object> future = executor.submit(DoNothingRunnable.instance(), marker);
    Object actual = future.get();

    // Identity comparison on purpose: the exact instance must come back.
    assertTrue(actual == marker);
  } finally {
    executor.shutdownNow();
  }
}
 
源代码7 项目: tomee   文件: ThreadStackRule.java
/**
 * Wraps {@code base} so that, while it is being evaluated, {@code kill -3} is run against this
 * JVM's own process id every two minutes (presumably to get thread stacks printed for tests
 * that hang). When the {@code os.name} system property starts with "windows" the statement is
 * returned unchanged.
 *
 * @param base        the statement to wrap
 * @param description not used by this rule
 * @return {@code base} itself on Windows, otherwise the wrapping statement
 */
@Override
public Statement apply(final Statement base, final Description description) {
    final String osName = System.getProperty("os.name", "unknown").toLowerCase();
    if (osName.startsWith("windows")) {
        return base;
    }
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            final String threadPrefix = ThreadStackRule.class.getSimpleName() + "-";
            final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(threadPrefix));
            final Runnable signalSelf = new Runnable() {
                @Override
                public void run() {
                    // The runtime name is used as "<pid>@<rest>"; keep only the part before '@'.
                    final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
                    final int at = runtimeName.indexOf("@");
                    final String pid = at >= 0 ? runtimeName.substring(0, at) : runtimeName;

                    try {
                        Pipe.pipe(Runtime.getRuntime().exec("kill -3 " + pid));
                    } catch (final Exception exception) {
                        exception.printStackTrace();
                    }
                }
            };
            final ScheduledFuture<?> task = ses.scheduleAtFixedRate(signalSelf, 2, 2, TimeUnit.MINUTES);
            try {
                base.evaluate();
            } finally {
                // Stop the periodic task and its thread whatever the outcome of the test.
                task.cancel(true);
                ses.shutdownNow();
            }
        }
    };
}
 
源代码8 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Expects a {@link NullPointerException} when a {@code null} callable is scheduled.
 */
@Test (expected = NullPointerException.class)
public void scheduleCallableFail() {
  // Typed as a Callable so the Callable overload of schedule() is the one selected.
  final Callable<?> missingTask = null;
  ScheduledExecutorService executor = makeScheduler(1);
  try {
    executor.schedule(missingTask, 10, TimeUnit.MILLISECONDS);
  } finally {
    executor.shutdownNow();
  }
}
 
源代码9 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Checks that {@code invokeAll} still returns a future for every task when one of the tasks
 * throws, and that each of the other futures carries the result its callable reports.
 */
@Test
public void invokeAllExceptionTest() throws InterruptedException, ExecutionException {
  final int failingIndex = TEST_QTY / 2;
  ScheduledExecutorService executor = makeScheduler(THREAD_COUNT);
  try {
    List<TestCallable> callables = new ArrayList<>(TEST_QTY);
    for (int i = 0; i < TEST_QTY; i++) {
      if (i != failingIndex) {
        callables.add(new TestCallable(0));
        continue;
      }
      // The one callable in the middle throws as soon as its call starts.
      callables.add(new TestCallable(0) {
        @Override
        protected void handleCallStart() {
          throw new StackSuppressedRuntimeException();
        }
      });
    }

    List<Future<Object>> futures = executor.invokeAll(callables);

    assertEquals(callables.size(), futures.size());

    // Futures come back in submission order, so both lists are walked by the same index.
    for (int i = 0; i < TEST_QTY; i++) {
      Future<Object> future = futures.get(i);
      TestCallable callable = callables.get(i);
      if (i == failingIndex) {
        // skip fail entry
        continue;
      }
      assertTrue(future.get() == callable.getReturnedResult());
    }
  } finally {
    executor.shutdownNow();
  }
}
 
源代码10 项目: datacollector   文件: TestDirectorySpooler.java
/**
 * Builds a spooler with {@code waitForPathAppearance(true)}, starts polling for a file on a
 * separate thread, only then creates the spool directory and a log file in it, and expects the
 * poll to report a file within ten seconds.
 */
@Test
public void testNoSpoolDirWithWaiting() throws Exception{
  final DirectorySpooler spooler = initializeAndGetBuilder()
      .setMaxSpoolFiles(1)
      .waitForPathAppearance(true)
      .build();
  spooler.init("x2");

  ScheduledExecutorService pollExecutor = new SafeScheduledExecutorService(1, "One Time pooler");
  boolean fileFound = false;
  try {
    Callable<Boolean> pollOnce = new Callable<Boolean>(){
      @Override
      public Boolean call() {
        boolean found = false;
        try {
          found = spooler.poolForFile(intervalMillis, TimeUnit.MILLISECONDS) != null;
        } catch (InterruptedException e) {
          //Task Interrupted as it did not finish the task.
        }
        return found;
      }
    };
    ScheduledFuture<Boolean> pending = pollExecutor.schedule(pollOnce, 0, TimeUnit.MILLISECONDS);

    // The directory and the file appear only after polling has been scheduled.
    assertTrue(spoolDir.mkdirs());
    File logFile = new File(spoolDir, "x2.log").getAbsoluteFile();
    new FileWriter(logFile).close();

    //Wait for 10 secs at max and then report false;
    fileFound = pending.get(10000, TimeUnit.MILLISECONDS);
  } finally {
    spooler.destroy();
    pollExecutor.shutdownNow();
  }
  assertTrue("Test did not pass, Spooler did not find files", fileFound);
}
 
源代码11 项目: emodb   文件: S3ScanWriterTest.java
/**
 * Writes one document to each of two shard writers, asynchronously closes only the first one,
 * then closes the scan writer and verifies that all transfers complete.
 */
@Test
public void testWriteWithClose()
        throws Exception {

    final String bucket = "test-bucket";
    URI baseUri = URI.create("s3://test-bucket/scan");
    ScheduledExecutorService uploadService = Executors.newScheduledThreadPool(2);

    try {
        // Every put into the test bucket succeeds with a canned result.
        PutObjectResult cannedPutResult = new PutObjectResult();
        cannedPutResult.setETag("dummy-etag");

        AmazonS3 s3Client = mock(AmazonS3.class);
        when(s3Client.putObject(argThat(putsIntoBucket(bucket))))
                .thenReturn(cannedPutResult);

        AmazonS3Provider s3Provider = mock(AmazonS3Provider.class);
        when(s3Provider.getS3ClientForBucket(bucket)).thenReturn(s3Client);

        S3ScanWriter scanWriter = new S3ScanWriter(1, baseUri, Optional.of(2), new MetricRegistry(), s3Provider, uploadService, new ObjectMapper());

        ScanDestinationWriter[] shardWriters = new ScanDestinationWriter[2];
        for (int shard = 0; shard < 2; shard++) {
            ScanDestinationWriter shardWriter = scanWriter.writeShardRows("table" + shard, "p0", 0, shard);
            shardWriters[shard] = shardWriter;
            shardWriter.writeDocument(ImmutableMap.of("type", "review", "rating", shard));
        }

        // Simulate closing shardWriter[0] but not shardWriter[1]
        shardWriters[0].closeAndTransferAsync(Optional.of(1));

        scanWriter.close();

        verifyAllTransfersComplete(scanWriter, uploadService);
    } finally {
        uploadService.shutdownNow();
    }
}
 
源代码12 项目: datacollector   文件: TestAsynchronousFileFinder.java
/**
 * Runs the find-and-forget scenario with an executor that is created and owned by the test,
 * and shuts that executor down afterwards.
 */
@Test
public void testFindAndForgetExternalExecutor() throws Exception {
  ScheduledExecutorService executor = new SafeScheduledExecutorService(1, "FileFinder");
  try {
    // Hand over the executor created above. Passing null here left it unused, so the
    // externally supplied executor path was never exercised by this test.
    testFindAndForget(executor);
  } finally {
    executor.shutdownNow();
  }
}
 
源代码13 项目: r2cloud   文件: Util.java
/**
 * Stops the given executor with {@code shutdownNow()} and waits up to {@code timeoutMillis}
 * for it to terminate; logs an error naming the pool when it has not terminated.
 *
 * <p>If the wait is interrupted, the interrupt status is restored and the current termination
 * state of the executor decides whether the error is logged.
 *
 * @param executor      the executor to stop; {@code null} is ignored
 * @param timeoutMillis how long to wait for termination, in milliseconds
 */
public static void shutdown(ScheduledExecutorService executor, long timeoutMillis) {
	if (executor == null) {
		return;
	}
	executor.shutdownNow();

	boolean terminated;
	try {
		terminated = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
	} catch (InterruptedException e) {
		Thread.currentThread().interrupt();
		terminated = executor.isTerminated();
	}
	if (terminated) {
		return;
	}

	// Work out the most descriptive name available for the pool that is still running.
	final String poolName;
	if (!(executor instanceof ScheduledThreadPoolExecutor)) {
		poolName = "unknown[" + executor.getClass().getSimpleName() + "]";
	} else {
		ThreadFactory threadFactory = ((ScheduledThreadPoolExecutor) executor).getThreadFactory();
		if (threadFactory instanceof NamingThreadFactory) {
			poolName = ((NamingThreadFactory) threadFactory).getPrefix();
		} else {
			poolName = "unknown[" + threadFactory.getClass().getSimpleName() + "]";
		}
	}
	LOG.error("executor did not terminate in the specified time: {}", poolName);
}
 
源代码14 项目: tasmo   文件: TasmoProcessingStatsInitializer.java
/**
 * Creates the processing-stats service together with a handle that controls periodic logging
 * of those stats.
 *
 * <p>{@code start()} schedules the logging only when the configured interval is greater than
 * zero; {@code stop()} shuts the scheduler down immediately.
 *
 * @param config supplies the logging interval in seconds
 * @return a handle exposing the stats instance and its start/stop lifecycle
 */
public static TasmoServiceHandle<TasmoProcessingStats> initialize(final TasmoProcessingStatsConfig config) {
    final TasmoProcessingStats stats = new TasmoProcessingStats();
    final ScheduledExecutorService statsScheduler = Executors.newSingleThreadScheduledExecutor();

    return new TasmoServiceHandle<TasmoProcessingStats>() {

        @Override
        public TasmoProcessingStats getService() {
            return stats;
        }

        @Override
        public void start() throws Exception {
            final int intervalSeconds = config.getLogStatsEveryNSeconds();
            if (intervalSeconds <= 0) {
                // Periodic logging is switched off.
                return;
            }
            final Runnable logStatsTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        stats.logStats();
                    } catch (Exception x) {
                        // Log and carry on so later runs of the task are not suppressed.
                        LOG.error("Issue with logging stats. ", x);
                    }
                }
            };
            statsScheduler.scheduleWithFixedDelay(logStatsTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        }

        @Override
        public void stop() throws Exception {
            statsScheduler.shutdownNow();
        }
    };
}
 
源代码15 项目: threadly   文件: ScheduledExecutorServiceTest.java
/**
 * Expects a {@link TimeoutException} when waiting 1 ms on the future of a callable that is
 * configured with a run time of 100.
 */
@Test (expected = TimeoutException.class)
public void futureGetTimeoutFail() throws InterruptedException, ExecutionException, TimeoutException {
  ScheduledExecutorService executor = makeScheduler(1);
  try {
    Future<Object> pending = executor.submit(new TestCallable(100));

    pending.get(1, TimeUnit.MILLISECONDS);
    fail("Exception should have been thrown");
  } finally {
    executor.shutdownNow();
  }
}
 
源代码16 项目: docker-maven-plugin   文件: WatchService.java
/**
 * Schedules the watch tasks (copy, rebuild, restart) for every given image on a single-threaded
 * scheduler and then blocks in {@code wait()} until the thread is interrupted or woken up.
 * The scheduler is always shut down before this method returns.
 *
 * <p>When the wait is interrupted, a warning is logged and the interrupt status of the calling
 * thread is restored so that callers can still observe it.
 *
 * @param context      watch settings; also supplies the Mojo parameters and the container
 *                     shutdown options
 * @param buildContext build context handed to the rebuild tasks
 * @param images       image configurations to watch; they are processed in the order returned
 *                     by {@code runService.getImagesConfigsInOrder}
 * @throws DockerAccessException  declared for the Docker lookups performed while setting up
 * @throws MojoExecutionException declared for failures while creating the watch tasks
 */
public synchronized void watch(WatchContext context, BuildService.BuildContext buildContext, List<ImageConfiguration> images) throws DockerAccessException,
        MojoExecutionException {

    // Important to be a single threaded scheduler since watch jobs must run serialized
    ScheduledExecutorService executor = null;
    try {
        executor = Executors.newSingleThreadScheduledExecutor();

        for (StartOrderResolver.Resolvable resolvable : runService.getImagesConfigsInOrder(queryService, images)) {
            final ImageConfiguration imageConfig = (ImageConfiguration) resolvable;

            String imageId = queryService.getImageId(imageConfig.getName());
            String containerId = runService.lookupContainer(imageConfig.getName());

            ImageWatcher watcher = new ImageWatcher(imageConfig, context, imageId, containerId);
            long interval = watcher.getInterval();

            WatchMode watchMode = watcher.getWatchMode(imageConfig);
            log.info("Watching " + imageConfig.getName() + (watchMode != null ? " using " + watchMode.getDescription() : ""));

            // Human-readable names of the tasks scheduled for this image, for the summary log line.
            List<String> tasks = new ArrayList<>();

            if (imageConfig.getBuildConfiguration() != null &&
                    imageConfig.getBuildConfiguration().getAssemblyConfiguration() != null) {
                if (watcher.isCopy()) {
                    String containerBaseDir = imageConfig.getBuildConfiguration().getAssemblyConfiguration().getTargetDir();
                    schedule(executor, createCopyWatchTask(watcher, context.getMojoParameters(), containerBaseDir), interval);
                    tasks.add("copying artifacts");
                }

                if (watcher.isBuild()) {
                    schedule(executor, createBuildWatchTask(watcher, context.getMojoParameters(), watchMode == WatchMode.both, buildContext), interval);
                    tasks.add("rebuilding");
                }
            }

            if (watcher.isRun() && watcher.getContainerId() != null) {
                schedule(executor, createRestartWatchTask(watcher), interval);
                tasks.add("restarting");
            }

            if (!tasks.isEmpty()) {
                log.info("%s: Watch for %s", imageConfig.getDescription(), StringUtils.join(tasks.toArray(), " and "));
            }
        }
        log.info("Waiting ...");
        if (!context.isKeepRunning()) {
            runService.addShutdownHookForStoppingContainers(context.isKeepContainer(), context.isRemoveVolumes(), context.isAutoCreateCustomNetworks());
        }
        // NOTE(review): wait() is not guarded by a condition loop, so a notify or a spurious
        // wakeup also ends the watch - confirm whether that is intended.
        wait();
    } catch (InterruptedException e) {
        log.warn("Interrupted");
        // Catching InterruptedException clears the flag; set it again for the caller.
        Thread.currentThread().interrupt();
    } finally {
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
 
源代码17 项目: nifi   文件: NiFi.java
/**
 * Determine if the machine we're running on has timing issues.
 *
 * <p>A task is scheduled with a fixed delay of two seconds and records, on every run, how far
 * the time since its previous run is away from two seconds. After sixty seconds a timer stops
 * the measurement and logs a warning when the task ran fewer than 25 times or was more than
 * 500 ms off on more than 15 runs.
 */
private void detectTimingIssues() {
    final int minRequiredOccurrences = 25;
    final int maxOccurrencesOutOfRange = 15;
    final AtomicLong previousRunMillis = new AtomicLong(System.currentTimeMillis());

    final ThreadFactory daemonFactory = new ThreadFactory() {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(final Runnable r) {
            final Thread thread = delegate.newThread(r);
            thread.setDaemon(true);
            thread.setName("Detect Timing Issues");
            return thread;
        }
    };
    final ScheduledExecutorService sampler = Executors.newScheduledThreadPool(1, daemonFactory);

    final AtomicInteger lateRuns = new AtomicInteger(0);
    final AtomicInteger totalRuns = new AtomicInteger(0);
    final Runnable sample = new Runnable() {
        @Override
        public void run() {
            final long now = System.currentTimeMillis();
            final long sinceLastRun = now - previousRunMillis.get();
            // Distance from the expected 2000 ms spacing between runs.
            final long deviation = Math.abs(sinceLastRun - 2000L);
            totalRuns.incrementAndGet();
            if (deviation > 500L) {
                lateRuns.incrementAndGet();
            }
            previousRunMillis.set(now);
        }
    };

    final ScheduledFuture<?> sampling = sampler.scheduleWithFixedDelay(sample, 2000L, 2000L, TimeUnit.MILLISECONDS);

    final TimerTask evaluate = new TimerTask() {
        @Override
        public void run() {
            // Stop sampling before looking at the counters.
            sampling.cancel(true);
            sampler.shutdownNow();

            final boolean tooFewRuns = totalRuns.get() < minRequiredOccurrences;
            final boolean tooManyLateRuns = lateRuns.get() > maxOccurrencesOutOfRange;
            if (tooFewRuns || tooManyLateRuns) {
                LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
                        + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
            }
        }
    };
    final Timer evaluationTimer = new Timer(true);
    evaluationTimer.schedule(evaluate, 60000L);
}
 
源代码18 项目: tasmo   文件: TasmoViewModelInitializer.java
/**
 * Creates the view model for the configured master tenant, loads it once, and returns a handle
 * that reloads the models periodically between {@code start()} and {@code stop()}.
 *
 * @param viewsProvider       passed through to the view model
 * @param viewPathKeyProvider passed through to the view model
 * @param config              supplies the master tenant id and the reload interval in seconds
 * @return a handle exposing the view model and its start/stop lifecycle
 */
public static TasmoServiceHandle<TasmoViewModel> initialize(
    ViewsProvider viewsProvider,
    ViewPathKeyProvider viewPathKeyProvider,
    final TasmoViewModelConfig config) {

    final TenantId masterTenant = new TenantId(config.getModelMasterTenantId());
    final TasmoViewModel viewModel = new TasmoViewModel(masterTenant, viewsProvider, viewPathKeyProvider);
    viewModel.loadModel(masterTenant); // Move to start method?

    final ScheduledExecutorService reloadScheduler = Executors.newSingleThreadScheduledExecutor();

    return new TasmoServiceHandle<TasmoViewModel>() {

        @Override
        public TasmoViewModel getService() {
            return viewModel;
        }

        @Override
        public void start() throws Exception {
            final Runnable reloadTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        viewModel.reloadModels();
                    } catch (Exception x) {
                        // Log and carry on so later runs of the task are not suppressed.
                        LOG.error("Scheduled reloading of tasmo view model failed. ", x);
                    }
                }
            };
            // The same configured interval serves as initial delay and as delay between runs.
            reloadScheduler.scheduleWithFixedDelay(
                reloadTask,
                config.getPollForModelChangesEveryNSeconds(),
                config.getPollForModelChangesEveryNSeconds(),
                TimeUnit.SECONDS);
        }

        @Override
        public void stop() throws Exception {
            reloadScheduler.shutdownNow();
        }
    };
}
 
源代码19 项目: reactor-core   文件: SchedulersTest.java
/**
 * Checks what {@code Schedulers.scanExecutor} reports for {@code Scannable.Attr.BUFFERED} on
 * several kinds of executors: a plain {@code Executor}, a single-thread executor service, a
 * fixed thread pool, a scheduled thread pool, and an
 * {@code UnsupportedScheduledExecutorService} wrapping the fixed thread pool.
 *
 * <p>The assertions depend on the sleeps and scheduling delays below, so the statement order
 * matters.
 */
@Test
public void scanSupportBuffered() throws InterruptedException {
	// Executor that runs each task directly on the calling thread.
	Executor plain = Runnable::run;
	ExecutorService plainService = Executors.newSingleThreadExecutor();

	ExecutorService threadPool = Executors.newFixedThreadPool(3);
	ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);

	// Wrapper around the fixed thread pool; asserted under the label "unwrapped" below.
	DelegateServiceScheduler.UnsupportedScheduledExecutorService unsupportedScheduledExecutorService =
			new DelegateServiceScheduler.UnsupportedScheduledExecutorService(threadPool);

	try {
		// No BUFFERED value is expected for these two executors.
		assertThat(Schedulers.scanExecutor(plain, Scannable.Attr.BUFFERED))
				.as("plain").isEqualTo(null);
		assertThat(Schedulers.scanExecutor(plainService, Scannable.Attr.BUFFERED))
				.as("plainService").isEqualTo(null);

		// Two tasks delayed by 500 ms are still pending when the scan happens 50 ms later.
		scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
		scheduledThreadPool.schedule(() -> {}, 500, TimeUnit.MILLISECONDS);
		Thread.sleep(50); //give some leeway for the pool to have consistent accounting

		assertThat(Schedulers.scanExecutor(scheduledThreadPool, Scannable.Attr.BUFFERED))
				.as("scheduledThreadPool").isEqualTo(2);

		// One task that sleeps for 200 ms is submitted to the fixed pool.
		threadPool.submit(() -> {
			try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
		});

		// Both the pool and its wrapper are expected to report 1 right after the submit.
		assertThat(Schedulers.scanExecutor(threadPool, Scannable.Attr.BUFFERED))
				.as("threadPool").isEqualTo(1);
		assertThat(Schedulers.scanExecutor(unsupportedScheduledExecutorService, Scannable.Attr.BUFFERED))
				.as("unwrapped").isEqualTo(1);

		// Wait longer than the 200 ms task so it has finished before the final scan.
		Thread.sleep(400);

		assertThat(Schedulers.scanExecutor(unsupportedScheduledExecutorService, Scannable.Attr.BUFFERED))
				.as("unwrapped after task").isEqualTo(0);
	}
	finally {
		// Shut down every executor created by this test, whatever the outcome.
		plainService.shutdownNow();
		unsupportedScheduledExecutorService.shutdownNow();
		threadPool.shutdownNow();
		scheduledThreadPool.shutdownNow();
	}
}
 
源代码20 项目: emodb   文件: DatabusJerseyTest.java
/**
 * Drives one long poll through a {@code DatabusResourcePoller} against a mocked databus whose
 * first poll is empty and whose second poll returns two events, then checks the JSON written
 * to the response and that the asynchronous request was started and completed.
 *
 * @param includeTags {@code true} to render with the {@code WithTags} view and expect the tags
 *                    in the output, {@code false} to render with the {@code ContentOnly} view
 *                    and expect events without tags
 */
private void testLongPoll(boolean includeTags) throws Exception {
    // Resource tests don't support asynchronous requests, so do the next best thing and use a DatabusResourcePoller
    // directly.

    ScheduledExecutorService keepAliveExecutor = Executors.newSingleThreadScheduledExecutor();
    ScheduledExecutorService pollExecutor = Executors.newSingleThreadScheduledExecutor();

    try {
        DatabusResourcePoller resourcePoller = new DatabusResourcePoller(
                Optional.of(new LongPollingExecutorServices(pollExecutor, keepAliveExecutor)), new MetricRegistry());

        SubjectDatabus mockDatabus = mock(SubjectDatabus.class);
        List<Event> taggedEvents = ImmutableList.of(
                new Event("id-1", ImmutableMap.of("key-1", "value-1"), ImmutableList.<List<String>>of(ImmutableList.<String>of("tag-1"))),
                new Event("id-2", ImmutableMap.of("key-2", "value-2"), ImmutableList.<List<String>>of(ImmutableList.<String>of("tag-2"))));
        // First poll comes back empty, the second one delivers the two events.
        //noinspection unchecked
        when(mockDatabus.poll(isSubject(), eq("queue-name"), eq(Duration.ofSeconds(10)), eq(100)))
                .thenReturn(new PollResult(Iterators.emptyIterator(), 0, false))
                .thenReturn(new PollResult(taggedEvents.iterator(), 2, true));

        List<Event> expectedEvents;
        Class<? extends EventViews.ContentOnly> jsonView;

        if (!includeTags) {
            // Tags won't be returned
            expectedEvents = ImmutableList.of(
                    new Event("id-1", ImmutableMap.of("key-1", "value-1"), ImmutableList.<List<String>>of()),
                    new Event("id-2", ImmutableMap.of("key-2", "value-2"), ImmutableList.<List<String>>of()));
            jsonView = EventViews.ContentOnly.class;
        } else {
            // This is the default poll behavior
            expectedEvents = taggedEvents;
            jsonView = EventViews.WithTags.class;
        }

        final StringWriter responseBody = new StringWriter();
        final AtomicBoolean finished = new AtomicBoolean(false);

        HttpServletRequest servletRequest = setupLongPollingTest(responseBody, finished);

        PeekOrPollResponseHelper responseHelper = new PeekOrPollResponseHelper(jsonView);
        resourcePoller.poll(createSubject(), mockDatabus, "queue-name", Duration.ofSeconds(10), 100, servletRequest, false, responseHelper);

        // Give the asynchronous poll up to ten seconds to complete, checking every 100 ms.
        long deadline = System.currentTimeMillis() + Duration.ofSeconds(10).toMillis();
        while (!finished.get()) {
            if (System.currentTimeMillis() >= deadline) {
                break;
            }
            Thread.sleep(100);
        }

        assertTrue(finished.get());

        List<Event> actualEvents = JsonHelper.convert(
                JsonHelper.fromJson(responseBody.toString(), List.class), new TypeReference<List<Event>>() {
        });

        assertEquals(actualEvents, expectedEvents);
        verify(servletRequest).startAsync();
        verify(servletRequest.getAsyncContext()).complete();
    } finally {
        keepAliveExecutor.shutdownNow();
        pollExecutor.shutdownNow();
    }
}