The following code examples show how org.springframework.batch.item.ExecutionContext is used in real projects.
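Before the project snippets, here is a minimal, self-contained sketch of the core ExecutionContext API itself; the keys and values are purely illustrative:

import org.springframework.batch.item.ExecutionContext;

public class ExecutionContextDemo {
    public static void main(String[] args) {
        // An ExecutionContext is the key/value state that Spring Batch
        // persists between executions to support restartability.
        ExecutionContext context = new ExecutionContext();
        context.putString("fileName", "input.csv");
        context.putLong("read.count", 42L);

        // Typed getters mirror the typed putters.
        String fileName = context.getString("fileName");
        long readCount = context.getLong("read.count");

        // The dirty flag tells the framework whether the context has
        // changed since it was last persisted.
        System.out.println(fileName + " / " + readCount + " / dirty=" + context.isDirty());
    }
}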
@Bean
public Job defaultExceptionJob() {
    return jobBuilderFactory.get("defaultExceptionJob")
            .start(
                stepBuilderFactory.get("step")
                    .tasklet((stepContribution, chunkContext) -> {
                        // Get the execution context of the current step
                        ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
                        if (executionContext.containsKey("success")) {
                            System.out.println("Task executed successfully");
                            return RepeatStatus.FINISHED;
                        } else {
                            String errorMessage = "An exception occurred while processing the task";
                            System.out.println(errorMessage);
                            executionContext.put("success", true);
                            throw new RuntimeException(errorMessage);
                        }
                    }).build()
            ).build();
}
@Test
public void testRunFlatFileToDbSkipJob_SkipInWrite_ProcessorNonTransactional() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipProcessorNonTransactionalJob",
            "metrics/flatFileToDbSkipJob_SkipInWrite.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(4L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(8L).withReadErrorCount(0L)
            .withBeforeProcessCount(8L).withProcessCount(8L).withAfterProcessCount(8L).withProcessErrorCount(0L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(7L).withWriteErrorCount(4L)
            .withAfterChunkCount(4L).withChunkErrorCount(2L).withSkipInReadCount(0L).withSkipInProcessCount(0L)
            .withSkipInWriteCount(1L).build();
    // TODO Bug in beforeWrite listener in Spring Batch?
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipProcessorNonTransactionalJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
/**
 * Reads the next item, jumping to the next resource if necessary.
 */
@Override
public T read() throws Exception, UnexpectedInputException, ParseException {
    if (noInput) {
        return null;
    }
    // If there is no current resource, this is the first item: point at
    // resource 0 and open the first delegate.
    if (currentResource == -1) {
        currentResource = 0;
        delegate.setResource(resources[currentResource]);
        delegate.open(new ExecutionContext());
    }
    return readNextItem();
}
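The readNextItem() helper is not part of the snippet above; a hypothetical sketch of the "jump to the next resource" logic it typically implements (field names taken from the method above, the rest assumed) could look like this:

private T readNextItem() throws Exception {
    T item = delegate.read();
    while (item == null) {
        // Current resource is exhausted; move on to the next one, if any.
        currentResource++;
        if (currentResource >= resources.length) {
            return null;
        }
        delegate.close();
        delegate.setResource(resources[currentResource]);
        delegate.open(new ExecutionContext());
        item = delegate.read();
    }
    return item;
}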
private List<Table> buildTable(List<StepExecution> stepContexts) {
    return stepContexts.stream().map(step -> {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
                .format(step.getStartTime());
        long millis = ChronoUnit.MILLIS.between(step.getStartTime().toInstant(),
                step.getEndTime().toInstant());
        ExecutionContext context = step.getExecutionContext();
        ExecutionResultReport entity = (ExecutionResultReport) context.get("entity");
        if (entity == null) {
            return null;
        }
        String projectName = TrainPostReleaseReleaserTask.class
                .isAssignableFrom(entity.getReleaserTaskType()) ? "postRelease"
                        : entity.getProjectName();
        return new Table(date, time(millis), projectName, entity.getShortName(),
                entity.getDescription(), entity.getState(), entity.getExceptions());
    }).filter(Objects::nonNull).collect(Collectors.toCollection(LinkedList::new));
}
/**
 * Assign the filename of each of the injected resources to an
 * {@link ExecutionContext}.
 *
 * @see Partitioner#partition(int)
 */
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);
    int i = 0, k = 1;
    for (Resource resource : resources) {
        ExecutionContext context = new ExecutionContext();
        Assert.state(resource.exists(), "Resource does not exist: " + resource);
        context.putString(keyName, resource.getFilename());
        context.putString("opFileName", "output" + k++ + ".xml");
        map.put(PARTITION_KEY + i, context);
        i++;
    }
    return map;
}
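For context, a partitioner like this is usually wired into a partitioned master step. The following is a minimal sketch; the step and bean names are assumptions, not taken from the project above:

@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step workerStep) {
    return stepBuilderFactory.get("masterStep")
            .partitioner("workerStep", partitioner) // one ExecutionContext per worker step execution
            .step(workerStep)
            .gridSize(4)
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();
}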
@Test
public void testRunFlatFileToDbSkipJob_SkipInProcess_Failed() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob",
            "metrics/flatFileToDbSkipJob_SkipInProcess_Failed.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.FAILED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(3L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(12L).withReadCount(12L).withAfterReadCount(12L).withReadErrorCount(0L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(5L)
            .withBeforeWriteCount(7L).withWriteCount(writeCount).withAfterWriteCount(7L).withAfterChunkCount(3L)
            .withChunkErrorCount(6L).withSkipInReadCount(0L).withSkipInProcessCount(2L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
private boolean populateMap(ScheduledPartitionParams params, Map<String, ExecutionContext> result,
        long start, long end, int counter) {
    checkForEmptyPartitions = true; // hard-coded here, so the else branch below never runs
    if (checkForEmptyPartitions) {
        long recordCountThisPartition = getRecordCountThisPartition(Long.toString(start), Long.toString(end),
                params.getMinTimeStamp().toString(),
                params.getMaxTimeStamp().toString());
        if (recordCountThisPartition > 0L) {
            result.put("partition" + counter, getNewExecutionContext(params, start, end));
            logger.info("partition " + counter + " created");
            return true;
        } else {
            return false;
        }
    } else {
        result.put("partition" + counter, getNewExecutionContext(params, start, end));
        logger.info("partition " + counter + " created");
        return true;
    }
}
@Test
public void testGetSqlRows() throws Exception {
    NamedColumnJdbcItemReaderFactory factory = new NamedColumnJdbcItemReaderFactory();
    factory.setDataSource(dataSource);
    factory.setSql("SELECT name, id FROM test");
    factory.afterPropertiesSet();
    NamedColumnJdbcItemReader reader = factory.getObject();
    reader.afterPropertiesSet();
    reader.open(new ExecutionContext(new HashMap<String, Object>()));
    reader.setDelimiter(",");
    verifyRead(reader, "Bob,1");
    verifyRead(reader, "Jane,2");
    verifyRead(reader, "John,3");
    verifyRead(reader, null);
}
@Test
public void testTwoPartitions() {
    jdbc.execute("insert into bar (foo) values (1), (2), (3), (4)");
    partitioner.setColumn("foo");
    partitioner.setTable("bar");
    partitioner.setPartitions(2);
    partitioner.beforeStep(new StepExecution("step1", new JobExecution(5L)));
    Map<String, ExecutionContext> partitions = partitioner.partition(1);
    assertEquals(2, partitions.size());
    assertTrue(partitions.containsKey("partition0"));
    assertEquals("WHERE (foo BETWEEN 1 AND 2)", partitions.get("partition0").get("partClause"));
    assertEquals("-p0", partitions.get("partition0").get("partSuffix"));
    assertTrue(partitions.containsKey("partition1"));
    assertEquals("WHERE (foo BETWEEN 3 AND 4)", partitions.get("partition1").get("partClause"));
    assertEquals("-p1", partitions.get("partition1").get("partSuffix"));
}
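A step-scoped component can then pick up partClause via late binding; a minimal sketch, with an illustrative table and row mapping:

@Bean
@StepScope
public JdbcCursorItemReader<Long> partitionedReader(
        @Value("#{stepExecutionContext['partClause']}") String partClause, DataSource dataSource) {
    JdbcCursorItemReader<Long> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // Each worker step sees only the slice selected by its partClause.
    reader.setSql("SELECT foo FROM bar " + partClause);
    reader.setRowMapper((rs, rowNum) -> rs.getLong("foo"));
    return reader;
}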
/**
 * Assert that by using the {@link ExecutionContextJacksonMixIn} Jackson renders the
 * Step Execution Context correctly.
 *
 * @throws JsonProcessingException if a JSON generation error occurs.
 */
@Test
public void testSerializationOfSingleStepExecution() throws JsonProcessingException {
    final ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.addMixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
    objectMapper.addMixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
    final StepExecution stepExecution = getStepExecution();
    final String result = objectMapper.writeValueAsString(stepExecution);
    assertThat(result, not(containsString("\"executionContext\":{\"dirty\":true,\"empty\":false}")));
    assertThat(result, containsString("\"executionContext\":{\"dirty\":true,\"empty\":false,\"values\":[{"));
    assertThat(result, containsString("{\"counter\":1234}"));
    assertThat(result, containsString("{\"myDouble\":1.123456}"));
    assertThat(result, containsString("{\"Josh\":4444444444}"));
    assertThat(result, containsString("{\"awesomeString\":\"Yep\"}"));
    assertThat(result, containsString("{\"hello\":\"world\""));
    assertThat(result, containsString("{\"counter2\":9999}"));
}
@Test
public void testRunFlatFileToDbSkipJob_SkipInProcess() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob", "metrics/flatFileToDbSkipJob_SkipInProcess.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(3L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(8L).withReadErrorCount(0L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(1L)
            .withBeforeWriteCount(7L).withWriteCount(writeCount).withAfterWriteCount(7L).withAfterChunkCount(3L)
            .withChunkErrorCount(1L).withSkipInReadCount(0L).withSkipInProcessCount(1L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Test
public void testDeserializationOfBasicExecutionContext() throws IOException {
    final ObjectMapper objectMapper = DataFlowTemplate.prepareObjectMapper(new ObjectMapper());
    final InputStream inputStream = ExecutionContextDeserializationTests.class
            .getResourceAsStream("/BasicExecutionContextJson.txt");
    final String json = new String(StreamUtils.copyToByteArray(inputStream));
    ExecutionContext executionContext = objectMapper.readValue(json,
            new TypeReference<ExecutionContext>() {
            });
    assertEquals(2, executionContext.entrySet().size());
    assertEquals("org.springframework.cloud.task.app.timestamp.batch.TimestampBatchTaskConfiguration$1", executionContext.get("batch.taskletType"));
    assertEquals("org.springframework.batch.core.step.tasklet.TaskletStep", executionContext.get("batch.stepType"));
    assertFalse(executionContext.isDirty());
    assertFalse(executionContext.isEmpty());
}
/**
 * Edge case: if the JSON data contains values in the execution context but the
 * {@code empty} property is {@code true}, the deserialized object will still report
 * {@code false} for {@code isEmpty()}; the property is ignored during deserialization.
 *
 * @throws IOException if the test resource cannot be read
 */
@Test
public void testFaultyExecutionContext() throws IOException {
    final ObjectMapper objectMapper = DataFlowTemplate.prepareObjectMapper(new ObjectMapper());
    final InputStream inputStream = ExecutionContextDeserializationTests.class
            .getResourceAsStream("/FaultyExecutionContextJson.txt");
    final String json = new String(StreamUtils.copyToByteArray(inputStream));
    ExecutionContext executionContext = objectMapper.readValue(json,
            new TypeReference<ExecutionContext>() {
            });
    assertEquals(2, executionContext.entrySet().size());
    assertEquals("org.springframework.cloud.task.app.timestamp.batch.TimestampBatchTaskConfiguration$1", executionContext.get("batch.taskletType"));
    assertEquals("org.springframework.batch.core.step.tasklet.TaskletStep", executionContext.get("batch.stepType"));
    assertTrue(executionContext.isDirty());
    assertFalse(executionContext.isEmpty());
}
@Test
public void testDeserializationOfSingleJobExecution() throws IOException {
    final ObjectMapper objectMapper = DataFlowTemplate.prepareObjectMapper(new ObjectMapper());
    final InputStream inputStream = JobExecutionDeserializationTests.class
            .getResourceAsStream("/SingleJobExecutionJson.txt");
    final String json = new String(StreamUtils.copyToByteArray(inputStream));
    final JobExecutionResource jobExecutionInfoResource = objectMapper.readValue(json, JobExecutionResource.class);
    assertNotNull(jobExecutionInfoResource);
    assertEquals(Long.valueOf(1), jobExecutionInfoResource.getJobId());
    assertEquals("ff.job", jobExecutionInfoResource.getName());
    assertEquals("COMPLETED", jobExecutionInfoResource.getJobExecution().getStatus().name());
    assertEquals(1, jobExecutionInfoResource.getJobExecution().getStepExecutions().size());
    final StepExecution stepExecution = jobExecutionInfoResource.getJobExecution().getStepExecutions().iterator().next();
    assertNotNull(stepExecution);
    final ExecutionContext stepExecutionExecutionContext = stepExecution.getExecutionContext();
    assertNotNull(stepExecutionExecutionContext);
    assertEquals(2, stepExecutionExecutionContext.size());
}
@Before
public void setupMockMVC() {
    this.mockMvc = MockMvcBuilders.webAppContextSetup(wac)
            .defaultRequest(get("/").accept(MediaType.APPLICATION_JSON)).build();
    if (!initialized) {
        createStepExecution(JOB_NAME_ORIG, STEP_NAME_ORIG);
        createStepExecution(JOB_NAME_FOO, STEP_NAME_ORIG, STEP_NAME_FOO);
        createStepExecution(JOB_NAME_FOOBAR, STEP_NAME_ORIG, STEP_NAME_FOO, STEP_NAME_FOOBAR);
        initialized = true;
    }
    for (HttpMessageConverter<?> converter : adapter.getMessageConverters()) {
        if (converter instanceof MappingJackson2HttpMessageConverter) {
            final MappingJackson2HttpMessageConverter jacksonConverter = (MappingJackson2HttpMessageConverter) converter;
            jacksonConverter.getObjectMapper().addMixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
            jacksonConverter.getObjectMapper().addMixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
            jacksonConverter.getObjectMapper().setDateFormat(new ISO8601DateFormatWithMilliSeconds());
        }
    }
}
@Bean
public Partitioner partitioner() {
    return new Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
            for (int i = 0; i < GRID_SIZE; i++) {
                ExecutionContext context1 = new ExecutionContext();
                context1.put("partitionNumber", i);
                partitions.put("partition" + i, context1);
            }
            return partitions;
        }
    };
}
@Test
public void testRunFlatFileToDbSkipJob_SkipInWrite() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob", "metrics/flatFileToDbSkipJob_SkipInWrite.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(4L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(8L).withReadErrorCount(0L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(0L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(7L).withWriteErrorCount(4L)
            .withAfterChunkCount(4L).withChunkErrorCount(2L).withSkipInReadCount(0L).withSkipInProcessCount(0L)
            .withSkipInWriteCount(1L).build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Test
public void testRunFlatFileToDbSkipJob_SkipInRead() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipJob", "metrics/flatFileToDbSkipJob_SkipInRead.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 7L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(3L).withStreamOpenCount(1L).withStreamUpdateCount(4L).withStreamCloseCount(0L)
            .withBeforeReadCount(9L).withReadCount(9L).withAfterReadCount(7L).withReadErrorCount(1L)
            .withBeforeProcessCount(7L).withProcessCount(7L).withAfterProcessCount(7L).withProcessErrorCount(0L)
            .withBeforeWriteCount(7L).withWriteCount(writeCount).withAfterWriteCount(7L).withWriteErrorCount(0L)
            .withAfterChunkCount(3L).withChunkErrorCount(0L).withSkipInReadCount(1L).withSkipInProcessCount(0L)
            .withSkipInWriteCount(0L).build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Test
public void testRunFlatFileToDbSkipJob_SkipInProcess_ReaderTransactional() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbSkipReaderTransactionalJob",
            "metrics/flatFileToDbSkipJob_SkipInProcess.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 5L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(2L).withStreamOpenCount(1L).withStreamUpdateCount(3L).withStreamCloseCount(0L)
            .withBeforeReadCount(6L).withReadCount(6L).withAfterReadCount(5L).withReadErrorCount(0L)
            .withBeforeProcessCount(5L).withProcessCount(5L).withAfterProcessCount(5L).withProcessErrorCount(1L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(5L).withAfterChunkCount(2L)
            .withChunkErrorCount(1L).withSkipInReadCount(0L).withSkipInProcessCount(0L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbSkipReaderTransactionalJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    int partitionCount = gridSize * PARTITIONS_PER_NODE;
    List<Long> employeeIds = taxCalculationRepository.getUnprocessedEmployeeIds(year, month, stepExecution.getJobExecutionId());
    int size = employeeIds.size();
    int targetSize = size / partitionCount + 1;
    Map<String, ExecutionContext> result = new HashMap<>();
    for (int index = 0, partitionNumber = 0; index < size; index += targetSize, partitionNumber++) {
        ExecutionContext value = new ExecutionContext();
        value.putLong("minValue", employeeIds.get(index));
        value.putLong("maxValue", employeeIds.get(Math.min(index + targetSize - 1, size - 1)));
        value.putLong("partition", partitionNumber);
        result.put("partition" + partitionNumber, value);
    }
    return result;
}
@Test
public void readTest() {
    StepExecution stepExecution = new StepExecution("alarmStep", null);
    ExecutionContext executionContext = new ExecutionContext();
    stepExecution.setExecutionContext(executionContext);
    AlarmReader reader = new AlarmReader(dataCollectorFactory, applicationIndexDao, alarmService);
    reader.beforeStep(stepExecution);
    for (int i = 0; i < 7; i++) {
        assertNotNull(reader.read());
    }
    assertNull(reader.read());
}
@Test
public void readTest3() {
    StepExecution stepExecution = new StepExecution("alarmStep", null);
    ExecutionContext executionContext = new ExecutionContext();
    stepExecution.setExecutionContext(executionContext);
    AlarmServiceImpl alarmService = new AlarmServiceImpl(mock(AlarmDao.class)) {
        @Override
        public java.util.List<Rule> selectRuleByApplicationId(String applicationId) {
            return new LinkedList<>();
        }
    };
    AlarmReader reader = new AlarmReader(dataCollectorFactory, applicationIndexDao, alarmService);
    reader.beforeStep(stepExecution);
    assertNull(reader.read());
}
@Test
public void testRunFlatFileToDbNoSkipJob_Success() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbNoSkipJob", "metrics/flatFileToDbNoSkipJob_Success.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.COMPLETED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 5L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(2L).withStreamOpenCount(1L).withStreamUpdateCount(3L).withStreamCloseCount(0L)
            .withBeforeReadCount(6L).withReadCount(6L).withAfterReadCount(5L).withReadErrorCount(0L)
            .withBeforeProcessCount(5L).withProcessCount(5L).withAfterProcessCount(5L).withProcessErrorCount(0L)
            .withBeforeWriteCount(5L).withWriteCount(writeCount).withAfterWriteCount(5L).withAfterChunkCount(2L)
            .withChunkErrorCount(0L).withSkipInReadCount(0L).withSkipInProcessCount(0L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbNoSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue())); // TODO
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Test
public void testRunFlatFileToDbNoSkipJob_Failed() throws InterruptedException {
    JobExecution jobExecution = runJob("flatFileToDbNoSkipJob", "metrics/flatFileToDbNoSkipJob_Failed.csv");
    assertThat(jobExecution.getStatus(), is(BatchStatus.FAILED));
    ExecutionContext executionContext = jobExecution.getStepExecutions().iterator().next().getExecutionContext();
    long writeCount = 3L;
    MetricValidator validator = MetricValidatorBuilder.metricValidator().withExecutionContext(executionContext)
            .withBeforeChunkCount(1L).withStreamOpenCount(1L).withStreamUpdateCount(2L).withStreamCloseCount(0L)
            .withBeforeReadCount(6L).withReadCount(6L).withAfterReadCount(5L).withReadErrorCount(0L)
            .withBeforeProcessCount(3L).withProcessCount(3L).withAfterProcessCount(3L).withProcessErrorCount(1L)
            .withBeforeWriteCount(3L).withWriteCount(writeCount).withAfterWriteCount(3L).withAfterChunkCount(1L)
            .withChunkErrorCount(1L).withSkipInReadCount(0L).withSkipInProcessCount(0L).withSkipInWriteCount(0L)
            .build();
    validator.validate();
    // if one is correct, all will be in the metricReader, so I check just one
    Gauge gauge = meterRegistry.find(MetricsListener.METRIC_NAME)//
            .tag("context", "flatFileToDbNoSkipJob.step")//
            .tag("name", MetricNames.PROCESS_COUNT.getName())//
            .gauge();
    assertThat((Double) gauge.value(), is(notNullValue()));
    assertThat(jdbcTemplate.queryForObject("SELECT COUNT(*) FROM ITEM", Long.class), is(writeCount));
}
@Override
protected void doReadPage() {
    if (results == null) {
        results = new CopyOnWriteArrayList<T>();
    } else {
        results.clear();
    }
    JobParameters params = stepExecution.getJobParameters();
    ExecutionContext stepContext = stepExecution.getExecutionContext();
    results.addAll(this.doRead(params, stepContext));
}
@Override
protected final void doOpen() throws Exception {
    log.debug("Start processing records");
    super.doOpen();
    // JobParameters params = JobParametersThreadLocal.get();
    JobParameters params = stepExecution.getJobParameters();
    ExecutionContext stepContext = stepExecution.getExecutionContext();
    doOpen(params, stepContext);
}
/**
 * Store the current resource index and position in the resource.
 */
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    super.update(executionContext);
    if (saveState) {
        executionContext.putInt(getExecutionContextKey(RESOURCE_KEY), currentResource);
        delegate.update(executionContext);
    }
}
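The restart side of this contract is the matching open() method, which restores the saved index before reading resumes; a sketch consistent with the update() above (the actual implementation may differ):

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
    super.open(executionContext);
    if (executionContext.containsKey(getExecutionContextKey(RESOURCE_KEY))) {
        // Restart: resume from the saved resource index.
        currentResource = executionContext.getInt(getExecutionContextKey(RESOURCE_KEY));
        delegate.setResource(resources[currentResource]);
        delegate.open(executionContext);
    } else {
        // Fresh run: read() opens the first resource lazily.
        currentResource = -1;
    }
}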
/**
 * Adds a partition.
 *
 * @param partitions map of partition name to execution context
 * @param index index of the partition to add
 * @param gridSize requested grid size
 */
protected void addPartitionInfo(Map<String, ExecutionContext> partitions, int index, int gridSize) {
    val partitionInfo = createPartitionInfo(new PartitionInfo(index, gridSize));
    val context = new ExecutionContext();
    context.put(PartitionInfo.class.getCanonicalName(), partitionInfo);
    val partitionName = String.format("partition%d", index);
    partitions.put(partitionName, context);
}
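A partition(int) implementation built on this helper could be as simple as the following hypothetical sketch:

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    // One partition per grid slot, each carrying its own PartitionInfo.
    Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
    for (int index = 0; index < gridSize; index++) {
        addPartitionInfo(partitions, index, gridSize);
    }
    return partitions;
}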
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> result = null;
    // Determine whether to use PKs and timestamps, or just PKs, for the partitioning logic.
    switch (partitionType) {
        case "PK":
            throw new UnsupportedOperationException("PK only partitions currently not supported");
        case "PKTimeStamp":
            result = configureForPKTimeStampPartitions(gridSize);
            break;
        default:
            throw new RuntimeException("Partition type not specified");
    }
    return result;
}
private Map<String, ExecutionContext> getMap(int gridSize, ScheduledPartitionParams params) {
    Map<String, ExecutionContext> result = new HashMap<>();
    if ((params.getMaxId() - params.getMinId() + 1) <= (gridSize + 1)) {
        populateContextMapWithPartitionCountLimit(params, result);
    } else {
        populateContextMapWithAllPartitions(gridSize, params, result);
    }
    if (params.getMaxTimeStamp() != null) {
        informJobCompleteListenerOfLastDate(params.getMaxTimeStamp());
    }
    logger.info("partitioning complete");
    return result;
}