下面列出了org.springframework.boot.actuate.health.HealthIndicator#org.camunda.bpm.engine.impl.jobexecutor.JobExecutor 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Bean
@ConditionalOnMissingBean(JobExecutor.class)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static JobExecutor jobExecutor(@Qualifier(CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor, CamundaBpmProperties properties) {
final SpringJobExecutor springJobExecutor = new SpringJobExecutor();
springJobExecutor.setTaskExecutor(taskExecutor);
springJobExecutor.setRejectedJobsHandler(new CallerRunsRejectedJobsHandler());
JobExecutionProperty jobExecution = properties.getJobExecution();
Optional.ofNullable(jobExecution.getLockTimeInMillis()).ifPresent(springJobExecutor::setLockTimeInMillis);
Optional.ofNullable(jobExecution.getMaxJobsPerAcquisition()).ifPresent(springJobExecutor::setMaxJobsPerAcquisition);
Optional.ofNullable(jobExecution.getWaitTimeInMillis()).ifPresent(springJobExecutor::setWaitTimeInMillis);
Optional.ofNullable(jobExecution.getMaxWait()).ifPresent(springJobExecutor::setMaxWait);
Optional.ofNullable(jobExecution.getBackoffTimeInMillis()).ifPresent(springJobExecutor::setBackoffTimeInMillis);
Optional.ofNullable(jobExecution.getMaxBackoff()).ifPresent(springJobExecutor::setMaxBackoff);
Optional.ofNullable(jobExecution.getBackoffDecreaseThreshold()).ifPresent(springJobExecutor::setBackoffDecreaseThreshold);
Optional.ofNullable(jobExecution.getWaitIncreaseFactor()).ifPresent(springJobExecutor::setWaitIncreaseFactor);
return springJobExecutor;
}
@Bean
@ConditionalOnMissingBean(JobExecutor.class)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static JobExecutor jobExecutor(@Qualifier(CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor, CamundaBpmProperties properties) {
final SpringJobExecutor springJobExecutor = new SpringJobExecutor();
springJobExecutor.setTaskExecutor(taskExecutor);
springJobExecutor.setRejectedJobsHandler(new NotifyAcquisitionRejectedJobsHandler());
JobExecutionProperty jobExecution = properties.getJobExecution();
Optional.ofNullable(jobExecution.getLockTimeInMillis()).ifPresent(springJobExecutor::setLockTimeInMillis);
Optional.ofNullable(jobExecution.getMaxJobsPerAcquisition()).ifPresent(springJobExecutor::setMaxJobsPerAcquisition);
Optional.ofNullable(jobExecution.getWaitTimeInMillis()).ifPresent(springJobExecutor::setWaitTimeInMillis);
Optional.ofNullable(jobExecution.getMaxWait()).ifPresent(springJobExecutor::setMaxWait);
Optional.ofNullable(jobExecution.getBackoffTimeInMillis()).ifPresent(springJobExecutor::setBackoffTimeInMillis);
Optional.ofNullable(jobExecution.getMaxBackoff()).ifPresent(springJobExecutor::setMaxBackoff);
Optional.ofNullable(jobExecution.getBackoffDecreaseThreshold()).ifPresent(springJobExecutor::setBackoffDecreaseThreshold);
Optional.ofNullable(jobExecution.getWaitIncreaseFactor()).ifPresent(springJobExecutor::setWaitIncreaseFactor);
return springJobExecutor;
}
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis) {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
try {
Timer timer = new Timer();
InteruptTask task = new InteruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
areJobsAvailable = areJobsAvailable();
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
jobExecutor.shutdown();
}
}
public static void waitForJobExecutorToProcessAllJobs(ProcessEngineConfigurationImpl processEngineConfiguration, long maxMillisToWait, long intervalMillis) {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
try {
Timer timer = new Timer();
InteruptTask task = new InteruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
areJobsAvailable = areJobsAvailable(processEngineConfiguration);
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
jobExecutor.shutdown();
}
}
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis, JobExecutor jobExecutor, ManagementService managementService, boolean shutdown) {
try {
Timer timer = new Timer();
InteruptTask task = new InteruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
areJobsAvailable = areJobsAvailable(managementService);
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
if (shutdown) {
jobExecutor.shutdown();
}
}
}
@Test
@Deployment(resources = "org/camunda/bpm/engine/test/jobexecutor/delegateThrowsException.bpmn20.xml")
public void shouldLogFailingJobOnlyOnceReducedLogging() {
// given a job that always throws an Exception
processEngineConfiguration.setEnableCmdExceptionLogging(false);
runtimeService.startProcessInstanceByKey("testProcess");
// when executing the job and wait
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
testRule.waitForJobExecutorToProcessAllJobs();
jobExecutor.shutdown();
List<ILoggingEvent> jobLog = loggingRule.getFilteredLog(JOBEXECUTOR_LOGGER, "Exception while executing job");
List<ILoggingEvent> ctxLog = loggingRule.getFilteredLog(CONTEXT_LOGGER, "Exception while closing command context");
// then
assertThat(jobLog.size()).isEqualTo(1);
assertThat(ctxLog.size()).isEqualTo(0);
}
@Test
@Deployment(resources = "org/camunda/bpm/engine/test/jobexecutor/delegateThrowsException.bpmn20.xml")
public void shouldLogFailingJobTwiceDefaultLogging() {
// given a job that always throws an Exception
processEngineConfiguration.setEnableCmdExceptionLogging(true);
runtimeService.startProcessInstanceByKey("testProcess");
// when executing the job and wait
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
testRule.waitForJobExecutorToProcessAllJobs();
jobExecutor.shutdown();
List<ILoggingEvent> jobLog = loggingRule.getFilteredLog(JOBEXECUTOR_LOGGER, "Exception while executing job");
List<ILoggingEvent> ctxLog = loggingRule.getFilteredLog(CONTEXT_LOGGER, "Exception while closing command context");
// then
assertThat(jobLog.size()).isEqualTo(1);
assertThat(ctxLog.size()).isEqualTo(1);
}
@Before
public void initCommand() {
JobExecutor jobExecutor = mock(JobExecutor.class);
when(jobExecutor.getMaxJobsPerAcquisition()).thenReturn(3);
when(jobExecutor.getLockOwner()).thenReturn("test");
when(jobExecutor.getLockTimeInMillis()).thenReturn(5 * 60 * 1000);
acquireJobsCmd = new AcquireJobsCmd(jobExecutor);
commandContext = mock(CommandContext.class);
DbEntityManager dbEntityManager = mock(DbEntityManager.class);
when(commandContext.getDbEntityManager()).thenReturn(dbEntityManager);
jobManager = mock(JobManager.class);
when(commandContext.getJobManager()).thenReturn(jobManager);
}
public void testJobExecutorConfigurationProperties() {
// given
JobExecutor jobExecutor = new DefaultJobExecutor();
Map<String, String> propertiesToSet = new HashMap<String, String>();
propertiesToSet.put(MAX_JOBS_PER_ACQUISITION, Integer.toString(Integer.MAX_VALUE));
propertiesToSet.put(MAX_WAIT, Long.toString(Long.MAX_VALUE));
propertiesToSet.put(WAIT_INCREASE_FACTOR, Float.toString(Float.MAX_VALUE));
propertiesToSet.put(BACKOFF_TIME_IN_MILLIS, Integer.toString(Integer.MAX_VALUE));
// when
PropertyHelper.applyProperties(jobExecutor, propertiesToSet);
// then
Assert.assertEquals(Integer.MAX_VALUE, jobExecutor.getMaxJobsPerAcquisition());
Assert.assertEquals(Long.MAX_VALUE, jobExecutor.getMaxWait());
Assert.assertEquals(Float.MAX_VALUE, jobExecutor.getWaitIncreaseFactor(), 0.0001d);
Assert.assertEquals(Integer.MAX_VALUE, jobExecutor.getBackoffTimeInMillis());
}
@OperateOnDeployment("pa1")
@Test
public void testDeploymentAwareJobAcquisition() {
JobExecutor jobExecutor1 = engine1Configuration.getJobExecutor();
ProcessInstance instance1 = engine1.getRuntimeService().startProcessInstanceByKey("archive1Process");
ProcessInstance instance2 = processEngine.getRuntimeService().startProcessInstanceByKey("archive2Process");
Job job1 = managementService.createJobQuery().processInstanceId(instance1.getId()).singleResult();
Job job2 = managementService.createJobQuery().processInstanceId(instance2.getId()).singleResult();
// the deployment aware configuration should only return the jobs of the registered deployments
CommandExecutor commandExecutor = engine1Configuration.getCommandExecutorTxRequired();
AcquiredJobs acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(jobExecutor1));
Assert.assertEquals(1, acquiredJobs.size());
Assert.assertTrue(acquiredJobs.contains(job1.getId()));
Assert.assertFalse(acquiredJobs.contains(job2.getId()));
}
@OperateOnDeployment("pa1")
@Test
public void testDeploymentUnawareJobAcquisition() {
JobExecutor defaultJobExecutor = processEngineConfiguration.getJobExecutor();
ProcessInstance instance1 = engine1.getRuntimeService().startProcessInstanceByKey("archive1Process");
ProcessInstance instance2 = processEngine.getRuntimeService().startProcessInstanceByKey("archive2Process");
Job job1 = managementService.createJobQuery().processInstanceId(instance1.getId()).singleResult();
Job job2 = managementService.createJobQuery().processInstanceId(instance2.getId()).singleResult();
// the deployment unaware configuration should return both jobs
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
processEngineConfiguration.setJobExecutorDeploymentAware(false);
try {
AcquiredJobs acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(defaultJobExecutor));
Assert.assertEquals(2, acquiredJobs.size());
Assert.assertTrue(acquiredJobs.contains(job1.getId()));
Assert.assertTrue(acquiredJobs.contains(job2.getId()));
} finally {
processEngineConfiguration.setJobExecutorDeploymentAware(true);
}
}
protected void waitForJobExecutorToProcessAllJobs(String processInstanceId, long maxMillisToWait, long intervalMillis) {
JobExecutor jobExecutor = ((ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration()).getJobExecutor();
jobExecutor.start();
try {
Timer timer = new Timer();
InteruptTask task = new InteruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
areJobsAvailable = areJobsAvailable(processInstanceId);
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
jobExecutor.shutdown();
}
}
public static JobExecutor jobExecutor(@Qualifier(JobConfiguration.CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor) {
final SpringJobExecutor springJobExecutor = new SpringJobExecutor();
springJobExecutor.setTaskExecutor(taskExecutor);
springJobExecutor.setRejectedJobsHandler(new CallerRunsRejectedJobsHandler());
springJobExecutor.setWaitTimeInMillis(10);
// springJobExecutor.setWaitIncreaseFactor(1.0f);
springJobExecutor.setMaxWait(20);
return springJobExecutor;
}
private static Details from(JobExecutor jobExecutor) {
final DetailsBuilder builder = Details.builder()
.name(jobExecutor.getName())
.lockOwner(jobExecutor.getLockOwner())
.lockTimeInMillis(jobExecutor.getLockTimeInMillis())
.maxJobsPerAcquisition(jobExecutor.getMaxJobsPerAcquisition())
.waitTimeInMillis(jobExecutor.getWaitTimeInMillis());
for (ProcessEngineImpl processEngineImpl : jobExecutor.getProcessEngines()) {
builder.processEngineName(processEngineImpl.getName());
}
return builder.build();
}
private static Details from(JobExecutor jobExecutor) {
final DetailsBuilder builder = Details.builder()
.name(jobExecutor.getName())
.lockOwner(jobExecutor.getLockOwner())
.lockTimeInMillis(jobExecutor.getLockTimeInMillis())
.maxJobsPerAcquisition(jobExecutor.getMaxJobsPerAcquisition())
.waitTimeInMillis(jobExecutor.getWaitTimeInMillis());
for (ProcessEngineImpl processEngineImpl : jobExecutor.getProcessEngines()) {
builder.processEngineName(processEngineImpl.getName());
}
return builder.build();
}
private void hintJobExecutorIfNeeded(JobEntity jobEntity, Date duedate) {
// Check if this timer fires before the next time the job executor will check for new timers to fire.
// This is highly unlikely because normally waitTimeInMillis is 5000 (5 seconds)
// and timers are usually set further in the future
JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
int waitTimeInMillis = jobExecutor.getWaitTimeInMillis();
if (duedate.getTime() < (ClockUtil.getCurrentTime().getTime() + waitTimeInMillis)) {
hintJobExecutor(jobEntity);
}
}
protected void hintJobExecutor(JobEntity job) {
JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
if (!jobExecutor.isActive()) {
return;
}
JobExecutorContext jobExecutorContext = Context.getJobExecutorContext();
TransactionListener transactionListener = null;
// add job to be executed in the current processor
if(!job.isSuspended()
&& job.isExclusive()
&& isJobDue(job)
&& jobExecutorContext != null
&& jobExecutorContext.isExecutingExclusiveJob()
&& areInSameProcessInstance(job, jobExecutorContext.getCurrentJob())) {
// lock job & add to the queue of the current processor
Date currentTime = ClockUtil.getCurrentTime();
job.setLockExpirationTime(new Date(currentTime.getTime() + jobExecutor.getLockTimeInMillis()));
job.setLockOwner(jobExecutor.getLockOwner());
transactionListener = new ExclusiveJobAddedNotification(job.getId(), jobExecutorContext);
} else {
// reset Acquisition strategy and notify the JobExecutor that
// a new Job is available for execution on future runs
transactionListener = new MessageAddedNotification(jobExecutor);
}
Context.getCommandContext()
.getTransactionContext()
.addTransactionListener(TransactionState.COMMITTED, transactionListener);
}
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait) {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
long intervalMillis = 1000;
int jobExecutorWaitTime = jobExecutor.getWaitTimeInMillis() * 2;
if(maxMillisToWait < jobExecutorWaitTime) {
maxMillisToWait = jobExecutorWaitTime;
}
try {
Timer timer = new Timer();
InterruptTask task = new InterruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
try {
areJobsAvailable = areJobsAvailable();
} catch(Throwable t) {
// Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
// isolation level doesn't allow READ of the table
}
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
jobExecutor.shutdown();
}
}
public void performOperationStep(DeploymentOperation operationContext) {
final PlatformServiceContainer serviceContainer = operationContext.getServiceContainer();
final AbstractProcessApplication processApplication = operationContext.getAttachment(PROCESS_APPLICATION);
ClassLoader configurationClassloader = null;
if(processApplication != null) {
configurationClassloader = processApplication.getProcessApplicationClassloader();
} else {
configurationClassloader = ProcessEngineConfiguration.class.getClassLoader();
}
String configurationClassName = jobAcquisitionXml.getJobExecutorClassName();
if(configurationClassName == null || configurationClassName.isEmpty()) {
configurationClassName = RuntimeContainerJobExecutor.class.getName();
}
// create & instantiate the job executor class
Class<? extends JobExecutor> jobExecutorClass = loadJobExecutorClass(configurationClassloader, configurationClassName);
JobExecutor jobExecutor = instantiateJobExecutor(jobExecutorClass);
// apply properties
Map<String, String> properties = jobAcquisitionXml.getProperties();
PropertyHelper.applyProperties(jobExecutor, properties);
// construct service for job executor
JmxManagedJobExecutor jmxManagedJobExecutor = new JmxManagedJobExecutor(jobExecutor);
// deploy the job executor service into the container
serviceContainer.startService(ServiceTypes.JOB_EXECUTOR, jobAcquisitionXml.getName(), jmxManagedJobExecutor);
}
protected JobExecutor instantiateJobExecutor(Class<? extends JobExecutor> configurationClass) {
try {
return configurationClass.newInstance();
}
catch (Exception e) {
throw LOG.couldNotInstantiateJobExecutorClass(e);
}
}
@SuppressWarnings("unchecked")
protected Class<? extends JobExecutor> loadJobExecutorClass(ClassLoader processApplicationClassloader, String jobExecutorClassname) {
try {
return (Class<? extends JobExecutor>) processApplicationClassloader.loadClass(jobExecutorClassname);
}
catch (ClassNotFoundException e) {
throw LOG.couldNotLoadJobExecutorClass(e);
}
}
public void run() {
try {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobs = (AcquiredJobs) processEngineConfiguration
.getCommandExecutorTxRequired()
.execute(new ControlledCommand(activeThread, new AcquireJobsCmd(jobExecutor)));
} catch (OptimisticLockingException e) {
this.exception = e;
}
LOG.debug(getName()+" ends");
}
@Override
public void run() {
try {
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
acquiredJobs = processEngineConfiguration.getCommandExecutorTxRequired()
.execute(new ControlledCommand<AcquiredJobs>(activeThread, new AcquireJobsCmd(jobExecutor)));
} catch (OptimisticLockingException e) {
this.exception = e;
}
LOG.debug(getName()+" ends");
}
@Deployment(resources = {"org/camunda/bpm/engine/test/api/mgmt/timerOnTask.bpmn20.xml"})
public void testDeleteJobThatWasAlreadyAcquired() {
ClockUtil.setCurrentTime(new Date());
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("timerOnTask");
Job timerJob = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
// We need to move time at least one hour to make the timer executable
ClockUtil.setCurrentTime(new Date(ClockUtil.getCurrentTime().getTime() + 7200000L));
// Acquire job by running the acquire command manually
ProcessEngineImpl processEngineImpl = (ProcessEngineImpl) processEngine;
JobExecutor jobExecutor = processEngineImpl.getProcessEngineConfiguration().getJobExecutor();
AcquireJobsCmd acquireJobsCmd = new AcquireJobsCmd(jobExecutor);
CommandExecutor commandExecutor = processEngineImpl.getProcessEngineConfiguration().getCommandExecutorTxRequired();
commandExecutor.execute(acquireJobsCmd);
// Try to delete the job. This should fail.
try {
managementService.deleteJob(timerJob.getId());
fail();
} catch (ProcessEngineException e) {
// Exception is expected
}
// Clean up
managementService.executeJob(timerJob.getId());
}
public void testJobCommandsWithMessage() {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
String jobId = commandExecutor.execute(new Command<String>() {
public String execute(CommandContext commandContext) {
MessageEntity message = createTweetMessage("i'm coding a test");
commandContext.getJobManager().send(message);
return message.getId();
}
});
AcquiredJobs acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(jobExecutor));
List<List<String>> jobIdsList = acquiredJobs.getJobIdBatches();
assertEquals(1, jobIdsList.size());
List<String> jobIds = jobIdsList.get(0);
List<String> expectedJobIds = new ArrayList<String>();
expectedJobIds.add(jobId);
assertEquals(expectedJobIds, new ArrayList<String>(jobIds));
assertEquals(0, tweetHandler.getMessages().size());
ExecuteJobHelper.executeJob(jobId, commandExecutor);
assertEquals("i'm coding a test", tweetHandler.getMessages().get(0));
assertEquals(1, tweetHandler.getMessages().size());
clearDatabase();
}
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait) {
ProcessEngineConfigurationImpl processEngineConfiguration = (ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration();
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
long intervalMillis = 1000;
int jobExecutorWaitTime = jobExecutor.getWaitTimeInMillis() * 2;
if(maxMillisToWait < jobExecutorWaitTime) {
maxMillisToWait = jobExecutorWaitTime;
}
try {
Timer timer = new Timer();
InterruptTask task = new InterruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(intervalMillis);
try {
areJobsAvailable = areJobsAvailable();
} catch(Throwable t) {
// Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
// isolation level doesn't allow READ of the table
}
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new AssertionError("time limit of " + maxMillisToWait + " was exceeded");
}
} finally {
jobExecutor.shutdown();
}
}
private void moveByHours(int hours) throws Exception {
ClockUtil.setCurrentTime(new Date(ClockUtil.getCurrentTime().getTime() + ((hours * 60 * 1000 * 60) + 5000)));
JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
jobExecutor.start();
Thread.sleep(1000);
jobExecutor.shutdown();
}
public void waitForJobExecutorToProcessAllJobs(JobExecutor jobExecutor, long maxMillisToWait) {
int checkInterval = 1000;
jobExecutor.start();
try {
Timer timer = new Timer();
InterruptTask task = new InterruptTask(Thread.currentThread());
timer.schedule(task, maxMillisToWait);
boolean areJobsAvailable = true;
try {
while (areJobsAvailable && !task.isTimeLimitExceeded()) {
Thread.sleep(checkInterval);
areJobsAvailable = areJobsAvailable();
}
} catch (InterruptedException e) {
} finally {
timer.cancel();
}
if (areJobsAvailable) {
throw new RuntimeException("time limit of " + maxMillisToWait + " was exceeded (still " + numberOfJobsAvailable() + " jobs available)");
}
} finally {
jobExecutor.shutdown();
}
}
@Test
public void shouldNotActiateJobExecutor() {
ProcessEngine processEngine = processEngineService.getProcessEngine("jobExecutorActivate-FALSE-engine");
ProcessEngineConfiguration configuration = processEngine.getProcessEngineConfiguration();
JobExecutor jobExecutor = ((ProcessEngineConfigurationImpl)configuration).getJobExecutor();
assertFalse(jobExecutor.isActive());
processEngine = processEngineService.getProcessEngine("jobExecutorActivate-UNDEFINED-engine");
configuration = processEngine.getProcessEngineConfiguration();
jobExecutor = ((ProcessEngineConfigurationImpl)configuration).getJobExecutor();
assertTrue(jobExecutor.isActive());
}
@Bean
@ConditionalOnBean(name = "jobExecutor")
@ConditionalOnMissingBean(name = "jobExecutorHealthIndicator")
public HealthIndicator jobExecutorHealthIndicator(JobExecutor jobExecutor) {
return new JobExecutorHealthIndicator(jobExecutor);
}