下面列出了java.util.concurrent.ThreadPoolExecutor.AbortPolicy#org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Builds and initializes the thread pool returned as the asynchronous executor.
 * Sizing values are read from {@code config}.
 *
 * @return the initialized executor
 */
@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor asyncPool = new ThreadPoolTaskExecutor();
    // Core pool size.
    asyncPool.setCorePoolSize(config.getCorePoolSize());
    // Maximum number of threads.
    asyncPool.setMaxPoolSize(config.getMaxPoolSize());
    // Queue capacity.
    asyncPool.setQueueCapacity(config.getQueueCapacity());
    // Keep-alive time, in seconds.
    asyncPool.setKeepAliveSeconds(config.getKeepAliveSeconds());
    // Prefix for the names of the pool's threads.
    asyncPool.setThreadNamePrefix("sk-async-");
    // Rejection handling for tasks the pool cannot accept: CallerRunsPolicy does not
    // hand the task to a pool thread but runs it on the caller's own thread.
    asyncPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    asyncPool.initialize();
    return asyncPool;
}
/**
 * Loads {@code CustomConfig} and checks the customized client outbound channel:
 * interceptor count, executor sizing, and the broker's publish-order flag.
 */
@Test
public void clientOutboundChannelCustomized() {
    ApplicationContext ctx = loadConfig(CustomConfig.class);

    AbstractSubscribableChannel outboundChannel =
            ctx.getBean("clientOutboundChannel", AbstractSubscribableChannel.class);
    assertEquals(4, outboundChannel.getInterceptors().size());

    ThreadPoolTaskExecutor outboundExecutor =
            ctx.getBean("clientOutboundChannelExecutor", ThreadPoolTaskExecutor.class);
    assertEquals(21, outboundExecutor.getCorePoolSize());
    assertEquals(22, outboundExecutor.getMaxPoolSize());
    assertEquals(23, outboundExecutor.getKeepAliveSeconds());

    SimpleBrokerMessageHandler brokerHandler =
            ctx.getBean("simpleBrokerMessageHandler", SimpleBrokerMessageHandler.class);
    assertTrue(brokerHandler.isPreservePublishOrder());
}
/**
 * Checks which wrapper class the processor returns for each supported executor type,
 * and that a bean named {@code toBeExcluded} is not wrapped as a ContextAwareExecutor.
 */
@Test
public void postProcessAfterInitialization() throws Exception {
    // Excluded bean name: must not come back as a ContextAwareExecutor.
    Object excluded = processor.postProcessAfterInitialization(mock(Executor.class), toBeExcluded);
    assertThat(excluded.getClass(), not(equalTo(ContextAwareExecutor.class)));

    // java.util.concurrent types
    Object plainExecutor = processor.postProcessAfterInitialization(mock(Executor.class), beanName);
    assertThat(plainExecutor.getClass(), equalTo(ContextAwareExecutor.class));

    Object executorService = processor.postProcessAfterInitialization(mock(ExecutorService.class), beanName);
    assertThat(executorService.getClass(), equalTo(ContextAwareExecutorService.class));

    Object scheduledService =
            processor.postProcessAfterInitialization(mock(ScheduledExecutorService.class), beanName);
    assertThat(scheduledService.getClass(), equalTo(ContextAwareScheduledExecutorService.class));

    // Spring types
    Object taskScheduler = processor.postProcessAfterInitialization(mock(TaskScheduler.class), beanName);
    assertThat(taskScheduler.getClass(), equalTo(ContextAwareTaskScheduler.class));

    Object poolTaskExecutor = processor.postProcessAfterInitialization(new ThreadPoolTaskExecutor(), beanName);
    assertThat(poolTaskExecutor.getClass(), equalTo(ContextAwareThreadPoolTaskExecutor.class));

    Object poolTaskScheduler = processor.postProcessAfterInitialization(new ThreadPoolTaskScheduler(), beanName);
    assertThat(poolTaskScheduler.getClass(), equalTo(ContextAwareThreadPoolTaskScheduler.class));

    Object listenableExecutor =
            processor.postProcessAfterInitialization(mock(AsyncListenableTaskExecutor.class), beanName);
    assertThat(listenableExecutor.getClass(), equalTo(ContextAwareAsyncListenableTaskExecutor.class));

    Object asyncExecutor = processor.postProcessAfterInitialization(mock(AsyncTaskExecutor.class), beanName);
    assertThat(asyncExecutor.getClass(), equalTo(ContextAwareAsyncTaskExecutor.class));

    Object schedulingExecutor =
            processor.postProcessAfterInitialization(mock(SchedulingTaskExecutor.class), beanName);
    assertThat(schedulingExecutor.getClass(), equalTo(ContextAwareSchedulingTaskExecutor.class));
}
/**
 * Checks the settings of the bean named "default" and that it actually runs a submitted task.
 */
@Test
public void defaultExecutor() throws Exception {
    ThreadPoolTaskExecutor defaultPool = this.context.getBean("default", ThreadPoolTaskExecutor.class);

    // Expected configuration of the "default" executor.
    assertEquals(1, getCorePoolSize(defaultPool));
    assertEquals(Integer.MAX_VALUE, getMaxPoolSize(defaultPool));
    assertEquals(Integer.MAX_VALUE, getQueueCapacity(defaultPool));
    assertEquals(60, getKeepAliveSeconds(defaultPool));
    assertEquals(false, getAllowCoreThreadTimeOut(defaultPool));

    // Submit a trivial task and wait for its result.
    Callable<String> fooCallable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "foo";
        }
    };
    FutureTask<String> pending = new FutureTask<>(fooCallable);
    defaultPool.execute(pending);
    assertEquals("foo", pending.get());
}
/**
 * Creates the "asynExecutor" bean: a fixed-configuration thread pool.
 *
 * @return the initialized executor
 */
@Bean("asynExecutor")
public Executor AsynExecutor() {
    ThreadPoolTaskExecutor asyncPool = new ThreadPoolTaskExecutor();
    // Core pool size: 10.
    asyncPool.setCorePoolSize(10);
    // Maximum pool size: 20. Threads beyond the core size are only requested
    // once the buffering queue is full.
    asyncPool.setMaxPoolSize(20);
    // Capacity of the queue that buffers tasks waiting to run: 200.
    asyncPool.setQueueCapacity(200);
    // Idle time allowed for threads above the core size before they are destroyed: 60 seconds.
    asyncPool.setKeepAliveSeconds(60);
    // Thread-name prefix, which makes it easy to tell which pool handled a task.
    asyncPool.setThreadNamePrefix("AsynTaskExecutor-");
    // Rejection policy: CallerRunsPolicy runs the rejected task directly in the thread
    // that called execute; if the executor has been shut down, the task is discarded.
    asyncPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Perform initialization.
    asyncPool.initialize();
    return asyncPool;
}
/**
 * Registers an AsyncAnnotationBeanPostProcessor, a ThreadPoolTaskExecutor bean with the
 * thread-name prefix "testExecutor", and a target bean, then asserts that the thread
 * reported by the target bean after {@code test()} carries that prefix.
 */
@Test
public void taskExecutorByBeanType() {
    StaticApplicationContext context = new StaticApplicationContext();
    try {
        BeanDefinition processorDefinition = new RootBeanDefinition(AsyncAnnotationBeanPostProcessor.class);
        context.registerBeanDefinition("postProcessor", processorDefinition);

        BeanDefinition executorDefinition = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
        executorDefinition.getPropertyValues().add("threadNamePrefix", "testExecutor");
        context.registerBeanDefinition("myExecutor", executorDefinition);

        BeanDefinition targetDefinition =
                new RootBeanDefinition(AsyncAnnotationBeanPostProcessorTests.TestBean.class);
        context.registerBeanDefinition("target", targetDefinition);
        context.refresh();

        ITestBean testBean = context.getBean("target", ITestBean.class);
        testBean.test();
        testBean.await(3000);
        Thread asyncThread = testBean.getThread();
        assertTrue(asyncThread.getName().startsWith("testExecutor"));
    }
    finally {
        // Close the context even when a step above throws or an assertion fails,
        // so the context and the beans registered in it are not leaked.
        context.close();
    }
}
/**
 * Thread pool used by the job platform.
 *
 * @return the configured (not explicitly initialized) task executor
 */
@Bean(name = "uKeFuTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor jobPool = new ThreadPoolTaskExecutor();
    // Minimum number of threads the pool maintains.
    jobPool.setCorePoolSize(CORE_POOL_SIZE);
    // Maximum number of threads the pool maintains.
    jobPool.setMaxPoolSize(MAX_POOL_SIZE);
    // Capacity of the buffering queue used by the pool.
    jobPool.setQueueCapacity(200);
    // Idle time, in seconds, allowed for threads maintained by the pool.
    jobPool.setKeepAliveSeconds(30);
    jobPool.setWaitForTasksToCompleteOnShutdown(true);
    jobPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return jobPool;
}
/**
 * Defines the task executor bean whose threads are named "batch-task-running-*".
 *
 * @return the configured task executor
 */
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor batchPool = new ThreadPoolTaskExecutor();
    // Core pool size.
    batchPool.setCorePoolSize(10);
    // Maximum pool size.
    batchPool.setMaxPoolSize(15);
    // Queue capacity.
    batchPool.setQueueCapacity(20);
    // Thread keep-alive time (seconds).
    batchPool.setKeepAliveSeconds(60);
    // Default thread name prefix.
    batchPool.setThreadNamePrefix("batch-task-running-");
    // Rejection policy.
    batchPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Wait for all tasks to finish before shutting the pool down.
    batchPool.setWaitForTasksToCompleteOnShutdown(true);
    return batchPool;
}
/**
 * Passes a manually initialized ThreadPoolTaskExecutor (thread-name prefix "testExecutor")
 * to the AsyncAnnotationBeanPostProcessor definition and asserts that the thread reported
 * by the target bean after {@code test()} carries that prefix.
 */
@Test
public void threadNamePrefix() {
    BeanDefinition processorDefinition = new RootBeanDefinition(AsyncAnnotationBeanPostProcessor.class);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("testExecutor");
    executor.afterPropertiesSet();
    try {
        processorDefinition.getPropertyValues().add("executor", executor);
        ConfigurableApplicationContext context = initContext(processorDefinition);
        try {
            ITestBean testBean = context.getBean("target", ITestBean.class);
            testBean.test();
            testBean.await(3000);
            Thread asyncThread = testBean.getThread();
            assertTrue(asyncThread.getName().startsWith("testExecutor"));
        }
        finally {
            // Close the context even when an assertion fails.
            context.close();
        }
    }
    finally {
        // The executor is created and initialized by hand here and handed over as a plain
        // property value, so it is shut down explicitly rather than relying on the context.
        executor.shutdown();
    }
}
/**
 * Assembles a NettyBootstrap from the given configuration, thread pool and message callback.
 *
 * @param p2pConfig       configuration set on the bootstrap
 * @param threadPool      thread pool handed to the channel handler callback
 * @param messageCallBack callback handed to the channel handler callback
 * @return the assembled bootstrap
 */
public static NettyBootstrap build(
        P2PConfig p2pConfig,
        ThreadPoolTaskExecutor threadPool,
        MessageCallBack messageCallBack) {
    System.out.println("Initializing NettyBootstrap ...");

    // Wire the handler callback first; the bootstrap receives it fully configured.
    ChannelHandlerCallBack handlerCallBack = new ChannelHandlerCallBack();
    handlerCallBack.setThreadPool(threadPool);
    handlerCallBack.setCallBack(messageCallBack);

    NettyBootstrap result = new NettyBootstrap();
    result.setConfig(p2pConfig);
    result.setChannelHandlerCallBack(handlerCallBack);
    return result;
}
/**
 * Checks the default pool sizes of the three channel executors in the default context.
 */
@Test
public void threadPoolSizeDefault() {
    // Client inbound channel: core size is twice the number of available processors.
    ThreadPoolTaskExecutor inboundExecutor =
            this.defaultContext.getBean("clientInboundChannelExecutor", ThreadPoolTaskExecutor.class);
    assertEquals(Runtime.getRuntime().availableProcessors() * 2, inboundExecutor.getCorePoolSize());
    // No way to verify queue capacity

    // Client outbound channel: same expectation.
    ThreadPoolTaskExecutor outboundExecutor =
            this.defaultContext.getBean("clientOutboundChannelExecutor", ThreadPoolTaskExecutor.class);
    assertEquals(Runtime.getRuntime().availableProcessors() * 2, outboundExecutor.getCorePoolSize());

    // Broker channel: core size 0, max size 1.
    ThreadPoolTaskExecutor brokerExecutor =
            this.defaultContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class);
    assertEquals(0, brokerExecutor.getCorePoolSize());
    assertEquals(1, brokerExecutor.getMaxPoolSize());
}
/**
 * Builds the P2PService bean by creating its collaborators through their factories
 * and attaching the resulting NettyService.
 *
 * @return the assembled P2P service
 */
@Bean
public P2PService newP2PService() {
    // Shared callback thread pool, sized from the P2P configuration.
    ThreadPoolTaskExecutor callbackPool =
            ThreadPoolTaskExecutorFactory.build(
                    p2pConfig.getThreadNum(),
                    p2pConfig.getThreadQueueCapacity(),
                    "p2p-callback");
    SeqMapper seqs = SeqMapperFactory.build();
    MessageCallBack callback = MessageCallbackFactory.build(seqs, peerManager, zoneManager);
    NettyBootstrap bootstrap = NettyBootstrapFactory.build(p2pConfig, callbackPool, callback);
    NettyService netty = NettyServiceFactory.build(seqs, callbackPool, bootstrap);

    P2PService service = new P2PService();
    service.setNettyService(netty);
    return service;
}
/**
 * Creates the "asyncExecutor" bean, sized from the Flowable job-executor settings
 * exposed by {@code configuration}.
 *
 * @return the initialized executor
 */
@Bean("asyncExecutor")
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor jobPool = new ThreadPoolTaskExecutor();
    // Core threads, maximum threads and queue capacity all come from configuration.
    jobPool.setCorePoolSize(configuration.getFlowableJobExecutorCoreThreads());
    jobPool.setMaxPoolSize(configuration.getFlowableJobExecutorMaxThreads());
    jobPool.setQueueCapacity(configuration.getFlowableJobExecutorQueueCapacity());
    jobPool.initialize();
    return jobPool;
}
/**
 * Executor used to pull blocks and transactions from the chain asynchronously.
 * Pool sizing and the thread-name prefix are read from {@code executorProperties}.
 *
 * @return the initialized ThreadPoolTaskExecutor
 */
@Bean
public ThreadPoolTaskExecutor mgrAsyncExecutor() {
    log.info("start mgrAsyncExecutor init..");
    ThreadPoolTaskExecutor mgrPool = new ThreadPoolTaskExecutor();
    mgrPool.setCorePoolSize(executorProperties.getCorePoolSize());
    mgrPool.setMaxPoolSize(executorProperties.getMaxPoolSize());
    mgrPool.setQueueCapacity(executorProperties.getQueueSize());
    mgrPool.setThreadNamePrefix(executorProperties.getThreadNamePrefix());
    // Rejected tasks are run by the submitting thread.
    mgrPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Initialize the pool before handing it out.
    mgrPool.initialize();
    return mgrPool;
}
/**
 * Exposes the client inbound channel's task executor as a bean, with its
 * thread-name prefix set to "clientInboundChannel-".
 *
 * @return the executor obtained from the channel's task executor registration
 */
@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    TaskExecutorRegistration registration =
            getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor inboundExecutor = registration.getTaskExecutor();
    inboundExecutor.setThreadNamePrefix("clientInboundChannel-");
    return inboundExecutor;
}
/**
 * Bean definition for the TaskExecutor dedicated to Activiti. All four pool settings
 * are read as Integer properties through {@code configurationHelper}.
 *
 * @return TaskExecutor
 */
@Bean
public TaskExecutor activitiTaskExecutor()
{
    ThreadPoolTaskExecutor activitiPool = new ThreadPoolTaskExecutor();
    // Core and maximum pool sizes.
    activitiPool.setCorePoolSize(
        configurationHelper.getProperty(ConfigurationValue.ACTIVITI_THREAD_POOL_CORE_POOL_SIZE, Integer.class));
    activitiPool.setMaxPoolSize(
        configurationHelper.getProperty(ConfigurationValue.ACTIVITI_THREAD_POOL_MAX_POOL_SIZE, Integer.class));
    // Keep-alive (seconds) and queue capacity.
    activitiPool.setKeepAliveSeconds(
        configurationHelper.getProperty(ConfigurationValue.ACTIVITI_THREAD_POOL_KEEP_ALIVE_SECS, Integer.class));
    activitiPool.setQueueCapacity(
        configurationHelper.getProperty(ConfigurationValue.ACTIVITI_THREAD_POOL_QUEUE_CAPACITY, Integer.class));
    return activitiPool;
}
/**
 * Creates the "taskExecutor" bean: a thread pool sized from the JHipster async
 * properties, wrapped in an ExceptionHandlingAsyncTaskExecutor.
 *
 * @return the wrapping executor
 */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    // Sizing comes from the async section of the application properties.
    delegate.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    delegate.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    delegate.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    delegate.setThreadNamePrefix("blog-Executor-");
    return new ExceptionHandlingAsyncTaskExecutor(delegate);
}
/**
 * Creates a task executor with core and maximum size both set to 20 threads
 * and a queue capacity of 20.
 *
 * @return the initialized executor
 */
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor fixedPool = new ThreadPoolTaskExecutor();
    // Core size equals max size.
    fixedPool.setCorePoolSize(20);
    fixedPool.setMaxPoolSize(20);
    fixedPool.setQueueCapacity(20);
    // Rejected tasks are run by the submitting thread.
    fixedPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    fixedPool.initialize();
    return fixedPool;
}
/**
 * Creates the "taskExecutor" bean: a thread pool sized from the JHipster async
 * properties, wrapped in an ExceptionHandlingAsyncTaskExecutor.
 *
 * @return the wrapping executor
 */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    // Sizing comes from the async section of the application properties.
    delegate.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    delegate.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    delegate.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    delegate.setThreadNamePrefix("uaa-Executor-");
    return new ExceptionHandlingAsyncTaskExecutor(delegate);
}
/**
 * Creates a task executor from the {@code corePoolSize}, {@code maxPoolSize}
 * and {@code queueCapacity} values held by the enclosing class.
 *
 * @return the initialized executor
 */
@Bean
public Executor taskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(corePoolSize);
    pool.setMaxPoolSize(maxPoolSize);
    pool.setQueueCapacity(queueCapacity);
    // Initialize explicitly before returning.
    pool.initialize();
    return pool;
}
/**
 * Checks that the container starts without a task executor and afterwards
 * returns the one that was set on it.
 */
@Test
public void setTaskExecutor() {
    // Nothing configured yet.
    assertNull(container.getTaskExecutor());

    ThreadPoolTaskExecutor namedExecutor = new ThreadPoolTaskExecutor();
    namedExecutor.setThreadNamePrefix("testExecutor");
    container.setTaskExecutor(namedExecutor);

    // The prefix identifies the executor read back from the container.
    ThreadPoolTaskExecutor readBack = (ThreadPoolTaskExecutor) container.getTaskExecutor();
    assertEquals("testExecutor", readBack.getThreadNamePrefix());
}
/**
 * Exposes the client inbound channel's task executor as a bean, with its
 * thread-name prefix set to "clientInboundChannel-".
 *
 * @return the executor obtained from the channel's task executor registration
 */
@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    TaskExecutorRegistration registration = getClientInboundChannelRegistration().taskExecutor();
    ThreadPoolTaskExecutor inboundExecutor = registration.getTaskExecutor();
    inboundExecutor.setThreadNamePrefix("clientInboundChannel-");
    return inboundExecutor;
}
/**
 * Creates the executor bean whose threads are named "resourceBuilderExecutor-*".
 * Every task is passed through an MDCCleanerTaskDecorator.
 *
 * @return the initialized executor
 */
@Bean
public AsyncTaskExecutor resourceBuilderExecutor() {
    ThreadPoolTaskExecutor builderPool = new ThreadPoolTaskExecutor();
    // Only the core size and queue capacity are set; the maximum size is left untouched.
    builderPool.setCorePoolSize(corePoolSize);
    builderPool.setQueueCapacity(queueCapacity);
    builderPool.setThreadNamePrefix("resourceBuilderExecutor-");
    builderPool.setTaskDecorator(new MDCCleanerTaskDecorator());
    builderPool.initialize();
    return builderPool;
}
/**
 * Applies every setting that has been given a value (non-null) to the held task
 * executor and returns it. Settings that are still {@code null} are not applied.
 *
 * @return the held task executor
 */
protected ThreadPoolTaskExecutor getTaskExecutor() {
    final ThreadPoolTaskExecutor target = this.taskExecutor;
    if (this.corePoolSize != null) {
        target.setCorePoolSize(this.corePoolSize);
    }
    if (this.maxPoolSize != null) {
        target.setMaxPoolSize(this.maxPoolSize);
    }
    if (this.keepAliveSeconds != null) {
        target.setKeepAliveSeconds(this.keepAliveSeconds);
    }
    if (this.queueCapacity != null) {
        target.setQueueCapacity(this.queueCapacity);
    }
    return target;
}
/**
 * Creates the "taskExecutor" bean: two threads (core = max = 2), a queue of 25,
 * and thread names starting with "CarThread-".
 *
 * @return the initialized executor
 */
@Bean (name = "taskExecutor")
public Executor taskExecutor() {
    LOGGER.debug("Creating Async Task Executor");
    final ThreadPoolTaskExecutor carPool = new ThreadPoolTaskExecutor();
    // Core size equals max size.
    carPool.setCorePoolSize(2);
    carPool.setMaxPoolSize(2);
    carPool.setQueueCapacity(25);
    carPool.setThreadNamePrefix("CarThread-");
    carPool.initialize();
    return carPool;
}
/**
 * Creates the "taskExecutor" bean: a thread pool sized from the JHipster async
 * properties, wrapped in an ExceptionHandlingAsyncTaskExecutor.
 *
 * @return the wrapping executor
 */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    // Sizing comes from the async section of the application properties.
    delegate.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    delegate.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    delegate.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    delegate.setThreadNamePrefix("store-Executor-");
    return new ExceptionHandlingAsyncTaskExecutor(delegate);
}
/**
 * Creates the "taskExecutor" bean: a thread pool sized from the JHipster async
 * properties, wrapped in an ExceptionHandlingAsyncTaskExecutor.
 *
 * @return the wrapping executor
 */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
    // Sizing comes from the async section of the application properties.
    delegate.setCorePoolSize(jHipsterProperties.getAsync().getCorePoolSize());
    delegate.setMaxPoolSize(jHipsterProperties.getAsync().getMaxPoolSize());
    delegate.setQueueCapacity(jHipsterProperties.getAsync().getQueueCapacity());
    delegate.setThreadNamePrefix("gateway-Executor-");
    return new ExceptionHandlingAsyncTaskExecutor(delegate);
}
/**
 * Declares the async task executor bean, registered only when no other
 * AsyncTaskExecutor bean is present. Sizing comes from {@code asyncTaskProperties}.
 *
 * @param simpleTaskDecorator decorator set on the executor
 * @return the configured executor
 */
@Bean(ASYNC_TASK_BEAN_NAME)
@ConditionalOnMissingBean(AsyncTaskExecutor.class)
public AsyncTaskExecutor mvcAsyncTaskExecutor(SimpleTaskDecorator simpleTaskDecorator) {
    ThreadPoolTaskExecutor mvcPool = new ThreadPoolTaskExecutor();
    mvcPool.setCorePoolSize(asyncTaskProperties.getCorePoolSize());
    mvcPool.setMaxPoolSize(asyncTaskProperties.getMaxPoolSize());
    mvcPool.setQueueCapacity(asyncTaskProperties.getQueueCapacity());
    // Apply the injected decorator to the pool.
    mvcPool.setTaskDecorator(simpleTaskDecorator);
    return mvcPool;
}
/**
 * Creates the "taskExecutor" bean: 5 core threads, at most 10 threads, a queue
 * of 10, daemon threads named "taskExecutor-*".
 *
 * @return the configured executor
 */
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor daemonPool = new ThreadPoolTaskExecutor();
    daemonPool.setCorePoolSize(5);
    daemonPool.setMaxPoolSize(10);
    daemonPool.setQueueCapacity(10);
    // Pool threads are created as daemon threads.
    daemonPool.setDaemon(true);
    daemonPool.setThreadNamePrefix("taskExecutor-");
    return daemonPool;
}
/**
 * Creates the "myThreadPool" bean: a single-thread pool (core = max = 1) with a
 * queue of 1000 that silently drops tasks it cannot accept.
 *
 * @return the initialized executor
 */
@Bean("myThreadPool")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
    ThreadPoolTaskExecutor singleWorker = new ThreadPoolTaskExecutor();
    singleWorker.setCorePoolSize(1);
    singleWorker.setMaxPoolSize(1);
    singleWorker.setKeepAliveSeconds(30);
    singleWorker.setQueueCapacity(1000);
    // The core thread is also allowed to time out.
    singleWorker.setAllowCoreThreadTimeOut(true);
    singleWorker.setAwaitTerminationSeconds(10);
    // Discard rejected tasks without throwing an exception.
    singleWorker.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    singleWorker.initialize();
    return singleWorker;
}