Spring Batch4 是一个强大的批处理框架,提供了丰富的 API 用于构建高效、可靠的批处理作业。以下是其核心 API 的详细介绍:
Job:批处理作业的顶层接口,定义了作业的结构和执行逻辑。
JobInstance:作业的运行实例,由 Job + 参数唯一标识。
JobExecution:作业的一次具体执行,包含执行状态、开始时间、结束时间等信息。
JobParameters:作业运行时的参数,用于区分不同的 JobInstance。
Step:作业的执行单元,一个 Job 由多个 Step 组成。
StepExecution:Step 的一次具体执行,包含执行状态和统计信息。
ItemReader:从数据源读取数据的接口。
ItemProcessor:处理 / 转换数据的接口。
ItemWriter:将数据写入目标位置的接口。
Chunk:数据块,包含多条记录,用于分块处理。
用于创建 Job 和 Step 的工厂类,是配置批处理作业的入口点。用法如下:
@Configuration public class BatchConfig { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; // 创建Job Job myJob = jobBuilderFactory.get("myJob") .start(myStep1()) .next(myStep2()) .build(); // 创建Step(分块处理示例) Step myStep1 = stepBuilderFactory.get("myStep1") .<Input,Output>chunk(100) // 每100条记录为一个块 .reader(itemReader()) .processor(itemProcessor()) .writer(itemWriter()) .build(); }
从各种数据源读取数据,支持迭代式读取。定义如下:
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException; }
常见实现类:
JdbcCursorItemReader:基于 JDBC 游标读取数据库记录。
FlatFileItemReader:读取文本文件(如 CSV、固定格式文件)。
StaxEventItemReader:读取 XML 文件。
JmsItemReader:从 JMS 队列读取消息。
处理 / 转换数据,可选组件。定义如下:
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
示例:
public class UserAgeProcessor implements ItemProcessor<User, User> { @Override public User process(User item) throws Exception { // 处理逻辑:年龄加倍 item.setAge(item.getAge() * 2); return item; } }
常见实现子类如下图:
将处理后的数据写入目标位置,支持批量写入。定义如下:
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
常见实现类:
JdbcBatchItemWriter:批量写入数据库。
FlatFileItemWriter:写入文本文件。
StaxEventItemWriter:写入 XML 文件。
JmsItemWriter:写入 JMS 队列。
用于执行单一操作的步骤,不使用分块处理。定义如下:
public interface Tasklet { RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception; }
示例:
public class CustomTasklet implements Tasklet { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { // 执行自定义任务(如文件清理、初始化等) System.out.println("执行自定义任务..."); return RepeatStatus.FINISHED; // 任务完成 } }
管理作业的元数据(如作业实例、执行状态、步骤执行等)。定义如下:
public interface JobRepository { void createJobExecution(String jobName, JobParameters jobParameters); JobExecution getLastJobExecution(String jobName, JobParameters jobParameters); // 其他管理方法... }
用于启动作业的接口。定义如下:
public interface JobLauncher { JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
DefaultTransactionAttribute:配置事务属性(如隔离级别、传播行为)。
PlatformTransactionManager:Spring 的事务管理器接口。
RetryPolicy:定义重试策略(如最大重试次数)。
SimpleRetryPolicy、TimeoutRetryPolicy 等实现类。
BackOffPolicy:定义退避策略(重试间隔时间)。
SkipPolicy:定义跳过策略(如可跳过的异常类型、跳过限制)。
LimitCheckingItemSkipPolicy:基于次数限制的跳过策略。
JobExecutionListener:监听作业执行生命周期。
StepExecutionListener:监听步骤执行生命周期。
ChunkListener、ItemReadListener、ItemProcessListener、ItemWriteListener:监听分块和读写处理事件。
TaskExecutorPartitionHandler:基于线程池的并行处理。
RemotePartitioningManagerStepBuilder:分布式并行处理。
使用 @Configuration 和 @EnableBatchProcessing 注解,如下:
@Configuration @EnableBatchProcessing public class BatchConfig { // 配置 Job、Step、Reader、Writer 等组件 }
传统的 Spring XML 配置方式,如下:
<job id="myJob"> <step id="step1"> <tasklet> <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="100"/> </tasklet> </step> </job>
通过 JobLauncher 启动作业,如下:
@Autowired private JobLauncher jobLauncher; @Autowired private Job myJob; public void runBatchJob() throws Exception { JobParameters jobParameters = new JobParametersBuilder() .addString("inputFile", "data/input.csv") .addDate("runDate", new Date()) .toJobParameters(); JobExecution execution = jobLauncher.run(myJob, jobParameters); System.out.println("作业状态: " + execution.getStatus()); }