下面列出了如何使用 org.springframework.core.task.AsyncTaskExecutor 的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Wraps executor-type beans in their tracing counterpart after initialization.
 * Beans that already are one of the trace wrappers, and beans that are not
 * executors at all, are handed back unchanged.
 *
 * @param bean the initialized bean instance
 * @param beanName the name of the bean
 * @return the wrapped executor, or {@code bean} itself when no wrapping applies
 * @throws BeansException as declared by the overridden method
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
        throws BeansException {
    boolean alreadyInstrumented = bean instanceof LazyTraceThreadPoolTaskExecutor
            || bean instanceof TraceableScheduledExecutorService
            || bean instanceof TraceableExecutorService
            || bean instanceof LazyTraceAsyncTaskExecutor
            || bean instanceof LazyTraceExecutor;
    if (alreadyInstrumented) {
        log.info("Bean is already instrumented " + beanName);
        return bean;
    }
    // The checks run from the most specific type to the most general one:
    // a ScheduledExecutorService is an ExecutorService, and all of these are Executors.
    if (bean instanceof ThreadPoolTaskExecutor) {
        return wrapThreadPoolTaskExecutor(bean);
    }
    if (bean instanceof ScheduledExecutorService) {
        return wrapScheduledExecutorService(bean);
    }
    if (bean instanceof ExecutorService) {
        return wrapExecutorService(bean);
    }
    if (bean instanceof AsyncTaskExecutor) {
        return wrapAsyncTaskExecutor(bean);
    }
    if (bean instanceof Executor) {
        return wrapExecutor(bean);
    }
    return bean;
}
/**
 * Resolve the executor that should run the given method asynchronously.
 * <p>A previously resolved executor is served from the per-method cache. Otherwise
 * the executor is looked up by the method's qualifier when one is present, or taken
 * from the default executor supplier, adapted if necessary, and cached.
 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
 * @param method the method about to be executed
 * @return the executor to use (or {@code null}, but just if no default executor is available)
 */
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor cached = this.executors.get(method);
    if (cached != null) {
        return cached;
    }

    String qualifier = getExecutorQualifier(method);
    Executor candidate = (StringUtils.hasLength(qualifier) ?
            findQualifiedExecutor(this.beanFactory, qualifier) : this.defaultExecutor.get());
    if (candidate == null) {
        return null;
    }

    // Plain Executors are wrapped so that callers always see an AsyncTaskExecutor.
    AsyncTaskExecutor resolved;
    if (candidate instanceof AsyncListenableTaskExecutor) {
        resolved = (AsyncListenableTaskExecutor) candidate;
    }
    else {
        resolved = new TaskExecutorAdapter(candidate);
    }
    this.executors.put(method, resolved);
    return resolved;
}
/**
 * Hand the task to the chosen executor, shaping the result after the declared return type.
 * <ul>
 * <li>{@link CompletableFuture}: the task is run via {@code supplyAsync} on the executor,
 * with any {@code Throwable} rethrown wrapped in a {@link CompletionException}</li>
 * <li>{@link ListenableFuture}: submitted through {@code submitListenable}</li>
 * <li>any other {@link Future}: the handle returned by {@code submit}</li>
 * <li>anything else: the task is submitted and {@code null} is returned</li>
 * </ul>
 * @param task the task to execute
 * @param executor the chosen executor
 * @param returnType the declared return type (potentially a {@link Future} variant)
 * @return the execution result (potentially a corresponding {@link Future} handle)
 */
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // Both remaining cases submit the task; only a Future return type exposes the handle.
    Future<Object> handle = executor.submit(task);
    return (Future.class.isAssignableFrom(returnType) ? handle : null);
}
/**
 * Starting callable processing with a {@code WebAsyncTask} must submit work to the
 * task's own executor, apply its timeout, register the timeout, error and completion
 * handlers, and start async processing on the request.
 */
@SuppressWarnings("unchecked")
@Test
public void startCallableProcessingWithAsyncTask() throws Exception {
    given(this.asyncWebRequest.getNativeRequest(HttpServletRequest.class)).willReturn(this.servletRequest);
    AsyncTaskExecutor taskExecutor = mock(AsyncTaskExecutor.class);
    Callable<Object> callable = mock(Callable.class);
    WebAsyncTask<Object> webAsyncTask = new WebAsyncTask<>(1000L, taskExecutor, callable);

    this.asyncManager.startCallableProcessing(webAsyncTask);

    verify(taskExecutor).submit((Runnable) notNull());
    verify(this.asyncWebRequest).setTimeout(1000L);
    verify(this.asyncWebRequest).addTimeoutHandler(any(Runnable.class));
    verify(this.asyncWebRequest).addErrorHandler(any(Consumer.class));
    verify(this.asyncWebRequest).addCompletionHandler(any(Runnable.class));
    verify(this.asyncWebRequest).startAsync();
}
/**
 * Builds the progress processor that writes to the current response.
 * <p>When no executor has been set, one is looked up from the Spring context by type.
 * The response content type is set before the writer is obtained. Depending on
 * {@code useCompletableFeture}, either a {@code CompletableProgressAsyncWebProcessor}
 * or a {@code DefaultProgressAsyncWebProcessor} is created, then configured with the
 * flush interval and the write-empty-message flag.
 *
 * @return the configured processor
 * @throws BaseException if the response writer cannot be obtained; the triggering
 * {@link IOException} is attached as a suppressed exception so it is not lost
 */
public ProgressAsyncWebProcessor buildProgressAsyncWebProcessor(){
    if (asyncTaskExecutor == null) {
        asyncTaskExecutor = Springs.getInstance().getBean(AsyncTaskExecutor.class);
    }
    Assert.hasText(asynCallback);
    response.setContentType(contentType);
    try {
        DefaultProgressAsyncWebProcessor processor;
        if (useCompletableFeture) {
            processor = new CompletableProgressAsyncWebProcessor(response.getWriter(), messageTunnel, asyncTaskExecutor, asynCallback);
        } else {
            processor = new DefaultProgressAsyncWebProcessor(response.getWriter(), messageTunnel, asyncTaskExecutor, asynCallback);
        }
        processor.setSleepTime(flushInMilliSecond);
        processor.setWriteEmptyMessage(writeEmptyMessage);
        return processor;
    } catch (IOException e) {
        // Only the (String) constructor of BaseException is visible from here, so the
        // original exception is attached via addSuppressed to keep its stack trace.
        BaseException failure = new BaseException("build ProgressAsyncWebProcessor error: " + e.getMessage());
        failure.addSuppressed(e);
        throw failure;
    }
}
/**
 * Work out which executor should run the given method.
 * <p>The per-method cache is consulted first. On a miss, a qualified executor is
 * looked up when the method carries a qualifier, otherwise the default executor
 * supplier is asked; the result is adapted if needed and then cached.
 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
 * @param method the method about to be executed
 * @return the executor to use (or {@code null}, but just if no default executor is available)
 */
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor known = this.executors.get(method);
    if (known != null) {
        return known;
    }

    Executor target;
    String qualifier = getExecutorQualifier(method);
    if (!StringUtils.hasLength(qualifier)) {
        target = this.defaultExecutor.get();
    }
    else {
        target = findQualifiedExecutor(this.beanFactory, qualifier);
    }
    if (target == null) {
        return null;
    }

    AsyncTaskExecutor adapted = (target instanceof AsyncListenableTaskExecutor ?
            (AsyncListenableTaskExecutor) target : new TaskExecutorAdapter(target));
    this.executors.put(method, adapted);
    return adapted;
}
/**
 * Verifies that a {@code WebAsyncTask} carrying its own executor and timeout causes
 * a submit on that executor, the timeout to be set on the request, all three handlers
 * to be registered and async processing to be started.
 */
@SuppressWarnings("unchecked")
@Test
public void startCallableProcessingWithAsyncTask() throws Exception {
    AsyncTaskExecutor ownExecutor = mock(AsyncTaskExecutor.class);
    Callable<Object> work = mock(Callable.class);
    given(this.asyncWebRequest.getNativeRequest(HttpServletRequest.class)).willReturn(this.servletRequest);

    this.asyncManager.startCallableProcessing(new WebAsyncTask<>(1000L, ownExecutor, work));

    verify(ownExecutor).submit((Runnable) notNull());
    verify(this.asyncWebRequest).setTimeout(1000L);
    verify(this.asyncWebRequest).addTimeoutHandler(any(Runnable.class));
    verify(this.asyncWebRequest).addErrorHandler(any(Consumer.class));
    verify(this.asyncWebRequest).addCompletionHandler(any(Runnable.class));
    verify(this.asyncWebRequest).startAsync();
}
/**
 * After callable processing has started, a timeout event on the request must leave
 * the manager with a concurrent result and cancel the submitted task's Future with
 * {@code cancel(true)} -- and touch that Future in no other way.
 */
@SuppressWarnings("unchecked")
@Test
public void startCallableProcessingTimeoutAndCheckThreadInterrupted() throws Exception {
StubCallable callable = new StubCallable();
Future future = mock(Future.class);
AsyncTaskExecutor executor = mock(AsyncTaskExecutor.class);
// Make the executor hand back the mock Future so the cancel call can be observed.
when(executor.submit(any(Runnable.class))).thenReturn(future);
this.asyncManager.setTaskExecutor(executor);
this.asyncManager.startCallableProcessing(callable);
// Fire the timeout event on the async request.
this.asyncWebRequest.onTimeout(ASYNC_EVENT);
assertTrue(this.asyncManager.hasConcurrentResult());
verify(future).cancel(true);
verifyNoMoreInteractions(future);
}
/**
 * Creates and initializes the thread pool used for asynchronous execution.
 * <p>Core and maximum pool sizes are the available processor count multiplied by the
 * configured factors (rounded); the queue capacity is the maximum pool size multiplied
 * by the configured queue factor (rounded). When the pool rejects a task, the
 * submitting thread runs it ({@code CallerRunsPolicy}).
 *
 * @return the initialized executor
 */
public AsyncTaskExecutor getAsyncExecutor() {
    int cpuCount = Runtime.getRuntime().availableProcessors();
    int coreSize = Math.round(cpuCount * properties.getCoreThreadTimes());
    int maxSize = Math.round(cpuCount * properties.getMaxThreadTimes());
    int queueSize = Math.round(maxSize * properties.getQueueFactors());

    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(coreSize);
    pool.setMaxPoolSize(maxSize);
    pool.setQueueCapacity(queueSize);
    pool.setKeepAliveSeconds(properties.getKeepAliveSeconds());
    pool.setThreadNamePrefix(properties.getThreadNamePrefix());
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();

    log.info("async executor is already now!");
    log.info(
            "async config:corePoolSize-{}, maxPoolSize-{}, queueCapacity-{}, keepAliveSeconds-{}, threadNamePrefix-{}",
            coreSize, maxSize, queueSize, properties.getKeepAliveSeconds(),
            properties.getThreadNamePrefix());
    return pool;
}
/**
 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been
 * specified.
 * <p>
 * The default implementation builds a
 * {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor} whose
 * thread name prefix is the bean name followed by {@code "-"} (or
 * {@code DEFAULT_THREAD_NAME_PREFIX}, if no bean name is specified).
 * <p>
 * When at least one queue is registered, the core pool size is the number of
 * registered queues multiplied by {@code DEFAULT_WORKER_THREADS}, and the maximum
 * pool size is the number of registered queues multiplied by the maximum number of
 * messages per batch plus one. With no registered queues the pool sizes are left at
 * the executor's defaults. The queue capacity is always zero.
 * @return an initialized
 * {@link org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor}
 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
 */
protected AsyncTaskExecutor createDefaultTaskExecutor() {
    String beanName = getBeanName();
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setThreadNamePrefix(
            beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
    int spinningThreads = this.getRegisteredQueues().size();

    if (spinningThreads > 0) {
        threadPoolTaskExecutor
                .setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

        // Read the setting once so the null check and the value used cannot disagree.
        Integer configuredMaxMessages = getMaxNumberOfMessages();
        int maxNumberOfMessagePerBatch = configuredMaxMessages != null
                ? configuredMaxMessages : DEFAULT_MAX_NUMBER_OF_MESSAGES;
        threadPoolTaskExecutor
                .setMaxPoolSize(spinningThreads * (maxNumberOfMessagePerBatch + 1));
    }

    // No use of a thread pool executor queue to avoid retaining messages too long in
    // memory
    threadPoolTaskExecutor.setQueueCapacity(0);
    threadPoolTaskExecutor.afterPropertiesSet();

    return threadPoolTaskExecutor;
}
/**
 * Starting callable processing with a {@code WebAsyncTask} must submit to the task's
 * own executor, apply its timeout, register the timeout and completion handlers and
 * start async processing.
 */
@Test
public void startCallableProcessingWithAsyncTask() throws Exception {
    given(this.asyncWebRequest.getNativeRequest(HttpServletRequest.class)).willReturn(this.servletRequest);
    AsyncTaskExecutor taskExecutor = mock(AsyncTaskExecutor.class);
    @SuppressWarnings("unchecked")
    Callable<Object> callable = mock(Callable.class);
    WebAsyncTask<Object> webAsyncTask = new WebAsyncTask<Object>(1000L, taskExecutor, callable);

    this.asyncManager.startCallableProcessing(webAsyncTask);

    verify(taskExecutor).submit((Runnable) notNull());
    verify(this.asyncWebRequest).setTimeout(1000L);
    verify(this.asyncWebRequest).addTimeoutHandler(any(Runnable.class));
    verify(this.asyncWebRequest).addCompletionHandler(any(Runnable.class));
    verify(this.asyncWebRequest).startAsync();
}
/**
 * Checks which wrapper class the post-processor returns for each kind of executor
 * bean, and that a bean registered under the excluded name is not wrapped as a
 * {@code ContextAwareExecutor}.
 */
@Test
public void postProcessAfterInitialization() throws Exception {
    // excluded bean name: must not come back as the context-aware wrapper
    Object excluded = processor.postProcessAfterInitialization(mock(Executor.class), toBeExcluded);
    assertThat(excluded.getClass(), not(equalTo(ContextAwareExecutor.class)));

    // java.util.concurrent types
    Object plainExecutor = processor.postProcessAfterInitialization(mock(Executor.class), beanName);
    assertThat(plainExecutor.getClass(), equalTo(ContextAwareExecutor.class));

    Object executorService = processor.postProcessAfterInitialization(mock(ExecutorService.class), beanName);
    assertThat(executorService.getClass(), equalTo(ContextAwareExecutorService.class));

    Object scheduledService =
            processor.postProcessAfterInitialization(mock(ScheduledExecutorService.class), beanName);
    assertThat(scheduledService.getClass(), equalTo(ContextAwareScheduledExecutorService.class));

    // Spring types
    Object taskScheduler = processor.postProcessAfterInitialization(mock(TaskScheduler.class), beanName);
    assertThat(taskScheduler.getClass(), equalTo(ContextAwareTaskScheduler.class));

    Object poolExecutor = processor.postProcessAfterInitialization(new ThreadPoolTaskExecutor(), beanName);
    assertThat(poolExecutor.getClass(), equalTo(ContextAwareThreadPoolTaskExecutor.class));

    Object poolScheduler = processor.postProcessAfterInitialization(new ThreadPoolTaskScheduler(), beanName);
    assertThat(poolScheduler.getClass(), equalTo(ContextAwareThreadPoolTaskScheduler.class));

    Object listenableExecutor =
            processor.postProcessAfterInitialization(mock(AsyncListenableTaskExecutor.class), beanName);
    assertThat(listenableExecutor.getClass(), equalTo(ContextAwareAsyncListenableTaskExecutor.class));

    Object asyncExecutor = processor.postProcessAfterInitialization(mock(AsyncTaskExecutor.class), beanName);
    assertThat(asyncExecutor.getClass(), equalTo(ContextAwareAsyncTaskExecutor.class));

    Object schedulingExecutor =
            processor.postProcessAfterInitialization(mock(SchedulingTaskExecutor.class), beanName);
    assertThat(schedulingExecutor.getClass(), equalTo(ContextAwareSchedulingTaskExecutor.class));
}
/**
 * Hand the task to the chosen executor, shaping the result after the declared return type.
 * <p>When {@code completableFuturePresent} is set, {@code CompletableFutureDelegate} gets
 * the first chance to handle the call; a non-null result from it is returned as is.
 * Otherwise a {@link ListenableFuture} return type goes through {@code submitListenable},
 * any other {@link Future} return type yields the handle from {@code submit}, and
 * every other return type submits the task and returns {@code null}.
 * @param task the task to execute
 * @param executor the chosen executor
 * @param returnType the declared return type (potentially a {@link Future} variant)
 * @return the execution result (potentially a corresponding {@link Future} handle)
 */
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (completableFuturePresent) {
        Future<Object> delegated = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
        if (delegated != null) {
            return delegated;
        }
    }
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // Both remaining cases submit the task; only a Future return type exposes the handle.
    Future<Object> handle = executor.submit(task);
    return (Future.class.isAssignableFrom(returnType) ? handle : null);
}
/**
 * Submit the task to the chosen executor and return whatever matches the declared return type.
 * <p>If {@code completableFuturePresent} is set and {@code CompletableFutureDelegate}
 * produces a non-null result, that result wins. Failing that, a {@link ListenableFuture}
 * return type uses {@code submitListenable}; a plain {@link Future} return type gets the
 * handle from {@code submit}; anything else is submitted with {@code null} returned.
 * @param task the task to execute
 * @param executor the chosen executor
 * @param returnType the declared return type (potentially a {@link Future} variant)
 * @return the execution result (potentially a corresponding {@link Future} handle)
 */
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    Future<Object> completable = (completableFuturePresent ?
            CompletableFutureDelegate.processCompletableFuture(returnType, task, executor) : null);
    if (completable != null) {
        return completable;
    }
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        AsyncListenableTaskExecutor listenableExecutor = (AsyncListenableTaskExecutor) executor;
        return listenableExecutor.submitListenable(task);
    }
    if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    // No Future-style return type: submit and do not expose a handle.
    executor.submit(task);
    return null;
}
/**
 * Primary {@link AsyncTaskExecutor} bean: an initialized thread pool using the
 * configured core pool size and queue capacity, the
 * {@code "intermediateBuilderExecutor-"} thread name prefix, and an
 * {@code MDCCleanerTaskDecorator} applied to its tasks.
 *
 * @return the initialized executor
 */
@Bean
@Primary
public AsyncTaskExecutor intermediateBuilderExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix("intermediateBuilderExecutor-");
    pool.setTaskDecorator(new MDCCleanerTaskDecorator());
    pool.setCorePoolSize(intermediateCorePoolSize);
    pool.setQueueCapacity(intermediateQueueCapacity);
    pool.initialize();
    return pool;
}
/**
 * {@link AsyncTaskExecutor} bean backed by an initialized thread pool (core size 5,
 * maximum size 10, queue capacity 25) and wrapped in a
 * {@code DelegatingSecurityContextAsyncTaskExecutor}.
 *
 * @return the security-context-delegating executor
 */
@Bean
public AsyncTaskExecutor asyncTaskExecutor() {
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    delegate.setQueueCapacity(25);
    delegate.setMaxPoolSize(10);
    delegate.setCorePoolSize(5);
    delegate.initialize();

    AsyncTaskExecutor securedExecutor = new DelegatingSecurityContextAsyncTaskExecutor(delegate);
    return securedExecutor;
}
/**
 * Creates the task executor for one queue and registers it, together with a
 * semaphore holding as many permits as the queue's maximum concurrency, in
 * {@code queueThreadMap} under the queue's name.
 *
 * @param queueDetail the queue whose name and concurrency settings are used
 */
private void createExecutor(QueueDetail queueDetail) {
    Concurrency limits = queueDetail.getConcurrency();
    int maxWorkers = limits.getMax();

    AsyncTaskExecutor queueExecutor = createTaskExecutor(queueDetail, limits.getMin(), maxWorkers);
    Semaphore permits = new Semaphore(maxWorkers);
    QueueThread queueThread = new QueueThread(true, queueExecutor, permits, maxWorkers);

    queueThreadMap.put(queueDetail.getName(), queueThread);
}
/**
 * The {@code pollableTaskExecutor} bean: an initialized thread pool (bean name
 * {@code "pollableTask"}, core size 5, maximum size 30) wrapped in a
 * {@code DelegatingSecurityContextAsyncTaskExecutor}.
 *
 * @return the security-context-delegating executor
 */
@Bean(name = "pollableTaskExecutor")
public AsyncTaskExecutor getPollableTaskExecutor() {
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    delegate.setMaxPoolSize(30);
    delegate.setCorePoolSize(5);
    delegate.setBeanName("pollableTask");
    delegate.initialize();

    AsyncTaskExecutor securedExecutor = new DelegatingSecurityContextAsyncTaskExecutor(delegate);
    return securedExecutor;
}
/**
 * {@link AsyncTaskExecutor} bean: an initialized thread pool using the configured
 * core pool size and queue capacity, the {@code "intermediateBuilderExecutor-"}
 * thread name prefix, and an {@code MDCCleanerTaskDecorator} applied to its tasks.
 *
 * @return the initialized executor
 */
@Bean
public AsyncTaskExecutor intermediateBuilderExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setTaskDecorator(new MDCCleanerTaskDecorator());
    pool.setThreadNamePrefix("intermediateBuilderExecutor-");
    pool.setQueueCapacity(intermediateQueueCapacity);
    pool.setCorePoolSize(intermediateCorePoolSize);
    pool.initialize();
    return pool;
}
/**
 * Configures every collaborator on the factory and checks that the container it
 * creates exposes exactly those instances and the configured priority mode.
 */
@Test
public void createContainer() {
    // collaborators handed to the factory
    MessageProcessor preProcessor = new MessageProcessor() {};
    MessageProcessor postProcessor = new MessageProcessor() {};
    MessageProcessor deadLetterProcessor = new MessageProcessor() {};
    MessageProcessor deletionProcessor = new MessageProcessor() {};
    MessageProcessor discardProcessor = new MessageProcessor() {};
    TaskExecutionBackOff fixedBackOff = new FixedTaskExecutionBackOff(1000L, 4);
    AsyncTaskExecutor taskExecutor = new TestAsyncTaskExecutor();

    // message processors
    simpleRqueueListenerContainerFactory.setPreExecutionMessageProcessor(preProcessor);
    simpleRqueueListenerContainerFactory.setPostExecutionMessageProcessor(postProcessor);
    simpleRqueueListenerContainerFactory.setDeadLetterQueueMessageProcessor(deadLetterProcessor);
    simpleRqueueListenerContainerFactory.setManualDeletionMessageProcessor(deletionProcessor);
    simpleRqueueListenerContainerFactory.setDiscardMessageProcessor(discardProcessor);
    // infrastructure
    simpleRqueueListenerContainerFactory.setRedisConnectionFactory(new LettuceConnectionFactory());
    simpleRqueueListenerContainerFactory.setRqueueMessageHandler(new RqueueMessageHandler());
    // execution settings
    simpleRqueueListenerContainerFactory.setTaskExecutor(taskExecutor);
    simpleRqueueListenerContainerFactory.setTaskExecutionBackOff(fixedBackOff);
    simpleRqueueListenerContainerFactory.setPriorityMode(PriorityMode.WEIGHTED);

    RqueueMessageListenerContainer created =
            simpleRqueueListenerContainerFactory.createMessageListenerContainer();

    assertNotNull(created);
    assertEquals(PriorityMode.WEIGHTED, created.getPriorityMode());
    assertEquals(fixedBackOff, created.getTaskExecutionBackOff());
    assertEquals(taskExecutor, created.getTaskExecutor());
    assertEquals(preProcessor, created.getPreExecutionMessageProcessor());
    assertEquals(postProcessor, created.getPostExecutionMessageProcessor());
    assertEquals(deadLetterProcessor, created.getDeadLetterQueueMessageProcessor());
    assertEquals(deletionProcessor, created.getManualDeletionMessageProcessor());
    assertEquals(discardProcessor, created.getDiscardMessageProcessor());
}
/**
 * Obtains the proxied object for an async task executor bean, passing a fallback that
 * builds a lazy trace wrapper: a {@code LazyTraceThreadPoolTaskScheduler} when the
 * bean is a {@code ThreadPoolTaskScheduler}, a {@code LazyTraceAsyncTaskExecutor}
 * otherwise.
 *
 * @param bean the original bean
 * @param cglibProxy flag forwarded to {@code getProxiedObject}
 * @param executor the bean viewed as an {@code AsyncTaskExecutor}
 * @return whatever {@code getProxiedObject} produces
 */
Object createAsyncTaskExecutorProxy(
        Object bean, boolean cglibProxy,
        AsyncTaskExecutor executor) {
    return getProxiedObject(bean, cglibProxy, executor, () -> {
        if (!(bean instanceof ThreadPoolTaskScheduler)) {
            return new LazyTraceAsyncTaskExecutor(executor);
        }
        // NOTE(review): the type check is on `bean` but the cast is on `executor`;
        // presumably both refer to the same object -- confirm against callers.
        return new LazyTraceThreadPoolTaskScheduler((ThreadPoolTaskScheduler) executor);
    });
}
/**
 * {@link AsyncTaskExecutor} bean: a thread pool with core size 10, maximum size 200
 * and a queue capacity of 0. When the pool rejects a task, the submitting thread runs
 * it ({@code CallerRunsPolicy}). The executor is returned without an explicit
 * {@code initialize()} call.
 *
 * @return the configured executor
 */
@Bean
public AsyncTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.setQueueCapacity(0);
    pool.setMaxPoolSize(200);
    pool.setCorePoolSize(10);
    return pool;
}
/**
 * Create a {@code WebAsyncTask} from a Callable, a dedicated executor instance and a
 * timeout value.
 * @param timeout timeout value in milliseconds; ignored if {@code null}
 * @param executor the executor to use; must not be {@code null}
 * @param callable the callable for concurrent handling
 */
public WebAsyncTask(Long timeout, AsyncTaskExecutor executor, Callable<V> callable) {
    this(callable);
    // Reject a missing executor before any state is assigned.
    Assert.notNull(executor, "Executor must not be null");
    this.timeout = timeout;
    this.executor = executor;
}
/**
 * Return the AsyncTaskExecutor to use for concurrent handling,
 * or {@code null} if none specified.
 * <p>An executor instance set directly takes precedence; otherwise a configured
 * executor name is resolved against the BeanFactory, which must then be present.
 */
@Nullable
public AsyncTaskExecutor getExecutor() {
    if (this.executor != null) {
        return this.executor;
    }
    if (this.executorName == null) {
        return null;
    }
    Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
    return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
}
/**
 * {@link AsyncTaskExecutor} bean: an initialized thread pool using the configured
 * core pool size and queue capacity, the {@code "intermediateBuilderExecutor-"}
 * thread name prefix and an {@code MDCCleanerTaskDecorator}. On shutdown it is set to
 * wait for tasks to complete, for up to {@code AWAIT_TERMINATION_SECONDS}.
 *
 * @return the initialized executor
 */
@Bean
public AsyncTaskExecutor intermediateBuilderExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // sizing
    pool.setCorePoolSize(intermediateCorePoolSize);
    pool.setQueueCapacity(intermediateQueueCapacity);
    // shutdown behaviour
    pool.setWaitForTasksToCompleteOnShutdown(true);
    pool.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
    // thread naming and task decoration
    pool.setThreadNamePrefix("intermediateBuilderExecutor-");
    pool.setTaskDecorator(new MDCCleanerTaskDecorator());
    pool.initialize();
    return pool;
}
/**
 * Prepares an async-capable mock GET request for {@code /test} with its response,
 * wraps both in a {@code StandardServletAsyncWebRequest}, and wires the request's
 * async manager with a mock executor and that web request.
 */
@Before
public void setup() {
    this.servletResponse = new MockHttpServletResponse();
    this.servletRequest = new MockHttpServletRequest("GET", "/test");
    this.servletRequest.setAsyncSupported(true);
    this.asyncWebRequest = new StandardServletAsyncWebRequest(servletRequest, servletResponse);

    this.asyncManager = WebAsyncUtils.getAsyncManager(servletRequest);
    this.asyncManager.setTaskExecutor(mock(AsyncTaskExecutor.class));
    this.asyncManager.setAsyncWebRequest(this.asyncWebRequest);
}
/**
 * Test fixture: an async-capable mock GET request for {@code /test}, a mock response,
 * the {@code StandardServletAsyncWebRequest} built from them, and the request's async
 * manager configured with a mock executor and that web request.
 */
@Before
public void setup() {
    // servlet-side mocks
    this.servletRequest = new MockHttpServletRequest("GET", "/test");
    this.servletRequest.setAsyncSupported(true);
    this.servletResponse = new MockHttpServletResponse();
    this.asyncWebRequest = new StandardServletAsyncWebRequest(servletRequest, servletResponse);

    // async manager under test
    AsyncTaskExecutor mockExecutor = mock(AsyncTaskExecutor.class);
    this.asyncManager = WebAsyncUtils.getAsyncManager(servletRequest);
    this.asyncManager.setAsyncWebRequest(this.asyncWebRequest);
    this.asyncManager.setTaskExecutor(mockExecutor);
}
/**
 * The {@code taskExecutor} bean: a thread pool with core size 5, maximum size 10 and
 * queue capacity 25. The executor is returned without an explicit
 * {@code initialize()} call.
 *
 * @return the configured executor
 */
@Override
@Bean (name = "taskExecutor")
public AsyncTaskExecutor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setQueueCapacity(25);
    pool.setMaxPoolSize(10);
    pool.setCorePoolSize(5);
    return pool;
}
/**
 * Provide the task executor used for running tasks asynchronously when they do not need
 * to be scheduled at a recurring rate. Only registered when no bean named
 * {@code genieAsyncTaskExecutor} exists already.
 *
 * @param tasksExecutorPoolProperties The properties supplying the pool size and thread name prefix
 * @return The task executor for the system to use
 */
@Bean
@ConditionalOnMissingBean(name = "genieAsyncTaskExecutor")
public AsyncTaskExecutor genieAsyncTaskExecutor(final TasksExecutorPoolProperties tasksExecutorPoolProperties) {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix(tasksExecutorPoolProperties.getThreadNamePrefix());
    pool.setCorePoolSize(tasksExecutorPoolProperties.getSize());
    return pool;
}
/**
 * Create a {@code WebAsyncTask} from a Callable, a dedicated executor instance and an
 * optional timeout value.
 * @param timeout timeout value in milliseconds; ignored if {@code null}
 * @param executor the executor to use; must not be {@code null}
 * @param callable the callable for concurrent handling
 */
public WebAsyncTask(@Nullable Long timeout, AsyncTaskExecutor executor, Callable<V> callable) {
    this(callable);
    // Reject a missing executor before any state is assigned.
    Assert.notNull(executor, "Executor must not be null");
    this.timeout = timeout;
    this.executor = executor;
}