前面章节介绍了 Spring Batch 的核心概念等知识,下面将展示如何创建一个基础的 Spring Batch Job 作业。
在创建 Job 作业前,我们先在 application.properties 属性配置文件中添加如下配置:
spring.application.name=spring_batch_demo spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch4?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true spring.datasource.username=root spring.datasource.password=aaaaaa spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-@@platform@@.sql spring.batch.jdbc.platform=mysql spring.batch.jdbc.initialize-schema=always
上述配置信息中,配置了数据源和 spring batch 配置信息。
在 Spring Batch 里,start() 和 next() 方法是用来定义 Job 执行流程的关键方法。下面对它们的功能和使用方式进行详细说明:
此方法用于设定 Job 开始执行的首个 Step,每个 Job 都必须通过 start() 方法来开启执行流程。方法定义如下:
public SimpleJobBuilder start(Step step) { return (new SimpleJobBuilder(this)).start(step); }
简单用法如下:
@Bean public Job myJob(JobBuilderFactory jobs, Step step1) { return jobs.get("myJob") .start(step1) // 指定 step1 为 Job 的起始 Step .build(); }
注意,每个 Job 有且只能有一个起始 Step,且该方法返回 JobFlowBuilder 对象,可借助它进一步定义后续的执行步骤。
该方法用于定义 Step 执行的顺序,即当一个 Step 成功完成后,接下来要执行的 Step 是哪一个。方法定义如下:
public SimpleJobBuilder next(Step step) { this.steps.add(step); return this; }
简单用法如下:
@Bean public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step step3) { return jobs.get("myJob") .start(step1) .next(step2) // 若 step1 成功完成,则执行 step2 .next(step3) // 若 step2 成功完成,则执行 step3 .build(); }
注意,只有在前一个 Step 成功完成(状态为 COMPLETED)时,才会按照 next() 方法的定义执行后续 Step。并且还可以链式调用 next() 方法,从而构建出线性的执行流程。
下面是一个完整的示例,该实例创建了三个 Step,它们将依次执行,代码如下:
package com.hxstrive.spring_batch.demo1.config; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建一个名为 jobDemoJob 的任务 @Bean public Job jobDemoJob() { return jobBuilderFactory.get("jobDemoJob") // 配置任务要执行的步骤 .start(loadDataStep()) // 加载数据 .next(handleDataStep()) // 处理数据 .next(saveDataStep()) // 保存数据 .build(); } // 创建一个名为 loadDataStep 的步骤,模拟加载要处理的数据 @Bean public Step loadDataStep() { return stepBuilderFactory.get("loadDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("加载数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } // 创建一个名为 handleDataStep 的步骤,模拟处理数据 @Bean public Step handleDataStep() { return stepBuilderFactory.get("handleDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("处理数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } // 创建一个名为 handleDataStep 的步骤,模拟保存数据到数据库 @Bean public Step saveDataStep() { return stepBuilderFactory.get("saveDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("保存数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } }
执行上面示例,输出信息如下:
上图中,三个 Step 均成功执行。
在 Spring Batch 里,除了上述的 start()、next() 方法外,还提供了 on()、to()、from() 这些方法。这些方法都是用于构建作业流程的 DSL(领域特定语言)的重要组成部分。下面为你详细介绍它们的功能和用法。
该方法用于定义步骤执行后的跳转条件,通常和 end()、stopAndRestart() 或者 to() 搭配使用。on() 方法的参数是一个字符串,表示步骤执行后的状态(像 COMPLETED、FAILED 等)。方法定义如下:
public FlowBuilder.TransitionBuilder<FlowJobBuilder> on(String pattern) { Assert.state(this.steps.size() > 0, "You have to start a job with a step"); Iterator var2 = this.steps.iterator(); while(var2.hasNext()) { Step step = (Step)var2.next(); if (this.builder == null) { this.builder = new JobFlowBuilder(new FlowJobBuilder(this), step); } else { this.builder.next(step); } } return this.builder.on(pattern); }
简单用法如下:
@Bean public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step step3) { return jobs.get("myJob") .start(step1) .on("COMPLETED").to(step2) // 若 step1 完成,就转到 step2 .from(step1).on("FAILED").to(step3) // 若 step1 失败,就转到 step3 .end() .build(); }
to() 方法用于指定当满足特定条件时要跳转的目标步骤。它要和 on() 方法配合起来使用。方法定义如下:
public FlowBuilder<Q> to(Step step) { State next = this.parent.createState(step); this.parent.addTransition(this.pattern, next); this.parent.currentState = next; return this.parent; }
简单用法如下:
@Bean public Job myJob(JobBuilderFactory jobs, Step step1, Step step2) { return jobs.get("myJob") .start(step1) .on("COMPLETED").to(step2) // 当 step1 状态为 COMPLETED 时,跳转到 step2 .from(step1).on("FAILED").end() // 当 step1 状态为 FAILED 时,结束作业 .build(); }
from() 方法有两个用途:
一是从某个步骤继续构建分支流程;
二是实现状态转换的循环(比如重试逻辑);
方法定义如下:
public FlowBuilder<Q> from(Step step) { this.doFrom(step); return this; }
简单用法如下:
@Bean public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step retryStep) { return jobs.get("myJob") .start(step1) .on("FAILED").to(retryStep) // 若 step1 失败,转到重试步骤 .from(retryStep).on("FAILED").end() // 若重试失败,结束作业 .from(retryStep).on("COMPLETED").to(step2) // 若重试成功,转到 step2 .end() .build(); }
下面是一个完整的示例,该实例创建了三个 Step,它们将依次执行,代码如下:
package com.hxstrive.spring_batch.demo2.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建一个名为 jobDemoJob2 的任务 @Bean public Job jobDemoJob2() { return jobBuilderFactory.get("jobDemoJob2") // 配置任务要执行的步骤 .start(loadDataStep()) // 加载数据 .on("COMPLETED").to(handleDataStep()) // 处理数据 .from(handleDataStep()).on("COMPLETED").to(saveDataStep()) // 保存数据 .from(saveDataStep()).end() .build(); } // 创建一个名为 loadDataStep 的步骤,模拟加载要处理的数据 @Bean public Step loadDataStep() { return stepBuilderFactory.get("loadDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("加载数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).allowStartIfComplete(true).build(); } // 创建一个名为 handleDataStep 的步骤,模拟处理数据 @Bean public Step handleDataStep() { return stepBuilderFactory.get("handleDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("处理数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).allowStartIfComplete(true).build(); } // 创建一个名为 handleDataStep 的步骤,模拟保存数据到数据库 @Bean public Step saveDataStep() { return stepBuilderFactory.get("saveDataStep").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("保存数据……"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).allowStartIfComplete(true).build(); } }
执行上面示例,输出信息如下:
上图中,所有 Step 都执行成功。