Spring Batch4 核心 API

Spring Batch4 是一个强大的批处理框架,提供了丰富的 API 用于构建高效、可靠的批处理作业。以下是其核心 API 的详细介绍:

核心组件与接口

作业管理

  • Job:批处理作业的顶层接口,定义了作业的结构和执行逻辑。

  • JobInstance:作业的运行实例,由 Job + 参数唯一标识。

  • JobExecution:作业的一次具体执行,包含执行状态、开始时间、结束时间等信息。

  • JobParameters:作业运行时的参数,用于区分不同的 JobInstance。

步骤管理

  • Step:作业的执行单元,一个 Job 由多个 Step 组成。

  • StepExecution:Step 的一次具体执行,包含执行状态和统计信息。

数据处理

  • ItemReader:从数据源读取数据的接口。

  • ItemProcessor:处理 / 转换数据的接口。

  • ItemWriter:将数据写入目标位置的接口。

  • Chunk:数据块,包含多条记录,用于分块处理。

  

核心 API 详解

JobBuilderFactory & StepBuilderFactory

用于创建 Job 和 Step 的工厂类,是配置批处理作业的入口点。用法如下:

@Configuration
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    // 创建Job
    Job myJob = jobBuilderFactory.get("myJob")
        .start(myStep1())
        .next(myStep2())
        .build();
    
    // 创建Step(分块处理示例)
    Step myStep1 = stepBuilderFactory.get("myStep1")
        .<Input,Output>chunk(100)  // 每100条记录为一个块
        .reader(itemReader())
        .processor(itemProcessor())
        .writer(itemWriter())
        .build();

}

  

ItemReader 接口

从各种数据源读取数据,支持迭代式读取。定义如下:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException;
}

常见实现类:

  • JdbcCursorItemReader:基于 JDBC 游标读取数据库记录。

  • FlatFileItemReader:读取文本文件(如 CSV、固定格式文件)。

  • StaxEventItemReader:读取 XML 文件。

  • JmsItemReader:从 JMS 队列读取消息。

image.png

  

ItemProcessor 接口

处理 / 转换数据,可选组件。定义如下:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

示例:

public class UserAgeProcessor implements ItemProcessor<User, User> {
    @Override
    public User process(User item) throws Exception {
        // 处理逻辑:年龄加倍
        item.setAge(item.getAge() * 2);
        return item;
    }
}

常见实现子类如下图:

image.png

  

ItemWriter 接口

将处理后的数据写入目标位置,支持批量写入。定义如下:

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

常见实现类:

  • JdbcBatchItemWriter:批量写入数据库。

  • FlatFileItemWriter:写入文本文件。

  • StaxEventItemWriter:写入 XML 文件。

  • JmsItemWriter:写入 JMS 队列。

image.png

  

Tasklet 接口

用于执行单一操作的步骤,不使用分块处理。定义如下:

public interface Tasklet {
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

示例:

public class CustomTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // 执行自定义任务(如文件清理、初始化等)
        System.out.println("执行自定义任务...");
        return RepeatStatus.FINISHED;  // 任务完成
    }
}

image.png  

  

JobRepository 接口

管理作业的元数据(如作业实例、执行状态、步骤执行等)。定义如下:

public interface JobRepository {
    void createJobExecution(String jobName, JobParameters jobParameters);
    JobExecution getLastJobExecution(String jobName, JobParameters jobParameters);
    // 其他管理方法...
}

  

JobLauncher 接口

用于启动作业的接口。定义如下:

public interface JobLauncher {
    JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

  

高级特性 API

事务管理

  • DefaultTransactionAttribute:配置事务属性(如隔离级别、传播行为)。

  • PlatformTransactionManager:Spring 的事务管理器接口。

重试机制

  • RetryPolicy:定义重试策略(如最大重试次数)。

  • SimpleRetryPolicy、TimeoutRetryPolicy 等实现类。

  • BackOffPolicy:定义退避策略(重试间隔时间)。

跳过机制

  • SkipPolicy:定义跳过策略(如可跳过的异常类型、跳过限制)。

  • LimitCheckingItemSkipPolicy:基于次数限制的跳过策略。

监听器

  • JobExecutionListener:监听作业执行生命周期。

  • StepExecutionListener:监听步骤执行生命周期。

  • ChunkListener、ItemReadListener、ItemProcessListener、ItemWriteListener:监听分块和读写处理事件。

并行处理

  • TaskExecutorPartitionHandler:基于线程池的并行处理。

  • RemotePartitioningManagerStepBuilder:分布式并行处理。

  

配置方式

Java 配置(推荐)

使用 @Configuration 和 @EnableBatchProcessing 注解,如下:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    // 配置 Job、Step、Reader、Writer 等组件
}

XML 配置

传统的 Spring XML 配置方式,如下:

<job id="myJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="100"/>
        </tasklet>
    </step>
</job>

执行批处理作业

通过 JobLauncher 启动作业,如下:

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job myJob;

public void runBatchJob() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("inputFile", "data/input.csv")
        .addDate("runDate", new Date())
        .toJobParameters();
    
    JobExecution execution = jobLauncher.run(myJob, jobParameters);
    System.out.println("作业状态: " + execution.getStatus());
}

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号