下面列出了 java.util.concurrent.ScheduledExecutorService#execute() 的实例代码,也可以点击链接到 GitHub 查看源代码,或者在右侧发表评论。
/**
 * Checks that a scheduler which has executed a {@code TestRunnable} built with a
 * sleep time of 100 reports {@code isTerminated()} after {@code shutdownNow()} and
 * after the task has finished.
 */
@Test
public void isTerminatedLongTest() {
    final ScheduledExecutorService executor = makeScheduler(THREAD_COUNT);
    try {
        // A freshly made scheduler must not claim to be terminated.
        assertFalse(executor.isTerminated());

        final TestRunnable longTask = new TestRunnable(100);
        executor.execute(longTask);
        longTask.blockTillStarted();

        executor.shutdownNow();
        longTask.blockTillFinished();

        // Wait (bounded by the 1000 passed to blockTillTrue) for termination to be reported.
        final TestCondition terminated = new TestCondition(() -> executor.isTerminated());
        terminated.blockTillTrue(1000);
    } finally {
        // Safety net so no pool outlives the test, whatever happened above.
        executor.shutdownNow();
    }
}
/**
 * Verifies that {@code awaitTermination} blocks until a task that was already
 * running when {@code shutdown()} was requested has completed, and that the
 * scheduler actually terminates within the allowed wait.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void awaitTerminationTest() throws InterruptedException {
    ScheduledExecutorService scheduler = makeScheduler(THREAD_COUNT);
    try {
        assertFalse(scheduler.isTerminated());

        TestRunnable tr = new TestRunnable(DELAY_TIME * 2);
        long start = Clock.accurateForwardProgressingMillis();
        scheduler.execute(tr);
        tr.blockTillStarted();

        scheduler.shutdown();
        // awaitTermination returns false on timeout; assert the result so a scheduler
        // that never terminates fails here instead of being silently ignored.
        assertTrue(scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS));
        long stop = Clock.accurateForwardProgressingMillis();

        // The wait must have lasted at least as long as the task (10 units of slack).
        assertTrue(stop - start >= (DELAY_TIME * 2) - 10);
    } finally {
        scheduler.shutdownNow();
    }
}
/**
 * Checks that a scheduler which has executed a default-constructed
 * {@code TestRunnable} reports {@code isTerminated()} after {@code shutdownNow()}
 * and after the task has finished.
 */
@Test
public void isTerminatedShortTest() {
    final ScheduledExecutorService executor = makeScheduler(THREAD_COUNT);
    try {
        // A freshly made scheduler must not claim to be terminated.
        assertFalse(executor.isTerminated());

        final TestRunnable shortTask = new TestRunnable();
        executor.execute(shortTask);
        shortTask.blockTillStarted();

        executor.shutdownNow();
        shortTask.blockTillFinished();

        // Wait (bounded by the 1000 passed to blockTillTrue) for termination to be reported.
        final TestCondition terminated = new TestCondition(() -> executor.isTerminated());
        terminated.blockTillTrue(1000);
    } finally {
        // Safety net so no pool outlives the test, whatever happened above.
        executor.shutdownNow();
    }
}
/**
 * Builds a server with {@code blockingTaskExecutor(executor, true)}, submits a
 * sleeping task to that executor, stops the server, and then expects the server's
 * configured blocking task executor to be both shut down and terminated.
 */
@Test
void gracefulShutdownBlockingTaskExecutor() {
    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    final Server server =
            Server.builder()
                  .blockingTaskExecutor(executor, true)
                  .service("/", (ctx, req) -> HttpResponse.of(200))
                  .build();
    server.start().join();

    // Task that is still sleeping when the server is asked to stop.
    final Runnable sleepingTask = () -> {
        try {
            Thread.sleep(processDelayMillis * 2);
        } catch (InterruptedException ignored) {
            // Ignored
        }
    };
    executor.execute(sleepingTask);

    server.stop().join();

    assertThat(server.config().blockingTaskExecutor().isShutdown()).isTrue();
    assertThat(server.config().blockingTaskExecutor().isTerminated()).isTrue();
}
/**
 * Submits two runnables to a {@code FakeClock}-backed executor and checks that a
 * {@code TaskFilter} accepting only one of them narrows the pending-task queries,
 * while {@code runDueTasks()} still runs both.
 */
@Test
public void testTaskFilter() {
    final FakeClock clock = new FakeClock();
    final ScheduledExecutorService executor = clock.getScheduledExecutorService();

    final AtomicBoolean selectedRan = new AtomicBoolean();
    final AtomicBoolean ignoredRan = new AtomicBoolean();

    final Runnable selectedRunnable = new Runnable() {
        @Override
        public void run() {
            selectedRan.set(true);
        }
    };
    final Runnable ignoredRunnable = new Runnable() {
        @Override
        public void run() {
            ignoredRan.set(true);
        }
    };

    // Accepts a task only when it is the very same instance as selectedRunnable.
    final FakeClock.TaskFilter onlySelected = new FakeClock.TaskFilter() {
        @Override
        public boolean shouldAccept(Runnable candidate) {
            return candidate == selectedRunnable;
        }
    };

    executor.execute(selectedRunnable);
    executor.execute(ignoredRunnable);

    // Unfiltered queries see both tasks; filtered queries see only the selected one.
    assertEquals(2, clock.numPendingTasks());
    assertEquals(1, clock.numPendingTasks(onlySelected));
    assertEquals(2, clock.getPendingTasks().size());
    assertEquals(1, clock.getPendingTasks(onlySelected).size());
    assertSame(selectedRunnable, clock.getPendingTasks(onlySelected).iterator().next().command);

    // Running due tasks is not subject to the filter: both runnables execute.
    assertEquals(2, clock.runDueTasks());
    assertTrue(selectedRan.get());
    assertTrue(ignoredRan.get());
}
/**
 * Starts 16 concurrent workers that each call {@code client.testString} four times
 * and expects at least one call to fail with a message mentioning
 * {@code RejectedExecutionException}, and no call to fail for any other reason.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for
 *         the workers to finish
 */
@Test
public void testRejectException() throws InterruptedException {
    // StringBuffer is synchronized: all 16 workers may append concurrently.
    final StringBuffer otherException = new StringBuffer();
    final AtomicInteger count = new AtomicInteger();
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
    for (int i = 0; i < 16; i++) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (int call = 0; call < 4; call++) {
                    try {
                        String result = client.testString("string");
                        System.out.println("testString: " + result);
                    } catch (Exception e) {
                        // An exception without a cause (or with a null message) must be
                        // recorded as an unexpected failure rather than triggering an
                        // NPE that would silently kill this worker.
                        Throwable failure = e.getCause() != null ? e.getCause() : e;
                        String message = failure.getMessage();
                        if (message != null && message.contains("RejectedExecutionException")) {
                            count.incrementAndGet();
                        } else {
                            otherException.append(failure.getClass().getName());
                        }
                    }
                }
            }
        });
    }
    executorService.shutdown();
    // Block until every worker is done; awaitTermination returns as soon as the pool
    // terminates instead of waking up once per second to poll isTerminated().
    while (!executorService.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
        // Still running; keep waiting (the test has no upper bound by design).
    }
    System.out.println("所有的子线程都结束了!");
    Assert.assertEquals("", otherException.toString());
    Assert.assertTrue(count.get() > 0);
}
/**
 * Creates a {@code FakeScheduledExecutor} and records EasyMock expectations on the
 * given mock so that any number of {@code schedule(Runnable, long, TimeUnit)} and
 * {@code execute(Runnable)} calls are answered via {@code answerExecuteWithDelay}
 * and {@code answerExecute} respectively.
 *
 * @param mock the mocked executor service to record expectations on
 *             (presumably still in EasyMock record state - confirm with callers)
 * @return the newly created fake executor
 */
public static FakeScheduledExecutor fromScheduledExecutorService(ScheduledExecutorService mock) {
    final FakeScheduledExecutor fake = new FakeScheduledExecutor();

    // Expectation 1: delayed scheduling with any arguments.
    mock.schedule(EasyMock.<Runnable>anyObject(), EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject());
    expectLastCall().andAnswer(answerExecuteWithDelay(fake)).anyTimes();

    // Expectation 2: immediate execution of any runnable.
    mock.execute(EasyMock.<Runnable>anyObject());
    expectLastCall().andAnswer(answerExecute()).anyTimes();

    return fake;
}
/**
 * Notifies subscribed IO sample listeners that a new IO sample packet has
 * been received.
 *
 * <p>Each listener is invoked from its own task on a short-lived thread pool
 * of at most {@code MAXIMUM_PARALLEL_LISTENER_THREADS} threads. This method
 * only submits the tasks; it does not wait for the listeners to finish. Any
 * exception raised while dispatching is logged and not propagated.</p>
 *
 * @param remoteDevice The remote XBee device that sent the sample.
 * @param ioSample The received IO sample.
 *
 * @see com.digi.xbee.api.RemoteXBeeDevice
 * @see com.digi.xbee.api.io.IOSample
 */
private void notifyIOSampleReceived(final RemoteXBeeDevice remoteDevice, final IOSample ioSample) {
    logger.debug(connectionInterface.toString() + "IO sample received.");

    try {
        synchronized (ioSampleReceiveListeners) {
            // Nothing to notify: do not create a thread pool at all.
            if (ioSampleReceiveListeners.size() == 0)
                return;

            ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
                    ioSampleReceiveListeners.size()));
            try {
                for (final IIOSampleReceiveListener listener:ioSampleReceiveListeners) {
                    // Ensure that the reader is running to avoid a RejectedExecutionException.
                    if (!running)
                        break;
                    executor.execute(new Runnable() {
                        /*
                         * (non-Javadoc)
                         * @see java.lang.Runnable#run()
                         */
                        @Override
                        public void run() {
                            // Synchronize the listener so it is not called
                            // twice. That is, let the listener to finish its job.
                            synchronized (listener) {
                                listener.ioSampleReceived(remoteDevice, ioSample);
                            }
                        }
                    });
                }
            } finally {
                // Always release the pool, even if submitting a task failed;
                // tasks already submitted are still executed.
                executor.shutdown();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Will atomically disable this service by invoking its @OnDisabled operation.
 * It uses a CAS operation on {@link #stateRef} to transition this service
 * from ENABLED to DISABLING state. If that transition succeeds the service
 * will be de-activated (see {@link ControllerServiceNode#isActive()}).
 * If that transition doesn't succeed (the service is still in ENABLING state)
 * then the service will still be transitioned to DISABLING state to ensure that
 * no other transition could happen on this service. However in such an event
 * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long)}
 * operation will initiate service disabling (see the javadoc for
 * {@link #enable(ScheduledExecutorService, long)}).
 * <br>
 * Upon successful invocation of @OnDisabled this service will be transitioned to
 * DISABLED state.
 *
 * @param scheduler the executor on which the @OnDisabled invocation is run
 */
@Override
public void disable(ScheduledExecutorService scheduler) {
    /*
     * The reason for synchronization is to ensure consistency of the
     * service state when another thread is in the middle of enabling this
     * service since it will attempt to transition service state from
     * ENABLING to ENABLED but only if it's active.
     */
    synchronized (this.active) {
        this.active.set(false);
    }

    final boolean wasEnabled =
            this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING);
    if (!wasEnabled) {
        // Not ENABLED: if the service is still ENABLING, move it to DISABLING;
        // nothing is scheduled from here in that case.
        this.stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.DISABLING);
        return;
    }

    final ConfigurationContext configContext =
            new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
    final Runnable disableTask = new Runnable() {
        @Override
        public void run() {
            try {
                invokeDisable(configContext);
            } finally {
                // DISABLED is reached whether or not invokeDisable threw.
                stateRef.set(ControllerServiceState.DISABLED);
            }
        }
    };
    scheduler.execute(disableTask);
}
/**
 * Notifies subscribed SMS receive listeners that a new SMS packet has
 * been received in form of an {@code SMSMessage}.
 *
 * <p>Each listener is invoked from its own task on a short-lived thread pool
 * of at most {@code MAXIMUM_PARALLEL_LISTENER_THREADS} threads. This method
 * only submits the tasks; it does not wait for the listeners to finish. Any
 * exception raised while dispatching is logged and not propagated.</p>
 *
 * @param smsMessage The SMS message to be sent to subscribed SMS listeners.
 *
 * @see com.digi.xbee.api.models.SMSMessage
 *
 * @since 1.2.0
 */
private void notifySMSReceived(final SMSMessage smsMessage) {
    logger.info(connectionInterface.toString() +
            "SMS received from {} >> {}.", smsMessage.getPhoneNumber(), smsMessage.getData());

    try {
        synchronized (smsReceiveListeners) {
            // Nothing to notify: do not create a thread pool at all.
            if (smsReceiveListeners.size() == 0)
                return;

            ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
                    smsReceiveListeners.size()));
            try {
                for (final ISMSReceiveListener listener:smsReceiveListeners) {
                    // Ensure that the reader is running to avoid a RejectedExecutionException.
                    if (!running)
                        break;
                    executor.execute(new Runnable() {
                        /*
                         * (non-Javadoc)
                         * @see java.lang.Runnable#run()
                         */
                        @Override
                        public void run() {
                            /* Synchronize the listener so it is not called
                             twice. That is, let the listener to finish its job. */
                            synchronized (listener) {
                                listener.smsReceived(smsMessage);
                            }
                        }
                    });
                }
            } finally {
                // Always release the pool, even if submitting a task failed;
                // tasks already submitted are still executed.
                executor.shutdown();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Notifies subscribed IP data receive listeners that a new IP data
 * packet has been received in form of an {@code IPMessage}.
 *
 * <p>Each listener is invoked from its own task on a short-lived thread pool
 * of at most {@code MAXIMUM_PARALLEL_LISTENER_THREADS} threads. This method
 * only submits the tasks; it does not wait for the listeners to finish. Any
 * exception raised while dispatching is logged and not propagated.</p>
 *
 * @param ipMessage The IP message to be sent to subscribed
 *                  IP data listeners.
 *
 * @see com.digi.xbee.api.models.IPMessage
 *
 * @since 1.2.0
 */
private void notifyIPDataReceived(final IPMessage ipMessage) {
    logger.info(connectionInterface.toString() +
            "IP data received from {} >> {}.", ipMessage.getHostAddress(), HexUtils.prettyHexString(ipMessage.getData()));

    try {
        synchronized (ipDataReceiveListeners) {
            // Nothing to notify: do not create a thread pool at all.
            if (ipDataReceiveListeners.size() == 0)
                return;

            ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
                    ipDataReceiveListeners.size()));
            try {
                for (final IIPDataReceiveListener listener:ipDataReceiveListeners) {
                    // Ensure that the reader is running to avoid a RejectedExecutionException.
                    if (!running)
                        break;
                    executor.execute(new Runnable() {
                        /*
                         * (non-Javadoc)
                         * @see java.lang.Runnable#run()
                         */
                        @Override
                        public void run() {
                            /* Synchronize the listener so it is not called
                             twice. That is, let the listener to finish its job. */
                            synchronized (listener) {
                                listener.ipDataReceived(ipMessage);
                            }
                        }
                    });
                }
            } finally {
                // Always release the pool, even if submitting a task failed;
                // tasks already submitted are still executed.
                executor.shutdown();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Runs a mock runnable through the executor returned by
 * {@code StateCapture.capturingDecorator} while a mock {@code CapturedState} is set
 * in {@code ThreadLocalStateCaptor.THREAD_LOCAL}, then checks the standard capture
 * interactions once the underlying pool has terminated.
 *
 * @throws InterruptedException if interrupted while awaiting pool termination
 */
@Test
public void testScheduledExecutorServiceCaptures() throws InterruptedException {
    // Setup
    final ScheduledExecutorService delegate = Executors.newScheduledThreadPool(10);
    final ScheduledExecutorService capturing = StateCapture.capturingDecorator(delegate);

    final CapturedState mockCapturedState = mock(CapturedState.class);
    final Runnable mockRunnable = mock(Runnable.class);
    ThreadLocalStateCaptor.THREAD_LOCAL.set(mockCapturedState);

    // Execute through the decorator, then drain the real pool underneath it.
    capturing.execute(mockRunnable);
    delegate.shutdown();
    delegate.awaitTermination(10, TimeUnit.HOURS);

    verifyStandardCaptures(mockCapturedState, mockRunnable);
}
/**
 * Returns a supplier that, on each invocation, creates a single-threaded scheduler
 * (thread name {@code "ResourceUpdateController (scheduler)"}) and registers the
 * update check on it.
 *
 * <p>If the scheduler is enabled in {@code configuration}, {@code this::run} is
 * scheduled with a fixed delay (initial delay 0) using the configured interval and
 * unit, and a watcher logs a warning should the scheduled task complete
 * exceptionally. Otherwise {@code this::run} is executed once.</p>
 *
 * @param configuration the resource update configuration to read scheduler settings from
 * @return a supplier producing a configured scheduler
 */
@SuppressWarnings({"FutureReturnValueIgnored", "PMD.InvalidLogMessageFormat"})
final Supplier<ScheduledExecutorService> schedulerConfiguredFrom(final ResourceUpdateConfiguration configuration) {
    return () -> {
        final ScheduledExecutorService configuredScheduler = Executors.newScheduledThreadPool(1,
            r -> new Thread(null, r, "ResourceUpdateController (scheduler)"));

        if (configuration.getScheduler().isEnabled()) {
            LOGGER.debug("Register background resource update check task (interval={}, interval-unit={})",
                configuration.getScheduler().getInterval(),
                configuration.getScheduler().getIntervalUnit());
            final ScheduledFuture<?> task = configuredScheduler.scheduleWithFixedDelay(this::run, 0, configuration.getScheduler().getInterval(),
                configuration.getScheduler().getIntervalUnit());

            // A periodic task's future only completes on failure or cancellation, so
            // waiting on it is how a failure in the scheduled processing is surfaced.
            CompletableFuture.supplyAsync(() -> {
                try {
                    return task.get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before translating the exception so
                    // the interruption is not lost for the owning thread.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }).whenComplete((x, t) -> {
                if (t != null) {
                    LOGGER.warn("Failure in scheduled event processing", t);
                }
            });
        } else {
            LOGGER.debug("Execute one-time resource update check");
            configuredScheduler.execute(this::run);
        }
        return configuredScheduler;
    };
}
/**
 * Queues eight tasks on a {@code FakeClock}-backed executor (one via
 * {@code execute}, seven via {@code schedule} with delays of 0, 80, 90, 100, 110,
 * 120 and 200 ns) and checks the pending and due task counts as the fake clock
 * is advanced.
 */
@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void testPendingAndDueTasks() {
    final FakeClock clock = new FakeClock();
    final ScheduledExecutorService executor = clock.getScheduledExecutorService();

    executor.schedule(newRunnable(), 200L, TimeUnit.NANOSECONDS);
    executor.execute(newRunnable());
    executor.schedule(newRunnable(), 0L, TimeUnit.NANOSECONDS);
    // Delays 80, 90, 100, 110 and 120 ns, submitted in ascending order.
    for (long delayNanos = 80L; delayNanos <= 120L; delayNanos += 10L) {
        executor.schedule(newRunnable(), delayNanos, TimeUnit.NANOSECONDS);
    }

    // Before any time passes: the execute() task and the 0 ns task are due.
    assertEquals(8, clock.numPendingTasks());
    assertEquals(2, clock.getDueTasks().size());

    clock.runDueTasks();
    assertEquals(6, clock.numPendingTasks());
    assertEquals(0, clock.getDueTasks().size());

    // Clock at 90 ns: two more tasks are expected to be gone, none left due.
    clock.forwardNanos(90L);
    assertEquals(4, clock.numPendingTasks());
    assertEquals(0, clock.getDueTasks().size());

    // Clock at 110 ns: two more tasks are expected to be gone, two remain pending.
    clock.forwardNanos(20L);
    assertEquals(2, clock.numPendingTasks());
    assertEquals(0, clock.getDueTasks().size());
}
/**
 * Notifies subscribed Modem Status listeners that a Modem Status event
 * packet has been received.
 *
 * <p>Each listener is invoked from its own task on a short-lived thread pool
 * of at most {@code MAXIMUM_PARALLEL_LISTENER_THREADS} threads. This method
 * only submits the tasks; it does not wait for the listeners to finish. Any
 * exception raised while dispatching is logged and not propagated.</p>
 *
 * @param modemStatusEvent The Modem Status event.
 *
 * @see com.digi.xbee.api.models.ModemStatusEvent
 */
private void notifyModemStatusReceived(final ModemStatusEvent modemStatusEvent) {
    logger.debug(connectionInterface.toString() + "Modem Status event received.");

    try {
        synchronized (modemStatusListeners) {
            // Nothing to notify: do not create a thread pool at all.
            if (modemStatusListeners.size() == 0)
                return;

            ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
                    modemStatusListeners.size()));
            try {
                for (final IModemStatusReceiveListener listener:modemStatusListeners) {
                    // Ensure that the reader is running to avoid a RejectedExecutionException.
                    if (!running)
                        break;
                    executor.execute(new Runnable() {
                        /*
                         * (non-Javadoc)
                         * @see java.lang.Runnable#run()
                         */
                        @Override
                        public void run() {
                            // Synchronize the listener so it is not called
                            // twice. That is, let the listener to finish its job.
                            synchronized (listener) {
                                listener.modemStatusEventReceived(modemStatusEvent);
                            }
                        }
                    });
                }
            } finally {
                // Always release the pool, even if submitting a task failed;
                // tasks already submitted are still executed.
                executor.shutdown();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
/**
 * Starts the processing playback by submitting a task to the given scheduler;
 * that task calls {@code prepare()} on the scheduler's thread.
 *
 * @param pool the scheduler on which {@code prepare()} is run
 */
public void start(final ScheduledExecutorService pool) {
    final Runnable prepareTask = new Runnable() {
        @Override
        public void run() {
            prepare();
        }
    };
    pool.execute(prepareTask);
}
/**
 * Shuts the given scheduler down and checks that a subsequent {@code execute}
 * call is rejected with a {@link RejectedExecutionException}.
 *
 * @param scheduler the scheduler under test; it is shut down by this method
 */
private void testExecuteAfterShutdown(ScheduledExecutorService scheduler) {
    scheduler.shutdown();

    boolean rejected = false;
    try {
        scheduler.execute(() -> {});
    } catch (RejectedExecutionException expected) {
        // This is the outcome the test is looking for.
        rejected = true;
    }

    if (!rejected) {
        fail("Exception should have thrown");
    }
}
/**
 * Cancels any pending ping check, reports the current health to the context
 * (1 when healthy, 0 otherwise) unless {@code updatedHealth} is already set, and
 * then arranges the next {@code check()} - immediately or after
 * {@code ctx.nextDelayMillis()} milliseconds.
 */
private void updateHealth() {
    if (pingCheckFuture != null) {
        pingCheckFuture.cancel(false);
    }

    // Already reported: nothing more to do.
    if (updatedHealth) {
        return;
    }
    updatedHealth = true;

    ctx.updateHealth(isHealthy ? 1 : 0);
    wasHealthy = isHealthy;

    final ScheduledExecutorService executor = ctx.executor();
    // Send a long polling check immediately if:
    // - Server has long polling enabled.
    // - Server responded with 2xx or 5xx.
    final boolean checkImmediately = maxLongPollingSeconds > 0 && receivedExpectedResponse;
    try {
        if (!checkImmediately) {
            executor.schedule(HttpHealthChecker.this::check,
                              ctx.nextDelayMillis(), TimeUnit.MILLISECONDS);
        } else {
            executor.execute(HttpHealthChecker.this::check);
        }
    } catch (RejectedExecutionException ignored) {
        // Can happen if the Endpoint being checked has been disappeared from
        // the delegate EndpointGroup. See HealthCheckedEndpointGroupTest.disappearedEndpoint().
    }
}
/**
 * Test that a streaming task will work correctly when an extractor is continuously producing records
 * No converters
 * Identity fork
 * One writer
 *
 * <p>The scenario is run twice, once with {@code taskExecutionSync} set to {@code true}
 * and once with {@code false}. Each run lets the task work for ten one-second
 * iterations, validates committed watermarks along the way, then shuts the task
 * down and validates the final watermarks and working state.</p>
 *
 * @throws Exception if the test thread is interrupted or any task/watermark operation fails
 */
@Test
public void testContinuousTask()
    throws Exception {
    for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
        ArrayList<Object> recordCollector = new ArrayList<>(100);
        long perRecordExtractLatencyMillis = 1000; // 1 second per record
        ContinuousExtractor continuousExtractor = new ContinuousExtractor(perRecordExtractLatencyMillis);

        TaskContext mockTaskContext = getMockTaskContext(recordCollector, continuousExtractor, taskExecutionSync,
            Integer.MAX_VALUE);

        // Create a mock TaskStateTracker
        TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);

        // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
        TaskExecutor taskExecutor = new TaskExecutor(new Properties());

        // Create the Task
        Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());

        ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
        try {
            taskRunner.execute(task);

            // Let the task run for 10 seconds
            int sleepIterations = 10;
            int currentIteration = 0;

            while (currentIteration < sleepIterations) {
                Thread.sleep(1000);
                currentIteration++;
                Map<String, CheckpointableWatermark> externalWatermarkStorage = mockTaskContext.getWatermarkStorage()
                    .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
                if (!externalWatermarkStorage.isEmpty()) {
                    for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
                        log.info("Observed committed watermark: {}", watermark);
                    }
                    log.info("Task progress: {}", task.getProgress());
                    // Ensure that watermarks seem reasonable at each step
                    Assert.assertTrue(continuousExtractor.validateWatermarks(false, externalWatermarkStorage));
                }
            }

            // Let's try to shutdown the task
            task.shutdown();
            log.info("Shutting down task now");
            boolean success = task.awaitShutdown(30000);
            // The timeout above is 30000 ms, so the message must say 30 seconds.
            Assert.assertTrue(success, "Task should shutdown in 30 seconds");
            log.info("Task done waiting to shutdown {}", success);

            // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
            Assert.assertTrue(continuousExtractor.validateWatermarks(true, mockTaskContext.getWatermarkStorage()
                .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"))));

            task.commit();

            Assert.assertTrue(mockTaskContext.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL);
        } finally {
            // Shutdown the executor even when an assertion above failed, so a failing
            // iteration does not leak the runner thread into the rest of the suite.
            taskRunner.shutdown();
            if (!taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                // Orderly shutdown did not finish in time; interrupt whatever is still running.
                taskRunner.shutdownNow();
            }
        }
    }
}
/**
 * Notifies subscribed generic or specific User Data Relay receive listeners
 * that a new User Data Relay packet has been received in form of a
 * {@code UserDataRelayMessage}.
 *
 * <p>When {@code notifyGeneric} is {@code false} the listener list is chosen
 * from the message's source interface (serial, Bluetooth or MicroPython); any
 * other source interface results in no listener being notified. Each listener
 * is invoked from its own task on a short-lived thread pool of at most
 * {@code MAXIMUM_PARALLEL_LISTENER_THREADS} threads, and this method does not
 * wait for the listeners to finish. Any exception raised while dispatching is
 * logged and not propagated.</p>
 *
 * @param relayMessage The User Data Relay message to be sent to subscribed
 *                     User Data Relay listeners.
 * @param notifyGeneric {@code true} to notify only the generic listeners,
 *                      {@code false} to notify the specific ones.
 *
 * @see UserDataRelayMessage
 *
 * @since 1.3.0
 */
private void notifyUserDataRelayReceived(final UserDataRelayMessage relayMessage, final boolean notifyGeneric) {
    ArrayList<?> listenerList = new ArrayList<>();

    // Get the list of listeners that should be notified depending on the parameters.
    if (notifyGeneric) {
        listenerList = dataRelayReceiveListeners;
    } else {
        switch (relayMessage.getSourceInterface()) {
        case SERIAL:
            listenerList = serialDataReceiveListeners;
            break;
        case BLUETOOTH:
            listenerList = bluetoothDataReceiveListeners;
            break;
        case MICROPYTHON:
            listenerList = microPythonDataReceiveListeners;
            break;
        default:
            break;
        }
    }

    // Notify the appropriate listeners.
    try {
        synchronized (listenerList) {
            // Nothing to notify: do not create a thread pool at all.
            if (listenerList.size() == 0)
                return;

            ScheduledExecutorService executor = Executors.newScheduledThreadPool(Math.min(MAXIMUM_PARALLEL_LISTENER_THREADS,
                    listenerList.size()));
            try {
                for (final Object listener : listenerList) {
                    // Ensure that the reader is running to avoid a RejectedExecutionException.
                    if (!running)
                        break;
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            /* Synchronize the listener so it is not called
                             twice. That is, let the listener to finish its job. */
                            synchronized (listener) {
                                if (notifyGeneric) {
                                    ((IUserDataRelayReceiveListener) listener).userDataRelayReceived(relayMessage);
                                } else {
                                    // The cast target mirrors the list selected above.
                                    switch (relayMessage.getSourceInterface()) {
                                    case SERIAL:
                                        ((ISerialDataReceiveListener) listener).dataReceived(relayMessage.getData());
                                        break;
                                    case BLUETOOTH:
                                        ((IBluetoothDataReceiveListener) listener).dataReceived(relayMessage.getData());
                                        break;
                                    case MICROPYTHON:
                                        ((IMicroPythonDataReceiveListener) listener).dataReceived(relayMessage.getData());
                                        break;
                                    default:
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            } finally {
                // Always release the pool, even if submitting a task failed;
                // tasks already submitted are still executed.
                executor.shutdown();
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}