前面章节介绍了 Spring Batch 的架构,下面将快速介绍的核心概念。理解这些概念是有效使用 Spring Batch 的关键。
作业(Job)可以理解为一个完整的批处理任务,是 Spring Batch 的顶层抽象。
一个作业由一个或多个步骤(Step)按顺序或条件组合而成。
作业的特性:
通过 JobBuilderFactory 创建,支持唯一命名(如 importUserJob)。
可配置 JobParameters(作业参数),用于标识作业实例(如日期、文件路径)。
支持作业实例(JobInstance) 和 作业执行(JobExecution) 的概念:
同一作业(Job)可通过不同参数创建多个实例(JobInstance)。
每个实例可执行多次(每次执行对应一个 JobExecution)。
步骤(Step)是作业的最小执行单元,包含完整的“读-处理-写”流程。
通常一个步骤由三部分组成:
ItemReader:数据读取器。
ItemProcessor:数据处理器(可选)。
ItemWriter:数据写入器。
步骤的特性:
通过 StepBuilderFactory 创建,支持唯一命名(如 step1)。
支持分块处理(Chunk):每次读取 commit-interval 条记录,处理后批量写入。
支持事务管理:每个 Chunk(分块)作为一个事务单元。
简单说就是将数据分块读取、处理和写出。在 Spring Batch 中,Chunk 是实现批处理的核心模式,它将大量数据按固定大小分块处理,确保高效且可靠的数据处理。
Chunk 是一种 “读 - 处理 - 写” 循环模式,每次处理固定数量(commit-interval)的记录,作为一个事务单元。
核心流程如下:
读取 N 条记录 → 处理 N 条记录 → 写入 N 条记录 → 提交事务
ItemReader 用来从数据源读取数据,Spring Batch 支持多种数据源:
关系型数据库(JDBC、JPA)。
文件(CSV、XML、JSON)。
消息队列(RabbitMQ、Kafka)。
自定义数据源(如 REST API)。
实现方式:
内置实现:JdbcCursorItemReader、FlatFileItemReader 等。
自定义实现:通过实现 ItemReader 接口。
ItemProcessor 用来对 ItemReader 读取的数据进行转换、过滤或业务处理。
ItemProcessor 的特性:
输入类型与输出类型可不同(如 ItemProcessor<Input, Output>)。
支持返回 null 以过滤不需要的数据(返回数据为 null,将不会传递给写入器)。
示例场景:
数据格式转换(如字符串转日期)。
数据校验(如过滤无效记录)。
业务逻辑计算(如金额汇总)。
ItemWriter 用来将处理后(数据可以来源 ItemReader,也可以来源 ItemProcessor 处理后的数据)的数据写入目标系统,支持多种目标:
关系型数据库(JDBC、JPA)。
文件(CSV、XML、JSON)。
消息队列。
REST API。
ItemWriter 的特性:
批量写入:接收 List 类型数据,一次性处理多条记录。
事务安全:配合 Chunk 模式,确保数据一致性。
JobRepository 负责持久化(保存)批处理作业的元数据到数据库,确保作业状态可追溯、可恢复。包括:
作业实例(JobInstance)
作业执行(JobExecution)及其状态(如 STARTED、COMPLETED、FAILED)
步骤执行(StepExecution)及其进度
作业参数(JobParameters)
JobRepository 的存储介质:
默认使用关系型数据库(如 H2、MySQL、PostgreSQL)。
表结构包含 7 张核心表(如 BATCH_JOB_INSTANCE、BATCH_STEP_EXECUTION)。
JobRepository 的作用:
故障恢复:重启作业时可从上次失败的位置继续执行。
作业监控:通过查询作业仓库获取历史执行记录。
幂等性保证:相同参数的作业实例不会重复启动。
JobLauncher 是 Spring Batch 中触发作业执行的核心组件,负责启动 Job 并管理其生命周期。它主要负责:
根据 Job 和 JobParameters 创建 JobExecution。
调用作业仓库记录执行状态。
执行作业并返回结果。
JobLauncher 的实现方式:
内置实现:SimpleJobLauncher(默认)。
JobLauncher 支持同步 / 异步执行:
同步:调用线程等待作业完成。
异步:通过配置 TaskExecutor 实现多线程执行。
分块处理是 Spring Batch 核心特性之一,特别适合处理大量数据的场景。这种模式结合了事务管理和批处理的优势,既能高效处理数据,又能保证数据的完整性。
分块处理的核心思想:读 - 处理 - 写循环
每次从数据源读取一条记录,处理后暂存,直到达到指定的块大小(chunk size),然后将整个块的数据一次性写入目标位置。
并且以块为单位提交事务,若处理过程中发生异常,仅当前块的数据会回滚,已处理的块不受影响。
Chunk 模式:
每次读取 commit-interval 条记录(如 10 条)。
所有记录处理完成后,批量写入目标。
整个 Chunk 作为一个事务单元,失败时自动回滚。
分块处理优势:
内存效率高:无需一次性加载所有数据。
事务安全:确保数据一致性。
故障恢复:可从失败的 Chunk 重新开始。
重试机制(RetryPolicy)用于处理临时性异常(如网络波动、数据库连接超时等),提高批处理作业的稳定性和成功率。通过合理配置重试策略,可以避免因偶发异常导致整个作业失败,而是自动重试直到成功或达到最大重试次数。
重试的常见实现:
SimpleRetryPolicy:简单次数重试。
CircuitBreakerRetryPolicy:熔断机制,连续失败后暂停重试。
跳过机制允许在处理过程中遇到异常时跳过错误记录,继续处理后续数据,而不是让整个作业失败。这对于处理大量数据时的偶发错误特别有用,例如数据格式错误、无效记录等。
注意事项:
需权衡跳过记录对业务的影响。
可结合 SkipListener 记录被跳过的数据。
监听器可以用来监听作业 / 步骤 / Chunk 的生命周期事件,实现自定义逻辑(如日志记录、统计)。
监听器类型:
作业级监听器:
JobExecutionListener:监听作业开始 / 结束。
步骤级监听器:
StepExecutionListener:监听步骤开始 / 结束。
细粒度监听器:
ItemReadListener、ItemProcessListener、ItemWriteListener:监听读写处理过程。
ChunkListener:监听 Chunk 开始 / 结束。
监听器实现方式:
接口实现:实现对应监听器接口。
注解方式:使用 @BeforeStep、@AfterChunk 等注解。
由 JobName + JobParameters 唯一标识,代表一个逻辑上的批处理任务。例如:每月 1 日执行的 “工资计算” 作业是一个 JobInstance。
代表 JobInstance 的一次实际执行,记录执行状态(如 STARTED、FAILED)、开始 / 结束时间等。同一 JobInstance 可有多条 JobExecution 记录(如失败重试)。
每个 Step 对应一个或多个 StepExecution,记录步骤的执行状态和进度。包含处理的记录数、跳过的记录数等统计信息。
注意: Spring Batch 的核心概念围绕 作业编排、数据处理、状态管理 和 执行控制 展开。通过合理使用这些概念,开发者可以构建出高效、可靠、可扩展的企业级批处理系统。关键是理解各组件间的协作关系,如 Job 与 Step 的组合、Reader/Processor/Writer 的数据流、JobRepository 的状态管理等。结合 Spring Boot 的自动化配置,Spring Batch 能显著提升批处理开发的效率和质量。