下面列出了org.springframework.boot.autoconfigure.batch.JobExecutionEvent#org.springframework.batch.core.JobParametersBuilder 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public JobResult launchJob(Job job) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("timestamp", Calendar.getInstance().getTime())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
return JobResult.builder()
.jobName(job.getName())
.jobId(jobExecution.getJobId())
.jobExitStatus(jobExecution.getExitStatus())
.timestamp(Calendar.getInstance().getTimeInMillis())
.build();
} catch (Exception e) {
log.error(e.getMessage());
throw new RuntimeException("launch job exception ", e);
}
}
/**
CREATE TABLE Z_TEST_APP (
appid INT,
zname VARCHAR2 (20),
flag VARCHAR2 (2),
CONSTRAINT app_pk PRIMARY KEY (appid)
);
CREATE TABLE Z_TEST_LOG (
logid INT,
msg VARCHAR2 (20),
logtime VARCHAR2 (8),
CONSTRAINT log_pk PRIMARY KEY (logid)
);
* @throws Exception
*/
@Test
public void testTwoJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvApp())
.toJobParameters();
jobLauncher.run(zappJob, jobParameters1);
JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvLog())
.toJobParameters();
jobLauncher.run(zlogJob, jobParameters2);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
/**
* SpringBatch的断点续跑
* -----------------------------------------------------------------------------------------------
* 执行Step过程中发生异常时,而该异常又没有被配置为skip,那么整个Job会中断
* 当人工修正完异常数据后,再次调用jobLauncher.run(),SpringBatch会从上次异常的地方开始跑
* 1、当Step为读处理写时,假设10条数据,Step配置chunk=3(表明每三条Write一次),若第5条出现异常
* 那么前三条可以成功Write,第4条即便处理成功也不会写入数据库,在修复后再跑的时,会从第4条开始读
* 2、当Step为Tasklet时,仅当Tasklet全部执行完,且未发生异常,才会真正的提交事务,写入数据到数据库
* 即只要其中某一条数据处理时发生异常,那么无论之前提交了多少次数据到数据库,都不会真正的写入数据库
* 3、当并行Step中的某个Step出现异常时,那么并行中的其它Step不受影响,会继续跑完,然后才会中断Job
* 修复数据后再跑时,会直接从并行中发生异常的该Step开始跑,其它未发生异常的并行中的Step不会重复跑
* 注意:断点续跑时,传入的jobParameters必须相同,否则会认为是另一个任务,会从头跑,不会从断点的地方跑
* 也就是说,这一切都建立在jobParameters传值相同的条件下
* 另外:对于JobOperator.start()和restart()两个方法都试过,都没实现断点续跑的功能
* -----------------------------------------------------------------------------------------------
*/
@RequestMapping("/batch")
//@SeedQSSReg(qssHost="${qss.host}", appHost="${qss.appHost}", appname="${qss.appname}", name="${qss.name}", cron="${qss.cron}")
CommResult<JobInstance> batch(String bizDate) throws Exception {
//判断是否断点续跑
boolean isResume = false;
if(StringUtils.isBlank(bizDate)){
bizDate = DateFormatUtils.format(new Date(), "yyyyMMdd");
}else{
isResume = true;
}
LogUtil.getLogger().info("结算跑批{}:Starting...bizDate={}", isResume?":断点续跑":"", bizDate);
//构造JobParameters
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("bizDate", bizDate);
//执行job
JobExecution execution = jobLauncher.run(settleJob, jobParametersBuilder.toJobParameters());
LogUtil.getLogger().info("结算跑批{}:Ending......", isResume?":断点续跑":"");
return CommResult.success(execution.getJobInstance());
}
/**
* @param jobNameStr 任务名字符串
* @param jobNameDesc 任务名描述
* @param time 随机数:批量的唯一标志(非断点续跑可直接传null)
* @param parameterMap 其它参数:不会作为批量的唯一标志(无参可传null)
* Comment by 玄玉<https://jadyer.cn/> on 2019/8/12 18:25.
*/
private JobExecution runJob(String jobNameStr, String jobNameDesc, String time, Map<String, String> parameterMap) throws Exception {
//判断是否断点续跑
boolean isResume = false;
long timeLong;
if(StringUtils.isBlank(time)){
timeLong = SystemClockUtil.INSTANCE.now();
}else{
isResume = true;
timeLong = Long.parseLong(time);
}
LogUtil.getLogger().info("{}==>{}:Starting...time={}", jobNameDesc, isResume?":断点续跑":"", timeLong);
//构造JobParameters
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addLong("time", timeLong);
if(null!=parameterMap && !parameterMap.isEmpty()){
for(Map.Entry<String,String> entry : parameterMap.entrySet()){
jobParametersBuilder.addString(entry.getKey(), entry.getValue(), false);
}
}
//执行job
Job xmlSettleJob = (Job)SpringContextHolder.getBean(jobNameStr);
JobExecution execution = jobLauncher.run(xmlSettleJob, jobParametersBuilder.toJobParameters());
LogUtil.getLogger().info("{}==>{}:Ending......jobInstance={}", jobNameDesc, isResume?":断点续跑":"", execution.getJobInstance());
return execution;
}
CommandLineRunner runner(JobLauncher launcher,
Job job,
@Value("${file}") File in,
JdbcTemplate jdbcTemplate) {
return args -> {
JobExecution execution = launcher.run(job,
new JobParametersBuilder()
.addString("file", in.getAbsolutePath())
.toJobParameters());
System.out.println("execution status: " + execution.getExitStatus().toString());
List<Person> personList = jdbcTemplate.query("select * from PEOPLE", (resultSet, i) -> new Person(resultSet.getString("first"),
resultSet.getString("last"),
resultSet.getString("email")));
personList.forEach(System.out::println);
};
}
@Test
public void registerPeriodSchedulerIT() {
final JobSchedulerConfiguration jobSchedulerConfiguration = DomainTestHelper.createJobSchedulerConfiguration(null,
10L, 10L, JobSchedulerType.PERIOD);
final JobConfiguration jobConfiguration = DomainTestHelper.createJobConfiguration(jobSchedulerConfiguration);
final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
final SchedulerConstructorWrapper schedulerConstructorWrapper = new SchedulerConstructorWrapper();
schedulerConstructorWrapper.setJob(this.simpleJob);
schedulerConstructorWrapper.setJobConfiguration(jobConfiguration);
schedulerConstructorWrapper.setJobIncrementer(JobIncrementer.DATE);
schedulerConstructorWrapper.setJobLauncher(this.jobLauncher);
schedulerConstructorWrapper.setJobParameters(new JobParametersBuilder().toJobParameters());
schedulerConstructorWrapper.setThreadPoolTaskScheduler(scheduler);
final Set<Object> constructorValues = new HashSet<>();
constructorValues.add(schedulerConstructorWrapper);
this.beanRegistrar
.registerBean(PeriodScheduler.class, "sampleBeanRegistrar", constructorValues, null, null, null, null);
final PeriodScheduler periodScheduler = this.applicationContext.getBean("sampleBeanRegistrar", PeriodScheduler
.class);
assertThat(periodScheduler).isNotNull();
periodScheduler.schedule();
this.applicationContext.close();
}
private static void attachJobParameter(final JobParametersBuilder jobParametersBuilder, final String parameterName,
final Object parameterValue) {
if (parameterValue instanceof Long) {
jobParametersBuilder.addLong(parameterName, (Long) parameterValue);
} else if (parameterValue instanceof Integer) {
jobParametersBuilder.addLong(parameterName, Long.valueOf((Integer) parameterValue));
} else if (parameterValue instanceof Date) {
jobParametersBuilder.addDate(parameterName, (Date) parameterValue);
} else if (parameterValue instanceof String) {
jobParametersBuilder.addString(parameterName, (String) parameterValue);
} else if (parameterValue instanceof Double) {
jobParametersBuilder.addDouble(parameterName, (Double) parameterValue);
} else {
log.error("Could not add Parameter. Cause: Unsupported Parameter Type:" + parameterValue.getClass() + " !");
}
}
@Test
public void testActivateListener() {
final JobListenerConfiguration jobListenerConfiguration = DomainTestHelper.createJobListenerConfiguration
("src/test/", "*.txt", JobListenerType.LOCAL_FOLDER_LISTENER);
jobListenerConfiguration.setBeanName("testBean");
jobListenerConfiguration.setListenerStatus(ListenerStatus.ACTIVE);
final JobConfiguration jobConfiguration = DomainTestHelper.createJobConfiguration(jobListenerConfiguration);
final ListenerConstructorWrapper listenerConstructorWrapper = new ListenerConstructorWrapper();
listenerConstructorWrapper.setJobIncrementer(JobIncrementer.DATE);
listenerConstructorWrapper.setJob(this.job);
listenerConstructorWrapper.setJobConfiguration(jobConfiguration);
listenerConstructorWrapper.setJobLauncher(this.jobLauncher);
listenerConstructorWrapper.setJobParameters(new JobParametersBuilder().toJobParameters());
final FolderListener folderListener = new FolderListener(listenerConstructorWrapper);
when(this.applicationContext.getBean(anyString(), any(Class.class))).thenReturn(folderListener);
when(this.applicationContext.containsBean(anyString())).thenReturn(Boolean.TRUE);
try {
this.listenerService.activateListener("testBean", Boolean.FALSE);
} catch (final Exception e) {
fail(e.getMessage());
}
}
@Test
public void testTerminateListener() {
final JobListenerConfiguration jobListenerConfiguration = DomainTestHelper.createJobListenerConfiguration
("src/test/", "*.txt", JobListenerType.LOCAL_FOLDER_LISTENER);
jobListenerConfiguration.setBeanName("testBean");
jobListenerConfiguration.setListenerStatus(ListenerStatus.ACTIVE);
final JobConfiguration jobConfiguration = DomainTestHelper.createJobConfiguration(jobListenerConfiguration);
final ListenerConstructorWrapper listenerConstructorWrapper = new ListenerConstructorWrapper();
listenerConstructorWrapper.setJobIncrementer(JobIncrementer.DATE);
listenerConstructorWrapper.setJob(this.job);
listenerConstructorWrapper.setJobConfiguration(jobConfiguration);
listenerConstructorWrapper.setJobLauncher(this.jobLauncher);
listenerConstructorWrapper.setJobParameters(new JobParametersBuilder().toJobParameters());
final FolderListener folderListener = new FolderListener(listenerConstructorWrapper);
when(this.applicationContext.getBean(anyString(), any(Class.class))).thenReturn(folderListener);
when(this.applicationContext.containsBean(anyString())).thenReturn(Boolean.TRUE);
try {
this.listenerService.terminateListener("testBean");
} catch (final Exception e) {
fail(e.getMessage());
}
}
@Before
public void init() throws Exception {
final MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean = new MapJobRepositoryFactoryBean();
mapJobRepositoryFactoryBean.getObject();
this.jobExecutionDao = mapJobRepositoryFactoryBean.getJobExecutionDao();
this.jobInstanceDao = mapJobRepositoryFactoryBean.getJobInstanceDao();
final MapJobExplorerFactoryBean mapJobExplorerFactoryBean = new MapJobExplorerFactoryBean(
mapJobRepositoryFactoryBean);
this.jobExplorer = mapJobExplorerFactoryBean.getObject();
this.mapLightminJobExecutionDao = new MapLightminJobExecutionDao(this.jobExplorer);
this.jobInstance = this.jobInstanceDao.createJobInstance("someJob", new JobParametersBuilder().toJobParameters());
final List<JobExecution> jobExecutions = DomainTestHelper.createJobExecutions(JOB_EXECUTION_COUNT);
for (final JobExecution jobExecution : jobExecutions) {
jobExecution.setId(null);
jobExecution.setJobInstance(this.jobInstance);
this.jobExecutionDao.saveJobExecution(jobExecution);
}
}
@Before
public void init() {
try {
final org.springframework.batch.core.JobExecution execution = this.jobLauncher.run(this.simpleJob, new JobParametersBuilder().addLong("time", System
.currentTimeMillis())
.toJobParameters());
this.launchedJobExecutionId = execution.getId();
this.launchedJobInstanceId = execution.getJobInstance().getId();
final Collection<org.springframework.batch.core.StepExecution> stepExecutions = execution.getStepExecutions();
for (final org.springframework.batch.core.StepExecution stepExecution : stepExecutions) {
this.launchedStepExecutionId = stepExecution.getId();
}
this.jobExecution = execution;
} catch (final Exception e) {
fail(e.getMessage());
}
}
@Before
public void init() {
try {
final org.springframework.batch.core.JobExecution execution = this.jobLauncher.run(this.simpleJob, new JobParametersBuilder().addLong("time", System
.currentTimeMillis())
.toJobParameters());
this.launchedJobExecutionId = execution.getId();
this.launchedJobInstanceId = execution.getJobInstance().getId();
final Collection<org.springframework.batch.core.StepExecution> stepExecutions = execution.getStepExecutions();
for (final org.springframework.batch.core.StepExecution stepExecution : stepExecutions) {
this.launchedStepExecutionId = stepExecution.getId();
}
this.jobExecution = execution;
} catch (final Exception e) {
fail(e.getMessage());
}
}
@DirtiesContext
@Test
public void runDifferentInstances() throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(throwingTasklet()).build()).build();
// start a job instance
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo")
.toJobParameters();
runFailedJob(jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// start a different job instance
JobParameters otherJobParameters = new JobParametersBuilder()
.addString("name", "bar").toJobParameters();
runFailedJob(otherJobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
}
@DirtiesContext
@Test
public void retryFailedExecutionOnNonRestartableJob() throws Exception {
this.job = this.jobs.get("job").preventRestart()
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
.incrementer(new RunIdIncrementer()).build();
runFailedJob(new JobParameters());
runFailedJob(new JobParameters());
// A failed job that is not restartable does not re-use the job params of
// the last execution, but creates a new job instance when running it again.
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
// try to re-run a failed execution
Executable executable = () -> this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
assertThatExceptionOfType(JobRestartException.class)
.isThrownBy(executable::execute)
.withMessage("JobInstance already exists and is not restartable");
}
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public void updatePostViews() {
LocalDateTime now = LocalDateTime.now();
Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions("updatePostViewsJob");
for (JobExecution jobExecution : jobExecutions) {
LocalDateTime startTime = LocalDateTime.ofInstant(jobExecution.getStartTime().toInstant(), ZoneId.systemDefault());
Duration d = Duration.between(now, startTime);
if (Math.abs(d.toMinutes()) == 0) {
logger.info("Skip processing because the job is running.");
return;
}
}
JobParameters params = new JobParametersBuilder()
.addDate("now", Date.from(now.atZone(ZoneId.systemDefault()).toInstant()))
.toJobParameters();
try {
jobLauncher.run(updatePostViewsJob, params);
} catch (Exception e) {
throw new ServiceException(e);
}
}
@Test
public void taxCalculationStep_generatesCorrectCalculation() throws Exception {
Employee employee = haveOneEmployee();
JobParameters jobParameters = new JobParametersBuilder()
.addLong("year", 2014L, true)
.addLong("month", 5L, true)
.toJobParameters();
JobExecution jobExecution = jobLauncherTestUtils.launchStep(EmployeeJobConfigSingleJvm.TAX_CALCULATION_STEP, jobParameters);
assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
List<TaxCalculation> byEmployee = taxCalculationRepository.findByEmployee(employee);
assertThat(byEmployee).hasSize(1);
TaxCalculation taxCalculation = byEmployee.get(0);
assertThat(taxCalculation.getEmployee().getId()).isEqualTo(employee.getId());
assertThat(taxCalculation.getYear()).isEqualTo(2014);
assertThat(taxCalculation.getMonth()).isEqualTo(5);
List<TaxCalculation> byYearAndMonth = taxCalculationRepository.find(2014, 5, 1L);
assertThat(byYearAndMonth).hasSize(1);
}
@Test
public void createProtocol() throws Exception {
// Given
JobExecution jobExecution = new JobExecution(1L,
new JobParametersBuilder().addString("test", "value").toJobParameters());
jobExecution.setJobInstance(new JobInstance(1L, "test-job"));
jobExecution.setCreateTime(new Date());
jobExecution.setStartTime(new Date());
jobExecution.setEndTime(new Date());
jobExecution.setExitStatus(new ExitStatus("COMPLETED_WITH_ERRORS", "This is a default exit message"));
jobExecution.getExecutionContext().put("jobCounter", 1);
StepExecution stepExecution = jobExecution.createStepExecution("test-step-1");
stepExecution.getExecutionContext().put("stepCounter", 1);
ProtocolListener protocolListener = new ProtocolListener();
// When
protocolListener.afterJob(jobExecution);
// Then
String output = this.outputCapture.toString();
assertThat(output, containsString("Protocol for test-job"));
assertThat(output, containsString("COMPLETED_WITH_ERRORS"));
}
private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) {
final JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
final Job job = (Job) context.getBean(batchJobName);
LOGGER.info("Starting the batch job: {}", batchJobName);
try {
// To enable multiple execution of a job with the same parameters
JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
.toJobParameters();
final JobExecution execution = jobLauncher.run(job, jobParameters);
LOGGER.info("Job Status : {}", execution.getStatus());
} catch (final Exception e) {
e.printStackTrace();
LOGGER.error("Job failed {}", e.getMessage());
}
}
/**
* 根据类名反射运行相应的任务
*
* @param c 定义的Bean类
*/
public void runTask(Class c) throws Exception {
TableName a = (TableName) c.getAnnotation(TableName.class);
String tableName = a.value();
Field[] fields = c.getDeclaredFields();
List<String> fieldNames = new ArrayList<>();
List<String> paramNames = new ArrayList<>();
for (Field f : fields) {
fieldNames.add(f.getName());
paramNames.add(":" + f.getName());
}
String columnsStr = String.join(",", fieldNames);
String paramsStr = String.join(",", paramNames);
String csvFileName;
if (p.getLocation() == 1) {
csvFileName = p.getCsvDir() + tableName + ".csv";
} else {
csvFileName = tableName + ".csv";
}
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString(KEY_JOB_NAME, tableName)
.addString(KEY_FILE_NAME, csvFileName)
.addString(KEY_VO_NAME, c.getCanonicalName())
.addString(KEY_COLUMNS, String.join(",", fieldNames))
.addString(KEY_SQL, "insert into " + tableName + " (" + columnsStr + ")" + " values(" + paramsStr + ")")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters1);
}
@Test
public void testBudgetVtoll() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvVtoll())
.toJobParameters();
jobLauncher.run(vtollJob, jobParameters);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
@Test
public void testCanton() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.addString("input.file.name", p.getCsvCanton())
.toJobParameters();
jobLauncher.run(cantonJob, jobParameters);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
/**
* 测试一个配置类,可同时运行多个任务
* @throws Exception 异常
*/
@Test
public void testCommonJobs() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "App")
.addString(KEY_FILE_NAME, p.getCsvApp())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zapp.App")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"appid", "zname", "flag"
}))
.addString(KEY_SQL, "insert into z_test_App (appid, zname, flag) values(:appid, :zname, :flag)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters1);
JobParameters jobParameters2 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "Log")
.addString(KEY_FILE_NAME, p.getCsvLog())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.zlog.Log")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"logid", "msg", "logtime"
}))
.addString(KEY_SQL, "insert into z_test_Log (logid, msg, logtime) values(:logid, :msg, :logtime)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters2);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
/**
* 一起测试4个CSV文件导入
* @throws Exception 异常
*/
@Test
public void testImportCsv4() throws Exception {
JobParameters jobParameters1 = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.addString(KEY_JOB_NAME, "BscExeOffice")
.addString(KEY_FILE_NAME, p.getCsvExeOffice())
.addString(KEY_VO_NAME, "com.xncoding.trans.modules.common.vo.BscExeOffice")
.addString(KEY_COLUMNS, String.join(",", new String[]{
"id","cantonid","code","name","memcode","supdeptid","comdeptid","contactman","tel","mobil","email","bgofficeid","infomobil","infoman","logpass","startdate","stopdate","status","memo","auditer","audittime","isaudit","edittime","platform_id","isprintbill"
}))
.addString(KEY_SQL, "insert into NT_BSC_EXEOFFICE (F_ID,F_CANTONID,F_CODE,F_NAME,F_MEMCODE,F_SUPDEPTID,F_COMDEPTID,F_CONTACTMAN,F_TEL,F_MOBIL,F_EMAIL,F_BGOFFICEID,F_INFOMOBIL,F_INFOMAN,F_LOGPASS,F_STARTDATE,F_STOPDATE,F_STATUS,F_MEMO,F_AUDITER,F_AUDITTIME,F_ISAUDIT,F_EDITTIME,F_PLATFORM_ID,F_ISPRINTBILL)" +
" values(:id, :cantonid, :code, :name, :memcode, :supdeptid, :comdeptid, :contactman, :tel, :mobil, :email, :bgofficeid, :infomobil, :infoman, :logpass, :startdate, :stopdate, :status, :memo, :auditer, :audittime, :isaudit, :edittime, :platform_id, :isprintbill)")
.toJobParameters();
jobLauncher.run(commonJob, jobParameters1);
// JobParameters jobParameters2 = new JobParametersBuilder()
// .addLong("time",System.currentTimeMillis())
// .addString(KEY_JOB_NAME, "Log")
// .addString(KEY_FILE_NAME, p.getCsvLog())
// .addString(KEY_VO_NAME, "com.xncoding.trans.modules.zlog.Log")
// .addString(KEY_COLUMNS, String.join(",", new String[]{
// "logid", "msg", "logtime"
// }))
// .addString(KEY_SQL, "insert into z_test_Log (logid, msg, logtime) values(:logid, :msg, :logtime)")
// .toJobParameters();
// jobLauncher.run(commonJob, jobParameters2);
logger.info("Main线程执行完成");
while (true) {
Thread.sleep(2000000L);
}
}
@Scheduled(cron = "0/10 * * * * ?")
public void runBatchJob()
throws JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
LOGGER.info("start runBatchJob");
if (enabled) {
jobLauncher.run(capitalizeNamesJob, new JobParametersBuilder()
.addDate("date", new Date()).toJobParameters());
}
}
@GetMapping("launcher/{message}")
public String launcher(@PathVariable String message) throws Exception {
JobParameters parameters = new JobParametersBuilder()
.addString("message", message)
.toJobParameters();
// 将参数传递给任务
jobLauncher.run(job, parameters);
return "success";
}
@Scheduled(fixedRate = 5000)
public void startJob() throws Exception {
JobExecution execution = jobLauncher.run(
deptBatchJob(),
new JobParametersBuilder().addLong("procId", System.nanoTime()).toJobParameters()
);
}
@Scheduled(fixedRate = 5000)
public void startJob() throws Exception {
JobExecution execution = jobLauncher.run(
deptBatchJob(),
new JobParametersBuilder().addLong("procId", System.nanoTime()).toJobParameters()
);
}
@Scheduled(fixedRate = 5000)
public void run() throws Exception {
JobExecution execution = jobLauncher.run(
customerReportJob(),
new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters()
);
log.info("Exit status: {}", execution.getStatus());
}
@Bean
IntegrationFlow batchJobFlow(Job job,
JdbcTemplate jdbcTemplate,
JobLauncher launcher,
MessageChannel files) {
return IntegrationFlows.from(files)
.transform((GenericTransformer<Object,JobLaunchRequest>) file -> {
System.out.println(file.toString());
System.out.println(file.getClass());
return null ;
})
.transform((GenericTransformer<File, JobLaunchRequest>) file -> {
JobParameters jp = new JobParametersBuilder()
.addString("file", file.getAbsolutePath())
.toJobParameters();
return new JobLaunchRequest(job, jp);
})
.handle(new JobLaunchingGateway(launcher))
.handle(JobExecution.class, (payload, headers) -> {
System.out.println("job execution status: " + payload.getExitStatus().toString());
List<Person> personList = jdbcTemplate.query("select * from PEOPLE",
(resultSet, i) -> new Person(resultSet.getString("first"),
resultSet.getString("last"),
resultSet.getString("email")));
personList.forEach(System.out::println);
return null;
})
.get();
}
@Test
public void parseJobParametersToStringTest() {
final JobParameters jobParameters = new JobParametersBuilder().addLong("long", 1L).addString("String",
"someString").toJobParameters();
final String result = DomainParameterParser.parseJobParametersToString(jobParameters);
assertThat(result).isEqualTo("String(String)=someString,long(Long)=1");
}