下面列出了怎么用org.springframework.core.task.TaskExecutor的API类实例代码及写法,或者点击链接到github查看源代码。
@Bean(name = CAMUNDA_TASK_EXECUTOR_QUALIFIER)
@ConditionalOnMissingBean(name = CAMUNDA_TASK_EXECUTOR_QUALIFIER)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static TaskExecutor camundaTaskExecutor(CamundaBpmProperties properties) {
int corePoolSize = properties.getJobExecution().getCorePoolSize();
int maxPoolSize = properties.getJobExecution().getMaxPoolSize();
int queueCapacity = properties.getJobExecution().getQueueCapacity();
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
Optional.ofNullable(properties.getJobExecution().getKeepAliveSeconds())
.ifPresent(threadPoolTaskExecutor::setKeepAliveSeconds);
LOG.configureJobExecutorPool(corePoolSize, maxPoolSize);
return threadPoolTaskExecutor;
}
/**
* Construct the import director
*
* @param pathToArchiveDirectory path to archive folder.
* @param pathToImportDirectory path to use.
* @param dataDescriptorResolver descriptor resolver
* @param executor async executor
* @param nodeService node service
* @param asyncContextFactory async context factory
* @param systemService system service
* @param zipUtils zip algorithm
* @param federationFacade data federation service
* @param bulkImportServiceBeanMap map of import services
*/
public ImportDirectorImplService(final String pathToArchiveDirectory,
final String pathToImportDirectory,
final DataDescriptorResolver<ImportDescriptor> dataDescriptorResolver,
final TaskExecutor executor,
final NodeService nodeService,
final AsyncContextFactory asyncContextFactory,
final SystemService systemService,
final ZipUtils zipUtils,
final FederationFacade federationFacade,
final Map<String, String> bulkImportServiceBeanMap) {
super(executor);
this.pathToArchiveDirectory = pathToArchiveDirectory;
this.pathToImportDirectory = pathToImportDirectory;
this.dataDescriptorResolver = dataDescriptorResolver;
this.nodeService = nodeService;
this.asyncContextFactory = asyncContextFactory;
this.systemService = systemService;
this.zipUtils = zipUtils;
this.federationFacade = federationFacade;
this.bulkImportServiceBeanMap = bulkImportServiceBeanMap;
}
/**
* Manages kafka consumers running in a background processing thread for websocket consumers.
* @param webKafkaConsumerFactory Factory for creating new Consumers
* @param messagingTemplate messaging template instance for passing websocket messages.
* @param backgroundConsumerExecutor The executor to run our manager in.
* @param appProperties defined app properties.
* @return manager instance for web socket consumers.
*/
@Bean
public WebSocketConsumersManager getWebSocketConsumersManager(
final WebKafkaConsumerFactory webKafkaConsumerFactory,
final SimpMessagingTemplate messagingTemplate,
final TaskExecutor backgroundConsumerExecutor,
final AppProperties appProperties) {
// Create manager
final WebSocketConsumersManager manager = new WebSocketConsumersManager(
webKafkaConsumerFactory,
messagingTemplate,
appProperties.getMaxConcurrentWebSocketConsumers()
);
// Submit to executor service
backgroundConsumerExecutor.execute(manager);
return manager;
}
public void copyActivityFeed( final EntityRef connectingEntity, final EntityRef connectedEntityRef )
throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Copying activities to feed...");
}
TaskExecutor taskExecutor = ( TaskExecutor ) getApplicationContext().getBean( "taskExecutor" );
taskExecutor.execute( new Runnable() {
@Override
public void run() {
try {
em.copyRelationships( connectedEntityRef, "activities", connectingEntity, "feed" );
}
catch ( Exception e ) {
logger.error( "Error while copying activities into feed", e );
}
}
} );
}
@Test
public void cmmnAndProcessEngineShouldUseDistinctTaskExecutors() {
contextRunner.withUserConfiguration(DedicatedTaskExecutorsConfiguration.class)
.run((context -> {
AsyncExecutor processAsyncExecutor = context.getBean(ProcessEngine.class).getProcessEngineConfiguration().getAsyncExecutor();
AsyncExecutor cmmnAsyncExecutor = context.getBean(CmmnEngine.class).getCmmnEngineConfiguration().getAsyncExecutor();
assertThat(context)
.doesNotHaveBean("taskExecutor")
.hasBean("cmmnTaskExecutor")
.hasBean("processTaskExecutor");
TaskExecutor cmmnTaskExecutorBean = context.getBean("cmmnTaskExecutor", TaskExecutor.class);
TaskExecutor processTaskExecutorBean = context.getBean("processTaskExecutor", TaskExecutor.class);
assertThat(((SpringAsyncExecutor) processAsyncExecutor).getTaskExecutor())
.as("Process Async Task Executor")
.isSameAs(processTaskExecutorBean)
.isNotSameAs(((SpringAsyncExecutor) cmmnAsyncExecutor).getTaskExecutor());
assertThat(((SpringAsyncExecutor) cmmnAsyncExecutor).getTaskExecutor())
.as("Cmmn Async Task Executor")
.isSameAs(cmmnTaskExecutorBean);
}));
}
@Bean
@ConditionalOnMissingBean(JobExecutor.class)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static JobExecutor jobExecutor(@Qualifier(CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor, CamundaBpmProperties properties) {
final SpringJobExecutor springJobExecutor = new SpringJobExecutor();
springJobExecutor.setTaskExecutor(taskExecutor);
springJobExecutor.setRejectedJobsHandler(new NotifyAcquisitionRejectedJobsHandler());
JobExecutionProperty jobExecution = properties.getJobExecution();
Optional.ofNullable(jobExecution.getLockTimeInMillis()).ifPresent(springJobExecutor::setLockTimeInMillis);
Optional.ofNullable(jobExecution.getMaxJobsPerAcquisition()).ifPresent(springJobExecutor::setMaxJobsPerAcquisition);
Optional.ofNullable(jobExecution.getWaitTimeInMillis()).ifPresent(springJobExecutor::setWaitTimeInMillis);
Optional.ofNullable(jobExecution.getMaxWait()).ifPresent(springJobExecutor::setMaxWait);
Optional.ofNullable(jobExecution.getBackoffTimeInMillis()).ifPresent(springJobExecutor::setBackoffTimeInMillis);
Optional.ofNullable(jobExecution.getMaxBackoff()).ifPresent(springJobExecutor::setMaxBackoff);
Optional.ofNullable(jobExecution.getBackoffDecreaseThreshold()).ifPresent(springJobExecutor::setBackoffDecreaseThreshold);
Optional.ofNullable(jobExecution.getWaitIncreaseFactor()).ifPresent(springJobExecutor::setWaitIncreaseFactor);
return springJobExecutor;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(200);
//配置最大线程数
executor.setMaxPoolSize(1000);
//配置队列大小
executor.setQueueCapacity(400);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("thread-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(10);
// 设置最大线程数
executor.setMaxPoolSize(15);
// 设置队列容量
executor.setQueueCapacity(20);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("batch-task-running-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
public OrderStateChangeListenerAspect(final TaskExecutor taskExecutor,
final MailService mailService,
final MailComposer mailComposer,
final CustomerService customerService,
final CustomerOrderService customerOrderService,
final ShopService shopService,
final ThemeService themeService,
final ProductSkuService productSkuService,
final Map<String, String> shopperTemplates,
final Map<String, String> adminTemplates) {
super(taskExecutor, themeService);
this.mailService = mailService;
this.mailComposer = mailComposer;
this.customerService = customerService;
this.customerOrderService = customerOrderService;
this.shopService = shopService;
this.shopperTemplates = shopperTemplates;
this.adminTemplates = adminTemplates;
this.productSkuService = productSkuService;
}
@Bean("mvcTaskexecutor")
public TaskExecutor getAsyncExecutor() {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(
Executors.newFixedThreadPool(100));
executor.setTaskDecorator(new TaskDecorator() {
@Override
public Runnable decorate (Runnable runnable) {
return () -> {
long t = System.currentTimeMillis();
runnable.run();
System.out.printf("Thread %s has a processing time: %s%n", Thread.currentThread().getName(),
(System.currentTimeMillis() - t));
};
}
});
return executor;
}
@Bean
@Qualifier("compositeSlaveStep")
public Step compositeSlaveStep(
ItemReader<Document> reader,
@Qualifier("compositeItemProcessor") ItemProcessor<Document, Document> processor,
@Qualifier("compositeESandJdbcItemWriter") ItemWriter<Document> writer,
@Qualifier("slaveTaskExecutor")TaskExecutor taskExecutor,
@Qualifier("nonFatalExceptionItemProcessorListener")
ItemProcessListener nonFatalExceptionItemProcessorListener,
//@Qualifier("targetDatasourceTransactionManager")PlatformTransactionManager manager,
StepBuilderFactory stepBuilderFactory
) {
FaultTolerantStepBuilder stepBuilder = stepBuilderFactory.get("compositeSlaveStep")
.<Document, Document> chunk(chunkSize)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(skipLimit)
.skip(WebserviceProcessingFailedException.class);
if (env.acceptsProfiles("jdbc_out_map")) {
stepBuilder = stepBuilder.skip(InvalidDataAccessApiUsageException.class);
}
return stepBuilder.noSkip(Exception.class)
// .listener(nonFatalExceptionItemProcessorListener)
.listener(new SkipListener())
.taskExecutor(taskExecutor)
.build();
}
@Test
public void asyncHistoryExecutorBeanAvailable() {
contextRunner.withPropertyValues("flowable.process.async-history.enable=true").run((context -> {
assertThat(context)
.hasSingleBean(ProcessEngine.class)
.hasBean("taskExecutor")
.hasBean("processAsyncExecutor")
.hasBean("asyncHistoryExecutor");
AsyncExecutor processAsyncExecutor = context.getBean(ProcessEngine.class).getProcessEngineConfiguration().getAsyncExecutor();
assertThat(processAsyncExecutor).isNotNull();
AsyncExecutor processAsyncHistoryExecutor = context.getBean(ProcessEngine.class).getProcessEngineConfiguration().getAsyncHistoryExecutor();
assertThat(processAsyncHistoryExecutor).isNotNull();
assertThat(processAsyncExecutor).isNotSameAs(processAsyncHistoryExecutor);
TaskExecutor taskExecutorBean = context.getBean("taskExecutor", TaskExecutor.class);
assertThat(((SpringAsyncExecutor) processAsyncExecutor).getTaskExecutor()).isSameAs(taskExecutorBean);
assertThat(((SpringAsyncExecutor) processAsyncHistoryExecutor).getTaskExecutor()).isSameAs(taskExecutorBean);
assertThat(context.getBean(ProcessEngine.class).getProcessEngineConfiguration().isAsyncExecutorActivate()).isTrue();
assertThat(context.getBean(ProcessEngine.class).getProcessEngineConfiguration().isAsyncHistoryExecutorActivate()).isTrue();
assertThat(((ProcessEngineConfigurationImpl) context.getBean(ProcessEngine.class).getProcessEngineConfiguration()).isAsyncHistoryEnabled()).isTrue();
}));
}
@Bean(name = "executionJob")
public Job reportsExecutionJob(CustomItemReader<List<Long>> reader, CustomItemProcessor processor,
CustomItemWriter<List<PostDTO>> writer, TaskExecutor taskExecutor) {
Step step = this.stepBuilderFactory.get("execution-step").allowStartIfComplete(true)
.<List<Long>, List<PostDTO>>chunk(1).reader(reader).processor(processor).writer(writer)
.taskExecutor(taskExecutor).build();
return this.jobBuilderFactory.get("reporting-job").incrementer(new RunIdIncrementer()).listener(this)
.start(step).build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new CustomThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(threadNamePrefix);
/*
rejection-policy:当pool已经达到max size的时候,如何处理新任务
CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
MachineConfiguration(StateMachineConfigurationBuilder<S, E> configurationBuilder, StateMachineStateBuilder<S, E> stateBuilder,
StateMachineTransitionBuilder<S, E> transitionBuilder, StateMachineListener<S, E> listener, TaskExecutor executor) {
this.configurationBuilder = configurationBuilder;
this.stateBuilder = stateBuilder;
this.transitionBuilder = transitionBuilder;
this.listener = listener;
this.executor = executor;
}
@Bean("threadPoolTaskExecutor")
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(1000);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("Async-");
return executor;
}
ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
Assert.notNull(registry, "ReactiveAdapterRegistry is required");
Assert.notNull(executor, "TaskExecutor is required");
Assert.notNull(manager, "ContentNegotiationManager is required");
this.reactiveRegistry = registry;
this.taskExecutor = executor;
this.taskExecutorWarning = executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor;
this.contentNegotiationManager = manager;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("mtons.mblog.logThread-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
@Autowired
public AnnotationListenerAspect(ApplicationContext applicationContext, @Qualifier("listenerTaskExecutor") TaskExecutor taskExecutor, @Value("#{'${listeners.registered}'.split(',')}") List<String> configuredListeners) throws ClassNotFoundException {
this.applicationContext = applicationContext;
this.taskExecutor = taskExecutor;
this.synchronousRegisteredListeners = new ArrayList<RegisteredListener>();
this.asynchronousRegisteredListeners = new ArrayList<RegisteredListener>();
initialiseConfiguredListeners(configuredListeners);
}
@Test
public void testDeployShouldPushAndStartLoopWaitInstance() throws Exception {
context.register(Config.class);
context.refresh();
TestYarnCloudAppService yarnCloudAppService = new TestYarnCloudAppService();
yarnCloudAppService.getInstancesCountBeforeReturn = 2;
TaskExecutor taskExecutor = context.getBean(TaskExecutor.class);
AppDeployerStateMachine ycasm = new AppDeployerStateMachine(yarnCloudAppService, taskExecutor, context, context);
ycasm.setAutoStart(false);
StateMachine<String, String> stateMachine = ycasm.buildStateMachine();
Message<String> message = MessageBuilder.withPayload(AppDeployerStateMachine.EVENT_DEPLOY)
.setHeader(AppDeployerStateMachine.HEADER_APP_VERSION, "app")
.setHeader(AppDeployerStateMachine.HEADER_CLUSTER_ID, "fakeClusterId")
.setHeader(AppDeployerStateMachine.HEADER_GROUP_ID, "fakeGroup")
.setHeader(AppDeployerStateMachine.HEADER_COUNT, 1)
.setHeader(AppDeployerStateMachine.HEADER_DEFINITION_PARAMETERS, new HashMap<Object, Object>())
.build();
StateMachineTestPlan<String, String> plan =
StateMachineTestPlanBuilder.<String, String>builder()
.defaultAwaitTime(10)
.stateMachine(stateMachine)
.step()
.expectStateMachineStarted(1)
.expectStates(AppDeployerStateMachine.STATE_READY)
.and()
.step()
.sendEvent(message)
.expectStateChanged(14)
.expectStates(AppDeployerStateMachine.STATE_READY)
.and()
.build();
plan.test();
}
@Bean(name="dbThreadPoolExecutor")
public TaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
tpte.setCorePoolSize(THREAD_POOL_DB_INIT_SIZE);
tpte.setMaxPoolSize(THREAD_POOL_DB_MAX_SIZE);
tpte.setQueueCapacity(THREAD_POOL_DB_QUEUE_SIZE);
tpte.initialize();
return tpte;
}
@Bean
public CommandLineRunner schedulingRunner(TaskExecutor executor) {
return new CommandLineRunner() {
public void run(String... args) throws Exception {
simulator = new ActivitySimulator(opts);
executor.execute(simulator);
System.out.println("Simulator thread started...");
}
};
}
/**
* TaskExecutor used to start and configure adapters
*/
private static TaskExecutor getTaskExecutor() {
if(taskExecutor == null) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.initialize();
taskExecutor = executor;
}
return taskExecutor;
}
/**
* @return an Authenticated task executor for event multi casting.
* @see DataPrepEvents
*/
@Bean(name = "applicationEventMulticaster#executor")
public TaskExecutor dataPrepAsyncTaskExecutor() {
final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setWaitForTasksToCompleteOnShutdown(false);
taskExecutor.initialize();
return taskExecutor;
}
/**
* Constructor.
*
* @param s3ClientFactory The S3 client factory to use to get S3 client instances
* @param s3TaskExecutor A task executor to use for uploading files to S3
*/
public S3ProtocolResolver(
final S3ClientFactory s3ClientFactory,
final TaskExecutor s3TaskExecutor
) {
this.s3ClientFactory = s3ClientFactory;
this.s3TaskExecutor = s3TaskExecutor;
}
public static JobExecutor jobExecutor(@Qualifier(JobConfiguration.CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor) {
final SpringJobExecutor springJobExecutor = new SpringJobExecutor();
springJobExecutor.setTaskExecutor(taskExecutor);
springJobExecutor.setRejectedJobsHandler(new CallerRunsRejectedJobsHandler());
springJobExecutor.setWaitTimeInMillis(10);
// springJobExecutor.setWaitIncreaseFactor(1.0f);
springJobExecutor.setMaxWait(20);
return springJobExecutor;
}
@Bean
TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.initialize();
return executor;
}
/**
* This thread runs the WebSocketConsumerManager, which manages any consumers for web sockets.
* It only needs a single thread, because the manager starts up its own managed thread pool.
* @return new ThreadPool Task executor.
*/
@Bean
public TaskExecutor backgroundConsumerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Only a single thread in the pool
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("Web Socket Consumer Manager");
executor.initialize();
return executor;
}
@Bean
public TaskExecutor taskExecutor() {
return new SyncTaskExecutor();
}
/**
* Return the configured {@link TaskExecutor}.
*/
public TaskExecutor getTaskExecutor() {
return this.taskExecutor;
}