下面列出了org.springframework.core.task.support.TaskExecutorAdapter#org.springframework.core.task.AsyncListenableTaskExecutor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
/**
* Delegate for actually executing the given task with the chosen executor.
* @param task the task to execute
* @param executor the chosen executor
* @param returnType the declared return type (potentially a {@link Future} variant)
* @return the execution result (potentially a corresponding {@link Future} handle)
*/
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
/**
* Delegate for actually executing the given task with the chosen executor.
* @param task the task to execute
* @param executor the chosen executor
* @param returnType the declared return type (potentially a {@link Future} variant)
* @return the execution result (potentially a corresponding {@link Future} handle)
*/
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
@Test
public void postProcessAfterInitialization() throws Exception {
assertThat(processor.postProcessAfterInitialization(mock(Executor.class), toBeExcluded).getClass(),
not(equalTo(ContextAwareExecutor.class)));
//concurrent
assertThat(processor.postProcessAfterInitialization(mock(Executor.class), beanName).getClass(),
equalTo(ContextAwareExecutor.class));
assertThat(processor.postProcessAfterInitialization(mock(ExecutorService.class), beanName).getClass(),
equalTo(ContextAwareExecutorService.class));
assertThat(processor.postProcessAfterInitialization(mock(ScheduledExecutorService.class), beanName).getClass(),
equalTo(ContextAwareScheduledExecutorService.class));
//spring
assertThat(processor.postProcessAfterInitialization(mock(TaskScheduler.class), beanName).getClass(),
equalTo(ContextAwareTaskScheduler.class));
assertThat(processor.postProcessAfterInitialization(new ThreadPoolTaskExecutor(), beanName).getClass(),
equalTo(ContextAwareThreadPoolTaskExecutor.class));
assertThat(processor.postProcessAfterInitialization(new ThreadPoolTaskScheduler(), beanName).getClass(),
equalTo(ContextAwareThreadPoolTaskScheduler.class));
assertThat(processor.postProcessAfterInitialization(mock(AsyncListenableTaskExecutor.class), beanName).getClass(),
equalTo(ContextAwareAsyncListenableTaskExecutor.class));
assertThat(processor.postProcessAfterInitialization(mock(AsyncTaskExecutor.class), beanName).getClass(),
equalTo(ContextAwareAsyncTaskExecutor.class));
assertThat(processor.postProcessAfterInitialization(mock(SchedulingTaskExecutor.class), beanName).getClass(),
equalTo(ContextAwareSchedulingTaskExecutor.class));
}
/**
* Delegate for actually executing the given task with the chosen executor.
* @param task the task to execute
* @param executor the chosen executor
* @param returnType the declared return type (potentially a {@link Future} variant)
* @return the execution result (potentially a corresponding {@link Future} handle)
*/
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (completableFuturePresent) {
Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
if (result != null) {
return result;
}
}
if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor has been set)
*/
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor executorToUse = this.defaultExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
if (this.beanFactory == null) {
throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
" to access qualified executor '" + qualifier + "'");
}
executorToUse = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
this.beanFactory, Executor.class, qualifier);
}
else if (executorToUse == null) {
return null;
}
executor = (executorToUse instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) executorToUse : new TaskExecutorAdapter(executorToUse));
this.executors.put(method, executor);
}
return executor;
}
/**
* Delegate for actually executing the given task with the chosen executor.
* @param task the task to execute
* @param executor the chosen executor
* @param returnType the declared return type (potentially a {@link Future} variant)
* @return the execution result (potentially a corresponding {@link Future} handle)
*/
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (completableFuturePresent) {
Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
if (result != null) {
return result;
}
}
if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
public DefaultSchedulingManager( JobConfigurationService jobConfigurationService, MessageService messageService,
LeaderManager leaderManager, @Qualifier( "taskScheduler" ) TaskScheduler jobScheduler,
@Qualifier( "taskScheduler" ) AsyncListenableTaskExecutor jobExecutor, ApplicationContext applicationContext )
{
checkNotNull( jobConfigurationService );
checkNotNull( messageService );
checkNotNull( leaderManager );
checkNotNull( jobScheduler );
checkNotNull( jobExecutor );
checkNotNull( applicationContext );
this.jobConfigurationService = jobConfigurationService;
this.messageService = messageService;
this.leaderManager = leaderManager;
this.jobScheduler = jobScheduler;
this.jobExecutor = jobExecutor;
this.applicationContext = applicationContext;
}
@Override
protected AsyncListenableTaskExecutor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(runnable ->
new DelegatingErrorHandlingRunnable(runnable, TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER));
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setMaxPoolSize(1);
executor.afterPropertiesSet();
return executor;
}
@Override
protected AsyncListenableTaskExecutor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setMaxPoolSize(1);
executor.afterPropertiesSet();
return executor;
}
/**
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncTaskExecutor}.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with the given {@code AsyncTaskExecutor} for asynchronous execution.
*/
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null");
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setTaskExecutor(taskExecutor);
this.syncTemplate = new RestTemplate(requestFactory);
setAsyncRequestFactory(requestFactory);
}
SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.chunkSize = chunkSize;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
@SuppressWarnings("deprecation")
@Override
protected AsyncClientHttpRequestFactory createRequestFactory() {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
requestFactory.setTaskExecutor(taskExecutor);
return requestFactory;
}
@Override
protected AsyncListenableTaskExecutor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(runnable ->
new DelegatingErrorHandlingRunnable(runnable, TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER));
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setMaxPoolSize(1);
executor.afterPropertiesSet();
return executor;
}
@Override
protected AsyncListenableTaskExecutor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setMaxPoolSize(1);
executor.afterPropertiesSet();
return executor;
}
/**
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncTaskExecutor}.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with the given {@code AsyncTaskExecutor} for asynchronous execution.
*/
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null");
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setTaskExecutor(taskExecutor);
this.syncTemplate = new RestTemplate(requestFactory);
setAsyncRequestFactory(requestFactory);
}
SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.chunkSize = chunkSize;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
@SuppressWarnings("deprecation")
@Override
protected AsyncClientHttpRequestFactory createRequestFactory() {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
requestFactory.setTaskExecutor(taskExecutor);
return requestFactory;
}
/**
* {@inheritDoc}
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof Executor || bean instanceof TaskScheduler) {
if (properties.getExecutor().accept(beanName)) {
if (bean instanceof AsyncListenableTaskExecutor && bean instanceof SchedulingTaskExecutor && bean instanceof TaskScheduler) {
log.info("Context propagation enabled for ~ThreadPoolTaskScheduler [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareThreadPoolTaskScheduler((AsyncListenableTaskExecutor) bean, (SchedulingTaskExecutor) bean, (TaskScheduler) bean);
} else if (bean instanceof AsyncListenableTaskExecutor && bean instanceof SchedulingTaskExecutor) {
log.info("Context propagation enabled for ~ThreadPoolTaskExecutor [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareThreadPoolTaskExecutor((AsyncListenableTaskExecutor) bean, (SchedulingTaskExecutor) bean);
} else if (bean instanceof TaskScheduler) {
log.info("Context propagation enabled for TaskScheduler [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareTaskScheduler((TaskScheduler) bean);
} else if (bean instanceof SchedulingTaskExecutor) {
log.info("Context propagation enabled for SchedulingTaskExecutor [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareSchedulingTaskExecutor((SchedulingTaskExecutor) bean);
} else if (bean instanceof AsyncListenableTaskExecutor) {
log.info("Context propagation enabled for AsyncListenableTaskExecutor [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareAsyncListenableTaskExecutor((AsyncListenableTaskExecutor) bean);
} else if (bean instanceof AsyncTaskExecutor) {
log.info("Context propagation enabled for AsyncTaskExecutor [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareAsyncTaskExecutor((AsyncTaskExecutor) bean);
} else if (bean instanceof ScheduledExecutorService) {
log.info("Context propagation enabled for ScheduledExecutorService [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareScheduledExecutorService((ScheduledExecutorService) bean);
} else if (bean instanceof ExecutorService) {
log.info("Context propagation enabled for ExecutorService [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareExecutorService((ExecutorService) bean);
} else {
log.info("Context propagation enabled for Executor [{}]:[{}].", beanName, bean.getClass().getName());
return new ContextAwareExecutor((Executor) bean);
}
} else {
log.debug("Context propagation disabled for Executor [{}]", beanName);
}
}
return bean;
}
/**
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncTaskExecutor}.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with the given {@code AsyncTaskExecutor} for asynchronous execution.
*/
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null");
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setTaskExecutor(taskExecutor);
this.syncTemplate = new RestTemplate(requestFactory);
setAsyncRequestFactory(requestFactory);
}
SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.chunkSize = chunkSize;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
/**
* Determine the specific executor to use when executing the given method.
* Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
* @return the executor to use (or {@code null}, but just if no default executor is available)
*/
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor;
if (targetExecutor == null) {
synchronized (this.executors) {
if (this.defaultExecutor == null) {
this.defaultExecutor = getDefaultExecutor(this.beanFactory);
}
targetExecutor = this.defaultExecutor;
}
}
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
/**
* Create a new instance of the {@code AsyncRestTemplate} using the given
* {@link AsyncTaskExecutor}.
* <p>This constructor uses a {@link SimpleClientHttpRequestFactory} in combination
* with the given {@code AsyncTaskExecutor} for asynchronous execution.
*/
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor) {
Assert.notNull(taskExecutor, "AsyncTaskExecutor must not be null");
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setTaskExecutor(taskExecutor);
this.syncTemplate = new RestTemplate(requestFactory);
setAsyncRequestFactory(requestFactory);
}
SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.chunkSize = chunkSize;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
SimpleBufferingAsyncClientHttpRequest(HttpURLConnection connection,
boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {
this.connection = connection;
this.outputStreaming = outputStreaming;
this.taskExecutor = taskExecutor;
}
@Override
protected AsyncClientHttpRequestFactory createRequestFactory() {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
requestFactory.setTaskExecutor(taskExecutor);
return requestFactory;
}