下面列出了 org.springframework.boot.autoconfigure.batch.JobExecutionEvent#org.springframework.batch.core.BatchStatus 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Logs every row of the PAYROLL table after the job has finished.
 * Nothing is done unless the execution status is {@code COMPLETED}.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() != BatchStatus.COMPLETED) {
        return;
    }
    log.info(">>>>> PAY ROLL JOB FINISHED! ");
    final String selectPayroll = "SELECT PERSON_IDENTIFICATION, CURRENCY, TX_AMMOUNT, ACCOUNT_TYPE, ACCOUNT_ID, TX_DESCRIPTION, FIRST_LAST_NAME FROM PAYROLL";
    // Columns are read by position, in the order they appear in the SELECT list.
    for (PayrollTo entry : jdbcTemplate.query(selectPayroll,
            (resultSet, rowIndex) -> new PayrollTo(
                    resultSet.getInt(1),
                    resultSet.getString(2),
                    resultSet.getDouble(3),
                    resultSet.getString(4),
                    resultSet.getString(5),
                    resultSet.getString(6),
                    resultSet.getString(7)))) {
        log.info("Found <" + entry + "> in the database.");
    }
}
/**
 * Loads {@code /applicationContext-test.xml}, launches the job through
 * {@link JobLauncherTestUtils} and asserts that it ends with status {@code COMPLETED}.
 *
 * @param args command line arguments (not used)
 * @throws Exception if the context cannot be created or the job launch fails
 */
public static void main(String[] args) throws Exception{
    // try-with-resources closes the context even when the launch or the assertion throws,
    // which the previous explicit close() at the end of the method did not guarantee.
    try (GenericXmlApplicationContext applicationContext = new GenericXmlApplicationContext("/applicationContext-test.xml")) {
        JobLauncherTestUtils testLauncher = applicationContext.getBean(JobLauncherTestUtils.class);
        JobExecution jobExecution = testLauncher.launchJob(getParameters());
        BatchStatus status = jobExecution.getStatus();
        assertEquals(BatchStatus.COMPLETED, status);
    }
}
/**
 * After a {@code COMPLETED} job: clears the partitioner's first-run flag, stores
 * {@code timeOfNextJob} in the job execution context under
 * {@code last_successful_timestamp_from_this_job}, persists that context and
 * finally destroys the ElasticSearch REST service if one is present.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public synchronized void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
        if(columnRangePartitioner.isFirstRun()){
            columnRangePartitioner.setFirstRun(false);
        }
        log.info("!!! JOB FINISHED! promoting last good record date to JobExecutionContext");
        jobExecution.getExecutionContext().put("last_successful_timestamp_from_this_job", timeOfNextJob);
        jobRepository.updateExecutionContext(jobExecution);
        // Workaround to close ElasticSearch REST properly (the job was stuck before this change)
        if (esRestService != null) {
            try {
                esRestService.destroy();
            } catch (IOException e) {
                // Still best effort, but keep the cause so the failure can be diagnosed.
                log.warn("IOException when destroying ElasticSearch REST service", e);
            }
            log.debug("Shutting down ElasticSearch REST service");
            esServiceAlreadyClosed = true;
        }
    }
}
/**
 * Starts {@code delayJob} over REST, checks that it shows up in both
 * running-execution listings, stops it and verifies the {@code STOPPED} status
 * through the job explorer and the monitoring endpoint.
 */
@Test
public void testRunJob() throws InterruptedException {
    final String baseUrl = "http://localhost:" + port;

    Long executionId = restTemplate.postForObject(baseUrl + "/batch/operations/jobs/delayJob", "", Long.class);
    Thread.sleep(500);

    // The new execution must be listed globally ...
    String allRunning = restTemplate.getForObject(baseUrl + "/batch/monitoring/jobs/runningexecutions",
            String.class);
    boolean listedGlobally = allRunning.contains(executionId.toString());
    assertThat(listedGlobally, is(true));

    // ... and in the listing restricted to delayJob.
    String delayJobRunning = restTemplate.getForObject(
            baseUrl + "/batch/monitoring/jobs/runningexecutions/delayJob", String.class);
    boolean listedForJob = delayJobRunning.contains(executionId.toString());
    assertThat(listedForJob, is(true));

    // Request a stop and give the job time to react.
    restTemplate.delete(baseUrl + "/batch/operations/jobs/executions/{executionId}", executionId);
    Thread.sleep(1500);

    JobExecution stopped = jobExplorer.getJobExecution(executionId);
    assertThat(stopped.getStatus(), is(BatchStatus.STOPPED));

    String executionJson = restTemplate.getForObject(
            baseUrl + "/batch/monitoring/jobs/executions/{executionId}", String.class, executionId);
    boolean reportsStopped = executionJson.contains("STOPPED");
    assertThat(reportsStopped, is(true));
}
/**
 * Starts {@code simpleJob} over REST, waits until the operations endpoint reports
 * {@code COMPLETED}, then checks the log endpoint, the job explorer and the
 * monitoring endpoint.
 */
@Test
public void testRunJob() throws InterruptedException {
    Long executionId = restTemplate.postForObject("http://localhost:" + port + "/batch/operations/jobs/simpleJob",
            "", Long.class);
    // Bounded polling: the previous loop never ended when the job finished in any state
    // other than COMPLETED, and threw a NullPointerException on an empty response.
    final long timeoutMillis = 120_000L;
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!"COMPLETED".equals(restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}", String.class,
            executionId))) {
        if (System.currentTimeMillis() > deadline) {
            throw new AssertionError(
                    "Job execution " + executionId + " did not reach COMPLETED within " + timeoutMillis + " ms");
        }
        Thread.sleep(1000);
    }
    String log = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}/log", String.class,
            executionId);
    assertThat(log.length() > 20, is(true));
    JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    String jobExecutionString = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/monitoring/jobs/executions/{executionId}", String.class,
            executionId);
    assertThat(jobExecutionString.contains("COMPLETED"), is(true));
}
/**
 * Runs {@code flatFileToDbSkipJob} with the skip-in-write input file and validates
 * the listener counters stored in the step's execution context, one published
 * gauge and the number of rows in the ITEM table.
 */
@Test
public void testRunFlatFileToDbSkipJob_SkipInWrite() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob", "metrics/flatFileToDbSkipJob_SkipInWrite.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(4L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(8L).withReadErrorCount(0L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(0L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(7L).withWriteErrorCount(4L)
            .withAfterChunkCount(4L).withChunkErrorCount(2L).withSkipInReadCount(0L).withSkipInProcessCount(0L)
            .withSkipInWriteCount(1L).build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    // A search without a match yields null; assert that explicitly so a missing gauge
    // is reported as an assertion failure rather than a NullPointerException below.
    assertThat(gauge, is(notNullValue()));
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
/**
 * Runs {@code flatFileToDbNoSkipJob} with the success input file and validates the
 * listener counters stored in the step's execution context, one published gauge
 * and the number of rows in the ITEM table.
 */
@Test
public void testRunFlatFileToDbNoSkipJob_Success() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbNoSkipJob", "metrics/flatFileToDbNoSkipJob_Success.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 5L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(2L).withStreamOpenCount(1L).withStreamUpdateCount(3L).withStreamCloseCount(0L)
            .withBeforeReadCount(6L).withReadCount(6L).withAfterReadCount(5L).withReadErrorCount(0L)
            .withBeforeProcessCount(5L).withProcessCount(5L).withAfterProcessCount(5L).withProcessErrorCount(0L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(5L).withAfterChunkCount(2L)
            .withChunkErrorCount(0L).withSkipInReadCount(0L).withSkipInProcessCount(0L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbNoSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    // A search without a match yields null; assert that explicitly so a missing gauge
    // is reported as an assertion failure rather than a NullPointerException below.
    assertThat(gauge, is(notNullValue()));
    assertThat((Double) gauge.value(), is(notNullValue())); // TODO
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
/**
 * One-time fixture setup guarded by the {@code initialized} flag: registers the
 * timestamp task app, initializes the test, creates a STARTED job execution and
 * posts the {@code DOCJOB_1} task definition without documenting that request.
 *
 * @throws Exception if any of the setup steps fails
 */
@Before
public void setup() throws Exception {
    if (initialized) {
        return;
    }
    registerApp(ApplicationType.task, "timestamp", "1.2.0.RELEASE");
    initialize();
    createJobExecution(JOB_NAME, BatchStatus.STARTED);
    documentation.dontDocument(() -> this.mockMvc
            .perform(post("/tasks/definitions")
                    .param("name", "DOCJOB_1")
                    .param("definition", "timestamp --format='YYYY MM DD'"))
            .andExpect(status().isOk()));
    initialized = true;
}
/**
 * Retrieve a page of task job executions, optionally filtered by job name and/or
 * batch status. When neither filter is given, all job executions are listed.
 *
 * @param jobName name of the job. SQL server specific wildcards are enabled (eg.: myJob%,
 *     m_Job, ...). May be omitted.
 * @param status batch status to filter by. May be omitted.
 * @param pageable page-able collection of {@code TaskJobExecution}s.
 * @param assembler for the {@link TaskJobExecution}s
 * @return paged model of the task/job executions matching the given filters.
 * @throws NoSuchJobException if the job with the given name does not exist.
 * @throws NoSuchJobExecutionException declared by the signature; presumably propagated
 *     from the task job service (to be confirmed against its contract).
 */
@RequestMapping(value = "", method = RequestMethod.GET, produces = "application/json")
@ResponseStatus(HttpStatus.OK)
public PagedModel<JobExecutionResource> retrieveJobsByParameters(
        @RequestParam(value = "name", required = false) String jobName,
        @RequestParam(value = "status", required = false) BatchStatus status,
        Pageable pageable, PagedResourcesAssembler<TaskJobExecution> assembler) throws NoSuchJobException, NoSuchJobExecutionException {
    final boolean unfiltered = jobName == null && status == null;
    // The list is fetched before the total count, in both branches.
    final List<TaskJobExecution> jobExecutions = unfiltered
            ? taskJobService.listJobExecutions(pageable)
            : taskJobService.listJobExecutionsForJob(pageable, jobName, status);
    final long total = unfiltered
            ? taskJobService.countJobExecutions()
            : taskJobService.countJobExecutionsForJob(jobName, status);
    final Page<TaskJobExecution> page = new PageImpl<>(jobExecutions, pageable, total);
    return assembler.toModel(page, jobAssembler);
}
/**
 * Runs {@code flatFileToDbSkipJob} with the skip-in-read input file and validates
 * the listener counters stored in the step's execution context, one published
 * gauge and the number of rows in the ITEM table.
 */
@Test
public void testRunFlatFileToDbSkipJob_SkipInRead() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob", "metrics/flatFileToDbSkipJob_SkipInRead.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(3L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(7L).withReadErrorCount(1L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(0L)
            .withBeforeWriteCount(7L).withWriteCount(writeCount).withAfterWriteCount(7L).withWriteErrorCount(0L)
            .withAfterChunkCount(3L).withChunkErrorCount(0L).withSkipInReadCount(1L).withSkipInProcessCount(0L)
            .withSkipInWriteCount(0L).build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    // A search without a match yields null; assert that explicitly so a missing gauge
    // is reported as an assertion failure rather than a NullPointerException below.
    assertThat(gauge, is(notNullValue()));
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
/**
 * Starts {@code flatFile2JobXml} over REST, waits until the operations endpoint
 * reports {@code COMPLETED}, then checks the log endpoint, the job explorer and
 * the monitoring endpoint.
 */
@Test
public void testRunJob() throws InterruptedException {
    Long executionId = restTemplate
            .postForObject("http://localhost:" + port + "/batch/operations/jobs/flatFile2JobXml", "", Long.class);
    // Bounded polling: the previous loop never ended when the job finished in any state
    // other than COMPLETED, and threw a NullPointerException on an empty response.
    final long timeoutMillis = 120_000L;
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!"COMPLETED".equals(restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}", String.class,
            executionId))) {
        if (System.currentTimeMillis() > deadline) {
            throw new AssertionError(
                    "Job execution " + executionId + " did not reach COMPLETED within " + timeoutMillis + " ms");
        }
        Thread.sleep(1000);
    }
    String log = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}/log", String.class,
            executionId);
    assertThat(log.length() > 20, is(true));
    JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    String jobExecutionString = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/monitoring/jobs/executions/{executionId}", String.class,
            executionId);
    assertThat(jobExecutionString.contains("COMPLETED"), is(true));
}
/**
 * Builds a {@link StepExecution} from the current row. Columns are read strictly
 * by position (1 to 17); the created object has no parent job execution.
 *
 * @param rs result set positioned on the row to map
 * @param rowNum index of the current row (not used)
 * @return the populated step execution
 * @throws SQLException if a column cannot be read
 */
public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
    // Column 2 is the first constructor argument; column 1 is the id.
    final StepExecution execution = new StepExecution(rs.getString(2), null);
    execution.setId(rs.getLong(1));

    // Timing and status.
    execution.setStartTime(rs.getTimestamp(3));
    execution.setEndTime(rs.getTimestamp(4));
    final BatchStatus batchStatus = BatchStatus.valueOf(rs.getString(5));
    execution.setStatus(batchStatus);

    // Item counters.
    execution.setCommitCount(rs.getInt(6));
    execution.setReadCount(rs.getInt(7));
    execution.setFilterCount(rs.getInt(8));
    execution.setWriteCount(rs.getInt(9));

    // Exit status is assembled from two adjacent columns.
    final String exitCode = rs.getString(10);
    final String exitDescription = rs.getString(11);
    execution.setExitStatus(new ExitStatus(exitCode, exitDescription));

    // Skip and rollback counters, then bookkeeping columns.
    execution.setReadSkipCount(rs.getInt(12));
    execution.setWriteSkipCount(rs.getInt(13));
    execution.setProcessSkipCount(rs.getInt(14));
    execution.setRollbackCount(rs.getInt(15));
    execution.setLastUpdated(rs.getTimestamp(16));
    execution.setVersion(rs.getInt(17));
    return execution;
}
/**
 * Builds a {@link MockMvc} that accepts JSON by default, creates the sample jobs
 * used by the job execution tests and registers the step-execution and
 * execution-context mix-ins plus the millisecond ISO-8601 date format on every
 * Jackson message converter of the given adapter.
 *
 * @param jobRepository repository in which the sample jobs are created
 * @param taskBatchDao DAO passed on to the sample job creation
 * @param taskExecutionDao DAO passed on to the sample job creation
 * @param wac web application context backing the MockMvc instance
 * @param adapter handler adapter whose message converters are configured
 * @return the configured MockMvc
 */
static MockMvc createBaseJobExecutionMockMvc(JobRepository jobRepository, TaskBatchDao taskBatchDao,
        TaskExecutionDao taskExecutionDao, WebApplicationContext wac, RequestMappingHandlerAdapter adapter) {
    final MockMvc result = MockMvcBuilders.webAppContextSetup(wac)
            .defaultRequest(get("/").accept(MediaType.APPLICATION_JSON))
            .build();

    // Jobs without an explicit status.
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_ORIG, 1);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_FOO, 1);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_FOOBAR, 2);

    // Jobs with an explicit batch status.
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_COMPLETED, 1, BatchStatus.COMPLETED);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_STARTED, 1, BatchStatus.STARTED);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_STOPPED, 1, BatchStatus.STOPPED);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_FAILED1, 1, BatchStatus.FAILED);
    JobExecutionUtils.createSampleJob(jobRepository, taskBatchDao, taskExecutionDao, JOB_NAME_FAILED2, 1, BatchStatus.FAILED);

    for (HttpMessageConverter<?> candidate : adapter.getMessageConverters()) {
        if (!(candidate instanceof MappingJackson2HttpMessageConverter)) {
            continue;
        }
        // The mapper's configuration methods return the mapper itself, so they can be chained.
        ((MappingJackson2HttpMessageConverter) candidate).getObjectMapper()
                .addMixIn(StepExecution.class, StepExecutionJacksonMixIn.class)
                .addMixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class)
                .setDateFormat(new ISO8601DateFormatWithMilliSeconds());
    }
    return result;
}
/**
 * Creates one job instance and one task execution named {@code jobName}, then
 * {@code jobExecutionCount} job executions for it. Each execution gets a step
 * named "foo", is linked to the task execution and is stored with the given
 * status and a start time; STOPPED executions also get an end time.
 *
 * @param jobRepository repository used to create and update the executions
 * @param taskBatchDao DAO that stores the task/job relationship
 * @param taskExecutionDao DAO that creates the task execution
 * @param jobName name for both the job instance and the task execution
 * @param jobExecutionCount number of job executions to create
 * @param status batch status assigned to every created execution
 */
private static void createSampleJob(JobRepository jobRepository, TaskBatchDao taskBatchDao,
        TaskExecutionDao taskExecutionDao, String jobName,
        int jobExecutionCount, BatchStatus status) {
    final JobInstance jobInstance = jobRepository.createJobInstance(jobName, new JobParameters());
    final TaskExecution parentTask = taskExecutionDao.createTaskExecution(jobName, new Date(), new ArrayList<>(), null);
    int created = 0;
    while (created < jobExecutionCount) {
        final JobExecution execution = jobRepository.createJobExecution(jobInstance, new JobParameters(), null);

        // The id is reset to null before the step is handed to the repository.
        final StepExecution step = new StepExecution("foo", execution, 1L);
        step.setId(null);
        jobRepository.add(step);

        taskBatchDao.saveRelationship(parentTask, execution);
        execution.setStatus(status);
        execution.setStartTime(new Date());
        if (status == BatchStatus.STOPPED) {
            execution.setEndTime(new Date());
        }
        jobRepository.update(execution);
        created++;
    }
}
/**
 * Creates one job instance and one task execution named {@code jobName}, then
 * {@code jobExecutionCount} job executions that use this object's
 * {@code jobParameters}. Each execution gets a step named "foo", is linked to the
 * task execution and is stored with the given status and a start time.
 *
 * @param jobRepository repository used to create and update the executions
 * @param taskBatchDao DAO that stores the task/job relationship
 * @param taskExecutionDao DAO that creates the task execution
 * @param jobName name for both the job instance and the task execution
 * @param jobExecutionCount number of job executions to create
 * @param status batch status assigned to every created execution
 */
private void createSampleJob(JobRepository jobRepository, TaskBatchDao taskBatchDao,
        TaskExecutionDao taskExecutionDao, String jobName,
        int jobExecutionCount, BatchStatus status) {
    final JobInstance jobInstance = jobRepository.createJobInstance(jobName, new JobParameters());
    final TaskExecution parentTask = taskExecutionDao.createTaskExecution(jobName, new Date(), new ArrayList<>(), null);
    int remaining = jobExecutionCount;
    while (remaining > 0) {
        final JobExecution execution = jobRepository.createJobExecution(jobInstance, this.jobParameters, null);

        // The id is reset to null before the step is handed to the repository.
        final StepExecution step = new StepExecution("foo", execution, 1L);
        step.setId(null);
        jobRepository.add(step);

        taskBatchDao.saveRelationship(parentTask, execution);
        execution.setStatus(status);
        execution.setStartTime(new Date());
        jobRepository.update(execution);
        remaining--;
    }
}
/**
 * Runs {@code flatFileToDbSkipReaderTransactionalJob} with the skip-in-process
 * input file and validates the listener counters stored in the step's execution
 * context, one published gauge and the number of rows in the ITEM table.
 */
@Test
public void testRunFlatFileToDbSkipJob_SkipInProcess_ReaderTransactional() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipReaderTransactionalJob",
            "metrics/flatFileToDbSkipJob_SkipInProcess.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 5L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(2L).withStreamOpenCount(1L).withStreamUpdateCount(3L).withStreamCloseCount(0L)
            .withBeforeReadCount(6L).withReadCount(6L).withAfterReadCount(5L).withReadErrorCount(0L)
            .withBeforeProcessCount(5L).withProcessCount(5L).withAfterProcessCount(5L).withProcessErrorCount(1L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(5L).withAfterChunkCount(2L)
            .withChunkErrorCount(1L).withSkipInReadCount(0L).withSkipInProcessCount(0L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipReaderTransactionalJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    // A search without a match yields null; assert that explicitly so a missing gauge
    // is reported as an assertion failure rather than a NullPointerException below.
    assertThat(gauge, is(notNullValue()));
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
/**
 * Logs every row of the voltage table after the job has finished.
 * Nothing is done unless the execution status is {@code COMPLETED}.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public void afterJob(final JobExecution jobExecution) {
    if (jobExecution.getStatus() != BatchStatus.COMPLETED) {
        return;
    }
    LOGGER.info("!!! JOB FINISHED! Time to verify the results");
    // Column 1 is volt, column 2 is time, matching the SELECT list.
    for (Voltage reading : jdbcTemplate.query("SELECT volt, time FROM voltage",
            (resultSet, rowIndex) -> new Voltage(
                    resultSet.getBigDecimal(1),
                    resultSet.getDouble(2)))) {
        LOGGER.info("Found <" + reading + "> in the database.");
    }
}
/**
 * Logs the outcome of the JdbcBatchItemWriter job: a fixed "finished" message
 * for {@code COMPLETED}, otherwise the name of the final status.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public void afterJob(JobExecution jobExecution) {
    final BatchStatus outcome = jobExecution.getStatus();
    final String message = outcome == BatchStatus.COMPLETED
            ? "JdbcBatchItemWriter job finished"
            : "JdbcBatchItemWriter job " + outcome.name();
    log.info(message);
}
/**
 * Starts {@code simpleBatchMetricsJob} over REST, waits until the operations
 * endpoint reports {@code COMPLETED}, then checks the log endpoint, the job
 * explorer, the monitoring endpoint and the gauge and timer published for
 * {@code simpleBatchMetricsStep}.
 */
@Test
public void testRunJob() throws InterruptedException {
    Long executionId = restTemplate.postForObject(
            "http://localhost:" + port + "/batch/operations/jobs/simpleBatchMetricsJob", "", Long.class);
    // Bounded polling: the previous loop never ended when the job finished in any state
    // other than COMPLETED, and threw a NullPointerException on an empty response.
    final long timeoutMillis = 120_000L;
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!"COMPLETED".equals(restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}", String.class,
            executionId))) {
        if (System.currentTimeMillis() > deadline) {
            throw new AssertionError(
                    "Job execution " + executionId + " did not reach COMPLETED within " + timeoutMillis + " ms");
        }
        Thread.sleep(1000);
    }
    String log = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/operations/jobs/executions/{executionId}/log", String.class,
            executionId);
    assertThat(log.length() > 20, is(true));
    JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    String jobExecutionString = restTemplate.getForObject(
            "http://localhost:" + port + "/batch/monitoring/jobs/executions/{executionId}", String.class,
            executionId);
    assertThat(jobExecutionString.contains("COMPLETED"), is(true));
    // Gauge written for the "processor" metric of the step.
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "simpleBatchMetricsJob.simpleBatchMetricsStep")//
            .tag("name", "processor")//
            .gauge();
    assertThat(gauge, is(notNullValue()));
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat((Double) gauge.value(), is(7.0));
    // Timer recorded for the reader's read method.
    Timer timer = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "simpleBatchMetricsJob.simpleBatchMetricsStep")//
            .tag("method", "DummyItemReader.read")//
            .timer();
    assertThat(timer, is(notNullValue()));
    assertThat(timer.totalTime(TimeUnit.SECONDS), greaterThan(0d));
}
/**
 * Logs the outcome of the JpaPagingBatch job: a fixed "finished" message for
 * {@code COMPLETED}, otherwise the name of the final status.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public void afterJob(JobExecution jobExecution) {
    final BatchStatus outcome = jobExecution.getStatus();
    final String message = outcome == BatchStatus.COMPLETED
            ? "JpaPagingBatch job finished"
            : "JpaPagingBatch job " + outcome.name();
    log.info(message);
}
/**
 * For a {@code COMPLETED} job, logs the job id, the job name and the time elapsed
 * since the execution's start time, followed by a separator line.
 *
 * @param jobExecution the execution that just ended
 */
@Override
public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() != BatchStatus.COMPLETED) {
        return;
    }
    // Message reads: batch job [id-name] finished processing, total duration in ms.
    LogUtil.getLogger().info(
            "批量任务Job-->[{}-{}]-->处理完成,TotalDuration[{}]ms",
            jobExecution.getJobId(),
            jobExecution.getJobInstance().getJobName(),
            SystemClockUtil.INSTANCE.now() - jobExecution.getStartTime().getTime());
    LogUtil.getLogger().info("=======================================================================");
}
/**
 * For a {@code COMPLETED} step, logs the job execution id, the step name, the
 * read/write/commit counts and the time elapsed since the step's start time,
 * followed by a separator line. The step's exit status is returned unchanged.
 *
 * @param stepExecution the step execution that just ended
 * @return the exit status already held by {@code stepExecution}
 */
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
    if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
        return stepExecution.getExitStatus();
    }
    // Message reads: batch step [executionId-stepName] finished processing, with counters and duration in ms.
    LogUtil.getLogger().info(
            "批量任务Step-->[{}-{}]-->处理完成,ReadCount=[{}],WriteCount=[{}],CommitCount==[{}],Duration[{}]ms",
            stepExecution.getJobExecutionId(),
            stepExecution.getStepName(),
            stepExecution.getReadCount(),
            stepExecution.getWriteCount(),
            stepExecution.getCommitCount(),
            SystemClockUtil.INSTANCE.now() - stepExecution.getStartTime().getTime());
    LogUtil.getLogger().info("-----------------------------------------------------------------------");
    return stepExecution.getExitStatus();
}
/**
 * Determine whether the provided {@link JobExecution} is restartable, i.e. whether
 * its status is greater than {@link BatchStatus#STOPPING} and less than
 * {@link BatchStatus#ABANDONED}.
 *
 * @param jobExecution Must not be null and its {@link BatchStatus} must not be null
 * either.
 * @return {@code true} if the execution's status lies strictly between STOPPING and
 * ABANDONED, {@code false} otherwise
 */
public static boolean isJobExecutionRestartable(JobExecution jobExecution) {
    Assert.notNull(jobExecution, "The provided jobExecution must not be null.");
    final BatchStatus currentStatus = jobExecution.getStatus();
    Assert.notNull(currentStatus, "The BatchStatus of the provided jobExecution must not be null.");
    if (!currentStatus.isGreaterThan(BatchStatus.STOPPING)) {
        return false;
    }
    return currentStatus.isLessThan(BatchStatus.ABANDONED);
}
/**
 * Determine whether the provided {@link JobExecution} is abandonable, i.e. whether
 * its status is greater than {@link BatchStatus#STARTED} and is not already
 * {@link BatchStatus#ABANDONED}.
 *
 * @param jobExecution Must not be null and its {@link BatchStatus} must not be null
 * either.
 * @return {@code true} if the execution's status is greater than STARTED and not
 * ABANDONED, {@code false} otherwise
 */
public static boolean isJobExecutionAbandonable(JobExecution jobExecution) {
    Assert.notNull(jobExecution, "The provided jobExecution must not be null.");
    final BatchStatus currentStatus = jobExecution.getStatus();
    Assert.notNull(currentStatus, "The BatchStatus of the provided jobExecution must not be null.");
    if (currentStatus == BatchStatus.ABANDONED) {
        return false;
    }
    return currentStatus.isGreaterThan(BatchStatus.STARTED);
}
/**
 * Gives a job execution one FAILED, one COMPLETED and one STOPPED step, runs the
 * listener's {@code afterJob} and expects the job execution status to be FAILED.
 */
@Test
public void testAfterJobJobExecutionHasBatchStatusFailedWhenWeHaveStepsWithStatusNotCompleted() throws Exception {
    final JobExecution execution = new JobExecution(1L);
    // Steps are created in the order FAILED, COMPLETED, STOPPED.
    execution.addStepExecutions(asList(
            createStepExecution(execution, FAILED),
            createStepExecution(execution, COMPLETED),
            createStepExecution(execution, STOPPED)));

    changeStatusOnFailedStepsJobExecListener.afterJob(execution);

    assertThat(execution.getStatus()).isEqualTo(BatchStatus.FAILED);
}
/**
 * One-time fixture setup guarded by the {@code initialized} flag: registers the
 * timestamp task app, initializes the test and creates a STARTED job execution.
 *
 * @throws Exception if any of the setup steps fails
 */
@Before
public void setup() throws Exception {
    if (initialized) {
        return;
    }
    registerApp(ApplicationType.task, "timestamp", "1.2.0.RELEASE");
    initialize();
    createJobExecution(JOB_NAME, BatchStatus.STARTED);
    initialized = true;
}
/**
 * Launches {@code validJob} with empty job parameters and expects it to end with
 * status {@code COMPLETED}. A launch failure fails the test.
 */
@Test
public void jobTest() {
    JobParametersBuilder builder = new JobParametersBuilder();
    try {
        JobExecution execution = jobLauncher.run(validJob, builder.toJobParameters());
        assertEquals(BatchStatus.COMPLETED, execution.getStatus());
    } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
            | JobParametersInvalidException e) {
        // Previously the exception was only printed, so the test passed although the job
        // was never run. Rethrow as an assertion failure and keep the cause.
        throw new AssertionError("Launching the job failed", e);
    }
}
/**
 * Creates a task execution and a job execution with the given name, adds a step
 * named {@code name + "_STEP"}, links task and job execution and stores the job
 * execution with the given status and the current time as start time.
 *
 * @param name name used for the task execution and the job instance
 * @param status batch status to assign to the job execution
 */
private void createJobExecution(String name, BatchStatus status) {
    final TaskExecution parentTask = this.dao.createTaskExecution(name, new Date(), new ArrayList<>(), null);
    final JobExecution execution = this.jobRepository.createJobExecution(
            this.jobRepository.createJobInstance(name, new JobParameters()),
            new JobParameters(),
            null);

    // The id is reset to null before the step is handed to the repository.
    final StepExecution step = new StepExecution(name + "_STEP", execution, execution.getId());
    step.setId(null);
    jobRepository.add(step);

    this.taskBatchDao.saveRelationship(parentTask, execution);
    execution.setStatus(status);
    execution.setStartTime(new Date());
    this.jobRepository.update(execution);
}
/**
 * Deletes any previous output file, starts {@code flatFileJob} through the job
 * operator, polls every 100 ms until the last execution is no longer running and
 * then expects status {@code COMPLETED} and three lines in the output file.
 *
 * @throws Exception if starting the job, waiting or reading the file fails
 */
@Test
public void runJavaConfigJob() throws Exception {
    final File outputFile = new File("target/out-javaconfig.txt");
    outputFile.delete();
    jobOperator.start("flatFileJob", "");

    // Poll until the most recent execution has left the running state.
    for (;;) {
        final boolean stillRunning = jobRepository.getLastJobExecution("flatFileJob", new JobParameters())
                .getStatus().isRunning();
        if (!stillRunning) {
            break;
        }
        Thread.sleep(100);
    }

    final BatchStatus finalStatus = jobRepository.getLastJobExecution("flatFileJob", new JobParameters())
            .getStatus();
    assertEquals(BatchStatus.COMPLETED, finalStatus);

    final int lineCount = FileUtils.readLines(outputFile, StandardCharsets.UTF_8).size();
    assertEquals(3, lineCount);
}
/**
 * Creates a task execution and a job execution with the given name, where the job
 * execution carries the parameter {@code -spring.cloud.data.flow.platformname}
 * set to {@code default}. Task and job execution are linked and the job execution
 * is stored with the given status and the current time as start time.
 *
 * @param name name used for the task execution and the job instance
 * @param status batch status to assign to the job execution
 */
private void createJobExecution(String name, BatchStatus status) {
    final TaskExecution parentTask = this.dao.createTaskExecution(name, new Date(), new ArrayList<>(), null);

    // Parameters of the execution; the job instance itself is created with empty parameters.
    final Map<String, JobParameter> parameterMap = new HashMap<>();
    parameterMap.put("-spring.cloud.data.flow.platformname", new JobParameter("default"));
    final JobParameters launchParameters = new JobParameters(parameterMap);

    final JobExecution execution = this.jobRepository.createJobExecution(
            this.jobRepository.createJobInstance(name, new JobParameters()),
            launchParameters,
            null);
    this.taskBatchDao.saveRelationship(parentTask, execution);
    execution.setStatus(status);
    execution.setStartTime(new Date());
    this.jobRepository.update(execution);
}