org.springframework.boot.actuate.health.HealthIndicator#org.camunda.bpm.engine.impl.jobexecutor.JobExecutor源码实例Demo

下面列出了 org.springframework.boot.actuate.health.HealthIndicator#org.camunda.bpm.engine.impl.jobexecutor.JobExecutor 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Creates the {@link JobExecutor} bean, implemented by a {@link SpringJobExecutor}.
 * <p>
 * The executor is wired to the given task executor and uses a
 * {@link CallerRunsRejectedJobsHandler} for rejected jobs. Each job-execution
 * setting that is non-null in the properties is copied onto the executor;
 * null settings are skipped, so the corresponding setter is never called.
 *
 * @param taskExecutor the task executor qualified with {@code CAMUNDA_TASK_EXECUTOR_QUALIFIER}
 * @param properties   properties whose job-execution section supplies the optional settings
 * @return the configured job executor
 */
@Bean
@ConditionalOnMissingBean(JobExecutor.class)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static JobExecutor jobExecutor(@Qualifier(CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor, CamundaBpmProperties properties) {
  final SpringJobExecutor executor = new SpringJobExecutor();
  executor.setTaskExecutor(taskExecutor);
  executor.setRejectedJobsHandler(new CallerRunsRejectedJobsHandler());

  // Only override a setting when a value has actually been configured.
  final JobExecutionProperty settings = properties.getJobExecution();
  Optional.ofNullable(settings.getLockTimeInMillis()).ifPresent(executor::setLockTimeInMillis);
  Optional.ofNullable(settings.getMaxJobsPerAcquisition()).ifPresent(executor::setMaxJobsPerAcquisition);
  Optional.ofNullable(settings.getWaitTimeInMillis()).ifPresent(executor::setWaitTimeInMillis);
  Optional.ofNullable(settings.getMaxWait()).ifPresent(executor::setMaxWait);
  Optional.ofNullable(settings.getBackoffTimeInMillis()).ifPresent(executor::setBackoffTimeInMillis);
  Optional.ofNullable(settings.getMaxBackoff()).ifPresent(executor::setMaxBackoff);
  Optional.ofNullable(settings.getBackoffDecreaseThreshold()).ifPresent(executor::setBackoffDecreaseThreshold);
  Optional.ofNullable(settings.getWaitIncreaseFactor()).ifPresent(executor::setWaitIncreaseFactor);

  return executor;
}
 
/**
 * Creates the {@link JobExecutor} bean, implemented by a {@link SpringJobExecutor}.
 * <p>
 * The executor is wired to the given task executor and uses a
 * {@link NotifyAcquisitionRejectedJobsHandler} for rejected jobs. Job-execution
 * settings from the properties are applied only when they are non-null.
 *
 * @param taskExecutor the task executor qualified with {@code CAMUNDA_TASK_EXECUTOR_QUALIFIER}
 * @param properties   properties whose job-execution section supplies the optional settings
 * @return the configured job executor
 */
@Bean
@ConditionalOnMissingBean(JobExecutor.class)
@ConditionalOnProperty(prefix = "camunda.bpm.job-execution", name = "enabled", havingValue = "true", matchIfMissing = true)
public static JobExecutor jobExecutor(@Qualifier(CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor, CamundaBpmProperties properties) {
  final SpringJobExecutor result = new SpringJobExecutor();
  result.setTaskExecutor(taskExecutor);
  result.setRejectedJobsHandler(new NotifyAcquisitionRejectedJobsHandler());

  // A null property means "not configured": the matching setter is not invoked.
  final JobExecutionProperty configured = properties.getJobExecution();
  Optional.ofNullable(configured.getLockTimeInMillis()).ifPresent(result::setLockTimeInMillis);
  Optional.ofNullable(configured.getMaxJobsPerAcquisition()).ifPresent(result::setMaxJobsPerAcquisition);
  Optional.ofNullable(configured.getWaitTimeInMillis()).ifPresent(result::setWaitTimeInMillis);
  Optional.ofNullable(configured.getMaxWait()).ifPresent(result::setMaxWait);
  Optional.ofNullable(configured.getBackoffTimeInMillis()).ifPresent(result::setBackoffTimeInMillis);
  Optional.ofNullable(configured.getMaxBackoff()).ifPresent(result::setMaxBackoff);
  Optional.ofNullable(configured.getBackoffDecreaseThreshold()).ifPresent(result::setBackoffDecreaseThreshold);
  Optional.ofNullable(configured.getWaitIncreaseFactor()).ifPresent(result::setWaitIncreaseFactor);

  return result;
}
 
/**
 * Starts the configured job executor and polls until no jobs are available
 * or the time limit is reported as exceeded. The job executor is always shut
 * down before this method returns or throws.
 *
 * @param maxMillisToWait delay after which the scheduled timeout task runs
 * @param intervalMillis  sleep time between two availability checks
 * @throws ProcessEngineException if jobs are still available when polling stops
 */
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis) {
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();
  executor.start();

  try {
    final Timer timeoutTimer = new Timer();
    // NOTE(review): InteruptTask presumably interrupts the thread passed to it when the timer fires - confirm.
    final InteruptTask timeoutTask = new InteruptTask(Thread.currentThread());
    timeoutTimer.schedule(timeoutTask, maxMillisToWait);

    boolean jobsPending = true;
    try {
      while (jobsPending && !timeoutTask.isTimeLimitExceeded()) {
        Thread.sleep(intervalMillis);
        jobsPending = areJobsAvailable();
      }
    } catch (InterruptedException ignored) {
      // Polling simply stops; the check below decides whether this is a failure.
    } finally {
      timeoutTimer.cancel();
    }

    if (jobsPending) {
      throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    executor.shutdown();
  }
}
 
源代码4 项目: camunda-bpm-platform   文件: TestHelper.java
/**
 * Starts the job executor of the given configuration and polls until no jobs
 * are available or the time limit is reported as exceeded. The job executor
 * is shut down in all cases.
 *
 * @param processEngineConfiguration configuration providing the job executor and used for the availability check
 * @param maxMillisToWait            delay after which the scheduled timeout task runs
 * @param intervalMillis             sleep time between two availability checks
 * @throws ProcessEngineException if jobs are still available when polling stops
 */
public static void waitForJobExecutorToProcessAllJobs(ProcessEngineConfigurationImpl processEngineConfiguration, long maxMillisToWait, long intervalMillis) {
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();
  executor.start();

  try {
    final Timer watchdog = new Timer();
    // NOTE(review): InteruptTask presumably interrupts the given thread once the delay elapses - confirm.
    final InteruptTask deadline = new InteruptTask(Thread.currentThread());
    watchdog.schedule(deadline, maxMillisToWait);

    boolean stillBusy = true;
    try {
      while (true) {
        if (!stillBusy || deadline.isTimeLimitExceeded()) {
          break;
        }
        Thread.sleep(intervalMillis);
        stillBusy = areJobsAvailable(processEngineConfiguration);
      }
    } catch (InterruptedException ignored) {
      // Leave the loop; the outcome is evaluated below.
    } finally {
      watchdog.cancel();
    }

    if (stillBusy) {
      throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    executor.shutdown();
  }
}
 
/**
 * Polls the management service until no jobs are available or the time limit
 * is reported as exceeded. This variant does not start the job executor; it
 * only shuts it down afterwards when {@code shutdown} is {@code true}.
 *
 * @param maxMillisToWait   delay after which the scheduled timeout task runs
 * @param intervalMillis    sleep time between two availability checks
 * @param jobExecutor       executor to shut down at the end, if requested
 * @param managementService service passed to the availability check
 * @param shutdown          whether to shut the job executor down before returning
 * @throws ProcessEngineException if jobs are still available when polling stops
 */
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait, long intervalMillis, JobExecutor jobExecutor, ManagementService managementService, boolean shutdown) {
  try {
    final Timer watchdog = new Timer();
    // NOTE(review): InteruptTask presumably interrupts the given thread when it runs - confirm.
    final InteruptTask deadline = new InteruptTask(Thread.currentThread());
    watchdog.schedule(deadline, maxMillisToWait);

    boolean jobsRemaining = true;
    try {
      for (; jobsRemaining && !deadline.isTimeLimitExceeded(); jobsRemaining = areJobsAvailable(managementService)) {
        Thread.sleep(intervalMillis);
      }
    } catch (InterruptedException ignored) {
      // Stop polling; whether jobs remain is checked right after.
    } finally {
      watchdog.cancel();
    }

    if (jobsRemaining) {
      throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    if (shutdown) {
      jobExecutor.shutdown();
    }
  }
}
 
/**
 * With command exception logging disabled, expects exactly one
 * "Exception while executing job" entry on the job executor logger and no
 * "Exception while closing command context" entry on the context logger.
 */
@Test
@Deployment(resources = "org/camunda/bpm/engine/test/jobexecutor/delegateThrowsException.bpmn20.xml")
public void shouldLogFailingJobOnlyOnceReducedLogging() {
  // given: reduced logging and a started instance of the failing process
  processEngineConfiguration.setEnableCmdExceptionLogging(false);
  runtimeService.startProcessInstanceByKey("testProcess");

  // when: the job executor runs until the test rule reports all jobs processed
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();
  executor.start();
  testRule.waitForJobExecutorToProcessAllJobs();
  executor.shutdown();

  final List<ILoggingEvent> jobExecutorEvents = loggingRule.getFilteredLog(JOBEXECUTOR_LOGGER, "Exception while executing job");
  final List<ILoggingEvent> contextEvents = loggingRule.getFilteredLog(CONTEXT_LOGGER, "Exception while closing command context");

  // then: the failure is logged once, by the job executor only
  assertThat(jobExecutorEvents.size()).isEqualTo(1);
  assertThat(contextEvents.size()).isEqualTo(0);
}
 
/**
 * With command exception logging enabled, expects one
 * "Exception while executing job" entry on the job executor logger and one
 * "Exception while closing command context" entry on the context logger.
 */
@Test
@Deployment(resources = "org/camunda/bpm/engine/test/jobexecutor/delegateThrowsException.bpmn20.xml")
public void shouldLogFailingJobTwiceDefaultLogging() {
  // given: command exception logging switched on and a started instance of the failing process
  processEngineConfiguration.setEnableCmdExceptionLogging(true);
  runtimeService.startProcessInstanceByKey("testProcess");

  // when: the job executor runs until the test rule reports all jobs processed
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();
  executor.start();
  testRule.waitForJobExecutorToProcessAllJobs();
  executor.shutdown();

  final List<ILoggingEvent> jobExecutorEvents = loggingRule.getFilteredLog(JOBEXECUTOR_LOGGER, "Exception while executing job");
  final List<ILoggingEvent> contextEvents = loggingRule.getFilteredLog(CONTEXT_LOGGER, "Exception while closing command context");

  // then: one entry per logger
  assertThat(jobExecutorEvents.size()).isEqualTo(1);
  assertThat(contextEvents.size()).isEqualTo(1);
}
 
/**
 * Builds the command under test from a mocked job executor and prepares a
 * mocked command context that hands out a mocked entity manager and job manager.
 */
@Before
public void initCommand() {
  // Mocked executor: up to 3 jobs per acquisition, lock owner "test", 5 minute lock time.
  final JobExecutor executorMock = mock(JobExecutor.class);
  when(executorMock.getMaxJobsPerAcquisition()).thenReturn(3);
  when(executorMock.getLockOwner()).thenReturn("test");
  when(executorMock.getLockTimeInMillis()).thenReturn(5 * 60 * 1000);
  acquireJobsCmd = new AcquireJobsCmd(executorMock);

  // Mocked command context and the collaborators it returns.
  commandContext = mock(CommandContext.class);
  jobManager = mock(JobManager.class);
  final DbEntityManager entityManagerMock = mock(DbEntityManager.class);
  when(commandContext.getDbEntityManager()).thenReturn(entityManagerMock);
  when(commandContext.getJobManager()).thenReturn(jobManager);
}
 
源代码9 项目: camunda-bpm-platform   文件: PropertyHelperTest.java
/**
 * Applies string-encoded maximum values for four job executor properties via
 * {@code PropertyHelper.applyProperties} and checks that the corresponding
 * getters of a {@link DefaultJobExecutor} return those values.
 */
public void testJobExecutorConfigurationProperties() {
  // given
  final JobExecutor executor = new DefaultJobExecutor();

  final Map<String, String> settings = new HashMap<String, String>();
  settings.put(MAX_JOBS_PER_ACQUISITION, Integer.toString(Integer.MAX_VALUE));
  settings.put(MAX_WAIT, Long.toString(Long.MAX_VALUE));
  settings.put(WAIT_INCREASE_FACTOR, Float.toString(Float.MAX_VALUE));
  settings.put(BACKOFF_TIME_IN_MILLIS, Integer.toString(Integer.MAX_VALUE));

  // when
  PropertyHelper.applyProperties(executor, settings);

  // then
  Assert.assertEquals(Integer.MAX_VALUE, executor.getMaxJobsPerAcquisition());
  Assert.assertEquals(Long.MAX_VALUE, executor.getMaxWait());
  Assert.assertEquals(Float.MAX_VALUE, executor.getWaitIncreaseFactor(), 0.0001d);
  Assert.assertEquals(Integer.MAX_VALUE, executor.getBackoffTimeInMillis());
}
 
/**
 * Starts one process instance on {@code engine1} and one on {@code processEngine},
 * then runs a job acquisition with engine1's job executor and command executor.
 * Expects only the job of the instance started on {@code engine1} to be acquired.
 */
@OperateOnDeployment("pa1")
@Test
public void testDeploymentAwareJobAcquisition() {
  final JobExecutor engine1Executor = engine1Configuration.getJobExecutor();

  final ProcessInstance archive1Instance = engine1.getRuntimeService().startProcessInstanceByKey("archive1Process");
  final ProcessInstance archive2Instance = processEngine.getRuntimeService().startProcessInstanceByKey("archive2Process");

  final Job archive1Job = managementService.createJobQuery().processInstanceId(archive1Instance.getId()).singleResult();
  final Job archive2Job = managementService.createJobQuery().processInstanceId(archive2Instance.getId()).singleResult();

  // the deployment aware configuration should only return the jobs of the registered deployments
  final CommandExecutor engine1Commands = engine1Configuration.getCommandExecutorTxRequired();
  final AcquiredJobs acquired = engine1Commands.execute(new AcquireJobsCmd(engine1Executor));

  Assert.assertEquals(1, acquired.size());
  Assert.assertTrue(acquired.contains(archive1Job.getId()));
  Assert.assertFalse(acquired.contains(archive2Job.getId()));
}
 
/**
 * Starts one process instance on {@code engine1} and one on {@code processEngine},
 * switches the default configuration to deployment-unaware acquisition and expects
 * both jobs to be acquired. The deployment-aware flag is set back to {@code true}
 * afterwards, even if an assertion fails.
 */
@OperateOnDeployment("pa1")
@Test
public void testDeploymentUnawareJobAcquisition() {
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();

  final ProcessInstance archive1Instance = engine1.getRuntimeService().startProcessInstanceByKey("archive1Process");
  final ProcessInstance archive2Instance = processEngine.getRuntimeService().startProcessInstanceByKey("archive2Process");

  final Job archive1Job = managementService.createJobQuery().processInstanceId(archive1Instance.getId()).singleResult();
  final Job archive2Job = managementService.createJobQuery().processInstanceId(archive2Instance.getId()).singleResult();

  // the deployment unaware configuration should return both jobs
  final CommandExecutor commands = processEngineConfiguration.getCommandExecutorTxRequired();
  processEngineConfiguration.setJobExecutorDeploymentAware(false);
  try {
    final AcquiredJobs acquired = commands.execute(new AcquireJobsCmd(executor));

    Assert.assertEquals(2, acquired.size());
    Assert.assertTrue(acquired.contains(archive1Job.getId()));
    Assert.assertTrue(acquired.contains(archive2Job.getId()));
  } finally {
    // restore the flag so that the changed setting does not outlive this test
    processEngineConfiguration.setJobExecutorDeploymentAware(true);
  }
}
 
/**
 * Starts the engine's job executor and polls until no jobs are available for
 * the given process instance id or the time limit is reported as exceeded.
 * The job executor is shut down in all cases.
 *
 * @param processInstanceId id passed to the availability check
 * @param maxMillisToWait   delay after which the scheduled timeout task runs
 * @param intervalMillis    sleep time between two availability checks
 * @throws ProcessEngineException if jobs are still available when polling stops
 */
protected void waitForJobExecutorToProcessAllJobs(String processInstanceId, long maxMillisToWait, long intervalMillis) {
  final ProcessEngineConfigurationImpl engineConfiguration =
      (ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration();
  final JobExecutor executor = engineConfiguration.getJobExecutor();
  executor.start();

  try {
    final Timer timeoutTimer = new Timer();
    // NOTE(review): InteruptTask presumably interrupts the given thread when it runs - confirm.
    final InteruptTask timeoutTask = new InteruptTask(Thread.currentThread());
    timeoutTimer.schedule(timeoutTask, maxMillisToWait);

    boolean jobsLeft = true;
    try {
      while (true) {
        if (!jobsLeft || timeoutTask.isTimeLimitExceeded()) {
          break;
        }
        Thread.sleep(intervalMillis);
        jobsLeft = areJobsAvailable(processInstanceId);
      }
    } catch (InterruptedException ignored) {
      // Polling ends here; the remaining-jobs check below reports a failure if needed.
    } finally {
      timeoutTimer.cancel();
    }

    if (jobsLeft) {
      throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    executor.shutdown();
  }
}
 
/**
 * Creates a {@link SpringJobExecutor} that runs on the given task executor,
 * uses a {@link CallerRunsRejectedJobsHandler} for rejected jobs, and has a
 * wait time of 10 ms with a maximum wait of 20 ms. The wait increase factor
 * is not overridden here.
 *
 * @param taskExecutor the task executor qualified with {@code JobConfiguration.CAMUNDA_TASK_EXECUTOR_QUALIFIER}
 * @return the configured job executor
 */
public static JobExecutor jobExecutor(@Qualifier(JobConfiguration.CAMUNDA_TASK_EXECUTOR_QUALIFIER) final TaskExecutor taskExecutor) {
  final SpringJobExecutor executor = new SpringJobExecutor();
  executor.setTaskExecutor(taskExecutor);
  executor.setRejectedJobsHandler(new CallerRunsRejectedJobsHandler());

  // short wait settings (values are in milliseconds per the setter names)
  executor.setWaitTimeInMillis(10);
  executor.setMaxWait(20);

  return executor;
}
 
/**
 * Builds a {@link Details} snapshot from the given job executor: its name,
 * lock owner, lock time, maximum jobs per acquisition, wait time, and the
 * name of every process engine it reports.
 *
 * @param jobExecutor the job executor to read from
 * @return the built details
 */
private static Details from(JobExecutor jobExecutor) {
  final DetailsBuilder details = Details.builder()
    .name(jobExecutor.getName())
    .lockOwner(jobExecutor.getLockOwner())
    .lockTimeInMillis(jobExecutor.getLockTimeInMillis())
    .maxJobsPerAcquisition(jobExecutor.getMaxJobsPerAcquisition())
    .waitTimeInMillis(jobExecutor.getWaitTimeInMillis());

  // one builder call per registered engine
  for (ProcessEngineImpl engine : jobExecutor.getProcessEngines()) {
    details.processEngineName(engine.getName());
  }

  return details.build();
}
 
/**
 * Collects the reported state of a job executor into a {@link Details} object.
 * Scalar settings are copied first, then the name of each process engine
 * returned by the executor is added.
 *
 * @param jobExecutor the job executor to describe
 * @return the built details
 */
private static Details from(JobExecutor jobExecutor) {
  final DetailsBuilder result = Details.builder()
    .name(jobExecutor.getName())
    .lockOwner(jobExecutor.getLockOwner())
    .lockTimeInMillis(jobExecutor.getLockTimeInMillis())
    .maxJobsPerAcquisition(jobExecutor.getMaxJobsPerAcquisition())
    .waitTimeInMillis(jobExecutor.getWaitTimeInMillis());

  for (ProcessEngineImpl registeredEngine : jobExecutor.getProcessEngines()) {
    result.processEngineName(registeredEngine.getName());
  }

  return result.build();
}
 
源代码16 项目: camunda-bpm-platform   文件: JobManager.java
/**
 * Hints the job executor about the given job only when its due date lies
 * before "current time + job executor wait time".
 *
 * @param jobEntity the job to hint about
 * @param duedate   the job's due date
 */
private void hintJobExecutorIfNeeded(JobEntity jobEntity, Date duedate) {
  final int waitTimeInMillis = Context.getProcessEngineConfiguration().getJobExecutor().getWaitTimeInMillis();

  // Per the original author's note, a timer is usually set further in the future than one
  // wait interval, so a timer that fires before the next check is the unusual case.
  final long dueMillis = duedate.getTime();
  final long nextCheckMillis = ClockUtil.getCurrentTime().getTime() + waitTimeInMillis;
  if (dueMillis >= nextCheckMillis) {
    return;
  }

  hintJobExecutor(jobEntity);
}
 
源代码17 项目: camunda-bpm-platform   文件: JobManager.java
/**
 * Registers a transaction listener, to be invoked in state {@code COMMITTED},
 * that makes the given job known to the job executor. Does nothing when the
 * job executor is not active.
 * <p>
 * If the job is not suspended, exclusive and due, and an exclusive job of the
 * same process instance is currently being executed in this job executor
 * context, the job is locked for this executor and an
 * {@link ExclusiveJobAddedNotification} is registered. Otherwise a
 * {@link MessageAddedNotification} is registered.
 *
 * @param job the job to hint about
 */
protected void hintJobExecutor(JobEntity job) {
  final JobExecutor jobExecutor = Context.getProcessEngineConfiguration().getJobExecutor();
  if (!jobExecutor.isActive()) {
    return;
  }

  final JobExecutorContext executorContext = Context.getJobExecutorContext();

  final boolean runInCurrentProcessor = !job.isSuspended()
      && job.isExclusive()
      && isJobDue(job)
      && executorContext != null
      && executorContext.isExecutingExclusiveJob()
      && areInSameProcessInstance(job, executorContext.getCurrentJob());

  final TransactionListener listener;
  if (runInCurrentProcessor) {
    // lock job & add to the queue of the current processor
    final long lockExpiryMillis = ClockUtil.getCurrentTime().getTime() + jobExecutor.getLockTimeInMillis();
    job.setLockExpirationTime(new Date(lockExpiryMillis));
    job.setLockOwner(jobExecutor.getLockOwner());
    listener = new ExclusiveJobAddedNotification(job.getId(), executorContext);
  } else {
    // reset Acquisition strategy and notify the JobExecutor that
    // a new Job is available for execution on future runs
    listener = new MessageAddedNotification(jobExecutor);
  }

  Context.getCommandContext()
    .getTransactionContext()
    .addTransactionListener(TransactionState.COMMITTED, listener);
}
 
/**
 * Starts the configured job executor and checks once per second until no jobs
 * are available or the time limit is reported as exceeded. The effective time
 * limit is at least twice the job executor's wait time. The job executor is
 * shut down in all cases.
 *
 * @param maxMillisToWait requested time limit; raised to twice the executor's wait time if smaller
 * @throws ProcessEngineException if jobs are still available when polling stops
 */
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait) {
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();
  executor.start();
  final long pollIntervalMillis = 1000;

  final int minimumWaitMillis = executor.getWaitTimeInMillis() * 2;
  maxMillisToWait = Math.max(maxMillisToWait, minimumWaitMillis);

  try {
    final Timer timeoutTimer = new Timer();
    // NOTE(review): InterruptTask presumably interrupts the given thread when it runs - confirm.
    final InterruptTask timeoutTask = new InterruptTask(Thread.currentThread());
    timeoutTimer.schedule(timeoutTask, maxMillisToWait);

    boolean jobsPending = true;
    try {
      while (jobsPending && !timeoutTask.isTimeLimitExceeded()) {
        Thread.sleep(pollIntervalMillis);
        try {
          jobsPending = areJobsAvailable();
        } catch (Throwable ignored) {
          // Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
          // isolation level doesn't allow READ of the table
        }
      }
    } catch (InterruptedException ignored) {
      // Polling stops; the check below decides whether the limit was exceeded.
    } finally {
      timeoutTimer.cancel();
    }

    if (jobsPending) {
      throw new ProcessEngineException("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    executor.shutdown();
  }
}
 
/**
 * Creates a job executor as described by {@code jobAcquisitionXml} and starts
 * it as a {@code JOB_EXECUTOR} service in the platform service container.
 * <p>
 * The job executor class is loaded with the process application's classloader
 * when a process application is attached to the operation, otherwise with the
 * classloader of {@link ProcessEngineConfiguration}. When no class name is
 * configured, {@link RuntimeContainerJobExecutor} is used.
 *
 * @param operationContext the deployment operation providing the service container and attachments
 */
public void performOperationStep(DeploymentOperation operationContext) {
  final PlatformServiceContainer serviceContainer = operationContext.getServiceContainer();
  final AbstractProcessApplication processApplication = operationContext.getAttachment(PROCESS_APPLICATION);

  final ClassLoader classLoader = processApplication != null
      ? processApplication.getProcessApplicationClassloader()
      : ProcessEngineConfiguration.class.getClassLoader();

  // fall back to the runtime container job executor when no class is configured
  String jobExecutorClassName = jobAcquisitionXml.getJobExecutorClassName();
  if (jobExecutorClassName == null || jobExecutorClassName.isEmpty()) {
    jobExecutorClassName = RuntimeContainerJobExecutor.class.getName();
  }

  // create & instantiate the job executor class
  final Class<? extends JobExecutor> jobExecutorClass = loadJobExecutorClass(classLoader, jobExecutorClassName);
  final JobExecutor jobExecutor = instantiateJobExecutor(jobExecutorClass);

  // apply properties
  final Map<String, String> configuredProperties = jobAcquisitionXml.getProperties();
  PropertyHelper.applyProperties(jobExecutor, configuredProperties);

  // wrap the executor in its JMX-managed service and deploy it into the container
  final JmxManagedJobExecutor managedJobExecutor = new JmxManagedJobExecutor(jobExecutor);
  serviceContainer.startService(ServiceTypes.JOB_EXECUTOR, jobAcquisitionXml.getName(), managedJobExecutor);
}
 
/**
 * Instantiates the given job executor class through its no-argument constructor.
 * <p>
 * Uses {@code getDeclaredConstructor().newInstance()} instead of the deprecated
 * {@code Class.newInstance()}, which propagates checked exceptions thrown by the
 * constructor without declaring them. With this form, an exception thrown by the
 * constructor arrives wrapped in an {@code InvocationTargetException}.
 *
 * @param configurationClass the job executor class to instantiate
 * @return a new instance of the class
 * @throws RuntimeException the exception produced by {@code LOG.couldNotInstantiateJobExecutorClass}
 *         when the class has no usable no-argument constructor or instantiation fails
 *         (exact exception type not visible here)
 */
protected JobExecutor instantiateJobExecutor(Class<? extends JobExecutor> configurationClass) {
  try {
    return configurationClass.getDeclaredConstructor().newInstance();
  }
  catch (Exception e) {
    // covers NoSuchMethodException, InstantiationException, IllegalAccessException,
    // InvocationTargetException and runtime failures alike
    throw LOG.couldNotInstantiateJobExecutorClass(e);
  }
}
 
/**
 * Loads the job executor class with the given name from the given classloader.
 * <p>
 * The cast to {@code Class<? extends JobExecutor>} is unchecked: nothing here
 * verifies that the loaded class actually is a {@link JobExecutor}.
 *
 * @param processApplicationClassloader the classloader to load from
 * @param jobExecutorClassname          the name of the class to load
 * @return the loaded class
 * @throws RuntimeException the exception produced by {@code LOG.couldNotLoadJobExecutorClass}
 *         when the class cannot be found (exact exception type not visible here)
 */
@SuppressWarnings("unchecked")
protected Class<? extends JobExecutor> loadJobExecutorClass(ClassLoader processApplicationClassloader, String jobExecutorClassname) {
  final Class<?> loadedClass;
  try {
    loadedClass = processApplicationClassloader.loadClass(jobExecutorClassname);
  }
  catch (ClassNotFoundException e) {
    throw LOG.couldNotLoadJobExecutorClass(e);
  }
  return (Class<? extends JobExecutor>) loadedClass;
}
 
/**
 * Executes a job acquisition command, wrapped in a {@code ControlledCommand}
 * bound to {@code activeThread}, and stores the result in {@code jobs}.
 * An {@link OptimisticLockingException} is not propagated but recorded in
 * {@code exception}. A debug message is logged at the end in either case.
 */
public void run() {
  try {
    final JobExecutor executor = processEngineConfiguration.getJobExecutor();
    final ControlledCommand acquisition = new ControlledCommand(activeThread, new AcquireJobsCmd(executor));
    jobs = (AcquiredJobs) processEngineConfiguration
      .getCommandExecutorTxRequired()
      .execute(acquisition);

  } catch (OptimisticLockingException lockingFailure) {
    // keep the failure so it can be inspected after the thread has ended
    this.exception = lockingFailure;
  }
  LOG.debug(getName()+" ends");
}
 
/**
 * Executes a job acquisition command, wrapped in a {@code ControlledCommand}
 * bound to {@code activeThread}, and stores the result in {@code acquiredJobs}.
 * An {@link OptimisticLockingException} is not propagated but recorded in
 * {@code exception}. A debug message is logged at the end in either case.
 */
@Override
public void run() {
  try {
    final JobExecutor executor = processEngineConfiguration.getJobExecutor();
    final ControlledCommand<AcquiredJobs> acquisition =
        new ControlledCommand<AcquiredJobs>(activeThread, new AcquireJobsCmd(executor));
    acquiredJobs = processEngineConfiguration.getCommandExecutorTxRequired()
      .execute(acquisition);

  } catch (OptimisticLockingException lockingFailure) {
    // keep the failure so it can be inspected after the thread has ended
    this.exception = lockingFailure;
  }
  LOG.debug(getName()+" ends");
}
 
/**
 * Acquires a timer job by running {@link AcquireJobsCmd} manually and then
 * expects {@code managementService.deleteJob} to fail with a
 * {@link ProcessEngineException}. The job is executed at the end as cleanup.
 */
@Deployment(resources = {"org/camunda/bpm/engine/test/api/mgmt/timerOnTask.bpmn20.xml"})
public void testDeleteJobThatWasAlreadyAcquired() {
  ClockUtil.setCurrentTime(new Date());

  final ProcessInstance instance = runtimeService.startProcessInstanceByKey("timerOnTask");
  final Job timer = managementService.createJobQuery().processInstanceId(instance.getId()).singleResult();

  // We need to move time at least one hour to make the timer executable
  // (the clock is advanced by 7200000 ms, i.e. two hours)
  final long advancedMillis = ClockUtil.getCurrentTime().getTime() + 7200000L;
  ClockUtil.setCurrentTime(new Date(advancedMillis));

  // Acquire job by running the acquire command manually
  final ProcessEngineImpl engineImpl = (ProcessEngineImpl) processEngine;
  final JobExecutor executor = engineImpl.getProcessEngineConfiguration().getJobExecutor();
  final AcquireJobsCmd acquireCommand = new AcquireJobsCmd(executor);
  final CommandExecutor commands = engineImpl.getProcessEngineConfiguration().getCommandExecutorTxRequired();
  commands.execute(acquireCommand);

  // Try to delete the job. This should fail.
  try {
    managementService.deleteJob(timer.getId());
    fail();
  } catch (ProcessEngineException expected) {
    // Exception is expected
  }

  // Clean up
  managementService.executeJob(timer.getId());
}
 
/**
 * Sends a tweet message job, acquires it with {@link AcquireJobsCmd} and
 * checks that exactly one batch containing that job id is returned and that
 * the handler has not yet seen a message. After executing the job, the handler
 * must have received exactly that one message. The database is cleared at the end.
 */
public void testJobCommandsWithMessage() {
  final CommandExecutor commands = processEngineConfiguration.getCommandExecutorTxRequired();
  final JobExecutor executor = processEngineConfiguration.getJobExecutor();

  // create the message job inside a command and remember its id
  final String messageJobId = commands.execute(new Command<String>() {

    public String execute(CommandContext commandContext) {
      MessageEntity tweet = createTweetMessage("i'm coding a test");
      commandContext.getJobManager().send(tweet);
      return tweet.getId();
    }
  });

  final AcquiredJobs acquired = commands.execute(new AcquireJobsCmd(executor));
  final List<List<String>> batches = acquired.getJobIdBatches();
  assertEquals(1, batches.size());

  final List<String> firstBatch = batches.get(0);

  final List<String> expectedIds = new ArrayList<String>();
  expectedIds.add(messageJobId);

  // acquired, but not executed yet
  assertEquals(expectedIds, new ArrayList<String>(firstBatch));
  assertEquals(0, tweetHandler.getMessages().size());

  ExecuteJobHelper.executeJob(messageJobId, commands);

  assertEquals("i'm coding a test", tweetHandler.getMessages().get(0));
  assertEquals(1, tweetHandler.getMessages().size());

  clearDatabase();
}
 
/**
 * Starts the engine's job executor and checks once per second until no jobs
 * are available or the time limit is reported as exceeded. The effective time
 * limit is at least twice the job executor's wait time. The job executor is
 * shut down in all cases.
 *
 * @param maxMillisToWait requested time limit; raised to twice the executor's wait time if smaller
 * @throws AssertionError if jobs are still available when polling stops
 */
public void waitForJobExecutorToProcessAllJobs(long maxMillisToWait) {
  final ProcessEngineConfigurationImpl engineConfiguration = (ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration();
  final JobExecutor executor = engineConfiguration.getJobExecutor();
  executor.start();
  final long pollIntervalMillis = 1000;

  final int minimumWaitMillis = executor.getWaitTimeInMillis() * 2;
  maxMillisToWait = Math.max(maxMillisToWait, minimumWaitMillis);

  try {
    final Timer watchdog = new Timer();
    // NOTE(review): InterruptTask presumably interrupts the given thread when it runs - confirm.
    final InterruptTask deadline = new InterruptTask(Thread.currentThread());
    watchdog.schedule(deadline, maxMillisToWait);

    boolean jobsLeft = true;
    try {
      while (true) {
        if (!jobsLeft || deadline.isTimeLimitExceeded()) {
          break;
        }
        Thread.sleep(pollIntervalMillis);
        try {
          jobsLeft = areJobsAvailable();
        } catch (Throwable ignored) {
          // Ignore, possible that exception occurs due to locking/updating of table on MSSQL when
          // isolation level doesn't allow READ of the table
        }
      }
    } catch (InterruptedException ignored) {
      // Polling stops; the check below decides whether the limit was exceeded.
    } finally {
      watchdog.cancel();
    }

    if (jobsLeft) {
      throw new AssertionError("time limit of " + maxMillisToWait + " was exceeded");
    }
  } finally {
    executor.shutdown();
  }
}
 
/**
 * Advances the engine clock by the given number of hours plus five seconds,
 * then runs the job executor for one second.
 * <p>
 * The offset is computed in {@code long} arithmetic; the previous {@code int}
 * expression overflowed for {@code hours >= 597}. The job executor is shut
 * down even when the sleep is interrupted.
 *
 * @param hours number of hours to move the clock forward
 * @throws Exception if the sleep is interrupted or the job executor fails
 */
private void moveByHours(int hours) throws Exception {
  final long offsetMillis = hours * 60L * 60L * 1000L + 5000L;
  ClockUtil.setCurrentTime(new Date(ClockUtil.getCurrentTime().getTime() + offsetMillis));

  JobExecutor jobExecutor = processEngineConfiguration.getJobExecutor();
  jobExecutor.start();
  try {
    // give the job executor one second to run with the advanced clock
    Thread.sleep(1000);
  } finally {
    jobExecutor.shutdown();
  }
}
 
/**
 * Starts the given job executor and checks once per second until no jobs are
 * available or the time limit is reported as exceeded. The job executor is
 * shut down in all cases.
 *
 * @param jobExecutor     the job executor to start and shut down
 * @param maxMillisToWait delay after which the scheduled timeout task runs
 * @throws RuntimeException if jobs are still available when polling stops; the message
 *         includes the number of jobs still available
 */
public void waitForJobExecutorToProcessAllJobs(JobExecutor jobExecutor, long maxMillisToWait) {
  final int pollIntervalMillis = 1000;

  jobExecutor.start();

  try {
    final Timer watchdog = new Timer();
    // NOTE(review): InterruptTask presumably interrupts the given thread when it runs - confirm.
    final InterruptTask deadline = new InterruptTask(Thread.currentThread());
    watchdog.schedule(deadline, maxMillisToWait);

    boolean jobsRemaining = true;
    try {
      for (; jobsRemaining && !deadline.isTimeLimitExceeded(); jobsRemaining = areJobsAvailable()) {
        Thread.sleep(pollIntervalMillis);
      }
    } catch (InterruptedException ignored) {
      // Stop polling; whether jobs remain is checked right after.
    } finally {
      watchdog.cancel();
    }

    if (jobsRemaining) {
      throw new RuntimeException("time limit of " + maxMillisToWait + " was exceeded (still " + numberOfJobsAvailable() + " jobs available)");
    }
  } finally {
    jobExecutor.shutdown();
  }
}
 
/**
 * Checks the job executor state of two engines looked up by name: the
 * "jobExecutorActivate-FALSE-engine" must have an inactive job executor, the
 * "jobExecutorActivate-UNDEFINED-engine" an active one.
 */
@Test
public void shouldNotActiateJobExecutor() {

  // engine registered under the "FALSE" name: job executor must not be active
  final ProcessEngine disabledEngine = processEngineService.getProcessEngine("jobExecutorActivate-FALSE-engine");
  final ProcessEngineConfiguration disabledConfiguration = disabledEngine.getProcessEngineConfiguration();
  final JobExecutor disabledExecutor = ((ProcessEngineConfigurationImpl)disabledConfiguration).getJobExecutor();
  assertFalse(disabledExecutor.isActive());

  // engine registered under the "UNDEFINED" name: job executor must be active
  final ProcessEngine undefinedEngine = processEngineService.getProcessEngine("jobExecutorActivate-UNDEFINED-engine");
  final ProcessEngineConfiguration undefinedConfiguration = undefinedEngine.getProcessEngineConfiguration();
  final JobExecutor undefinedExecutor = ((ProcessEngineConfigurationImpl)undefinedConfiguration).getJobExecutor();
  assertTrue(undefinedExecutor.isActive());

}
 
/**
 * Registers a {@link JobExecutorHealthIndicator} for the given job executor.
 * Per its annotations, the bean is created only when a bean named
 * {@code jobExecutor} exists and no bean named {@code jobExecutorHealthIndicator}
 * is already defined.
 *
 * @param jobExecutor the job executor the indicator is created for
 * @return the health indicator
 */
@Bean
@ConditionalOnBean(name = "jobExecutor")
@ConditionalOnMissingBean(name = "jobExecutorHealthIndicator")
public HealthIndicator jobExecutorHealthIndicator(JobExecutor jobExecutor) {
  final HealthIndicator indicator = new JobExecutorHealthIndicator(jobExecutor);
  return indicator;
}