下面列出了 org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource#org.springframework.batch.core.scope.context.ChunkContext 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Drives the launcher tasklet twice, each time with a newly built tasklet and chunk
 * context, and asserts that the step's execution context holds 1 and then 2 under
 * the {@code "task-execution-id"} key. {@code mockReturnValForTaskExecution} is
 * called with the matching value before each run.
 *
 * @throws Exception if either tasklet run fails
 */
@Test
@DirtiesContext
public void testTaskLauncherTasklet() throws Exception {
    // First run: expect execution id 1.
    createCompleteTaskExecution(0);
    TaskLauncherTasklet firstTasklet = getTaskExecutionTasklet();
    ChunkContext firstContext = chunkContext();
    mockReturnValForTaskExecution(1L);
    execute(firstTasklet, null, firstContext);
    Object firstExecutionId = firstContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .get("task-execution-id");
    assertEquals(1L, firstExecutionId);

    // Second run: fresh context and tasklet, expect execution id 2.
    mockReturnValForTaskExecution(2L);
    ChunkContext secondContext = chunkContext();
    createCompleteTaskExecution(0);
    TaskLauncherTasklet secondTasklet = getTaskExecutionTasklet();
    execute(secondTasklet, null, secondContext);
    Object secondExecutionId = secondContext.getStepContext()
            .getStepExecution()
            .getExecutionContext()
            .get("task-execution-id");
    assertEquals(2L, secondExecutionId);
}
/**
 * Issues a GET request to every URL returned by {@code generatedFlinkManagerServerApi()}
 * and hands each response with status {@code OK} to {@code checkJobExecuteStatus}
 * together with the status map from {@code createjobExecuteStatus()}. Any exception
 * raised while handling a single URL is logged and that URL is skipped.
 *
 * <p>After all URLs were visited, every key of the status map whose value is
 * {@code false} is collected; if there is at least one, the step fails.
 *
 * @param contribution not used by this method
 * @param chunkContext not used by this method
 * @return {@link RepeatStatus#FINISHED} when there are no URLs or no {@code false} entries
 * @throws Exception with message {@code "job fail : [..]"} listing the {@code false} keys
 */
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    final List<String> serverUrls = generatedFlinkManagerServerApi();
    if (serverUrls.isEmpty()) {
        return RepeatStatus.FINISHED;
    }

    final Map<String, Boolean> statusByJob = createjobExecuteStatus();
    for (String serverUrl : serverUrls) {
        try {
            ResponseEntity<Map> response =
                    this.restTemplate.exchange(serverUrl, HttpMethod.GET, null, Map.class);
            if (response.getStatusCode() == HttpStatus.OK) {
                checkJobExecuteStatus(response, statusByJob);
            }
        } catch (Exception e) {
            // Best effort: a failing server must not prevent the remaining ones from being queried.
            logger.error("fail call api to flink server.", e);
        }
    }

    final List<String> failedJobs = new ArrayList<>(3);
    for (Map.Entry<String, Boolean> jobStatus : statusByJob.entrySet()) {
        if (!jobStatus.getValue()) {
            failedJobs.add(jobStatus.getKey());
        }
    }

    if (failedJobs.isEmpty()) {
        return RepeatStatus.FINISHED;
    }
    throw new Exception(String.format("job fail : %s", failedJobs));
}
/**
 * Stubs {@code taskOperations.launch(..)} to throw a {@code DataFlowClientException}
 * and asserts that calling the tasklet's {@code execute} directly surfaces that
 * exception with the configured message.
 */
@Test
@DirtiesContext
public void testInvalidTaskName() {
    final String expectedMessage = "Could not find task definition named " + TASK_NAME;
    DataFlowClientException launchFailure =
            new DataFlowClientException(new VndErrors("message", expectedMessage, new Link("ref")));
    Mockito.doThrow(launchFailure)
            .when(this.taskOperations)
            .launch(ArgumentMatchers.anyString(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any());

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();

    Throwable thrown = assertThrows(DataFlowClientException.class,
            () -> tasklet.execute(null, context));
    String actualMessage = thrown.getMessage();
    Assertions.assertThat(actualMessage).isEqualTo(expectedMessage);
}
/**
 * Delegates to the parent {@code execute} and then inspects the errors obtained via
 * {@code getErrors} from the current {@code BatchContextHolder} context. When errors
 * are present, each one is logged (source name, position, message) and a
 * {@code ValidationException} is thrown; its message says, in Japanese, that the
 * file being imported contains invalid lines.
 *
 * @param contribution passed through to the parent implementation
 * @param chunkContext passed through to the parent implementation
 * @return the status returned by the parent implementation when no errors exist
 * @throws IOException declared by this method's signature
 */
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws IOException {
    val status = super.execute(contribution, chunkContext);
    val errors = getErrors(BatchContextHolder.getContext());

    // Nothing to report: hand back the parent's result unchanged.
    if (!isNotEmpty(errors)) {
        return status;
    }

    errors.forEach(e -> log.error("エラーがあります。ファイル名={}, 行数={}, エラーメッセージ={}",
            e.getSourceName(), e.getPosition(), e.getErrorMessage()));
    throw new ValidationException("取り込むファイルに不正な行が含まれています。");
}
/**
 * Executes the launcher tasklet two times in a row, using a new tasklet instance and
 * a new chunk context for each pass, and verifies the value stored under
 * {@code "task-execution-id"} in the step execution context (1 after the first pass,
 * 2 after the second).
 *
 * @throws Exception if a tasklet run fails
 */
@Test
@DirtiesContext
public void testTaskLauncherTasklet() throws Exception {
    createCompleteTaskExecution(0);
    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    mockReturnValForTaskExecution(1L);
    execute(tasklet, null, context);
    Object storedId = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-execution-id");
    assertEquals(1L, storedId);

    // Repeat with the mocked id switched to 2 and everything else rebuilt.
    mockReturnValForTaskExecution(2L);
    context = chunkContext();
    createCompleteTaskExecution(0);
    tasklet = getTaskExecutionTasklet();
    execute(tasklet, null, context);
    storedId = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-execution-id");
    assertEquals(2L, storedId);
}
/**
 * After the shared preparation run, executes a tasklet built from
 * {@code TaskProperties} whose execution id is 88 and asserts that the step execution
 * context contains {@code "task-execution-id"} = 2 and that the first entry of
 * {@code "task-arguments"} is {@code --spring.cloud.task.parent-execution-id=88}.
 *
 * @throws Exception if a tasklet run fails
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletWithTaskExecutionId() throws Exception {
    // The returned tasklet is not needed; the call is kept for its effects and assertions.
    prepTaskLauncherTests();

    TaskProperties taskProperties = new TaskProperties();
    taskProperties.setExecutionid(88L);
    mockReturnValForTaskExecution(2L);
    ChunkContext context = chunkContext();
    createCompleteTaskExecution(0);

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet(taskProperties);
    tasklet.setArguments(null);
    execute(tasklet, null, context);

    Object executionId = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-execution-id");
    assertEquals(2L, executionId);

    List<?> taskArguments = (List<?>) context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-arguments");
    assertEquals("--spring.cloud.task.parent-execution-id=88", taskArguments.get(0));
}
/**
 * Seeds the step execution context with a {@code "task-arguments"} list that already
 * contains {@code --spring.cloud.task.parent-execution-id=84}, runs a tasklet built
 * from {@code TaskProperties} with execution id 88, and asserts that the first
 * argument afterwards is {@code --spring.cloud.task.parent-execution-id=88} and that
 * {@code "task-execution-id"} is 2.
 *
 * @throws Exception if a tasklet run fails
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletWithTaskExecutionIdWithPreviousParentID() throws Exception {
    // The returned tasklet is not needed; the call is kept for its effects and assertions.
    prepTaskLauncherTests();

    TaskProperties taskProperties = new TaskProperties();
    taskProperties.setExecutionid(88L);
    mockReturnValForTaskExecution(2L);
    ChunkContext context = chunkContext();
    createCompleteTaskExecution(0);

    // Pre-populate the arguments with a parent id from an earlier run.
    context.getStepContext().getStepExecution().getExecutionContext()
            .put("task-arguments", new ArrayList<String>());
    @SuppressWarnings("unchecked")
    List<Object> seededArguments = (List<Object>) context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-arguments");
    seededArguments.add("--spring.cloud.task.parent-execution-id=84");

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet(taskProperties);
    tasklet.setArguments(null);
    execute(tasklet, null, context);

    Object executionId = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-execution-id");
    assertEquals(2L, executionId);

    List<?> taskArguments = (List<?>) context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-arguments");
    assertEquals("--spring.cloud.task.parent-execution-id=88", taskArguments.get(0));
}
/**
 * Verifies that a {@code DataFlowClientException} thrown by the stubbed
 * {@code taskOperations.launch(..)} call propagates out of the tasklet's
 * {@code execute} with its message intact.
 */
@Test
@DirtiesContext
public void testInvalidTaskName() {
    final String expectedMessage = "Could not find task definition named " + TASK_NAME;
    VndErrors vndErrors = new VndErrors("message", expectedMessage, new Link("ref"));
    DataFlowClientException launchFailure = new DataFlowClientException(vndErrors);

    Mockito.doThrow(launchFailure)
            .when(this.taskOperations)
            .launch(ArgumentMatchers.anyString(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any());

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    Throwable thrown = assertThrows(DataFlowClientException.class,
            () -> tasklet.execute(null, context));

    Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
}
/**
 * Builds the {@code step1} step around a tasklet that prints a fixed message to
 * standard output and finishes immediately.
 *
 * @return the built step
 */
@Bean
public Step step1() {
    Tasklet announcer = new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            System.out.println(">> This was run in a Spring Batch app!");
            return RepeatStatus.FINISHED;
        }
    };
    return this.stepBuilderFactory.get("step1")
            .tasklet(announcer)
            .build();
}
/**
 * Builds the {@code stepPackPub1} step around a tasklet that performs no work.
 *
 * @return the built step
 */
@Bean
public Step ourBatchStep() {
    return stepBuilderFactory.get("stepPackPub1")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
                    // Spring Batch documents a null return as equivalent to FINISHED;
                    // return the status explicitly so the intent is visible.
                    return RepeatStatus.FINISHED;
                }
            })
            .build();
}
/**
 * Builds a {@code PropertySourcesPropertyResolver} from the property sources derived
 * from the job configuration name, the job parameters (as properties) and the
 * environment, and passes it to {@code propertyResolverConsumer}.
 *
 * @param contribution not used by this method
 * @param chunkContext not used by this method
 * @return {@code FINISHED}
 */
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
    final String jobConfigurationName = stepExecution.getJobExecution().getJobConfigurationName();
    final PropertySourcesPropertyResolver resolver = new PropertySourcesPropertyResolver(
            propertySources(jobConfigurationName,
                    stepExecution.getJobParameters().toProperties(),
                    environment));
    propertyResolverConsumer.accept(resolver);
    return FINISHED;
}
/**
 * Walks the tree rooted at {@code directory} and deletes every regular file found.
 * Directories themselves are left in place.
 *
 * <p>A file that cannot be deleted is reported with a warning and the walk
 * continues. An {@link IOException} while walking the tree aborts the step.
 *
 * @param stepContribution not used by this method
 * @param chunkContext not used by this method
 * @return {@link RepeatStatus#FINISHED} once the walk has completed
 * @throws UnexpectedJobExecutionException if the directory cannot be resolved or
 *         walked; the underlying {@link IOException} is attached as the cause
 */
@Override
public RepeatStatus execute(StepContribution stepContribution,
        ChunkContext chunkContext) {
    try (Stream<Path> walk =
            Files.walk(Paths.get(directory.getFile().getPath()))) {
        walk.filter(Files::isRegularFile)
                .map(Path::toFile)
                .forEach(file -> {
                    // File.delete() signals failure only through its return value.
                    if (!file.delete()) {
                        LOGGER.warn("could not delete file " + file);
                    }
                });
    } catch (IOException e) {
        LOGGER.error("error deleting files", e);
        // Keep the root cause so the failure can be diagnosed from the job's exit description.
        throw new UnexpectedJobExecutionException(
                "unable to delete files", e);
    }
    return RepeatStatus.FINISHED;
}
/**
 * Runs the launcher tasklet first without task properties, asserting that
 * {@code "task-execution-id"} is 1 and that no {@code "task-arguments"} entry exists,
 * and then with {@code TaskProperties} whose execution id is 88, asserting that
 * {@code "task-execution-id"} is 2 and that the first task argument is
 * {@code --spring.cloud.task.parent-execution-id=88}.
 *
 * @throws Exception if a tasklet run fails
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletWithTaskExecutionId() throws Exception {
    createCompleteTaskExecution(0);
    TaskLauncherTasklet taskLauncherTasklet = getTaskExecutionTasklet();
    ChunkContext chunkContext = chunkContext();
    mockReturnValForTaskExecution(1L);
    execute(taskLauncherTasklet, null, chunkContext);
    assertEquals(1L, chunkContext.getStepContext()
            .getStepExecution().getExecutionContext()
            .get("task-execution-id"));
    assertNull(chunkContext.getStepContext()
            .getStepExecution().getExecutionContext()
            .get("task-arguments"));

    TaskProperties taskProperties = new TaskProperties();
    // Upper-case suffix: "88l" is easily misread as 881.
    taskProperties.setExecutionid(88L);
    mockReturnValForTaskExecution(2L);
    chunkContext = chunkContext();
    createCompleteTaskExecution(0);
    taskLauncherTasklet = getTaskExecutionTasklet(taskProperties);
    taskLauncherTasklet.setArguments(null);
    execute(taskLauncherTasklet, null, chunkContext);
    assertEquals(2L, chunkContext.getStepContext()
            .getStepExecution().getExecutionContext()
            .get("task-execution-id"));

    // Wildcard cast instead of a raw List; only element 0 is read.
    List<?> taskArguments = (List<?>) chunkContext.getStepContext()
            .getStepExecution().getExecutionContext()
            .get("task-arguments");
    assertEquals("--spring.cloud.task.parent-execution-id=88", taskArguments.get(0));
}
/**
 * Configures a max wait time of 500 and a check interval of 1000 on
 * {@code composedTaskProperties} and asserts that running the tasklet raises a
 * {@code TaskExecutionTimeoutException} naming execution id 1.
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletTimeout() {
    mockReturnValForTaskExecution(1L);
    this.composedTaskProperties.setMaxWaitTime(500);
    this.composedTaskProperties.setIntervalTimeBetweenChecks(1000);

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();

    Throwable thrown = assertThrows(TaskExecutionTimeoutException.class,
            () -> execute(tasklet, null, context));
    final String expectedMessage = "Timeout occurred while processing task with Execution Id 1";
    Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
}
/**
 * Stubs {@code taskOperations.launch(..)} to throw a {@code ResourceAccessException}
 * and asserts that running the tasklet surfaces that exception with the same message.
 */
@Test
@DirtiesContext
public void testNoDataFlowServer() {
    final String expectedMessage =
            "I/O error on GET request for \"http://localhost:9393\": Connection refused; nested exception is java.net.ConnectException: Connection refused";
    ResourceAccessException connectionFailure = new ResourceAccessException(expectedMessage);
    Mockito.doThrow(connectionFailure)
            .when(this.taskOperations)
            .launch(ArgumentMatchers.anyString(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any());

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();

    Throwable thrown = assertThrows(ResourceAccessException.class,
            () -> execute(tasklet, null, context));
    Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
}
/**
 * Sets up a completed task execution via {@code createCompleteTaskExecution(1)} and
 * asserts that running the tasklet raises an {@code UnexpectedJobExecutionException}
 * with the message "Task returned a non zero exit code.".
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletFailure() {
    mockReturnValForTaskExecution(1L);
    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    createCompleteTaskExecution(1);

    Throwable thrown = assertThrows(UnexpectedJobExecutionException.class,
            () -> execute(tasklet, null, context));
    String actualMessage = thrown.getMessage();
    Assertions.assertThat(actualMessage).isEqualTo("Task returned a non zero exit code.");
}
/**
 * Invokes the tasklet twice with the same arguments. The first call must report a
 * continuable status; the result of the second call is returned.
 *
 * @param taskLauncherTasklet the tasklet under test
 * @param contribution forwarded to both calls
 * @param chunkContext forwarded to both calls
 * @return the status of the second call
 * @throws IllegalStateException if the first call's status is not continuable
 * @throws Exception if either call throws
 */
private RepeatStatus execute(TaskLauncherTasklet taskLauncherTasklet, StepContribution contribution,
        ChunkContext chunkContext) throws Exception {
    RepeatStatus firstPass = taskLauncherTasklet.execute(contribution, chunkContext);
    if (firstPass.isContinuable()) {
        return taskLauncherTasklet.execute(contribution, chunkContext);
    }
    throw new IllegalStateException("Expected continuable status for the first execution.");
}
/**
 * Sets up the task execution via {@code getCompleteTaskExecutionWithNull()} and
 * asserts that running the tasklet raises an {@code UnexpectedJobExecutionException}
 * with the message "Task returned a null exit code.".
 *
 * @throws Exception declared for the test setup helpers
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletNullResult() throws Exception {
    // The former unused "isException" flag was removed; assertThrows does the check.
    mockReturnValForTaskExecution(1L);
    TaskLauncherTasklet taskLauncherTasklet = getTaskExecutionTasklet();
    ChunkContext chunkContext = chunkContext();
    getCompleteTaskExecutionWithNull();
    Throwable exception = assertThrows(UnexpectedJobExecutionException.class,
            () -> execute(taskLauncherTasklet, null, chunkContext));
    Assertions.assertThat(exception.getMessage()).isEqualTo("Task returned a null exit code.");
}
/**
 * Creates a fresh {@code ChunkContext} wrapping a step execution named
 * {@code "myTestStep"} that belongs to a job execution with id 123.
 *
 * @return a new chunk context; every call builds new underlying objects
 */
private ChunkContext chunkContext() {
    StepExecution stepExecution = new StepExecution("myTestStep", new JobExecution(123L));
    return new ChunkContext(new StepContext(stepExecution));
}
/**
 * Builds a step whose tasklet finishes immediately, with the transaction attribute
 * from {@code getTransactionAttribute()} and the given listener registered.
 *
 * @param taskName name passed to the step builder
 * @param stepExecutionListener listener to register on the step
 * @return the built step
 */
private Step createTaskletStepWithListener(final String taskName,
        StepExecutionListener stepExecutionListener) {
    Tasklet finishImmediately = new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            return RepeatStatus.FINISHED;
        }
    };
    return this.steps.get(taskName)
            .tasklet(finishImmediately)
            .transactionAttribute(getTransactionAttribute())
            .listener(stepExecutionListener)
            .build();
}
/**
 * Builds a step whose tasklet finishes immediately, using the transaction attribute
 * from {@code getTransactionAttribute()}.
 *
 * @param taskName name passed to the step builder
 * @return the built step
 */
private Step createTaskletStep(final String taskName) {
    Tasklet finishImmediately = new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            return RepeatStatus.FINISHED;
        }
    };
    return this.steps.get(taskName)
            .tasklet(finishImmediately)
            .transactionAttribute(getTransactionAttribute())
            .build();
}
/**
 * Stores the contribution and chunk context on this instance, then calls
 * {@code doExec} with the job parameters read from the step context.
 *
 * @param stepContribution saved to {@code this.stepContribution}
 * @param chunkContext saved to {@code this.chunkContext}; source of the job parameters
 * @return {@link RepeatStatus#FINISHED} after {@code doExec} returns
 * @throws Exception if {@code doExec} throws
 */
@Override
public final RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
    this.stepContribution = stepContribution;
    this.chunkContext = chunkContext;
    // Fetch the job parameters (per the original note: the ones passed in from the UI screen).
    this.doExec(chunkContext.getStepContext().getJobParameters());
    return RepeatStatus.FINISHED;
}
/**
 * Creates a tasklet that logs its arguments, sleeps for five seconds and finishes.
 *
 * @return a new tasklet instance
 */
private Tasklet nextBizDateTasklet() {
    return new Tasklet() {
        /**
         * Logs reflective dumps of the contribution, the chunk context and its step
         * context, then sleeps for five seconds.
         *
         * @return {@link RepeatStatus#FINISHED}
         * @throws RuntimeException wrapping the {@link InterruptedException} if the
         *         sleep is interrupted; the thread's interrupt flag is restored first
         */
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            // The log line dumps the fields of each argument (read/write/skip counts,
            // exit status, step and job execution contexts, job parameters, ...).
            LogUtil.getLogger().info("入参contribution={},chunkContext={},chunkContext.getStepContext={}", ReflectionToStringBuilder.toString(contribution), ReflectionToStringBuilder.toString(chunkContext), ReflectionToStringBuilder.toString(chunkContext.getStepContext()));
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return RepeatStatus.FINISHED;
        }
    };
}
/**
 * Appends one line per person returned by {@code personRepository.findAll()} to
 * {@code D:\data\seedboot-batch-result.txt}, formatted as {@code age|realName}
 * and encoded as UTF-8. Each line is written with its own append call.
 *
 * @param contribution not used by this method
 * @param chunkContext not used by this method
 * @return {@link RepeatStatus#FINISHED} after all persons were written
 * @throws Exception if reading the persons or writing the file fails
 */
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    final File resultFile = new File("D:\\data\\seedboot-batch-result.txt");
    for (Person person : personRepository.findAll()) {
        String line = person.getAge() + "|" + person.getRealName() + "\n";
        FileUtils.writeStringToFile(resultFile, line, StandardCharsets.UTF_8, true);
    }
    return RepeatStatus.FINISHED;
}
/**
 * Builds the {@code step1} step around a tasklet that performs no work.
 *
 * @return the built step
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(new Tasklet() {
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
                    // Spring Batch documents a null return as equivalent to FINISHED;
                    // return the status explicitly so the intent is visible.
                    return RepeatStatus.FINISHED;
                }
            })
            .build();
}
/**
 * Shared preparation step: runs a default launcher tasklet once and asserts that the
 * step execution context holds {@code "task-execution-id"} = 1 and has no
 * {@code "task-arguments"} entry.
 *
 * @return the tasklet that was executed
 * @throws Exception if the tasklet run fails
 */
private TaskLauncherTasklet prepTaskLauncherTests() throws Exception {
    createCompleteTaskExecution(0);
    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    mockReturnValForTaskExecution(1L);
    execute(tasklet, null, context);

    Object executionId = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-execution-id");
    assertEquals(1L, executionId);

    Object taskArguments = context.getStepContext().getStepExecution()
            .getExecutionContext().get("task-arguments");
    assertNull(taskArguments);

    return tasklet;
}
/**
 * With max wait time 500 and check interval 1000 set on
 * {@code composedTaskProperties}, expects the tasklet run to fail with a
 * {@code TaskExecutionTimeoutException} whose message names execution id 1.
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletTimeout() {
    mockReturnValForTaskExecution(1L);
    this.composedTaskProperties.setMaxWaitTime(500);
    this.composedTaskProperties.setIntervalTimeBetweenChecks(1000);

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    Throwable thrown = assertThrows(TaskExecutionTimeoutException.class,
            () -> execute(tasklet, null, context));

    String actualMessage = thrown.getMessage();
    Assertions.assertThat(actualMessage)
            .isEqualTo("Timeout occurred while processing task with Execution Id 1");
}
/**
 * Verifies that a {@code ResourceAccessException} thrown by the stubbed
 * {@code taskOperations.launch(..)} call propagates out of the tasklet run with its
 * message intact.
 */
@Test
@DirtiesContext
public void testNoDataFlowServer() {
    final String expectedMessage =
            "I/O error on GET request for \"http://localhost:9393\": Connection refused; nested exception is java.net.ConnectException: Connection refused";
    Mockito.doThrow(new ResourceAccessException(expectedMessage))
            .when(this.taskOperations)
            .launch(ArgumentMatchers.anyString(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any());

    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    Throwable thrown = assertThrows(ResourceAccessException.class,
            () -> execute(tasklet, null, context));

    String actualMessage = thrown.getMessage();
    Assertions.assertThat(actualMessage).isEqualTo(expectedMessage);
}
/**
 * After {@code createCompleteTaskExecution(1)}, expects the tasklet run to fail with
 * an {@code UnexpectedJobExecutionException} carrying the message
 * "Task returned a non zero exit code.".
 */
@Test
@DirtiesContext
public void testTaskLauncherTaskletFailure() {
    mockReturnValForTaskExecution(1L);
    TaskLauncherTasklet tasklet = getTaskExecutionTasklet();
    ChunkContext context = chunkContext();
    createCompleteTaskExecution(1);

    final String expectedMessage = "Task returned a non zero exit code.";
    Throwable thrown = assertThrows(UnexpectedJobExecutionException.class,
            () -> execute(tasklet, null, context));
    Assertions.assertThat(thrown.getMessage()).isEqualTo(expectedMessage);
}
/**
 * Calls the tasklet's {@code execute} two times in a row. The first result has to be
 * continuable, otherwise the helper fails; the second result is handed back.
 *
 * @param taskLauncherTasklet the tasklet under test
 * @param contribution forwarded to both calls
 * @param chunkContext forwarded to both calls
 * @return the status of the second call
 * @throws IllegalStateException if the first call's status is not continuable
 * @throws Exception if either call throws
 */
private RepeatStatus execute(TaskLauncherTasklet taskLauncherTasklet, StepContribution contribution,
        ChunkContext chunkContext) throws Exception {
    final RepeatStatus initialStatus = taskLauncherTasklet.execute(contribution, chunkContext);
    final boolean continuable = initialStatus.isContinuable();
    if (continuable) {
        return taskLauncherTasklet.execute(contribution, chunkContext);
    }
    throw new IllegalStateException("Expected continuable status for the first execution.");
}